# Modeling SARIMA par PCType / région (multi_3m)
Prévision mensuelle de `pc_price` par PCType/région avec SARIMA, split hold-out pour MAPE et option MLflow.


## Imports et configuration

In [None]:
import os

from dotenv import load_dotenv
import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
import seaborn as sns
from statsmodels.tsa.statespace.sarimax import SARIMAX

import constants.constants as cst
from constants.paths import PROCESSED_DATA_DIR

# Global seaborn/matplotlib defaults for the figures drawn in this notebook.
sns.set_style("whitegrid")
plt.rcParams["figure.figsize"] = (12, 4)

# Fail fast when the processed 3-month dataset is not on disk.
# NOTE(review): assert statements are skipped under `python -O`.
DATA_PATH = PROCESSED_DATA_DIR / "multi_3m.csv"
assert DATA_PATH.exists(), DATA_PATH

## Configurer MLflow

In [None]:
# Load variables from a .env file (if any) so the os.getenv calls below see them.
load_dotenv()
tracking_uri = os.getenv("MLFLOW_TRACKING_URI")
if tracking_uri:
    mlflow.set_tracking_uri(tracking_uri)
    print(f"Tracking URI: {tracking_uri}")

experiment_id = os.getenv("MLFLOW_EXPERIMENT_ID")
experiment_name = os.getenv("MLFLOW_EXPERIMENT_NAME")
# An explicitly configured id wins; the name lookup below only fills it in
# when no id was provided.
active_experiment_id = experiment_id

if experiment_name:
    mlflow.set_experiment(experiment_name)
    exp = mlflow.get_experiment_by_name(experiment_name)
    # NOTE(review): if both the id and the name are set but point to different
    # experiments, the named one becomes active while active_experiment_id
    # keeps the configured id - confirm this is intended.
    if exp and not active_experiment_id:
        active_experiment_id = exp.experiment_id
    print("Experiment name:", experiment_name)

# Report which experiment id (if any) is available for explicit run creation.
if active_experiment_id:
    print("Using experiment_id:", active_experiment_id)
else:
    print("No experiment_id provided; runs will use the active/default experiment.")

## Charger les données

In [None]:
def load_data(horizon: int = 3) -> pd.DataFrame:
    """Read the processed ``multi_{horizon}m.csv`` table and order its rows.

    Args:
        horizon: Number used to build the file name ``multi_{horizon}m.csv``
            inside ``PROCESSED_DATA_DIR``.

    Returns:
        The CSV content with ``date`` parsed as datetimes, sorted by
        ``pc_type``, then ``region``, then ``date``.
    """
    csv_path = PROCESSED_DATA_DIR / f"multi_{horizon}m.csv"
    frame = pd.read_csv(csv_path, parse_dates=["date"])
    return frame.sort_values(by=["pc_type", "region", "date"])


# Default working frame (3-month file); baseline_mape falls back to this
# global when it is called without df_full.
df = load_data(3)
df.head()

## Disponibilité des PCType (Enum vs données)

Vérifie quelles valeurs de `PCType` existent dans les données traitées, ainsi que la couverture (dates de début/fin, nombre de lignes) par couple (pc_type, région).

In [None]:
from constants.constants import PCType

# Values declared by the enum vs. values actually present in the loaded frame.
pc_enum = [p.value for p in PCType]
pc_seen = sorted(df.pc_type.unique())
# Per (pc_type, region): first date, last date and number of rows.
counts = (
    df.groupby(["pc_type", "region"])
    .agg(start=("date", "min"), end=("date", "max"), rows=("date", "size"))
    .reset_index()
)
# Enum members that never appear in the data.
missing_enum = [p for p in pc_enum if p not in pc_seen]
print("PCType vus dans les données:", pc_seen)
print("PCType Enum manquants dans les données:", missing_enum)
# Trailing expression so the notebook renders the coverage table.
counts

## Fonction MAPE
Utilisée pour les baselines et SARIMA.

In [None]:
def mape(y_true: pd.Series, y_pred: pd.Series) -> float | None:
    """MAPE en ignorant les zéros/NaN."""
    mask = (y_true != 0) & y_true.notna() & y_pred.notna()
    if mask.sum() == 0:
        return None
    return (np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])).mean() * 100

## Baselines MAPE par horizon (naïf et saisonnier t-12)

In [None]:
def baseline_mape(
    horizon: int,
    strategy: str = "naive",
    df_full: pd.DataFrame | None = None,
    test_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Compute naïf/saisonnier MAPE par pc_type/region."""
    rows = []
    data = df_full if df_full is not None else df
    for (pc, reg), g in data.groupby(["pc_type", "region"]):
        g = g.sort_values("date").set_index("date")
        y = g["pc_price"]
        if strategy == "naive":
            pred = y.shift(horizon)
        elif strategy == "seasonal_naive":
            pred = y.shift(12)
        else:
            raise ValueError("strategy must be naive or seasonal_naive")
        if test_df is not None:
            mask_dates = test_df[(test_df.pc_type == pc) & (test_df.region == reg)][
                "date"
            ]
            test_mask = g.index.isin(mask_dates)
            m = mape(y[test_mask], pred[test_mask])
        else:
            m = mape(y, pred)
        rows.append(
            {
                "pc_type": pc,
                "region": reg,
                "horizon": horizon,
                "strategy": strategy,
                "MAPE": m,
            }
        )
    return pd.DataFrame(rows)

## Split temporel adaptatif
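Calcule une date de coupure par (pc_type, region) respectant les minimums train/test, puis applique la plus tardive à tous les groupes : les groupes assez longs conservent au moins `min_train_samples` observations en train, mais certains peuvent avoir moins de `min_test_samples` observations en test.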

In [None]:
def adaptive_train_test_split(
    df_local: pd.DataFrame,
    group_cols: list[str],
    target_test_ratio: float = 0.2,
    min_train_samples: int = 20,
    min_test_samples: int = 5,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split a panel chronologically at a single cut-off date.

    A candidate cut-off is computed for every group so that the group keeps
    roughly ``1 - target_test_ratio`` of its rows for training, at least
    ``min_train_samples`` rows in train and at least ``min_test_samples`` rows
    in test. The latest candidate is then applied to the whole frame, so a
    group may end up with fewer than ``min_test_samples`` test rows.

    Groups with fewer than ``min_train_samples + min_test_samples`` rows use
    their ``min_train_samples``-th date as candidate, or their last date when
    they are shorter than that.

    Args:
        df_local: Frame with a ``date`` column and the ``group_cols`` columns.
        group_cols: Columns identifying one series.
        target_test_ratio: Desired share of each group's rows in the test set.
        min_train_samples: Minimum number of training rows targeted per group.
        min_test_samples: Minimum number of test rows targeted per group.

    Returns:
        ``(train_df, test_df)``: rows dated on or before the cut-off, and rows
        dated after it. Both are empty when ``df_local`` has no groups.
    """
    df_local = df_local.sort_values(["date", *group_cols])
    split_dates = []
    for _, g in df_local.groupby(group_cols):
        # Rows are already in date order inside each group thanks to the
        # global sort above (groupby keeps the within-group row order).
        n_obs = len(g)
        if n_obs < min_train_samples + min_test_samples:
            # Too short to honour both minimums; clamp to the group length so
            # a group shorter than min_train_samples does not index past its
            # last row.
            idx_train = min(min_train_samples, n_obs)
        else:
            idx_train = max(min_train_samples, int(n_obs * (1 - target_test_ratio)))
            idx_train = min(idx_train, n_obs - min_test_samples)
        split_dates.append(g["date"].iloc[idx_train - 1])

    if not split_dates:
        # No group at all (empty input): nothing to split.
        return df_local, df_local.iloc[0:0]

    # One shared cut-off: the latest per-group candidate.
    global_split = max(split_dates)
    train_df = df_local[df_local["date"] <= global_split]
    test_df = df_local[df_local["date"] > global_split]
    return train_df, test_df

## Baselines sur le jeu de validation (test_df)

In [None]:
# For each horizon file: split chronologically, keep the pieces for the SARIMA
# cells below, and score both baselines on the held-out dates only.
splits = {}
baseline_tables = []
for horizon in [3, 6, 9]:
    df_h = load_data(horizon)
    train_df_h, test_df_h = adaptive_train_test_split(
        df_h,
        group_cols=["pc_type", "region"],
        target_test_ratio=0.2,
        min_train_samples=20,
        min_test_samples=5,
    )
    # Keyed by horizon so later cells reuse exactly the same split.
    splits[horizon] = {"df": df_h, "train": train_df_h, "test": test_df_h}
    baseline_tables.append(
        baseline_mape(horizon, "naive", df_full=df_h, test_df=test_df_h)
    )
    baseline_tables.append(
        baseline_mape(horizon, "seasonal_naive", df_full=df_h, test_df=test_df_h)
    )
# One long table: a row per (pc_type, region, horizon, strategy).
baseline_df = pd.concat(baseline_tables, ignore_index=True)
baseline_df

## Fonctions utilitaires

## SARIMA + logging MLflow (optionnel)

In [None]:
def evaluate_sarima(
    pc_type: str,
    region: str,
    df_source: pd.DataFrame,
    order: tuple[int, int, int] = (1, 1, 0),
    seasonal_order: tuple[int, int, int, int] = (0, 0, 0, 12),
    horizon: int = 6,
    log_mlflow: bool = False,
    experiment_id: str | None = None,
    train_df: pd.DataFrame | None = None,
    test_df: pd.DataFrame | None = None,
) -> dict:
    """Fit SARIMA pour un pc_type/region et évalue sur y_test."""
    train_slice = (
        train_df[(train_df.pc_type == pc_type) & (train_df.region == region)]
        if train_df is not None
        else df_source[(df_source.pc_type == pc_type) & (df_source.region == region)]
    )
    test_slice = (
        test_df[(test_df.pc_type == pc_type) & (test_df.region == region)]
        if test_df is not None
        else None
    )
    train_slice = train_slice.sort_values("date")
    y_train = train_slice.set_index("date")["pc_price"]
    y_test = None
    if test_slice is not None and not test_slice.empty:
        y_test = test_slice.sort_values("date").set_index("date")["pc_price"]
    if y_test is None:
        if len(y_train) <= horizon + 2:
            return {
                "pc_type": pc_type,
                "region": region,
                "MAPE": None,
                "note": "série trop courte",
            }
        y_test = y_train.iloc[-horizon:]
        y_train = y_train.iloc[:-horizon]
    if len(y_train) <= max(order[0], seasonal_order[0] * seasonal_order[3]):
        return {
            "pc_type": pc_type,
            "region": region,
            "MAPE": None,
            "note": "train trop court",
        }
    try:
        model = SARIMAX(
            y_train,
            order=order,
            seasonal_order=seasonal_order,
            enforce_stationarity=False,
            enforce_invertibility=False,
        )
        res = model.fit(disp=False)
        steps = min(len(y_test), horizon)
        fc = res.get_forecast(steps=steps).predicted_mean
        m = mape(y_test.iloc[:steps], fc)
        result = {
            "pc_type": pc_type,
            "region": region,
            "MAPE": m,
            "order": order,
            "seasonal_order": seasonal_order,
            "horizon": horizon,
        }
        if log_mlflow:
            with mlflow.start_run(
                run_name=f"SARIMA_{pc_type}_{region}_h{horizon}",
                experiment_id=experiment_id,
            ):
                mlflow.set_tags(
                    {
                        cst.MLFLOW_MODEL_TYPE: "sarima",
                        cst.MLFLOW_MODEL_PHILOSOPHY: "statistical",
                        cst.MLFLOW_HORIZON: horizon,
                        cst.MLFLOW_FUNCTION: "forecast",
                        "pc_type": pc_type,
                        "region": region,
                    }
                )
                mlflow.log_params(
                    {
                        "order": order,
                        "seasonal_order": seasonal_order,
                        "horizon": horizon,
                    }
                )
                if m is not None:
                    mlflow.log_metric("MAPE", m)
        return result
    except Exception as exc:
        return {
            "pc_type": pc_type,
            "region": region,
            "MAPE": None,
            "error": str(exc),
        }

## Run diagnostic (sans logging)

In [None]:
# Evaluate SARIMA (default orders, no MLflow logging) on the splits built
# above, for every pc_type x region combination of each horizon file.
results = []
for horizon, objs in splits.items():
    df_h = objs["df"]
    train_h = objs["train"]
    test_h = objs["test"]
    # Cartesian product of the observed values: a combination with no rows in
    # the data still gets a result row (with MAPE None).
    for pc in sorted(df_h.pc_type.unique()):
        for reg in sorted(df_h.region.unique()):
            results.append(
                evaluate_sarima(
                    pc, reg, df_h, horizon=horizon, train_df=train_h, test_df=test_h
                )
            )
pd.DataFrame(results)

## Run avec logging MLflow (optionnel)

In [None]:
# Disabled by default: uncomment to repeat the diagnostic sweep with
# log_mlflow=True, which opens one MLflow run per evaluate_sarima call.
# results_mlflow = []
# for horizon, objs in splits.items():
#     df_h = objs['df']
#     train_h = objs['train']
#     test_h = objs['test']
#     for pc in sorted(df_h.pc_type.unique()):
#         for reg in sorted(df_h.region.unique()):
#             results_mlflow.append(
#                 evaluate_sarima(
#                     pc,
#                     reg,
#                     df_h,
#                     horizon=horizon,
#                     train_df=train_h,
#                     test_df=test_h,
#                     log_mlflow=True,
#                     experiment_id=active_experiment_id,
#                 )
#             )
# pd.DataFrame(results_mlflow)
