Time Series Processing and Forecasting with Darts

This notebook demonstrates a comprehensive pipeline for processing time series data and forecasting using the Darts library. The steps include data loading, preprocessing, splitting, model training, prediction, backtesting, and evaluation.

In [1]:
import logging
import warnings
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from darts import TimeSeries
from darts.dataprocessing import Pipeline
from darts.dataprocessing.transformers import (
    MissingValuesFiller,
    Scaler,
    StaticCovariatesTransformer,
)
from darts.metrics import mae, mape, r2_score, rmse
from darts.models import TFTModel
from darts.utils.statistics import check_seasonality, plot_acf
from darts.utils.timeseries_generation import datetime_attribute_timeseries
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from darts.explainability import TFTExplainer
from darts.utils.likelihood_models import QuantileRegression
from scipy.optimize import minimize
from scipy.special import expit as sigmoid
from tqdm import tqdm

Setup Logging and Suppress Warnings

Configure the logging system to capture and display information throughout the pipeline execution. Suppress any non-critical warnings to keep the output clean.


In [2]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("timeseries_processing.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

warnings.filterwarnings("ignore")

Load and Prepare Data

Load the preprocessed blood data from a CSV file and convert relevant columns to datetime objects. Ensure consistent data types across all columns for reliable processing.

In [3]:
logger.info("Loading data...")
# Load the data
df = pd.read_csv(
    "data/df_blood_preprocessed.csv",
)

# Convert date columns to datetime
logger.info("Converting date columns...")
df["ds"] = pd.to_datetime(df["ds"])
df["first_infusion_date"] = pd.to_datetime(df["first_infusion_date"])
df["next_infusion"] = pd.to_datetime(df["next_infusion"])

# Define data types for consistency
dtype_map = {
    "ds": "datetime64[ns]",
    "first_infusion_date": "datetime64[ns]",
    "next_infusion": "datetime64[ns]",
    "unique_id": str,
    "sex": int,
    "age_at_diagdate": float,
    "weight_change_cycles": float,
    "component": str,
    "value": float,
    "unit": str,
    "days_since_first_infusion": int,
    "infno_day": int,
    "infno": int,
}

logger.info("Converting data types...")
df = df.astype(dtype_map)

2024-10-24 21:48:06,695 - INFO - Loading data...
2024-10-24 21:48:06,890 - INFO - Converting date columns...
2024-10-24 21:48:06,929 - INFO - Converting data types...


Anonymize Unique Patient IDs

For privacy reasons, anonymize the unique_id column by converting numerical digits to corresponding letters. This ensures that patient identities are protected in the dataset.


In [4]:
def anonymize_unique_id(df):
    logger.info("Anonymizing patient IDs...")
    digit_to_letter = {str(i): chr(ord("a") + i - 1) for i in range(1, 10)}
    digit_to_letter["0"] = "j"
    df["unique_id"] = (
        df["unique_id"]
        .astype(str)
        .apply(lambda x: "".join(digit_to_letter.get(d, "k") for d in x))
    )
    return df

df = anonymize_unique_id(df)
df = df.drop(columns=["unit", "transer", "transth"])

2024-10-24 21:48:06,969 - INFO - Anonymizing patient IDs...


Define Target Variable

Specify the target column for forecasting. In this case, we aim to predict Neutrophilocytes_B.


In [5]:
TARGET_COLUMN = "Neutrophilocytes_B"

Time Series Preprocessor Class

Define a TimeSeriesPreprocessor class to handle various preprocessing tasks such as handling missing values, encoding temporal information, and completing the timeline. This class ensures that the data is clean and ready for modeling.

In [6]:
class TimeSeriesPreprocessor:
    def __init__(
        self,
        target_column=None,
        complete_timeline=False,
        encode_temporal_distance=False,
        add_decay=False,
        fill_residual_nan=False,
        exclude_columns=None,
    ):
        self.target_column = target_column
        self.complete_timeline = complete_timeline
        self.encode_temporal_distance = encode_temporal_distance
        self.add_decay = add_decay
        self.decay_params = {}
        self.beta_params = {}
        self.fill_residual_nan = fill_residual_nan
        self.exclude_columns = exclude_columns or []
        logger.info(
            f"Initialized TimeSeriesPreprocessor with target column: {target_column}"
        )

    def _calculate_delta_matrix(self, df, col):
        """Calculate delta matrix according to Equation 1"""
        logger.debug(f"Calculating delta matrix for column: {col}")
        df = df.sort_values(["unique_id", "normalized_time"])
        mask = (~df[col].isna()).astype(int)
        time_diff = df.groupby("unique_id")["normalized_time"].diff()

        delta = pd.Series(index=df.index, dtype=float)

        for uid in tqdm(
            df["unique_id"].unique(), desc=f"Processing delta matrix for {col}"
        ):
            mask_group = mask[df["unique_id"] == uid]
            time_diff_group = time_diff[df["unique_id"] == uid]
            delta_group = pd.Series(index=mask_group.index, dtype=float)
            delta_group.iloc[0] = 0

            for t in range(1, len(mask_group)):
                if mask_group.iloc[t - 1] == 0:
                    delta_group.iloc[t] = (
                        time_diff_group.iloc[t] + delta_group.iloc[t - 1]
                    )
                else:
                    delta_group.iloc[t] = time_diff_group.iloc[t]

            delta[delta_group.index] = delta_group

        return delta

    def _calculate_decay_parameter(self, delta, W_gamma=0.1, b_gamma=0.0):
        """Calculate temporal decay parameter gamma according to Equation 2"""
        logger.debug("Calculating decay parameter")
        return np.exp(-np.maximum(0, W_gamma * delta + b_gamma))

    def _initialize_missing_values(self, df, col, gamma):
        """Initialize missing values according to Equation 3"""
        logger.debug(f"Initializing missing values for column: {col}")
        mask = (~df[col].isna()).astype(int)
        col_mean = df[col].mean()
        last_observed = df.groupby("unique_id")[col].ffill()
        initialized_values = gamma * last_observed + (1 - gamma) * col_mean
        df[col] = df[col].fillna(initialized_values)
        return df

    def _calculate_beta_parameter(self, gamma, mask, W_beta=0.1, b_beta=0.0):
        """Calculate trade-off parameter beta according to Equation 4"""
        logger.debug("Calculating beta parameter")
        combined = np.column_stack([gamma, mask])
        return sigmoid(np.dot(combined, W_beta) + b_beta)

    def prepare_and_process(self, df):
        logger.info("Starting data preparation and processing...")
        df = df.copy()
        df = self._initial_preparation(df)

        if self.complete_timeline:
            logger.info("Completing timeline...")
            df = self._complete_timeline(df)

        feature_columns = [
            col
            for col in df.columns
            if col not in self.exclude_columns
            and not col.endswith(("_delta", "_missing"))
        ]

        logger.info("Processing features with temporal knowledge...")
        for col in tqdm(feature_columns, desc="Processing features"):
            if df[col].isna().any():
                delta = self._calculate_delta_matrix(df, col)
                gamma = self._calculate_decay_parameter(delta)
                df = self._initialize_missing_values(df, col, gamma)
                mask = (~df[col].isna()).astype(int)
                beta = self._calculate_beta_parameter(gamma, mask)
                self.decay_params[col] = {"gamma": gamma, "delta": delta}
                self.beta_params[col] = beta

        if self.encode_temporal_distance:
            logger.info("Encoding temporal distance...")
            df = self._encode_temporal_distance(df)

        if self.fill_residual_nan:
            logger.info("Filling residual NaN values...")
            df = self._fill_residual_nan(df)

        logger.info("Data preparation and processing completed")
        return df

    def _initial_preparation(self, X):
        logger.info("Performing initial data preparation...")
        X = self._aggregate_daily(X)
        X["ds_date"] = pd.to_datetime(X["ds_date"])
        X["normalized_time"] = X.groupby("unique_id")["ds_date"].transform(
            lambda x: (x - x.min()).dt.days
        )

        # Calculate infno_day here
        X["infno_first_time"] = X.groupby(["unique_id", "infno"])[
            "normalized_time"
        ].transform("min")
        X["infno_day"] = X["normalized_time"] - X["infno_first_time"]
        X.drop(columns=["infno_first_time"], inplace=True)

        X = X.pivot_table(
            index=[
                "unique_id",
                "normalized_time",
                "age_at_diagdate",
                "sex",
                "infno",
                "infno_day",
                "ds_date",
            ],
            columns="component",
            values="value",
            aggfunc="first",
        ).reset_index()

        if self.target_column in X.columns:
            X = X.rename(columns={self.target_column: "y"})
        else:
            raise ValueError(
                f"Target column '{self.target_column}' not found in DataFrame."
            )

        return X

    def _aggregate_daily(self, X):
        logger.info("Aggregating daily data...")
        X["ds_date"] = X["ds"].dt.date
        grouped = X.groupby(
            ["unique_id", "ds_date", "age_at_diagdate", "sex", "infno", "component"]
        )
        daily_data = grouped.agg({"value": "mean"}).reset_index()
        return daily_data

    def _complete_timeline(self, df):
        logger.info("Completing timeline...")
        unique_ids = df["unique_id"].unique()
        complete_timeline = []

        for uid in tqdm(unique_ids, desc="Processing timelines"):
            uid_df = df[df["unique_id"] == uid]
            min_time = uid_df["normalized_time"].min()
            max_time = uid_df["normalized_time"].max()
            timeline = pd.DataFrame(
                {"unique_id": uid, "normalized_time": np.arange(min_time, max_time + 1)}
            )
            complete_timeline.append(timeline)

        complete_timeline = pd.concat(complete_timeline, ignore_index=True)
        df = pd.merge(
            complete_timeline, df, on=["unique_id", "normalized_time"], how="left"
        )

        static_columns = ["unique_id", "age_at_diagdate", "sex"]
        for col in static_columns:
            df[col] = df.groupby("unique_id")[col].ffill().bfill()

        dynamic_columns = ["infno", "infno_day"]
        for col in dynamic_columns:
            df[col] = df.groupby("unique_id")[col].ffill()

        return df

    def _encode_temporal_distance(self, df):
        logger.info("Encoding temporal distance...")
        feature_columns = [
            col
            for col in df.columns
            if col not in self.exclude_columns and not col.endswith("_delta")
        ]
        temporal_distance_data = {}

        for col in tqdm(feature_columns, desc="Processing temporal distances"):
            if not col.endswith("_delta"):
                temporal_distance_series = (
                    df.groupby("unique_id")[col]
                    .apply(
                        lambda x: x.isna()
                        .astype(int)
                        .groupby(x.notna().astype(int).cumsum())
                        .cumsum()
                    )
                    .astype(float)
                )
                temporal_distance_data[col + "_delta"] = temporal_distance_series

        temporal_distance_df = pd.DataFrame(temporal_distance_data)
        temporal_distance_df = temporal_distance_df.set_index(df.index)
        df = pd.concat([df, temporal_distance_df], axis=1)
        return df

    def _fill_residual_nan(self, df):
        logger.info("Filling residual NaN values...")
        feature_columns = [col for col in df.columns if col not in ["unique_id", "ds"]]
        df[feature_columns] = df.groupby("unique_id")[feature_columns].ffill()
        df[feature_columns] = df.groupby("unique_id")[feature_columns].bfill()
        return df

Initialize and Execute Preprocessor

Create an instance of the TimeSeriesPreprocessor with specified parameters and process the dataset to prepare it for modeling.

In [7]:
# Initialize preprocessor
logger.info("Initializing preprocessor...")
preprocessor = TimeSeriesPreprocessor(
    target_column=TARGET_COLUMN,
    complete_timeline=True,
    encode_temporal_distance=True,
    add_decay=True,
    fill_residual_nan=True,
    exclude_columns=[
        "unique_id",
        "normalized_time",
        "sex",
        "age_at_diagdate",
        "infno",
        "infno_day",
        "ds_date",
    ],
)

# Process the data
logger.info("Processing data...")
processed_df = preprocessor.prepare_and_process(df)

2024-10-24 21:48:07,197 - INFO - Initializing preprocessor...
2024-10-24 21:48:07,197 - INFO - Initialized TimeSeriesPreprocessor with target column: Neutrophilocytes_B
2024-10-24 21:48:07,198 - INFO - Processing data...
2024-10-24 21:48:07,198 - INFO - Starting data preparation and processing...
2024-10-24 21:48:07,207 - INFO - Performing initial data preparation...
2024-10-24 21:48:07,208 - INFO - Aggregating daily data...
2024-10-24 21:48:07,409 - INFO - Completing timeline...
2024-10-24 21:48:07,409 - INFO - Completing timeline...
Processing timelines: 100%|██████████| 182/182 [00:00<00:00, 1404.81it/s]
2024-10-24 21:48:07,605 - INFO - Processing features with temporal knowledge...
Processing delta matrix for Alanine_transaminase_ALAT: 100%|██████████| 182/182 [00:02<00:00, 89.82it/s]
Processing delta matrix for Albumin_P: 100%|██████████| 182/182 [00:02<00:00, 84.24it/s]
Processing delta matrix for Alkaline_phosphatase_P: 100%|██████████| 182/182 [00:02<00:00, 83.94it/s]
Processin

In [None]:
# Save the processed data
processed_df.to_csv("data/processed_df.csv", index=False)

In [20]:
# head
processed_df.head()

Unnamed: 0,unique_id,normalized_time,age_at_diagdate,sex,infno,infno_day,ds_date,Alanine_transaminase_ALAT,Albumin_P,Alkaline_phosphatase_P,...,Promyelocytes_B_delta,Protein_cerebrospinal_fluid_CSF_delta,Reticulocytes_total_B_delta,Sodium_ion_P_delta,Thrombocytes_B_delta,Thyrotropin_TSH_delta,Triacylglycerol_lipase_delta,Triglycerides_P_pt_fasting_status_unknown_delta,Urate_delta,eGFR_DSKB_DNS_2009_delta
0,bjaaaaa,0,14.0,2.0,1.0,0.0,2011-05-16,238.0,32.0,82.0,...,1.0,1.0,1.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0
1,bjaaaaa,1,14.0,2.0,1.0,1.0,2011-05-17,162.0,31.997028,90.455602,...,2.0,2.0,2.0,0.0,0.0,2.0,2.0,2.0,2.0,2.0
2,bjaaaaa,2,14.0,2.0,1.0,2.0,2011-05-18,140.333333,30.0,71.0,...,3.0,3.0,3.0,0.0,0.0,3.0,3.0,3.0,3.0,3.0
3,bjaaaaa,3,14.0,2.0,1.0,2.0,2011-05-18,142.288006,30.187353,80.502391,...,4.0,4.0,4.0,0.0,0.0,4.0,4.0,4.0,4.0,4.0
4,bjaaaaa,4,14.0,2.0,1.0,2.0,2011-05-18,144.056667,30.356876,89.100509,...,5.0,5.0,5.0,0.0,0.0,5.0,5.0,5.0,5.0,5.0


Split Data into Train, Validation, and Test Sets

Define a function to split the unique patient IDs into training, validation, and test sets based on specified proportions. This ensures that the model is evaluated on unseen data.


In [8]:
# Function to split data into train, validation, and test sets
def split_data(unique_ids, test_size=0.2, val_size=0.1):
    logger.info("Splitting data into train, validation, and test sets...")
    np.random.seed(42)
    unique_ids = np.sort(unique_ids)
    n_samples = len(unique_ids)
    n_test = int(n_samples * test_size)
    n_val = int(n_samples * val_size)
    test_ids = unique_ids[-n_test:]
    val_ids = unique_ids[-(n_test + n_val) : -n_test]
    train_ids = unique_ids[: -(n_test + n_val)]
    return train_ids.tolist(), val_ids.tolist(), test_ids.tolist()

Create TimeSeries Objects

Define a function to convert patient data into Darts TimeSeries objects, including static and past covariates. This is essential for feeding the data into the forecasting model.


In [9]:
# Function to create TimeSeries objects
def create_time_series(patient_data, value_cols, static_covariates):
    logger.debug(f"Creating time series for columns: {value_cols}")
    patient_data = patient_data.copy()
    patient_data["date"] = pd.to_datetime(patient_data["normalized_time"], unit="D")
    patient_data = patient_data.sort_values("date")

    ts = TimeSeries.from_dataframe(
        df=patient_data,
        time_col="date",
        value_cols=value_cols,
        static_covariates=static_covariates,
        fill_missing_dates=True,
        freq="D",
        fillna_value=None,
    )

    return ts

Prepare Data for Model Training

Split the unique patient IDs into training, validation, and test sets. Then, create TimeSeries objects for each patient and organize them into corresponding lists.

In [10]:
# Prepare data for model training
logger.info("Preparing data for model training...")
unique_ids = processed_df["unique_id"].unique()
train_ids, val_ids, test_ids = split_data(unique_ids)

# Initialize lists for storing time series
train_list, val_list, test_list = [], [], []
train_covariates_list, val_covariates_list, test_covariates_list = [], [], []

logger.info("Creating time series for each patient...")
for unique_id in tqdm(unique_ids, desc="Processing patients"):
    patient_data = processed_df[processed_df["unique_id"] == unique_id]
    static_covariates = patient_data[["sex", "age_at_diagdate"]].iloc[0]
    value_cols = ["y"]

    # Create main series
    patient_series = create_time_series(patient_data, value_cols, static_covariates)

    # Create past covariates series
    covariate_cols = ["infno", "infno_day"]
    covariate_series = create_time_series(patient_data, covariate_cols, None)

    # Append to appropriate list
    if unique_id in train_ids:
        train_list.append(patient_series)
        train_covariates_list.append(covariate_series)
    elif unique_id in val_ids:
        val_list.append(patient_series)
        val_covariates_list.append(covariate_series)
    else:
        test_list.append(patient_series)
        test_covariates_list.append(covariate_series)

2024-10-24 21:50:33,591 - INFO - Preparing data for model training...
2024-10-24 21:50:33,593 - INFO - Splitting data into train, validation, and test sets...
2024-10-24 21:50:33,594 - INFO - Creating time series for each patient...
Processing patients: 100%|██████████| 182/182 [00:01<00:00, 149.43it/s]


Convert TimeSeries to Float32

Ensure that all TimeSeries objects are in float32 format for compatibility and optimized performance during model training.


In [11]:
# Convert series to float32
logger.info("Converting series to float32...")

def convert_to_float32(ts_list):
    return [ts.astype(np.float32) for ts in ts_list]

train_list = convert_to_float32(train_list)
val_list = convert_to_float32(val_list)
test_list = convert_to_float32(test_list)
train_covariates_list = convert_to_float32(train_covariates_list)
val_covariates_list = convert_to_float32(val_covariates_list)
test_covariates_list = convert_to_float32(test_covariates_list)

2024-10-24 21:50:34,820 - INFO - Converting series to float32...


Scale the Data

Apply scaling to both the main time series and covariates using Darts’ Pipeline and Scaler. Scaling ensures that the data is normalized, which can improve model training efficiency and performance.

In [12]:
# Create pipelines for scaling
logger.info("Creating scaling pipelines...")
pipeline_main = Pipeline([Scaler()])
pipeline_covariates = Pipeline([Scaler()])

# Scale the data
logger.info("Scaling the data...")
train_scaled = pipeline_main.fit_transform(train_list)
val_scaled = pipeline_main.transform(val_list)
test_scaled = pipeline_main.transform(test_list)

train_covariates_scaled = pipeline_covariates.fit_transform(train_covariates_list)
val_covariates_scaled = pipeline_covariates.transform(val_covariates_list)
test_covariates_scaled = pipeline_covariates.transform(test_covariates_list)

2024-10-24 21:50:34,971 - INFO - Creating scaling pipelines...
2024-10-24 21:50:34,972 - INFO - Scaling the data...


Check for Missing Values

Verify that there are no missing values (NaNs) in the scaled datasets. Presence of NaNs can lead to errors during model training and evaluation.


In [13]:
# Check for NaNs in scaled data
def check_for_nans(series_list, name="series"):
    logger.info(f"Checking for NaNs in {name}...")
    for i, series in enumerate(series_list):
        if series.pd_dataframe().isna().any().any():
            raise ValueError(f"NaNs found in {name} at index {i}")

# Check scaled data
check_for_nans(train_scaled, "train_scaled")
check_for_nans(val_scaled, "val_scaled")
check_for_nans(test_scaled, "test_scaled")

2024-10-24 21:50:35,330 - INFO - Checking for NaNs in train_scaled...
2024-10-24 21:50:35,357 - INFO - Checking for NaNs in val_scaled...
2024-10-24 21:50:35,362 - INFO - Checking for NaNs in test_scaled...


Create Datetime Features

Generate additional datetime-based features such as month, year, and day of the week. These features can help the model capture seasonal patterns and temporal dependencies in the data.


In [14]:
# Create datetime features
def create_datetime_features(series):
    logger.info("Creating datetime features...")
    features = {}
    for attribute in ["month", "year", "day_of_week"]:
        features[attribute] = datetime_attribute_timeseries(series, attribute=attribute)
    return features

Initialize and Configure the TFT Model

Set up the Temporal Fusion Transformer (TFT) model with specified hyperparameters, including input and output lengths, hidden size, attention heads, dropout rate, batch size, number of epochs, and optimizer settings. The model is configured for quantile regression to enable probabilistic forecasting.


In [15]:
# Create TFT model with quantile forecasts
logger.info("Creating TFT model...")
model = TFTModel(
    input_chunk_length=24,
    output_chunk_length=7,
    hidden_size=64,
    lstm_layers=1,
    num_attention_heads=4,
    dropout=0.1,
    batch_size=32,
    n_epochs=5,
    likelihood=QuantileRegression(quantiles=[0.1, 0.5, 0.9]),
    optimizer_kwargs={"lr": 1e-3},
    add_encoders={"cyclic": {"future": ["month", "day_of_week"]}},
    pl_trainer_kwargs={
        "accelerator": "mps",
        "devices": 1,
        "callbacks": [EarlyStopping(monitor="val_loss", patience=5, mode="min")],
        "gradient_clip_val": 0.1,
    },
    random_state=42,
)

2024-10-24 21:50:35,391 - INFO - Creating TFT model...


Train the TFT Model

Fit the TFT model on the scaled training data using the provided covariates. Monitor the validation loss to implement early stopping and prevent overfitting.

In [16]:
# Train the model
logger.info("Training the model...")
model.fit(
    series=train_scaled,
    past_covariates=train_covariates_scaled,
    val_series=val_scaled,
    val_past_covariates=val_covariates_scaled,
    verbose=True,
)

2024-10-24 21:50:35,412 - INFO - Training the model...
2024-10-24 21:50:35,691 - INFO - Train dataset contains 54912 samples.
2024-10-24 21:50:35,702 - INFO - Time series values are 32-bits; casting model to float32.
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

   | Name                              | Type                             | Params | Mode 
------------------------------------------------------------------------------------------------
0  | train_metrics                     | MetricCollection                 | 0      | train
1  | val_metrics                       | MetricCollection                 | 0      | train
2  | input_embeddings                  | _MultiEmbedding                  | 0      | train
3  | static_covariates_vsn             | _VariableSelectionNetwork        | 3.3 K  | train
4  | encoder_vsn                       | _VariableSelectionNetwork        | 12.5 K | train
5  | decoder_vsn        

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=5` reached.


TFTModel(output_chunk_shift=0, hidden_size=64, lstm_layers=1, num_attention_heads=4, full_attention=False, feed_forward=GatedResidualNetwork, dropout=0.1, hidden_continuous_size=8, categorical_embedding_sizes=None, add_relative_index=False, loss_fn=None, likelihood=QuantileRegression(quantiles: Optional[List[float]] = None), norm_type=LayerNorm, use_static_covariates=True, input_chunk_length=24, output_chunk_length=7, batch_size=32, n_epochs=5, optimizer_kwargs={'lr': 0.001}, add_encoders={'cyclic': {'future': ['month', 'day_of_week']}}, pl_trainer_kwargs={'accelerator': 'mps', 'devices': 1, 'callbacks': [<pytorch_lightning.callbacks.early_stopping.EarlyStopping object at 0x176ac7370>], 'gradient_clip_val': 0.1}, random_state=42)

Make Probabilistic Predictions

Use the trained TFT model to make probabilistic forecasts on the validation set. Generate multiple samples to estimate prediction intervals (e.g., 10%, 50%, 90% quantiles).


In [17]:
# Make probabilistic predictions
logger.info("Making predictions...")
predictions_90 = model.predict(
    n=7,
    series=val_scaled,
    past_covariates=val_covariates_scaled,
    num_samples=100,
)

2024-10-24 22:07:51,469 - INFO - Making predictions...
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


Predicting: |          | 0/? [00:00<?, ?it/s]

Perform Backtesting

Evaluate the model’s performance over historical data by performing backtesting. This involves generating forecasts at multiple points in the validation set to assess consistency and reliability.


In [18]:
# Backtesting
logger.info("Performing backtesting...")
backtest = model.historical_forecasts(
    series=val_scaled,
    past_covariates=val_covariates_scaled,
    start=0.5,
    forecast_horizon=7,
    stride=1,
    retrain=False,
    verbose=True,
)

2024-10-24 22:07:57,494 - INFO - Performing backtesting...
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


Predicting: |          | 0/? [00:00<?, ?it/s]

Explain the TFT Model

Utilize Darts’ TFTExplainer to interpret the model’s predictions. This includes analyzing feature importance and attention mechanisms to understand how the model makes decisions.


In [19]:
# Create TFT explainer
logger.info("Creating TFT explainer...")
explainer = TFTExplainer(model)
explainability_result = explainer.explain()

2024-10-24 22:08:04,566 - INFO - Creating TFT explainer...
2024-10-24 22:08:04,567 - ERROR - ValueError: `background_series` must be provided `model` was fit on multiple time series.


ValueError: `background_series` must be provided `model` was fit on multiple time series.

Define Plotting Functions

Create helper functions to visualize predictions, backtesting results, feature importance, and attention weights. Visualization aids in interpreting the model’s performance and behavior.


In [None]:
# Plotting functions
def plot_predictions(series, predictions, title):
    logger.info(f"Plotting predictions: {title}")
    plt.figure(figsize=(10, 6))
    series.plot(label="actual")
    predictions.plot(label=["10%", "median", "90%"])
    plt.axhline(y=0.5, color="r", linestyle="--", label="Critical threshold (0.5)")
    plt.title(title)
    plt.legend()
    plt.show()

def plot_backtest(series, backtest_predictions, title):
    logger.info(f"Plotting backtest: {title}")
    plt.figure(figsize=(10, 6))
    series.plot(label="actual")
    backtest_predictions.plot(label="backtest")
    plt.axhline(y=0.5, color="r", linestyle="--", label="Critical threshold (0.5)")
    plt.title(title)
    plt.legend()
    plt.show()

Visualize Prediction Results

Plot the model’s predictions alongside actual values to assess forecast accuracy. Additionally, visualize backtesting results to evaluate model consistency over time.


In [None]:
# Plot results
logger.info("Plotting results...")
plot_predictions(
    val_list[0],
    pipeline_main.inverse_transform(predictions_90[0]),
    "TFT Predictions with Confidence Intervals",
)

plot_backtest(
    val_list[0], pipeline_main.inverse_transform(backtest[0]), "TFT Backtest Results"
)

Visualize Feature Importance and Attention

Use the explainer to plot feature importance and attention weights. These plots help in understanding which features the model considers most influential and how it attends to different parts of the input data.

In [None]:
# Plot feature importance and attention
logger.info("Plotting feature importance and attention...")
explainer.plot_variable_selection(explainability_result)
explainer.plot_attention(explainability_result, plot_type="time")

Calculate and Display Evaluation Metrics

Compute key performance metrics such as Mean Absolute Percentage Error (MAPE), Root Mean Squared Error (RMSE), Mean Absolute Error (MAE), and R-squared (R²) to quantitatively assess the model’s forecasting accuracy on the validation set.

In [None]:
# Print metrics
logger.info("Calculating and printing metrics...")
print("\nValidation Metrics:")
print(f"MAPE: {mape(val_list, pipeline_main.inverse_transform(predictions_90)):.2f}%")
print(f"RMSE: {rmse(val_list, pipeline_main.inverse_transform(predictions_90)):.2f}")
print(f"MAE: {mae(val_list, pipeline_main.inverse_transform(predictions_90)):.2f}")
print(f"R2: {r2_score(val_list, pipeline_main.inverse_transform(predictions_90)):.2f}")

Save the Trained Model

Persist the trained TFT model to disk for future use or deployment. Saving the model allows for reproducibility and easy access without retraining.

In [None]:
# Save the model
logger.info("Saving the model...")
model.save("best_tft_model")

logger.info("Processing completed successfully")