<span style="font-weight:bold; font-size: 3rem; color:#333;">Part 02: Daily Feature Pipeline for Electricity Prices (SE3, SE4) and Weather (Open-Meteo)</span>

## 🗒️ This notebook is divided into the following sections:
1. Download and Parse Data
2. Feature Group Insertion


__This notebook should be scheduled to run daily__

In the book, we use a GitHub Action stored here:
[.github/workflows/air-quality-daily.yml](https://github.com/featurestorebook/mlfs-book/blob/main/.github/workflows/air-quality-daily.yml)

However, you are free to use any Python orchestration tool to schedule this notebook to run daily.
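If you do not want a dedicated orchestrator, the simplest option is a long-running Python process that sleeps until a fixed time each day. The sketch below uses only the standard library and is an illustration, not part of this repository: `seconds_until_next_run` and `run_daily` are hypothetical helpers. It works in naive local time, so around a DST change the run can shift by an hour, and unlike cron or the GitHub Action above it does not survive a restart.

```python
import datetime as dt
import time


def seconds_until_next_run(now: dt.datetime, hour: int, minute: int = 0) -> float:
    """Seconds from `now` until the next hour:minute (naive local time)."""
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # Today's slot has already passed, so schedule it for tomorrow
        target += dt.timedelta(days=1)
    return (target - now).total_seconds()


def run_daily(job, hour: int, minute: int = 0) -> None:
    """Call job() once a day at hour:minute; loops until the process is stopped."""
    while True:
        time.sleep(seconds_until_next_run(dt.datetime.now(), hour, minute))
        job()
```

For example, `run_daily(run_pipeline, hour=14)`, where `run_pipeline` is a hypothetical function wrapping the cells of this notebook. Choose a time after your price source has published tomorrow's prices.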

### <span style='color:#ff5f27'> 📝 Imports </span>

In [1]:
import datetime
import time
import requests
import pandas as pd
import hopsworks
from functions import util
from functions import fetch_data
import json
import os
import warnings
warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> 🌍 Log in to Hopsworks and get the feature store </span>

__Update the project name and the API key location in the cell below.__

__These should be the same values as in notebook 1, the feature backfill notebook.__


In [2]:
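# Use the env variable 'HOPSWORKS_API_KEY' if it is already set (e.g. by the scheduler);
# otherwise read the API key from a local file.
# Alternatively, uncomment the next line and enter your API key:
# os.environ["HOPSWORKS_API_KEY"] = ""
if not os.environ.get("HOPSWORKS_API_KEY"):
    with open('../data/keys/hopsworks-api-key.txt', 'r') as file:
        os.environ["HOPSWORKS_API_KEY"] = file.read().rstrip()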

project = hopsworks.login(project="ML_Project_Electricity", api_key_value=os.environ["HOPSWORKS_API_KEY"])
fs = project.get_feature_store() 
# secrets = util.secrets_api(project.name)
print("Project name:", project.name)

# This line will fail if you have not registered the AQI_API_KEY as a secret in Hopsworks
# AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value
# location_str = secrets.get_secret("SENSOR_LOCATION_JSON").value
# location = json.loads(location_str)

# country=location['country']
# city=location['city']
# street=location['street']
# aqicn_url=location['aqicn_url']
# latitude=location['latitude']
# longitude=location['longitude']

# today = datetime.date.today()

# location_str

2025-01-08 11:21:04,255 INFO: Initializing external client
2025-01-08 11:21:04,255 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-01-08 11:21:05,436 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1207495
Project name: ML_Project_Electricity


### <span style="color:#ff5f27;"> 🔮 Get references to the Feature Groups </span>

In [3]:
# Retrieve feature groups
sthlm_weather_fg = fs.get_feature_group(
    name='stockholm_weather',
    version=1,
)
malmo_weather_fg = fs.get_feature_group(
    name='malmo_weather',
    version=1,
)

se3_fg = fs.get_feature_group(
    name='se3_electricity_prices',
    version=1,
)

se4_fg = fs.get_feature_group(
    name='se4_electricity_prices',
    version=1,
)

---

## Retrieve the most recent electricity price data (for tomorrow)

In [4]:
# Use the function get_tomorrows_electricity_prices 
# from the fetch_data module to get the electricity prices for tomorrow

se3_current_prices = fetch_data.get_tomorrows_electricity_prices('SE3')
se4_current_prices = fetch_data.get_tomorrows_electricity_prices('SE4')


se4_current_prices.head()


|   | time | pricearea | spotpriceeur |
|---|------|-----------|--------------|
| 0 | 2025-01-07 23:00:00+00:00 | SE4 | 12.43 |
| 1 | 2025-01-08 00:00:00+00:00 | SE4 | 12.88 |
| 2 | 2025-01-08 01:00:00+00:00 | SE4 | 17.61 |
| 3 | 2025-01-08 02:00:00+00:00 | SE4 | 16.83 |
| 4 | 2025-01-08 03:00:00+00:00 | SE4 | 20.25 |
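The first row is 23:00 UTC on 7 January. This is consistent with prices being quoted per calendar day in Swedish local time (CET is UTC+1 in winter), so one price day runs from 23:00 UTC to 22:00 UTC. The sketch below assumes exactly that, i.e. that a price day is a calendar day in the `Europe/Stockholm` time zone, and lists its UTC hours with the standard library; `delivery_day_utc_hours` is a hypothetical helper. It also shows that such a day has 23 hours when DST starts and 25 when it ends. (`zoneinfo` needs a system time zone database or the `tzdata` package.)

```python
import datetime as dt
from zoneinfo import ZoneInfo

# Assumption: one price day is a calendar day in Swedish local time
STOCKHOLM = ZoneInfo("Europe/Stockholm")


def delivery_day_utc_hours(day: dt.date) -> list[dt.datetime]:
    """UTC start times of every hour in the Europe/Stockholm calendar day `day`."""
    midnight = dt.time(0)
    start = dt.datetime.combine(day, midnight, tzinfo=STOCKHOLM).astimezone(dt.timezone.utc)
    end = dt.datetime.combine(day + dt.timedelta(days=1), midnight, tzinfo=STOCKHOLM).astimezone(dt.timezone.utc)
    hours = []
    current = start
    # Step in UTC, where every hour is exactly 3600 s long
    while current < end:
        hours.append(current)
        current += dt.timedelta(hours=1)
    return hours


print(delivery_day_utc_hours(dt.date(2025, 1, 8))[0])       # 2025-01-07 23:00:00+00:00
print(len(delivery_day_utc_hours(dt.date(2025, 1, 8))))     # 24
print(len(delivery_day_utc_hours(dt.date(2025, 3, 30))))    # 23 (DST starts)
print(len(delivery_day_utc_hours(dt.date(2025, 10, 26))))   # 25 (DST ends)
```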


In [5]:
# Read the feature groups into pandas dataframes
se3_df = se3_fg.read()
se4_df = se4_fg.read()

sthlm_weather_df = sthlm_weather_fg.read()
malmo_weather_df = malmo_weather_fg.read()

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.14s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.99s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.99s) 
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (0.69s) 


In [6]:
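# Merge the historical electricity prices with the most recent prices
se3_df = pd.concat([se3_df, se3_current_prices], axis=0)
se4_df = pd.concat([se4_df, se4_current_prices], axis=0)

# Normalise timestamps to UTC datetimes so that sorting and de-duplication compare like with like
se3_df['time'] = pd.to_datetime(se3_df['time'], utc=True)
se4_df['time'] = pd.to_datetime(se4_df['time'], utc=True)

# Drop hours that were already in the feature group (e.g. when the pipeline is re-run),
# keeping the freshly fetched value, then sort the dataframes by time
se3_df = se3_df.drop_duplicates(subset='time', keep='last').sort_values('time')
se4_df = se4_df.drop_duplicates(subset='time', keep='last').sort_values('time')

# Rolling mean over the previous 24*7 rows, i.e. 7 days if there is one row per hour and no gaps
se3_df['spot_price_rolling'] = se3_df['spotpriceeur'].rolling(window=24*7).mean()
se4_df['spot_price_rolling'] = se4_df['spotpriceeur'].rolling(window=24*7).mean()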
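`rolling(window=24*7)` averages the previous 168 *rows*, which equals 7 days only if there is exactly one row per hour with no gaps. pandas also supports time-based windows (`rolling("7D")` on a datetime index), which always cover 7 days of clock time regardless of missing rows. The sketch below compares the two on synthetic hourly data (a simple counter, not real prices) with one whole day removed; `rolling_both` is a hypothetical helper. Note that offset-based windows default to `min_periods=1`, so they also return values for the first rows, where the row-based window gives NaN.

```python
import numpy as np
import pandas as pd

# Ten days of synthetic hourly "prices": 0, 1, 2, ... (hypothetical data)
times = pd.date_range("2025-01-01", periods=24 * 10, freq=pd.Timedelta(hours=1), tz="UTC")
df = pd.DataFrame({"time": times, "spotpriceeur": np.arange(len(times), dtype=float)})


def rolling_both(frame: pd.DataFrame) -> pd.DataFrame:
    out = frame.sort_values("time").reset_index(drop=True)
    # Row-based: mean of the previous 168 rows, as in the cell above
    out["by_rows"] = out["spotpriceeur"].rolling(window=24 * 7).mean()
    # Time-based: mean of all rows whose time falls in (t - 7 days, t]
    out["by_time"] = out.set_index("time")["spotpriceeur"].rolling("7D").mean().to_numpy()
    return out


complete = rolling_both(df)
# Remove 5 January entirely to simulate a gap in the data
gappy = rolling_both(df[~df["time"].dt.day.eq(5)])
```

On the complete data both methods give 155.5 for the last hour. With the gap, the time-based mean is 163.5, while the row-based mean (about 148.6) reaches back to hours more than 7 days earlier.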


In [7]:
se3_df.tail()

|    | time | pricearea | spotpriceeur | spot_price_rolling |
|----|------|-----------|--------------|--------------------|
| 19 | 2025-01-08 18:00:00+00:00 | SE3 | 139.72 | 19.396369 |
| 20 | 2025-01-08 19:00:00+00:00 | SE3 | 106.31 | 20.030595 |
| 21 | 2025-01-08 20:00:00+00:00 | SE3 | 93.57 | 20.583393 |
| 22 | 2025-01-08 21:00:00+00:00 | SE3 | 75.78 | 21.023095 |
| 23 | 2025-01-08 22:00:00+00:00 | SE3 | 49.53 | 21.304643 |


## Insert the newly retrieved values into the feature groups

In [8]:
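# Insert the new electricity prices (the rows fetched above) into the feature store.
# Use the number of fetched rows rather than a fixed 24: a local day has 23 or 25 hours when DST changes.
se3_fg.insert(se3_df.tail(len(se3_current_prices)))
se4_fg.insert(se4_df.tail(len(se4_current_prices)))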

Uploading Dataframe: 100.00% |██████████| Rows 24/24 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: se3_electricity_prices_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207495/jobs/named/se3_electricity_prices_1_offline_fg_materialization/executions


Uploading Dataframe: 100.00% |██████████| Rows 24/24 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: se4_electricity_prices_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207495/jobs/named/se4_electricity_prices_1_offline_fg_materialization/executions


(Job('se4_electricity_prices_1_offline_fg_materialization', 'SPARK'), None)
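Selecting rows with `tail(...)` relies on the freshly fetched prices being the last rows of the merged, sorted frame. A more explicit alternative is to select them by timestamp, which also copes with the 23- or 25-hour days at DST changes. The helper below is a hypothetical sketch, not part of `fetch_data` or `util`; it normalises both time columns to UTC before comparing them.

```python
import pandas as pd


def rows_to_insert(merged: pd.DataFrame, fresh: pd.DataFrame) -> pd.DataFrame:
    """Rows of `merged` whose timestamp also appears in the freshly fetched `fresh` frame."""
    fresh_times = pd.to_datetime(fresh["time"], utc=True)
    merged_times = pd.to_datetime(merged["time"], utc=True)
    # Boolean mask aligned on merged's index
    return merged[merged_times.isin(fresh_times)]
```

For example, `se3_fg.insert(rows_to_insert(se3_df, se3_current_prices))`.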

## Retrieve fresh weather data

In [9]:
sthlm_forecast_df = fetch_data.get_hourly_weather_forecast(59.3294, 18.0687)  # Stockholm
malmo_forecast_df = fetch_data.get_hourly_weather_forecast(55.6059, 13.0007)  # Malmö

# Drop rows that contain any NaN value
sthlm_forecast_df = sthlm_forecast_df.dropna()
malmo_forecast_df = malmo_forecast_df.dropna()

malmo_forecast_df

Coordinates 59.32889938354492°N 18.072357177734375°E
Elevation 24.0 m asl
Timezone b'Europe/Berlin' b'CET'
Timezone difference to GMT+0 3600 s
Coordinates 55.60652542114258°N 13.002044677734375°E
Elevation 12.0 m asl
Timezone b'Europe/Berlin' b'CET'
Timezone difference to GMT+0 3600 s


|     | time | temperature | precipitation | cloud_cover | wind_speed_10m | date | sunshine_duration | weekday | month | hour |
|-----|------|-------------|---------------|-------------|----------------|------|-------------------|---------|-------|------|
| 0   | 2025-01-07 23:00:00+00:00 | 3.524 | 0.0 | 100 | 34.200001 | 2025-01-07 | 0.000000 | 1 | 1 | 23 |
| 1   | 2025-01-08 00:00:00+00:00 | 3.874 | 0.0 | 100 | 36.360001 | 2025-01-08 | 258.567657 | 2 | 1 | 0 |
| 2   | 2025-01-08 01:00:00+00:00 | 3.724 | 0.0 | 100 | 37.079998 | 2025-01-08 | 258.567657 | 2 | 1 | 1 |
| 3   | 2025-01-08 02:00:00+00:00 | 3.674 | 0.0 | 100 | 35.639999 | 2025-01-08 | 258.567657 | 2 | 1 | 2 |
| 4   | 2025-01-08 03:00:00+00:00 | 3.624 | 0.0 | 100 | 34.919998 | 2025-01-08 | 258.567657 | 2 | 1 | 3 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 140 | 2025-01-13 19:00:00+00:00 | 4.100 | 0.1 | 100 | 27.609911 | 2025-01-13 | 0.000000 | 0 | 1 | 19 |
| 141 | 2025-01-13 20:00:00+00:00 | 4.100 | 0.1 | 100 | 28.916763 | 2025-01-13 | 0.000000 | 0 | 1 | 20 |
| 142 | 2025-01-13 21:00:00+00:00 | 4.150 | 0.1 | 100 | 29.869154 | 2025-01-13 | 0.000000 | 0 | 1 | 21 |
| 143 | 2025-01-13 22:00:00+00:00 | 4.350 | 0.0 | 100 | 29.871325 | 2025-01-13 | 0.000000 | 0 | 1 | 22 |
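The `date`, `weekday`, `month` and `hour` columns above match what you get from the UTC timestamp (7 January 2025 is a Tuesday, weekday 1 with Monday = 0). The implementation of `fetch_data.get_hourly_weather_forecast` is not shown here, so the following is only a minimal sketch, assuming it derives these calendar features roughly like this; `add_calendar_features` is a hypothetical helper.

```python
import pandas as pd


def add_calendar_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add date/weekday/month/hour columns derived from the UTC `time` column."""
    out = df.copy()
    ts = pd.to_datetime(out["time"], utc=True)
    out["date"] = ts.dt.date
    out["weekday"] = ts.dt.weekday  # Monday=0 ... Sunday=6
    out["month"] = ts.dt.month
    out["hour"] = ts.dt.hour        # hour in UTC, not Swedish local time
    return out
```

Because these features are in UTC, `hour` is one hour behind Swedish local time in winter and two in summer. If local-time features are wanted, convert first with `ts.dt.tz_convert("Europe/Stockholm")`.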


## Insert the weather forecast data into the weather feature groups

In [10]:
# Insert the new weather forecast into the feature store
sthlm_weather_fg.insert(sthlm_forecast_df)
malmo_weather_fg.insert(malmo_forecast_df)

Uploading Dataframe: 100.00% |██████████| Rows 145/145 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: stockholm_weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207495/jobs/named/stockholm_weather_1_offline_fg_materialization/executions


%3|1736331709.665|FAIL|rdkafka#consumer-8| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/bootstrap: SSL handshake failed: Disconnected: connecting to a PLAINTEXT broker listener? (after 115ms in state SSL_HANDSHAKE)
Uploading Dataframe: 100.00% |██████████| Rows 145/145 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: malmo_weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1207495/jobs/named/malmo_weather_1_offline_fg_materialization/executions


(Job('malmo_weather_1_offline_fg_materialization', 'SPARK'), None)
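Because this notebook runs unattended, it can be worth checking the forecast frames before inserting them. Hopsworks can validate inserts against Great Expectations expectation suites attached to a feature group; the sketch below is a simpler plain-pandas alternative. `check_hourly_frame` is a hypothetical helper, and its checks (unique, sorted, strictly hourly timestamps and no NaNs) are assumptions about what this pipeline expects.

```python
import pandas as pd


def check_hourly_frame(df: pd.DataFrame, time_col: str = "time") -> list[str]:
    """Return a list of problems found in an hourly time series frame (empty if none)."""
    problems = []
    ts = pd.to_datetime(df[time_col], utc=True)
    if ts.duplicated().any():
        problems.append("duplicate timestamps")
    if not ts.is_monotonic_increasing:
        problems.append("timestamps not sorted")
    # Every consecutive pair of timestamps should be exactly one hour apart
    gaps = ts.sort_values().diff().dropna()
    if (gaps != pd.Timedelta(hours=1)).any():
        problems.append("missing or irregular hours")
    if df.isna().any().any():
        problems.append("NaN values")
    return problems
```

For example, call `check_hourly_frame(malmo_forecast_df)` before `malmo_weather_fg.insert(...)` and raise an error if the returned list is not empty.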

## END