 Imports and configuration

In [1]:
from __future__ import annotations

from pathlib import Path
from typing import Tuple, List

import numpy as np  # type: ignore
from typing import Any
import pandas as pd  # type: ignore
import matplotlib.pyplot as plt  # type: ignore

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler  # type: ignore MinMaxScaler performed more poorly
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score  # type: ignore
from tensorflow.keras.models import Sequential  # type: ignore
from tensorflow.keras.layers import LSTM, Dense, Dropout  # type: ignore
from tensorflow.keras.callbacks import EarlyStopping  # type: ignore
# Cell: GPU diagnostics
import tensorflow as tf

from sklearn.model_selection import train_test_split

plt.style.use("seaborn-v0_8")
RANDOM_SEED: int = 42
np.random.seed(RANDOM_SEED)

TARGET_COLUMN: str = "MeanTemp"  # the column to predict

LOOKBACK: int = 14  # use last x days to predict next day
PATIENCE: int = 3  # early stopping patience
EPOCHS: int = 50  # maximum number of training epochs
BATCH_SIZE: int = 16  # training batch size, reducing batch size improves accuracy but increases training time
DENSE_LAYER: int = 64  # number of neurons in dense layer
ACTIVATION: str = "relu"  # activation function for dense layer
LSTM_UNITS: int = 128
DROPOUT_RATE: float = 0.20  # dropout rate for regularization
TRAIN_RATIO: float = 0.70  # proportion of data to use for training
STA_WEIGHT: float = 2.0

# scaler to use for data normalization
# scaler: MinMaxScaler = MinMaxScaler(feature_range=(0, 1)) # performs worse than standard scaler
standard_scaler: StandardScaler = StandardScaler()

# ALL_FEATURES: List[str] = [
#     "STA",  # station identifier, predictions should be grouped per station
#     "Precip",
#     "WindGustSpd",
#     "MaxTemp",
#     "MinTemp",
#     "MeanTemp",
#     "Snowfall",
#     # 'PoorWeather',
#     "YR",
#     "MO",
#     # 'DA',
#     "PRCP",
#     # 'DR', -- too many missing values
#     # 'SPD', -- too many missing values
#     # 'MAX', -- will sqew results
#     # 'MIN', -- will sqew results
#     # 'MEA', mean temperature in fahrenheit ignore
#     # "SNF",
#     # 'SND',
#     # 'PGT'
# ]

STANDARD_FEATURES: List[str] = [
    "STA",  # station identifier, predictions should be grouped per station
    # "WindGustSpd",
    "MaxTemp",
    "MinTemp",
    "MeanTemp",
    # "PGT",
    # "YR",
    "MO",
    # "DA",
]

ROBUST_FEATURES: List[str] = ["Snowfall", "Precip"]

ALL_FEATURES = STANDARD_FEATURES + ROBUST_FEATURES

FEATURE_COLUMNS: List[str] = [
    'STA', # station identifier
    'Precip', 
    'WindGustSpd', 
    'MaxTemp', 
    'MinTemp', 
    'MeanTemp', 
    'Snowfall', 
    # 'PoorWeather', 
    'YR', 'MO', 
    # 'DA', 
    'PRCP', 
    # 'DR', -- too many missing values
    # 'SPD', 
    'MAX', 
    'MIN', 
    # 'MEA', mean temperature in fahrenheit ignore
    'SNF', 
    # 'SND', 
    # 'PGT'
]


GPU Diagnostics

In [2]:
if gpus := tf.config.list_physical_devices("GPU"):
    print(f"GPUs available: {gpus}")
    try:
        # Optional: restrict TensorFlow to the first GPU
        tf.config.set_visible_devices(gpus[0], "GPU")
        tf.config.experimental.set_memory_growth(gpus[0], True)
        logical_gpus = tf.config.list_logical_devices("GPU")
        print(f"Using GPU: {logical_gpus}")
    except RuntimeError as e:
        print(f"GPU config error: {e}")
else:
    print("No GPU found, running on CPU.")


No GPU found, running on CPU.


### Load and inspect data from Kaggle

In [3]:
DATA_PATH: Path = Path("Summary of Weather.csv")

df: pd.DataFrame = pd.read_csv(DATA_PATH)
df.head()

FileNotFoundError: [Errno 2] No such file or directory: 'Summary of Weather.csv'

### Show basic info

In [None]:
df.info()
df.describe(include="all").T

### Drop columns with no data

In [None]:
print("Original columns:", df.columns.tolist())
print(len(df.columns))
df = df.dropna(axis=1, how="all")

print("Remaining columns:", df.columns.tolist())
print(len(df.columns))

### Fill columns with zero where no values found

In [None]:
df[["PRCP", "Snowfall", "SNF", "Precip"]] = df[["PRCP", "Snowfall", "SNF", "Precip"]].fillna(0)

### Drop meaningless columns that would sqew results

In [None]:
columns_to_drop = ["SNF", "MAX", "MIN", "MEA", "PoorWeather", "TSHDSBRSGF", "PRCP"]

df = df.drop(columns=columns_to_drop)

### Drop duplicate columns

In [None]:
print(f"Duplicated columns: {df.duplicated().sum()}")

if df.duplicated().sum() > 0:
    df = df.drop_duplicates()

int(df.duplicated().sum())

### Process STA, Year, Month and DA as ints and fill missing values with 0

In [None]:
int_cols = ["STA", "YR", "MO", "DA"]

for col in int_cols:
    if col in df.columns:
        df[col] = df[col].fillna(0).astype(int)

### Select features and basic preprocessing

In [None]:
# Keep only needed columns + Date for reference
columns_to_use: list[str] = ["Date"] + ALL_FEATURES
df_model: pd.DataFrame = df[columns_to_use].copy()

# Handle 'T' in precipitation-like columns if they exist
for col in ["Precip", "PRCP"]:
    if col in df_model.columns:
        if df_model[col].dtype in ("str", "object"):
            df_model[col] = df_model[col].replace("T", 0.0)
        # Cast to float
        df_model[col] = pd.to_numeric(df_model[col], errors="coerce")

# Convert any remaining object columns that look numeric
for col in df_model.columns:
    if df_model[col].dtype == "object":
        # Try to coerce to numeric, keep non-numeric as NaN
        df_model[col] = pd.to_numeric(df_model[col], errors="coerce")

# Parse date and sort
df_model["Date"] = pd.to_datetime(df_model["Date"])
df_model = df_model.sort_values("Date").reset_index(drop=True)

# Drop rows with any missing selected feature values
df_model = df_model.dropna(subset=ALL_FEATURES).reset_index(drop=True)
df_model.head(5)

### Build supervised sequences for LSTM

In [None]:
def create_sequences(
    data: np.ndarray, target_index: int, lookback: int
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Create (X, y) sequences for LSTM from multivariate time series.

    Parameters
    ----------
    data : np.ndarray
        Time-series data of shape (n_samples, n_features).
    target_index : int
        Index of the feature column to be predicted.
    lookback : int
        Number of past time steps used as input.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        X of shape (n_sequences, lookback, n_features),
        y of shape (n_sequences,).
    """
    X_list: list[np.ndarray] = []
    y_list: list[float] = []

    for i in range(lookback, len(data)):
        X_list.append(data[i - lookback : i])
        y_list.append(data[i, target_index])

    X: np.ndarray = np.array(X_list)
    y: np.ndarray = np.array(y_list)

    return X, y


### Scaling and sequence creation

In [None]:
# Extract feature values
features: np.ndarray = df_model[ALL_FEATURES].values

# scale to be able to train LSTM model
scaled_features: np.ndarray = standard_scaler.fit_transform(features)

# add more weight to STA feature
scaled_features[:, 1] *= STA_WEIGHT  # assuming STA is the first column

target_index: int = ALL_FEATURES.index(TARGET_COLUMN)

X_all, y_all = create_sequences(
    data=scaled_features, target_index=target_index, lookback=LOOKBACK
)

X_all.shape, y_all.shape


 Train/validation/test split

In [None]:
def train_val_test_split(
    X: np.ndarray, y: np.ndarray, train_ratio: float = 0.7, val_ratio: float = 0.15
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Split time-series data into train, validation, and test partitions
    preserving temporal order.

    Parameters
    ----------
    X : np.ndarray
        Input sequences of shape (n, lookback, n_features).
    y : np.ndarray
        Targets of shape (n,).
    train_ratio : float
        Fraction of samples for training.
    val_ratio : float
        Fraction of samples for validation.

    Returns
    -------
    Tuple of arrays
        X_train, y_train, X_val, y_val, X_test, y_test.
    """
    n_samples: int = len(X)
    train_end: int = int(n_samples * train_ratio)
    val_end: int = int(n_samples * (train_ratio + val_ratio))

    X_train = X_all[:train_end]
    y_train = y_all[:train_end]

    X_val = X_all[train_end:val_end]
    y_val = y_all[train_end:val_end]

    X_test = X_all[val_end:]
    y_test = y_all[val_end:]

    return X_train, y_train, X_val, y_val, X_test, y_test


X_train, y_train, X_val, y_val, X_test, y_test = train_val_test_split(
    X_all, y_all, train_ratio=TRAIN_RATIO
)

X_train.shape, X_val.shape, X_test.shape


Define and train LSTM model

In [None]:
def build_lstm_model(
    input_shape: tuple[int, int],
    lstm_units: int = 128,
    dropout_rate: float = 0.5,
) -> Sequential:
    """
    Build a simple LSTM regression model in Keras.

    Parameters
    ----------
    input_shape : tuple[int, int]
        Shape (lookback, n_features) for the input.
    lstm_units : int
        Number of LSTM units.
    dropout_rate : float
        Dropout rate after the LSTM layer.

    Returns
    -------
    Sequential
        Compiled Keras model.
    """
    model = Sequential()
    model.add(LSTM(lstm_units, input_shape=input_shape))
    model.add(Dropout(dropout_rate))
    model.add(Dense(DENSE_LAYER, activation=ACTIVATION))
    model.add(Dense(1))  # regression output

    model.compile(
        optimizer="adam",
        loss="mse",
        metrics=["mae"],
    )
    return model

input_shape: tuple[int, int] = (X_train.shape[1], X_train.shape[2])
model: Sequential = build_lstm_model(
    input_shape=input_shape, lstm_units=LSTM_UNITS, dropout_rate=DROPOUT_RATE
)
model.summary()


Train model with early stopping

In [None]:
early_stop = EarlyStopping(
    monitor="val_loss",
    patience=PATIENCE,
    restore_best_weights=True,
)

history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    callbacks=[early_stop],
    verbose=1,
)


Visualize training loss and MAE

In [None]:
def plot_training_history(history) -> None:
    """
    Plot training and validation loss and MAE over epochs.
    """
    history_dict = history.history
    epochs = range(1, len(history_dict["loss"]) + 1)

    fig, axes = plt.subplots(1, 2, figsize=(12, 4))

    axes[0].plot(epochs, history_dict["loss"], label="Train loss (MSE)")
    axes[0].plot(epochs, history_dict["val_loss"], label="Val loss (MSE)")
    axes[0].set_xlabel("Epoch")
    axes[0].set_ylabel("Loss")
    axes[0].set_title("Training and validation loss MSE")
    axes[0].legend()

    axes[1].plot(epochs, history_dict["mae"], label="Train MAE")
    axes[1].plot(epochs, history_dict["val_mae"], label="Val MAE")
    axes[1].set_xlabel("Epoch")
    axes[1].set_ylabel("MAE")
    axes[1].set_title("Training and validation MAE")
    axes[1].legend()

    plt.tight_layout()
    plt.show()


plot_training_history(history)


Evaluate on test set and compute scores descale features to interpret results

In [None]:
y_pred_test_scaled: np.ndarray = model.predict(X_test)


# For interpretability, inverse-transform MeanTemp back to original units.
# We need to embed predictions into a full feature vector before inverse scaling.
def inverse_transform_target(
    scaled_target: np.ndarray,
    scaler: Any,
    target_index: int,
) -> np.ndarray:
    """
    Inverse-transform a single target column that was part of a scaler fit
    on multiple features.

    Parameters
    ----------
    scaled_target : np.ndarray
        Predicted or true target values in scaled space, shape (n_samples, 1) or (n_samples,).
    scaler : Any
        Fitted scaler on the full feature matrix.
    target_index : int
        Index of the target column in the original feature matrix.

    Returns
    -------
    np.ndarray
        Target values in original units, shape (n_samples,).
    """
    scaled_target = np.asarray(scaled_target).reshape(-1, 1)
    n_features: int = scaler.n_features_in_

    # Create dummy zeros for all features, then replace target column
    dummy_scaled = np.zeros((len(scaled_target), n_features))
    dummy_scaled[:, target_index] = scaled_target[:, 0]

    inv_full = scaler.inverse_transform(dummy_scaled)
    return inv_full[:, target_index]


y_test_inv: np.ndarray = inverse_transform_target(y_test, standard_scaler, target_index)
y_pred_test_inv: np.ndarray = inverse_transform_target(
    y_pred_test_scaled, standard_scaler, target_index
)


Regression Metrics

In [None]:
def regression_metrics(y_true, y_pred):
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    return mse, rmse, mae, r2


test_mse, test_rmse, test_mae, test_r2 = regression_metrics(y_test_inv, y_pred_test_inv)

print("\n=== Test metrics ===")
print(f"MSE : {test_mse:.4f}")
print(f"RMSE: {test_rmse:.4f}")
print(f"MAE : {test_mae:.4f}")
print(f"R²  : {test_r2:.4f}")

Predictions vs Actuals (LSTM Model)

In [None]:
def plot_predictions(
    y_true: np.ndarray,
    y_pred: np.ndarray,
    title: str = "Actual vs predicted MeanTemp",
    n_points: int = 300,
) -> None:
    """
    Plot actual vs predicted time series.

    Parameters
    ----------
    y_true : np.ndarray
        True target values in original units.
    y_pred : np.ndarray
        Predicted target values in original units.
    title : str
        Title of the plot.
    n_points : int
        Number of last points to plot for readability.
    """
    n = len(y_true)
    start: int = max(0, n - n_points)

    plt.figure(figsize=(12, 4))
    plt.plot(range(start, n), y_true[start:], label="Actual")
    plt.plot(range(start, n), y_pred[start:], label="Predicted")
    plt.xlabel("Time index (relative)")
    plt.ylabel("MeanTemp")
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()


plot_predictions(y_test_inv, y_pred_test_inv, n_points=150)

print(len(y_pred_test_inv))
print(len(y_test_inv))


### Residual analysis

In [None]:
residuals: np.ndarray = y_test_inv - y_pred_test_inv

plt.figure(figsize=(12, 4))
plt.plot(residuals)
plt.axhline(0, color="black", linewidth=1)
plt.xlabel("Time index (test set)")
plt.ylabel("Residual (Actual - Predicted)")
plt.title("Residuals over time")
plt.tight_layout()
plt.show()


Linear Regression Baseline

In [None]:
df_clean = df_model.dropna(subset=[TARGET_COLUMN]).reset_index(drop=True)

target_candidates = [c for c in df.columns if c in [TARGET_COLUMN]]
target_col = target_candidates[0]

numeric_cols = df_clean.select_dtypes(include=[np.number]).columns.tolist()
numeric_cols = [c for c in numeric_cols if c != target_col]

X = df_clean[numeric_cols]
y = df_clean[target_col]

# --- Ensure no NaNs before splitting: impute with column means ---
X = X.replace([np.inf, -np.inf], np.nan)  # just in case
X = X.fillna(X.mean())

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=(1.0 - TRAIN_RATIO), random_state=42
)

X_test.head()

Run Linear Regression

In [None]:
# Train model
lin_reg = LinearRegression()
lin_reg.fit(X_train, y_train)

# Predictions
y_train_pred = lin_reg.predict(X_train)
y_test_pred = lin_reg.predict(X_test)

train_mse, train_rmse, train_mae, train_r2 = regression_metrics(y_train, y_train_pred)
test_mse, test_rmse, test_mae, test_r2 = regression_metrics(y_test, y_test_pred)

print("=== Train metrics ===")
print(f"MSE : {train_mse:.4f}")
print(f"RMSE: {train_rmse:.4f}")
print(f"MAE : {train_mae:.4f}")
print(f"R²  : {train_r2:.4f}")

print("\n=== Test metrics ===")
print(f"MSE : {test_mse:.4f}")
print(f"RMSE: {test_rmse:.4f}")
print(f"MAE : {test_mae:.4f}")
print(f"R²  : {test_r2:.4f}")


In [None]:
plot_predictions(y_test.values, y_test_pred, n_points=360)

print(len(y_pred_test_inv))
print(len(y_test_inv))