In [None]:
# src/data_prep.py

# Script to load, clean, and merge Alberta generation and load CSVs.
# Outputs a cleaned CSV with consistent hourly timestamps and engineered features.

from pathlib import Path
import pandas as pd
import numpy as np

TZ = "America/Edmonton"

def _normalize_col(c):
    if not isinstance(c, str):
        return c
    s = c.strip().lower()
    s = s.replace("(", "").replace(")", "")
    s = s.replace("/", "_").replace("-", "_").replace(".", "").replace(",", "")
    s = "_".join(s.split())
    return s

def _find_by_keywords(cols, keywords_list):
    """
    Return first column name that contains all keywords in one of the keyword lists.
    keywords_list: list of keyword-lists (priority)
    """
    norm = {c: _normalize_col(c) for c in cols}
    for keywords in keywords_list:
        for c, nc in norm.items():
            if all(k in nc for k in keywords):
                return c
    return None

def _to_numeric(s):
    return pd.to_numeric(s.astype(str).str.replace(",","").str.strip(), errors="coerce")

def load_generation_csv(path: Path):
    df = pd.read_csv(path)
    cols = list(df.columns)

    # find date col
    date_col = _find_by_keywords(cols, [["date"], ["time"], ["date_time"], ["date_-_mst"], ["date_mst"]]) or cols[0]

    # find main columns
    total_gen_col = _find_by_keywords(cols, [["total","generation"], ["total_generation"], ["total_gen"], ["total"]])
    wind_col = _find_by_keywords(cols, [["wind"]])
    solar_col = _find_by_keywords(cols, [["solar"]])
    other_col = _find_by_keywords(cols, [["other"]])

    # Build df with only relevant columns if present
    keep = [date_col]
    for c in (total_gen_col, wind_col, solar_col, other_col):
        if c and c not in keep:
            keep.append(c)
    df2 = df[keep].copy()

    # parse timestamp
    df2['timestamp'] = pd.to_datetime(df2[date_col], format="%m/%d/%Y %I:%M:%S %p", errors='coerce')
    # localize to TZ (strings appear to be local MST)
    if df2['timestamp'].dt.tz is None:
        try:
            df2['timestamp'] = df2['timestamp'].dt.tz_localize(TZ, ambiguous='infer', nonexistent='shift_forward')
        except Exception:
            df2['timestamp'] = df2['timestamp'].dt.tz_localize(TZ, ambiguous='NaT', nonexistent='NaT')
    else:
        df2['timestamp'] = df2['timestamp'].dt.tz_convert(TZ)

    # numeric fields
    df2['total_generation_mw'] = _to_numeric(df2[total_gen_col]) if total_gen_col else np.nan
    df2['wind_mw'] = _to_numeric(df2[wind_col]) if wind_col else np.nan
    df2['solar_mw'] = _to_numeric(df2[solar_col]) if solar_col else np.nan
    if other_col:
        df2['other_orig_mw'] = _to_numeric(df2[other_col])
    else:
        df2['other_orig_mw'] = np.nan

    out = df2[['timestamp','total_generation_mw','wind_mw','solar_mw','other_orig_mw']].copy()
    return out

def load_load_csv(path: Path):
    df = pd.read_csv(path)
    cols = list(df.columns)
    date_col = _find_by_keywords(cols, [["date"], ["time"], ["date_time"], ["date_-_mst"], ["date_mst"]]) or cols[0]
    load_col = _find_by_keywords(cols, [["ail"], ["load"], ["demand"]])
    if load_col is None:
        raise ValueError(f"Could not find AIL/load column in {path}. Columns: {cols}")
    df2 = df[[date_col, load_col]].copy()
    df2['timestamp'] = pd.to_datetime(df2[date_col], errors='coerce')
    if df2['timestamp'].dt.tz is None:
        try:
            df2['timestamp'] = df2['timestamp'].dt.tz_localize(TZ, ambiguous='infer', nonexistent='shift_forward')
        except Exception:
            df2['timestamp'] = df2['timestamp'].dt.tz_localize(TZ, ambiguous='NaT', nonexistent='NaT')
    else:
        df2['timestamp'] = df2['timestamp'].dt.tz_convert(TZ)
    df2['load_mw'] = _to_numeric(df2[load_col])
    return df2[['timestamp','load_mw']]

def merge_and_clean(gen_df, load_df, out_path: Path):
    # Outer merge to preserve any timestamps that might be missing in one
    df = pd.merge(gen_df, load_df, on='timestamp', how='outer', sort=True)

    # Ensure datetime sorted, dedupe
    df = df.sort_values('timestamp').drop_duplicates('timestamp').reset_index(drop=True)

    # Compute other_gen (if wind/solar present)
    if 'wind_mw' in df.columns and 'solar_mw' in df.columns:
        df['wind_mw'] = df['wind_mw'].fillna(0.0)
        df['solar_mw'] = df['solar_mw'].fillna(0.0)
        df['other_gen_mw'] = df['total_generation_mw'] - (df['wind_mw'] + df['solar_mw'])
    else:
        df['other_gen_mw'] = df.get('other_orig_mw', np.nan)

    # Compute balance
    df['balance_mw'] = df['total_generation_mw'] - df['load_mw']

    # Check continuity and fill small gaps
    ts_min = df['timestamp'].min()
    ts_max = df['timestamp'].max()
    expected_idx = pd.date_range(ts_min, ts_max, freq='h', tz=TZ)
    if len(expected_idx) != df.shape[0]:
        # reindex & interpolate small gaps
        df = df.set_index('timestamp').reindex(expected_idx)
        df.index.name = 'timestamp'
        numeric_cols = ['total_generation_mw','load_mw','wind_mw','solar_mw','other_gen_mw','balance_mw']
        for c in numeric_cols:
            if c in df.columns:
                df[c] = df[c].interpolate(limit=2)
        df = df.reset_index()

    # Print basic sanity
    missing_gen = df['total_generation_mw'].isna().sum()
    missing_load = df['load_mw'].isna().sum()
    print("Sanity checks after merge:")
    print(f"  rows: {len(df)}")
    print(f"  missing generation rows: {missing_gen}")
    print(f"  missing load rows: {missing_load}")

    # Feature engineering (basic)
    df['hour'] = df['timestamp'].dt.hour
    df['dow'] = df['timestamp'].dt.dayofweek
    df['month'] = df['timestamp'].dt.month
    df['is_weekend'] = (df['dow'] >= 5).astype(int)

    for lag in [1,24,168]:
        df[f'balance_lag{lag}'] = df['balance_mw'].shift(lag)
    for w in [24,168]:
        df[f'balance_roll_{w}'] = df['balance_mw'].rolling(w).mean().shift(1)

    # Save a clean CSV
    out_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(out_path, index=False)
    print(f"Wrote cleaned CSV to {out_path}")
    return df

def load_and_prepare(gen_csv, load_csv, out_path=Path("data/processed/alberta_hourly_clean.csv")):
    gen = Path(gen_csv)
    load = Path(load_csv)
    if not gen.exists() or not load.exists():
        raise FileNotFoundError("Generation / Load CSV not found at paths provided.")
    gen_df = load_generation_csv(gen)
    load_df = load_load_csv(load)
    return merge_and_clean(gen_df, load_df, out_path)



# Training script

Builds regression, classification, and clustering models for Alberta grid data.
Saves models to models/ and predictions to data/processed/predictions.csv for use in Streamlit app.

In [None]:
import argparse
from pathlib import Path
import joblib
import numpy as np
import pandas as pd

from . import data_prep, features, recommend, evaluate, utils
from sklearn.linear_model import Ridge
from sklearn.ensemble import HistGradientBoostingRegressor, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import tensorflow as tf
from tensorflow.keras.callbacks import EarlyStopping


def build_ann_regressor_sklearn():
    from sklearn.neural_network import MLPRegressor
    return ("sklearn_mlp", MLPRegressor(hidden_layer_sizes=(64,32), max_iter=500, random_state=42))

def build_ann_regressor_keras(input_dim):
    import tensorflow as tf
    from tensorflow.keras import layers, models
    model = models.Sequential([
        layers.Input(shape=(input_dim,)),
        layers.Dense(64, activation='relu'),
        layers.Dense(32, activation='relu'),
        layers.Dense(1, activation='linear')
    ])
    model.compile(optimizer='adamW', loss='mae')
    return ("keras", model)

def main(args):
    cfg = utils.load_yaml(args.config) if args.config else {
        "tz":"America/Edmonton","test_days":90,"lags":[1,24,168],"rolls":[24,168],
        "neutral_k":0.25,"high_k":0.75,"models":{"regressor":"hgb","classifier":"rf"},"n_clusters_daily":3
    }

    # Directories
    Path("models").mkdir(parents=True, exist_ok=True)
    Path("data/processed").mkdir(parents=True, exist_ok=True)

    # Data loading and preparation (produces data/processed/alberta_hourly_clean.csv)
    print("Loading and cleaning data...")
    df_clean = data_prep.load_and_prepare(args.gen, args.load, Path("data/processed/alberta_hourly_clean.csv"))

    # Features
    df_feat = features.make_features(df_clean, lags=tuple(cfg.get("lags",[1,24,168])), rolls=tuple(cfg.get("rolls",[24,168])))
    feats = features.feature_list(df_feat)
    target = 'balance_mw'

    # Train/test split (time-based)
    last_ts = df_feat['timestamp'].max()
    cutoff = last_ts - pd.Timedelta(days=cfg.get("test_days",90))
    train_df = df_feat[df_feat['timestamp'] <= cutoff].reset_index(drop=True)
    test_df  = df_feat[df_feat['timestamp'] > cutoff].reset_index(drop=True)
    print("Train rows:", len(train_df), "Test rows:", len(test_df))

    Xtr = train_df[feats]
    ytr = train_df[target]
    Xte = test_df[feats]
    yte = test_df[target]

    sigma_train = float(ytr.std())

    # Regression model selection
    reg_choice = cfg.get("models",{}).get("regressor","ann")
    print("Training regressor:", reg_choice)
    if reg_choice == "hgb":
        reg = HistGradientBoostingRegressor(random_state=42)
        reg.fit(Xtr, ytr)
        ypred = reg.predict(Xte)
        joblib.dump(reg, "models/regressor.joblib")
    elif reg_choice == "ridge":
        reg = Ridge(random_state=42)
        reg.fit(Xtr, ytr)
        ypred = reg.predict(Xte)
        joblib.dump(reg, "models/regressor.joblib")
    elif reg_choice == "ann":
        backend, model = build_ann_regressor_keras(Xtr.shape[1])
        scaler = StandardScaler()
        Xtr_s = scaler.fit_transform(Xtr)
        Xte_s = scaler.transform(Xte)
        early_stop = EarlyStopping(
            monitor='val_loss',  # Which metric to monitor
            patience=10,  # Stop after 10 epochs with no improvement
            restore_best_weights=True
        )
        model.fit(Xtr_s, ytr, validation_split=0.1, epochs=100, batch_size=32, verbose=2, callbacks=[early_stop])
        ypred = model.predict(Xte_s).ravel()
        model.save("models/regressor.keras")
        joblib.dump(scaler, "models/scaler_ann.joblib")
    else:
        # default to hgb
        reg = HistGradientBoostingRegressor(random_state=42)
        reg.fit(Xtr, ytr)
        ypred = reg.predict(Xte)
        joblib.dump(reg, "models/regressor.joblib")

    reg_metrics = evaluate.regression_metrics(yte, ypred)
    print("Regression metrics:", reg_metrics)

    # compute resid std on training (for band)
    try:
        if reg_choice == "ann":
            scaler = joblib.load("models/scaler_ann.joblib")
            train_pred = model.predict(scaler.transform(Xtr)).ravel()
        else:
            train_pred = reg.predict(Xtr)
        resid_std = float((ytr - train_pred).std())
    except Exception:
        resid_std = float((ytr - reg.predict(Xtr)).std())

    # Step 5: classification (3-class)
    neutral_k = cfg.get("neutral_k", 0.25)
    high_k = cfg.get("high_k", 0.75)
    def label_from_balance(x):
        if x >= high_k * sigma_train: return 2
        if x <= -high_k * sigma_train: return 0
        if abs(x) <= neutral_k * sigma_train: return 1
        return 2 if x > 0 else 0

    df_feat['label'] = df_feat['balance_mw'].apply(label_from_balance)
    train_df = df_feat[df_feat['timestamp'] <= cutoff].reset_index(drop=True)
    test_df  = df_feat[df_feat['timestamp'] > cutoff].reset_index(drop=True)
    clf_feats = feats
    clf = RandomForestClassifier(n_estimators=200, random_state=42)
    clf.fit(train_df[clf_feats], train_df['label'])
    ypred_clf = clf.predict(test_df[clf_feats])
    clf_metrics = evaluate.classification_metrics(test_df['label'], ypred_clf)
    print("Classifier metrics:", clf_metrics)
    joblib.dump(clf, "models/classifier.joblib")

    # Step 6: clustering daily profiles
    daily = df_clean[['timestamp','balance_mw']].copy()
    daily['date'] = daily['timestamp'].dt.date
    daily['hour'] = daily['timestamp'].dt.hour
    daily_pivot = daily.pivot_table(
        index='date',
        columns='hour',
        values='balance_mw',
        aggfunc='mean'
    ).fillna(method='ffill', axis=1)
    sc = StandardScaler()
    X_daily = sc.fit_transform(daily_pivot.fillna(0))
    n_clusters = cfg.get("n_clusters_daily", 3)
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    daily_labels = kmeans.fit_predict(X_daily)
    sil = silhouette_score(X_daily, daily_labels)
    joblib.dump(kmeans, "models/kmeans_daily.joblib")

    # Build predictions.csv for app
    out = test_df[['timestamp','balance_mw']].copy()
    out['y_pred'] = ypred
    out['y_lower'] = out['y_pred'] - resid_std
    out['y_upper'] = out['y_pred'] + resid_std
    out['band'] = out['y_pred'].apply(lambda x: recommend.band_label(x, sigma_train, neutral_k=neutral_k, high_k=high_k))
    out['grid_description'] = out['band'].apply(lambda b: recommend.recommendation_from_band(b)['description'])
    out['grid_recommendation'] = out['band'].apply(lambda b: recommend.recommendation_from_band(b)['grid_action'])
    out['customer_recommendation'] = out['band'].apply(lambda b: recommend.recommendation_from_band(b)['customer_action'])
    out.to_csv("data/processed/predictions.csv", index=False)
    print("Wrote predictions.csv (for Streamlit app)")

    # Step 8: save meta
    meta = {
        "regression_metrics": reg_metrics,
        "classification_metrics": clf_metrics,
        "clustering_silhouette": float(sil),
        "sigma_train": float(sigma_train),
        "resid_std": float(resid_std),
        "features": feats
    }
    utils.save_json("models/meta.json", meta)
    print("Saved models/meta.json")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--gen", required=True, help="path to generation CSV")
    parser.add_argument("--load", required=True, help="path to load CSV")
    parser.add_argument("--config", default="config.yaml", help="path to config yaml")
    args = parser.parse_args()
    main(args)