In [4]:
import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
import numpy as np


def get_user_inputs():
    # Get file name
    file_name = input("Enter the file name (with extension, e.g., data.csv): ").strip()

    # Get list of time series IDs to forecast
    product_classes_input = input("Enter time series IDs to forecast, separated by commas: ").strip()
    product_classes_to_forecast = [id_.strip() for id_ in product_classes_input.split(",") if id_.strip()]

    # Get metric and validate
    metric = input("Enter the metric ('mape' or 'mse'): ").strip().lower()
    if metric not in ['mape', 'mse']:
        raise ValueError("Invalid metric. Must be 'mape' or 'mse'.")

    return file_name, product_classes_to_forecast, metric


def load_selected_timeseries(file_name: str, product_classes: list[str]) -> dict:
    # Load the CSV
    df = pd.read_csv(file_name)

    # Drop rows with missing essential values
    df.dropna(subset=["product_class", "Month", "sales_volume"], inplace=True)

    # Convert Month to datetime
    df["Month"] = pd.to_datetime(df["Month"], errors="coerce")
    df.dropna(subset=["Month"], inplace=True)

    # Filter for selected product_classes
    df = df[df["product_class"].isin(product_classes)]

    # Sort by date
    df = df.sort_values("Month")

    # Group and extract time series with product class as name
    time_series_dict = {
        product_class: group.set_index("Month")["sales_volume"].sort_index().rename(product_class)
        for product_class, group in df.groupby("product_class")
    }

    return time_series_dict


def check_seasonality_strength(ts: pd.Series, freq: int = 12, threshold: float = 0.5) -> str:
    # Check for enough data
    if len(ts) < 2 * freq:
        return "low"  # Not enough data to assess seasonality

    # Decompose the time series
    decomposition = seasonal_decompose(ts, model='additive', period=freq, extrapolate_trend='freq')

    # Compute strength of seasonality
    resid = decomposition.resid.dropna()
    seasonal = decomposition.seasonal.loc[resid.index]

    var_resid = np.var(resid)
    var_combined = np.var(resid + seasonal)

    if var_combined == 0:
        return "low"  # Avoid division by zero or flat series

    strength = 1 - (var_resid / var_combined)

    return "high" if strength >= threshold else "low"


def select_category(ts: pd.Series):
    """Determines to which category a time series belongs."""
    if len(ts) < 48:
        return "Category 1"
    elif check_seasonality_strength(ts) == "high" and (ts == 0).any():
        return "Category 2"
    elif check_seasonality_strength(ts) == "high" and (ts > 0).all():
        return "Category 3"
    elif check_seasonality_strength(ts) == "low" and (ts == 0).any():
        return "Category 4"
    else:
        return "Category 5"


#--------------------------------------------------------

categories = {
    'Category 1': ["naive", "drift"],
    'Category 2': ["naive", "ETS", "seasonal naive"],
    'Category 3': ["naive", "AutoARIMA", "ETS", "seasonal naive"],
    'Category 4': ["naive", "AutoARIMA", "drift", "mean"],
    'Category 5': ["naive", "AutoARIMA", "mean"]
}

file_name, product_classes_to_forecast, metric = get_user_inputs()
print(
    f"\nInputs received:\nFile: {file_name}\nProduct Classes to Forecast: {product_classes_to_forecast}\nMetric: {metric}")

ts_dict = load_selected_timeseries(file_name, product_classes_to_forecast)

for p_class, series in ts_dict.items():
    print(f"\nTime Series for {p_class}:")
    print(series)
    print(select_category(series))



Inputs received:
File: proj1_exampleinput.csv
Product Classes to Forecast: ['C1038', 'C2716', 'C6022']
Metric: mape

Time Series for C1038:
Month
2013-01-01    14212.0
2013-02-01    11424.0
2013-03-01    13432.0
2013-04-01    13078.0
2013-05-01    14356.0
2013-06-01    17142.0
2013-07-01    17377.0
2013-08-01    13449.0
2013-09-01    10185.0
2013-10-01    10455.0
2013-11-01    11432.0
2013-12-01    31873.0
2014-01-01    11997.0
2014-02-01     9787.0
2014-03-01     9037.0
2014-04-01     7863.0
2014-05-01     9472.0
2014-06-01    14928.0
2014-07-01    23856.0
2014-08-01    26547.0
2014-09-01    19960.0
2014-10-01    18338.0
2014-11-01    21364.0
2014-12-01    45749.0
2015-01-01    12426.0
2015-02-01    12485.0
2015-03-01    15622.0
2015-04-01    15469.0
2015-05-01    17695.0
2015-06-01    14329.0
2015-07-01    14573.0
2015-08-01    17184.0
2015-09-01    15990.0
2015-10-01    17959.0
2015-11-01    14598.0
2015-12-01    32284.0
2016-01-01    11957.0
2016-02-01    12719.0
2016-03-01    138

In [16]:
from dateutil.relativedelta import relativedelta
from tqdm import tqdm
import warnings
from datetime import datetime
import tempfile
from statsforecast.models import AutoARIMA
from statsforecast import StatsForecast


def naive_forecast(series: pd.Series, horizon: int) -> np.ndarray:
    last_value = series.iloc[-1]
    return np.repeat(last_value, horizon)


def drift_forecast(series: pd.Series, horizon: int) -> np.ndarray:
    n = len(series)
    if n < 2:
        return np.repeat(series.iloc[-1], horizon)
    drift = (series.iloc[-1] - series.iloc[0]) / (n - 1)
    return series.iloc[-1] + drift * np.arange(1, horizon + 1)


def seasonal_naive_forecast(series: pd.Series, horizon: int, season_length: int = 12) -> np.ndarray:
    if len(series) < season_length:
        raise ValueError("Not enough data for seasonal naive forecast")
    last_season = series.iloc[-season_length:]
    reps = int(np.ceil(horizon / season_length))
    return np.tile(last_season.values, reps)[:horizon]


from statsmodels.tsa.holtwinters import ExponentialSmoothing


def ets_forecast(series: pd.Series, horizon: int) -> np.ndarray:
    try:
        # try multiplicative
        model = ExponentialSmoothing(series, trend="add", seasonal="mul", seasonal_periods=12)
        fitted_model = model.fit()
    except Exception:
        # additive
        model = ExponentialSmoothing(series, trend="add", seasonal="add", seasonal_periods=12)
        fitted_model = model.fit()
    return fitted_model.forecast(horizon)


def autoarima_forecast(series: pd.Series, horizon: int) -> np.ndarray:

    df = pd.DataFrame({
        'unique_id': ['ts1'] * len(series),
        'ds': series.index,
        'y': series.values
    })

    with tempfile.TemporaryDirectory() as tmpdir:
        sf = StatsForecast(
            models=[AutoARIMA(season_length=12)],
            freq='MS',
            n_jobs=1
        )
        forecasts_df = sf.forecast(df=df, h=horizon)
        return forecasts_df['AutoARIMA'].values


class HistoricAverageModel:
    def __init__(self):
        self.mean = None

    def fit(self, series: pd.Series):
        self.mean = series.mean()

    def forecast(self, horizon: int) -> np.ndarray:
        return np.repeat(self.mean, horizon)


In [24]:

def simulate_forecasting(ts_dict, categories, metric, forecast_horizon=12, retrain_every=6, eval_periods=24):
    all_results = []

    for ts_id, series in tqdm(ts_dict.items(), desc="Processing Time Series"):
        series = series.dropna()
        category = select_category(series)
        models = categories[category]

        if len(series) < 32:
            print(f"\nSkipping {ts_id} - only {len(series)} observations.")
            continue

        max_date = series.index.max()

        for i in range(0, eval_periods, retrain_every):
            train_end = max_date - relativedelta(months=(forecast_horizon + i))
            test_start = train_end + relativedelta(months=1)
            test_end = test_start + relativedelta(months=forecast_horizon - 1)

            train = series[:train_end]
            test = series[test_start:test_end]

            if len(test) < forecast_horizon or len(train) < 12:
                continue

            for model_name in models:
                try:
                    # Select prediction method
                    if model_name == "naive":
                        pred = naive_forecast(train, forecast_horizon)
                    elif model_name == "drift":
                        pred = drift_forecast(train, forecast_horizon)
                    elif model_name == "seasonal naive":
                        pred = seasonal_naive_forecast(train, forecast_horizon)
                    elif model_name == "ETS":
                        pred = ets_forecast(train, forecast_horizon)
                    elif model_name == "AutoARIMA":
                        pred = autoarima_forecast(train, forecast_horizon)
                    elif model_name == "mean":
                        model = HistoricAverageModel()
                        model.fit(train)
                        pred = model.forecast(forecast_horizon)
                    else:
                        continue  # Skip unknown model
                except Exception as e:
                    print(f"Model {model_name} failed for {ts_id}: {e}")
                    pred = np.full(forecast_horizon, np.nan)

                # Evaluate
                pred = pd.Series(pred, index=test.index)
                if metric == "mape":
                    if (test == 0).any():
                        warnings.warn(f"MAPE warning for {ts_id} – true values contain zeros.")
                    error = np.mean(np.abs((test - pred) / np.maximum(test, 1e-8)))
                else:
                    error = np.mean((test - pred) ** 2)

                all_results.append({
                    "ts_id": ts_id,
                    "category": category,
                    "model": model_name,
                    "train_end": train_end.date(),
                    "test_start": test_start.date(),
                    "test_end": test_end.date(),
                    metric: error
                })

    return pd.DataFrame(all_results)


In [25]:
results_df = simulate_forecasting(
    ts_dict=ts_dict,
    categories=categories,
    metric=metric,
    forecast_horizon=12,
    retrain_every=6,
    eval_periods=24
)

results_df

  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
  self._init_dates(dates, freq)
Processing Time Series: 100%|██████████| 3/3 [00:01<00:00,  2.74it/s]


Unnamed: 0,ts_id,category,model,train_end,test_start,test_end,mape
0,C1038,Category 3,naive,2016-07-01,2016-08-01,2017-07-01,0.226352
1,C1038,Category 3,ARIMA,2016-07-01,2016-08-01,2017-07-01,0.22892
2,C1038,Category 3,ETS,2016-07-01,2016-08-01,2017-07-01,0.363437
3,C1038,Category 3,seasonal naive,2016-07-01,2016-08-01,2017-07-01,0.233362
4,C1038,Category 3,naive,2016-01-01,2016-02-01,2017-01-01,0.212497
5,C1038,Category 3,ARIMA,2016-01-01,2016-02-01,2017-01-01,0.296125
6,C1038,Category 3,ETS,2016-01-01,2016-02-01,2017-01-01,0.19124
7,C1038,Category 3,seasonal naive,2016-01-01,2016-02-01,2017-01-01,0.15681
8,C1038,Category 3,naive,2015-07-01,2015-08-01,2016-07-01,0.141168
9,C1038,Category 3,ARIMA,2015-07-01,2015-08-01,2016-07-01,0.125909


In [27]:
def evaluate_models(
    results_df: pd.DataFrame,
    ts_dict: dict,
    metric: str,
    naive_model_name: str = 'naive',
    verbose: bool = True
) -> pd.DataFrame:
    """
    Evaluate model performance and select the best model per time series.
    Warn if MAPE is used on series with zero values.

    Returns a DataFrame with columns: 'ts_id', 'best_model', 'metric', 'naive_metric'
    """
    if results_df.empty:
        print("No results to evaluate.")
        return pd.DataFrame(columns=['ts_id', 'best_model', metric, 'naive_metric'])

    if metric not in results_df.columns:
        raise ValueError(f"Metric '{metric}' not found in results_df columns: {results_df.columns.tolist()}")

    # Warn about MAPE on zero values
    if metric.lower() == 'mape':
        zero_series = [ts_id for ts_id, ts in ts_dict.items() if (ts == 0).any()]
        if zero_series:
            print("WARNING: MAPE is being used on series with zero values:")
            print("Affected time series IDs:", zero_series)

    # Compute mean error per model per series
    avg_perf = (
        results_df
        .groupby(['ts_id', 'model'])[metric]
        .mean()
        .reset_index()
        .rename(columns={metric: f'avg_{metric}'})
    )

    # Find best model per time series (lowest error)
    best_model_df = (
        avg_perf.loc[
            avg_perf.groupby('ts_id')[f'avg_{metric}'].idxmin()
        ]
        .reset_index(drop=True)
    )
    best_model_df.rename(columns={
        'model': 'best_model',
        f'avg_{metric}': metric
    }, inplace=True)

    # Add baseline (naive) model's performance
    naive_perf = avg_perf[avg_perf['model'] == naive_model_name][['ts_id', f'avg_{metric}']]
    naive_perf.rename(columns={f'avg_{metric}': 'naive_metric'}, inplace=True)

    result_df = pd.merge(best_model_df, naive_perf, on='ts_id', how='left')

    # Ensure desired column order
    result_df = result_df[['ts_id', 'best_model', metric, 'naive_metric']]

    if verbose:
        print("Best model per time series based on lowest average error:")
        print(result_df)

    return result_df

best_models_df = evaluate_models(results_df, ts_dict, metric)

Best model per time series based on lowest average error:
   ts_id best_model      mape  naive_metric
0  C1038      naive  0.199065      0.199065
1  C2716      naive  0.051749      0.051749
2  C6022      drift  0.212629      0.250856
