In [None]:
pip install hopsworks

Collecting hopsworks
  Downloading hopsworks-4.3.1-py3-none-any.whl.metadata (11 kB)
Collecting pyhumps==1.6.1 (from hopsworks)
  Downloading pyhumps-1.6.1-py3-none-any.whl.metadata (3.7 kB)
Collecting furl (from hopsworks)
  Downloading furl-2.1.4-py2.py3-none-any.whl.metadata (25 kB)
Collecting boto3 (from hopsworks)
  Downloading boto3-1.40.9-py3-none-any.whl.metadata (6.7 kB)
Collecting numpy<2 (from hopsworks)
  Downloading numpy-1.26.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyjks (from hopsworks)
  Downloading pyjks-20.0.0-py2.py3-none-any.whl.metadata (1.7 kB)
Collecting mock (from hopsworks)
  Downloading mock-5.2.0-py3-none-any.whl.metadata (3.1 kB)
Collecting avro==1.11.3 (from hopsworks)
  Downloading avro-1.11.3.tar.gz (90 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.6/90.6 kB

In [None]:
# --- SARIMAX AQI: load from Hopsworks, evaluate, forecast 72h, save artifacts & register in the Hopsworks Model Registry ---

# pip install hopsworks hsfs statsmodels scikit-learn joblib mlflow -q

import os, json, warnings
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import joblib
from statsmodels.tsa.statespace.sarimax import SARIMAX
import hopsworks, hsfs
import re


warnings.filterwarnings("ignore", "Maximum Likelihood optimization failed to converge")


In [None]:
import os
from getpass import getpass

# Hopsworks API key: taken from the environment when already set, otherwise prompted for
# interactively (input is not echoed and never stored in the notebook).
# Never paste the key into a cell — it is saved, versioned and shared with the .ipynb.
# NOTE(review): the key previously hard-coded in this cell has been exposed and should be
# revoked / rotated in the Hopsworks UI.
if not os.environ.get("HOPSWORKS_API_KEY"):
    os.environ["HOPSWORKS_API_KEY"] = getpass("Hopsworks API key: ")


In [None]:
# Mount Google Drive (Colab only) so the feature-list JSON under MyDrive can be read.
from google.colab import drive

DRIVE_MOUNTPOINT = "/content/drive"
drive.mount(DRIVE_MOUNTPOINT)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:

# ======================= CONFIG =======================
# Hopsworks connection. PROJECT_NAME is optional: when unset, the project tied to the
# API key is used.
PROJECT_NAME = os.environ.get("HOPSWORKS_PROJECT", None)
API_KEY = os.environ["HOPSWORKS_API_KEY"]

# Source feature group, plus the accepted spellings of its time / target columns.
FG_NAME, FG_VERSION = "aqi_features", 1
TARGET_CANDIDATES = ["us_aqi", "aqi_us"]
TIME_CANDIDATES = ["time", "datetime"]

# Model and evaluation settings.
TEST_FRAC = 0.20                       # most recent share of rows held out for validation
SEASONAL_M = 24                        # seasonal period in steps (forecast index below is hourly)
BEST_ORDER = (1, 1, 2)                 # (p, d, q)
BEST_SEASONAL = (1, 0, 0, SEASONAL_M)  # (P, D, Q, m)
PREDICT_HORIZON = 72                   # number of future steps to forecast

# Output locations; directories are created up front so later cells can write into them.
ROOT = Path("/content")
PRED_DIR = ROOT / "predictions"
MODEL_DIR = ROOT / "models" / "current"
for _out_dir in (PRED_DIR, MODEL_DIR):
    _out_dir.mkdir(parents=True, exist_ok=True)
del _out_dir

CSV_PATH = PRED_DIR / "sarimax_predicted_aqi_72hrs.csv"
MODEL_PKL = MODEL_DIR / "sarimax_aqi.pkl"
SCALER_PKL = MODEL_DIR / "exog_scaler.joblib"
META_JSON = MODEL_DIR / "metadata.json"
FEATURES_JSON = Path("/content/drive/MyDrive/Colab Notebooks/final_feature_list.json")

print("Exists?", FEATURES_JSON.exists(), "→", FEATURES_JSON)



Exists? True → /content/drive/MyDrive/Colab Notebooks/final_feature_list.json


In [None]:
# ======================= Hopsworks: load data =======================
def _first_present(candidates, columns):
    """Return the first entry of `candidates` that is in `columns`, or None if none is."""
    for name in candidates:
        if name in columns:
            return name
    return None


print("Logging in to Hopsworks…")
project = hopsworks.login(project=PROJECT_NAME, api_key_value=API_KEY)
fs = project.get_feature_store()

print(f"Reading feature group {FG_NAME} v{FG_VERSION}…")
fg = fs.get_feature_group(name=FG_NAME, version=FG_VERSION)
df = fg.read()  # pandas DataFrame

# Resolve which accepted spelling the feature group uses; fail loudly if none is present.
time_col = _first_present(TIME_CANDIDATES, df.columns)
if time_col is None:
    raise ValueError(f"Expected a time column in {TIME_CANDIDATES}. Found: {df.columns.tolist()[:15]}")
target_col = _first_present(TARGET_CANDIDATES, df.columns)
if target_col is None:
    raise ValueError(f"Expected a target column in {TARGET_CANDIDATES}. Found: {df.columns.tolist()[:15]}")

# Parse timestamps day-first (unparseable values become NaT), drop rows missing a
# timestamp or target, then order chronologically with a fresh 0..n-1 index.
df[time_col] = pd.to_datetime(df[time_col], errors="coerce", dayfirst=True)
df = (
    df.dropna(subset=[time_col, target_col])
    .sort_values(time_col)
    .reset_index(drop=True)
)


Logging in to Hopsworks…
Connection closed.




To ensure compatibility please install the latest bug fix release matching the minor version of your backend (4.2) by running 'pip install hopsworks==4.2.*'



Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1238211
Reading feature group aqi_features v1…
Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (1.17s) 


In [None]:

# ======================= Feature selection & safe rebuild =======================
# Requested exogenous features come from the curated JSON list when it exists,
# otherwise every column except time & target is used.
if FEATURES_JSON.exists():
    requested_feats = [str(f).strip() for f in json.loads(FEATURES_JSON.read_text())]
    print(f"Loaded {len(requested_feats)} features from {FEATURES_JSON}")
else:
    requested_feats = [c for c in df.columns if c not in (time_col, target_col)]
    print(f"{FEATURES_JSON} not found — using all columns except time & target.")

# Target-derived features are recomputed here from *past* target values only, whatever
# the feature store holds, so the exog matrix never carries y_t itself (leakage).
target_prefix = rf"(?:{re.escape(target_col)}|us_aqi|aqi_us)"

# Rolling mean of the previous 3 targets (shift(1) first → excludes y_t).
# Every requested alias is written, so the feature is kept even when that column does
# not exist yet in `df`, and no requested alias keeps a stale stored value.
roll_aliases = {f"{target_col}_roll3", "us_aqi_roll3", "aqi_us_roll3"}
requested_rolls = [f for f in requested_feats if f in roll_aliases]
if requested_rolls:
    safe_roll = df[target_col].shift(1).rolling(3, min_periods=3).mean()
    for roll_name in requested_rolls:
        df[roll_name] = safe_roll

# Lags: <target>_lag<k> = y_{t-k} (…_lag1/_lag6/_lag12/_lag24 etc.).
lag_pattern = re.compile(rf"^{target_prefix}_lag(\d+)$")
# First difference, rebuilt as y_{t-1} − y_{t-2}.
# NOTE(review): if the stored `<target>_diff` is y_t − y_{t-1}, then together with
# `<target>_lag1` it reconstructs y_t exactly; rebuilding it from past values is safe
# either way.
diff_pattern = re.compile(rf"^{target_prefix}_diff$")
for feat in requested_feats:
    lag_match = lag_pattern.match(feat)
    if lag_match:
        df[feat] = df[target_col].shift(int(lag_match.group(1)))
    elif diff_pattern.match(feat):
        df[feat] = df[target_col].shift(1).diff()

# Final exog feature list = requested ∩ df.columns (excluding time & target).
_candidate_feats = [f for f in requested_feats if f not in (time_col, target_col)]
feat_cols = [f for f in _candidate_feats if f in df.columns]
missing = [f for f in _candidate_feats if f not in df.columns]
if not feat_cols:
    raise ValueError("None of the requested exogenous features is present in the feature group.")

# Drop rows made NaN by shift/rolling. Only the columns the model actually uses count,
# so NaNs in unrelated feature-group columns no longer discard training rows.
before = len(df)
df = df.dropna(subset=[target_col, *feat_cols]).reset_index(drop=True)
if len(df) < before:
    print(f"Dropped {before - len(df)} rows due to lag/rolling NaNs.")

print(f"Using {len(feat_cols)} features:")
for f in feat_cols:
    print("  -", f)
if missing:
    print("Missing from dataframe (skipped):", missing)

X_all_raw = df[feat_cols].copy()
y_all = df[target_col].copy()
y_all     = df[target_col].copy()


Loaded 23 features from /content/drive/MyDrive/Colab Notebooks/final_feature_list.json
Dropped 24 rows due to lag/rolling NaNs.
Using 23 features:
  - us_aqi_roll3
  - us_aqi_lag1
  - us_aqi_diff
  - pm2_5_diff
  - log_carbon_monoxide
  - log_pm10
  - log_nitrogen_dioxide
  - us_aqi_lag6
  - log_sulphur_dioxide
  - log_pm2_5
  - us_aqi_lag24
  - wind_speed_10m
  - pm2_5_temp_interaction
  - hour_sin
  - ozone_per_humidity
  - ozone
  - temperature_2m
  - relative_humidity_2m
  - hour
  - day_of_week
  - hour_cos
  - month
  - precipitation


In [None]:

# ======================= Chrono split & scale (train-only) =======================
# Hold out the most recent TEST_FRAC of rows (at least one). There is no shuffling, so
# the test window lies strictly after the training window.
n = len(df)
test_size = max(1, int(round(n * TEST_FRAC)))
cut = n - test_size

train_rows, test_rows = slice(None, cut), slice(cut, None)
X_train_raw, y_train = X_all_raw.iloc[train_rows], y_all.iloc[train_rows]
X_test_raw, y_test = X_all_raw.iloc[test_rows], y_all.iloc[test_rows]
t_test_index = df[time_col].iloc[test_rows]

# Scaling statistics come from the training window only, so nothing leaks from the test set.
scaler = StandardScaler().fit(X_train_raw.values)
X_train = scaler.transform(X_train_raw.values)
X_test = scaler.transform(X_test_raw.values)


In [None]:

# ======================= Fit SARIMAX =======================
# Fit once on the training window. The same results object serves both the in-sample
# metrics and the out-of-sample validation forecast; lbfgs from the same start is
# deterministic, so fitting an identical second model would only repeat the work.
model = SARIMAX(
    y_train, exog=X_train,
    order=BEST_ORDER, seasonal_order=BEST_SEASONAL,
    enforce_stationarity=False, enforce_invertibility=False,
    concentrate_scale=True, initialization="approximate_diffuse",
)
res = model.fit(method="lbfgs", maxiter=60, disp=False)
# Aliases kept so later cells that refer to `model_val` / `res_val` keep working.
model_val, res_val = model, res


# ======================= Metrics =======================
def metrics_block(name, y_true, y_pred):
    """Print MAE / RMSE / R² for one evaluation split and return them.

    Args:
        name: label printed in the section header.
        y_true: observed values.
        y_pred: predictions aligned with `y_true`.

    Returns:
        dict with float values under the keys "MAE", "RMSE" and "R2".
    """
    mae = mean_absolute_error(y_true, y_pred)
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))  # works on old and new sklearn
    r2 = r2_score(y_true, y_pred)
    print(f"\n=== {name} ===\nMAE: {mae:.3f}\nRMSE: {rmse:.3f}\nR²: {r2:.3f}")
    return {"MAE": float(mae), "RMSE": rmse, "R2": float(r2)}


# 1) Train (in-sample one-step-ahead fitted values).
# NOTE(review): with approximate_diffuse initialization the first fitted values are
# poorly determined, which can inflate in-sample RMSE relative to MAE.
train_fit = res.fittedvalues.reindex(y_train.index).dropna()
train_m = metrics_block("Train (in-sample)", y_train.loc[train_fit.index], train_fit)

# 2) Validation: a single multi-step forecast over the whole test window, starting at
#    the end of training (no re-conditioning on observed test values).
y_pred_full = res_val.forecast(steps=len(y_test), exog=X_test)
val_m = metrics_block("Validation (multi-step, one-shot over full test)", y_test, y_pred_full)



=== Train (in-sample) ===
MAE: 0.171
RMSE: 1.930
R²: 0.979

=== Validation (multi-step, one-shot over full test) ===
MAE: 0.130
RMSE: 0.570
R²: 0.994


In [None]:

# ======================= 72h Forecast CSV =======================
# `res_val` was fitted on the training window only. Forecasting from it directly would
# predict the hours right after the train/test cut, not the hours after the last
# observation. Append the held-out observations (same parameters, no refit) so the
# forecast starts at last_ts + 1h, matching the timestamps written to the CSV.
res_forecast = res_val.append(y_test, exog=X_test)

# Future timestamps: hourly, starting one hour after the last observation.
last_ts = df.loc[df[time_col].notna(), time_col].iloc[-1]
future_idx = pd.date_range(start=last_ts.floor("h") + pd.Timedelta(hours=1),
                           periods=PREDICT_HORIZON, freq="h")


def _calendar_encodings(stamps: pd.DatetimeIndex) -> dict:
    """Candidate calendar features for `stamps`, keyed by feature name (float arrays)."""
    hour = np.asarray(stamps.hour, dtype=float)
    return {
        "hour": hour,
        "hour_sin": np.sin(2 * np.pi * hour / 24),
        "hour_cos": np.cos(2 * np.pi * hour / 24),
        "day_of_week": np.asarray(stamps.dayofweek, dtype=float),
        "month": np.asarray(stamps.month, dtype=float),
    }


# Future exog:
#  - calendar features are recomputed for the future timestamps, but only when this
#    encoding reproduces the historical column (guards against a different convention
#    in the feature pipeline, e.g. another weekday origin or timezone);
#  - every other feature carries forward its last observed value (persistence).
#    NOTE(review): target lags/rolls are also held constant here; updating them step by
#    step would need a recursive forecast.
hist_cal = _calendar_encodings(pd.DatetimeIndex(df[time_col]))
future_cal = _calendar_encodings(future_idx)
last_ex = X_all_raw.iloc[-1].to_dict()

future_columns, rebuilt_feats = [], []
for f in feat_cols:
    if f in hist_cal and np.allclose(hist_cal[f], X_all_raw[f].to_numpy(dtype=float), atol=1e-6):
        future_columns.append(future_cal[f])
        rebuilt_feats.append(f)
    else:
        future_columns.append(np.full(PREDICT_HORIZON, float(last_ex.get(f, 0.0))))
print("Calendar features rebuilt for the forecast window:", rebuilt_feats or "none")

future_exog = np.column_stack(future_columns)
future_exog_s = scaler.transform(future_exog)

future_pred = res_forecast.forecast(steps=PREDICT_HORIZON, exog=future_exog_s)
future_pred = np.asarray(future_pred).ravel()

# CSV format d/m/yy HH:MM (same layout as the LSTM forecast CSV).
fmt = (future_idx.day.astype(str) + "/" + future_idx.month.astype(str) + "/" +
       future_idx.strftime("%y") + " " + future_idx.strftime("%H:%M"))
forecast_df = pd.DataFrame({"datetime": fmt, "predicted_aqi": future_pred})
forecast_df.to_csv(CSV_PATH, index=False)
print(f"\nSaved forecast CSV → {CSV_PATH}")



Saved forecast CSV → /content/predictions/sarimax_predicted_aqi_72hrs.csv


In [None]:

# ======================= Save artifacts (AFTER CSV) =======================
# Model bundle: spec + feature order + the fitted statsmodels results (pickled via joblib;
# only load it from trusted storage). NOTE(review): `res_val` is the train-window fit.
model_bundle = {
    "order": BEST_ORDER,
    "seasonal_order": BEST_SEASONAL,
    "features": feat_cols,
    "result": res_val,
}
metadata = {
    "time_col": time_col, "target_col": target_col, "features": feat_cols,
    "seasonal_m": SEASONAL_M, "train_rows": int(len(y_train)), "test_rows": int(len(y_test)),
    "train_metrics": train_m, "val_multistep_metrics": val_m,
}

joblib.dump(model_bundle, MODEL_PKL)
joblib.dump(scaler, SCALER_PKL)
META_JSON.write_text(json.dumps(metadata, indent=2))

for label, path in (("model", MODEL_PKL), ("scaler", SCALER_PKL), ("metadata", META_JSON)):
    print(f"Saved {label} → {path}")


Saved model → /content/models/current/sarimax_aqi.pkl
Saved scaler → /content/models/current/exog_scaler.joblib
Saved metadata → /content/models/current/metadata.json


In [None]:

# ======================= (Optional) Register in Hopsworks Model Registry =======================
# Best-effort: any failure here is reported and skipped so the notebook still completes.
# Everything in MODEL_DIR (model pickle, scaler, metadata) is uploaded as the model version.
try:
    mr = project.get_model_registry()
    registry_metrics = {
        f"val_{key.lower()}": val_m[key] for key in ("RMSE", "MAE", "R2")
    }
    model_meta = mr.python.create_model(
        name="sarimax_aqi",
        metrics=registry_metrics,
        description="SARIMAX AQI forecaster — one-shot multi-step validation (matches peer notebook).",
    )
    model_meta.save(str(MODEL_DIR))
    print("Registered model in Hopsworks Model Registry.")
except Exception as registry_error:
    print("ℹSkipped model registry:", registry_error)



  0%|          | 0/6 [00:00<?, ?it/s]

Uploading /content/models/current/metadata.json: 0.000%|          | 0/845 elapsed<00:00 remaining<?

Uploading /content/models/current/exog_scaler.joblib: 0.000%|          | 0/1151 elapsed<00:00 remaining<?

Uploading /content/models/current/sarimax_aqi.pkl: 0.000%|          | 0/249085264 elapsed<00:00 remaining<?

Model created, explore it at https://c.app.hopsworks.ai:443/p/1238211/models/sarimax_aqi/2
Registered model in Hopsworks Model Registry.
