In [1]:
import hopsworks
import pandas as pd
import joblib
import time
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from sklearn.feature_selection import chi2
from sklearn.feature_selection import SelectPercentile, f_regression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestRegressor
from hsml.schema import Schema
from hsml.model_schema import ModelSchema
from sklearn.metrics import mean_squared_error, r2_score
import numpy as np
import os, warnings
warnings.filterwarnings("ignore")

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Connect to Hopsworks
# Log in to the Hopsworks project and get its feature store handle.
# `fs` is used by every feature-group / feature-view cell below.
project = hopsworks.login()
fs = project.get_feature_store()

2025-12-08 00:09:09,688 INFO: Initializing external client
2025-12-08 00:09:09,688 INFO: Base URL: https://c.app.hopsworks.ai:443






2025-12-08 00:09:17,610 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1315990


In [3]:
# Load the two raw daily datasets (PM2.5 readings and weather observations), in that order.
df_air_quality, df_weather = (
    pd.read_csv(csv_path)
    for csv_path in (
        "data/final_air_quality_data.csv",
        "data/final_weather_data.csv",
    )
)


In [4]:
# Parse the `date` column of both raw frames into pandas datetimes (in place on each frame).
for raw_frame in (df_air_quality, df_weather):
    raw_frame['date'] = pd.to_datetime(raw_frame['date'])


In [5]:
# Build the frames that get inserted into Hopsworks. Helper columns not needed in the
# feature store are dropped; errors='ignore' tolerates a column that is already absent.
df_air_quality_hops, df_weather_hops = (
    source_frame.drop(columns=unused_columns, errors='ignore')
    for source_frame, unused_columns in (
        (df_air_quality, ['month', 'log_pm2_5']),
        (df_weather, ['month']),
    )
)


In [8]:
# Delete the existing feature groups so the inserts below start from a clean slate.
# Each group is handled independently. A single try/except around both deletions
# meant that a missing or undeletable "air_quality" group silently skipped deleting
# "weather", so the get_or_create call below would reuse the stale weather group.
for fg_name in ("air_quality", "weather"):
    try:
        existing_fg = fs.get_feature_group(name=fg_name, version=1)
        if existing_fg is None:
            # Depending on the client version, a missing group may come back as None.
            print(f"Feature Group '{fg_name}' not found, proceeding.")
            continue
        existing_fg.delete()
        print(f"Existing Feature Group '{fg_name}' deleted successfully.")
    except Exception as exc:
        # Best-effort cleanup: keep going, but report why it failed instead of hiding it.
        print(f"Feature Group '{fg_name}' not found or could not be deleted ({exc}), proceeding.")


Existing Feature Groups deleted successfully.


In [9]:
# Create (or fetch, if they already exist) the two feature groups. They must exist
# before the insert cell runs. Both are keyed by (city_name, date), with `date` as the event time.
def _get_or_create_city_daily_fg(fg_name, fg_description):
    """Return version 1 of a (city_name, date)-keyed feature group, creating it if needed."""
    return fs.get_or_create_feature_group(
        name=fg_name,
        version=1,
        primary_key=["city_name", "date"],
        event_time="date",
        description=fg_description,
    )


air_quality_fg = _get_or_create_city_daily_fg("air_quality", "PM2.5 air quality readings")
weather_fg = _get_or_create_city_daily_fg("weather", "Daily weather measurements")

In [11]:
# Push the prepared frames into their feature groups. wait_for_job=True makes each
# insert block until its materialization job has finished before the next one starts.
for status_message, target_fg, source_df in (
    ("Inserting Air Quality Data...", air_quality_fg, df_air_quality_hops),
    ("Inserting Weather Data...", weather_fg, df_weather_hops),
):
    print(status_message)
    target_fg.insert(source_df, write_options={"wait_for_job": True})

print(" Data successfully pushed to feature store")

Inserting Air Quality Data...


Uploading Dataframe: 100.00% |███████████████████████| Rows 112590/112590 | Elapsed Time: 00:21 | Remaining Time: 00:00


Launching job: air_quality_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1315990/jobs/named/air_quality_1_offline_fg_materialization/executions
2025-12-08 00:24:24,585 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-12-08 00:24:27,863 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-12-08 00:26:22,643 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-12-08 00:26:22,912 INFO: Waiting for log aggregation to finish.
2025-12-08 00:26:31,821 INFO: Execution finished successfully.
Inserting Weather Data...
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1315990/fs/1299526/fg/1787408


Uploading Dataframe: 100.00% |███████████████████████| Rows 169515/169515 | Elapsed Time: 00:58 | Remaining Time: 00:00


Launching job: weather_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1315990/jobs/named/weather_1_offline_fg_materialization/executions
2025-12-08 00:27:52,184 INFO: Waiting for execution to finish. Current state: SUBMITTED. Final status: UNDEFINED
2025-12-08 00:27:55,455 INFO: Waiting for execution to finish. Current state: RUNNING. Final status: UNDEFINED
2025-12-08 00:30:06,753 INFO: Waiting for execution to finish. Current state: AGGREGATING_LOGS. Final status: SUCCEEDED
2025-12-08 00:30:07,027 INFO: Waiting for log aggregation to finish.
2025-12-08 00:30:16,005 INFO: Execution finished successfully.
 Data successfully pushed to feature store


In [None]:
# Version of the 'air_quality_fv' feature view to (re)build.
version = 4

# Build the training query explicitly from the two feature groups created above.
# Previously this cell used a `query` variable that no cell in the notebook defines
# (it only existed in leftover kernel state), so Restart & Run All failed with NameError.
# The groups are joined on their shared primary key. The weather side excludes its own
# copies of the key columns so city_name/date appear only once in the result.
# NOTE(review): confirm the resulting columns match the expected merged schema.
query = air_quality_fg.select_all().join(
    weather_fg.select_except(["city_name", "date"]),
    on=["city_name", "date"],
)

# Drop any previous feature view with this name/version so it is recreated from `query`.
try:
    existing_feature_view = fs.get_feature_view(name='air_quality_fv', version=version)
except Exception as exc:
    # A failed lookup is treated as "does not exist", but reported rather than swallowed.
    print(f"No existing Feature View 'air_quality_fv' (v{version}) found ({exc}).")
    existing_feature_view = None

if existing_feature_view is not None:
    print(f"Cleaning (deleting) existing Feature View 'air_quality_fv' (v{version}).")
    existing_feature_view.delete()

# Creation errors propagate with their full traceback; pm2_5 is the prediction label.
feature_view = fs.create_feature_view(
    name='air_quality_fv',
    version=version,
    labels=['pm2_5'],
    query=query,
)
print(f"\nFeature View 'air_quality_fv' (v{version}) created successfully.")
print(f"Feature View 'air_quality_fv' (v{version}) is ready.")

Cleaning (deleting) existing Feature View 'air_quality_fv' (v4).
Feature view created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1315990/fs/1299526/fv/air_quality_fv/version/4

Feature View 'air_quality_fv' (v4) created successfully.
Feature View 'air_quality_fv' (v4) is ready.


In [38]:
# View Merged Data and Missing Values
# Read the joined (air quality + weather) data through the feature view's own query.
# This cell then depends only on `feature_view`, not on a free-standing `query` global.
print("\n Merged Feature View Data Head")
merged_data_df = feature_view.query.read()
merged_data_df.head()



 Merged Feature View Data Head
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (68.41s) 
   pm2_5    city_name                      date  temperature_max  \
0    7.7  Albuquerque 2013-01-26 00:00:00+00:00             13.7   
1    6.0  Albuquerque 2013-01-31 00:00:00+00:00             10.1   
2    5.3  Albuquerque 2013-02-02 00:00:00+00:00             11.6   
3    4.6  Albuquerque 2013-02-04 00:00:00+00:00             12.4   
4    4.9  Albuquerque 2013-02-18 00:00:00+00:00             15.6   

   temperature_min  precipitation_sum  rain_sum  snowfall_sum  \
0              3.4                6.6       6.6           0.0   
1             -1.3                0.0       0.0           0.0   
2             -2.0                0.0       0.0           0.0   
3             -0.8                0.0       0.0           0.0   
4             -0.6                0.0       0.0           0.0   

   precipitation_hours  wind_speed_max  wind_gusts_max  \
0                  6.0 

In [17]:
# Per-column count of missing entries in the merged feature data.
missing_values_count = merged_data_df.isna().sum()
print("\n Missing Value Count Per Column (Before Cleaning)", missing_values_count, sep="\n")


--- Missing Value Count Per Column (Before Cleaning) ---
pm2_5                      0
city_name                  0
date                       0
temperature_max            0
temperature_min            0
precipitation_sum          0
rain_sum                   0
snowfall_sum               0
precipitation_hours        0
wind_speed_max             0
wind_gusts_max             0
wind_direction_dominant    0
log_precipitation_sum      0
log_rain_sum               0
log_snowfall_sum           0
log_precipitation_hours    0
log_wind_speed_max         0
log_wind_gusts_max         0
dtype: int64


In [32]:
# Split the feature view's data into train/test features (X) and labels (y),
# holding out 20% of the rows for the test set.
# NOTE(review): no seed or time range is passed, so this is presumably a random row
# split that can differ between runs. The rows are dated (2013-2023 in the preview),
# so consider a time-based split (train_start/train_end/test_start/test_end) to avoid
# training on data that is later than the test rows. Confirm before relying on metrics.
X_train, X_test, y_train, y_test = feature_view.train_test_split(test_size=0.2)


Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (9.15s) 
2025-12-08 01:59:48,465 INFO: Provenance cached data - overwriting last accessed/created training dataset from 2 to 3.


In [33]:
# Preview the training features (the notebook display is truncated to head/tail rows).
X_train

Unnamed: 0,city_name,date,temperature_max,temperature_min,precipitation_sum,rain_sum,snowfall_sum,precipitation_hours,wind_speed_max,wind_gusts_max,wind_direction_dominant,log_precipitation_sum,log_rain_sum,log_snowfall_sum,log_precipitation_hours,log_wind_speed_max,log_wind_gusts_max
0,Denver,2013-01-01 00:00:00+00:00,-3.3,-19.4,0.0,0.0,0.00,0.0,8.7,19.1,235,0.000000,0.000000,0.000000,0.000000,2.272126,3.000720
1,Dallas,2013-01-02 00:00:00+00:00,5.3,1.0,0.0,0.0,0.00,0.0,17.4,34.9,335,0.000000,0.000000,0.000000,0.000000,2.912351,3.580737
2,Tampa,2013-01-02 00:00:00+00:00,23.6,15.7,0.0,0.0,0.00,0.0,14.9,29.2,207,0.000000,0.000000,0.000000,0.000000,2.766319,3.407842
3,Tallinn,2013-01-03 00:00:00+00:00,1.2,0.1,1.6,0.9,0.49,4.0,22.6,37.1,246,0.955511,0.641854,0.398776,1.609438,3.161247,3.640214
4,Varna,2013-01-03 00:00:00+00:00,7.7,-0.6,0.0,0.0,0.00,0.0,15.8,30.2,296,0.000000,0.000000,0.000000,0.000000,2.821379,3.440418
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
112585,Los Angeles,2023-04-09 00:00:00+00:00,24.8,9.2,0.0,0.0,0.00,0.0,12.7,28.1,229,0.000000,0.000000,0.000000,0.000000,2.617396,3.370738
112586,San Francisco,2023-04-09 00:00:00+00:00,18.6,8.1,0.0,0.0,0.00,0.0,16.1,28.8,301,0.000000,0.000000,0.000000,0.000000,2.839078,3.394508
112587,Wien,2023-04-09 00:00:00+00:00,10.5,1.3,2.9,2.9,0.00,14.0,8.7,22.7,342,1.360977,1.360977,0.000000,2.708050,2.272126,3.165475
112588,Denver,2023-04-10 00:00:00+00:00,21.7,0.4,0.0,0.0,0.00,0.0,11.1,23.8,230,0.000000,0.000000,0.000000,0.000000,2.493205,3.210844


In [34]:
# Preview the training labels (the pm2_5 column).
y_train

Unnamed: 0,pm2_5
0,13.000000
1,17.900000
2,6.400000
3,2.000000
4,48.000000
...,...
112585,9.091176
112586,18.994118
112587,15.000000
112588,7.333333


In [37]:
# Persist the four split frames as CSV files (without the index) under a local directory.
split_dir = "train_test_split"
os.makedirs(split_dir, exist_ok=True)  # no-op when the directory already exists

for csv_name, split_frame in (
    ("X_train.csv", X_train),
    ("Y_train.csv", y_train),
    ("X_test.csv", X_test),
    ("Y_test.csv", y_test),
):
    split_frame.to_csv(os.path.join(split_dir, csv_name), index=False)

print(f" Train/test CSV files saved in '{split_dir}' directory.")



 Train/test CSV files saved in 'train_test_split' directory.
