<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Feature Backfill for Air Quality Data</span>


## 🗒️ You have the following tasks
1. Choose an Air Quality Sensor
2. Update the country, city, and street information to point to YOUR chosen Air Quality Sensor
3. Download historical measures for your Air Quality Sensor as a CSV file
4. Update the path of the CSV file in this notebook to point to the one that you downloaded
5. Create an account on www.hopsworks.ai and get your HOPSWORKS_API_KEY
6. Run this notebook



### <span style='color:#ff5f27'> 📝 Imports</span>

In [2]:
import datetime
import requests
import pandas as pd
import hopsworks
from pathlib import Path
from functions import util
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")

### IF YOU WANT TO WIPE OUT ALL OF YOUR FEATURES AND MODELS, run the cell below

In [3]:
# # # If you haven't set the env variable 'HOPSWORKS_API_KEY', then uncomment the next line and enter your API key
# with open('../../data/hopsworks-api-key.txt', 'r') as file:
#    os.environ["HOPSWORKS_API_KEY"] = file.read().rstrip()
# project = hopsworks.login()
# util.purge_project(project)

## Hopsworks API Key
You need to have registered an account on app.hopsworks.ai.
You will be prompted to enter your API key here, unless you set it as the environment variable HOPSWORKS_API_KEY (my preferred approach).

In [4]:
with open('../../data/hopsworks-api-key.txt', 'r') as file:
    os.environ["HOPSWORKS_API_KEY"] = file.read().rstrip()
    
project = hopsworks.login()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1169565
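
The key handling above can be wrapped in a small helper that prefers the environment variable and only falls back to a file. This is a minimal sketch: `resolve_api_key` and its parameters are hypothetical names and are not part of `hopsworks` or `functions.util`.

```python
import os
from pathlib import Path


def resolve_api_key(env_var="HOPSWORKS_API_KEY", fallback_path=None):
    """Return the API key from the environment, else from a fallback file.

    Hypothetical helper for illustration only.
    """
    key = os.environ.get(env_var)
    if key:
        return key
    if fallback_path is not None and Path(fallback_path).is_file():
        key = Path(fallback_path).read_text().strip()
        # Export the key so that later code reading the variable can find it
        os.environ[env_var] = key
        return key
    raise RuntimeError(f"Set {env_var} or provide a readable key file")
```

With this in place, the cell above would reduce to `resolve_api_key(fallback_path='../../data/hopsworks-api-key.txt')` followed by `hopsworks.login()`.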


In [5]:
csv_file="../../data/Lokuciewskiego-Warszawa.csv"
util.check_file_path(csv_file)

File successfully found at the path: ../../data/Lokuciewskiego-Warszawa.csv


## <span style='color:#ff5f27'> 🌍 STEP 5: Read your CSV file into a DataFrame </span>

The cell below reads the historical air quality data from your CSV file into a Pandas DataFrame.

In [6]:
df = pd.read_csv(csv_file,  parse_dates=['date'], skipinitialspace=True)
df

Unnamed: 0,date,min,max,median,q1,q3,stdev,count
0,2019-12-09 00:00:00+00:00,3.68,27.58,17.00,9.91,21.19,6.446,304
1,2019-12-10 00:00:00+00:00,2.50,34.82,12.92,5.04,23.59,9.840,397
2,2019-12-11 00:00:00+00:00,4.82,53.52,29.46,18.12,33.97,10.053,418
3,2019-12-12 00:00:00+00:00,19.38,51.10,28.29,22.72,32.60,7.211,476
4,2019-12-13 00:00:00+00:00,13.28,61.43,18.63,16.43,22.28,11.133,558
...,...,...,...,...,...,...,...,...
1784,2024-11-08 00:00:00+00:00,5.72,18.67,13.68,10.64,14.87,2.786,421
1785,2024-11-09 00:00:00+00:00,6.63,18.82,13.07,10.65,15.47,2.902,393
1786,2024-11-10 00:00:00+00:00,12.07,30.15,18.00,16.39,19.29,3.152,352
1787,2024-11-11 00:00:00+00:00,11.93,30.48,19.13,16.63,24.77,4.782,427
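
To see what the two `read_csv` options do, here is a small sketch on an in-memory CSV. The two rows reuse values from the table above, and the space after each comma is an assumption about how the downloaded file is laid out.

```python
import io

import pandas as pd

# Two illustrative rows; note the space after each comma (assumed layout)
sample_csv = io.StringIO(
    "date, min, max, median\n"
    "2019-12-09, 3.68, 27.58, 17.00\n"
    "2019-12-10, 2.50, 34.82, 12.92\n"
)

# skipinitialspace=True drops the space after each delimiter, so the header is
# read as "min" rather than " min"; parse_dates turns "date" into a datetime column
sample_df = pd.read_csv(sample_csv, parse_dates=["date"], skipinitialspace=True)

print(list(sample_df.columns))
print(sample_df.dtypes)
```

Without `skipinitialspace=True`, the column names would keep their leading space and a later `df['median']` lookup would fail.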


In [7]:
# TODO: Change these values to point to your Sensor
country="Poland"
city = "Warszawa"
street = "Lokuciewskiego"
aqicn_url="https://api.waqi.info/feed/A65074/"

# This API call may fail if the IP address you run this notebook from is blocked by the Nominatim API.
# If it fails, look up the latitude and longitude using Google and enter the values here.
latitude, longitude = util.get_city_coordinates(city)
# latitude = "53.3498"
# longitude = "-6.2603"
today = datetime.date.today()
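
The manual fallback described in the comments above can also be automated. The sketch below assumes the lookup raises an exception when the request is blocked, which may not match how `util.get_city_coordinates` actually behaves; `coordinates_or_fallback` is a hypothetical helper, and the coordinates are approximate values for central Warsaw used only for illustration.

```python
def coordinates_or_fallback(lookup, city, fallback):
    """Try a geocoding lookup and return the manual fallback if it raises."""
    try:
        latitude, longitude = lookup(city)
        return latitude, longitude
    except Exception as err:  # e.g. a blocked or rate-limited request
        print(f"Geocoding failed for {city!r} ({err}); using manual coordinates")
        return fallback


def blocked_lookup(city):
    """Stand-in for a geocoder whose request is rejected."""
    raise ConnectionError("request blocked")


# Approximate coordinates for central Warsaw; replace with your sensor's location
demo_latitude, demo_longitude = coordinates_or_fallback(
    blocked_lookup, "Warszawa", ("52.2297", "21.0122")
)
print(demo_latitude, demo_longitude)
```

In the notebook, the first argument would be `util.get_city_coordinates` instead of the stand-in.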

## <span style='color:#ff5f27'> 🌍 STEP 6: Data cleaning</span>


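### Rename columns if needed and drop unnecessary columns

After the cell below, we want a DataFrame with the `date` and `pm25` columns, plus three lagged copies of `pm25` (`lagged_1`, `lagged_2`, `lagged_3`) that hold the measurements from the previous 1, 2, and 3 rows (days).

## Check the data types for the columns in your DataFrame

 * `date` should be a datetime64 type (`datetime64[ns, UTC]` for this sensor)
 * `pm25` and the lagged columns should be of type float32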

In [10]:
# These commands will also succeed if your CSV file doesn't have a `median` or `timestamp` column
df = df.rename(columns={"median": "pm25"})
df = df.rename(columns={"timestamp": "date"})

# Copy the selection so the column assignments below work on an independent DataFrame
df_aq_lagged = df[['date', 'pm25']].copy()
df_aq_lagged['lagged_1'] = df_aq_lagged['pm25'].shift(1)
df_aq_lagged['lagged_2'] = df_aq_lagged['pm25'].shift(2)
df_aq_lagged['lagged_3'] = df_aq_lagged['pm25'].shift(3)

df_aq_lagged['pm25'] = df_aq_lagged['pm25'].astype('float32')
df_aq_lagged['lagged_1'] = df_aq_lagged['lagged_1'].astype('float32')
df_aq_lagged['lagged_2'] = df_aq_lagged['lagged_2'].astype('float32')
df_aq_lagged['lagged_3'] = df_aq_lagged['lagged_3'].astype('float32')


df_aq_lagged

Unnamed: 0,date,pm25,lagged_1,lagged_2,lagged_3
0,2019-12-09 00:00:00+00:00,17.000000,,,
1,2019-12-10 00:00:00+00:00,12.920000,17.000000,,
2,2019-12-11 00:00:00+00:00,29.459999,12.920000,17.000000,
3,2019-12-12 00:00:00+00:00,28.290001,29.459999,12.920000,17.000000
4,2019-12-13 00:00:00+00:00,18.629999,28.290001,29.459999,12.920000
...,...,...,...,...,...
1784,2024-11-08 00:00:00+00:00,13.680000,11.650000,21.620001,8.770000
1785,2024-11-09 00:00:00+00:00,13.070000,13.680000,11.650000,21.620001
1786,2024-11-10 00:00:00+00:00,18.000000,13.070000,13.680000,11.650000
1787,2024-11-11 00:00:00+00:00,19.129999,18.000000,13.070000,13.680000
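
The lagging step is easier to follow on a handful of rows. The sketch below reuses the first four `pm25` values from the table above and builds the three lagged columns in a loop; it assumes the rows are sorted by date with one row per day.

```python
import pandas as pd

demo = pd.DataFrame({
    "date": pd.to_datetime(["2019-12-09", "2019-12-10", "2019-12-11", "2019-12-12"]),
    "pm25": [17.00, 12.92, 29.46, 28.29],
}).sort_values("date", ignore_index=True)

# shift(n) moves values down by n rows, so row i receives the value from row i - n
for lag in (1, 2, 3):
    demo[f"lagged_{lag}"] = demo["pm25"].shift(lag)

print(demo)
```

Because `shift` is positional, a missing day in the CSV would make `lagged_1` hold the value from two days earlier rather than from the previous day, so it is worth checking the dates for gaps.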


In [11]:
# Check the data types: pm25 and the lagged columns should be float32
df_aq_lagged.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1789 entries, 0 to 1788
Data columns (total 5 columns):
 #   Column    Non-Null Count  Dtype              
---  ------    --------------  -----              
 0   date      1789 non-null   datetime64[ns, UTC]
 1   pm25      1789 non-null   float32            
 2   lagged_1  1788 non-null   float32            
 3   lagged_2  1787 non-null   float32            
 4   lagged_3  1786 non-null   float32            
dtypes: datetime64[ns, UTC](1), float32(4)
memory usage: 42.1 KB


## <span style='color:#ff5f27'> 🌍 STEP 7: Drop any rows with missing data </span>
It will make the model training easier if there is no missing data in the rows, so we drop any rows with missing data.

In [12]:
df_aq_lagged.dropna(inplace=True)
df_aq_lagged

Unnamed: 0,date,pm25,lagged_1,lagged_2,lagged_3
3,2019-12-12 00:00:00+00:00,28.290001,29.459999,12.920000,17.000000
4,2019-12-13 00:00:00+00:00,18.629999,28.290001,29.459999,12.920000
5,2019-12-14 00:00:00+00:00,18.299999,18.629999,28.290001,29.459999
6,2019-12-15 00:00:00+00:00,6.910000,18.299999,18.629999,28.290001
7,2019-12-16 00:00:00+00:00,6.780000,6.910000,18.299999,18.629999
...,...,...,...,...,...
1784,2024-11-08 00:00:00+00:00,13.680000,11.650000,21.620001,8.770000
1785,2024-11-09 00:00:00+00:00,13.070000,13.680000,11.650000,21.620001
1786,2024-11-10 00:00:00+00:00,18.000000,13.070000,13.680000,11.650000
1787,2024-11-11 00:00:00+00:00,19.129999,18.000000,13.070000,13.680000


## <span style='color:#ff5f27'> 🌍 STEP 8: Add country, city, street, url to the DataFrame </span>

Your CSV file may have many other air quality measurement columns. We will only work with the `pm25` column.

We add the columns for the country, city, and street names that you changed for your Air Quality sensor.

We also want to make sure the `pm25` column is a float32 data type.

In [13]:
# Add the sensor's location and feed URL as constant columns,
# using the values you set for your own sensor earlier in this notebook
df_aq_lagged['country']=country
df_aq_lagged['city']=city
df_aq_lagged['street']=street
df_aq_lagged['url']=aqicn_url
df_aq_lagged

Unnamed: 0,date,pm25,lagged_1,lagged_2,lagged_3,country,city,street,url
3,2019-12-12 00:00:00+00:00,28.290001,29.459999,12.920000,17.000000,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
4,2019-12-13 00:00:00+00:00,18.629999,28.290001,29.459999,12.920000,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
5,2019-12-14 00:00:00+00:00,18.299999,18.629999,28.290001,29.459999,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
6,2019-12-15 00:00:00+00:00,6.910000,18.299999,18.629999,28.290001,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
7,2019-12-16 00:00:00+00:00,6.780000,6.910000,18.299999,18.629999,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
...,...,...,...,...,...,...,...,...,...
1784,2024-11-08 00:00:00+00:00,13.680000,11.650000,21.620001,8.770000,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
1785,2024-11-09 00:00:00+00:00,13.070000,13.680000,11.650000,21.620001,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
1786,2024-11-10 00:00:00+00:00,18.000000,13.070000,13.680000,11.650000,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/
1787,2024-11-11 00:00:00+00:00,19.129999,18.000000,13.070000,13.680000,Poland,Warszawa,Lokuciewskiego,https://api.waqi.info/feed/A65074/


---

## Expectations for Air Quality Data
Here, we define an expectation for 4 columns in our air quality DataFrame - `pm25`, `lagged_1`, `lagged_2`, and `lagged_3`, where we expect the minimum value in each column to be greater than -0.1 and no more than 500.

In [14]:
import great_expectations as ge
aq_lagged_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_lagged_expectation_suite"
)

def expect_greater_than_zero(col):
    aq_lagged_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":500.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("pm25")
expect_greater_than_zero("lagged_1")
expect_greater_than_zero("lagged_2")
expect_greater_than_zero("lagged_3")
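
The rule that each expectation encodes can be written out in plain Python. This is only a sketch of the check's logic, not how Great Expectations implements it; `column_min_within` is a hypothetical name, and skipping missing values is an assumption of the sketch.

```python
import math


def column_min_within(values, min_value=-0.1, max_value=500.0, strict_min=True):
    """Return True if the smallest value lies in (min_value, max_value]."""
    present = [v for v in values if v is not None and not math.isnan(v)]
    if not present:
        return False
    lowest = min(present)
    above_lower = lowest > min_value if strict_min else lowest >= min_value
    return above_lower and lowest <= max_value


print(column_min_within([17.0, 12.92, 29.46]))
print(column_min_within([-1.0, 12.92]))
```

Note that the rule only constrains the column minimum: a single reading above 500 would not fail it unless every value in the column were that high. Despite the helper's name `expect_greater_than_zero`, the threshold is -0.1, so a reading of exactly 0 passes.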

---

### <span style="color:#ff5f27;"> 🔮 STEP 11: Connect to Hopsworks and save the sensor country, city, street names as a secret</span>

In [15]:
fs = project.get_feature_store() 

Connected. Call `.close()` to terminate connection gracefully.


#### Save country, city, street names as a secret

These will be downloaded from Hopsworks later in (1) the daily feature pipeline and (2) the daily batch inference pipeline.

In [16]:
dict_obj = {
    "country": country,
    "city": city,
    "street": street,
    "aqicn_url": aqicn_url,
    "latitude": latitude,
    "longitude": longitude
}

# Convert the dictionary to a JSON string
str_dict = json.dumps(dict_obj)
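
A secret holds a string, which is why the dictionary is serialized to JSON first. The sketch below shows only the encode and decode steps, with no Hopsworks calls; the coordinate values are illustrative placeholders, and the decoding side is an assumption about what the later pipelines would do with the downloaded string.

```python
import json

location = {
    "country": "Poland",
    "city": "Warszawa",
    "street": "Lokuciewskiego",
    "aqicn_url": "https://api.waqi.info/feed/A65074/",
    "latitude": "52.2297",    # illustrative placeholder
    "longitude": "21.0122",   # illustrative placeholder
}

# Encode: this string is what would be stored as the secret's value
encoded = json.dumps(location)

# Decode: what a later pipeline would do after downloading the string
decoded = json.loads(encoded)
print(decoded["city"], decoded["street"])
```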


### <span style="color:#ff5f27;"> 🔮 STEP 12: Create the Feature Groups and insert the DataFrames in them </span>

### <span style='color:#ff5f27'> 🌫 Air Quality Data</span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each air quality sensor measurement is uniquely identified by `city`, `street`, and `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [17]:
air_quality_lagged_fg = fs.get_or_create_feature_group(
    name='air_quality_lagged',
    description='Air Quality Lagged characteristics of each day',
    version=1,
    primary_key=['city', 'street', 'date'],
    event_time="date",
    expectation_suite=aq_lagged_expectation_suite
)
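
Since the primary key is meant to identify each row uniquely, it can help to check the DataFrame for repeated key combinations before inserting it. The following is a sketch on made-up rows using plain pandas; `rows_with_duplicate_keys` is a hypothetical helper and not part of the Hopsworks API.

```python
import pandas as pd


def rows_with_duplicate_keys(df, keys):
    """Return every row whose key combination appears more than once."""
    return df[df.duplicated(subset=keys, keep=False)]


# Made-up rows: the last two share the same city, street, and date
demo = pd.DataFrame({
    "city": ["Warszawa", "Warszawa", "Warszawa"],
    "street": ["Lokuciewskiego"] * 3,
    "date": pd.to_datetime(["2024-11-10", "2024-11-11", "2024-11-11"]),
    "pm25": [18.00, 19.13, 19.50],
})

clashes = rows_with_duplicate_keys(demo, ["city", "street", "date"])
print(len(clashes))
```

In the notebook, the same check would be `rows_with_duplicate_keys(df_aq_lagged, ['city', 'street', 'date'])`, and an empty result means the key is unique.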

#### Insert the DataFrame into the Feature Group

In [18]:
air_quality_lagged_fg.insert(df_aq_lagged)

RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1169565/featurestores/1160267/featuregroups). Server response: 
HTTP code: 504, HTTP reason: Gateway Time-out, body: b'<html>\r\n<head><title>504 Gateway Time-out</title></head>\r\n<body>\r\n<center><h1>504 Gateway Time-out</h1></center>\r\n</body>\r\n</html>\r\n', error code: , error msg: , user msg: 

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Daily Feature Pipeline</span>


## <span style="color:#ff5f27;">⏭️ **Exercises:**</span>

Extra Homework:

  * Try adding a new feature based on a rolling window of 3 days for 'pm25'
      * This is not easy: when forecasting more than 1 day into the future, you won't have the previous 3 days of pm25 measurements.
      * `df.set_index("date").rolling(3).mean()` is only the start...
  * Parameterize the notebook, so that you can provide the `country`/`street`/`city`/`url`/`csv_file` as parameters.
      * Hint: this will also require making the secret name (`SENSOR_LOCATION_JSON`) a parameter, e.g., by adding the street name as part of the secret name. Then you have to pass that secret name as a parameter when running the operational feature pipeline and batch inference pipelines.
      * After you have done this, collect the street/city/url/csv files for all the sensors in your city or region and make dashboards for all of them. You could even then add a dashboard for your city/region, as done [here for Poland](https://github.com/erno98/ID2223).
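
As a starting point for the rolling-window exercise, one possible approach (a sketch, not a full solution) is to shift the series by one day before averaging, so that each row's feature uses only the three days before it and never the value being predicted. The data below is made up, and the multi-day forecasting problem mentioned in the exercise is still left open.

```python
import pandas as pd

demo = pd.DataFrame({
    "date": pd.to_datetime(
        ["2024-11-07", "2024-11-08", "2024-11-09", "2024-11-10", "2024-11-11"]
    ),
    "pm25": [10.0, 20.0, 30.0, 40.0, 50.0],  # made-up values
}).sort_values("date", ignore_index=True)

# shift(1) excludes the current day; rolling(3).mean() then averages the
# three preceding days and yields NaN until three such days are available
demo["pm25_rolling_3"] = demo["pm25"].shift(1).rolling(3).mean()

print(demo)
```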

Improve this AI System
  * As of mid-2024, there is no API call available to download historical data from the AQICN website. You could improve this system by writing a PR that downloads the CSV file using Python Selenium and the URL for the sensor.


---