<span style="font-weight:bold; font-size: 3rem; color:#333;">- Part 01: Feature Backfill</span>


## 🗒️ This notebook is divided into the following sections:
1. Fetch historical data
2. Connect to the Hopsworks feature store
3. Create feature groups and insert them into the feature store



### <span style='color:#ff5f27'> 📝 Imports </span>

In [1]:

import pandas as pd
import warnings
import datetime
warnings.filterwarnings("ignore")

---

## <span style='color:#ff5f27'> 🌍 Representing the Target City </span>

In [4]:
country="germany"
city="reutlingen"
street="Zaisentalstraße"
today = datetime.date.today()

df = pd.read_csv("../data/reutlingen.csv",  parse_dates=['date'], skipinitialspace=True, skiprows=3)
df

Unnamed: 0,date,min,max,pm25,q1,q3,stdev,count
0,2019-12-09 00:00:00+00:00,0.50,1.95,0.82,0.68,0.99,0.259,316
1,2019-12-10 00:00:00+00:00,1.11,16.70,2.63,2.06,4.36,3.166,402
2,2019-12-11 00:00:00+00:00,1.10,11.12,5.27,4.07,5.95,1.760,434
3,2019-12-12 00:00:00+00:00,0.50,17.73,2.31,1.32,3.26,1.752,492
4,2019-12-13 00:00:00+00:00,0.45,6.89,1.18,0.86,1.63,0.724,565
...,...,...,...,...,...,...,...,...
1780,2024-11-01 00:00:00+00:00,1.12,14.50,5.30,3.00,6.70,3.245,569
1781,2024-11-02 00:00:00+00:00,2.60,17.32,8.60,6.47,10.70,2.775,567
1782,2024-11-03 00:00:00+00:00,3.77,15.40,7.40,5.97,8.68,1.780,569
1783,2024-11-04 00:00:00+00:00,2.50,13.50,6.42,5.10,8.27,2.281,538


In [5]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1785 entries, 0 to 1784
Data columns (total 8 columns):
 #   Column  Non-Null Count  Dtype              
---  ------  --------------  -----              
 0   date    1785 non-null   datetime64[ns, UTC]
 1   min     1785 non-null   float64            
 2   max     1785 non-null   float64            
 3   pm25    1785 non-null   float64            
 4   q1      1785 non-null   float64            
 5   q3      1785 non-null   float64            
 6   stdev   1785 non-null   float64            
 7   count   1785 non-null   int64              
dtypes: datetime64[ns, UTC](1), float64(6), int64(1)
memory usage: 111.7 KB


In [26]:
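from notebooks.utils.functions import get_city_coordinates, get_historical_weather

# Copy the slice so the column assignments below do not write into a view of df
df2 = df[['date', 'pm25']].copy()
df2['city'] = city
df2['street'] = street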

latitude, longitude = get_city_coordinates(city)



In [8]:
df2['pm25'] = df2['pm25'].astype('float32')
df2["pm25_yesterday"] = df2["pm25"].shift(1)
df2

Unnamed: 0,date,pm25,city,street,pm25_yesterday
0,2019-12-09 00:00:00+00:00,0.82,reutlingen,Zaisentalstraße,
1,2019-12-10 00:00:00+00:00,2.63,reutlingen,Zaisentalstraße,0.82
2,2019-12-11 00:00:00+00:00,5.27,reutlingen,Zaisentalstraße,2.63
3,2019-12-12 00:00:00+00:00,2.31,reutlingen,Zaisentalstraße,5.27
4,2019-12-13 00:00:00+00:00,1.18,reutlingen,Zaisentalstraße,2.31
...,...,...,...,...,...
1780,2024-11-01 00:00:00+00:00,5.30,reutlingen,Zaisentalstraße,5.55
1781,2024-11-02 00:00:00+00:00,8.60,reutlingen,Zaisentalstraße,5.30
1782,2024-11-03 00:00:00+00:00,7.40,reutlingen,Zaisentalstraße,8.60
1783,2024-11-04 00:00:00+00:00,6.42,reutlingen,Zaisentalstraße,7.40


In [9]:
df2.dropna(inplace=True)
df2

Unnamed: 0,date,pm25,city,street,pm25_yesterday
1,2019-12-10 00:00:00+00:00,2.63,reutlingen,Zaisentalstraße,0.82
2,2019-12-11 00:00:00+00:00,5.27,reutlingen,Zaisentalstraße,2.63
3,2019-12-12 00:00:00+00:00,2.31,reutlingen,Zaisentalstraße,5.27
4,2019-12-13 00:00:00+00:00,1.18,reutlingen,Zaisentalstraße,2.31
5,2019-12-14 00:00:00+00:00,1.02,reutlingen,Zaisentalstraße,1.18
...,...,...,...,...,...
1780,2024-11-01 00:00:00+00:00,5.30,reutlingen,Zaisentalstraße,5.55
1781,2024-11-02 00:00:00+00:00,8.60,reutlingen,Zaisentalstraße,5.30
1782,2024-11-03 00:00:00+00:00,7.40,reutlingen,Zaisentalstraße,8.60
1783,2024-11-04 00:00:00+00:00,6.42,reutlingen,Zaisentalstraße,7.40
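
The lag feature above depends on `shift(1)` picking up the previous *row*, so it matches "yesterday" only if the frame is sorted by date with one row per day. Below is a minimal sketch of that pattern on toy data; the values are illustrative and not taken from the Reutlingen file.

```python
import pandas as pd

# Toy data standing in for the air quality frame (deliberately out of order)
toy = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-03", "2024-01-01", "2024-01-02"], utc=True),
    "pm25": [3.0, 1.0, 2.0],
})

# shift(1) works on row position, so sort chronologically first
toy = toy.sort_values("date").reset_index(drop=True)
toy["pm25_yesterday"] = toy["pm25"].shift(1)

# The earliest row has no predecessor and ends up as NaN; drop only on that column
toy = toy.dropna(subset=["pm25_yesterday"]).reset_index(drop=True)
```

Passing `subset=` to `dropna` keeps rows that have missing values in unrelated columns, which may or may not be what you want for a backfill.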


In [10]:
earliest_aq_date = df2['date'].min()

In [11]:
earliest_aq_date = earliest_aq_date.strftime('%Y-%m-%d')
earliest_aq_date

'2019-12-10'

In [12]:
df_air_quality=df2
df_air_quality.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1784 entries, 1 to 1784
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype              
---  ------          --------------  -----              
 0   date            1784 non-null   datetime64[ns, UTC]
 1   pm25            1784 non-null   float32            
 2   city            1784 non-null   object             
 3   street          1784 non-null   object             
 4   pm25_yesterday  1784 non-null   float32            
dtypes: datetime64[ns, UTC](1), float32(2), object(2)
memory usage: 69.7+ KB


### <span style="color:#ff5f27;">🛠 Feature Engineering</span>

In [13]:
# Print the shape (number of rows and columns) of the df_air_quality DataFrame
df_air_quality.shape

(1784, 5)

In [14]:
# Retrieve and display the column names of the df_air_quality DataFrame
df_air_quality.columns

Index(['date', 'pm25', 'city', 'street', 'pm25_yesterday'], dtype='object')

---

## <span style='color:#ff5f27'> 🌦 Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs) </span>

## Instructions for weather
https://open-meteo.com/en/docs/historical-weather-api#hourly=&daily=temperature_2m_mean,precipitation_sum,wind_speed_10m_max,wind_direction_10m_dominant
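
The link above selects four daily variables. As a rough sketch of what such a request might look like, the snippet below only builds a query URL and does not call the network. It is not the implementation of `get_historical_weather` (that helper lives in `notebooks/utils/functions.py` and is not shown here). The function name `build_archive_url` is hypothetical, and the endpoint is assumed to be Open-Meteo's historical archive API.

```python
from urllib.parse import urlencode

# Assumed endpoint of the Open-Meteo historical weather (archive) API
ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"

# Daily variables taken from the instructions link above
DAILY_VARIABLES = [
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
]

def build_archive_url(latitude, longitude, start_date, end_date):
    """Hypothetical helper: assemble the query string for a daily archive request."""
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "daily": ",".join(DAILY_VARIABLES),
    }
    return f"{ARCHIVE_URL}?{urlencode(params)}"

# Rounded coordinates for Reutlingen, dates in YYYY-MM-DD format
url = build_archive_url(48.47, 9.08, "2019-12-10", "2024-11-05")
```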

In [15]:
weather_df = get_historical_weather(city, earliest_aq_date, str(today))

Coordinates 48.47100067138672°N 9.075630187988281°E
Elevation 385.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [16]:
weather_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1791 entries, 0 to 1790
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         1791 non-null   datetime64[ns]
 1   temperature_2m_mean          1791 non-null   float32       
 2   precipitation_sum            1791 non-null   float32       
 3   wind_speed_10m_max           1791 non-null   float32       
 4   wind_direction_10m_dominant  1791 non-null   float32       
 5   city                         1791 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 70.0+ KB


In [17]:
import great_expectations as ge
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite"
)

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs={
            "column":"pm25",
            "min_value":-0.1,
            "max_value":999.9,
            "strict_min":True
        }
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "pm25", "min_value": -0.1, "max_value": 999.9, "strict_min": true}, "meta": {}}

In [18]:
import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    # Expect the minimum of `col` to be strictly above -0.1 and at most 1000.0
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")
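
To make the intent of these expectations concrete, here is a small pandas-only sketch of the check that `expect_column_min_to_be_between` describes. It is an approximation for illustration, not Great Expectations code, and the helper name `column_min_in_range` is hypothetical. Note that both bounds apply to the column *minimum*, so `max_value` does not cap the individual values.

```python
import pandas as pd

def column_min_in_range(df, column, min_value, max_value, strict_min=False):
    """Hypothetical helper: check that the column minimum lies within the bounds."""
    observed = df[column].min()
    lower_ok = observed > min_value if strict_min else observed >= min_value
    return bool(lower_ok and observed <= max_value)

# Illustrative frames: one valid, one with an impossible negative precipitation
good = pd.DataFrame({"precipitation_sum": [0.0, 1.2, 7.5]})
bad = pd.DataFrame({"precipitation_sum": [-3.0, 1.2, 7.5]})

good_ok = column_min_in_range(good, "precipitation_sum", -0.1, 1000.0, strict_min=True)
bad_ok = column_min_in_range(bad, "precipitation_sum", -0.1, 1000.0, strict_min=True)
```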

---


### <span style="color:#ff5f27;"> 🔮 Connecting to Hopsworks Feature Store </span>

In [19]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store() 

Copy your Api Key (first register/login): https://c.app.hopsworks.ai/account/api/generated
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1157271
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27;">🪄 Creating Feature Groups</span>

### <span style='color:#ff5f27'> 🌫 Air Quality Data </span>

In [20]:
# Get or create feature group
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['city','street','date'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)    
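
Since the feature group declares `city`, `street` and `date` as its primary key, it can be worth confirming that the DataFrame has no repeated key combinations before inserting. The snippet below is a minimal sketch on toy data; the helper name `duplicate_key_rows` is hypothetical, and how Hopsworks itself treats repeated keys is not covered here.

```python
import pandas as pd

def duplicate_key_rows(df, key_columns):
    """Hypothetical helper: return every row whose key combination occurs more than once."""
    return df[df.duplicated(subset=key_columns, keep=False)]

# Toy frame with one repeated (city, street, date) combination
toy = pd.DataFrame({
    "city": ["reutlingen", "reutlingen", "reutlingen"],
    "street": ["Zaisentalstraße"] * 3,
    "date": pd.to_datetime(["2024-11-01", "2024-11-02", "2024-11-02"], utc=True),
    "pm25": [5.30, 8.60, 8.70],
})

dupes = duplicate_key_rows(toy, ["city", "street", "date"])
```

An empty result means each key combination is unique in the frame.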

In [21]:
# Insert data
air_quality_fg.insert(df_air_quality)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1157271/fs/1147974/fg/1336668
2024-11-05 13:56:04,414 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1157271/fs/1147974/fg/1336668


Uploading Dataframe: 0.00% |          | Rows 0/1784 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1157271/jobs/named/air_quality_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x151c98ed550>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "pm25",
           "min_value": -0.1,
           "max_value": 999.9,
           "strict_min": true
         },
         "meta": {
           "expectationId": 659460
         }
       },
       "result": {
         "observed_value": 0.550000011920929,
         "element_count": 1784,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-11-05T12:56:04.000414Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     "successful_expectations

In [22]:
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("city", "Place where the air quality was measured (sometimes a country in aqicn.org)")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) that pose a health risk")

<hsfs.feature_group.FeatureGroup at 0x151c96fab10>

### <span style='color:#ff5f27'> 🌦 Weather Data </span>

In [23]:
# Get or create feature group 
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city','date'],
    event_time="date",
    expectation_suite=weather_expectation_suite
) 

In [24]:
# Insert data
weather_fg.insert(weather_df)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1157271/fs/1147974/fg/1336669
2024-11-05 13:57:10,818 INFO: 	2 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1157271/fs/1147974/fg/1336669


Uploading Dataframe: 0.00% |          | Rows 0/1791 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai/p/1157271/jobs/named/weather_1_offline_fg_materialization/executions


(<hsfs.core.job.Job at 0x151c96fa390>,
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "wind_speed_10m_max",
           "min_value": -0.1,
           "max_value": 1000.0,
           "strict_min": true
         },
         "meta": {
           "expectationId": 659461
         }
       },
       "result": {
         "observed_value": 4.33497428894043,
         "element_count": 1791,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2024-11-05T12:57:10.000818Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     },
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column

In [25]:
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m above ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant wind direction over the day")

<hsfs.feature_group.FeatureGroup at 0x151c97447d0>

## <span style="color:#ff5f27;">⏭️ **Next:** Part 02: Feature Pipeline 
 </span> 

In the following notebook you will parse data and insert it into Feature Groups.

## <span style="color:#ff5f27;">⏭️ **Exercises:** 
 </span> 
    
* Add a rolling window of 3 days and 5 days for `pm25`, for example:
  `df.set_index("date").rolling(3).mean().head()`
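
One possible starting point for this exercise is sketched below on toy data. It applies the rolling mean to the `pm25` column only, rather than to the whole frame as in the hint. The column names `pm25_rolling_3d` and `pm25_rolling_5d` are made up for illustration.

```python
import pandas as pd

# Toy daily series standing in for the air quality data
toy = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=6, freq="D", tz="UTC"),
    "pm25": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
})

# Rolling windows count rows, so sort by date before computing them
toy = toy.sort_values("date").set_index("date")
toy["pm25_rolling_3d"] = toy["pm25"].rolling(3).mean()
toy["pm25_rolling_5d"] = toy["pm25"].rolling(5).mean()
```

The first rows of each window are `NaN` until enough days are available. Each window also includes the current day's value, so if `pm25` is the prediction target, applying `shift(1)` before `rolling(...)` would keep that value out of the feature.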
