In [1]:
import datetime
import hopsworks
import json
import warnings
from helpers import config
from helpers.utils import read_csv_folder
import xarray as xr
import numpy as np
import polars as pl
from pathlib import Path
warnings.filterwarnings("ignore")

In [2]:
# Load project settings from the local .env file, then open a Hopsworks session that
# uses the client-side Python engine.
# NOTE(review): `settings` is not referenced again in this notebook; presumably
# HopsworksSettings exposes the API key / host that hopsworks.login() picks up, so keep
# this line before login() — confirm against helpers/config.py.
settings = config.HopsworksSettings(_env_file="./.env")
project = hopsworks.login(engine="python")

HopsworksSettings initialized!
2025-12-20 13:56:00,670 INFO: Initializing external client
2025-12-20 13:56:00,671 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2025-12-20 13:56:02,189 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/3197


# Weather Data

In [3]:
# Open the ERA5 GRIB extract for the Sweden area (needs the cfgrib xarray backend).
GRIB_PATH = Path("./historical_data/weather/era5_sweden.grib")
ds = xr.open_dataset(GRIB_PATH, engine="cfgrib")

# Derive features per grid cell *before* spatial averaging:
#   - t2m is converted from Kelvin to Celsius;
#   - wind speed is the magnitude of the (u10, v10) vector, so averaging speeds
#     (rather than the components) keeps opposing winds from cancelling out.
_t2m_celsius = ds["t2m"] - 273.15
_wind_magnitude = np.sqrt(ds["u10"] ** 2 + ds["v10"] ** 2)
ds = ds.assign(temperature_c=_t2m_celsius, wind_speed=_wind_magnitude)

# Collapse the lat/lon grid to a single value per timestamp.
# NOTE(review): this is an unweighted mean over grid cells; on a regular lat/lon grid
# northern cells cover less area. Any change (e.g. cos(lat) weights) must also be
# applied wherever these features are produced for inference.
ds_sweden = ds[["temperature_c", "wind_speed"]].mean(dim=["latitude", "longitude"])

_weather_columns = {
    "datetime": ds_sweden["time"].values,
    "temperature": ds_sweden["temperature_c"].values,
    "wind_speed": ds_sweden["wind_speed"].values,
}
weather_features = pl.DataFrame(_weather_columns).sort("datetime")

# Carbon Intensity Data

In [4]:
ci_raw = read_csv_folder(
    Path("historical_data/carbon_intensity"),
    parse_datetime_cols={"Datetime (UTC)": "datetime"}
)

# One row per timestamp. `datetime` is the feature group's primary key, and CSVs from
# several files could overlap; keep the first occurrence in read order.
ci_hourly = (
    ci_raw
    .rename({
        "Carbon intensity gCO₂eq/kWh (direct)": "CI_direct",
        "Carbon intensity gCO₂eq/kWh (Life cycle)": "CI_lifecycle",
        "Datetime (UTC)": "datetime"
    })
    .select(["datetime", "CI_direct", "CI_lifecycle"])
    .unique(subset="datetime", keep="first", maintain_order=True)
    .sort("datetime")
)


def _ci_lag(hours: int, alias: str) -> "pl.DataFrame":
    """Return (datetime, alias) where `alias` is CI_lifecycle observed `hours` earlier.

    Shifting the timestamp forward and joining on it looks the lag up by time, not by
    row position. A missing hour therefore gives a null lag instead of silently pairing
    a value with an older reading, which is what a positional `shift` would do.
    """
    return ci_hourly.select(
        (pl.col("datetime") + pl.duration(hours=hours)).alias("datetime"),
        pl.col("CI_lifecycle").alias(alias),
    )


# Output columns (same order as before): datetime, CI_direct, CI_lifecycle,
# CI_t_minus_1, CI_t_minus_24.
ci_features = (
    ci_hourly
    .join(_ci_lag(1, "CI_t_minus_1"), on="datetime", how="left")
    .join(_ci_lag(24, "CI_t_minus_24"), on="datetime", how="left")
    .sort("datetime")
)


# Electricity Load Data

In [5]:
load_raw = read_csv_folder(
    Path("historical_data/electricity_load"),
    delimiter="\t",
    parse_datetime_cols={"DateTime(UTC)": "datetime"}
)

# Sweden's total load, one row per timestamp. `datetime` is the feature group's primary
# key, and it is also the join key the flows cell uses.
# NOTE(review): if the export lists Sweden under several area types (e.g. CTY and CTA)
# with the same display name, only the first row per timestamp is kept — confirm
# against the AreaTypeCode column.
load_hourly = (
    load_raw
    .rename({
        "TotalLoad[MW]": "load_mw",
        "DateTime(UTC)": "datetime",
        "AreaDisplayName": "area",
    })
    .filter(pl.col("area") == 'Sweden (SE)')
    .select(["datetime", "load_mw"])
    .unique(subset="datetime", keep="first", maintain_order=True)
    .sort("datetime")
)

# The load one hour earlier, keyed by the timestamp it should be compared against.
# Joining on time, rather than using a positional shift, yields null across gaps
# instead of a delta against an older reading.
load_prev_hour = load_hourly.select(
    (pl.col("datetime") + pl.duration(hours=1)).alias("datetime"),
    pl.col("load_mw").alias("_load_mw_prev"),
)

# Output columns (same order as before): datetime, load_mw, load_delta_1h.
load_features = (
    load_hourly
    .join(load_prev_hour, on="datetime", how="left")
    .with_columns(
        (pl.col("load_mw") - pl.col("_load_mw_prev")).alias("load_delta_1h")
    )
    .drop("_load_mw_prev")
    .sort("datetime")
)


# Electricity Generation Types Data

In [6]:
# ENTSO-E production types grouped into the three generation categories tracked below.
WIND_TYPES = {
    "Wind Offshore",
    "Wind Onshore",
}

HYDRO_TYPES = {
    "Hydro Run-of-river and poundage",
    "Hydro Water Reservoir",
    "Hydro Pumped Storage",
}

FOSSIL_TYPES = {
    "Fossil Gas",
    "Fossil Hard coal",
    "Fossil Brown coal/Lignite",
    "Fossil Oil",
    "Fossil Oil shale",
    "Fossil Peat",
    "Fossil Coal-derived gas",
}

generation_types_raw = read_csv_folder(
    Path("historical_data/generation_types"),
    delimiter="\t",
    parse_datetime_cols={"DateTime(UTC)": "datetime"}
)

# Sweden only: total MW per (timestamp, production type).
_se_generation = (
    generation_types_raw
    .rename({"DateTime(UTC)": "datetime"})
    .filter(pl.col("AreaMapCode") == "SE")
)
gen_hourly = (
    _se_generation
    .select(
        "datetime",
        "ProductionType",
        pl.col("ActualGenerationOutput[MW]").alias("mw"),
    )
    .group_by(["datetime", "ProductionType"])
    .agg(pl.col("mw").sum())
)

# (production types, per-category MW column, per-category share column).
_GEN_CATEGORIES = (
    (WIND_TYPES, "wind_mw", "wind_share"),
    (HYDRO_TYPES, "hydro_mw", "hydro_share"),
    (FOSSIL_TYPES, "fossil_mw", "fossil_share"),
)

_total_gen = pl.col("total_gen_mw")

# Per timestamp: MW summed over every production type, plus MW for each category
# (rows outside the category contribute 0).
_hourly_totals = [pl.col("mw").sum().alias("total_gen_mw")] + [
    pl.when(pl.col("ProductionType").is_in(types))
    .then(pl.col("mw"))
    .otherwise(0)
    .sum()
    .alias(mw_col)
    for types, mw_col, _ in _GEN_CATEGORIES
]

# Category share of total generation; null whenever the total is not positive.
_category_shares = [
    pl.when(_total_gen > 0)
    .then(pl.col(mw_col) / _total_gen)
    .otherwise(None)
    .alias(share_col)
    for _, mw_col, share_col in _GEN_CATEGORIES
]

gen_features = (
    gen_hourly
    .group_by("datetime")
    .agg(_hourly_totals)
    .with_columns(_category_shares)
    .select(["datetime", "wind_share", "hydro_share", "fossil_share"])
    .sort("datetime")
)

# Electricity Flows Data

In [7]:
flows_raw = read_csv_folder(
    Path("historical_data/physical_flows"),
    delimiter="\t",
    parse_datetime_cols={"DateTime(UTC)": "datetime"}
)

flows = (
    flows_raw
    .rename({"DateTime(UTC)": "datetime"})
    .with_columns(
        pl.col("Flow[MW]").cast(pl.Float64).alias("flow_mw")
    )
)

# Keep only rows where Sweden is one side of the border and a flow value is reported.
# Restricting rows here, instead of filtering `net_imports_mw != 0` afterwards, still
# excludes timestamps with no Swedish data. It also keeps hours whose imports and
# exports balance to exactly 0 MW, which are real observations.
# NOTE(review): if the export carries both country- and control-area-level rows mapped
# to "SE", those flows would be counted twice — confirm against the area-type columns.
se_flows = flows.filter(
    ((pl.col("InAreaMapCode") == "SE") | (pl.col("OutAreaMapCode") == "SE"))
    & pl.col("flow_mw").is_not_null()
)

net_imports = (
    se_flows
    .group_by("datetime")
    .agg([
        # Flow whose In area is SE is summed as imports...
        pl.when(pl.col("InAreaMapCode") == "SE")
          .then(pl.col("flow_mw"))
          .otherwise(0)
          .sum()
          .alias("imports_mw"),

        # ...and flow whose Out area is SE as exports.
        pl.when(pl.col("OutAreaMapCode") == "SE")
          .then(pl.col("flow_mw"))
          .otherwise(0)
          .sum()
          .alias("exports_mw"),
    ])
    .with_columns(
        (pl.col("imports_mw") - pl.col("exports_mw"))
        .alias("net_imports_mw")
    )
    .select(["datetime", "net_imports_mw"])
)

# Share of Swedish load covered by net imports:
#   - 0.0 when Sweden is a net exporter (or exactly balanced);
#   - null when load is missing or non-positive, instead of dividing by zero.
import_dependency = (
    net_imports
    .join(load_features, on="datetime", how="left")
    .with_columns(
        pl.when(pl.col("net_imports_mw") <= 0)
          .then(0.0)
          .when(pl.col("load_mw") > 0)
          .then(pl.col("net_imports_mw") / pl.col("load_mw"))
          .otherwise(None)
          .alias("import_dependency")
    )
    .select(["datetime", "import_dependency"])
)

# Output columns: datetime, net_imports_mw, import_dependency.
flows_features = (
    net_imports
    .join(import_dependency, on="datetime", how="left")
    .sort("datetime")
)

# TODO: Data Validation

In [8]:
# import great_expectations as ge

# # Data validation rules for air quality data
# aq_expectation_suite = ge.core.ExpectationSuite(
#     expectation_suite_name="aq_expectation_suite"
# )

# aq_expectation_suite.add_expectation(
#     ge.core.ExpectationConfiguration(
#         expectation_type="expect_column_min_to_be_between",
#         kwargs={
#             "column":"pm25",
#             "min_value":-0.1,
#             "max_value":500.0,
#             "strict_min":True
#         }
#     )
# )

# # Data validation rules for weather data
# weather_expectation_suite = ge.core.ExpectationSuite(
#     expectation_suite_name="weather_expectation_suite"
# )

# def expect_greater_than_zero(col):
#     weather_expectation_suite.add_expectation(
#         ge.core.ExpectationConfiguration(
#             expectation_type="expect_column_min_to_be_between",
#             kwargs={
#                 "column":col,
#                 "min_value":-0.1,
#                 "max_value":1000.0,
#                 "strict_min":True
#             }
#         )
#     )
# expect_greater_than_zero("precipitation_sum")
# expect_greater_than_zero("wind_speed_10m_max")

In [9]:
fs = project.get_feature_store()


def _hourly_feature_group(name, description):
    """Get version 1 of an hourly feature group, creating it if it does not exist yet.

    Every group in this notebook uses `datetime` as both primary key and event time,
    so the groups share a common join key.
    """
    return fs.get_or_create_feature_group(
        name=name,
        description=description,
        version=1,
        primary_key=['datetime'],
        event_time="datetime",
    )


weather_fg = _hourly_feature_group(
    'weather',
    'Weather characteristics of each hour',
)
carbon_intensity_fg = _hourly_feature_group(
    'carbon_intensity',
    'Carbon Intensity characteristics of each hour',
)
load_fg = _hourly_feature_group(
    'electricity_load',
    'Electricity Load characteristics of each hour',
)
generation_types_fg = _hourly_feature_group(
    'generation_types',
    'Electricity Generation Types characteristics of each hour',
)
flows_fg = _hourly_feature_group(
    'electricity_flows',
    'Electricity Flow characteristics of each hour',
)


In [10]:
# Backfill every feature group with the frames built above. All five inserts pass
# wait=True, so the cell only returns once each ingestion job has finished and
# downstream notebooks see complete data. The trailing `;` suppresses the
# `(None, None)` return value in the cell output.
weather_fg.insert(weather_features, wait=True)
carbon_intensity_fg.insert(ci_features, wait=True)
load_fg.insert(load_features, wait=True)
generation_types_fg.insert(gen_features, wait=True)
flows_fg.insert(flows_features, wait=True);

2025-12-20 13:58:24,828 INFO: Computing insert statistics
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3197/fs/3143/fg/3158
2025-12-20 13:58:37,971 INFO: Computing insert statistics
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3197/fs/3143/fg/2144
2025-12-20 13:58:51,288 INFO: Computing insert statistics
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3197/fs/3143/fg/3159
2025-12-20 13:59:04,794 INFO: Computing insert statistics
Feature Group created successfully, explore it at 
https://eu-west.cloud.hopsworks.ai:443/p/3197/fs/3143/fg/3160
2025-12-20 13:59:17,686 INFO: Computing insert statistics


(None, None)

# TODO: Feature Descriptions

In [11]:
# Add feature descriptions
# air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
# air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in acqcn.org)")
# air_quality_fg.update_feature_description("city", "City where the air quality was measured")
# air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
# air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")
# air_quality_fg.update_feature_description("url", "Url to sensor live data")
# air_quality_fg.update_feature_description("lagged_1", "Pm25 from the previous day")
# air_quality_fg.update_feature_description("lagged_2", "Pm25 from the second to last day")
# air_quality_fg.update_feature_description("lagged_3", "Pm25 from the third to last day")

# weather_fg.update_feature_description("date", "Date of measurement of weather")
# weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
# weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
# weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
# weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m above ground")
# weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant Wind direction over the day")