In [6]:
# Notebook 3. Forecasting & Business Application 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
from glob import glob
from statsmodels.tsa.statespace.sarimax import SARIMAX

# find data/processed 
def find_processed_dir():
    for base in (Path.cwd(), Path.cwd().parent, Path.cwd().parent.parent):
        cand = base / "data" / "processed"
        if cand.exists():
            return cand
    raise FileNotFoundError("data/processed not found (run from repo root or notebooks/)")

processed = find_processed_dir()
print(f"Using processed dir: {processed}")

# load train data from partitioned folder or single parquet
def read_min(path):
    try:
        return pd.read_parquet(path, columns=["date", "sales"])
    except Exception:
        return pd.read_parquet(path, columns=["date", "unit_sales"])

def load_train(proc: Path) -> pd.DataFrame:
    folder = proc / "train_clean_parquet"
    if folder.is_dir():
        files = sorted(glob(str(folder / "**" / "*.parquet"), recursive=True))
        if not files:
            raise FileNotFoundError(f"No parquet files under {folder}")
        df = pd.concat((read_min(f) for f in files), ignore_index=True)
    else:
        for name in ("train_clean.parquet", "analysis_ready.parquet"):
            p = proc / name
            if p.is_file():
                df = read_min(p)
                break
        else:
            raise FileNotFoundError("Missing train parquet (looked for partitioned/, train_clean.parquet, analysis_ready.parquet)")
    if "sales" not in df.columns and "unit_sales" in df.columns:
        df = df.rename(columns={"unit_sales": "sales"})
    return df

df = load_train(processed)

# Clean and daily aggregate
df["date"] = pd.to_datetime(df["date"], errors="coerce")
df = df.dropna(subset=["date"])
df["sales"] = pd.to_numeric(df["sales"], errors="coerce").fillna(0.0).astype("float64").clip(lower=0)

daily = (df.groupby("date", as_index=False)["sales"].sum().sort_values("date"))

# ensure continuous daily index (fill missing days with 0)
idx = pd.date_range(daily["date"].min(), daily["date"].max(), freq="D")
daily = (daily.set_index("date").reindex(idx, fill_value=0.0).rename_axis("date").reset_index())

print(f"Rows loaded: {len(df):,} | days after reindex: {len(daily):,}")

# Train/test split
split_date = pd.Timestamp("2017-07-01")
train = daily[daily["date"] < split_date].set_index("date")
test  = daily[daily["date"] >= split_date].set_index("date")

# SARIMA: simple weekly seasonality 
model = SARIMAX(train["sales"],
                order=(1,1,1),
                seasonal_order=(1,1,1,7),
                enforce_stationarity=False,
                enforce_invertibility=False)
res = model.fit(disp=False)

# forecast 
fcst = res.get_forecast(steps=len(test))
test = test.assign(forecast=fcst.predicted_mean.values)

# Plotting
out_dir = processed.parent / "results" / "figures"
out_dir.mkdir(parents=True, exist_ok=True)

plt.figure(figsize=(12,5))
plt.plot(train.index, train["sales"], label="Train")
plt.plot(test.index,  test["sales"],  label="Test")
plt.plot(test.index,  test["forecast"], label="Forecast")
plt.title("Sales Forecast vs Actual")
plt.legend()
plt.tight_layout()
plt.savefig(out_dir / "sales_forecast.png")
plt.close()

# Metrics
mae = (test["sales"] - test["forecast"]).abs().mean()
print(f"Mean Absolute Error: {mae:,.0f}")
print(f"Saved figure -> {out_dir / 'sales_forecast.png'}")


Using processed dir: c:\Users\Yizi\New folder\Payday-surge-favorita\data\processed
Rows loaded: 125,497,040 | days after reindex: 1,688


  self._init_dates(dates, freq)
  self._init_dates(dates, freq)


Mean Absolute Error: 59,561
Saved figure -> c:\Users\Yizi\New folder\Payday-surge-favorita\data\results\figures\sales_forecast.png


In [11]:
# Business insight 
import time
import numpy as np
import pandas as pd
from pathlib import Path

t0 = time.time()
# Assumes df, daily, train, test already defined

# 1) Test dates
if isinstance(getattr(test, "index", None), pd.DatetimeIndex):
    test_dates = test.index
elif "date" in test.columns:
    test_dates = pd.to_datetime(test["date"])
else:
    test_dates = pd.to_datetime(daily["date"].iloc[-len(test):].values)

# 2) Actuals vs model forecast
y_true = test["sales"].astype(float).values
y_hat  = test["forecast"].astype(float).values

# 3) Baseline: weekly seasonal-naive (t-7 calendar if possible, else repeat last 7)
if isinstance(train.index, pd.DatetimeIndex) and isinstance(test_dates, pd.DatetimeIndex):
    combined = pd.concat([train["sales"], pd.Series(index=test_dates, dtype=float)], axis=0)
    y_hat_sn = combined.shift(7).reindex(test_dates).ffill().fillna(0.0).values
else:
    last7 = train["sales"].astype(float).tail(7).values
    y_hat_sn = np.tile(last7, int(np.ceil(len(test) / 7)))[:len(test)]

# 4) Metrics
mae_model = float(np.mean(np.abs(y_true - y_hat)))
mae_sn    = float(np.mean(np.abs(y_true - y_hat_sn)))
impr_pct  = 100.0 * (1.0 - mae_model / mae_sn) if mae_sn > 0 else np.nan
smape     = 100.0 * np.mean(2.0 * np.abs(y_hat - y_true) / (np.abs(y_true) + np.abs(y_hat) + 1e-9))

# 5) Payday diagnostic (15th and month-end)
dates = pd.DatetimeIndex(test_dates)
paydayish = (dates.day == 15) | dates.is_month_end
if paydayish.any() and (~paydayish).any():
    resid = y_true - y_hat
    mae_payday = float(np.mean(np.abs(resid[paydayish])))
    mae_non    = float(np.mean(np.abs(resid[~paydayish])))
    payday_lift = ((mae_payday / mae_non) - 1.0) * 100.0 if mae_non > 0 else np.nan
else:
    payday_lift = np.nan

# 6) Ops/scale 
n_rows  = len(df)
n_days  = len(daily)
mem_mb  = df.memory_usage(index=False, deep=True).sum() / (1024**2)
runtime = time.time() - t0

# 7) Summary text (prints and saves to results/metrics/forecast_summary.txt)
lines = [
    "## Forecast Summary",
    f"- MAE (model): {mae_model:,.0f}",
    f"- MAE (seasonal-naive, 7D): {mae_sn:,.0f}  → Improvement vs naive: {impr_pct:,.1f}%",
    f"- SMAPE: {smape:,.1f}%"
]
if np.isfinite(payday_lift):
    if payday_lift > 10:
        lines.append(f"- Payday diagnostic: residuals {payday_lift:,.1f}% higher on 15th/month-end → add payday/holiday/promo signals.")
    elif payday_lift < -10:
        lines.append(f"- Payday diagnostic: residuals {abs(payday_lift):,.1f}% lower → spikes largely captured.")
    else:
        lines.append("- Payday diagnostic: effect modest.")
lines += [
    "",
    "## Recommendations",
    "- Keep SARIMAX as baseline; add exogenous flags (payday, holidays, promos).",
    "- Track weekly MAE vs seasonal-naive to detect drift.",
    "",
    "## Ops/Scale",
    f"- Rows: {n_rows:,} | Days: {n_days:,} | ~{mem_mb:,.1f} MB | Runtime: {runtime:,.1f}s"
]
summary = "\n".join(lines)
print(summary)

# Save to results/metrics/forecast_summary.txt
project_root = (processed_dir.parent if "processed_dir" in globals() else processed.parent)
metrics_dir = project_root / "results" / "metrics"
metrics_dir.mkdir(parents=True, exist_ok=True)
out_path = metrics_dir / "forecast_summary.txt"
with open(out_path, "w", encoding="utf-8") as f:
    f.write(summary + "\n")
print(f"\nSaved summary -> {out_path}")


## Forecast Summary
- MAE (model): 59,561
- MAE (seasonal-naive, 7D): 112,577  → Improvement vs naive: 47.1%
- SMAPE: 6.7%
- Payday diagnostic: residuals 27.2% lower → spikes largely captured.

## Recommendations
- Keep SARIMAX as baseline; add exogenous flags (payday, holidays, promos).
- Track weekly MAE vs seasonal-naive to detect drift.

## Ops/Scale
- Rows: 125,497,040 | Days: 1,688 | ~1,914.9 MB | Runtime: 0.0s

Saved summary -> c:\Users\Yizi\New folder\Payday-surge-favorita\data\results\metrics\forecast_summary.txt


In [12]:
import numpy as np, time

t0 = time.time()

# actuals vs forecast
y_true = test["sales"].values
y_pred = test["forecast"].values

# simple 7-day naive baseline
last_week = train["sales"].tail(7).values
naive_pred = np.tile(last_week, int(np.ceil(len(test)/7)))[:len(test)]

# errors
mae_model = np.mean(np.abs(y_true - y_pred))
mae_naive = np.mean(np.abs(y_true - naive_pred))
smape = 100 * np.mean(2 * np.abs(y_pred - y_true) / (np.abs(y_true) + np.abs(y_pred) + 1e-9))

print("MAE (model):", round(mae_model))
print("MAE (naive 7d):", round(mae_naive))
print("SMAPE:", round(smape, 1), "%")

# payday check (15th + month-end)
resid = y_true - y_pred
dates = test.index
payday = (dates.day == 15) | dates.is_month_end

if payday.any():
    mae_payday = np.mean(np.abs(resid[payday]))
    mae_other  = np.mean(np.abs(resid[~payday]))
    print("Payday residual MAE:", round(mae_payday))
    print("Other days residual MAE:", round(mae_other))
    if mae_payday > mae_other * 1.1:
        print("→ Spikes under-predicted on paydays (add payday/holiday flags).")

print(f"Rows: {len(df):,} | Days: {len(daily):,} | Runtime: {time.time()-t0:.1f}s")


MAE (model): 59561
MAE (naive 7d): 69395
SMAPE: 6.7 %
Payday residual MAE: 44132
Other days residual MAE: 60637
Rows: 125,497,040 | Days: 1,688 | Runtime: 0.0s
