# PeakGuard Training Pipeline (Global GRU with Device Embedding)

This notebook trains a global GRU model using all devices' data stored in SQLite, saves artifacts with a new model id, and logs metrics/plots into SQLite for the dashboard.


In [21]:
import os, sqlite3, json, math, shutil
import numpy as np, pandas as pd
from datetime import datetime
from sklearn.preprocessing import StandardScaler
import tensorflow as tf
from tensorflow import keras
from zoneinfo import ZoneInfo
from pathlib import Path

# Resolve project root robustly in Jupyter (no __file__)
def resolve_project_root():
    p = Path.cwd().resolve()
    for _ in range(6):
        if (p / 'pyproject.toml').exists() or (p / 'app').exists():
            return p
        if p.parent == p:
            break
        p = p.parent
    return Path.cwd().resolve()

PROJECT_ROOT = resolve_project_root()
DB_PATH = os.environ.get('DB_PATH', str(PROJECT_ROOT / 'data' / 'peakguard.db'))
ART_DIR = str(PROJECT_ROOT / 'artifacts')
VERSIONS_DIR = str(Path(ART_DIR) / 'versions')
LATEST_DIR = str(Path(ART_DIR) / 'latest')
os.makedirs(VERSIONS_DIR, exist_ok=True)
os.makedirs(LATEST_DIR, exist_ok=True)


In [22]:
def read_sql_df(query, params=()):
    with sqlite3.connect(DB_PATH) as conn:
        return pd.read_sql_query(query, conn, params=params)

devices = read_sql_df('SELECT id, timezone FROM devices ORDER BY id')
devices


Unnamed: 0,id,timezone
0,1,America/New_York
1,2,Europe/Berlin


In [10]:
def to_device_local(ts_utc, tz):
    ts = pd.to_datetime(ts_utc, utc=True)
    return ts.tz_convert(ZoneInfo(tz)).tz_convert(None)

def make_features(df, device_id, device_tz):
    # df: index=utc timestamp, column 'consumption'
    if df.empty: return df
    idx_local = pd.DatetimeIndex([to_device_local(t, device_tz) for t in df.index])
    df_local = df.copy()
    df_local.index = idx_local
    df_local['hour'] = df_local.index.hour
    df_local['dow'] = df_local.index.dayofweek
    df_local['hour_sin'] = np.sin(2*np.pi*df_local['hour']/24)
    df_local['hour_cos'] = np.cos(2*np.pi*df_local['hour']/24)
    df_local['dow_sin'] = np.sin(2*np.pi*df_local['dow']/7)
    df_local['dow_cos'] = np.cos(2*np.pi*df_local['dow']/7)
    df_local['device_id'] = device_id
    return df_local[['consumption','hour_sin','hour_cos','dow_sin','dow_cos','device_id']]

def fetch_device_df(device_id, device_tz, days=90):
    end_utc = pd.Timestamp.utcnow().floor('H')
    start_utc = end_utc - pd.Timedelta(days=days)
    df = read_sql_df(
        'SELECT ts_utc, consumption FROM readings WHERE device_id=? AND ts_utc BETWEEN ? AND ? ORDER BY ts_utc',
        (device_id, start_utc.strftime('%Y-%m-%d %H:%M:%S'), end_utc.strftime('%Y-%m-%d %H:%M:%S'))
    )
    df.index = pd.to_datetime(df['ts_utc'])
    df = df.drop(columns=['ts_utc'])
    return make_features(df, device_id, device_tz)

frames = []
for _, row in devices.iterrows():
    frames.append(fetch_device_df(int(row.id), row.timezone))
full = pd.concat(frames).sort_index()
full.head()


  end_utc = pd.Timestamp.utcnow().floor('H')
  end_utc = pd.Timestamp.utcnow().floor('H')


Unnamed: 0,consumption,hour_sin,hour_cos,dow_sin,dow_cos,device_id
2025-06-11 18:00:00,1.0,-1.0,-1.83697e-16,0.974928,-0.222521,2
2025-06-11 19:00:00,0.979811,-0.965926,0.258819,0.974928,-0.222521,2
2025-06-11 20:00:00,0.926145,-0.866025,0.5,0.974928,-0.222521,2
2025-06-11 21:00:00,1.129726,-0.707107,0.7071068,0.974928,-0.222521,2
2025-06-11 22:00:00,1.157112,-0.5,0.8660254,0.974928,-0.222521,2


In [23]:
# Build sequences
WINDOW=48

def make_sequences(df):
    values = df[['consumption','hour_sin','hour_cos','dow_sin','dow_cos']].values.astype('float32')
    X_seq, y = [], []
    for i in range(len(values)-WINDOW):
        X_seq.append(values[i:i+WINDOW])
        y.append(df['consumption'].iloc[i+WINDOW])
    X_seq = np.array(X_seq)
    y = np.array(y).reshape(-1,1)
    dev_ids = df['device_id'].values
    dev_ids_seq = []
    for i in range(len(dev_ids)-WINDOW):
        dev_ids_seq.append(dev_ids[i])
    return X_seq, np.array(dev_ids_seq), y

X, dev_ids, y = make_sequences(full)
num_devices = int(dev_ids.max()) + 1
X.shape, dev_ids.shape, y.shape, num_devices


((2833, 48, 5), (2833,), (2833, 1), 3)

In [24]:
# Train/validation split by time
split = int(len(X)*0.8)
X_train, X_val = X[:split], X[split:]
dev_train, dev_val = dev_ids[:split], dev_ids[split:]
y_train, y_val = y[:split], y[split:]

# Scale input features (per-feature) and target
from sklearn.preprocessing import StandardScaler
x_scaler = StandardScaler()
# reshape to 2D for fitting: (N, WINDOW*5)
X_train_2d = X_train.reshape(len(X_train), -1)
X_val_2d = X_val.reshape(len(X_val), -1)
X_train_s = x_scaler.fit_transform(X_train_2d).reshape(X_train.shape)
X_val_s = x_scaler.transform(X_val_2d).reshape(X_val.shape)

y_scaler = StandardScaler()
y_scaler.fit(y_train)
y_train_s = y_scaler.transform(y_train)
y_val_s = y_scaler.transform(y_val)

# Model with device embedding and mild regularization
inp_seq = keras.layers.Input(shape=(WINDOW, 5))
inp_dev = keras.layers.Input(shape=(), dtype='int32')
emb = keras.layers.Embedding(input_dim=num_devices+1, output_dim=8, mask_zero=False)(inp_dev)
emb_r = keras.layers.RepeatVector(WINDOW)(emb)
x = keras.layers.Concatenate(axis=-1)([inp_seq, emb_r])
x = keras.layers.GRU(64, return_sequences=False, dropout=0.1)(x)
x = keras.layers.Dense(32, activation='relu', kernel_regularizer=keras.regularizers.l2(1e-4))(x)
out = keras.layers.Dense(1)(x)
model = keras.Model([inp_seq, inp_dev], out)
model.compile(optimizer=keras.optimizers.Adam(1e-3), loss=keras.losses.Huber())

callbacks = [
    keras.callbacks.EarlyStopping(patience=8, restore_best_weights=True),
    keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=4)
]

history = model.fit([X_train_s, dev_train], y_train_s,
                    validation_data=([X_val_s, dev_val], y_val_s),
                    epochs=40, batch_size=128, verbose=1, callbacks=callbacks)

# Eval RMSE over val
y_val_hat = y_scaler.inverse_transform(model.predict([X_val_s, dev_val]))
rmse = float(np.sqrt(np.mean((y_val_hat - y_val)**2)))
rmse




Epoch 1/40


2025-08-10 20:48:43.608948: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:961] model_pruner failed: INVALID_ARGUMENT: Graph does not contain terminal node Adam/AssignAddVariableOp_10.


Epoch 2/40
Epoch 3/40
Epoch 4/40
Epoch 5/40
Epoch 6/40
Epoch 7/40
Epoch 8/40
Epoch 9/40
Epoch 10/40


0.36618270943130404

In [29]:
# Store training metrics and a compact test plot payload into SQLite
loss_hist = [float(x) for x in history.history['loss']]
val_loss_hist = [float(x) for x in history.history['val_loss']]
rmse_hist = [None]*(len(loss_hist)-1) + [rmse]

# Compact test plot payload: last 400 points of y and yhat
N = min(400, len(y_val))
plot_payload = {
  'y_true': [float(v) for v in y_val[-N:].ravel().tolist()],
  'y_pred': [float(v) for v in y_val_hat[-N:].ravel().tolist()],
}

with sqlite3.connect(DB_PATH) as conn:
    cur = conn.execute("INSERT INTO models(created_at, artifact_dir, notes) VALUES(?,?,?)",
                      (datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S'), version_dir, 'global_gru_with_device_emb'))
    model_id = int(cur.lastrowid)
    conn.execute("INSERT OR REPLACE INTO model_results(model_id, loss_history, rmse_history, test_plot) VALUES(?,?,?,?)",
                 (model_id, json.dumps({'train': loss_hist, 'val': val_loss_hist}), json.dumps(rmse_hist), json.dumps(plot_payload)))
model_id


2

In [30]:
# Compute per-epoch RMSE in kWh from validation loss and update DB record
import numpy as np, sqlite3, json

val_loss = history.history.get('val_loss', [])
rmse_per_epoch = [float(abs(scaler.scale_[0]) * np.sqrt(float(v))) for v in val_loss]
rmse_per_epoch


[0.17372801597539453,
 0.1692831431014454,
 0.17008397796698274,
 0.16946951646435332,
 0.17070286463145143,
 0.17079202261145157,
 0.1695666945868818,
 0.16977331728224276,
 0.16987094636760888,
 0.16948614573462525]

In [31]:
# Build a simple HTML summary and Plotly plots inline to validate training quality
import plotly.graph_objects as go
from IPython.display import display, HTML

# Loss curves
epochs = list(range(1, len(history.history['loss'])+1))
fig_loss = go.Figure()
fig_loss.add_trace(go.Scatter(x=epochs, y=history.history['loss'], mode='lines', name='Train Loss'))
fig_loss.add_trace(go.Scatter(x=epochs, y=history.history['val_loss'], mode='lines', name='Val Loss'))
fig_loss.update_layout(title='Loss Curves', xaxis_title='Epoch', yaxis_title='MSE')

# RMSE curve
fig_rmse = go.Figure()
fig_rmse.add_trace(go.Scatter(x=epochs, y=rmse_per_epoch, mode='lines+markers', name='Val RMSE (kWh)'))
fig_rmse.update_layout(title='Validation RMSE per Epoch', xaxis_title='Epoch', yaxis_title='kWh')

# Final metrics and status heuristic
final_val_loss = float(history.history['val_loss'][-1])
final_rmse = float(rmse_per_epoch[-1]) if rmse_per_epoch else None
mean_y = float(np.mean(y_val))
nrmse = float(final_rmse/mean_y) if mean_y else None
status_good = (final_rmse is not None) and (nrmse is not None) and (nrmse <= 0.1)
color = '#2ea043' if status_good else '#f85149'

summary_html = f"""
<div style='display:grid;grid-template-columns:repeat(3,1fr);gap:12px;'>
  <div><b>Final Val Loss</b><div>{final_val_loss:.4f}</div></div>
  <div><b>Final Val RMSE (kWh)</b><div>{final_rmse:.4f}</div></div>
  <div><b>nRMSE</b><div>{nrmse:.2%}</div></div>
  <div><b>Status</b><div style='color:{color}'>{'Good' if status_good else 'Needs Improvement'}</div></div>
</div>
"""

display(HTML(summary_html))
fig_loss.show()
fig_rmse.show()


In [32]:
# Update the latest DB model record with per-epoch RMSE
with sqlite3.connect(DB_PATH) as conn:
    # fetch latest model id
    cur = conn.execute("SELECT id FROM models ORDER BY id DESC LIMIT 1")
    row = cur.fetchone()
    if row:
        mid = int(row[0])
        conn.execute(
            "UPDATE model_results SET rmse_history=? WHERE model_id=?",
            (json.dumps(rmse_per_epoch), mid)
        )
        print('Updated rmse_history for model_id', mid)
    else:
        print('No model record found to update')


Updated rmse_history for model_id 2


In [25]:
# Save artifacts under a versioned directory and update latest/
version = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
version_dir = os.path.join(VERSIONS_DIR, version)
os.makedirs(version_dir, exist_ok=True)

# Persist target scaler and model
import joblib
joblib.dump(y_scaler, os.path.join(version_dir, 'series_minmax_scaler.pkl'))
model.save(os.path.join(version_dir, 'gru_energy_forecaster.keras'))
model.save_weights(os.path.join(version_dir, 'gru_energy_forecaster.weights.h5'))

# overwrite latest/ by copying
for fname in ['series_minmax_scaler.pkl','gru_energy_forecaster.keras','gru_energy_forecaster.weights.h5']:
    shutil.copy(os.path.join(version_dir,fname), os.path.join(LATEST_DIR,fname))
version_dir


'/Users/andressalguero/Documents/peakguard_api/artifacts/versions/20250811_004905'