# Air Quality Prediction

[World's Air Pollution: Real-Time Air Quality Index](https://waqi.info/)

https://aqicn.org/json-api/doc/

## Prepare environment

In [1]:
import random
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.tsa.seasonal import seasonal_decompose

from scipy.signal import periodogram

import torch
import torch.nn as nn
import torch.optim as optim

# Load environment variables
from dotenv import load_dotenv
load_dotenv()

# Make random numbers stable for reproduction
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

# Add sources to the path
import sys
from pathlib import Path
PROJECT_ROOT = str(Path().resolve().parent)
sys.path.append(PROJECT_ROOT)

from src.data import aqi, meteo
from src.data.calendar import add_calendar_features
from src.data.features import FeatureScaler, split_to_windows, flatten_windows, _flatten_windows # TODO: don't use internal methods
from src.model.training import split_data
from src.model.evaluation import evaluate_iaqi_predictions, create_metrics_dataframe, get_day_n_metrics
from src.model.inference import recursive_forecasting
from src.model import xgboost
from src.hopsworks.client import HopsworksClient
from src.common import LOGGER_NAME, IAQI_FEATURES

import logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(name)s - %(message)s"
)

LOGGER = logging.getLogger(LOGGER_NAME)
LOGGER.setLevel(logging.DEBUG)

## Configuration

In [None]:
# How many (lagged) days to use as input during training
historical_window_size = 3
# How many days to teach the model to predict 
prediction_window_size = 3
# TODO: use recursive_forecasting boolean instead
# How many predictions to do as part of recursive forecasting
num_of_predictions = 1

## Prepare data

### Load data

In [None]:
aqi_df = aqi.load_data(PROJECT_ROOT)

In [None]:
# Make sure column names are stripped
aqi_df.columns

In [None]:
aqi_df.head()

In [None]:
aqi_df.describe(percentiles=[0.25, 0.5, 0.75, 0.9, 0.95])

### Handle missing dates

In [None]:
aqi_df = aqi.clean_missing_dates(aqi_df)

### Handle N/A values

In [None]:
aqi_df = aqi.clean_missing_values(aqi_df)

## Feature engineering

### Calendar Features

In [None]:
aqi_df = add_calendar_features(aqi_df)
aqi_df.head()

### Cyclical features

TODO: 
For cyclical features (month, day_of_week, hour, day_of_year), it's often better to transform them
into sine and cosine components to preserve the cyclical nature and avoid arbitrary ordinal relationships.

### Meteorological data

[Meteorological Parameters](https://dev.meteostat.net/formats.html#meteorological-parameters)

In [None]:
weather_df = meteo.fetch_daily_data(aqi_df)
weather_df.head()

In [None]:
weather_df.describe()

In [None]:
weather_df = meteo.clean_missing_values(weather_df)

In [None]:
merged_df = pd.merge_asof(aqi_df, weather_df, left_index=True, right_index=True)

In [None]:
merged_df.head()

In [None]:
all_columns = merged_df.columns

### Target columns

In [None]:
target_columns = all_columns

### Convert all features to float

Converting all features to floats is a fundamental preprocessing step for neural networks. It ensures compatibility with the underlying mathematical operations, facilitates normalization, and aligns with the requirements of deep learning frameworks.

In [None]:
merged_df = merged_df.astype(float)

In [None]:
merged_df.dtypes

## Explore trends

In [None]:
plt.figure(figsize=(12, 6))
for year, group in aqi_df.groupby("year"):
    if (year in [2022, 2023]):
        plt.plot(group["day_of_year"], group["pm25"], label=str(year), alpha=0.7)

plt.xlabel("Day of Year")
plt.ylabel("PM2.5")
plt.title("Yearly PM2.5 Trends Overlaid by Day of Year")
plt.legend(title="Year", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.show()


In [None]:
plt.figure(figsize=(10, 5))
plot_acf(aqi_df["pm25"], lags=365)
plt.title("Autocorrelation Function (ACF) for PM2.5")
plt.xlabel("Lag")
plt.ylabel("Autocorrelation")
plt.tight_layout()
plt.show()

In [None]:
plt.figure(figsize=(14, 8))
sns.boxplot(x="month", y="pm25", data=aqi_df)
plt.title("Seasonal Subseries Plot: PM2.5 by Month")
plt.xlabel("Month")
plt.ylabel("PM2.5")
plt.show()

In [None]:
# Ensure pm25 is float and has no missing values for decomposition
pm25_series = aqi_df["pm25"].astype(float).interpolate()

result = seasonal_decompose(pm25_series, model='additive', period=365)

plt.figure(figsize=(14, 10))
result.plot()
plt.suptitle("Seasonal Decomposition of PM2.5 Time Series", fontsize=16)
plt.tight_layout(rect=[0, 0, 1, 0.97])
plt.show()

In [None]:
# Fill missing values in pm25 for spectral analysis
pm25_filled = aqi_df["pm25"].astype(float).values

# Compute the periodogram
freqs, power = periodogram(pm25_filled)

plt.figure(figsize=(12, 6))
plt.semilogy(freqs, power)
plt.title("Spectral Analysis (Periodogram) of PM2.5")
plt.xlabel("Frequency")
plt.ylabel("Power Spectral Density")
plt.tight_layout()
plt.show()

## Split Data into Training, Validation, and Test Sets

In [None]:
train_df, val_df, test_df = split_data(merged_df)

## Feature Scaling

Not strictly required for tree-based models (Random Forest, Gradient Boosting like XGBoost/LightGBM) as they are scale-invariant, but also don't hurt performance of these models.
However, it is a must for Neural Networks.

In [None]:
feature_scaler = FeatureScaler()
feature_scaler.fit(train_df)

train_df = feature_scaler.transform(train_df)
val_df = feature_scaler.transform(val_df)
test_df = feature_scaler.transform(test_df)

In [None]:
train_df.head()

## Prepare prediction windows

In [None]:
(
    X_window_train,
    X_window_val,
    X_window_test,
    y_window_train,
    y_window_val,
    y_window_test,
) = split_to_windows(
    train_df, val_df, test_df, historical_window_size, prediction_window_size, target_columns=target_columns
)

In [None]:
X_window_train[-1]

In [None]:
y_window_train[-1]

## Training and Evaluation

In [None]:
# Flatten for regressors
X_flat_train, X_flat_val, X_flat_test, y_flat_train, y_flat_val, y_flat_test = flatten_windows(X_window_train, X_window_val, X_window_test, y_window_train, y_window_val, y_window_test)

In [None]:
X_flat_train[-1]

In [None]:
y_flat_train[-1]

### XGBoost

In [None]:
xgboost_model = xgboost.create_regressor()
xgboost_model.fit(X_flat_train, y_flat_train)

In [None]:
actual, predictions = recursive_forecasting(xgboost_model, test_df, historical_window_size, prediction_window_size, num_of_predictions, torch=False)

predictions = [feature_scaler.inverse_transform(prediction) for prediction in predictions]
actual = [feature_scaler.inverse_transform(value) for value in actual]

xgboost_prediction_metrics = evaluate_iaqi_predictions(actual, predictions, prediction_window_size, num_of_predictions)
xgboost_prediction_metrics

In [None]:
xboost_metrics_df = create_metrics_dataframe(xgboost_prediction_metrics)
print(xboost_metrics_df.to_string(float_format="%.2f"))

In [None]:
feature_names = []
for i in reversed(range(historical_window_size)):
    for column in merged_df.columns:
        feature_names.append(f"{column}_lag_{i + 1}d")

In [None]:
# Extract feature importance from each target model
importance_df = pd.DataFrame()

for i, target_name in enumerate(merged_df.columns):  # Replace with your target names
    estimator = xgboost_model.estimators_[i]
    importance_df[target_name] = estimator.feature_importances_

# Set feature names as index
importance_df.index = feature_names  # Set index to feature names

# Display top features for each target
for target in importance_df.columns:
    print(f"\n{target} - Top 10 Features:")
    target_imp = importance_df[target].sort_values(ascending=False).head(10)
    for i, (feat, score) in enumerate(target_imp.items()):
        print(f"{i+1:2d}. {feat:<20} : {score:.4f}")

# Overall top features (averaged across all targets)
overall_top = importance_df.mean(axis=1).sort_values(ascending=False).head(10)
print(f"\nOverall Top 10 Features:")
for i, (feat, score) in enumerate(overall_top.items()):
    print(f"{i+1:2d}. {feat:<20} : {score:.4f}")

### Neural Networks

In [None]:
class EarlyStopping:
    """Early stopping utility to monitor validation loss and save best weights"""

    def __init__(self, patience=10, min_delta=1e-6, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights

        self.best_loss = float("inf")
        self.counter = 0
        self.best_weights = None
        self.early_stop = False

    def __call__(self, val_loss, model):
        if val_loss < self.best_loss - self.min_delta:
            # Validation loss improved
            self.best_loss = val_loss
            self.counter = 0
            # Save best weights
            if self.restore_best_weights:
                self.best_weights = {
                    k: v.clone().detach() for k, v in model.state_dict().items()
                }
        else:
            # No improvement
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

        return self.early_stop

    def restore_best_weights_to_model(self, model):
        """Restore the best weights to the model"""
        if self.best_weights is not None:
            model.load_state_dict(self.best_weights)
            print(f"Restored best weights (val_loss: {self.best_loss:.6f})")

In [None]:
# Prepare data for LSTM: convert windowed data to tensors
def windows_to_tensor(X_windows, y_windows):
    # X: list of DataFrames, each (window_size, num_features)
    # y: list of DataFrames, each (prediction_window_size, num_targets)
    X_tensor = torch.tensor(
        np.stack([x.values for x in X_windows]), dtype=torch.float32
    )
    y_tensor = torch.tensor(
        np.stack([y.values for y in y_windows]), dtype=torch.float32
    )
    return X_tensor, y_tensor

In [None]:
X_lstm_train, y_lstm_train = windows_to_tensor(X_window_train, y_window_train)
X_lstm_val, y_lstm_val = windows_to_tensor(X_window_val, y_window_val)
X_lstm_test, y_lstm_test = windows_to_tensor(X_window_test, y_window_test)

In [None]:
X_lstm_train.shape

In [None]:
y_lstm_train.shape

In [None]:
class LSTMRegressor(nn.Module):

    def __init__(
        self, input_dim, hidden_dim, output_dim, num_layers=1, prediction_window_size=3
    ):
        super().__init__()

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=0.1)
        self.dropout = nn.Dropout(0.5)
        self.batch_norm = nn.BatchNorm1d(hidden_dim)

        self.fc = nn.Linear(hidden_dim, output_dim * prediction_window_size)
        
        self.output_dim = output_dim
        self.prediction_window_size = prediction_window_size

    def forward(self, x):
        # x: (batch, seq_len, input_dim)
        out, _ = self.lstm(x)
        # Use last hidden state for prediction
        out = out[:, -1, :]
        out = self.dropout(out)
        out = self.batch_norm(out)
        out = self.fc(out)
        # Reshape to (batch, prediction_window_size, output_dim)
        out = out.view(-1, self.prediction_window_size, self.output_dim)
        return out

In [None]:
input_dim = X_lstm_train.shape[2]
output_dim = y_lstm_train.shape[2]
hidden_dim = 64
num_layers = 2
prediction_window_size = y_lstm_train.shape[1]

model = LSTMRegressor(
    input_dim, hidden_dim, output_dim, num_layers, prediction_window_size
)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.MSELoss()
early_stopping = EarlyStopping(patience=15, min_delta=1e-6)

# Training loop
epochs = 1000
batch_size = 64

for epoch in range(epochs):
    model.train()
    permutation = torch.randperm(X_lstm_train.size(0))
    epoch_loss = 0
    for i in range(0, X_lstm_train.size(0), batch_size):
        idx = permutation[i : i + batch_size]
        batch_X, batch_y = X_lstm_train[idx], y_lstm_train[idx]
        optimizer.zero_grad()
        output = model(batch_X)
        loss = criterion(output, batch_y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item() * batch_X.size(0)
    epoch_loss /= X_lstm_train.size(0)

    # Validation loss
    model.eval()
    with torch.no_grad():
        val_output = model(X_lstm_val)
        val_loss = criterion(val_output, y_lstm_val).item()
    
    # Print progress
    print(
        f"Epoch {epoch+1}/{epochs} | "
        f"Train Loss: {epoch_loss:.4f} | "
        f"Val Loss: {val_loss:.4f} | "
        f"Best Val: {early_stopping.best_loss:.6f}"
    )
    
    # Early stopping check
    if early_stopping(val_loss, model):
        best_epoch = epoch + 1 - early_stopping.patience
        print(f"\nEarly stopping triggered at epoch {epoch + 1}")
        print(f"Best epoch was {best_epoch} with val_loss: {early_stopping.best_loss:.6f}")
        break

    # Update best epoch if this is the best so far
    if val_loss == early_stopping.best_loss:
        best_epoch = epoch + 1

# Restore best weights
early_stopping.restore_best_weights_to_model(model)

# Training completed
final_status = "Early stopped" if early_stopping.early_stop else "Completed"
print(f"\nTraining {final_status.lower()} after {epoch + 1} epochs")
print(f"Best validation loss: {early_stopping.best_loss:.6f} at epoch {best_epoch}")

In [None]:
actual, predictions = recursive_forecasting(model, test_df, historical_window_size, prediction_window_size, num_of_predictions, torch=True)

predictions = [feature_scaler.inverse_transform(prediction) for prediction in predictions]
actual = [feature_scaler.inverse_transform(value) for value in actual]

lstm_prediction_metrics = evaluate_iaqi_predictions(y_true=actual, y_pred=predictions, prediction_window_size=prediction_window_size, num_of_predictions=num_of_predictions)
lstm_prediction_metrics

In [None]:
lstm_metrics_df = create_metrics_dataframe(lstm_prediction_metrics)
print(lstm_metrics_df.to_string(float_format="%.2f"))

## Save Model

In [None]:
# Using recursive forecasting - error is cumulative - cannot improve Day 3 prediction without improving Day 1 - so last day's results are enough
last_day_metrics = get_day_n_metrics(xgboost_prediction_metrics, prediction_window_size * num_of_predictions)
# Using single metric for model comparison - The Willmott index - it gives credit for correlation but heavily penalizes systematic errors that would make the forecasts unreliable for air quality management.
metrics = last_day_metrics["Willmott"]
metrics

In [None]:
hopsworks_model = HopsworksClient().save_model(PROJECT_ROOT, xgboost_model, metrics, X_flat_test[0], y_flat_test[0], feature_scaler)

## Real world prediction

In [None]:
# Disabled for fast iteration (for full test - uncomment)
# hopsworks_model, multi_regressor = HopsworksClient().load_model(version=hopsworks_model.version)

In [None]:
# TODO: move to inference module

# Take last {historical_window_size} items for {prediction_window_size} predictions 
X = merged_df[-historical_window_size:]
X = feature_scaler.transform(X)

for day_index in range(num_of_predictions):
    # Input expects multiple windows
    X_flat = _flatten_windows([X])

    y_pred = xgboost_model.predict(X_flat)

    # Split y_pred into 3 arrays, one for each prediction day
    y_pred_split = np.split(y_pred.flatten(), prediction_window_size)

    # Create DataFrame for predictions, each row is a prediction day
    predictions_df = pd.DataFrame(y_pred_split, columns=target_columns)

    # Set the index to continue from the last date in merged_df
    last_date = X.index[-1]
    future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=prediction_window_size, freq="D")
    predictions_df.index = future_dates

    # Add predictions to the end so that we can use them as input (don't forget to remove the same number of items as we added - model expects certain size)
    X = pd.concat([X[prediction_window_size:], predictions_df], axis=0)

X = feature_scaler.inverse_transform(X)

# Definition of Air Quality Index is maximum value of Individual Air Quality Indexes
X["aqi"] = X[IAQI_FEATURES].max(axis=1)

result = X[-num_of_predictions*prediction_window_size:][IAQI_FEATURES]

print(result)

## Hopsworks deployment

In [None]:
deployment = HopsworksClient().deploy_model(hopsworks_model, overwrite=True)
deployment.describe()

In [None]:
# Use for already deployed model

# deployment_name = "aqipredictionmodeldeployment"
# model_serving = HopsworksClient().project.get_model_serving()

# deployment = model_serving.get_deployment(deployment_name)

In [3]:
deployment.start(await_running=300)
# make predictions
predictions = deployment.predict({"instances":[["not_empty"]]})
print(predictions)
deployment.stop(await_stopped=180)

  0%|          | 0/5 [00:00<?, ?it/s]

Start making predictions by using `.predict()`
This may lead to errors when urllib3 tries to modify verify_mode.
Please report an issue at https://gitlab.com/alelec/pip-system-certs with your
python version included in the description


{'predictions': '{"pm25":{"1754438400000":38.8687515259,"1754524800000":39.4622612,"1754611200000":44.2506866455},"pm10":{"1754438400000":14.111992836,"1754524800000":15.1734304428,"1754611200000":14.6231155396},"no2":{"1754438400000":2.5830421448,"1754524800000":3.107077837,"1754611200000":2.6003279686},"so2":{"1754438400000":2.7117853165,"1754524800000":2.931812048,"1754611200000":2.7034041882},"co":{"1754438400000":4.2741427422,"1754524800000":3.9961788654,"1754611200000":3.9455645084}}'}


  0%|          | 0/3 [00:00<?, ?it/s]

In [None]:
deployment.get_logs(component='predictor')