# Backfilling a Feature Group

## Introduction

This notebook demonstrates how to backfill a feature group in Hopsworks. Backfilling is the process of computing feature values for a feature group for a specific time range. This is useful when you have a new feature group and you want to compute feature values for historical data.

![](public/pipeline.png)

As the diagram shows, backfilling is the first step in our pipeline, and this code is executed only once.

### Importing the required libraries

We start by importing the libraries required for this notebook.

In [151]:
```python
import datetime
import json
import os
import warnings

import great_expectations as ge
import hopsworks
import pandas as pd

import util

warnings.filterwarnings("ignore")
```

### Loading Environment Secrets

We load the secrets stored in the `.env` file at the root of the project using the `dotenv` package (installed from PyPI as `python-dotenv`).

In [152]:
```python
import dotenv

dotenv.load_dotenv()

AQI_API_KEY = os.getenv("AQI_API_KEY")
HOPSWORKS_API_KEY = os.getenv("HOPSWORKS_API_KEY")
```
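`os.getenv` returns `None` when a variable is missing, so a typo in `.env` only surfaces later as a confusing login or API error. The sketch below shows a fail-fast alternative. `require_env` is a hypothetical helper, not part of `python-dotenv`.

```python
import os


def require_env(name: str) -> str:
    """Return the environment variable `name`, raising a clear error if it is unset or empty."""
    # Hypothetical helper: fail fast instead of silently propagating None
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set; check your .env file")
    return value


# Usage after dotenv.load_dotenv():
# AQI_API_KEY = require_env("AQI_API_KEY")
```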

### Retrieving the historical air quality data from a CSV file

In [153]:
```python
# Define the path to the historical data and check that the file exists
csv_file = "data/stockholm-st-eriksgatan-83-air-quality.csv"
util.check_file_path(csv_file)
```

```text
File successfully found at the path: data/stockholm-st-eriksgatan-83-air-quality.csv
```
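The `util` module is not shown in this notebook. Judging only from its printed message, `check_file_path` behaves roughly like the sketch below. The boolean return value and the not-found message are assumptions, so treat this as a hypothetical stand-in rather than the real helper.

```python
from pathlib import Path


def check_file_path(file_path: str) -> bool:
    """Hypothetical stand-in for util.check_file_path: report whether the file exists."""
    path = Path(file_path)
    if path.is_file():
        print(f"File successfully found at the path: {file_path}")
        return True
    # Assumed behaviour for a missing file (the real helper may raise instead)
    print(f"File not found at the path: {file_path}")
    return False
```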


In [154]:
```python
# Sensor location and metadata
country = "sweden"
city = "stockholm"
street = "stockholm-st-eriksgatan-83"
aqicn_url = "https://api.waqi.info/feed/@10523"

today = datetime.date.today()
latitude, longitude = util.get_city_coordinates(city)
print("Latitude: ", latitude, "Longitude: ", longitude)
```

```text
Latitude:  59.33 Longitude:  18.07
```


In [155]:
```python
# Log in to Hopsworks
project = hopsworks.login()

# Store the AQI API key as a Hopsworks secret; if it already exists, read it back instead
secrets = util.secrets_api(project.name)
try:
    secrets.create_secret("AQI_API_KEY", AQI_API_KEY)
except hopsworks.RestAPIError:
    AQI_API_KEY = secrets.get_secret("AQI_API_KEY").value
```

```text
Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1157270
Connected. Call `.close()` to terminate connection gracefully.
```


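In [156]:
```python
# Check that the API key and sensor URL work by fetching today's PM2.5 reading
try:
    aq_today_df = util.get_pm25(aqicn_url, country, city, street, today, AQI_API_KEY)
except hopsworks.RestAPIError:
    print("It looks like the AQI_API_KEY doesn't work for your sensor. Is the API key correct? Is the sensor URL correct?")
    raise  # stop here instead of failing below on an undefined aq_today_df

aq_today_df.head()
```

| | pm25 | country | city | street | date | url |
|---|---|---|---|---|---|---|
| 0 | 16.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | 2024-11-18 | https://api.waqi.info/feed/@10523 |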
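`util.get_pm25` is not shown here. For orientation, the WAQI feed endpoint returns JSON in which the current PM2.5 value sits under `data.iaqi.pm25.v`. The sketch below parses a hand-written sample payload of that assumed shape into a one-row DataFrame like the one above. `parse_waqi_pm25` and the sample payload are hypothetical and are not the notebook's actual implementation. No network request is made.

```python
import pandas as pd


def parse_waqi_pm25(payload: dict, country: str, city: str, street: str, date, url: str) -> pd.DataFrame:
    """Turn a WAQI feed response (assumed shape) into a one-row PM2.5 DataFrame."""
    if payload.get("status") != "ok":
        raise ValueError(f"WAQI request failed: {payload.get('data')}")
    pm25 = payload["data"]["iaqi"].get("pm25", {}).get("v")
    if pm25 is None:
        raise ValueError("No PM2.5 reading in the response")
    return pd.DataFrame(
        {
            "pm25": [float(pm25)],
            "country": [country],
            "city": [city],
            "street": [street],
            "date": [pd.Timestamp(date)],
            "url": [url],
        }
    )


# Hand-written sample payload (assumed shape, not a real API response)
sample = {"status": "ok", "data": {"iaqi": {"pm25": {"v": 16}}}}
row = parse_waqi_pm25(sample, "sweden", "stockholm", "stockholm-st-eriksgatan-83",
                      "2024-11-18", "https://api.waqi.info/feed/@10523")
```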


In [157]:
```python
# Load the historical data from the CSV file
df = pd.read_csv(csv_file, parse_dates=["date"], skipinitialspace=True)

# Keep only the date and PM2.5 columns, then add the sensor metadata
df = df[["date", "pm25"]].copy()
df["country"] = country
df["city"] = city
df["street"] = street
df["url"] = aqicn_url
df["pm25"] = df["pm25"].astype("float32")

df.sort_values(by="date", inplace=True)

# Lag features: shift(n) takes the value n rows earlier, which equals n days
# earlier only if there are no missing dates in the series
df["pm25_1_days_before"] = df["pm25"].shift(1)
df["pm25_2_days_before"] = df["pm25"].shift(2)
df["pm25_3_days_before"] = df["pm25"].shift(3)
df["pm25_avg_3_days_before"] = df[["pm25_1_days_before", "pm25_2_days_before", "pm25_3_days_before"]].mean(axis=1)

# Drop rows with missing values (including the first rows, which have no lags)
df.dropna(inplace=True)

# Inspect the data
df
```

| | date | pm25 | country | city | street | url | pm25_1_days_before | pm25_2_days_before | pm25_3_days_before | pm25_avg_3_days_before |
|---|---|---|---|---|---|---|---|---|---|---|
| 2111 | 2018-04-12 | 26.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 23.0 | 14.0 | 55.0 | 30.666666 |
| 2112 | 2018-04-13 | 33.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 26.0 | 23.0 | 14.0 | 21.000000 |
| 2113 | 2018-04-14 | 37.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 33.0 | 26.0 | 23.0 | 27.333334 |
| 2114 | 2018-04-15 | 64.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 37.0 | 33.0 | 26.0 | 32.000000 |
| 2115 | 2018-04-16 | 70.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 64.0 | 37.0 | 33.0 | 44.666668 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 12 | 2024-11-13 | 21.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 21.0 | 20.0 | 10.0 | 17.000000 |
| 13 | 2024-11-14 | 25.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 21.0 | 21.0 | 20.0 | 20.666666 |
| 14 | 2024-11-15 | 22.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 25.0 | 21.0 | 21.0 | 22.333334 |
| 15 | 2024-11-16 | 13.0 | sweden | stockholm | stockholm-st-eriksgatan-83 | https://api.waqi.info/feed/@10523 | 22.0 | 25.0 | 21.0 | 22.666666 |
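Because `shift` works on row positions, a missing date in the CSV makes a "1 day before" value actually come from an earlier day. The sketch below shows one way to make the lags calendar-aware: it reindexes onto a gap-free daily range before shifting. It assumes one row per date. `add_calendar_lags` is a hypothetical helper, and the toy data is made up.

```python
import pandas as pd


def add_calendar_lags(df: pd.DataFrame, col: str = "pm25", days: int = 3) -> pd.DataFrame:
    """Add lag features keyed on calendar days rather than on row position.

    Assumes one row per date in the `date` column.
    """
    daily = df.set_index("date").sort_index()
    # Reindex onto a gap-free daily calendar; missing days become NaN rows
    full_range = pd.date_range(daily.index.min(), daily.index.max(), freq="D", name="date")
    daily = daily.reindex(full_range)

    lag_cols = []
    for d in range(1, days + 1):
        name = f"{col}_{d}_days_before"
        daily[name] = daily[col].shift(d)  # exactly d calendar days earlier
        lag_cols.append(name)
    daily[f"{col}_avg_{days}_days_before"] = daily[lag_cols].mean(axis=1)

    # Drop the inserted gap days and any row whose lags are incomplete
    return daily.dropna().reset_index()


# Toy data with a missing day (2024-01-04)
dates = pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-05",
                        "2024-01-06", "2024-01-07", "2024-01-08"])
toy = pd.DataFrame({"date": dates, "pm25": [1.0, 2.0, 3.0, 5.0, 6.0, 7.0, 8.0]})
lagged = add_calendar_lags(toy)
```

With row-based `shift`, 2024-01-05 would receive 3.0, 2.0 and 1.0 as its 1-, 2- and 3-day lags. Here only 2024-01-08 has three genuine calendar lags (7.0, 6.0 and 5.0). The trade-off is that more rows are dropped around each gap.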


In [158]:
```python
# Fetch historical weather from the earliest air quality date up to today
earliest_date = df["date"].min().strftime("%Y-%m-%d")

weather_df = util.get_historical_weather(city, earliest_date, str(today), latitude, longitude)
```

```text
Coordinates 59.29701232910156°N 18.163265228271484°E
Elevation 18.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
```
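`util.get_historical_weather` is not shown either. The column names in `weather_df` match Open-Meteo's daily variables, and the printed coordinates, elevation and timezone lines match the format of Open-Meteo's Python client examples. It therefore most likely queries the Open-Meteo historical archive API. Under that assumption, the sketch below builds (but does not send) a request for the same window. `build_archive_request` is a hypothetical helper.

```python
from urllib.parse import urlencode

ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"  # Open-Meteo historical weather endpoint

# Daily variables matching the columns of weather_df
DAILY_VARIABLES = [
    "temperature_2m_mean",
    "precipitation_sum",
    "wind_speed_10m_max",
    "wind_direction_10m_dominant",
]


def build_archive_request(latitude: float, longitude: float, start_date: str, end_date: str) -> str:
    """Build (but do not send) an Open-Meteo archive URL for the daily variables used here."""
    params = {
        "latitude": latitude,
        "longitude": longitude,
        "start_date": start_date,  # YYYY-MM-DD
        "end_date": end_date,      # YYYY-MM-DD
        "daily": ",".join(DAILY_VARIABLES),
    }
    return f"{ARCHIVE_URL}?{urlencode(params)}"


url = build_archive_request(59.33, 18.07, "2018-04-12", "2024-11-18")
```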


In [159]:
```python
weather_df.info()
```

```text
<class 'pandas.core.frame.DataFrame'>
Index: 2411 entries, 0 to 2410
Data columns (total 6 columns):
 #   Column                       Non-Null Count  Dtype         
---  ------                       --------------  -----         
 0   date                         2411 non-null   datetime64[ns]
 1   temperature_2m_mean          2411 non-null   float32       
 2   precipitation_sum            2411 non-null   float32       
 3   wind_speed_10m_max           2411 non-null   float32       
 4   wind_direction_10m_dominant  2411 non-null   float32       
 5   city                         2411 non-null   object        
dtypes: datetime64[ns](1), float32(4), object(1)
memory usage: 94.2+ KB
```


## Expectation Suite

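The goal here is to attach an expectation suite to each feature group so that every insert into the feature group is validated. We build the suites with the `great_expectations` library.

Each expectation below uses `expect_column_min_to_be_between`, which checks only the smallest value in a column. For `pm25`, `min_value=-0.1` with `strict_min=True` requires the column minimum to be greater than -0.1, i.e. no negative readings. The `max_value` of 500 bounds the minimum, not the individual values. Validation does not filter out rows: it passes or fails the inserted data as a whole and records the result in a validation report.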


In [160]:
```python
aq_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="aq_expectation_suite",
)

weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite",
)


def expect_greater_than_zero(col, min_value, max_value, expectation_suite):
    """Add an expectation that the minimum of `col` is > min_value and <= max_value."""
    expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column": col,
                "min_value": min_value,
                "max_value": max_value,
                "strict_min": True,  # the column minimum must be strictly greater than min_value
            },
        )
    )


expect_greater_than_zero("pm25", min_value=-0.1, max_value=500.0, expectation_suite=aq_expectation_suite)

expect_greater_than_zero("precipitation_sum", min_value=0.0, max_value=1000.0, expectation_suite=weather_expectation_suite)

expect_greater_than_zero("wind_speed_10m_max", min_value=0.0, max_value=1000.0, expectation_suite=weather_expectation_suite)
```
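To bound every `pm25` value to [0, 500] rather than only the column minimum, Great Expectations also provides `expect_column_values_to_be_between`. It checks each non-null value and accepts an optional `mostly` fraction. The plain-pandas sketch below mimics that per-value logic so it can be tried without Great Expectations. `values_between` is a hypothetical helper, not part of the library.

```python
import pandas as pd


def values_between(series: pd.Series, min_value: float, max_value: float, mostly: float = 1.0) -> bool:
    """Pass if at least `mostly` of the non-null values lie in [min_value, max_value]."""
    values = series.dropna()  # nulls are ignored, as in the per-value expectation
    if values.empty:
        return True  # design choice of this sketch: nothing to check counts as a pass
    fraction_ok = values.between(min_value, max_value).mean()
    return bool(fraction_ok >= mostly)


pm25 = pd.Series([10.0, 600.0, 20.0])
```

Here one of three readings exceeds 500, so the check fails with the default `mostly=1.0` but passes with `mostly=0.6`.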

In [161]:
```python
# Store the sensor location as a JSON secret in Hopsworks
dict_obj = {
    "country": country,
    "city": city,
    "street": street,
    "aqicn_url": aqicn_url,
    "latitude": latitude,
    "longitude": longitude,
}

# Convert the dictionary to a JSON string
str_dict = json.dumps(dict_obj)

try:
    secrets.create_secret("SENSOR_LOCATION_JSON", str_dict)
except hopsworks.RestAPIError:
    print("SENSOR_LOCATION_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.")
    existing_key = secrets.get_secret("SENSOR_LOCATION_JSON").value
    print(existing_key)
```

```text
SENSOR_LOCATION_JSON already exists. To update, delete the secret in the UI (https://c.app.hopsworks.ai/account/secrets) and re-run this cell.
{"country": "sweden", "city": "stockholm", "street": "stockholm-st-eriksgatan-83", "aqicn_url": "https://api.waqi.info/feed/@10523", "latitude": 59.33, "longitude": 18.07}
```
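A later pipeline can recover the sensor location by decoding the secret's string value with `json.loads`. In Hopsworks that string would come from `secrets.get_secret("SENSOR_LOCATION_JSON").value`, as in the cell above. The sketch below uses the printed value directly so that it runs on its own.

```python
import json

# The secret value printed above, copied verbatim
secret_value = '{"country": "sweden", "city": "stockholm", "street": "stockholm-st-eriksgatan-83", "aqicn_url": "https://api.waqi.info/feed/@10523", "latitude": 59.33, "longitude": 18.07}'

# Decode the JSON string back into a dict and unpack the fields
location = json.loads(secret_value)
country, city, street = location["country"], location["city"], location["street"]
aqicn_url = location["aqicn_url"]
latitude, longitude = location["latitude"], location["longitude"]
```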


In [169]:
```python
fs = project.get_feature_store()
```

```text
Connected. Call `.close()` to terminate connection gracefully.
```


In [170]:
```python
# Get or create the air quality feature group, validated by aq_expectation_suite
air_quality_fg = fs.get_or_create_feature_group(
    name='air_quality',
    description='Air Quality characteristics of each day',
    version=1,
    primary_key=['city', 'street', 'date'],
    event_time="date",
    expectation_suite=aq_expectation_suite
)

# Insert the historical data (the backfill)
air_quality_fg.insert(df)

# Document each feature
air_quality_fg.update_feature_description("date", "Date of measurement of air quality")
air_quality_fg.update_feature_description("country", "Country where the air quality was measured (sometimes a city in aqicn.org)")
air_quality_fg.update_feature_description("city", "City where the air quality was measured")
air_quality_fg.update_feature_description("street", "Street in the city where the air quality was measured")
air_quality_fg.update_feature_description("pm25", "Particles less than 2.5 micrometers in diameter (fine particles) pose a health risk")
air_quality_fg.update_feature_description("pm25_1_days_before", "PM2.5 value 1 day before")
air_quality_fg.update_feature_description("pm25_2_days_before", "PM2.5 value 2 days before")
air_quality_fg.update_feature_description("pm25_3_days_before", "PM2.5 value 3 days before")
air_quality_fg.update_feature_description("pm25_avg_3_days_before", "Average PM2.5 value for the last 3 days")
```

```text
Feature Group created successfully, explore it at
https://c.app.hopsworks.ai:443/p/1157270/fs/1147973/fg/1353047
2024-11-18 12:47:19,478 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1157270/fs/1147973/fg/1353047

Uploading Dataframe: 0.00% |          | Rows 0/2139 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at
https://c.app.hopsworks.ai/p/1157270/jobs/named/air_quality_1_offline_fg_materialization/executions

<hsfs.feature_group.FeatureGroup at 0x1304fa2a0>
```
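`air_quality_fg` uses `['city', 'street', 'date']` as its primary key, so each row of `df` should be unique on those columns. Duplicate keys within one insert are ambiguous, and a cheap pandas check before calling `insert` catches them early. The sketch below runs the check on made-up toy data. `duplicate_primary_keys` is a hypothetical helper.

```python
import pandas as pd


def duplicate_primary_keys(df: pd.DataFrame, primary_key: list) -> pd.DataFrame:
    """Return all rows whose primary-key values occur more than once."""
    return df[df.duplicated(subset=primary_key, keep=False)]


toy = pd.DataFrame(
    {
        "city": ["stockholm"] * 3,
        "street": ["stockholm-st-eriksgatan-83"] * 3,
        "date": pd.to_datetime(["2024-11-14", "2024-11-15", "2024-11-15"]),
        "pm25": [25.0, 22.0, 23.0],
    }
)
dupes = duplicate_primary_keys(toy, ["city", "street", "date"])
```

In the notebook, this would be `duplicate_primary_keys(df, ['city', 'street', 'date'])`, which should return an empty frame before `air_quality_fg.insert(df)`.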

In [174]:
```python
# Get or create the weather feature group, validated by weather_expectation_suite
weather_fg = fs.get_or_create_feature_group(
    name='weather',
    description='Weather characteristics of each day',
    version=1,
    primary_key=['city', 'date'],
    event_time="date",
    expectation_suite=weather_expectation_suite
)

# Insert the historical weather data
weather_fg.insert(weather_df)

# Document each feature
weather_fg.update_feature_description("date", "Date of measurement of weather")
weather_fg.update_feature_description("city", "City where weather is measured/forecast for")
weather_fg.update_feature_description("temperature_2m_mean", "Temperature in Celsius")
weather_fg.update_feature_description("precipitation_sum", "Precipitation (rain/snow) in mm")
weather_fg.update_feature_description("wind_speed_10m_max", "Wind speed at 10m above ground")
weather_fg.update_feature_description("wind_direction_10m_dominant", "Dominant wind direction over the day")
```

```text
Feature Group created successfully, explore it at
https://c.app.hopsworks.ai:443/p/1157270/fs/1147973/fg/1354064
2024-11-18 12:49:00,623 INFO: 	2 expectation(s) included in expectation_suite.
Validation failed.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1157270/fs/1147973/fg/1354064

Uploading Dataframe: 0.00% |          | Rows 0/2411 | Elapsed Time: 00:00 | Remaining Time: ?

Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at
https://c.app.hopsworks.ai/p/1157270/jobs/named/weather_1_offline_fg_materialization/executions

<hsfs.feature_group.FeatureGroup at 0x30ccbc260>
```
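The weather insert reports `Validation failed`, while the air quality insert passes. A likely cause is `strict_min=True`: `precipitation_sum` and `wind_speed_10m_max` use `min_value=0.0`, so a column whose minimum is exactly 0.0 fails, and dry days make a zero precipitation minimum very likely. The pandas sketch below evaluates the same column-minimum rule for the weather suite's bounds on a made-up toy frame. It mimics the documented behaviour of `expect_column_min_to_be_between` and is not Great Expectations' implementation. `failing_min_checks` is a hypothetical helper.

```python
import pandas as pd

# (column, min_value, max_value) as configured in weather_expectation_suite, all with strict_min=True
WEATHER_BOUNDS = [
    ("precipitation_sum", 0.0, 1000.0),
    ("wind_speed_10m_max", 0.0, 1000.0),
]


def failing_min_checks(df: pd.DataFrame, bounds, strict_min: bool = True) -> list:
    """Return the columns whose minimum is outside (min_value, max_value] (or [min_value, max_value] if not strict)."""
    failing = []
    for col, min_value, max_value in bounds:
        col_min = df[col].min()
        above = col_min > min_value if strict_min else col_min >= min_value
        if not (above and col_min <= max_value):
            failing.append(col)
    return failing


toy_weather = pd.DataFrame({"precipitation_sum": [0.0, 1.2, 4.5], "wind_speed_10m_max": [9.4, 12.0, 15.3]})
```

Running `failing_min_checks(weather_df, WEATHER_BOUNDS)` would show which column actually failed, and the linked validation report also lists the result of each expectation. Setting `strict_min` to `False` for these two columns would accept zero values.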