## 📅 Notebook 2: Daily Feature Pipeline

**Goal**: fetch yesterday's electricity prices and observed weather, derive features, and upsert them into the Hopsworks Feature Store.

### What this pipeline writes
- **`electricity_prices` (v2)**: hourly prices + calendar + lag features
- **`weather_hourly` (v2)**: hourly observed weather + calendar features

### Run order
1. **Setup & login**
2. **Load feature groups (v2)**
3. **Config** (price area, location, dates)
4. **⚡ Prices**: fetch → feature engineering → insert
5. **🌦 Weather**: fetch → feature engineering → insert

### Notes
- Primary key is **(`price_area`, `unix_time`)**, where `unix_time` is **epoch ms**.
- Inserts use `storage="both"` to keep **online + offline** stores in sync (serving + training).
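A minimal stdlib sketch of the `unix_time` convention: it converts a timezone-aware timestamp to epoch milliseconds (the same value the cells below get from `astype("int64") // 10**6` on a nanosecond UTC column) and back. The helper names `to_epoch_ms` and `from_epoch_ms` are illustrative, not part of the project's `util` module.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(ts: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    if ts.tzinfo is None:
        raise ValueError("timestamp must be timezone-aware")
    # timedelta // timedelta gives an exact int, avoiding float rounding of ts.timestamp() * 1000
    return (ts - EPOCH) // timedelta(milliseconds=1)


def from_epoch_ms(ms: int) -> datetime:
    """Inverse of to_epoch_ms, returning a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)


# The same instant written in UTC and in CET (UTC+1) maps to the same key
utc_midnight = datetime(2025, 12, 26, 0, 0, tzinfo=timezone.utc)
cet_one_am = datetime(2025, 12, 26, 1, 0, tzinfo=timezone(timedelta(hours=1)))
print(to_epoch_ms(utc_midnight))  # 1766707200000
print(to_epoch_ms(cet_one_am) == to_epoch_ms(utc_midnight))  # True
```

Because the key is an absolute instant, the time zone a source reports in does not matter for the key itself; it only matters for calendar features and for deciding which hours belong to "yesterday".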


```python
# --- Imports ---
from pathlib import Path
import datetime
import sys
import warnings

import holidays
import pandas as pd

import hopsworks
from dotenv import load_dotenv

warnings.filterwarnings("ignore")


# --- Project path bootstrap ---
# Add the repo root (parent of the working directory, i.e. of the notebooks folder)
# to sys.path so that the `src` package is importable.
root_dir = Path("..").resolve()
if str(root_dir) not in sys.path:
    sys.path.append(str(root_dir))

# Project imports (after sys.path update)
from src.config import ElectricitySettings
from src import util


# --- Hopsworks login ---
# Load local env vars (used locally; GitHub Actions uses secrets).
env_path = root_dir / ".env"
load_dotenv(env_path)

settings = ElectricitySettings()
project = hopsworks.login(engine="python")
fs = project.get_feature_store()

print("Successfully logged in to Hopsworks project:", settings.HOPSWORKS_PROJECT)
```


```text
ElectricitySettings initialized
2025-12-27 18:06:42,238 INFO: Initializing external client
2025-12-27 18:06:42,238 INFO: Base URL: https://eu-west.cloud.hopsworks.ai:443
2025-12-27 18:06:43,932 INFO: Python Engine initialized.

Logged in to project, explore it here https://eu-west.cloud.hopsworks.ai:443/p/127
Successfully logged in to Hopsworks project: ScalableProject
```

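The login cell works both locally and in CI because python-dotenv's `load_dotenv` does not overwrite variables that are already set unless it is called with `override=True`. The stdlib sketch below mimics that precedence with two hypothetical helpers (`parse_env_lines`, `apply_env_file`) and a deliberately simplified `KEY=VALUE` parser (no quoting or `export` support). It is an illustration, not python-dotenv's implementation, and the variable names are only examples.

```python
from typing import Dict, Mapping, MutableMapping


def parse_env_lines(text: str) -> Dict[str, str]:
    """Parse simple KEY=VALUE lines; skips blanks, # comments, and lines without '='."""
    values: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def apply_env_file(
    file_values: Mapping[str, str],
    environ: MutableMapping[str, str],
    override: bool = False,
) -> None:
    """Copy file values into `environ`; variables already set win unless override=True."""
    for key, value in file_values.items():
        if override or key not in environ:
            environ[key] = value


# Simulated CI run: the secret is already in the environment before the .env file is read
ci_env = {"HOPSWORKS_API_KEY": "from-ci-secret"}
dotenv_text = "HOPSWORKS_API_KEY=from-dotenv\nHOPSWORKS_PROJECT=demo"
apply_env_file(parse_env_lines(dotenv_text), ci_env)
print(ci_env)  # {'HOPSWORKS_API_KEY': 'from-ci-secret', 'HOPSWORKS_PROJECT': 'demo'}
```

Passing `os.environ` as `environ` would apply the same rule to the real process environment.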

```python
# Get the feature groups (v2)
electricity_prices_fg = fs.get_feature_group('electricity_prices', version=2)
weather_hourly_fg = fs.get_feature_group('weather_hourly', version=2)
```


## ⚙️ Configuration

Define the price area, the weather location, and the target date (yesterday).

```python
# --- Configuration ---
# Location config is duplicated here on purpose: daily pipelines should be explicit.
PRICE_AREA = "SE3"
CITY = "Stockholm"
LATITUDE = 59.3251
LONGITUDE = 18.0711

# Calendar helpers: month -> season (0 = winter, 1 = spring, 2 = summer, 3 = autumn)
SEASON_MAP = {12: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1, 6: 2, 7: 2, 8: 2, 9: 3, 10: 3, 11: 3}

# Target day: "yesterday" relative to the running machine's local date
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
```

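`datetime.date.today()` uses the clock and time zone of whatever machine runs the notebook, so "yesterday" can differ between a laptop in Stockholm and a CI runner, which typically runs in UTC. A minimal sketch that pins the target day to an explicit time zone; `yesterday_in` is a hypothetical helper, and the fixed UTC+1 offset stands in for Stockholm winter time (an assumption: real code would pass `zoneinfo.ZoneInfo("Europe/Stockholm")` to get daylight saving time right).

```python
from datetime import date, datetime, timedelta, timezone, tzinfo
from typing import Optional


def yesterday_in(tz: tzinfo, now: Optional[datetime] = None) -> date:
    """Calendar day before 'today' as observed in time zone `tz` (`now` must be tz-aware)."""
    now = now or datetime.now(timezone.utc)
    return now.astimezone(tz).date() - timedelta(days=1)


CET = timezone(timedelta(hours=1))  # Stockholm winter offset; no DST handling

# 23:30 UTC on 26 Dec is already 00:30 on 27 Dec in Stockholm
run_time = datetime(2025, 12, 26, 23, 30, tzinfo=timezone.utc)
print(yesterday_in(timezone.utc, run_time))  # 2025-12-25
print(yesterday_in(CET, run_time))  # 2025-12-26
```

One option is to compute `yesterday = yesterday_in(ZoneInfo("Europe/Stockholm"))` in the configuration cell, so the target day no longer depends on where the job happens to run.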

## ⚡ Step 1: Prices (yesterday)
Fetch electricity prices for yesterday plus the three preceding days (history for the 72-hour lags), compute time/holiday and lag features, and upsert yesterday's rows to `electricity_prices` (v2).


```python
# --- Prices: fetch + keys ---
# Fetch three extra days before yesterday so the 72-hour lags can be filled.
window_start = yesterday - datetime.timedelta(days=3)

raw_prices = util.fetch_electricity_prices(
    start_date=window_start,
    end_date=yesterday,
    price_area=PRICE_AREA,
    show_progress=False,
    request_pause=0,
)
raw_prices = util.align_electricity_price_schema(raw_prices)

# Keys: tz-aware UTC `date` and `unix_time` in epoch ms (PK: price_area + unix_time)
_df = raw_prices.copy()
_df["date"] = pd.to_datetime(_df["timestamp"], utc=True)
_df["unix_time"] = _df["date"].astype("int64") // 10**6  # ns -> ms (nanosecond resolution)
_df = _df.drop(columns=["timestamp", "price_eur", "exchange_rate"], errors="ignore")
_df["price_area"] = PRICE_AREA.lower()
_df["price_area"] = _df["price_area"].astype("string")
```

```python
# --- Prices: calendar + holiday features ---
# Sort by key so the row-based shifts below run in time order
_df = _df.sort_values(["price_area", "unix_time"])

# Calendar features (computed from the UTC timestamp)
_df["weekday"] = _df["date"].dt.weekday.astype("int8")
_df["is_weekend"] = _df["weekday"].isin([5, 6]).astype("int8")
_df["month"] = _df["date"].dt.month.astype("int8")
_df["season"] = _df["month"].map(SEASON_MAP).astype("int8")

# Swedish public holidays (falls back to 0 if the lookup fails)
try:
    years = range(_df["date"].dt.year.min(), _df["date"].dt.year.max() + 1)
    se_holidays = holidays.Sweden(years=years)
    _df["is_holiday"] = _df["date"].dt.date.isin(se_holidays).astype("int8")
except Exception:
    _df["is_holiday"] = 0
```

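The calendar columns are a pure function of one timestamp, which the stdlib sketch below makes explicit. The holiday lookup is passed in as a set of dates (hard-coded here to Christmas Day and Boxing Day 2025) rather than calling the `holidays` package, and `calendar_features` is a hypothetical helper. One detail worth checking against the training data: `holidays.Sweden` lists every Sunday as a holiday unless it is created with `include_sundays=False`, so with the default, `is_holiday` also fires on every Sunday and overlaps with `is_weekend`.

```python
from datetime import date, datetime, timezone
from typing import AbstractSet, Dict

# Same month -> season mapping as SEASON_MAP in the configuration cell
SEASON_MAP = {12: 0, 1: 0, 2: 0, 3: 1, 4: 1, 5: 1, 6: 2, 7: 2, 8: 2, 9: 3, 10: 3, 11: 3}


def calendar_features(ts: datetime, holiday_dates: AbstractSet[date]) -> Dict[str, int]:
    """Weekday, weekend, month, season, and holiday flags for one timestamp."""
    weekday = ts.weekday()  # Monday = 0 ... Sunday = 6, same as pandas dt.weekday
    return {
        "weekday": weekday,
        "is_weekend": int(weekday in (5, 6)),
        "month": ts.month,
        "season": SEASON_MAP[ts.month],
        "is_holiday": int(ts.date() in holiday_dates),
    }


# Example holiday set: Christmas Day and Boxing Day (Annandag jul) 2025
holiday_dates = {date(2025, 12, 25), date(2025, 12, 26)}
print(calendar_features(datetime(2025, 12, 26, 12, tzinfo=timezone.utc), holiday_dates))
# {'weekday': 4, 'is_weekend': 0, 'month': 12, 'season': 0, 'is_holiday': 1}
```

`ts.date()` is taken in whatever offset `ts` carries, so passing UTC timestamps (as the cell above does) classifies late-evening Swedish hours by the UTC calendar day.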
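```python
# --- Prices: lag + rolling features ---
# Row-based shifts: 24 rows equal 24 hours only if no hour is missing or duplicated
for lag in (24, 48, 72):
    _df[f"price_lag_{lag}"] = _df.groupby("price_area")["price_sek"].shift(lag).astype("float32")

# 72-row rolling mean; the window includes the current row's own price
roll = (
    _df.groupby("price_area")["price_sek"]
    .rolling(72, min_periods=1)
    .mean()
    .reset_index(level=0, drop=True)
)
_df["price_roll3d"] = roll.astype("float32")
```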

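Two properties of the cell above are easy to miss: `shift(24)` moves by 24 *rows*, which is 24 hours only when the series has exactly one row per hour, and `rolling(72, min_periods=1)` includes the current row, so `price_roll3d` contains the hour's own `price_sek`. If the model forecasts `price_sek`, that overlap deserves a check. Below is a stdlib sketch of a timestamp-keyed alternative (lags looked up at `unix_time - lag * 3_600_000`, and a trailing mean that excludes the current hour). It is an alternative design, not what the cell computes; `lag_features` and the `price_trailing_mean` column are hypothetical names.

```python
from typing import Dict, Optional, Tuple

HOUR_MS = 3_600_000  # one hour in milliseconds


def lag_features(
    prices: Dict[int, float],
    lags_h: Tuple[int, ...] = (24, 48, 72),
    window_h: int = 72,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Time-keyed lags and a trailing mean for each epoch-ms key in `prices`.

    A missing hour yields None instead of silently borrowing a neighbouring row.
    """
    out: Dict[int, Dict[str, Optional[float]]] = {}
    for t in sorted(prices):
        row: Dict[str, Optional[float]] = {}
        for lag in lags_h:
            row[f"price_lag_{lag}"] = prices.get(t - lag * HOUR_MS)
        # Mean over the previous `window_h` hours that exist, excluding hour t itself
        past = [
            prices[t - k * HOUR_MS]
            for k in range(1, window_h + 1)
            if t - k * HOUR_MS in prices
        ]
        row["price_trailing_mean"] = sum(past) / len(past) if past else None
        out[t] = row
    return out


# Toy hourly series with the third hour missing (keys are epoch ms)
base = 1_766_707_200_000
series = {base: 10.0, base + HOUR_MS: 20.0, base + 3 * HOUR_MS: 40.0}
feats = lag_features(series, lags_h=(1, 2), window_h=3)
print(feats[base + 3 * HOUR_MS])
# {'price_lag_1': None, 'price_lag_2': 20.0, 'price_trailing_mean': 15.0}
```

For the same last row, a row-based `shift(1)` would return 20.0 as the one-hour lag, because the missing hour simply does not exist as a row.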
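```python
# --- Prices: select yesterday and insert ---
# Keep only yesterday's rows (compared on the UTC calendar date)
mask_yesterday = _df["date"].dt.date == yesterday
df_prices = _df.loc[mask_yesterday].dropna().reset_index(drop=True)

# Output schema for electricity_prices (v2)
cols_out = [
    "unix_time",
    "date",
    "hour",
    "price_area",
    "price_sek",
    "weekday",
    "is_weekend",
    "month",
    "season",
    "is_holiday",
    "price_lag_24",
    "price_lag_48",
    "price_lag_72",
    "price_roll3d",
]
df_prices = df_prices[cols_out]

# Skip the insert (and the materialization job) when nothing survived filtering
if len(df_prices):
    print(f"Prices: inserting {len(df_prices)} row(s) for {yesterday}")
    electricity_prices_fg.insert(df_prices, storage="both", wait=True)
else:
    print("Prices: no rows to insert.")
```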


```text
Fetching electricity prices from 2025-12-23 to 2025-12-26 for SE3...
Coerced electricity prices to hourly resolution (aggregated sub-hour data).
Fetched 96 hourly price records across 4 day(s)
Prices: inserting 23 row(s) for 2025-12-26
2025-12-27 18:06:47,815 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://eu-west.cloud.hopsworks.ai:443/p/127/fs/74/fg/2198

Uploading Dataframe: 100.00% |██████████| Rows 23/23 | Elapsed Time: 00:00 | Remaining Time: 00:00

Launching job: electricity_prices_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://eu-west.cloud.hopsworks.ai:443/p/127/jobs/named/electricity_prices_2_offline_fg_materialization/executions
2025-12-27 18:07:01,103 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-12-27 18:08:56,057 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
```

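The run log above reports 96 fetched hourly records but 23 inserted rows for 2025-12-26. The `mask_yesterday` filter takes `.dt.date` of a UTC-converted column, so it selects a UTC calendar day, while Stockholm is UTC+1 in winter and UTC+2 in summer. If the price source reports the hours of the local delivery day, the UTC mask keeps only part of that day, which may explain the missing hour. The stdlib sketch below compares the two filters, using a fixed UTC+1 offset as an assumption (no DST); `local_day_hours` is a hypothetical helper. In pandas, a local-day mask could be written as `_df["date"].dt.tz_convert("Europe/Stockholm").dt.date == yesterday`.

```python
from datetime import date, datetime, timedelta, timezone
from typing import List

CET = timezone(timedelta(hours=1))  # Stockholm winter offset; no DST handling


def local_day_hours(day: date, tz: timezone) -> List[datetime]:
    """UTC timestamps of the 24 hours that make up `day` in the fixed-offset zone `tz`."""
    start = datetime(day.year, day.month, day.day, tzinfo=tz)
    return [(start + timedelta(hours=h)).astimezone(timezone.utc) for h in range(24)]


target = date(2025, 12, 26)
hours_utc = local_day_hours(target, CET)  # hours a local-day price feed would cover

by_utc_day = [ts for ts in hours_utc if ts.date() == target]  # like .dt.date on a UTC column
by_local_day = [ts for ts in hours_utc if ts.astimezone(CET).date() == target]
print(len(by_utc_day), len(by_local_day))  # 23 24
```

Whichever convention is chosen, it should match the one used when the historical rows were backfilled; otherwise daily and historical rows disagree on what "a day" is.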

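The feature group already runs an expectation suite (2 expectations, per the log above). Because the insert is an upsert on (`price_area`, `unix_time`), duplicate keys within one batch collapse to a single stored row, and which one survives is not something to rely on. A cheap pre-insert check for duplicates, missing hours, and uneven spacing is sketched below in stdlib Python over row dicts; `check_batch` is a hypothetical helper. In pandas, `df_prices.duplicated(["price_area", "unix_time"]).any()` covers the duplicate part.

```python
from typing import Dict, Iterable, List, Set, Tuple

HOUR_MS = 3_600_000  # one hour in milliseconds


def check_batch(rows: Iterable[dict], expected_hours: int = 24) -> List[str]:
    """List problems in an hourly batch keyed by (price_area, unix_time in epoch ms)."""
    problems: List[str] = []
    seen: Set[Tuple[str, int]] = set()
    for row in rows:
        key = (row["price_area"], row["unix_time"])
        if key in seen:
            problems.append(f"duplicate key {key}")
        seen.add(key)

    # Group timestamps per area, then check the count and the hourly spacing
    by_area: Dict[str, List[int]] = {}
    for area, t in seen:
        by_area.setdefault(area, []).append(t)
    for area, times in sorted(by_area.items()):
        times.sort()
        if len(times) != expected_hours:
            problems.append(f"{area}: expected {expected_hours} hourly keys, got {len(times)}")
        bad_steps = sum(1 for a, b in zip(times, times[1:]) if b - a != HOUR_MS)
        if bad_steps:
            problems.append(f"{area}: {bad_steps} non-hourly step(s)")
    return problems


# 23 consecutive hours for one area: no duplicates or gaps, but one hour short
base = 1_766_707_200_000
rows = [{"price_area": "se3", "unix_time": base + h * HOUR_MS} for h in range(23)]
print(check_batch(rows))  # ['se3: expected 24 hourly keys, got 23']
```

`expected_hours` is a parameter because a local day has 23 or 25 hours on daylight saving transitions.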
## 🌦 Step 2: Observed weather (yesterday)
Fetch observed hourly weather, compute time/holiday features, and upsert to `weather_hourly` (v2).


```python
# --- Weather: fetch ---
weather_df = util.get_yesterday_hourly_weather(
    latitude=LATITUDE,
    longitude=LONGITUDE,
    city=CITY,
)

# Normalize keys to match electricity_prices (PK: price_area + unix_time)
weather_df["date"] = pd.to_datetime(weather_df["timestamp"], utc=True)
weather_df["unix_time"] = weather_df["date"].astype("int64") // 10**6  # ns -> ms
weather_df["price_area"] = PRICE_AREA.lower()
weather_df["price_area"] = weather_df["price_area"].astype("string")
weather_df = weather_df.drop(columns=["timestamp", "city"], errors="ignore")
```

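Weather rows reuse the price key (`price_area`, `unix_time`) instead of the city, and both cells lower-case `PRICE_AREA`, so each weather hour lines up one-to-one with a price hour and the two feature groups can be joined on that pair when building training data. A stdlib sketch of that inner join over row dicts; `join_on_key` is a hypothetical helper and the sample values are made up for illustration.

```python
from typing import Dict, List, Tuple

Key = Tuple[str, int]  # (price_area, unix_time in epoch ms)


def join_on_key(prices: List[dict], weather: List[dict]) -> List[dict]:
    """Inner-join price and weather rows that share (price_area, unix_time)."""
    weather_by_key: Dict[Key, dict] = {(w["price_area"], w["unix_time"]): w for w in weather}
    joined = []
    for p in prices:
        w = weather_by_key.get((p["price_area"], p["unix_time"]))
        if w is not None:
            joined.append({**p, **w})  # weather values win on shared column names
    return joined


base = 1_766_707_200_000
prices = [
    {"price_area": "se3", "unix_time": base, "price_sek": 0.5},
    {"price_area": "se3", "unix_time": base + 3_600_000, "price_sek": 0.6},
]
weather = [{"price_area": "se3", "unix_time": base, "temperature_2m": -2.0}]
print(join_on_key(prices, weather))
# [{'price_area': 'se3', 'unix_time': 1766707200000, 'price_sek': 0.5, 'temperature_2m': -2.0}]
```

A key written as `"SE3"` on one side and `"se3"` on the other would silently produce an empty join, which is why both pipelines normalize the case.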
```python
# --- Weather: calendar + holiday features (same logic as the prices cell) ---
weather_df["weekday"] = weather_df["date"].dt.weekday.astype("int8")
weather_df["is_weekend"] = weather_df["weekday"].isin([5, 6]).astype("int8")
weather_df["month"] = weather_df["date"].dt.month.astype("int8")
weather_df["season"] = weather_df["month"].map(SEASON_MAP).astype("int8")

# Swedish public holidays (falls back to 0 if the lookup fails)
try:
    years = range(weather_df["date"].dt.year.min(), weather_df["date"].dt.year.max() + 1)
    se_holidays = holidays.Sweden(years=years)
    weather_df["is_holiday"] = weather_df["date"].dt.date.isin(se_holidays).astype("int8")
except Exception:
    weather_df["is_holiday"] = 0
```

```python
# --- Weather: select columns and insert ---
# Output schema for weather_hourly (v2); hours with any missing value are dropped
cols_out = [
    "unix_time",
    "date",
    "hour",
    "price_area",
    "temperature_2m",
    "apparent_temperature",
    "precipitation",
    "rain",
    "snowfall",
    "cloud_cover",
    "wind_speed_10m",
    "wind_speed_100m",
    "wind_direction_10m",
    "wind_direction_100m",
    "wind_gusts_10m",
    "surface_pressure",
    "weekday",
    "is_weekend",
    "month",
    "season",
    "is_holiday",
]
weather_df = weather_df[cols_out].dropna().reset_index(drop=True)

if len(weather_df):
    print(f"Weather: inserting {len(weather_df)} row(s) for {yesterday}")
    weather_hourly_fg.insert(weather_df, storage="both", wait=True)
else:
    print("Weather: no rows to insert.")
```

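`dropna()` removes an hour if *any* selected column is missing, and the cell only reports how many rows survived. When an upstream weather variable goes missing, knowing which column caused the drop saves a debugging round. A stdlib sketch that counts nulls per column before dropping; `null_counts` is a hypothetical helper, and in pandas the equivalent is `weather_df[cols_out].isna().sum()`.

```python
import math
from typing import Dict, List, Sequence


def null_counts(rows: List[dict], columns: Sequence[str]) -> Dict[str, int]:
    """Count None/NaN values per column (a missing key counts as null); omit clean columns."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)
            if v is None or (isinstance(v, float) and math.isnan(v)):
                counts[c] += 1
    return {c: n for c, n in counts.items() if n}


rows = [
    {"unix_time": 1, "temperature_2m": 1.5, "snowfall": 0.0},
    {"unix_time": 2, "temperature_2m": float("nan"), "snowfall": None},
    {"unix_time": 3, "temperature_2m": 2.0},
]
print(null_counts(rows, ["unix_time", "temperature_2m", "snowfall"]))
# {'temperature_2m': 1, 'snowfall': 2}
```

Printing this dictionary next to the "inserting N row(s)" message makes a shortfall in the weather rows self-explanatory in the job logs.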