In [88]:
import sys
print(sys.executable)
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")
else:
    root_dir = Path().absolute()
    # Strip the trailing notebooks/airquality path components if the notebook was started in one of these subdirectories
    if root_dir.parts[-1:] == ('airquality',):
        root_dir = Path(*root_dir.parts[:-1])
    if root_dir.parts[-1:] == ('notebooks',):
        root_dir = Path(*root_dir.parts[:-1])
    root_dir = str(root_dir) 
    print("Local environment")

print(f"Root dir: {root_dir}")

# Add the root directory to the `PYTHONPATH` 
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Set the environment variables from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

/Users/isabell/Documents/KTH/KTH femman/Scalable Machine learning/id2223-lab1/.venv/bin/python
Local environment
Root dir: /Users/isabell/Documents/KTH/KTH femman/Scalable Machine learning/id2223-lab1
HopsworksSettings initialized!


<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Feature Backfill for Air Quality Data</span>


## 🗒️ You have the following tasks
1. Choose an Air Quality Sensor
2. Update the country, city, and street information to point to YOUR chosen Air Quality Sensor
3. Download historical measures for your Air Quality Sensor as a CSV file
4. Update the path of the CSV file in this notebook to point to the one that you downloaded
5. Create an account on www.hopsworks.ai and get your HOPSWORKS_API_KEY
6. Run this notebook



### <span style='color:#ff5f27'> 📝 Imports</span>

In [89]:
import datetime
import json
import os
import re
import warnings
from pathlib import Path

import hopsworks
import pandas as pd
import requests

from mlfs.airquality import util

warnings.filterwarnings("ignore")

## <span style='color:#ff5f27'> 🌍 STEP 1: Pick your Air Quality Sensor</span>

![image.png](attachment:b40d25b6-4994-4674-970b-a1eb4a14b9ad.png)

  * Find your favorite sensor on https://waqi.info/ 
  * The sensor should have a URL in one of the following three forms: 

  `https://aqicn.org/station/<CITY OR COUNTRY NAME>/<STREET>`
  or

  `https://aqicn.org/station/@36655//`
  or 
  
  `https://aqicn.org/city/<CITY OR COUNTRY NAME>/<STREET>`

With your URL, we will need to do two things:

 * download the historical air quality readings as a CSV file
 * note down the URL for the real-time API (you will need to create an API key for accessing this).

If your sensor's URL has one of the first two formats (the first URL path component is `station`), you will find the links to both historical CSV data and the real-time API on the same web page.

However, if your sensor's URL has the last format above (the first URL path component is `city` instead of `station`), then you will need to use 2 different URLs - one to download the historical CSV data and one for the real-time air quality measurements. You will find both of those links in the "Air quality historical data" section. Click on the "Historical air quality data" when you need to download the CSV file, and when you need the real-time API, click on the "Real-time air quality data".
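
The difference between the `station` and `city` forms comes down to the first path component of the sensor URL. A minimal sketch of that check, using only the standard library, is shown here. The function name `sensor_url_kind` is hypothetical and is not part of the `mlfs` package.

```python
from urllib.parse import urlparse


def sensor_url_kind(url: str) -> str:
    """Return the first path component of an aqicn.org sensor URL.

    Hypothetical helper for illustration only:
    'station' -> CSV download and real-time API link are on the same page
    'city'    -> they are behind two separate links
    """
    parts = [p for p in urlparse(url).path.split("/") if p]
    if not parts or parts[0] not in ("station", "city"):
        raise ValueError(f"Unrecognized sensor URL: {url}")
    return parts[0]


print(sensor_url_kind("https://aqicn.org/station/@36655//"))
print(sensor_url_kind("https://aqicn.org/station/sweden/stockholm-hornsgatan-108-gata/"))
```

Both example URLs above belong to the `station` form, so the historical CSV and the real-time API link should be on the same page.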

Some examples of URLs for stations:

 * https://aqicn.org/station/sweden/stockholm-hornsgatan-108-gata/ - in Stockholm, Sweden
 * https://aqicn.org/station/@36655// - in Hell's Kitchen, Manhattan, New York, USA
 * https://aqicn.org/station/nigeria-benin-city-edo-state-secretariat-palm-house/ - in Benin City, Nigeria
 * https://aqicn.org/station/india/mumbai/sion/ - Sion in Mumbai, India

Here is what the web page for the Stockholm sensor looks like:

![station.png](attachment:1922302e-13b9-469d-ba75-63dabf7b4475.png)

__When you pick a sensor for your project, there are 2 things the sensor MUST have__:
  1. __PM 2.5__ measurements
  2. __Good Historical Values__ for download as a CSV file

__Write down the country, city, and the street for your sensor.__

We will use the city name to download weather data for your sensor, and we will store the country and street values in the sensor's feature group.

## What makes a good quality Air Quality Sensor?

In the image below, we can see that our sensor in Stockholm fulfills both requirements. It has:
  1. __PM 2.5__ measurements (see upper red-ringed value in image below)
  2. __Good Historical Measurements__ with few missing values (see lower red-ringed values in image below) 

![sensor.png](attachment:46ebde65-85ff-4b12-a560-65230881db0b.png)

---

## <span style='color:#ff5f27'> 🌍 STEP 2: Download the Historical Air Quality </span>

You can download a CSV file containing the historical air quality data from your sensor's URL.
Scroll down to the section "Air Quality Historical Data". Click on the PM2.5 option and save the file to the `data` directory in your forked version of this GitHub repository. Note the name of your CSV file; you will need it when you update the CSV file path in this notebook.


![download-csv.png](attachment:ab17240a-17ad-47de-97af-2a3a1d32f247.png)

## <span style='color:#ff5f27'> 🌍 STEP 3: Get an AQICN API Token and Store it in .env file</span>

You have to first get your AQI API key [from here](https://aqicn.org/data-platform/token/):

![image.png](attachment:9336a7d3-f7dc-4aec-a854-78e787e4493e.png)


Save the API key to the `.env` file in the root directory of your project:

 * mv .env.example .env
 * edit .env

In the .env file, update AQICN_API_KEY:

`AQICN_API_KEY="put API KEY value in this string"`


## Hopsworks API Key
You need to have registered an account on app.hopsworks.ai.

Save the HOPSWORKS_API_KEY to the `.env` file in the root directory of your project:

 * mv .env.example .env (skip this if you already created .env in STEP 3)
 * edit .env

In the .env file, update HOPSWORKS_API_KEY:

`HOPSWORKS_API_KEY="put API KEY value in this string"`
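
The `.env` file is a plain text file with one `KEY="value"` entry per line. This notebook loads it through `config.HopsworksSettings(_env_file=...)`, as shown in the first cell. The sketch below is not that implementation. It is a simplified standard-library parser (the function name `read_env_file` is hypothetical) that only illustrates the file format and lets you check that both keys are present.

```python
import tempfile
from pathlib import Path


def read_env_file(path: str) -> dict:
    """Parse simple KEY="value" lines, skipping blank lines and # comments."""
    values = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # Drop surrounding quotes, if any
        values[key.strip()] = value.strip().strip('"').strip("'")
    return values


# Demonstration with a temporary file and placeholder values (not real keys)
with tempfile.TemporaryDirectory() as tmp:
    env_path = Path(tmp) / ".env"
    env_path.write_text('AQICN_API_KEY="example-token"\nHOPSWORKS_API_KEY="example-key"\n')
    env = read_env_file(str(env_path))

missing = [k for k in ("AQICN_API_KEY", "HOPSWORKS_API_KEY") if not env.get(k)]
print("Missing keys:", missing)
```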


In [90]:
project = hopsworks.login(engine="python")

2025-11-13 14:10:15,830 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-13 14:10:15,848 INFO: Initializing external client
2025-11-13 14:10:15,848 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-11-13 14:10:17,285 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1272015


## <span style='color:#ff5f27'> 🌍 STEP 4: Get the AQICN_URL and API key. Enter country, city, street names for your Sensor.</span>

You can find your __AQICN_URL__ if you scroll down the web page for your sensor - it is the URL inside the red box in the image below.
Do not include the last part of the URL - "/?token=\__YOUR_TOKEN\__". 
It is bad practice to save tokens (aka API keys) in your source code - you might make them public if you check that code into GitHub!
We will fill in the token later by saving the AQICN_API_KEY as a secret in Hopsworks.
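
Keeping the URL and the token separate means the token is only attached at request time. The following sketch shows one way this could look. Both helper names are hypothetical, and how `util.get_pm25` builds its request internally is not shown in this notebook, so treat this as an assumption about the URL shape described above.

```python
def feed_url_with_token(aqicn_url: str, token: str) -> str:
    """Append the token query string to a token-free feed URL."""
    return f"{aqicn_url.rstrip('/')}/?token={token}"


def mask(token: str, visible: int = 4) -> str:
    """Hide all but the last few characters so the token is safe to log."""
    tail = token[-visible:] if visible > 0 else ""
    return "*" * max(len(token) - len(tail), 0) + tail


demo_token = "abcdef1234"  # placeholder, not a real token
print(feed_url_with_token("https://api.waqi.info/feed/A61420", demo_token))
print(mask(demo_token))
```

Logging the masked form instead of the raw value keeps the token out of saved notebook outputs.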

![stockholm-rt-api.png](attachment:70fea299-d303-49f8-99ba-43e981d7c3aa.png)


In [None]:
today = datetime.date.today()

csv_file=f"{root_dir}/data/air-quality-data.csv"
util.check_file_path(csv_file)
# Taken from <root_dir>/.env. You can also replace settings.AQICN_API_KEY with the API key value as a string "...."
if settings.AQICN_API_KEY is None:
    print("You need to set AQICN_API_KEY either in this cell or in <root_dir>/.env")
    sys.exit(1)

# At minute 21 of the video, he talks about not doing this if you are going to have many sensors
AQICN_API_KEY = settings.AQICN_API_KEY.get_secret_value() 
aqicn_url = settings.AQICN_URL
country = settings.AQICN_COUNTRY
city = settings.AQICN_CITY
street = settings.AQICN_STREET
# If the API call below fails (it fails in a GitHub Action), keep it commented out and set latitude and longitude explicitly
#latitude, longitude = util.get_city_coordinates(city)
# Explicit coordinates, used instead of the API call above
latitude = "59.36004"
longitude = "18.00086"


print("Found AQICN_API_KEY (value hidden)")

secrets = hopsworks.get_secrets_api()
# Replace any existing secret with the new value
secret = secrets.get_secret("AQICN_API_KEY")
if secret is not None:
    secret.delete()
    print("Replacing existing AQICN_API_KEY")

secrets.create_secret("AQICN_API_KEY", AQICN_API_KEY)

File successfully found at the path: /Users/isabell/Documents/KTH/KTH femman/Scalable Machine learning/id2223-lab1/data/air-quality-data.csv
Found AQICN_API_KEY (value hidden)
Replacing existing AQICN_API_KEY
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('AQICN_API_KEY', 'PRIVATE')

### Validate that the AQICN_API_KEY you added earlier works

The cell below should print out something like:

![image.png](attachment:832cc3e9-876c-450f-99d3-cc97abb55b13.png)

In [92]:
try:
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQICN_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the AQICN_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")

aq_today_df.head()

Unnamed: 0,pm25,country,city,street,date,url
0,3.0,sweden,solna,anders-lundstroms-gata,2025-11-13,https://api.waqi.info/feed/A61420


## <span style='color:#ff5f27'> 🌍 STEP 5: Read your CSV file into a DataFrame </span>

The cell below reads the historical air quality data from your CSV file into a Pandas DataFrame.

In [93]:
df = pd.read_csv(csv_file,  parse_dates=['date'], skipinitialspace=True)
df

Unnamed: 0,date,min,max,pm25,q1,q3,stdev,count
0,2019-12-09,0.20,2.12,1.14,0.55,1.48,0.496,308
1,2019-12-10,0.43,3.44,1.20,0.92,1.76,0.629,411
2,2019-12-11,0.44,4.38,2.00,1.57,2.83,0.896,426
3,2019-12-12,0.66,28.92,18.39,2.50,21.43,9.478,492
4,2019-12-13,3.38,59.44,10.95,7.91,15.36,6.874,569
...,...,...,...,...,...,...,...,...
2137,2025-11-03,0.30,6.13,2.15,1.00,3.45,1.484,559
2138,2025-11-04,0.10,5.35,1.15,0.50,2.45,1.403,555
2139,2025-11-05,0.30,4.38,2.17,1.42,2.90,1.087,561
2140,2025-11-06,3.03,13.35,8.64,6.85,10.30,2.539,462


## <span style='color:#ff5f27'> 🌍 STEP 6: Data cleaning</span>


### Rename columns if needed and drop unnecessary columns

We want to have a DataFrame with 2 columns - `date` and `pm25` after this cell below:

## Check the data types for the columns in your DataFrame

 * `date` should be of type datetime64[ns] 
 * `pm25` should be of type float32
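
If your CSV file uses different column names, a sketch like the one below can bring it into the expected two-column shape. The helper name `to_date_pm25` and the column names in the sample data (`Date`, `median`) are made up for illustration. Check the actual header of your own file.

```python
from typing import Optional

import pandas as pd


def to_date_pm25(df: pd.DataFrame, rename_map: Optional[dict] = None) -> pd.DataFrame:
    """Rename columns, keep only date and pm25, and enforce the expected dtypes."""
    out = df.rename(columns=rename_map or {})[["date", "pm25"]].copy()
    out["date"] = pd.to_datetime(out["date"])
    # Non-numeric entries become NaN so they can be dropped later
    out["pm25"] = pd.to_numeric(out["pm25"], errors="coerce").astype("float32")
    return out


# Toy input with hypothetical column names
raw = pd.DataFrame(
    {"Date": ["2019-12-09", "2019-12-10"], "median": ["1.14", "1.20"], "count": [308, 411]}
)
clean = to_date_pm25(raw, {"Date": "date", "median": "pm25"})
print(clean.dtypes)
```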

In [94]:
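# Keep only the date and pm25 columns; copy so that later assignments do not write to a view of df
df_aq = df[['date', 'pm25']].copy()
# Cast the pm25 column to the float32 data type
df_aq['pm25'] = df_aq['pm25'].astype('float32')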

df_aq

Unnamed: 0,date,pm25
0,2019-12-09,1.140000
1,2019-12-10,1.200000
2,2019-12-11,2.000000
3,2019-12-12,18.389999
4,2019-12-13,10.950000
...,...,...
2137,2025-11-03,2.150000
2138,2025-11-04,1.150000
2139,2025-11-05,2.170000
2140,2025-11-06,8.640000


In [95]:
# Check the resulting column data types
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2142 entries, 0 to 2141
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    2142 non-null   datetime64[ns]
 1   pm25    2142 non-null   float32       
dtypes: datetime64[ns](1), float32(1)
memory usage: 25.2 KB


## <span style='color:#ff5f27'> 🌍 STEP 7: Drop any rows with missing data </span>
It will make the model training easier if there is no missing data in the rows, so we drop any rows with missing data.

In [96]:
df_aq.dropna(inplace=True)
df_aq

Unnamed: 0,date,pm25
0,2019-12-09,1.140000
1,2019-12-10,1.200000
2,2019-12-11,2.000000
3,2019-12-12,18.389999
4,2019-12-13,10.950000
...,...,...
2137,2025-11-03,2.150000
2138,2025-11-04,1.150000
2139,2025-11-05,2.170000
2140,2025-11-06,8.640000


## <span style='color:#ff5f27'> 🌍 STEP 8: Add country, city, street, url to the DataFrame </span>

Your CSV file may have many other air quality measurement columns. We will only work with the `pm25` column.

We add the columns for the country, city, and street names that you changed for your Air Quality sensor.

We also want to make sure the `pm25` column is a float32 data type.

In [97]:
# Add the country, city, street, and URL of your sensor as columns
# (df_aq should already contain only the `date` and `pm25` columns at this point)
df_aq['country']=country
df_aq['city']=city
df_aq['street']=street
df_aq['url']=aqicn_url
df_aq

Unnamed: 0,date,pm25,country,city,street,url
0,2019-12-09,1.140000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
1,2019-12-10,1.200000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
2,2019-12-11,2.000000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
3,2019-12-12,18.389999,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
4,2019-12-13,10.950000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
...,...,...,...,...,...,...
2137,2025-11-03,2.150000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
2138,2025-11-04,1.150000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
2139,2025-11-05,2.170000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420
2140,2025-11-06,8.640000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420


In [98]:
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2142 entries, 0 to 2141
Data columns (total 6 columns):
 #   Column   Non-Null Count  Dtype         
---  ------   --------------  -----         
 0   date     2142 non-null   datetime64[ns]
 1   pm25     2142 non-null   float32       
 2   country  2142 non-null   object        
 3   city     2142 non-null   object        
 4   street   2142 non-null   object        
 5   url      2142 non-null   object        
dtypes: datetime64[ns](1), float32(1), object(4)
memory usage: 92.2+ KB


## <span style='color:#ff5f27'> 👀 Assignment for level C: Adding lagged air quality data </span>

Add the `pm25` values for the previous day, two days ago, and three days ago as features.

In [99]:
# Add columns for lagged air quality
# QUESTION: the first rows will be empty - should we remove those rows?
# REMEMBER: update the feature descriptions further down

df_aq["lagged_aq_1_day"] = df_aq["pm25"].shift(1)
df_aq["lagged_aq_2_days"] = df_aq["pm25"].shift(2)
df_aq["lagged_aq_3_days"] = df_aq["pm25"].shift(3)

df_aq

Unnamed: 0,date,pm25,country,city,street,url,lagged_aq_1_day,lagged_aq_2_days,lagged_aq_3_days
0,2019-12-09,1.140000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,,,
1,2019-12-10,1.200000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,1.140000,,
2,2019-12-11,2.000000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,1.200000,1.14,
3,2019-12-12,18.389999,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,2.000000,1.20,1.14
4,2019-12-13,10.950000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,18.389999,2.00,1.20
...,...,...,...,...,...,...,...,...,...
2137,2025-11-03,2.150000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,2.880000,0.56,0.68
2138,2025-11-04,1.150000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,2.150000,2.88,0.56
2139,2025-11-05,2.170000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,1.150000,2.15,2.88
2140,2025-11-06,8.640000,sweden,solna,anders-lundstroms-gata,https://api.waqi.info/feed/A61420,2.170000,1.15,2.15
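
Note that `shift(n)` takes the value from n rows earlier. That equals n days earlier only if the DataFrame is sorted by date and no days are missing. The sketch below is an alternative that looks up the value by calendar date instead. It assumes each date appears at most once, and the helper name and the `pm25_lag_{n}d` column names are hypothetical.

```python
import pandas as pd


def add_calendar_lags(df: pd.DataFrame, lags=(1, 2, 3)) -> pd.DataFrame:
    """Add pm25 values from exactly n calendar days earlier (NaN if that day is missing)."""
    out = df.sort_values("date").reset_index(drop=True)
    by_date = out.set_index("date")["pm25"]  # assumes unique dates
    for n in lags:
        wanted_dates = out["date"] - pd.Timedelta(days=n)
        out[f"pm25_lag_{n}d"] = by_date.reindex(wanted_dates).to_numpy()
    return out


# Toy data with 2025-01-03 missing
toy = pd.DataFrame(
    {
        "date": pd.to_datetime(["2025-01-01", "2025-01-02", "2025-01-04"]),
        "pm25": [1.0, 2.0, 4.0],
    }
)
lagged = add_calendar_lags(toy)
print(lagged)
```

For the row dated 2025-01-04, the 1-day lag is empty because there is no measurement for 2025-01-03, whereas a plain `shift(1)` would have filled in the value from 2025-01-02.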


---

## <span style='color:#ff5f27'> 🌦 Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs)</span>

## <span style='color:#ff5f27'> 🌍 STEP 9: Download the Historical Weather Data </span>

https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant

We will download the historical weather data for your `city` by first extracting the earliest date from your DataFrame containing the historical air quality measurements.

We will download all daily historical weather data measurements for your `city` from the earliest date in your air quality measurement DataFrame. It doesn't matter if there are missing days of air quality measurements. We can store all of the daily weather measurements, and when we build our training dataset, we will join up the air quality measurements for a given day to its weather features for that day. 

The weather features we will download are:

 * `temperature (average over the day)`
 * `precipitation (the total over the day)`
 * `wind speed (maximum over the day)`
 * `wind direction (the most dominant direction over the day)`
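
To illustrate why missing air quality days are not a problem, the sketch below joins a small air quality table to a weather table on `date` using plain pandas. The weather values are made-up toy numbers, and this only demonstrates the join semantics, not how the training dataset is actually built later.

```python
import pandas as pd

# Air quality has no row for 2025-11-04
aq = pd.DataFrame(
    {"date": pd.to_datetime(["2025-11-03", "2025-11-05"]), "pm25": [2.15, 2.17]}
)

# Weather has a row for every day (toy values)
weather = pd.DataFrame(
    {
        "date": pd.to_datetime(["2025-11-03", "2025-11-04", "2025-11-05"]),
        "temperature_2m_mean": [6.1, 5.4, 7.0],
        "precipitation_sum": [0.0, 1.2, 0.3],
    }
)

# An inner join keeps only the days that have an air quality measurement
training = aq.merge(weather, on="date", how="inner")
print(training)
```

The extra weather row for 2025-11-04 is simply not matched, so storing the full daily weather history costs nothing at training time.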


In [100]:
# Earliest date in the air quality data, formatted as YYYY-MM-DD for the weather API
earliest_aq_date = df_aq['date'].min().strftime('%Y-%m-%d')

weather_df = util.get_historical_weather(city, earliest_aq_date, str(today), latitude, longitude)

Coordinates 59.3673095703125°N 18.0°E
Elevation 13.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [101]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2167 entries, 0 to 2166
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         2167 non-null   datetime64[ns]
 1   temperature_2m_mean          2167 non-null   float32       
 2   precipitation_sum            2167 non-null   float32       
 3   wind_speed_10m_max           2167 non-null   float32       
 4   wind_direction_10m_dominant  2167 non-null   float32       
 5   city                         2167 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 67.8+ KB


## <span style='color:#ff5f27'> 🌍 STEP 10: Define Data Validation Rules </span>

We will validate the air quality measurements (`pm25` values) before we write them to Hopsworks.

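We define a data validation rule (an expectation in Great Expectations) that checks that the smallest `pm25` value in the column is not negative and does not exceed 500.0, the maximum value we expect from the sensor. Note that this expectation only inspects the column minimum, so on its own it does not reject individual values above 500.0.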

We will attach this expectation to the air quality feature group, so that we validate the `pm25` data every time we write a DataFrame to the feature group. We want to prevent garbage-in, garbage-out.
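
To make clear what an `expect_column_min_to_be_between` rule with `strict_min` checks, here is a plain-Python approximation of its logic. This is a simplified sketch for intuition, not the Great Expectations implementation, and the function name is hypothetical.

```python
def column_min_is_between(values, min_value, max_value, strict_min=False, strict_max=False):
    """Check that the smallest value lies within [min_value, max_value].

    With strict_min=True the lower bound is exclusive.
    Returns (success, observed_minimum).
    """
    observed = min(values)
    above = observed > min_value if strict_min else observed >= min_value
    below = observed < max_value if strict_max else observed <= max_value
    return above and below, observed


print(column_min_is_between([0.2, 18.39, 59.44], -0.1, 500.0, strict_min=True))
print(column_min_is_between([-1.0, 3.0], -0.1, 500.0, strict_min=True))
# Only the minimum is inspected, so a single very large value does not fail the check
print(column_min_is_between([0.2, 900.0], -0.1, 500.0, strict_min=True))
```

If you also want to bound the largest value, a separate rule on the column maximum would be needed.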

In [102]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":500.0,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 500.0, "strict_min": true}, "meta": {}}

## Expectations for Weather Data
Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect the minimum value of each column to be non-negative (greater than -0.1) and at most 1000.

In [103]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")

---

### <span style="color:#ff5f27;"> 🔮 STEP 11: Connect to Hopsworks and save the sensor country, city, street names as a secret</span>

In [104]:
fs = project.get_feature_store() 

#### Save country, city, street names as a secret

These will be downloaded from Hopsworks later in (1) the daily feature pipeline and (2) the daily batch inference pipeline.

In [105]:
dict_obj = {
    "country": country,
    "city": city,
    "street": street,
    "aqicn_url": aqicn_url,
    "latitude": latitude,
    "longitude": longitude
}

# Convert the dictionary to a JSON string
str_dict = json.dumps(dict_obj)

# Replace any existing secret with the new value
secret = secrets.get_secret("SENSOR_LOCATION_JSON")
if secret is not None:
    secret.delete()
    print("Replacing existing SENSOR_LOCATION_JSON")

secrets.create_secret("SENSOR_LOCATION_JSON", str_dict)

Replacing existing SENSOR_LOCATION_JSON
Secret created successfully, explore it at https://c.app.hopsworks.ai:443/account/secrets


Secret('SENSOR_LOCATION_JSON', 'PRIVATE')
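
The secret is stored as a single JSON string, so any pipeline that reads it has to parse it back into a dictionary. The sketch below shows only that round trip with the standard library, using the values from this notebook. How the string is fetched from Hopsworks in the later pipelines is not covered here.

```python
import json

location = {
    "country": "sweden",
    "city": "solna",
    "street": "anders-lundstroms-gata",
    "aqicn_url": "https://api.waqi.info/feed/A61420",
    "latitude": "59.36004",
    "longitude": "18.00086",
}

stored = json.dumps(location)   # what gets saved as the secret value
restored = json.loads(stored)   # what a downstream pipeline would parse

# Latitude and longitude were stored as strings, so convert them before numeric use
lat, lon = float(restored["latitude"]), float(restored["longitude"])
print(restored["city"], lat, lon)
```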

### <span style="color:#ff5f27;"> 🔮 STEP 12: Create the Feature Groups and insert the DataFrames in them </span>

### <span style='color:#ff5f27'> 🌫 Air Quality Data</span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each air quality sensor is identified by `country`, `city`, and `street`, and each measurement from that sensor is identified by its `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules
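
Before inserting, it can be worth checking that the combination of primary key and event time really is unique in your DataFrame. The following is a small pandas sketch of such a check. The helper name `duplicate_key_rows` is hypothetical, and the toy rows deliberately contain one duplicated day.

```python
import pandas as pd


def duplicate_key_rows(df: pd.DataFrame, primary_key, event_time) -> pd.DataFrame:
    """Return all rows whose (primary key, event time) combination occurs more than once."""
    key_columns = [*primary_key, event_time]
    return df[df.duplicated(subset=key_columns, keep=False)]


rows = pd.DataFrame(
    {
        "country": ["sweden"] * 3,
        "city": ["solna"] * 3,
        "street": ["anders-lundstroms-gata"] * 3,
        "date": pd.to_datetime(["2025-11-05", "2025-11-06", "2025-11-06"]),
        "pm25": [2.17, 8.64, 8.64],
    }
)

dupes = duplicate_key_rows(rows, ["country", "city", "street"], "date")
print(f"{len(dupes)} rows share a key and date with another row")
```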

In [106]:

#For original assignment
# air_quality_fg = fs.get_or_create_feature_group(
#     name='air_quality',
#     description='Air Quality characteristics of each day',
#     version=1,
#     primary_key=['country','city', 'street'],
#     event_time="date",
#     expectation_suite=aq_expectation_suite
# )

#For C-level (new group)
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality',
    description='Air Quality characteristics of each day',
    version=2,
    primary_key=['country','city', 'street'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

#### Insert the DataFrame into the Feature Group

In [107]:
#For original assignment
air_quality_fg.insert(df_aq)


Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1272015/fs/1258614/fg/1711441
2025-11-13 14:10:23,780 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1272015/fs/1258614/fg/1711441


Uploading Dataframe: 100.00% |██████████| Rows 2142/2142 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: air_quality_2_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1272015/jobs/named/air_quality_2_offline_fg_materialization/executions


(Job('air_quality_2_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 500.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 752651
         }
       },
       "result": {
         "observed_value": 0.20000000298023224,
         "element_count": 2142,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-13T01:10:23.000778Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "s

#### Enter a description for each feature in the Feature Group

In [108]:
#For original assignment
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in aqicn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk")
air_quality_fg.update_feature_description("lagged_aq_1_day", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk, value for yesterday")
air_quality_fg.update_feature_description("lagged_aq_2_days", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk, value for 2 days ago")
air_quality_fg.update_feature_description("lagged_aq_3_days", "Particles less than 2.5 micrometers in diameter (fine particles) pose health risk, value for 3 days ago")



%6|1763039436.487|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/bootstrap]: ssl://51.161.81.188:9093/1: Disconnected (after 50153ms in state UP, 1 identical error(s) suppressed)


<hsfs.feature_group.FeatureGroup at 0x17fd76e50>

### <span style='color:#ff5f27'> 🌦 Weather Data</span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `city` and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [109]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

#### Insert the DataFrame into the Feature Group

In [110]:
# Insert data
weather_fg.insert(weather_df, wait=True)

2025-11-13 14:10:42,674 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1272015/fs/1258614/fg/1668525


Uploading Dataframe: 100.00% |██████████| Rows 2167/2167 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization


%6|1763039455.836|FAIL|rdkafka#consumer-6| [thrd:GroupCoordinator]: GroupCoordinator: 51.161.81.208:9093: Disconnected (after 50113ms in state UP, 1 identical error(s) suppressed)
%6|1763039455.836|FAIL|rdkafka#consumer-6| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 50113ms in state UP, 1 identical error(s) suppressed)


Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1272015/jobs/named/weather_1_offline_fg_materialization/executions
2025-11-13 14:11:00,225 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-11-13 14:11:03,455 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED


%6|1763039487.633|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.188:9093/bootstrap]: ssl://51.161.81.188:9093/1: Disconnected (after 50097ms in state UP, 1 identical error(s) suppressed)
%6|1763039506.274|FAIL|rdkafka#consumer-6| [thrd:ssl://51.161.80.189:9093/bootstrap]: ssl://51.161.80.189:9093/0: Disconnected (after 49974ms in state UP, 1 identical error(s) suppressed)
%6|1763039506.495|FAIL|rdkafka#consumer-6| [thrd:GroupCoordinator]: GroupCoordinator: 51.161.81.208:9093: Disconnected (after 50200ms in state UP, 1 identical error(s) suppressed)
%6|1763039538.797|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 50132ms in state UP, 1 identical error(s) suppressed)
%6|1763039556.989|FAIL|rdkafka#consumer-6| [thrd:GroupCoordinator]: GroupCoordinator: 51.161.81.208:9093: Disconnected (after 50009ms in state UP, 1 identical error(s) suppressed)
%6|1763039557.184|FAIL|rdkafka#consumer-6| [thrd:ssl://51.161.81.188:9093/bo

2025-11-13 14:12:53,015 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-11-13 14:12:53,203 INFO: Waiting for log aggregation to finish.
2025-11-13 14:13:05,261 INFO: Execution finished successfully.


(Job('weather_1_offline_fg_materialization', 'SPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 739347
         }
       },
       "result": {
         "observed_value": 3.054701328277588,
         "element_count": 2167,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-11-13T01:10:42.000673Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type

#### Enter a description for each feature in the Feature Group

In [None]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m above ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant wind direction over the day")

<hsfs.feature_group.FeatureGroup at 0x162a02e10>

%6|1763039590.082|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/bootstrap]: ssl://51.161.80.189:9093/0: Disconnected (after 50165ms in state UP, 1 identical error(s) suppressed)
%6|1763039607.112|FAIL|rdkafka#consumer-6| [thrd:ssl://51.161.81.208:9093/bootstrap]: ssl://51.161.81.208:9093/2: Disconnected (after 100005ms in state UP, 1 identical error(s) suppressed)
%6|1763039607.600|FAIL|rdkafka#consumer-6| [thrd:GroupCoordinator]: GroupCoordinator: 51.161.81.208:9093: Disconnected (after 50111ms in state UP, 1 identical error(s) suppressed)
%6|1763039658.079|FAIL|rdkafka#consumer-6| [thrd:GroupCoordinator]: GroupCoordinator: 51.161.81.208:9093: Disconnected (after 50023ms in state UP, 1 identical error(s) suppressed)
%6|1763039658.277|FAIL|rdkafka#consumer-6| [thrd:ssl://51.161.80.189:9093/bootstrap]: ssl://51.161.80.189:9093/0: Disconnected (after 50221ms in state UP, 1 identical error(s) suppressed)
%6|1763039685.331|FAIL|rdkafka#producer-5| [thrd:ssl://51.161.80.189:9093/b

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Daily Feature Pipeline 
 </span> 


## <span style="color:#ff5f27;">⏭️ **Exercises:** 
 </span> 

Extra Homework:

  * Try adding a new feature based on a rolling window of 3 days for 'pm25'
      * This is not easy, as forecasting more than 1 day in the future, you won't have the previous 3 days of pm25 measurements.
      * df.set_index("date").rolling(3).mean() is only the start....
  * Parameterize the notebook, so that you can provide the `country`/`street`/`city`/`url`/`csv_file` as parameters. 
      * Hint: this will also require making the secret name (`SENSOR_LOCATION_JSON`) unique for each sensor, e.g., by adding the street name as part of the secret name. Then you have to pass that secret name as a parameter when running the operational feature pipeline and batch inference pipelines.
      * After you have done this, collect the street/city/url/csv files for all the sensors in your city or region and make dashboards for all of the air quality sensors in your city/region. You could even then add a dashboard for your city/region, as done [here for Poland](https://github.com/erno98/ID2223).
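
As a starting point for the rolling-window exercise, the sketch below computes the mean of the three days before each row, using `shift(1)` so that a day's own measurement does not leak into its feature. It uses toy data and a hypothetical column name, assumes one row per consecutive day, and does not solve the multi-day forecasting problem mentioned above.

```python
import pandas as pd

toy = pd.DataFrame(
    {
        "date": pd.date_range("2025-01-01", periods=5, freq="D"),
        "pm25": [1.0, 2.0, 3.0, 4.0, 5.0],
    }
).sort_values("date")

# Shift by one row first, then average over a 3-row window:
# the value for a given day is the mean of the 3 preceding days
toy["pm25_rolling_3d"] = toy["pm25"].shift(1).rolling(window=3).mean()
print(toy)
```

The first three rows are empty because fewer than three previous days are available for them.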

Improve this AI System
  * As of mid 2024, there is no API call available to download historical data from the AQICN website. You could improve this system by writing a PR to download the CSV file using Python Selenium and the URL for the sensor.


---