# Imports

In [12]:
import pandas as pd
from datetime import datetime, timedelta
import json
import time
import os
import numpy as np


In [13]:
# Data lives in <repo root>/data. The notebook runs from a direct subfolder of
# the repo root (notebooks/), so resolve it as an absolute path from the parent
# of the current working directory.
head, _ = os.path.split(os.getcwd())  # parent directory of the notebooks folder
DATA_DIR = os.path.join(head, 'data')

# DATA LOADING

In [14]:
def load_data(filename: str, prefix: str = "", data_dir: os.PathLike = DATA_DIR):
    """Read one CSV file from ``data_dir`` into a DataFrame.

    Args:
        filename: Name of the file inside ``data_dir``.
        prefix: Optional prefix; when non-empty the file actually read is
            ``"{prefix}-{filename}"``. ``None`` is treated like ``""``.
        data_dir: Directory holding the file. Defaults to the module-level
            ``DATA_DIR`` as it was when this function was defined.

    Returns:
        The CSV contents with leading/trailing whitespace stripped from the
        column names and, if present, the ``timestamp`` column parsed with
        ``pd.to_datetime``.

    Raises:
        FileNotFoundError: If the resolved file does not exist.
    """
    if prefix:
        filename = f"{prefix}-{filename}"

    filepath = os.path.join(data_dir, filename)
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Data file not found: {filepath}. Run scraping first.")

    df = pd.read_csv(filepath)
    # Strip header whitespace so column lookups such as df['timestamp'] are not
    # broken by spaces around names in the CSV header.
    df.columns = df.columns.str.strip()
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])

    return df

# ============================================================================
# DATA LOADING FUNCTIONS: WIND SPEED AND WIND DIRECTION STATIONS
# ============================================================================

def load_all_stations(input_dir: os.PathLike = DATA_DIR, file_pattern: str = ""):
    """Load every station CSV in ``input_dir`` matching ``file_pattern`` and stack them.

    Args:
        input_dir: Directory to search. Defaults to the module-level
            ``DATA_DIR`` as it was when this function was defined.
        file_pattern: ``glob`` pattern, relative to ``input_dir``
            (e.g. ``"*-wind-speed-hourly-*.csv"``).

    Returns:
        All matching files read with ``load_data`` and row-concatenated in
        sorted file-path order, with a fresh 0..n-1 index.

    Raises:
        FileNotFoundError: If no regular file matches the pattern.
    """
    import glob

    pattern = os.path.join(input_dir, file_pattern)
    # Keep regular files only: with an empty pattern the glob is "<dir>/", which
    # matches the directory itself. Sort because glob's order depends on the
    # filesystem, and a fixed order keeps the concatenated rows reproducible.
    station_files = sorted(path for path in glob.glob(pattern) if os.path.isfile(path))
    if not station_files:
        raise FileNotFoundError(f"No station files match pattern: {pattern}")

    station_dfs = [load_data(os.path.basename(path), data_dir=input_dir) for path in station_files]
    return pd.concat(station_dfs).reset_index(drop=True)

def load_stations_metadata(input_dir: os.PathLike=DATA_DIR):
    """Read ``stations_metadata.csv`` from ``input_dir`` with default ``read_csv`` options.

    Raises:
        FileNotFoundError: If ``stations_metadata.csv`` is not in ``input_dir``.
    """
    metadata_path = os.path.join(input_dir, "stations_metadata.csv")
    if os.path.exists(metadata_path):
        return pd.read_csv(metadata_path)
    raise FileNotFoundError(f"Metadata file not found: {metadata_path}")


In [15]:
REGIONS_PREFIX = ['east', 'west', 'north', 'south', 'central']


def _load_regional_pm25(filename: str, subdir: str) -> pd.DataFrame:
    """Load ``<region>-<filename>`` from ``DATA_DIR/<subdir>`` for every region and stack them.

    Each region's rows get a ``region`` column, which is cast to ``category``
    after concatenation. ``pm25`` is coerced to numeric, and unparseable values
    become NaN.
    """
    region_dir = os.path.join(DATA_DIR, subdir)
    frames = [
        load_data(filename, prefix=region, data_dir=region_dir).assign(region=region)
        for region in REGIONS_PREFIX
    ]
    combined = pd.concat(frames).reset_index(drop=True)
    combined['region'] = combined['region'].astype('category')
    combined['pm25'] = pd.to_numeric(combined['pm25'], errors='coerce')
    return combined


def _load_station_readings(measure: str) -> pd.DataFrame:
    """Load every ``*-<measure>-hourly-*.csv`` file in ``DATA_DIR/<measure>``.

    Drops the ``last_update`` column and casts ``station_name`` to ``category``.
    """
    readings = load_all_stations(
        os.path.join(DATA_DIR, measure),
        file_pattern=f"*-{measure}-hourly-*.csv",
    )
    readings = readings.drop(columns=['last_update'])
    readings['station_name'] = readings['station_name'].astype('category')
    return readings


pm25_hourly_df = _load_regional_pm25(
    "pm2.5-hourly-230701-251101.csv", "pm2.5-hourly"
).drop(columns=['date'])

# Only PM2.5 is used; the other pollutant / PSI columns are dropped.
pm25_daily_df = _load_regional_pm25(
    "singapore-air-quality.csv", "pm2.5-daily"
).drop(columns=['pm10', 'o3', 'no2', 'so2', 'co', 'psi'])

wind_speed_df = _load_station_readings('wind-speed')
wind_direction_df = _load_station_readings('wind-direction')



In [16]:
# PLAYGROUND AREA TO VIEW LOADED DATAFRAMES

# FEATURE ENGINEERING FOR REGRESSION

In [17]:
from typing import Callable


def regression_features_pm25_daily(df: pd.DataFrame):
    """Add calendar, lag and trailing rolling-window features to a daily PM2.5 series.

    Args:
        df: Rows of a single series with at least a datetime-like ``timestamp``
            column and a numeric ``pm25`` column. Row order does not matter.

    Returns:
        A new frame sorted by ``timestamp`` with the columns below added. Only
        rows where every column is non-null are kept.

        * ``day_of_week`` (Monday=0), ``day_of_month``, ``month`` (int8),
          ``year``, ``is_weekend`` (int, 1 for Saturday/Sunday)
        * ``pm25_lag_{k}d`` for k in 1-7, 14, 28
        * ``pm25_rolling_{mean,std,min,max}_{w}d`` for w in 3, 7, 14, 28

        The index is each surviving row's post-sort 0..n-1 position; it is not
        reset after dropping rows.

    Note:
        Lags and windows count rows, not calendar days. They match their names
        only if there is exactly one row per day with no gaps.
    """
    df = df.sort_values('timestamp', ignore_index=True)

    # Calendar features
    ts = df['timestamp'].dt
    df['day_of_week'] = ts.dayofweek.astype('int8')
    df['day_of_month'] = ts.day.astype('int8')
    df['month'] = ts.month.astype('int8')
    df['year'] = ts.year
    # Kept as int, although the hourly/wind builders use bool. Switching to bool
    # would change the column type of the frame inserted into 'pm25_daily'.
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(int)

    # Lag features: the value k rows earlier
    for lag in [1, 2, 3, 4, 5, 6, 7, 14, 28]:
        df[f'pm25_lag_{lag}d'] = df['pm25'].shift(lag)

    # Rolling statistics over preceding rows only. Shifting by one first keeps
    # the current (target) value out of every window. The shifted series is the
    # same for every window, so build it once; each Rolling serves all four stats.
    prior = df['pm25'].shift(1)
    for window in [3, 7, 14, 28]:
        # Require at least half a window of observations before emitting a value.
        roll = prior.rolling(window, min_periods=max(1, window // 2))
        df[f'pm25_rolling_mean_{window}d'] = roll.mean()
        df[f'pm25_rolling_std_{window}d'] = roll.std()
        df[f'pm25_rolling_min_{window}d'] = roll.min()
        df[f'pm25_rolling_max_{window}d'] = roll.max()

    return df.dropna()


def regression_features_pm25_hourly(df: pd.DataFrame):
    """Add calendar, lag and trailing rolling-window features to an hourly PM2.5 series.

    Args:
        df: Rows of a single series with at least a datetime-like ``timestamp``
            column and a numeric ``pm25`` column. Row order does not matter.

    Returns:
        A new frame sorted by ``timestamp`` with the columns below added. Only
        rows where every column is non-null are kept.

        * ``hour``, ``day_of_week`` (Monday=0), ``day_of_month``, ``month``
          (int8), ``year``, ``is_weekend`` (bool, Saturday/Sunday)
        * ``pm25_lag_{k}h`` for k in 1, 2, 3, 6, 12, 24
        * ``pm25_rolling_{mean,std,min,max}_{w}h`` for w in 6, 12, 24, 72, 168

        The index is each surviving row's post-sort 0..n-1 position; it is not
        reset after dropping rows.

    Note:
        Lags and windows count rows, not clock hours. They match their names
        only if there is exactly one row per hour with no gaps.
    """
    df = df.sort_values('timestamp', ignore_index=True)

    # Calendar features
    ts = df['timestamp'].dt
    df['hour'] = ts.hour.astype('int8')
    df['day_of_week'] = ts.dayofweek.astype('int8')
    df['day_of_month'] = ts.day.astype('int8')
    df['month'] = ts.month.astype('int8')
    df['year'] = ts.year
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(bool)

    # Lag features: the value k rows earlier
    for lag in [1, 2, 3, 6, 12, 24]:
        df[f'pm25_lag_{lag}h'] = df['pm25'].shift(lag)

    # Rolling statistics over preceding rows only. Shifting by one first keeps
    # the current (target) value out of every window. The shifted series is the
    # same for every window, so build it once; each Rolling serves all four stats.
    prior = df['pm25'].shift(1)
    for window in [6, 12, 24, 72, 168]:
        # Require at least half a window of observations before emitting a value.
        roll = prior.rolling(window, min_periods=max(1, window // 2))
        df[f'pm25_rolling_mean_{window}h'] = roll.mean()
        df[f'pm25_rolling_std_{window}h'] = roll.std()
        df[f'pm25_rolling_min_{window}h'] = roll.min()
        df[f'pm25_rolling_max_{window}h'] = roll.max()

    return df.dropna()


def regression_features_wind_direction(df: pd.DataFrame):
    """Build calendar, vector, lag and trailing-mean features from hourly wind direction.

    Args:
        df: Rows of a single station with a datetime-like ``timestamp``,
            ``wind_direction_avg`` (degrees) and ``wind_direction_std`` columns.

    Returns:
        A new frame sorted by ``timestamp`` with the columns below added. Only
        rows where every column is non-null are kept; the index is not reset.

        * calendar columns (``hour`` ... ``month`` as int8, ``year``,
          bool ``is_weekend``)
        * ``wind_u``/``wind_v``: -sin/-cos of the direction
        * lags of u, v and raw direction
        * trailing means of u, v and direction std

        Shifts and windows count rows, not hours.
    """
    out = df.copy().sort_values('timestamp').reset_index(drop=True)

    stamps = out['timestamp'].dt
    for column, values in (
        ('hour', stamps.hour),
        ('day_of_week', stamps.dayofweek),
        ('day_of_month', stamps.day),
        ('month', stamps.month),
    ):
        out[column] = values.astype('int8')
    out['year'] = stamps.year
    out['is_weekend'] = (out['day_of_week'] >= 5).astype(bool)

    # Encode the angle as a unit vector so that e.g. 359 deg and 1 deg end up
    # close together; the raw degree value wraps around at 360.
    radians = np.deg2rad(out['wind_direction_avg'])
    out['wind_u'] = -np.sin(radians)
    out['wind_v'] = -np.cos(radians)

    lagged = (
        ('wind_u', 'wind_u'),
        ('wind_v', 'wind_v'),
        ('wind_direction', 'wind_direction_avg'),
    )
    for hours in (1, 2, 3, 6, 12, 24, 48, 72, 168):
        for stem, source in lagged:
            out[f'{stem}_lag_{hours}h'] = out[source].shift(hours)

    # Trailing means exclude the current row (shift(1)) and need half a window of data.
    averaged = (
        ('wind_u_rolling_mean', 'wind_u'),
        ('wind_v_rolling_mean', 'wind_v'),
        ('direction_std_rolling_mean', 'wind_direction_std'),
    )
    for width in (6, 12, 24, 72, 168):
        needed = max(1, width // 2)
        for stem, source in averaged:
            out[f'{stem}_{width}h'] = out[source].shift(1).rolling(width, min_periods=needed).mean()

    return out.dropna()


def regression_features_wind_speed(df: pd.DataFrame):
    """Add calendar, lag and trailing rolling-window features to an hourly wind-speed series.

    Args:
        df: Rows of a single station with at least a datetime-like
            ``timestamp`` column and a numeric ``wind_speed_avg`` column.
            Row order does not matter.

    Returns:
        A new frame sorted by ``timestamp`` with the columns below added. Only
        rows where every column is non-null are kept.

        * ``hour``, ``day_of_week`` (Monday=0), ``day_of_month``, ``month``
          (int8), ``year``, ``is_weekend`` (bool, Saturday/Sunday)
        * ``wind_speed_lag_{k}h`` for k in 1, 2, 3, 6, 12, 24, 48, 72, 168
        * ``wind_speed_rolling_{mean,std,min,max}_{w}h`` for w in 6, 12, 24, 72, 168

        The index is each surviving row's post-sort 0..n-1 position; it is not
        reset after dropping rows.

    Note:
        Lags and windows count rows, not clock hours. They match their names
        only if there is exactly one row per hour with no gaps.
    """
    df = df.sort_values('timestamp', ignore_index=True)

    # Calendar features
    ts = df['timestamp'].dt
    df['hour'] = ts.hour.astype('int8')
    df['day_of_week'] = ts.dayofweek.astype('int8')
    df['day_of_month'] = ts.day.astype('int8')
    df['month'] = ts.month.astype('int8')
    df['year'] = ts.year
    df['is_weekend'] = (df['day_of_week'] >= 5).astype(bool)

    # Lag features: the value k rows earlier
    for lag in [1, 2, 3, 6, 12, 24, 48, 72, 168]:
        df[f'wind_speed_lag_{lag}h'] = df['wind_speed_avg'].shift(lag)

    # Rolling statistics over preceding rows only. Shifting by one first keeps
    # the current (target) value out of every window. The shifted series is the
    # same for every window, so build it once; each Rolling serves all four stats.
    prior = df['wind_speed_avg'].shift(1)
    for window in [6, 12, 24, 72, 168]:
        # Require at least half a window of observations before emitting a value.
        roll = prior.rolling(window, min_periods=max(1, window // 2))
        df[f'wind_speed_rolling_mean_{window}h'] = roll.mean()
        df[f'wind_speed_rolling_std_{window}h'] = roll.std()
        df[f'wind_speed_rolling_min_{window}h'] = roll.min()
        df[f'wind_speed_rolling_max_{window}h'] = roll.max()

    return df.dropna()


def apply_func_to_groups(df: pd.DataFrame, group_col: str, func: Callable[[pd.DataFrame], pd.DataFrame]) -> pd.DataFrame:
    """Run ``func`` on each ``group_col`` group of ``df`` and stack the results.

    Args:
        df: Input frame.
        group_col: Column to split on. Groups are visited in sorted key order
            (category order for categoricals). Rows whose key is NaN are skipped
            (``groupby``'s default ``dropna=True``).
        func: Per-group transform; it receives one group's rows and returns a frame.

    Returns:
        The concatenated outputs of ``func``, with a fresh 0..n-1 index.

    Raises:
        ValueError: From ``pd.concat`` if ``df`` yields no groups.
    """
    # observed=True: with a categorical key, only categories that actually occur
    # are visited, so func is never called on an empty group. It also silences
    # pandas' FutureWarning about the changing default. For non-categorical keys
    # it has no effect.
    grouped = df.groupby(group_col, observed=True)
    return pd.concat([func(group) for _, group in grouped]).reset_index(drop=True)

In [18]:
# Build model features per series: PM2.5 per region, wind per station.
# NOTE: each assignment overwrites its own input frame, so re-running this cell
# without reloading the data computes features on already-featured frames.
pm25_daily_df = apply_func_to_groups(
    pm25_daily_df, group_col='region', func=regression_features_pm25_daily
)
pm25_hourly_df = apply_func_to_groups(
    pm25_hourly_df, group_col='region', func=regression_features_pm25_hourly
)
wind_direction_df = apply_func_to_groups(
    wind_direction_df, group_col='station_name', func=regression_features_wind_direction
)
wind_speed_df = apply_func_to_groups(
    wind_speed_df, group_col='station_name', func=regression_features_wind_speed
)




In [19]:
# DataFrame.info() writes its summary to stdout itself and returns None, so it
# is called directly. Wrapping it in print() would also print that None.
pm25_daily_df.info()
pm25_hourly_df.info()
wind_direction_df.info()
wind_speed_df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20774 entries, 0 to 20773
Data columns (total 33 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   timestamp              20774 non-null  datetime64[ns]
 1   pm25                   20774 non-null  float64       
 2   region                 20774 non-null  category      
 3   day_of_week            20774 non-null  int8          
 4   day_of_month           20774 non-null  int8          
 5   month                  20774 non-null  int8          
 6   year                   20774 non-null  int32         
 7   is_weekend             20774 non-null  int64         
 8   pm25_lag_1d            20774 non-null  float64       
 9   pm25_lag_2d            20774 non-null  float64       
 10  pm25_lag_3d            20774 non-null  float64       
 11  pm25_lag_4d            20774 non-null  float64       
 12  pm25_lag_5d            20774 non-null  float64       
 13  p

# Data Ingestion

In [20]:
# Log in to Hopsworks and get this project's feature store handle (`fs`),
# which the feature-group cell below uses.
# NOTE(review): the login output warned that the client should match the
# backend's minor version (4.2): `pip install hopsworks==4.2.*`.
import hopsworks
project = hopsworks.login(engine="python")
fs = project.get_feature_store()

2025-11-09 16:47:31,522 INFO: Closing external client and cleaning up certificates.
Connection closed.
2025-11-09 16:47:31,525 INFO: Initializing external client
2025-11-09 16:47:31,525 INFO: Base URL: https://c.app.hopsworks.ai:443




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'


2025-11-09 16:47:33,314 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1277076


In [21]:
def _create_and_insert_fg(name: str, description: str, df: pd.DataFrame, entity_key: str):
    """Get or create version 1 of feature group ``name`` in ``fs`` and insert ``df``.

    The group is keyed on (``entity_key``, "timestamp"), partitioned by
    ``entity_key``, and uses "timestamp" as its event time.

    NOTE(review): re-running inserts the same rows again. How rows whose primary
    key already exists are handled is up to Hopsworks; confirm before re-running.

    Returns:
        The feature group handle.
    """
    fg = fs.get_or_create_feature_group(
        name=name,
        description=description,
        version=1,
        primary_key=[entity_key, "timestamp"],
        partition_key=[entity_key],
        event_time="timestamp",
    )
    fg.insert(df)
    return fg


# Feature Group 1: Hourly PM2.5 features
fg_pm25_hourly = _create_and_insert_fg(
    "pm25_hourly", "Hourly PM2.5 features with short-term patterns", pm25_hourly_df, "region"
)

# Feature Group 2: Daily PM2.5 features
fg_pm25_daily = _create_and_insert_fg(
    "pm25_daily", "Daily PM2.5 aggregations for long-term trends", pm25_daily_df, "region"
)

# Feature Group 3: Wind Direction features
fg_wind_direction = _create_and_insert_fg(
    "wind_direction_hourly", "Wind direction features with vector components", wind_direction_df, "station_name"
)

# Feature Group 4: Wind Speed features
fg_wind_speed = _create_and_insert_fg(
    "wind_speed_hourly", "Wind speed features", wind_speed_df, "station_name"
)


Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1277076/fs/1263683/fg/1638020


Uploading Dataframe: 100.00% |██████████| Rows 100857/100857 | Elapsed Time: 00:03 | Remaining Time: 00:00


Launching job: pm25_hourly_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1277076/jobs/named/pm25_hourly_1_offline_fg_materialization/executions
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1277076/fs/1263683/fg/1668587


Uploading Dataframe: 100.00% |██████████| Rows 20774/20774 | Elapsed Time: 00:01 | Remaining Time: 00:00


Launching job: pm25_daily_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1277076/jobs/named/pm25_daily_1_offline_fg_materialization/executions
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1277076/fs/1263683/fg/1638021


Uploading Dataframe: 100.00% |██████████| Rows 303560/303560 | Elapsed Time: 00:15 | Remaining Time: 00:00


Launching job: wind_direction_hourly_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1277076/jobs/named/wind_direction_hourly_1_offline_fg_materialization/executions
Feature Group created successfully, explore it at 
https://c.app.hopsworks.ai:443/p/1277076/fs/1263683/fg/1668588


Uploading Dataframe: 100.00% |██████████| Rows 303561/303561 | Elapsed Time: 00:11 | Remaining Time: 00:00


Launching job: wind_speed_hourly_1_offline_fg_materialization
Job started successfully, you can follow the progress at 
https://c.app.hopsworks.ai:443/p/1277076/jobs/named/wind_speed_hourly_1_offline_fg_materialization/executions


(Job('wind_speed_hourly_1_offline_fg_materialization', 'SPARK'), None)