In [1]:
# Ensure project root is on sys.path so `import src...` works
import sys
from pathlib import Path

root = Path.cwd().resolve()
# If running from inside notebooks/ adjust to parent directory containing src
if not (root / 'src').exists():
    candidate = root.parent
    if (candidate / 'src').exists():
        root = candidate
# Prepend if missing
root_str = str(root)
if root_str not in sys.path:
    sys.path.insert(0, root_str)
print('Added to sys.path:', root_str)
print('Current working directory:', Path.cwd())

Added to sys.path: /Users/zak/Repos/E-commerce-Demand-Forecasting
Current working directory: /Users/zak/Repos/E-commerce-Demand-Forecasting/notebooks


# N-BEATS Training Notebook

Train the minimal N-BEATS implementation on the panel parquet subset.

## Objectives
1. Load processed panel data (item_id, date, demand).
2. Create sliding window dataset (input_length -> forecast_length).
3. Train N-BEATS LightningModule for a few epochs.
4. Compute validation metrics (MAE, WAPE).
5. Save checkpoint + metrics artifacts.

If the panel file is missing, a synthetic dataset will be generated so the pipeline can run end-to-end.

In [None]:
# Imports and environment checks (single accelerator definition)
import os, json, math
from pathlib import Path
import pandas as pd
import torch
from torch.utils.data import DataLoader
import pytorch_lightning as pl
from src.models.nbeats_module import NBeatsModule, NBeatsConfig
from src.data.dataset_nbeats import PanelForecastDataset, PanelWindowConfig, split_dataset

# Device & accelerator selection (Apple Silicon, CUDA, CPU)
if torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = torch.device("mps")
    accelerator = 'mps'
    backend_note = "Using Apple Silicon MPS backend"
elif torch.cuda.is_available():
    device = torch.device("cuda")
    accelerator = 'gpu'
    backend_note = f"Using CUDA GPU: {torch.cuda.get_device_name(0)}"
else:
    device = torch.device("cpu")
    accelerator = 'cpu'
    backend_note = "Falling back to CPU"

print('PyTorch version:', torch.__version__)
print('Lightning version:', pl.__version__)
print('Device:', device)
print('Accelerator:', accelerator)
print('Backend note:', backend_note)


PyTorch version: 2.8.0
Lightning version: 2.5.5
Device: mps
Backend note: Using Apple Silicon MPS backend


In [None]:
# Configuration parameters (aggressive compute)
PANEL_PATH = Path('data/processed/m5_panel_subset.parquet')
ARTIFACTS_DIR = Path('artifacts/models')
ARTIFACTS_DIR.mkdir(parents=True, exist_ok=True)
INPUT_LENGTH = 28 * 4  # 112 days lookback
FORECAST_LENGTH = 30
BATCH_SIZE = 64  # increased for higher utilization
EPOCHS = 30
LEARNING_RATE = 1e-3
NUM_STACKS = 3           # increased
BLOCKS_PER_STACK = 3     # increased
LAYER_WIDTH = 768        # wider layers
N_LAYERS = 4             # deeper blocks
DROPOUT = 0.05           # small regularization
MAX_ITEMS = 50
MAX_WINDOWS_PER_ITEM = 40
VAL_FRACTION = 0.1
SEED = 42
pl.seed_everything(SEED, workers=True)
print('Config -> stacks:', NUM_STACKS, 'blocks/stack:', BLOCKS_PER_STACK, 'layer_width:', LAYER_WIDTH, 'batch_size:', BATCH_SIZE)

Seed set to 42


42

In [11]:
# Load or create synthetic panel
if PANEL_PATH.exists():
    print('Loading panel from', PANEL_PATH)
    panel_df = pd.read_parquet(PANEL_PATH)
else:
    print('Panel not found. Creating synthetic panel for demo...')
    # Synthetic: 20 items, 200 days, simple seasonal pattern + noise
    import numpy as np
    items = [f'ITEM_{i:03d}' for i in range(20)]
    dates = pd.date_range('2024-01-01', periods=200, freq='D')
    rows = []
    for item in items:
        base = np.random.randint(5, 25)
        seasonal = np.sin(np.linspace(0, 12 * math.pi, len(dates))) * np.random.uniform(3, 8)
        noise = np.random.randn(len(dates)) * np.random.uniform(0.5, 2.0)
        demand = (base + seasonal + noise).clip(min=0).round(2)
        for d, val in zip(dates, demand):
            rows.append({'item_id': item, 'date': d, 'demand': float(val)})
    panel_df = pd.DataFrame(rows)
    PANEL_PATH.parent.mkdir(parents=True, exist_ok=True)
    panel_df.to_parquet(PANEL_PATH, index=False)
print(panel_df.head())
print('Panel shape:', panel_df.shape)

Loading panel from data/processed/m5_panel_subset.parquet
    item_id       date  demand
0  ITEM_000 2024-01-01   11.90
1  ITEM_000 2024-01-02   11.30
2  ITEM_000 2024-01-03   11.60
3  ITEM_000 2024-01-04   18.30
4  ITEM_000 2024-01-05   15.64
Panel shape: (4000, 3)


In [12]:
# Build dataset windows with chronological split
# Determine cutoff date for validation based on VAL_FRACTION of unique days
unique_dates = sorted(panel_df['date'].unique())
val_days = max(FORECAST_LENGTH, int(len(unique_dates) * VAL_FRACTION))
# Ensure we have enough history for validation windows
val_history_needed = INPUT_LENGTH + FORECAST_LENGTH
cutoff_index = len(unique_dates) - val_days
cutoff_date = unique_dates[cutoff_index]

# Train: all dates strictly before cutoff_date
train_df = panel_df[panel_df['date'] < cutoff_date]
# Validation: last segment plus required preceding history window
val_start_history_date = unique_dates[max(0, cutoff_index - (val_history_needed - 1))]
val_df = panel_df[panel_df['date'] >= val_start_history_date]

print('Cutoff date for validation segment:', cutoff_date)
print('Train date range:', train_df['date'].min(), '->', train_df['date'].max(), '| rows:', len(train_df))
print('Val+history date range:', val_df['date'].min(), '->', val_df['date'].max(), '| rows:', len(val_df))

# Persist temporary parquet shards (avoids modifying original panel file)
train_path = PANEL_PATH.parent / 'm5_panel_subset_train.parquet'
val_path = PANEL_PATH.parent / 'm5_panel_subset_val.parquet'
train_df.to_parquet(train_path, index=False)
val_df.to_parquet(val_path, index=False)

cfg_ds = PanelWindowConfig(input_length=INPUT_LENGTH, forecast_length=FORECAST_LENGTH, max_items=MAX_ITEMS, max_windows_per_item=MAX_WINDOWS_PER_ITEM)
train_ds = PanelForecastDataset(train_path, cfg_ds)
val_ds = PanelForecastDataset(val_path, cfg_ds)
print('Windows -> train:', len(train_ds), 'val:', len(val_ds))
# Inspect one sample
x0, y0 = train_ds[0]
print('Sample shapes -> x:', x0.shape, 'y:', y0.shape)

Cutoff date for validation segment: 2024-06-19 00:00:00
Train date range: 2024-01-01 00:00:00 -> 2024-06-18 00:00:00 | rows: 3400
Val+history date range: 2024-01-30 00:00:00 -> 2024-07-18 00:00:00 | rows: 3420
Windows -> train: 580 val: 600
Sample shapes -> x: torch.Size([112]) y: torch.Size([30])


In [None]:
# DataLoaders (chronological split, updated batch & workers)
num_workers = 6  # tune based on CPU cores; higher can improve throughput
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=num_workers, persistent_workers=True)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, num_workers=num_workers, persistent_workers=True)
print('Batches -> train:', len(train_loader), 'val:', len(val_loader), '| workers:', num_workers, '| batch_size:', BATCH_SIZE)

Batches -> train: 19 val: 19 | workers: 4


In [18]:
# Initialize model
cfg_model = NBeatsConfig(input_length=INPUT_LENGTH, forecast_length=FORECAST_LENGTH, learning_rate=LEARNING_RATE, num_stacks=NUM_STACKS, num_blocks_per_stack=BLOCKS_PER_STACK, layer_width=LAYER_WIDTH, n_layers=N_LAYERS, dropout=DROPOUT)
model = NBeatsModule(cfg_model).to(device)
print(model)

NBeatsModule(
  (stacks): ModuleList(
    (0-1): 2 x ModuleList(
      (0-1): 2 x NBeatsBlock(
        (fc): Sequential(
          (0): Linear(in_features=112, out_features=256, bias=True)
          (1): ReLU()
          (2): Linear(in_features=256, out_features=256, bias=True)
          (3): ReLU()
          (4): Linear(in_features=256, out_features=256, bias=True)
          (5): ReLU()
        )
        (backcast_head): Linear(in_features=256, out_features=112, bias=True)
        (forecast_head): Linear(in_features=256, out_features=30, bias=True)
      )
    )
  )
  (loss_fn): MSELoss()
)


In [None]:
# Parameter count & quick inference timing benchmark (updated after scaling)
import time

total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total params: {total_params:,} | Trainable: {trainable_params:,} (~{trainable_params/1e6:.3f}M)")

try:
    first_batch = next(iter(train_loader))[0]
except StopIteration:
    first_batch = torch.zeros((BATCH_SIZE, INPUT_LENGTH), dtype=torch.float32)
xb = first_batch.to(device)

# Warmup
for _ in range(3):
    _ = model(xb)

n_runs = 10
start = time.perf_counter()
for _ in range(n_runs):
    _ = model(xb)
end = time.perf_counter()

avg_ms = (end - start) / n_runs * 1000
throughput = (xb.shape[0] * FORECAST_LENGTH) / ((end - start) / n_runs)
print(f"Avg forward time: {avg_ms:.2f} ms | Throughput items*horizon/sec: {throughput:.1f}")
print(f"Device: {device} | Accelerator: {accelerator}")

Total params: 788,024 | Trainable: 788,024 (~0.788M)


AttributeError: Can't pickle local object 'split_dataset.<locals>._Subset'

In [None]:
# ðŸ“Š Metric Interpretation & Overfitting Guide (updated for scaled run)
# Refer to earlier explanations. After scaling model, monitor:
#  - train_loss vs val_loss divergence
#  - val_wape flattening
#  - potential instability if precision < 32 on MPS
# If OOM occurs, reduce BATCH_SIZE first, then LAYER_WIDTH.


In [None]:
# Train (refactored - no duplicate accelerator selection) with performance tweaks
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
import time

print(f'Using accelerator (predefined): {accelerator}')

callbacks = [
    EarlyStopping(monitor='val_loss', patience=4, mode='min'),
    ModelCheckpoint(dirpath=ARTIFACTS_DIR, filename='nbeats-scaled-{epoch:02d}-{val_loss:.4f}', monitor='val_loss', save_top_k=1, mode='min')
]

# Enable mixed precision cautiously (MPS half support still evolving). If instability occurs, set precision=32.
precision_setting = 32 if accelerator == 'mps' else '16-mixed'

trainer = pl.Trainer(
    max_epochs=EPOCHS,
    accelerator=accelerator,
    devices=1 if accelerator in ('gpu','mps') else None,
    log_every_n_steps=5,
    enable_checkpointing=True,
    callbacks=callbacks,
    enable_progress_bar=True,
    precision=precision_setting,
    profiler=None  # set to 'simple' or 'advanced' to profile
)

start_time = time.perf_counter()
trainer.fit(model, train_loader, val_loader)
elapsed = time.perf_counter() - start_time
print(f'Total training time ({EPOCHS} epochs target): {elapsed:.2f}s')

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name    | Type       | Params | Mode 
-----------------------------------------------
0 | stacks  | ModuleList | 788 K  | train
1 | loss_fn | MSELoss    | 0      | train
-----------------------------------------------
788 K     Trainable params
0         Non-trainable params
788 K     Total params
3.152     Total estimated model params size (MB)
44        Modules in train mode
0         Modules in eval mode

  | Name    | Type       | Params | Mode 
-----------------------------------------------
0 | stacks  | ModuleList | 788 K  | train
1 | loss_fn | MSELoss    | 0      | train
-----------------------------------------------
788 K     Trainable params
0         Non-trainable params
788 K     Total params
3.152     Total estimated model params size (MB)
44        Modules in train mode
0      

Using accelerator: mps


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]


Detected KeyboardInterrupt, attempting graceful shutdown ...
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipelibc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe

libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipelibc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe
libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe

libc++abi: terminating due to uncaught exception of type std::__1::system_error: Broken pipe


AttributeError: 'tuple' object has no attribute 'tb_frame'

In [16]:
# Evaluate on validation set (manual pass) with explicit device move
model.eval()
model.to(device)
val_losses = []
val_mae = []
val_wape_num = 0.0
val_wape_den = 0.0
with torch.no_grad():
    for xb, yb in val_loader:
        xb = xb.to(device)
        yb = yb.to(device)
        pred = model(xb)
        loss = torch.mean((pred - yb)**2)
        mae = torch.mean(torch.abs(pred - yb))
        val_losses.append(loss.item())
        val_mae.append(mae.item())
        val_wape_num += torch.sum(torch.abs(pred - yb)).item()
        val_wape_den += torch.sum(torch.abs(yb)).item()
avg_loss = sum(val_losses)/len(val_losses)
avg_mae = sum(val_mae)/len(val_mae)
wape = math.nan if val_wape_den == 0 else 100.0 * val_wape_num/val_wape_den
print(f'Validation MSE: {avg_loss:.4f} | MAE: {avg_mae:.4f} | WAPE: {wape:.2f}%')

Validation MSE: 2.5122 | MAE: 1.2421 | WAPE: 7.78%


In [17]:
# Save artifacts
CKPT_PATH = ARTIFACTS_DIR / 'nbeats_notebook.ckpt'
torch.save(model.state_dict(), CKPT_PATH)
metrics = {
    'validation_mse': avg_loss,
    'validation_mae': avg_mae,
    'validation_wape': wape,
    'config': cfg_model.__dict__,
    'n_train_windows': len(train_ds),
    'n_val_windows': len(val_ds),
}
with open(ARTIFACTS_DIR / 'nbeats_notebook_metrics.json', 'w') as f:
    json.dump(metrics, f, indent=2)
print('Saved checkpoint ->', CKPT_PATH)
print('Saved metrics ->', ARTIFACTS_DIR / 'nbeats_notebook_metrics.json')

Saved checkpoint -> artifacts/models/nbeats_notebook.ckpt
Saved metrics -> artifacts/models/nbeats_notebook_metrics.json


## Next Steps
- Integrate with backtesting harness.
- Add basis (trend/seasonality) blocks for improved decomposition.
- Introduce quantile heads for probabilistic forecasts.
- Add per-item embeddings & categorical covariates.
- Promote best checkpoint to API service for live forecasts.