# Darts Time Series Lab

In [None]:
# Import libraries
import matplotlib.pyplot as plt

import numpy as np
import pandas as pd
from darts_config import *
from darts_models import (
    NaiveSeasonalGrowth
)

from darts_functions import (
    create_csv,
    full_series_backtesting,
    full_series_backtesting_nn,
    count_zeroes,
    plot_models,
    custom_backtest,
    Optimizer,
)

%matplotlib inline
import warnings

from darts import TimeSeries
from darts.dataprocessing.transformers import Scaler
from darts.metrics import rmse
from darts.models import (
    ExponentialSmoothing,
    NaiveDrift,
    NaiveMean,
    NaiveMovingAverage,
    NaiveSeasonal,
    NHiTSModel,
    Prophet,
    StatsForecastAutoCES,
    StatsForecastAutoETS,
    StatsForecastAutoTheta,
    TiDEModel,
    RegressionEnsembleModel,
    NaiveEnsembleModel
)
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from collections import Counter

warnings.filterwarnings("ignore")
warnings.simplefilter("ignore", category=FutureWarning)

### Data Loading

In [None]:
# Read sales data
file_path = file_name
raw_df = pd.read_excel(file_path)

# Get the index of the forecast date
FORECAST_DATE = raw_df["Forecast Date"][0]
end_col = raw_df.columns.get_loc(FORECAST_DATE)

# Get the index of the first date column
start_col = 0
for index, value in enumerate(raw_df.columns):
    try:
        pd.to_datetime(value, format="%d/%m/%Y")
        start_col = index
        break
    except ValueError:
        pass

df = raw_df.copy()
df = df.iloc[:, start_col:end_col]  # slice dataset to include only relevant dates

### Preprocessing

In [None]:
# Preprocess the dataset

df.columns = pd.to_datetime(df.columns, format="%d%m%Y")  # convert date columns to datetime and transpose dataframe
df.set_index(raw_df["Item Code"], inplace=True)  # set item code as index
df = df.T  # transpose dataframe
df.columns = df.columns.astype(str)  # convert item names to string
df.dropna(axis=1, inplace=True)  # drop all columns with at least one NaN value
df.iloc[:, 1:] = df.iloc[:, 1:].clip(lower=0)  # clip negative values to zero
df = df.loc[
    :, (df == 0).sum(axis=0) <= (ZEROS_MAX_PERCENT * df.shape[0])
]  # drop columns with more than n percent of zeros
df.replace(0, np.nan, inplace=True)  # replace zeros with NaN
df.interpolate(method="linear", axis=1, inplace=True)  # interpolate missing values column wise
df.fillna(method="ffill", inplace=True)  # forward fill missing values

df.shape

In [None]:
# Sanity check for date range
print(f"Actual sales dates range from {df.index[0]} to {df.index[-1]}")

In [None]:
# Sanity check for no zero or nan values
print(f" Number of Zeros: {count_zeroes(df)}\n Number of NaNs: {(df.isna().sum().sum())}")

### Model Preparation

In [None]:
# Create list of series
all_series = [TimeSeries.from_dataframe(df, value_cols=col) for col in df.columns]

# Scale data for improved modeling
scaler = Scaler()
all_series_scaled = [scaler.fit_transform(series) for series in all_series]  # scale each series object

# Set train window of backtesting
TRAIN_WINDOW = len(df) - VAL_WINDOW

# Model Testing

In [None]:
# Choose what to run
RUN_StatModels = "Yes"
GET_StatModels_results = "Yes"

RUN_DeepLearning = "Yes"
GET_DeepLearning_results = "Yes"

RUN_ExSmoothing_NHITS = "Yes"
PLOT_NHiTS = "Yes"
PLOT_Loop = "Yes"

### Baseline Modeling

In [None]:
# Initialize our Naive Seasonal Growth model
baseline_model = NaiveSeasonalGrowth(K=SEASONALITY)

### Statisical Modeling

In [None]:
# Choose statistical models for testing
stat_models = [
    (StatsForecastAutoETS, {"season_length": SEASONALITY}),
    (StatsForecastAutoCES, {"season_length": SEASONALITY}),
    (StatsForecastAutoTheta, {"season_length": SEASONALITY}),
    (ExponentialSmoothing, {"seasonal_periods": SEASONALITY}),
    (NaiveSeasonal, {"K": SEASONALITY}),
    (NaiveDrift, {}),
    (NaiveMovingAverage, {"input_chunk_length": SEASONALITY}),
    (NaiveMean, {}),
    (
        Prophet,
        {
            "add_seasonality": {
                "name": "monthly",
                "seasonal_periods": DAYS_PER_MONTH,
                "fourier_order": FOURIER_ORDER,
            }
        },
    ),
]

In [None]:
# Fit statistical models using backtesting
if RUN_StatModels == "Yes":
    stat_results, stat_instances = full_series_backtesting(
        model_list=stat_models,
        data=df,
        series_list=all_series,
        file_path=file_path,
        train_percentage=TRAIN_WINDOW,
        prediction_length=FORECAST_PERIODS,
    )

In [None]:
# Get model results
if GET_StatModels_results == "Yes":
    create_csv(stat_results, file_path, "stat__results")

In [None]:
# Find the number of times a model won
results = {}
for index, row in stat_results.iterrows():
    item = row[0]
    subset = stat_results[stat_results["Item"] == item]
    optimizer = Optimizer(subset)
    model = optimizer.get_best_model(column_name="RMSE")
    results[item] = model

model_counts = Counter(results.values())
model_counts

### Backtest Validation Testing

In [None]:
# Validate that number of backtest predictions is 3
NUM_PLOTS = 1
for series in all_series[:NUM_PLOTS]:
    model = NaiveMean()
    forecast = custom_backtest(model=model, series=series, train_window=TRAIN_WINDOW, forecast_hoizon=FORECAST_PERIODS)

    baseline_forecast = custom_backtest(
        model=baseline_model, series=series, train_window=TRAIN_WINDOW, forecast_hoizon=FORECAST_PERIODS
    )

    plot_models(model=model, series=series, forecast=forecast, baseline_forecast=baseline_forecast, label="forecast")

## Ensemble Modeling

In [None]:
# Gather subsetted data and optimized models for specified SKU
SKU_INDEX = 30
series = all_series[SKU_INDEX]
if RUN_StatModels == "Yes":
    stat_subset = stat_results[stat_results["Item"] == series.columns[0]]
    stat_optimizer = Optimizer(stat_subset)
    stat_model = stat_optimizer.get_best_model(column_name="RMSE")

stat_subset.head(3)

### Naive Ensemble

Naive ensembling simply takes the average (mean) of the forecasts generated by the ensembled forecasting models.

In [None]:
# Fit ensemble models against best model

if RUN_StatModels == "Yes":
    NUM_PLOTS = 40
    baseline_model = NaiveSeasonalGrowth(K=12)
    for series in all_series[:NUM_PLOTS]:
        # Retrieve winning model for series
        item = series.components[0]
        subset = stat_results[stat_results["Item"] == item]
        optimizer = Optimizer(subset)

        # Initialize fitted ensemble model
        best_model = optimizer.get_best_model(column_name="RMSE")
        winning_model = stat_instances[best_model]
        print("Winning Model:", best_model)

        ensemble_models = [winning_model, baseline_model]
        naive_ensemble_model = NaiveEnsembleModel(forecasting_models=ensemble_models)
        regression_ensemble_model = RegressionEnsembleModel(
            forecasting_models=ensemble_models, regression_train_n_points=SEASONALITY
        )

        # Predictions
        winning_model_forecast = custom_backtest(
            model=winning_model,
            series=series,
            train_window=TRAIN_WINDOW,
            forecast_hoizon=FORECAST_PERIODS,
            verbose=False,
        )

        naive_ensemble_forecast = custom_backtest(
            model=naive_ensemble_model,
            series=series,
            train_window=TRAIN_WINDOW,
            forecast_hoizon=FORECAST_PERIODS,
            verbose=False,
        )

        regression_ensemble_forecast = custom_backtest(
            model=regression_ensemble_model,
            series=series,
            train_window=TRAIN_WINDOW,
            forecast_hoizon=FORECAST_PERIODS,
            verbose=False,
        )

        # Calculate RMSE
        rmse_winning_model = rmse(series, winning_model_forecast)
        rmse_naive_ensemble = rmse(series, naive_ensemble_forecast)
        rmse_regression_ensemble = rmse(series, regression_ensemble_forecast)

        # Plotting
        plt.figure(figsize=(12, 6))
        series[-16:].plot(label=item)
        winning_model_forecast.plot(label=f"Winning Model (RMSE: {rmse_winning_model:.2f})")
        naive_ensemble_forecast.plot(label=f"Naive Ensemble (RMSE: {rmse_naive_ensemble:.2f})")
        regression_ensemble_forecast.plot(label=f"Regression Ensemble (RMSE: {rmse_regression_ensemble:.2f})")
        plt.title("Ensemble Model Testing")
        plt.legend()
        plt.show()

### Deep Learning Modeling

In [None]:
# Establish callbacks for deep learning models
pl_trainer_kwargs["callbacks"] = [EarlyStopping(**early_stopping_args)]  # callbacks need to be passed as a list

# Add extra parameter to TiDEModel
tide_model_args = common_model_args.copy()
tide_model_args.update(
    {
        "use_reversible_instance_norm": True,
    }
)

# Choose deep learning models for testing
deep_learning_models = [
    (
        NHiTSModel,
        common_model_args,
    ),  # DARTS configues the model and training parameters (number of layers,nodes, and type of layers (convolutional for NHiTS))
    (TiDEModel, tide_model_args),
]  # 100% of terminal output from pytorch lightning, which executes the training

In [None]:
# Fit deep learning models
if RUN_DeepLearning == "Yes":
    nn_results, nn_instances = full_series_backtesting_nn(
        model_list=deep_learning_models,
        data=df,
        series_list=all_series,
        file_path=file_path,
        train_percentage=TRAIN_PERCENTAGE,
    )

In [None]:
# Create csv with deep learning results
if GET_DeepLearning_results == "Yes":
    create_csv(nn_results, file_path, "nn_results")

### Deep Learning Winnning Model

In [None]:
# Fit naive ensemble model against baseline model

if RUN_DeepLearning == "Yes":
    NUM_PLOTS = 5
    scaler = Scaler()
    baseline_model = NaiveSeasonalGrowth(K=SEASONALITY)
    for series in all_series[:NUM_PLOTS]:
        # Retrieve winning model for series
        item = series.components[0]
        subset = nn_results[nn_results["Item"] == item]
        optimizer = Optimizer(subset)
        series_scaled = scaler.fit_transform(series)

        best_model = optimizer.get_best_model(column_name="RMSE")
        model = nn_instances[best_model]
        print("Winning Model:", best_model)

        forecast = custom_backtest(
            model=model,
            series=series_scaled,
            train_window=TRAIN_WINDOW,
            forecast_hoizon=FORECAST_PERIODS,
            retrain=False,
            verbose=False,
        )

        baseline_forecast = custom_backtest(
            model=baseline_model,
            series=series_scaled,
            train_window=TRAIN_WINDOW,
            forecast_hoizon=FORECAST_PERIODS,
            retrain=True,
            verbose=False,
        )

        forecast = scaler.inverse_transform(forecast)  # inverse scaled predictions
        baseline_forecast = scaler.inverse_transform(baseline_forecast)  # inverse scaled predictions

        # Calculate RMSE
        rmse_winning_model = rmse(series, forecast)
        rmse_baseline = rmse(series, baseline_forecast)

        # Plotting
        plt.figure(figsize=(12, 6))
        series[-16:].plot(label=item)
        forecast.plot(label=f"Neural Network (RMSE: {rmse_winning_model:.2f})")
        baseline_forecast.plot(label=f"NaiveGrowthSeasonal (RMSE: {rmse_baseline:.2f})")
        plt.title("Deep Learning Testing")
        plt.legend()
        plt.show()

### N-HiTS vs. Exponential Smoothing

In [None]:
# Choose an SKU to forecast
SKU_NAME = "ADR-7001"
SKU_INDEX = df.columns.get_loc(SKU_NAME)

# Create series
series = all_series[SKU_INDEX]
seres_scaled = all_series_scaled[SKU_INDEX]

In [None]:
# Train-Test split
train_series, val_series = series[:-FORECAST_PERIODS], series[-FORECAST_PERIODS:]
print(f"Training set includes {len(train_series)} months & Validation set includes {len(val_series)} months")

In [None]:
# Instantiate the models
if RUN_ExSmoothing_NHITS == "Yes":
    model_es = ExponentialSmoothing(seasonal_periods=SEASONALITY)
    model_es.fit(train_series)  # fit on scaled training series
    model_nh = nn_instances["NHiTSModel"]  # fit on scaled training series

    # Predict forecasting periods for each model
    pred_es = model_es.predict(FORECAST_PERIODS)
    pred_nh = model_nh.predict(FORECAST_PERIODS, series=train_series)

In [None]:
# Optional: Sort standard deviation for each series to find a stable series for plotting
if RUN_ExSmoothing_NHITS == "Yes":
    std_devs = {}
    for i, series in enumerate(all_series):
        series_df = series.pd_dataframe()
        std_dev = series_df.std().iloc[0]
        std_devs[f"Series {i}"] = std_dev

    sorted(std_devs.items(), key=lambda item: item[1], reverse=False)[:5]  # top 10 series by lowest standard deviation

In [None]:
# Plot scaled data
if PLOT_NHiTS == "Yes":
    plt.figure(figsize=(12, 6))
    series[-36:].plot(label="actual sales")  # scaled series
    pred_nh.plot(label="n-hits forecast", lw=3)  # scaled series
    plt.legend()
    plt.title("Sales Forecasting Comparison")
    plt.xlabel("Date")
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [None]:
# Compare results for each model
if RUN_ExSmoothing_NHITS == "Yes":
    hits_metric = METRIC(pred_nh, val_series)
    es_metric = METRIC(pred_es, val_series)

    print(f"N-HiTS {METRIC_NAME} = {hits_metric:.2f}")
    print(f"Exponential Smoothing {METRIC_NAME} = {es_metric:.2f}")
    print(f"Improvement = {((es_metric - hits_metric) / es_metric) * 100:.2f}%")

### Plot Loop

In [None]:
# Loop through all series to compare forecasting models
if PLOT_Loop == "Yes":
    for i, item_series in enumerate(all_series_scaled):
        train_series, val_series = item_series[:-FORECAST_PERIODS], item_series[-FORECAST_PERIODS:]
        print(
            f"Processing SKU {i}: Training set includes {len(train_series)} months & Validation set includes {len(val_series)} months"
        )

        # Fit models
        pred_nh = model_nh.predict(FORECAST_PERIODS, series=train_series)

        # You might need to fit the model inside the loop if it requires retraining for each series
        model_es.fit(train_series)
        pred_es = model_es.predict(FORECAST_PERIODS)

        # Plotting
        item_series.plot(label="Actual sales", lw=2)  # Original series
        pred_nh.plot(label="N-HiTS forecast", lw=2)
        # pred_tide.plot(label="TiDE forecast", lw=2)
        pred_es.plot(label="Exponential Smoothing forecast", lw=2)
        plt.legend()
        plt.title(f"Sales Forecasting Comparison for SKU {i}")
        plt.xlabel("Date")
        plt.ylabel("Scaled Sales")
        plt.grid(True)
        plt.tight_layout()
        plt.show()