# 04 — Forecasting
Build an explainable baseline forecast for monthly total consumption, validate it with a historical backtest, and derive a stress projection under a simple supply assumption.


In [None]:
import csv
import math
import statistics
from datetime import datetime
from pathlib import Path

# Directory the cleaned input CSVs are read from.
DATA_DIR = Path("data/processed")
# Output root, and the sub-directory the SVG figures are written to.
REPORTS_DIR = Path("reports")
FIG_DIR = REPORTS_DIR / "figures"
# Create the figure directory (including parents) up front; a no-op if it already exists.
FIG_DIR.mkdir(parents=True, exist_ok=True)

def read_csv(path):
    """Load a CSV file and return its rows as a list of dicts keyed by header.

    The file is opened as UTF-8 with ``newline=""`` so the ``csv`` module
    handles line endings itself. All values are returned as strings.
    """
    with open(path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        rows = [row for row in reader]
    return rows

def parse_date(date_str):
    """Parse a ``YYYY-MM-DD`` string into a naive ``datetime`` at midnight.

    Raises ``ValueError`` (from ``strptime``) if the text does not match.
    """
    day_format = "%Y-%m-%d"
    parsed = datetime.strptime(date_str, day_format)
    return parsed

def month_range(start, end):
    """Return first-of-month date strings from ``start``'s month up to ``end``.

    ``start`` is snapped to the first day of its month; a month is included
    while its first day is ``<= end``. Both arguments are ``datetime`` objects
    and the result is a list of ``YYYY-MM-DD`` strings (empty if ``end`` lies
    before the first day of ``start``'s month).
    """
    year, month = start.year, start.month
    first_days = []
    cursor = datetime(year, month, 1)
    while cursor <= end:
        first_days.append(cursor.strftime("%Y-%m-%d"))
        # December rolls over to January of the following year.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
        cursor = datetime(year, month, 1)
    return first_days


In [None]:
def write_svg_line_chart(path, title, x_labels, series_dict, y_label="Value"):
    """Write a multi-series line chart to ``path`` as a standalone SVG file.

    Args:
        path: Destination file (str or ``Path``); overwritten if it exists.
        title: Chart title, drawn centred at the top.
        x_labels: One label per x position. Only the first 7 characters of a
            label are drawn (enough for ``YYYY-MM``) and roughly 12 evenly
            spaced ticks are labelled.
        series_dict: Mapping of series name -> list of y values aligned with
            ``x_labels``. ``None`` or a non-finite number (NaN/inf) marks a
            gap: the line is broken there instead of being drawn through it.
        y_label: Text for the rotated y-axis caption.

    All text is XML-escaped, so names containing ``&`` or ``<`` still produce
    a well-formed document. With no finite values at all, an empty frame with
    a 0..1 axis is written rather than raising.
    """
    from xml.sax.saxutils import escape  # local import: only needed for SVG text

    width, height = 1100, 420
    ml, mr, mt, mb = 70, 20, 50, 70  # left / right / top / bottom margins
    pw, ph = width - ml - mr, height - mt - mb  # plot-area width / height

    def is_gap(v):
        return v is None or not math.isfinite(v)

    values = [v for vals in series_dict.values() for v in vals if not is_gap(v)]
    y_min, y_max = (min(values), max(values)) if values else (0.0, 1.0)
    if y_max == y_min:
        # Flat data: widen the range so sy() never divides by zero.
        y_max += 1.0

    def sx(i):
        return ml + (i / (len(x_labels) - 1)) * pw if len(x_labels) > 1 else ml

    def sy(v):
        return mt + (1 - (v - y_min) / (y_max - y_min)) * ph

    colors = ["#1f77b4", "#d62728", "#2ca02c", "#9467bd"]
    lines = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}">',
        f'<text x="{width/2}" y="24" text-anchor="middle" font-size="18" font-family="Arial">{escape(str(title))}</text>',
        f'<line x1="{ml}" y1="{mt}" x2="{ml}" y2="{mt+ph}" stroke="#333"/>',
        f'<line x1="{ml}" y1="{mt+ph}" x2="{ml+pw}" y2="{mt+ph}" stroke="#333"/>',
        f'<text x="16" y="{mt+ph/2}" transform="rotate(-90 16,{mt+ph/2})" font-size="12" font-family="Arial">{escape(str(y_label))}</text>',
    ]

    def flush(run, color):
        """Emit one unbroken run of points: a polyline, or a dot for a lone point."""
        if len(run) > 1:
            pts = " ".join(f"{px},{py}" for px, py in run)
            lines.append(f'<polyline fill="none" stroke="{color}" stroke-width="2" points="{pts}"/>')
        elif run:
            # A single point has no line segment; draw a marker so it stays visible.
            px, py = run[0]
            lines.append(f'<circle cx="{px}" cy="{py}" r="2.5" fill="{color}"/>')

    # Horizontal grid lines with value labels at six evenly spaced levels.
    for i in range(6):
        yv = y_min + (y_max - y_min) * i / 5
        y = sy(yv)
        lines.append(f'<line x1="{ml}" y1="{y}" x2="{ml+pw}" y2="{y}" stroke="#eee"/>')
        lines.append(f'<text x="{ml-8}" y="{y+4}" text-anchor="end" font-size="10" font-family="Arial">{yv:.2f}</text>')

    # Label only every tick_step-th x position to keep the axis readable.
    tick_step = max(1, len(x_labels) // 12)
    for i, label in enumerate(x_labels):
        if i % tick_step == 0:
            x = sx(i)
            lines.append(f'<line x1="{x}" y1="{mt+ph}" x2="{x}" y2="{mt+ph+5}" stroke="#333"/>')
            lines.append(f'<text x="{x}" y="{mt+ph+18}" transform="rotate(45 {x},{mt+ph+18})" font-size="9" font-family="Arial">{escape(str(label[:7]))}</text>')

    for idx, (name, vals) in enumerate(series_dict.items()):
        color = colors[idx % len(colors)]
        run = []
        for i, v in enumerate(vals):
            if is_gap(v):
                flush(run, color)
                run = []
            else:
                run.append((f"{sx(i):.1f}", f"{sy(v):.1f}"))
        flush(run, color)
        # Legend entry, stacked in the top-right corner of the plot area.
        ly = mt + 16 * idx
        lines.append(f'<rect x="{ml+pw-170}" y="{ly-10}" width="10" height="10" fill="{color}"/>')
        lines.append(f'<text x="{ml+pw-154}" y="{ly}" font-size="11" font-family="Arial">{escape(str(name))}</text>')
    lines.append('</svg>')
    Path(path).write_text("\n".join(lines), encoding="utf-8")

def write_svg_bar_chart(path, title, labels, values, y_label="Value", label_chars=7):
    """Write a bar chart to ``path`` as a standalone SVG file.

    Args:
        path: Destination file (str or ``Path``); overwritten if it exists.
        title: Chart title, drawn centred at the top.
        labels: One label per bar, drawn rotated under the plot.
        values: Bar heights aligned with ``labels``. Bars grow from the zero
            baseline; non-negative bars are red, negative bars blue.
        y_label: Text for the rotated y-axis caption.
        label_chars: Maximum number of characters of each label to draw.
            Defaults to 7 (enough for ``YYYY-MM``); pass ``None`` to draw
            labels in full, e.g. for histogram bin ranges.

    The y range always contains zero, so the baseline and every bar stay
    inside the plot area even when all values are negative. Text is
    XML-escaped, and empty input yields an empty frame instead of raising.
    """
    from xml.sax.saxutils import escape  # local import: only needed for SVG text

    width, height = 1100, 420
    ml, mr, mt, mb = 70, 20, 50, 120  # left / right / top / bottom margins
    pw, ph = width - ml - mr, height - mt - mb  # plot-area width / height
    # Clamp both ends to zero so the baseline is always within the y range.
    y_min = min(0.0, min(values, default=0.0))
    y_max = max(0.0, max(values, default=0.0))
    if y_max == y_min:
        # All values are zero (or absent): widen the range to avoid dividing by zero.
        y_max += 1.0
    slot = pw / len(labels) if len(labels) else 0.0  # horizontal space per bar
    bar_w = slot * 0.7  # bar takes 70% of its slot, centred

    def sx(i):
        return ml + i * slot + slot * 0.15

    def sy(v):
        return mt + (1 - (v - y_min) / (y_max - y_min)) * ph

    baseline = sy(0)
    lines = [
        f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}">',
        f'<text x="{width/2}" y="24" text-anchor="middle" font-size="18" font-family="Arial">{escape(str(title))}</text>',
        f'<line x1="{ml}" y1="{mt}" x2="{ml}" y2="{mt+ph}" stroke="#333"/>',
        f'<line x1="{ml}" y1="{baseline}" x2="{ml+pw}" y2="{baseline}" stroke="#333"/>',
    ]
    for i, v in enumerate(values):
        x = sx(i)
        # SVG rects need their top edge: the value for positive bars, the baseline for negative ones.
        y = sy(max(v, 0))
        h = abs(baseline - sy(v))
        color = "#d62728" if v >= 0 else "#1f77b4"
        lines.append(f'<rect x="{x}" y="{y}" width="{bar_w}" height="{h}" fill="{color}" opacity="0.8"/>')
        lx = x + bar_w / 2
        tick = escape(str(labels[i][:label_chars]))
        lines.append(f'<text x="{lx}" y="{mt+ph+15}" transform="rotate(45 {lx},{mt+ph+15})" font-size="9" font-family="Arial">{tick}</text>')
    lines.append(f'<text x="16" y="{mt+ph/2}" transform="rotate(-90 16,{mt+ph/2})" font-size="12" font-family="Arial">{escape(str(y_label))}</text>')
    lines.append('</svg>')
    Path(path).write_text("\n".join(lines), encoding="utf-8")


In [None]:

# Load the monthly consumption series for the 'total' sector, ordered by date.
cons_rows = [r for r in read_csv(DATA_DIR / 'electricity_consumption_clean.csv') if r['sector'] == 'total']
cons_rows = sorted(cons_rows, key=lambda r: parse_date(r['date']))
months = [r['date'] for r in cons_rows]
series = [float(r['consumption']) for r in cons_rows]
cons = dict(zip(months, series))

# Backtest split
# Dates are compared as strings below, which is chronological only for
# zero-padded YYYY-MM-DD text (the format parse_date accepts).
train_end = '2023-06-30'
backtest_start = '2023-07-01'
backtest_end = '2024-11-30'

train_months = [d for d in months if d <= train_end]
backtest_months = [d for d in months if backtest_start <= d <= backtest_end]

if not train_months:
    raise ValueError('No training months found for configured split date.')
if not backtest_months:
    raise ValueError('No backtest months found for configured window.')

train_cons = {d: cons[d] for d in train_months}
backtest_actual = [cons[d] for d in backtest_months]

# Forecasting method: seasonal naive. Each backtest month is predicted by the
# most recent training observation for the same calendar month, stepping back
# one year at a time. A single one-year lookback would miss the training set
# for any month more than 12 months past the cutoff and quietly turn into the
# last-value baseline there. The last training value is used only when no
# same-month observation exists at all.
# NOTE(review): the lookup key assumes dates are stored as first-of-month
# (YYYY-MM-01) — confirm against the cleaned CSV.
last_train_value = train_cons[train_months[-1]]
first_train_year = parse_date(train_months[0]).year
backtest_pred = []
for d in backtest_months:
    dt = parse_date(d)
    pred = last_train_value
    for lookback_year in range(dt.year - 1, first_train_year - 1, -1):
        prev_year = f"{lookback_year:04d}-{dt.month:02d}-01"
        if prev_year in train_cons:
            pred = train_cons[prev_year]
            break
    backtest_pred.append(pred)

# Baseline: last-value naive (flat at last training observation)
baseline_pred = [last_train_value] * len(backtest_months)

# Metrics

def mae(actual, pred):
    """Return the mean absolute error between ``actual`` and ``pred``.

    Returns ``math.nan`` for empty input. Raises ``ValueError`` when the two
    sequences differ in length, because pairing them would silently drop the
    extra items and skew the average.
    """
    if len(actual) != len(pred):
        raise ValueError('actual and pred must have the same length.')
    if len(actual) == 0:
        return math.nan
    return sum(abs(a - p) for a, p in zip(actual, pred)) / len(actual)

def rmse(actual, pred):
    """Return the root-mean-square error between ``actual`` and ``pred``.

    Returns ``math.nan`` for empty input. Raises ``ValueError`` when the two
    sequences differ in length, because pairing them would silently drop the
    extra items and skew the average.
    """
    if len(actual) != len(pred):
        raise ValueError('actual and pred must have the same length.')
    if len(actual) == 0:
        return math.nan
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, pred)) / len(actual))

def mape(actual, pred):
    """Return the mean absolute percentage error, as a percentage.

    Pairs whose actual value is zero are skipped because their percentage
    error is undefined. Returns ``math.nan`` when no usable pair remains.
    Raises ``ValueError`` when the two sequences differ in length, because
    pairing them would silently drop the extra items.
    """
    if len(actual) != len(pred):
        raise ValueError('actual and pred must have the same length.')
    pct_errors = [abs((a - p) / a) for a, p in zip(actual, pred) if a != 0]
    return (sum(pct_errors) / len(pct_errors)) * 100 if pct_errors else math.nan

# Score both forecasts with the same metric functions so the two tables line up.
_scorers = (('MAE', mae), ('RMSE', rmse), ('MAPE', mape))
model_metrics = {label: score(backtest_actual, backtest_pred) for label, score in _scorers}
baseline_metrics = {label: score(backtest_actual, baseline_pred) for label, score in _scorers}

# Signed backtest error per month (actual minus seasonal-naive forecast).
residuals = [observed - predicted for observed, predicted in zip(backtest_actual, backtest_pred)]

print(f"Train period: {train_months[0]} to {train_months[-1]}")
print(f"Backtest period: {backtest_months[0]} to {backtest_months[-1]}")
print('')
_reports = (
    ('Model (seasonal naive) metrics:', model_metrics),
    ('Baseline (last-value naive) metrics:', baseline_metrics),
)
for heading, table in _reports:
    print(heading)
    for metric_name, metric_value in table.items():
        print(f"  {metric_name}: {metric_value:.3f}")


In [None]:

# Backtest figures: actuals overlaid with both forecasts, then the signed error.
backtest_series = {
    'actual': backtest_actual,
    'seasonal_naive_forecast': backtest_pred,
    'last_value_naive_baseline': baseline_pred,
}
write_svg_line_chart(
    FIG_DIR / 'forecast_consumption_backtest_vs_actual.svg',
    'Backtest: actual vs forecast consumption (total sector)',
    backtest_months,
    backtest_series,
    y_label='MWh',
)
write_svg_line_chart(
    FIG_DIR / 'forecast_backtest_residuals.svg',
    'Backtest residuals over time (actual - forecast)',
    backtest_months,
    {'residual': residuals},
    y_label='MWh',
)

# Error distribution (residual histogram) over equal-width bins.
hist_bins = 10
if residuals:
    r_min, r_max = min(residuals), max(residuals)
    # When every residual is identical, fall back to unit-width bins so the
    # overall span used for bin assignment is never zero.
    step = 1 if r_min == r_max else (r_max - r_min) / hist_bins
    bin_edges = [r_min + i * step for i in range(hist_bins + 1)]

    lowest, highest = bin_edges[0], bin_edges[-1]
    counts = [0] * hist_bins
    for r in residuals:
        if r == highest:
            # The top edge belongs to the last bin, not a bin past the end.
            idx = hist_bins - 1
        else:
            idx = int((r - lowest) / (highest - lowest) * hist_bins)
        # Clamp to guard against floating-point drift at the edges.
        idx = max(0, min(idx, hist_bins - 1))
        counts[idx] += 1

    labels = [f"{left:.1f}..{right:.1f}" for left, right in zip(bin_edges, bin_edges[1:])]
    write_svg_bar_chart(
        FIG_DIR / 'forecast_backtest_error_distribution.svg',
        'Backtest residual distribution',
        labels,
        counts,
        y_label='Count',
    )

print('Saved backtest figures.')


In [None]:

# Projection section (kept after backtest)
sup = {r['date']: float(r['supply']) for r in read_csv(DATA_DIR / 'electricity_supply_clean.csv') if r['sector'] == 'total'}
shared_months = sorted(set(cons).intersection(sup))
if not shared_months:
    raise ValueError('No months with both consumption and supply data found.')

# Baseline forward forecast: seasonal naive. Each future month takes the most
# recent observed consumption for the same calendar month, stepping back one
# year at a time so horizons longer than 12 months still find a value. The
# last shared observation is the fallback when no same-month value exists.
# NOTE(review): the lookup key assumes dates are stored as first-of-month
# (YYYY-MM-01) — confirm against the cleaned CSV.
horizon = 6
future = []
last_shared_value = cons[shared_months[-1]]
first_cons_year = parse_date(min(cons)).year
last_date = parse_date(shared_months[-1])
for _ in range(horizon):
    # Advance to the first day of the next calendar month.
    if last_date.month == 12:
        last_date = datetime(last_date.year + 1, 1, 1)
    else:
        last_date = datetime(last_date.year, last_date.month + 1, 1)
    d = last_date.strftime('%Y-%m-%d')

    fc = last_shared_value
    for lookback_year in range(last_date.year - 1, first_cons_year - 1, -1):
        prev_year = f"{lookback_year:04d}-{last_date.month:02d}-01"
        if prev_year in cons:
            fc = cons[prev_year]
            break
    # NOTE(review): assumed supply is a fixed multiple of the forecast itself,
    # so the ratio below is 1/1.01 for every month (nan when the forecast is 0)
    # and `sup` only determines the last shared month. Confirm whether the
    # assumption should be driven by the supply data instead.
    supply_assumption = fc * 1.01  # simple +1% reserve assumption
    stress_ratio = fc / supply_assumption if supply_assumption else math.nan
    future.append({'date': d, 'forecast_consumption': fc, 'assumed_supply': supply_assumption, 'projected_stress_ratio': stress_ratio})

# Save projection figures
hist_x = shared_months[-24:]
hist_cons = [cons[d] for d in hist_x]
fc_x = [f['date'] for f in future]
fc_cons = [f['forecast_consumption'] for f in future]

# History and forecast share one x axis; each series is padded with None over
# the other's span so the two lines are drawn as separate segments.
write_svg_line_chart(
    FIG_DIR / 'forecast_consumption_projection.svg',
    f'Consumption: recent history + {horizon}-month baseline forecast',
    hist_x + fc_x,
    {
        'history': hist_cons + [None] * len(fc_x),
        'forecast': [None] * len(hist_x) + fc_cons,
    },
    y_label='MWh',
)

write_svg_line_chart(
    FIG_DIR / 'forecast_stress_projection.svg',
    'Projected Stress Ratio (forecast consumption / assumed supply)',
    fc_x,
    {'projected_stress_ratio': [f['projected_stress_ratio'] for f in future]},
    y_label='ratio',
)

print('Saved forecast projection figures.')
print('')
print(f'Baseline forecast table ({horizon} months):')
print('date       | forecast_consumption | assumed_supply | projected_stress_ratio')
for f in future:
    print(f"{f['date']} | {f['forecast_consumption']:20.3f} | {f['assumed_supply']:14.3f} | {f['projected_stress_ratio']:.4f}")
