In [1]:
import sys
from pathlib import Path
import warnings
warnings.filterwarnings("ignore", module="IPython")

def is_google_colab() -> bool:
    """Return True when the active IPython shell is a Google Colab shell.

    Detection is done by looking for the substring "google.colab" in the
    string form of the object returned by `get_ipython()`.
    """
    shell_description = str(get_ipython())
    return "google.colab" in shell_description

def clone_repository() -> None:
    """Clone the mlfs-book repository and change into the cloned directory.

    The body uses IPython shell (`!`) and magic (`%cd`) syntax, so it only
    works when executed by an IPython/Jupyter kernel.
    """
    !git clone https://github.com/featurestorebook/mlfs-book.git
    %cd mlfs-book

def install_dependencies() -> None:
    """Install the project's dependencies with `uv`.

    First upgrades `uv` through pip, then asks `uv` to install everything
    declared in `pyproject.toml` (all extras) into the system interpreter.
    `pyproject.toml` is resolved relative to the current working directory.
    """
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if not is_google_colab():
    # Local run: start from the directory the notebook was launched in.
    notebook_dir = Path().absolute()
    # Walk up out of <root>/notebooks/airquality (or <root>/notebooks) so that
    # root_dir ends up pointing at the repository root.
    for nested_dir in ("airquality", "notebooks"):
        if notebook_dir.name == nested_dir:
            notebook_dir = notebook_dir.parent
    root_dir = str(notebook_dir)
    print("Local environment")
else:
    # Colab run: fetch the repository, install its dependencies, and use the
    # cloned directory (now the working directory) as the root.
    clone_repository()
    install_dependencies()
    root_dir = str(Path().absolute())
    print("Google Colab environment")

print(f"Root dir: {root_dir}")

# Make the repository root importable so that `mlfs` can be found
if root_dir not in sys.path:
    sys.path.append(root_dir)
    print(f"Added the following directory to the PYTHONPATH: {root_dir}")

# Load the settings from the file <root_dir>/.env
from mlfs import config
settings = config.HopsworksSettings(_env_file=f"{root_dir}/.env")

Local environment
Root dir: /Users/alexanderdahm/Documents/GitHub/mlfs-book-proj
Added the following directory to the PYTHONPATH: /Users/alexanderdahm/Documents/GitHub/mlfs-book-proj
HopsworksSettings initialized!


### <span style='color:#ff5f27'> 📝 Imports </span>

In [2]:
import datetime
import requests
import pandas as pd
import hopsworks
from mlfs.airquality import util
import datetime
from pathlib import Path
import json
import re
import os
import warnings
warnings.filterwarnings("ignore")






---

In [3]:
# Log in to Hopsworks and keep a handle to the project; `engine="python"`
# requests the Python engine (the log output reports "Python Engine initialized").
project = hopsworks.login(engine="python")

2025-12-27 16:31:22,242 INFO: Initializing external client
2025-12-27 16:31:22,242 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-12-27 16:31:23,771 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1290388


In [4]:

# Reference date for this run; later used as the end date of the weather download.
today = datetime.date.today()
# Historical day-ahead price file under <root_dir>/data (the file name indicates zone SE2, prices in SEK).
csv_file=f"{root_dir}/data/Day-ahead_SE2_SEK_2023-2025ytd.csv"
# Helper from mlfs.airquality.util; per the cell output it reports when the file is found.
util.check_file_path(csv_file)


# Handle to the Hopsworks secrets API (not used by any other cell visible in this notebook).
secrets = hopsworks.get_secrets_api()

# NOTE(review): the commented-out lines below are leftovers from the air-quality
# template. `city` and `sensorList` are not defined in this notebook; the weather
# download cell uses hard-coded coordinates per city instead.
#latitude, longitude = util.get_city_coordinates(city)
# Alternative kept from the template: take the coordinates from a sensor list
# latitude = sensorList[i].lat
# longitude = sensorList[i].lon

File successfully found at the path: /Users/alexanderdahm/Documents/GitHub/mlfs-book-proj/data/Day-ahead_SE2_SEK_2023-2025ytd.csv


## <span style='color:#ff5f27'> 🌍 STEP 5: Read your CSV file into a DataFrame </span>

The cell below reads the historical day-ahead energy price data from a CSV file into a Pandas DataFrame.

In [5]:
# The price file is semicolon-separated and uses a decimal comma.
df = pd.read_csv(
    csv_file,
    sep=';',
    decimal=',',
    skipinitialspace=True,
    parse_dates=['date'],
)

# Normalise the price column through its text form so it always ends up as float
sek_text = df['sek'].astype(str)
sek_text = sek_text.str.replace(' ', '', regex=False)   # remove spaces
sek_text = sek_text.str.replace(',', '.', regex=False)  # decimal comma → dot
df['sek'] = sek_text.astype(float)                      # convert to float

print(df.dtypes)

date    datetime64[ns]
sek            float64
dtype: object


## <span style='color:#ff5f27'> 🌍 STEP 6: Data cleaning</span>



## Check the data types for the columns in your DataFrame

 * `date` should be of type   datetime64[ns] 
 * `sek` (the energy price) should be of type float32 after the cast in the next cell

In [6]:
# Keep only the columns that go into the feature group and store the price as
# float32. Selecting and casting in a single expression produces an independent
# frame, which avoids assigning a column on a freshly sliced DataFrame (the
# pattern that triggers pandas' SettingWithCopyWarning).
df_aq = df[['date', 'sek']].astype({'sek': 'float32'})

# Show the resulting column types: `date` stays datetime64[ns], `sek` is now float32
df_aq.dtypes


date    datetime64[ns]
sek            float32
dtype: object

In [7]:
# Summarise the frame: row count, non-null counts and the dtype of each column
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1089 entries, 0 to 1088
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    1089 non-null   datetime64[ns]
 1   sek     1089 non-null   float32       
dtypes: datetime64[ns](1), float32(1)
memory usage: 12.9 KB


## <span style='color:#ff5f27'> 🌍 STEP 7: Drop any rows with missing data </span>
It will make the model training easier if there is no missing data in the rows, so we drop any rows with missing data.

In [8]:
# Discard incomplete rows, then order chronologically with a fresh 0..n-1 index
df_aq = (
    df_aq
    .dropna()
    .sort_values("date")
    .reset_index(drop=True)
)

df_aq.dtypes

date    datetime64[ns]
sek            float32
dtype: object

In [9]:
# Tag every price row with the zone it belongs to (constant for this file)
df_aq = df_aq.assign(zone="SE2")

df_aq.dtypes

date    datetime64[ns]
sek            float32
zone            object
dtype: object

In [10]:
# Re-check the schema of the price frame now that it carries the `zone` column
df_aq.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1089 entries, 0 to 1088
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   date    1089 non-null   datetime64[ns]
 1   sek     1089 non-null   float32       
 2   zone    1089 non-null   object        
dtypes: datetime64[ns](1), float32(1), object(1)
memory usage: 21.4+ KB


---

## <span style='color:#ff5f27'> 🌦 Loading Weather Data from [Open Meteo](https://open-meteo.com/en/docs) </span>

## <span style='color:#ff5f27'> 🌍 STEP 9: Download the Historical Weather Data </span>
Load weather data from 5 different cities located in our energy zone. Then concat all values into a single dataframe. There will therefore be 5 distinct weather values on each date.

The weather features we will download are:

 * `temperature (average over the day)`
 * `precipitation (the total over the day)`
 * `wind speed (maximum over the day)`
 * `wind direction (the most dominant direction over the day)`


In [11]:
# The weather history starts on the first day for which we have a price.
earliest_aq_date = df_aq['date'].min().strftime('%Y-%m-%d')

# Five different weather locations inside the energy zone
cities = [
    {"name": "flasjon", "lat": 62.760350390111626, "lon": 13.715986496712969},
    {"name": "hudiksvall", "lat": 61.790862930411194, "lon": 17.15754858778168},
    {"name": "ange", "lat": 62.54989082316923, "lon": 15.751547550392734},
    {"name": "solleftea", "lat": 63.159587742988755, "lon": 17.2655114712721},
    {"name": "umea", "lat": 63.81702480736613, "lon": 20.18691175826482},
]

# Store the individual city data frames
all_weather_data = []

for city in cities:
    weather_df = util.get_historical_weather(city["name"], earliest_aq_date, str(today), city["lat"], city["lon"])

    # Suffix every non-date column with the city name so the per-city frames
    # can be merged side by side without column clashes
    weather_df = weather_df.rename(columns={col: f"{col}_{city['name']}" for col in weather_df.columns if col != "date"})

    all_weather_data.append(weather_df)

# Merge all data frames on date. The outer join keeps a date even if one city
# has no row for it. The loop variable is deliberately NOT called `df`: that
# name holds the price DataFrame loaded earlier, and overwriting it here would
# break re-running the price cells.
combined_weather_df = all_weather_data[0]
for city_weather_df in all_weather_data[1:]:
    combined_weather_df = pd.merge(combined_weather_df, city_weather_df, on="date", how="outer")



Coordinates 62.74164962768555¬∞N 13.77550983428955¬∞E
Elevation 478.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 61.82776641845703¬∞N 17.11111068725586¬∞E
Elevation 65.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 62.53075408935547¬∞N 15.721518516540527¬∞E
Elevation 165.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 63.16344451904297¬∞N 17.25388526916504¬∞E
Elevation 66.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s
Coordinates 63.866432189941406¬∞N 20.106382369995117¬∞E
Elevation 18.0 m asl
Timezone None None
Timezone difference to GMT+0 0 s


In [12]:
# Inspect the merged weather frame: a shared `date` column plus one set of
# city-suffixed weather columns per location
combined_weather_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1092 entries, 0 to 1091
Data columns (total 26 columns):
 #   Column                                  Non-Null Count  Dtype         
---  ------                                  --------------  -----         
 0   date                                    1092 non-null   datetime64[ns]
 1   temperature_2m_mean_flasjon             1092 non-null   float32       
 2   precipitation_sum_flasjon               1092 non-null   float32       
 3   wind_speed_10m_max_flasjon              1092 non-null   float32       
 4   wind_direction_10m_dominant_flasjon     1092 non-null   float32       
 5   city_flasjon                            1092 non-null   object        
 6   temperature_2m_mean_hudiksvall          1092 non-null   float32       
 7   precipitation_sum_hudiksvall            1092 non-null   float32       
 8   wind_speed_10m_max_hudiksvall           1092 non-null   float32       
 9   wind_direction_10m_dominant_hudiksvall  1092 non-nul

## <span style='color:#ff5f27'> 🌍 STEP 10: Define Data Validation Rules </span>


In [13]:
import great_expectations as ge

# Validation suite attached to the energy price feature group
aq_expectation_suite = ge.core.ExpectationSuite(expectation_suite_name="aq_expectation_suite")

# The smallest `sek` value in an inserted batch must lie between -5000
# (exclusive, because of strict_min) and 10000
sek_min_bounds = {
    "column": "sek",
    "min_value": -5000,
    "max_value": 10000,
    "strict_min": True,
}

aq_expectation_suite.add_expectation(
    ge.core.ExpectationConfiguration(
        expectation_type="expect_column_min_to_be_between",
        kwargs=sek_min_bounds,
    )
)

{"expectation_type": "expect_column_min_to_be_between", "kwargs": {"column": "sek", "min_value": -5000, "max_value": 10000, "strict_min": true}, "meta": {}}

## Expectations for Weather Data
Here, we define an expectation for 2 columns in our weather DataFrame - `precipitation_sum` and `wind_speed_10m_max`, where we expect both values to be greater than zero, but less than 1000.

In [14]:
"""import great_expectations as ge
weather_expectation_suite = ge.core.ExpectationSuite(
    expectation_suite_name="weather_expectation_suite"
)

def expect_greater_than_zero(col):
    weather_expectation_suite.add_expectation(
        ge.core.ExpectationConfiguration(
            expectation_type="expect_column_min_to_be_between",
            kwargs={
                "column":col,
                "min_value":-0.1,
                "max_value":1000.0,
                "strict_min":True
            }
        )
    )
expect_greater_than_zero("precipitation_sum")
expect_greater_than_zero("wind_speed_10m_max")"""

'import great_expectations as ge\nweather_expectation_suite = ge.core.ExpectationSuite(\n    expectation_suite_name="weather_expectation_suite"\n)\n\ndef expect_greater_than_zero(col):\n    weather_expectation_suite.add_expectation(\n        ge.core.ExpectationConfiguration(\n            expectation_type="expect_column_min_to_be_between",\n            kwargs={\n                "column":col,\n                "min_value":-0.1,\n                "max_value":1000.0,\n                "strict_min":True\n            }\n        )\n    )\nexpect_greater_than_zero("precipitation_sum")\nexpect_greater_than_zero("wind_speed_10m_max")'

---

### <span style="color:#ff5f27;"> 🔮 STEP 11: Connect to Hopsworks and get a handle to the feature store</span>

In [15]:
# Handle to the project's feature store; the feature groups below are created through it
fs = project.get_feature_store() 

### <span style="color:#ff5f27;"> 🔮 STEP 12: Create the Feature Groups and insert the DataFrames in them </span>

### <span style='color:#ff5f27'> 🌫 Energy Price Data </span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns identify the rows in the DataFrame - by providing them as the `primary_key`. Here, the feature group is keyed by `zone`; together with the `date` event time this identifies each daily energy price row.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [16]:
# Fetch the `energy_price` feature group (version 1), creating it on first use.
# Rows are keyed by `zone`, `date` is the event time, and the price validation
# suite is attached so it is evaluated on insert.
energy_price_fg = fs.get_or_create_feature_group(
    name="energy_price",
    version=1,
    description='Energy price of each day',
    primary_key=['zone'],
    event_time="date",
    expectation_suite=aq_expectation_suite,
    stream=True,
)

#### Insert the DataFrame into the Feature Group

In [22]:
# Upload the price rows. Per the cell output, the attached expectation suite is
# validated first and an offline materialization job is then launched.
# NOTE(review): unlike the weather insert, no `wait=True` is passed here, so this
# call presumably returns before that job has finished — confirm if later cells
# depend on the data being materialized.
energy_price_fg.insert(df_aq)

2025-12-27 16:37:08,862 INFO: 	1 expectation(s) included in expectation_suite.
Validation succeeded.
Validation Report saved successfully, explore a summary at https://c.app.hopsworks.ai:443/p/1290388/fs/1279043/fg/1869353


Uploading Dataframe: 100.00% |‚ñà| Rows 1089/1089 | Elapsed Time: 00:00 | Remainin


Launching job: energy_price_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1290388/jobs/named/energy_price_1_offline_fg_materialization/executions


(Job('energy_price_1_offline_fg_materialization', 'PYSPARK'),
 {
   "success": true,
   "results": [
     {
       "success": true,
       "expectation_config": {
         "expectation_type": "expect_column_min_to_be_between",
         "kwargs": {
           "column": "sek",
           "min_value": -5000,
           "max_value": 10000,
           "strict_min": true
         },
         "meta": {
           "expectationId": 797739
         }
       },
       "result": {
         "observed_value": -95.55000305175781,
         "element_count": 1089,
         "missing_count": null,
         "missing_percent": null
       },
       "meta": {
         "ingestionResult": "INGESTED",
         "validationTime": "2025-12-27T03:37:08.000861Z"
       },
       "exception_info": {
         "raised_exception": false,
         "exception_message": null,
         "exception_traceback": null
       }
     }
   ],
   "evaluation_parameters": {},
   "statistics": {
     "evaluated_expectations": 1,
     

#### Enter a description for each feature in the Feature Group

In [18]:
# Attach a human-readable description to each feature of the energy price feature group
energy_price_fg.update_feature_description("date", "Date of measurement of energy price")
energy_price_fg.update_feature_description("zone", "Zone where measurement are taken")
energy_price_fg.update_feature_description("sek", "Energy price in SEK")

<hsfs.feature_group.FeatureGroup at 0x177aebfa0>

### <span style='color:#ff5f27'> 🌦 Weather Data </span>
    
 1. Provide a name, description, and version for the feature group.
 2. Define the `primary_key`: we have to select which columns uniquely identify each row in the DataFrame - by providing them as the `primary_key`. Here, each weather measurement is uniquely identified by `city` and  `date`.
 3. Define the `event_time`: We also define which column stores the timestamp or date for the row - `date`.
 4. Attach any `expectation_suite` containing data validation rules

In [19]:
# Fetch the `weather` feature group (version 1), creating it on first use.
# NOTE: neither a primary key nor an expectation suite is passed; both arguments
# are kept below as disabled, commented-out options.
weather_fg = fs.get_or_create_feature_group(
    name="weather",
    version=1,
    description='Weather characteristics of each day',
    event_time="date",
    stream=True,
    # primary_key="date",
    # expectation_suite=weather_expectation_suite,
)

#### Insert the DataFrame into the Feature Group

In [20]:
# Upload the merged weather frame; `wait=True` blocks until the materialization
# job launched by the insert has finished (see the "Waiting for execution" log lines)
weather_fg.insert(combined_weather_df, wait=True)

Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1290388/fs/1279043/fg/1869354


Uploading Dataframe: 100.00% |‚ñà| Rows 1092/1092 | Elapsed Time: 00:00 | Remainin


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1290388/jobs/named/weather_1_offline_fg_materialization/executions
2025-12-27 16:32:00,928 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-12-27 16:32:04,279 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-12-27 16:34:51,166 INFO: Waiting for execution to finish. Current state: SUCCEEDING. Final status: UNDEFINED
2025-12-27 16:34:54,367 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-12-27 16:34:54,556 INFO: Waiting for log aggregation to finish.
2025-12-27 16:35:16,820 INFO: Execution finished successfully.


(Job('weather_1_offline_fg_materialization', 'PYSPARK'), None)

#### Enter a description for each feature in the Feature Group

In [21]:
# Describe the shared `date` feature of the weather feature group
weather_fg.update_feature_description("date", "Date of measurement of weather")

<hsfs.feature_group.FeatureGroup at 0x314706260>