In [1]:
import sys
from pathlib import Path


# Start from the directory the notebook was launched in.
root_dir = Path().absolute()
# If the notebook was started from inside a `notebooks/` folder, step up one
# level so that its parent directory is used as the root instead.
if root_dir.name == 'notebooks':
    root_dir = root_dir.parent
# From here on `root_dir` is a plain string (that is what `sys.path` stores).
root_dir = str(root_dir)
print("Local environment")

# Put the root directory on the module search path (once) so that modules
# located there can be imported from this notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)
print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Load the settings from <root_dir>/.env.
# `config` is imported here, after the search path has been updated
# (presumably because it lives in the root directory — verify).
import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Added the following directory to the PYTHONPATH: /Users/emaminotti/ID2223-ScalableMLDL_Project
HopsworksSettings initialized!


<span style="font-weight:bold; font-size: 3rem; color:#333;">- Daily Feature Pipeline</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

### <span style='color:#ff5f27'> Imports </span>

In [2]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
import util
import config
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> Get the Sensor URL, Country, City, Street names from Hopsworks </span>

__Update the values in the cell below.__

__These should be the same values as in notebook 1 - the feature backfill notebook__


In [4]:
import hopsworks
import datetime

# Log in to Hopsworks and get a handle to the project's feature store.
project = hopsworks.login(engine="python")
fs = project.get_feature_store()

# Location this pipeline runs for: Stockholm.
# Per the original note these are the coordinates used for the backfill
# (verify against the backfill notebook).
city, country = "Stockholm", "Sweden"
latitude, longitude = 59.3293, 18.0686

# Reference date of this run: the local calendar date of the executing machine.
today = datetime.date.today()

print(f"‚úÖ Configurazione caricata per: {city} (Lat: {latitude}, Lon: {longitude})")
print(f"üìÖ Data odierna di esecuzione: {today}")

2025-12-15 16:46:04,548 INFO: Closing external client and cleaning up certificates.
2025-12-15 16:46:04,553 INFO: Connection closed.
2025-12-15 16:46:04,555 INFO: Initializing external client
2025-12-15 16:46:04,555 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-12-15 16:46:05,798 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1279152
‚úÖ Configurazione caricata per: Stockholm (Lat: 59.3293, Lon: 18.0686)
üìÖ Data odierna di esecuzione: 2025-12-15


### <span style="color:#ff5f27;"> Get references to the Feature Groups </span>

In [5]:
# Fetch handles to the two existing feature groups this pipeline writes to
# (version 1 of `pollen_measurements` and `weather_measurements`).
try:
    pollen_fg = fs.get_feature_group(name="pollen_measurements", version=1)
    weather_fg = fs.get_feature_group(name="weather_measurements", version=1)

    # Accessing `.name` / `.version` also acts as a check that both handles
    # are usable objects; a failure here is reported by the handler below.
    print("‚úÖ Feature Groups recuperati correttamente:")
    print(f"   - {pollen_fg.name} (v{pollen_fg.version})")
    print(f"   - {weather_fg.name} (v{weather_fg.version})")

except Exception as e:
    print(f"‚ùå Errore nel recupero dei Feature Group. Hai eseguito il backfill? \nErrore: {e}")
    # Re-raise after reporting: the insert cells further down need both
    # handles, so carrying on would either fail later with a less helpful
    # error or silently reuse stale handles left over in the kernel.
    raise

‚úÖ Feature Groups recuperati correttamente:
   - pollen_measurements (v1)
   - weather_measurements (v1)


## <span style='color:#ff5f27'> Retrieve Today's Pollen data</span>


In [6]:
import requests
import pandas as pd
from datetime import timedelta

# 1. Time window
# Download from a few days in the past (history needed to build the lag
# features) up to a few days in the future (forecast).
# NOTE(review): the original note justified the 5-day look-back as a safe
# margin for a "3-day lag" — confirm against the lag definition actually used.
start_date = (today - timedelta(days=5)).strftime("%Y-%m-%d")
end_date = (today + timedelta(days=3)).strftime("%Y-%m-%d")

print(f"üì° Scaricando dati polline per la finestra: {start_date} -> {end_date}")

# 2. Pollen variables to request (per the original note: only the ones
# relevant for Stockholm).
pollen_vars = [
    "alder_pollen",
    "birch_pollen",
    "grass_pollen",
    "mugwort_pollen"
]
hourly_params = ",".join(pollen_vars)

# 3. Build the request URL and call the API.
# Hourly values are requested with the timezone parameter set to Europe/Berlin.
url = (
    f"https://air-quality-api.open-meteo.com/v1/air-quality?"
    f"latitude={latitude}&longitude={longitude}&"
    f"hourly={hourly_params}&"
    f"start_date={start_date}&end_date={end_date}&"
    f"timezone=Europe%2FBerlin"
)

# Without a timeout `requests` waits indefinitely, which would hang a
# scheduled run if the API stops responding.
response = requests.get(url, timeout=30)

if response.status_code != 200:
    print(f"‚ùå Errore API: {response.status_code}")
    print(response.text)
    # Stop here: without a fresh download the following cells would work on
    # an undefined (or stale) `df_pollen_new`.
    raise RuntimeError(
        f"Pollen API request failed with HTTP status {response.status_code}"
    )

data = response.json()
hourly_data = data['hourly']

# The 'hourly' payload is turned directly into a DataFrame
# (one column per returned key).
df_pollen_new = pd.DataFrame(hourly_data)

# Parse the timestamp strings into datetimes.
df_pollen_new['time'] = pd.to_datetime(df_pollen_new['time'])

print(f"‚úÖ Download completato: {len(df_pollen_new)} righe scaricate.")
print("Ultime righe (previsioni future):")
display(df_pollen_new.tail(3))

üì° Scaricando dati polline per la finestra: 2025-12-10 -> 2025-12-18
‚úÖ Download completato: 216 righe scaricate.
Ultime righe (previsioni future):


Unnamed: 0,time,alder_pollen,birch_pollen,grass_pollen,mugwort_pollen
213,2025-12-18 21:00:00,0.0,0.0,0.0,0.0
214,2025-12-18 22:00:00,0.0,0.0,0.0,0.0
215,2025-12-18 23:00:00,0.0,0.0,0.0,0.0


In [7]:
# Quick sanity check of the downloaded pollen frame: columns, dtypes and non-null counts.
df_pollen_new.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 216 entries, 0 to 215
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   time            216 non-null    datetime64[ns]
 1   alder_pollen    216 non-null    float64       
 2   birch_pollen    216 non-null    float64       
 3   grass_pollen    216 non-null    float64       
 4   mugwort_pollen  216 non-null    float64       
dtypes: datetime64[ns](1), float64(4)
memory usage: 8.6 KB


### Cleaning + Feature engineering

In [8]:
# Drop pollen types that are not used (per the original note: the same
# columns that were excluded in the backfill). Missing columns are skipped.
cols_to_drop = ['olive_pollen', 'ragweed_pollen']
df_pollen_new = df_pollen_new.drop(columns=[c for c in cols_to_drop if c in df_pollen_new.columns])

# Raw pollen measurement columns. Matching on the suffix (instead of a
# substring) keeps already generated `*_pollen_lag_N` columns out of this list
# when the cell is re-run, which would otherwise create `*_lag_1_lag_1` columns.
pollen_cols = [col for col in df_pollen_new.columns if col.endswith('_pollen')]

# Missing and negative values: interpolate gaps linearly in both directions,
# replace anything still missing with 0 and clip negative values to 0.
df_pollen_new[pollen_cols] = df_pollen_new[pollen_cols].interpolate(method='linear', limit_direction='both')
df_pollen_new[pollen_cols] = df_pollen_new[pollen_cols].fillna(0.0)
df_pollen_new[pollen_cols] = df_pollen_new[pollen_cols].clip(lower=0.0)

# Key columns (described as primary / partition keys in the original note).
df_pollen_new['city'] = city
df_pollen_new['country'] = country
# Milliseconds since 1970-01-01, computed by subtracting the epoch so that the
# result does not depend on the datetime resolution of the `time` column.
# NOTE(review): `time` is timezone-naive here, so the wall-clock values are
# treated as if they were UTC — confirm this matches the backfill.
df_pollen_new['unix_time'] = (df_pollen_new['time'] - pd.Timestamp("1970-01-01")) // pd.Timedelta(milliseconds=1)

# --- Feature engineering (lagged features) ---
# The lags are recomputed on the downloaded window (past + future).
# `shift(lag)` moves values by `lag` rows within each city, i.e. by `lag`
# time steps of the downloaded series, not by calendar days.
df_pollen_new = df_pollen_new.sort_values(['city', 'time'])
lags = [1, 2, 3]

for col in pollen_cols:
    for lag in lags:
        new_col_name = f"{col}_lag_{lag}"
        df_pollen_new[new_col_name] = df_pollen_new.groupby('city')[col].shift(lag)

# Remove rows that contain a missing value in any column; in practice these
# are the leading rows of each city whose lag values do not exist yet.
# Re-running this cell trims a further `max(lags)` leading rows each time.
df_pollen_new = df_pollen_new.dropna()

# Final cast to float32 (per the original note: to match the feature group schema).
all_float_cols = pollen_cols + [c for c in df_pollen_new.columns if '_lag_' in c]
df_pollen_new[all_float_cols] = df_pollen_new[all_float_cols].astype('float32')

print(f"‚úÖ Dati processati pronti per l'inserimento. Dimensioni: {df_pollen_new.shape}")
display(df_pollen_new.head(3))

‚úÖ Dati processati pronti per l'inserimento. Dimensioni: (213, 20)


Unnamed: 0,time,alder_pollen,birch_pollen,grass_pollen,mugwort_pollen,city,country,unix_time,alder_pollen_lag_1,alder_pollen_lag_2,alder_pollen_lag_3,birch_pollen_lag_1,birch_pollen_lag_2,birch_pollen_lag_3,grass_pollen_lag_1,grass_pollen_lag_2,grass_pollen_lag_3,mugwort_pollen_lag_1,mugwort_pollen_lag_2,mugwort_pollen_lag_3
3,2025-12-10 03:00:00,0.0,0.0,0.0,0.0,Stockholm,Sweden,1765335600000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,2025-12-10 04:00:00,0.0,0.0,0.0,0.0,Stockholm,Sweden,1765339200000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
5,2025-12-10 05:00:00,0.0,0.0,0.0,0.0,Stockholm,Sweden,1765342800000,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


### Insert into Feature Group

In [9]:
# --- Write the pollen features to Hopsworks ---

print("üöÄ Avvio inserimento dati nel Feature Group...")

try:
    # Per the original note, the insert also triggers the validation
    # (Expectation Suite) that was defined in the backfill.
    pollen_fg.insert(df_pollen_new)
    print("‚úÖ Inserimento completato con successo!")

except Exception as e:
    print(f"‚ùå Errore durante l'inserimento: {e}")
    # Debug hint: check for extra columns or wrong dtypes.
    print("Verifica che lo schema del DataFrame corrisponda a quello del Feature Group.")
    # Re-raise after reporting so that a failed insert (or an undefined
    # `pollen_fg` / `df_pollen_new`) fails the run instead of letting a
    # scheduled execution finish as if the data had been written.
    raise

üöÄ Avvio inserimento dati nel Feature Group...
2025-12-15 16:53:07,337 INFO: 	8 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279152/fs/1258616/fg/1867084


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 213/213 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: pollen_measurements_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279152/jobs/named/pollen_measurements_1_offline_fg_materialization/executions
‚úÖ Inserimento completato con successo!


## <span style='color:#ff5f27'> Get Weather Forecast data</span>

In [10]:
import requests
import pandas as pd
from datetime import date

# 1. Daily weather variables to request.
# Per the original note these must match exactly the features used in the
# backfill / present in the feature store.
weather_features = [
    "temperature_2m_max",
    "temperature_2m_min",
    "temperature_2m_mean",
    "precipitation_sum",
    "rain_sum",
    "showers_sum",
    "snowfall_sum",
    "precipitation_hours",
    "wind_speed_10m_max",
    "wind_gusts_10m_max",
    "wind_direction_10m_dominant",
    "shortwave_radiation_sum",
    "et0_fao_evapotranspiration"
]

# Comma-separated list passed as the `daily` query parameter.
daily_params = ",".join(weather_features)

print(f"üå¶Ô∏è Scaricando previsioni meteo (7 giorni) per {city}...")

# 2. API call (Open-Meteo forecast endpoint).
# No forecast horizon is passed, so the number of returned days is whatever
# the API defaults to (the message above assumes 7 — TODO confirm).
url = "https://api.open-meteo.com/v1/forecast"
params = {
    "latitude": latitude,
    "longitude": longitude,
    "daily": daily_params,
    "timezone": "Europe/Berlin"
}

# Without a timeout `requests` waits indefinitely, which would hang a
# scheduled run if the API stops responding.
response = requests.get(url, params=params, timeout=30)

if response.status_code != 200:
    print(f"‚ùå Errore API Meteo: {response.status_code}")
    print(response.text)
    # Stop here: without a fresh download the following cells would work on
    # an undefined (or stale) `df_weather_new`.
    raise RuntimeError(
        f"Weather API request failed with HTTP status {response.status_code}"
    )

data = response.json()

# 3. Build the DataFrame directly from the 'daily' payload
# (one column per returned key).
df_weather_new = pd.DataFrame(data['daily'])

# 4. Align the frame with the feature group schema.
# Parse the date strings into datetimes.
df_weather_new['time'] = pd.to_datetime(df_weather_new['time'])

# Key columns. `unix_time` is milliseconds since 1970-01-01, computed by
# subtracting the epoch so that the result does not depend on the datetime
# resolution of the `time` column.
# NOTE(review): `time` is timezone-naive here, so the dates are treated as if
# they were UTC midnights — confirm this matches the backfill.
df_weather_new['unix_time'] = (df_weather_new['time'] - pd.Timestamp("1970-01-01")) // pd.Timedelta(milliseconds=1)
df_weather_new['city'] = city

# Cast every remaining column (everything except the keys and the timestamp)
# to float32 (per the original note: as done in the backfill).
numeric_cols = [c for c in df_weather_new.columns if c not in ['time', 'city', 'unix_time']]
df_weather_new[numeric_cols] = df_weather_new[numeric_cols].astype('float32')

# Keep only the key columns and the requested features, in a fixed order.
cols_order = ['city', 'time', 'unix_time'] + weather_features
df_weather_new = df_weather_new[cols_order]

print(f"‚úÖ Previsioni scaricate: {len(df_weather_new)} righe (giorni).")
display(df_weather_new.head())

üå¶Ô∏è Scaricando previsioni meteo (7 giorni) per Stockholm...
‚úÖ Previsioni scaricate: 7 righe (giorni).


Unnamed: 0,city,time,unix_time,temperature_2m_max,temperature_2m_min,temperature_2m_mean,precipitation_sum,rain_sum,showers_sum,snowfall_sum,precipitation_hours,wind_speed_10m_max,wind_gusts_10m_max,wind_direction_10m_dominant,shortwave_radiation_sum,et0_fao_evapotranspiration
0,Stockholm,2025-12-15,1765756800000,9.0,7.7,8.2,0.1,0.1,0.0,0.0,1.0,22.700001,41.799999,237.0,0.36,0.44
1,Stockholm,2025-12-16,1765843200000,7.8,5.7,6.6,0.1,0.1,0.0,0.0,1.0,17.6,33.099998,227.0,0.92,0.28
2,Stockholm,2025-12-17,1765929600000,7.2,6.0,6.6,3.9,3.9,0.0,0.0,5.0,16.9,32.0,234.0,0.19,0.13
3,Stockholm,2025-12-18,1766016000000,6.7,1.9,4.5,0.0,0.0,0.0,0.0,0.0,18.1,37.799999,213.0,0.44,0.21
4,Stockholm,2025-12-19,1766102400000,8.1,6.3,6.9,5.4,5.4,0.0,0.0,12.0,23.6,47.200001,213.0,0.26,0.31


In [11]:
# Quick sanity check of the weather forecast frame: columns, dtypes and non-null counts.
df_weather_new.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7 entries, 0 to 6
Data columns (total 16 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   city                         7 non-null      object        
 1   time                         7 non-null      datetime64[ns]
 2   unix_time                    7 non-null      int64         
 3   temperature_2m_max           7 non-null      float32       
 4   temperature_2m_min           7 non-null      float32       
 5   temperature_2m_mean          7 non-null      float32       
 6   precipitation_sum            7 non-null      float32       
 7   rain_sum                     7 non-null      float32       
 8   showers_sum                  7 non-null      float32       
 9   snowfall_sum                 7 non-null      float32       
 10  precipitation_hours          7 non-null      float32       
 11  wind_speed_10m_max           7 non-null      floa

## <span style="color:#ff5f27;">Uploading new data to the Feature Store</span>

In [12]:
# --- Write the weather data to Hopsworks ---

print(f"üöÄ Avvio inserimento di {len(df_weather_new)} righe nel Feature Group Meteo...")

try:
    # Per the original note, the insert behaves as an upsert: rows whose key
    # (city + unix_time) already exists are updated — verify against the
    # feature group's primary key definition.
    weather_fg.insert(df_weather_new)

    print("‚úÖ Inserimento completato con successo! I dati meteo sono stati validati e salvati.")

except Exception as e:
    print(f"‚ùå Errore durante l'inserimento: {e}")
    print("Controlla che lo schema del DataFrame corrisponda a quello del Feature Group.")
    # Re-raise after reporting so that a failed insert fails the run instead
    # of letting a scheduled execution finish as if the data had been written.
    raise

üöÄ Avvio inserimento di 7 righe nel Feature Group Meteo...
2025-12-15 16:56:25,418 INFO: 	11 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1279152/fs/1258616/fg/1866063


Uploading Dataframe: 100.00% |‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| Rows 7/7 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: weather_measurements_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1279152/jobs/named/weather_measurements_1_offline_fg_materialization/executions
‚úÖ Inserimento completato con successo! I dati meteo sono stati validati e salvati.
