In [None]:
# import sys
# from pathlib import Path

# # Add the src directory to sys.path
# root_dir = Path().absolute().parent.parent
# # Add project root to the path of Python
# sys.path.append(str(root_dir))

# from src import config

# # Set the environment variables from the file <root_dir>/.env
# settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

ImportError: cannot import name 'config' from 'src' (/Users/simon/Documents/music-recommendation-system/src/__init__.py)

## Part 01: Feature Backfill for Algorhythm


### <span style='color:#ff5f27'> 📝 Imports </span>


In [None]:
# Add the Hopsworks client to the project's `dev` dependency group using uv.
!uv add --group dev hopsworks
# NOTE(review): the command below names no package, so it presumably errors out —
# confirm which dependency was intended here, or remove the line.
!uv add --group dev

In [None]:
import datetime
import json
import sys

import hopsworks
import pandas as pd

from src.utils import util

# Notebook-wide parameters consumed by the cells below.
#
# NOTE(review): `settings` and `csv_file` are still placeholders that only silence
# ruff's F821 (undefined name). While they are `None`, the cells that read
# `settings.AQICN_*` and call `pd.read_csv(csv_file)` cannot run.
# TODO: restore the `config.HopsworksSettings` load from the commented-out first cell
# and point `csv_file` at the historical air-quality CSV for your sensor.
settings = None
csv_file = None

# Date of this run. It is passed to `util.get_pm25` and, as `str(today)`, used as the
# end date of the historical weather download — `None` would become the text "None".
today = datetime.date.today()

## Hopsworks API Key

You need to have registered an account on app.hopsworks.ai.

Save the HOPSWORKS_API_KEY to the `.env` file in the root directory of your project:

- mv .env.example .env
- edit .env

In the .env file, update HOPSWORKS_API_KEY:

`HOPSWORKS_API_KEY="put API KEY value in this string"`


In [None]:
# Log in to Hopsworks and keep the returned project handle; a later cell uses it to
# obtain the feature store. Presumably picks up HOPSWORKS_API_KEY from the
# environment/.env described above — confirm.
project = hopsworks.login()

2025-05-18 09:44:39,945 INFO: Initializing external client
2025-05-18 09:44:39,946 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-05-18 09:44:42,272 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1224891


In [None]:
# Sensor configuration is read from `settings` (presumably loaded from the project's
# .env file — confirm). You can also replace settings.AQICN_API_KEY with the api key
# value as a string "...."
if settings.AQICN_API_KEY is None:
    print("You need to set AQICN_API_KEY either in this cell or in ~/.env")
    sys.exit(1)

AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value()
aqicn_url = settings.AQICN_URL
country = settings.AQICN_COUNTRY
city = settings.AQICN_CITY
street = settings.AQICN_STREET
# If this lookup fails (it fails in a GitHub Action), comment out the next line and
# set latitude and longitude explicitly using the two lines below it.
latitude, longitude = util.get_city_coordinates(city)
# Uncomment these if the coordinate lookup fails
# latitude = "53.3498"
# longitude = "-6.2603"

# Confirm the key was found without echoing it: cell outputs are saved inside the
# notebook file, so printing the value would leak the secret to anyone who sees it.
print("Found AQICN_API_KEY")

secrets = hopsworks.get_secrets_api()
try:
    # Replace any existing secret with the new value
    secret = secrets.get_secret("AQICN_API_KEY")
    secret.delete()
    print("Replacing existing AQICN_API_KEY")
except (hopsworks.RestAPIError, AttributeError):
    # Same exception set as the SENSOR_LOCATION_JSON cell. AttributeError presumably
    # covers get_secret() returning None instead of raising — confirm.
    print("No existing AQICN_API_KEY secret")

secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

Secrets can be explored at https://c.app.hopsworks.ai:443/account/secrets


### Validate that the AQICN_API_KEY you added earlier works

The cell below should print out something like:

![image.png](attachment:832cc3e9-876c-450f-99d3-cc97abb55b13.png)


In [None]:
# Fetch today's PM2.5 reading for the configured sensor to check that the API key and
# sensor URL work.
try:
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
except hopsworks.RestAPIError:
    print(
        "It looks like the AQICN_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?"
    )
    # Re-raise: `aq_today_df` was never assigned, so carrying on would only replace the
    # real error with an unrelated NameError on the line below.
    raise

aq_today_df.head()

Unnamed: 0,pm25,country,city,street,date,url
0,38.0,colombia,medellin,el-poblado,2025-05-18,https://api.waqi.info/feed/@12635


## <span style='color:#ff5f27'> 🌍 STEP 5: Read your CSV file into a DataFrame </span>

The cell below will read the historical air quality data from a CSV file into a Pandas DataFrame.


In [None]:
# Load the historical air-quality CSV. The `date` column is parsed into datetimes and
# spaces following a delimiter are skipped.
df = pd.read_csv(
    csv_file,
    skipinitialspace=True,
    parse_dates=["date"],
)
# Show the newest measurements first (display only; `df` itself is not reordered).
df.sort_values("date", ascending=False)

Unnamed: 0,date,pm25
16,2025-05-19,42
15,2025-05-18,52
14,2025-05-16,56
13,2025-05-15,65
12,2025-05-14,52
...,...,...
1924,2019-12-19,73
1923,2019-12-18,70
1922,2019-12-17,59
1921,2019-12-16,57


## <span style='color:#ff5f27'> 🌍 STEP 6: Data cleaning</span>

### Rename columns if needed and drop unnecessary columns

We want to have a DataFrame with 2 columns - `date` and `pm25` after this cell below:


## Check the data types for the columns in your DataFrame

- `date` should be of type datetime64[ns]
- `pm25` should be of type float32


In [None]:
# Normalise the column names. Renaming is a no-op for any key that is not present, so
# this also succeeds when the CSV has no `median` or `timestamp` column.
df = df.rename(columns={"median": "pm25", "timestamp": "date"})

# Keep only `date` and `pm25`, casting `pm25` to float32. `astype` returns a new frame,
# so nothing is assigned into a slice of `df` (which triggers SettingWithCopyWarning).
df_aq = df[["date", "pm25"]].astype({"pm25": "float32"})

df_aq

Unnamed: 0,date,pm25
0,2025-05-02,50.0
1,2025-05-03,56.0
2,2025-05-04,55.0
3,2025-05-05,55.0
4,2025-05-06,49.0
...,...,...
1931,2019-12-26,65.0
1932,2019-12-27,48.0
1933,2019-12-28,54.0
1934,2019-12-29,64.0


In [None]:
# Check the column dtypes: `date` should be datetime64[ns] and `pm25` float32
# (the cast itself happens in the previous cell).
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1936 entries, 0 to 1935
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    1936 non-null   datetime64[ns]
 1   pm25    1936 non-null   float32       
dtypes: datetime64[ns](1), float32(1)
memory usage: 22.8 KB


## <span style='color:#ff5f27'> 🌍 STEP 7: Drop any rows with missing data </span>

It will make the model training easier if there is no missing data in the rows, so we drop any rows with missing data.


In [None]:
# Drop every row that has a missing value. Rebinding the name (rather than
# `inplace=True`) avoids mutating a frame that was derived from a slice of `df`.
df_aq = df_aq.dropna()
df_aq

Unnamed: 0,date,pm25
0,2025-05-02,50.0
1,2025-05-03,56.0
2,2025-05-04,55.0
3,2025-05-05,55.0
4,2025-05-06,49.0
...,...,...
1931,2019-12-26,65.0
1932,2019-12-27,48.0
1933,2019-12-28,54.0
1934,2019-12-29,64.0


## <span style='color:#ff5f27'> 🌍 STEP 8: Add country, city, street, url to the DataFrame </span>

Your CSV file may have many other air quality measurement columns. We will only work with the `pm25` column.

We add the columns for the country, city, and street names that you changed for your Air Quality sensor.

We also want to make sure the `pm25` column is a float32 data type.


In [None]:
# Your sensor may have columns we won't use, so only keep the date and pm25 columns
# If the column names in your DataFrame are different, rename your columns to `date` and `pm25`
df_aq["country"] = country
df_aq["city"] = city
df_aq["street"] = street
df_aq["url"] = aqicn_url
df_aq

Unnamed: 0,date,pm25,country,city,street,url
0,2025-05-02,50.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
1,2025-05-03,56.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
2,2025-05-04,55.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
3,2025-05-05,55.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
4,2025-05-06,49.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
...,...,...,...,...,...,...
1931,2019-12-26,65.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
1932,2019-12-27,48.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
1933,2019-12-28,54.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635
1934,2019-12-29,64.0,colombia,medellin,el-poblado,https://api.waqi.info/feed/@12635


---


## <span style='color:#ff5f27'> 🌦 Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs) </span>


## <span style='color:#ff5f27'> 🌍 STEP 9: Download the Historical Weather Data </span>

https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant

We will download the historical weather data for your `city` by first extracting the earliest date from your DataFrame containing the historical air quality measurements.

We will download all daily historical weather data measurements for your `city` from the earliest date in your air quality measurement DataFrame. It doesn't matter if there are missing days of air quality measurements. We can store all of the daily weather measurements, and when we build our training dataset, we will join up the air quality measurements for a given day to its weather features for that day.

The weather features we will download are:

- `temperature (average over the day)`
- `precipitation (the total over the day)`
- `wind speed (maximum over the day)`
- `wind direction (the most dominant direction over the day)`


In [None]:
# Start the weather history at the earliest air-quality measurement (as YYYY-MM-DD)
# and end it at `today`.
earliest_aq_date = df_aq["date"].min().strftime("%Y-%m-%d")

weather_df = util.get_historical_weather(
    city,
    earliest_aq_date,
    str(today),
    latitude,
    longitude,
)

Coordinates 6.2917399406433105°N -75.69036865234375°E
Elevation 1570.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [None]:
# Inspect the column dtypes and non-null counts of the downloaded weather data.
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1980 entries, 0 to 1979
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         1980 non-null   datetime64[ns]
 1   temperature_2m_mean          1980 non-null   float32       
 2   precipitation_sum            1980 non-null   float32       
 3   wind_speed_10m_max           1980 non-null   float32       
 4   wind_direction_10m_dominant  1980 non-null   float32       
 5   city                         1980 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 77.3+ KB


In [None]:
# Preview the first few rows of the weather data.
weather_df.head()

Unnamed: 0,date,temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant,city
0,2019-12-15,18.322668,8.9,6.618519,48.224552,medellin
1,2019-12-16,19.126833,6.3,9.36,76.25338,medellin
2,2019-12-17,18.935167,5.4,8.049845,71.833992,medellin
3,2019-12-18,19.2185,2.2,7.23591,352.234863,medellin
4,2019-12-19,19.410166,0.6,8.287822,64.303047,medellin


## <span style='color:#ff5f27'> 🌍 STEP 10: Define Data Validation Rules </span>

We will validate the air quality measurements (`pm25` values) before we write them to Hopsworks.

We define a data validation rule (an expectation in Great Expectations) that ensures that `pm25` values are not negative or above the max value available by the sensor.

We will attach this expectation to the air quality feature group, so that we validate the `pm25` data every time we write a DataFrame to the feature group. We want to prevent garbage-in, garbage-out.


In [None]:
import great_expectations as ge

# Validation rules that are attached to the air-quality feature group.
aq_expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="aq_expectation_suite")

# Validate every `pm25` value rather than only the column minimum: with
# `expect_column_min_to_be_between`, readings above 500 still passed as long as the
# smallest value in the column was in range.
aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "pm25",
            # Strict lower bound of -0.1 so that a reading of exactly 0.0 is accepted.
            "min_value": -0.1,
            "max_value": 500.0,
            "strict_min": True,
        },
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

## Expectations for Weather Data

Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect both values to be zero or greater, but no more than 1000.


In [None]:
import great_expectations as ge

# Empty suite that collects the validation rules for the weather data; expectations
# are added to it by `expect_greater_than_zero` below.
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)


def expect_greater_than_zero(col):
    """Add a range check for ``col`` to the weather expectation suite.

    Registers an ``expect_column_values_to_be_between`` expectation on the
    module-level ``weather_expectation_suite``: every value in ``col`` must be
    greater than -0.1 (so 0.0 passes) and no more than 1000.0.

    Args:
        col: Name of the weather DataFrame column to validate.
    """
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            # Row-level check. `expect_column_min_to_be_between` only bounded the
            # column's minimum, so values above the maximum were never rejected.
            expectation_type="expect_column_values_to_be_between",
            kwargs={
                "column": col,
                "min_value": -0.1,
                "max_value": 1000.0,
                "strict_min": True,
            },
        )
    )


# Register the range check for the two weather columns we validate.
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

---


### <span style="color:#ff5f27;"> 🔮 STEP 11: Connect to Hopsworks and save the sensor country, city, street names as a secret</span>


In [None]:
# Feature store handle for the logged-in project; the feature groups below are
# created through it.
fs = project.get_feature_store()

#### Save country, city, street names as a secret

These will be downloaded from Hopsworks later in the (1) daily feature pipeline and (2) the daily batch inference pipeline


In [None]:
# Sensor location details that are stored together as one secret.
dict_obj = dict(
    country=country,
    city=city,
    street=street,
    aqicn_url=aqicn_url,
    latitude=latitude,
    longitude=longitude,
)

# Secrets are stored as strings, so serialise the dictionary to JSON.
str_dict = json.dumps(dict_obj)

# Remove any previous copy of the secret so that the create call below stores the
# new value.
try:
    secret = secrets.get_secret("SENSOR_LOCATION_JSON")
    secret.delete()
except (hopsworks.RestAPIError, AttributeError):
    print("No existing SENSOR_LOCATION_JSON secret or error deleting it")
else:
    print("Replacing existing SENSOR_LOCATION_JSON")

secrets.create_secret("SENSOR_LOCATION_JSON", str_dict)

No existing SENSOR_LOCATION_JSON secret or error deleting it
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('SENSOR_LOCATION_JSON', 'PRIVATE')

### <span style="color:#ff5f27;"> 🔮 STEP 12: Create the Feature Groups and insert the DataFrames in them </span>


### <span style='color:#ff5f27'> 🌫 Air Quality Data </span>

1.  Provide a name, description, and version for the feature group.
2.  Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each air quality sensor measurement is uniquely identified by `country`, `street`, and `date`.
3.  Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
4.  Attach any `expectation_suite` containing data validation rules


In [None]:
# Get the `air_quality` feature group (version 1), or create it if it does not exist.
# NOTE(review): the text above says a measurement is identified by `country`, `street`
# and `date`, but the primary key here is `city` + `street` — confirm which is intended.
air_quality_fg = fs.get_or_create_feature_group(
    name="air_quality",
    description="Air Quality characteristics of each day",
    version=1,
    primary_key=["city", "street"],
    event_time="date",  # column holding the date of each measurement
    expectation_suite=aq_expectation_suite,  # validation rules defined in STEP 10
)

#### Insert the DataFrame into the Feature Group


In [None]:
# Write the air-quality rows to the feature group; the attached expectation suite is
# evaluated as part of the insert (see the validation report in the output).
# NOTE(review): unlike the weather insert further down, this call does not pass
# `wait=True` — confirm that not waiting for the materialization job is intended.
air_quality_fg.insert(df_aq)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1224891/fs/1211503/fg/1458858
2025-05-18 10:14:27,114 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1224891/fs/1211503/fg/1458858


Uploading Dataframe: 100.00% |██████████| Rows 1936/1936 | Elapsed Time: 00:00 | Remaining Time: 00:00


Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1224891/jobs/named/air_quality_1_offline_fg_materialization/executions


(Job('air_quality_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 709663
         }
       },
       "result": {
         "observed_value": 18.999998092651367,
         "element_count": 1936,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-05-18T03:14:27.000114Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "su

#### Enter a description for each feature in the Feature Group


In [None]:
# Description stored for each feature of the air-quality feature group.
# Fixes the misspelled site name ("acqcn.org" -> "aqicn.org") in the `country` text.
aq_feature_descriptions = {
    "date": "Date of measurement of air quality",
    "country": "Country where the air quality was measured (sometimes a city in aqicn.org)",
    "city": "City where the air quality was measured",
    "street": "Street in the city where the air quality was measured",
    "pm25": "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk",
}

# One update call per feature, in the order listed above.
for feature_name, feature_description in aq_feature_descriptions.items():
    air_quality_fg.update_feature_description(feature_name, feature_description)

<hsfs.feature_group.FeatureGroup at 0x741545502390>

### <span style='color:#ff5f27'> 🌦 Weather Data </span>

1.  Provide a name, description, and version for the feature group.
2.  Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `city` and `date`.
3.  Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
4.  Attach any `expectation_suite` containing data validation rules


In [None]:
# Get or create feature group
weather_fg = fs.get_or_create_feature_group(
    name="weather",
    description="Weather characteristics of each day",
    version=1,
    primary_key=["city"],
    event_time="date",
    expectation_suite=weather_expectation_suite,
)

#### Insert the DataFrame into the Feature Group


In [None]:
# Insert data
weather_fg.insert(weather_df, wait=True)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1224891/fs/1211503/fg/1458859
2025-05-18 10:15:22,473 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1224891/fs/1211503/fg/1458859


Uploading Dataframe: 100.00% |██████████| Rows 1980/1980 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1224891/jobs/named/weather_1_offline_fg_materialization/executions
2025-05-18 10:15:38,866 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-05-18 10:15:48,351 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-05-18 10:17:47,325 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-05-18 10:17:50,573 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-05-18 10:17:50,777 INFO: Waiting for log aggregation to finish.
2025-05-18 10:18:12,720 INFO: Execution finished successfully.


(Job('weather_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "precipitation_sum",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 709664
         }
       },
       "result": {
         "observed_value": 0.0,
         "element_count": 1980,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-05-18T03:15:22.000473Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_colu

#### Enter a description for each feature in the Feature Group


In [None]:
# Description stored for each feature of the weather feature group.
# Fixes two typos in the stored text ("abouve" -> "above", "dayd" -> "day").
weather_feature_descriptions = {
    "date": "Date of measurement of weather",
    "city": "City where weather is measured/forecast for",
    "temperature_2m_mean": "Temperature in Celsius",
    "precipitation_sum": "Precipitation (rain/snow) in mm",
    "wind_speed_10m_max": "Wind speed at 10m above ground",
    "wind_direction_10m_dominant": "Dominant Wind direction over the day",
}

# One update call per feature, in the order listed above.
for feature_name, feature_description in weather_feature_descriptions.items():
    weather_fg.update_feature_description(feature_name, feature_description)

<hsfs.feature_group.FeatureGroup at 0x7415442c6b90>

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Daily Feature Pipeline

 </span>


## <span style="color:#ff5f27;">⏭️ **Exercises:**

 </span>

Extra Homework:

- Try adding a new feature based on a rolling window of 3 days for 'pm25'
  - This is not easy, as forecasting more than 1 day in the future, you won't have the previous 3 days of pm25 measurements.
  - df.set_index("date").rolling(3).mean() is only the start....
- Parameterize the notebook, so that you can provide the `country`/`street`/`city`/`url`/`csv_file` as parameters.
  - Hint: this will also require making the secret name (`SENSOR_LOCATION_JSON`) configurable, e.g., by adding the street name as part of the secret name. Then you have to pass that secret name as a parameter when running the operational feature pipeline and batch inference pipelines.
  - After you have done this, collect the street/city/url/csv files for all the sensors in your city or region and make dashboards for all of the air quality sensors in your city/region. You could even then add a dashboard for your city/region, as done [here for Poland](https://github.com/erno98/ID2223).

Improve this AI System

- As of mid 2024, there is no API call available to download historical data from the AQICN website. You could improve this system by writing a PR to download the CSV file using Python Selenium and the URL for the sensor.


---
