In [28]:
import sys
from pathlib import Path
import os

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip ~/notebooks/fingrid from PYTHON_PATH if notebook started in one of these subdirectories
    if root_dir.parts[-1:] == ('fingrid',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")
    
# Set the environment variables from the file <root_dir>/.env
from mlfs import config
if os.path.exists(f"{root_dir}/.env"):
    settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: c:\Users\patri\VScodeProjects\25-ID2223-mlfs-book
HopsworksSettings initialized!


<span style="font-width:bold; font-size: 3rem; color:#333;">- Part 02: Daily Feature Pipeline for Fingrid Energy Consumption and Weather Data</span>

## This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action to schedule daily runs.
However, you are free to use any Python orchestration tool to schedule this program to run daily.

In [29]:
import datetime
import time
import requests
import pandas as pd
import numpy as np
import hopsworks
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings("ignore")

### <span style='color:#ff5f27'> Imports

In [30]:
project = hopsworks.login()
fs = project.get_feature_store() 

# Configuration
FINGRID_API_KEY = settings.FINGRID_KEY
FINGRID_BASE_URL = "https://data.fingrid.fi/api/datasets"
DATASET_ID = "193"

COUNTRY = "Finland"
CITY = "Helsinki"
LATITUDE = 60.1699
LONGITUDE = 24.9384

today = datetime.today()

2026-01-01 14:14:00,241 INFO: Closing external client and cleaning up certificates.
Connection closed.
2026-01-01 14:14:00,249 INFO: Initializing external client
2026-01-01 14:14:00,250 INFO: Base URL: https://c.app.hopsworks.ai:443






2026-01-01 14:14:02,087 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1286359


### <span style="color:#ff5f27;"> Get references to the Feature Groups </span>

In [31]:
# Retrieve feature groups
energy_fg = fs.get_feature_group(
    name='energy_consumption',
    version=1,
)
weather_fg = fs.get_feature_group(
    name='weather_finland',
    version=1,
)

---

## <span style='color:#ff5f27'> Retrieve Today's Energy Consumption Data from Fingrid API</span>

In [32]:
def fetch_fingrid_data(dataset_id, start_date, end_date, api_key):
    """Fetch data from Fingrid API."""
    url = f"{FINGRID_BASE_URL}/{dataset_id}/data"
    
    params = {
        "startTime": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "endTime": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "format": "json",
        "pageSize": 20000
    }
    
    headers = {
        "x-api-key": api_key,
        "Accept": "application/json"
    }
    
    response = requests.get(url, headers=headers, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    
    if 'data' in data:
        return pd.DataFrame(data['data'])
    return pd.DataFrame()

# Fetch today's data (with small buffer for timezone handling)
end_time = today
start_time = today - timedelta(days=1)

df_raw = fetch_fingrid_data(
    dataset_id=DATASET_ID,
    start_date=start_time,
    end_date=end_time,
    api_key=FINGRID_API_KEY.get_secret_value()
)

# Process the data
timestamp_col = 'start_time' if 'start_time' in df_raw.columns else 'startTime'
df_raw['date'] = pd.to_datetime(df_raw[timestamp_col])
value_col = 'value' if 'value' in df_raw.columns else df_raw.columns[1]
df_raw['consumption_mw'] = pd.to_numeric(df_raw[value_col], errors='coerce')

# Filter for today only
df_raw = df_raw[df_raw['date'].dt.date == today.date()]
df_raw = df_raw.drop_duplicates(subset=['date']).sort_values('date')

# Extract temporal features
df_energy = df_raw[['date', 'consumption_mw']].copy()
df_energy['country'] = COUNTRY
df_energy['year'] = df_energy['date'].dt.year
df_energy['month'] = df_energy['date'].dt.month
df_energy['day'] = df_energy['date'].dt.day
df_energy['hour'] = df_energy['date'].dt.hour
df_energy['day_of_week'] = df_energy['date'].dt.dayofweek
df_energy['is_weekend'] = (df_energy['day_of_week'] >= 5).astype(int)
df_energy['week_of_year'] = df_energy['date'].dt.isocalendar().week

# Get historical data for lag features
historical = energy_fg.read()

# Ensure both dataframes have timezone-naive datetime
historical['date'] = pd.to_datetime(historical['date']).dt.tz_localize(None)
df_energy['date'] = pd.to_datetime(df_energy['date']).dt.tz_localize(None)

combined = pd.concat([historical, df_energy], ignore_index=True).sort_values('date')

# Create lag features
combined['consumption_lag_1'] = combined['consumption_mw'].shift(1)
combined['consumption_lag_2'] = combined['consumption_mw'].shift(2)
combined['consumption_lag_8'] = combined['consumption_mw'].shift(8)
combined['consumption_rolling_mean_24h'] = combined['consumption_mw'].rolling(window=8, min_periods=1).mean()
combined['consumption_rolling_std_24h'] = combined['consumption_mw'].rolling(window=8, min_periods=1).std()

# Get only today's rows
df_energy_final = combined[combined['date'].dt.date == today.date()].copy()

df_energy_final

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (2.16s) 


Unnamed: 0,date,consumption_mw,country,year,month,day,hour,day_of_week,is_weekend,week_of_year,consumption_lag_1,consumption_lag_2,consumption_lag_8,consumption_rolling_mean_24h,consumption_rolling_std_24h
86150,2026-01-01 00:00:00,12170.0,Finland,2026,1,1,0,3,0,1,12140.0,12199.0,12235.0,12207.500,36.067397
55016,2026-01-01 00:00:00,12170.0,Finland,2026,1,1,0,3,0,1,12170.0,12140.0,12233.0,12199.625,36.578438
37311,2026-01-01 00:03:00,12131.0,Finland,2026,1,1,0,3,0,1,12170.0,12170.0,12225.0,12187.875,41.964058
86151,2026-01-01 00:03:00,12131.0,Finland,2026,1,1,0,3,0,1,12131.0,12170.0,12236.0,12174.750,41.174715
86152,2026-01-01 00:06:00,12178.0,Finland,2026,1,1,0,3,0,1,12131.0,12131.0,12243.0,12166.625,30.918961
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
86409,2026-01-01 12:57:00,13445.0,Finland,2026,1,1,12,3,0,1,13384.0,13375.0,13237.0,13303.125,92.411251
86410,2026-01-01 13:00:00,13363.0,Finland,2026,1,1,13,3,0,1,13445.0,13384.0,13205.0,13322.875,85.033502
86411,2026-01-01 13:03:00,13380.0,Finland,2026,1,1,13,3,0,1,13363.0,13445.0,13207.0,13344.500,72.417441
86412,2026-01-01 13:06:00,13372.0,Finland,2026,1,1,13,3,0,1,13380.0,13363.0,13223.0,13363.125,53.357119


df_energy_final.info()

## <span style='color:#ff5f27'> Get Weather Forecast Data (7 days ahead)</span>

In [33]:
def fetch_weather_forecast(latitude, longitude, forecast_days=7):
    """Fetch weather FORECAST data from Open-Meteo API (future dates)."""
    url = "https://api.open-meteo.com/v1/forecast"
    
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "forecast_days": forecast_days,
        "hourly": [
            "temperature_2m",
            "precipitation",
            "cloud_cover",
            "wind_speed_10m",
            "wind_speed_100m",
            "wind_direction_10m",
            "surface_pressure",
            "shortwave_radiation"
        ],
        "timezone": "Europe/Helsinki"
    }
    
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    
    if 'hourly' in data:
        hourly = data['hourly']
        df = pd.DataFrame({
            'date': pd.to_datetime(hourly['time']),
            'temperature_2m': hourly['temperature_2m'],
            'precipitation': hourly['precipitation'],
            'cloud_cover': hourly['cloud_cover'],
            'wind_speed_10m': hourly['wind_speed_10m'],
            'wind_speed_100m': hourly['wind_speed_100m'],
            'wind_direction_10m': hourly['wind_direction_10m'],
            'surface_pressure': hourly['surface_pressure'],
            'shortwave_radiation': hourly['shortwave_radiation']
        })
        return df
    return pd.DataFrame()

# Fetch weather forecast for next 7 days
print(f"Fetching weather forecast for 7 days starting from {today.date()}")
df_weather_raw = fetch_weather_forecast(
    latitude=LATITUDE,
    longitude=LONGITUDE,
    forecast_days=7
)
print(f"Fetched {len(df_weather_raw)} hourly forecast rows")

# Resample to 3-hour intervals
df_weather_raw.set_index('date', inplace=True)
df_weather = df_weather_raw.resample('3H').agg({
    'temperature_2m': 'mean',
    'precipitation': 'sum',
    'cloud_cover': 'mean',
    'wind_speed_10m': 'mean',
    'wind_speed_100m': 'mean',
    'wind_direction_10m': 'mean',
    'surface_pressure': 'mean',
    'shortwave_radiation': 'mean'
})
df_weather.reset_index(inplace=True)
df_weather['country'] = COUNTRY
df_weather = df_weather.dropna()

df_weather

Fetching weather forecast for 7 days starting from 2026-01-01
Fetched 168 hourly forecast rows


Unnamed: 0,date,temperature_2m,precipitation,cloud_cover,wind_speed_10m,wind_speed_100m,wind_direction_10m,surface_pressure,shortwave_radiation,country
0,2026-01-01 00:00:00,-11.066667,0.0,70.0,12.966667,21.2,306.666667,1002.2,0.0,Finland
1,2026-01-01 03:00:00,-11.266667,0.0,44.666667,7.7,11.633333,318.666667,1000.333333,0.0,Finland
2,2026-01-01 06:00:00,-10.933333,0.0,65.666667,8.266667,7.533333,145.333333,998.733333,0.0,Finland
3,2026-01-01 09:00:00,-9.166667,0.0,98.666667,13.566667,13.466667,69.333333,996.933333,13.0,Finland
4,2026-01-01 12:00:00,-6.5,0.0,98.333333,23.633333,19.133333,137.0,994.233333,34.0,Finland
5,2026-01-01 15:00:00,-6.0,0.0,89.0,29.633333,30.966667,123.333333,991.4,9.0,Finland
6,2026-01-01 18:00:00,-5.133333,0.0,99.333333,36.366667,37.6,115.666667,988.7,0.0,Finland
7,2026-01-01 21:00:00,-4.266667,0.0,100.0,38.166667,42.233333,112.333333,986.5,0.0,Finland
8,2026-01-02 00:00:00,-4.4,0.0,100.0,38.4,42.433333,113.666667,984.9,0.0,Finland
9,2026-01-02 03:00:00,-4.366667,0.6,100.0,37.566667,44.766667,112.0,984.466667,0.0,Finland


In [34]:
df_weather.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 56 entries, 0 to 55
Data columns (total 10 columns):
 #   Column               Non-Null Count  Dtype         
---  ------               --------------  -----         
 0   date                 56 non-null     datetime64[ns]
 1   temperature_2m       56 non-null     float64       
 2   precipitation        56 non-null     float64       
 3   cloud_cover          56 non-null     float64       
 4   wind_speed_10m       56 non-null     float64       
 5   wind_speed_100m      56 non-null     float64       
 6   wind_direction_10m   56 non-null     float64       
 7   surface_pressure     56 non-null     float64       
 8   shortwave_radiation  56 non-null     float64       
 9   country              56 non-null     object        
dtypes: datetime64[ns](1), float64(8), object(1)
memory usage: 4.5+ KB


---

## <span style="color:#ff5f27;">Uploading new data to the Feature Store</span>

In [35]:
# Insert new energy data
energy_fg.insert(df_energy_final)

2026-01-01 14:14:08,567 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1286359/fs/1284182/fg/1876570


Uploading Dataframe: 100.00% |██████████| Rows 500/500 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: energy_consumption_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1286359/jobs/named/energy_consumption_1_offline_fg_materialization/executions


(Job('energy_consumption_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "consumption_mw",
           "min_value": 0.0,
           "max_value": 20000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 799815
         }
       },
       "result": {
         "observed_value": 12131.0,
         "element_count": 500,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2026-01-01T01:14:08.000567Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
  

# Insert new weather data
weather_fg.insert(df_weather, wait=True)