In [138]:
import pandas as pd
import matplotlib.pyplot as plt
from datetime import date, datetime, timedelta
import os
import logging
import urllib.parse as url
import warnings
import numpy as np
from pytorch_forecasting import TimeSeriesDataSet, DeepAR, TemporalFusionTransformer, LogNormalDistributionLoss, NHiTS, MAE
from pytorch_forecasting.data.encoders import EncoderNormalizer
from typing import Union, Dict, Optional, Tuple
from torch.utils.data import DataLoader
import torch
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping
from sklearn.metrics import mean_absolute_error

warnings.filterwarnings('ignore')

pl.seed_everything(666)

class ForecastingModel:

    def __init__(
            self,
            history_start_date: Union[date, datetime],
            history_end_date: Union[date, datetime],
            forecast_end_date: Union[date, datetime],
            target_data: Dict[str, list],
            exog_data: Dict[str, list],
            target_data_interval: str = 'hour',
            exog_data_interval: str = 'hour',
    ) -> None:
        """
        This method initializes training, validation and testing datasets for training our model.
        :param history_start_date: starting date of our training dataset
        :param history_end_date: ending date of our training dataset / starting date of our testing dataset
        :param forecast_end_date: ending date of our testing dataset
        :param target_data: dictionary contained series names and corresponding column names for target data
        :param exog_data: dictionary contained series names and corresponding column names for exogenous data
        """

        """
        First we fetch data from syspower
        """
        self._dates = {
            'history_start_date': history_start_date,
            'history_end_date': history_end_date,
            'forecast_end_date': forecast_end_date
        }
        self._target_data = self.get_data(data_dict=target_data, interval=target_data_interval)
        self._decoder_length = 0
        self._encoder_length = 0

        temp_date = forecast_end_date + timedelta(days=1)
        exog_data_df = self.get_data(data_dict=exog_data, interval=exog_data_interval, end_date=temp_date)

        exog_data_df['ds'] = pd.to_datetime(exog_data_df['ds'])
        exog_data_df = exog_data_df.set_index('ds').resample('H').interpolate()
        exog_data_df = exog_data_df.reset_index()
        self._exog_data = exog_data_df.loc[exog_data_df['ds'] < temp_date, :]
        self._full_data = pd.merge(left=self._target_data, right=self._exog_data, on='ds', how='left')
        self._train_dataloader, self._val_dataloader, self._result_df = self.create_dataloaders(
            target_column_names=target_data.get('column_names', ''),
            exog_column_names=exog_data.get('column_names', '')
        )
        self._networks = ()
        self._trainers = ()
        self._fitted_networks = []


    @property
    def target_data(self):
        return self._target_data

    @property
    def exog_data(self):
        return self._exog_data

    @property
    def dates(self):
        return self._dates

    @property
    def train_dataloader(self):
        return self._train_dataloader

    @property
    def val_dataloader(self):
        return self._val_dataloader

    @property
    def result_df(self):
        return self._result_df

    @property
    def networks(self):
        return self._networks

    @property
    def trainers(self):
        return self._trainers

    @property
    def fitted_networks(self):
        return self._fitted_networks

    @property
    def encoder_length(self):
        return self._encoder_length

    @property
    def decoder_length(self):
        return self._decoder_length

    def get_data(
            self,
            data_dict: Dict[str, list],
            end_date: Optional[datetime] = None,
            interval: str = 'hour'
    ) -> pd.DataFrame:
        """
        This method loads data from syspower
        :param data_dict: dictionary with series names and corresponding column names
        :param end_date: date where data should ends
        :param interval: string that defines time interval on which data should be fetched
        :return: dataframe with the loaded data
        """
        if end_date:
            request = {
                "series": ','.join(data_dict.get('data_series')),
                'interval': interval,
                'start': self.dates.get('history_start_date').strftime('%Y-%m-%d'),
                'end': end_date.strftime('%Y-%m-%d'),
                'token': '8ycj3jSf2DJZOtX',
                'emptydata': 'yes',
                'currency': '',
                'dateFormat': 'nbno',
                'numberFormat': 'nothousandsdot',
                'fileformat': 'csv',
                'headers': 'no'
            }
        else:
            request = {
                "series": ','.join(data_dict.get('data_series')),
                'interval': interval,
                'start': self.dates.get('history_start_date').strftime('%Y-%m-%d'),
                'end': self.dates.get('forecast_end_date').strftime('%Y-%m-%d'),
                'token': '8ycj3jSf2DJZOtX',
                'emptydata': 'yes',
                'currency': '',
                'dateFormat': 'nbno',
                'numberFormat': 'nothousandsdot',
                'fileformat': 'csv',
                'headers': 'no'
            }

        data_url = f'https://syspower5.skm.no/api/webquery/execute?{url.urlencode(request)}'
        data = pd.read_csv(
            data_url,
            sep=';',
            index_col=0,
            parse_dates=True,
            dayfirst=True,
            header=None,
            names = ['ds'] + data_dict.get('column_names', [])
        ).interpolate().reset_index()
        return data

    def create_dataloaders(
            self,
            target_column_names: list,
            exog_column_names: list,
            encoder_length: int = 168,
            decoder_length: int = 24
    ) -> Tuple[DataLoader, DataLoader, pd.DataFrame]:
        """
        This method creates datasets and dataloaders in a format expected by pytorch_forecasting.

        :param target_column_names: list with names of columns of target values.
        :param exog_column_names: list with names of columns of exogenous values
        :param encoder_length: number of hours on which we base forecast
        :param decoder_length: number of hours for which we forecast ahead
        :return tuple of shape (train_dataloader, val_dataloader)
        """
        result = self._full_data.melt(
            id_vars=['ds'],
            value_vars=target_column_names
        )
        groups_dict = {key: value for value, key in enumerate(target_column_names)}
        time_idx = np.tile(np.arange(self._full_data.shape[0]), len(target_column_names))
        result['time_idx'] = time_idx
        result['group'] = result.apply(lambda x: groups_dict.get(x['variable'], 0), axis=1)

        result['date'] = result.apply(lambda x: pd.to_datetime(x['ds'].date()), axis=1)
        result['country'] = result.apply(lambda x: x['variable'].split('_')[-1][:2], axis=1)

        special_days = pd.read_excel('syspower_dict.xlsx', 'calendar')
        special_days.columns = ['date', 'country', 'spec_day']
        special_days['country'] = special_days.apply(lambda x: x['country'].lower(), axis=1)
        special_days['spec_day'] = special_days['spec_day'] + 1
        result['weekday'] = result.apply(lambda x: x['ds'].weekday(), axis=1)

        result = pd.merge(left=result, right=special_days, on=['date', 'country'], how='left').fillna(value=0).drop_duplicates().reset_index(drop=True)

        result['day_type'] = result.apply(
            lambda x: 'holiday' if (x['weekday'] == 5 or x['weekday'] == 6 or x['spec_day'] != 0) else 'common',
            axis=1
        )

        result['hour'] = result.apply(lambda x: x['ds'].hour, axis=1).astype(str).astype('category')
        result['weekday'] = result['weekday'].astype(str).astype('category')
        result['month'] = result.apply(lambda x: x['ds'].month, axis=1).astype(str).astype('category')
        result['spec_day'] = result['spec_day'].astype(str).astype('category')

        df_list = [self.exog_data] * len(target_data.get('column_names', ''))
        df = pd.concat(df_list, ignore_index=True).drop(columns='ds')
        result = pd.concat([result, df], axis=1)
        train_data = result[lambda x: x['ds'] < history_end_date]

        training_cutoff = train_data['time_idx'].max() - decoder_length
        training_dataset = TimeSeriesDataSet(
            train_data[lambda x: x.time_idx <= training_cutoff],
            group_ids=['group'],
            target='value',
            time_idx='time_idx',
            min_encoder_length=encoder_length,
            max_encoder_length=encoder_length,
            min_prediction_length=decoder_length,
            max_prediction_length=decoder_length,
            time_varying_unknown_reals=['value'],
            time_varying_known_reals=exog_column_names,
            time_varying_known_categoricals=['day_type', 'country'],
            target_normalizer=EncoderNormalizer(transformation='log'),
            #add_relative_time_idx=True,
            add_target_scales=True,
            add_encoder_length=True
        )
        validation_dataset = TimeSeriesDataSet.from_dataset(
            training_dataset,
            train_data,
            predict=True
        )

        batch_size = 64
        train_dataloader = training_dataset.to_dataloader(
            train=True,
            batch_size=batch_size,
            num_workers=0
        )
        val_dataloader = validation_dataset.to_dataloader(
            train=False,
            batch_size=batch_size,
            num_workers=0
        )

        self._encoder_length = encoder_length
        self._decoder_length = decoder_length

        return train_dataloader, val_dataloader, result

    def add_networks(
            self,
            nn_hyperparameters: Dict[str, Dict[str, Union[int, float]]],
            tr_hyperparameters: Dict[str, Dict[str, Union[int, float]]],
            optimize: bool = False
    ):
        """
        This method adds desired number of different neural networks to forecasting model

        :param nn_hyperparameters: dictionary with desired values of hyperparameters for each neural network
        :param tr_hyperparameters: dictionary with hyperparamteres for trainers
        :param optimize: argument defines if automatic optimization fo hyperparameters needed. If False (default) hyperparameters defined by nn_hyperparameters and
        default parameters of related networks. If True, then parameters are defined by optuna
        :return: None
        """
        for nn_name in nn_hyperparameters:
            nn_params = nn_hyperparameters.get(nn_name)
            tr_params = tr_hyperparameters.get(nn_name)
            if nn_name == 'deepar':
                current_nn = DeepAR.from_dataset(
                    self.train_dataloader.dataset,
                    **nn_params
                )
            if nn_name == 'tft':
                current_nn = TemporalFusionTransformer.from_dataset(
                    self.train_dataloader.dataset,
                    **nn_params
                )
            if nn_name == 'nhits':
                current_nn = NHiTS.from_dataset(
                    self.train_dataloader.dataset,
                    **nn_params
                )
            current_trainer = pl.Trainer(**tr_params)
            self._networks += (current_nn, )
            self._trainers += (current_trainer, )

    def fit(self):
        """
        This method fits networks in self.networks with respective trainers from self.trainers
        :return:
        """
        for index, network in enumerate(self.networks):
            self.trainers[index].fit(
                network,
                train_dataloaders=self.train_dataloader,
                val_dataloaders=self.val_dataloader
            )
            if network.__class__ == DeepAR:
                self._fitted_networks += [DeepAR.load_from_checkpoint(self.trainers[index].checkpoint_callback.best_model_path)]
            if network.__class__ == TemporalFusionTransformer:
                self._fitted_networks += [TemporalFusionTransformer.load_from_checkpoint(self.trainers[index].checkpoint_callback.best_model_path)]
            if network.__class__ == NHiTS:
                self._fitted_networks += [NHiTS.load_from_checkpoint(self.trainers[index].checkpoint_callback.best_model_path)]

    def predict(self):
        final_prediction = np.array([])
        current_prediction = np.array([])

        self.target_data['cnp_fi_pred'] = 0
        dates_range = pd.date_range(
            start=self.dates.get('history_end_date') + timedelta(hours=self.decoder_length),
            end=self.dates.get('forecast_end_date'),
            freq='D'
        )
        for current_date in dates_range:
            final_prediction = np.array([])
            for fitted_network in self.fitted_networks:
                cur_df = self.result_df.loc[
                         (self.result_df['ds'] >= current_date - timedelta(hours=self.encoder_length+self.decoder_length)) &
                         (self.result_df['ds'] < current_date),
                         :
                         ]
                current_prediction = fitted_network.predict(
                    cur_df,
                    mode='prediction',
                    return_x=False
                )
                current_prediction = current_prediction.detach().numpy()[-1, :]
                if not final_prediction.size:
                    final_prediction = current_prediction
                else:
                    final_prediction += current_prediction
            self.target_data.loc[
                (self.target_data['ds'] >= current_date - timedelta(days=1)) &
                (self.target_data['ds'] < current_date),
                'cnp_fi_pred'
            ] = final_prediction / len(self.fitted_networks)

        forecast_results = self.target_data.loc[
            (self.target_data['ds'] >= history_end_date) &
            (self.target_data['ds'] < forecast_end_date),
            ['ds', 'cnp_fi', 'cnp_fi_pred']
        ].set_index('ds')
        forecast_results.plot(figsize=(16, 6))

        mae = mean_absolute_error(
            y_true=forecast_results.loc[:, 'cnp_fi'],
            y_pred=forecast_results.loc[:, 'cnp_fi_pred']
        )
        print(f'MAE = {mae}')

        forecast_results.reset_index(inplace=True)
        forecast_results['day'] = forecast_results.apply(lambda x: x['ds'].day, axis=1)
        forecast_results['hour'] = forecast_results.apply(lambda x: x['ds'].hour, axis=1)
        forecast_results['diff'] = forecast_results.apply(lambda x: np.abs(x['cnp_fi'] - x['cnp_fi_pred']), axis=1)
        diff = forecast_results.groupby(by='day').max()['diff'].to_numpy()
        print(f'max deviation = {np.mean(diff * 1000)}')

Global seed set to 666


In [139]:
target_data = {
    'data_series': [
        'CNPNP',
        'CNPSE', 'CNPSE1', 'CNPSE2', 'CNPSE3', 'CNPSE4',
        'CNPNO', 'CNPNO1', 'CNPNO2', 'CNPNO3', 'CNPNO4', 'CNPNO5',
        'CNPDEN', 'CNPDK1', 'CNPDK2',
        'CNPEE', 'CNPLV', 'CNPLT',
        'CNPDE', 'CNPNL', 'CNPUK',
        'CNPFI'
    ],
    'column_names': [
        'cnp_np',
        'cnp_se', 'cnp_se1', 'cnp_se2', 'cnp_se3', 'cnp_se4',
        'cnp_no', 'cnp_no1', 'cnp_no2', 'cnp_no3', 'cnp_no4', 'cnp_no5',
        'cnp_dk', 'cnp_dk1', 'cnp_dk2',
        'cnp_ee', 'cnp_lv', 'cnp_lt',
        'cnp_de', 'cnp_nl', 'cnp_uk',
        'cnp_fi'
    ]
}

exog_data = {
    'data_series': ['SMHITEMPFI_F', 'SMHITEMPSE_F', 'SMHITEMPNO_F', 'SMHITEMPNP_F', 'SMHITEMPDK_F'],
    'column_names': ['temp_fi', 'temp_se', 'temp_no', 'temp_np', 'temp_dk']
}

history_start_date = datetime(year=2020, month=12, day=1)
history_end_date = datetime(year=2021, month=12, day=5)
forecast_end_date = datetime(year=2021, month=12, day=12)

forecasting_model = ForecastingModel(
    history_start_date=history_start_date,
    history_end_date=history_end_date,
    forecast_end_date=forecast_end_date,
    target_data=target_data,
    exog_data=exog_data,
    target_data_interval='hour',
    exog_data_interval='day'
)

In [140]:
"""
deepar:
the best params = {'gradient_clip_val': 0.436058692807735, 'hidden_size': 155, 'rnn_layers': 2, 'learning_rate': 0.00997853051614353}; the best value = -1.2212425470352173

tft:
{'gradient_clip_val': 0.27155203157679625, 'hidden_size': 75, 'dropout': 0.2193904175901326, 'hidden_continuous_size': 43, 'attention_head_size': 5,
 'learning_rate': 0.0012277693365326608}
"""

nn_hyperparameters = {
    'deepar': {
        'hidden_size': 40,
        'rnn_layers': 3,
        'learning_rate': 1e-3,
        'cell_type': 'GRU',
        'loss': LogNormalDistributionLoss()
    },
    'tft': {
        'hidden_size': 128,
        'dropout': 0.219,
        'hidden_continuous_size': 32,
        'attention_head_size': 6,
        'learning_rate': 5e-3
    },
    'nhits': {
        'hidden_size': 1024,
        'learning_rate': 1e-3,
        'pooling_sizes': [16, 8, 1],
        'dropout': 0.1,
        'loss': MAE(),
        'reduce_on_plateau_patience': 5
    }
}

early_stop_callback_deepar = EarlyStopping(
    monitor='val_loss',
    min_delta=1e-4,
    patience=10,
    verbose=False,
    mode='min'
)
early_stop_callback_tft = EarlyStopping(
    monitor='val_loss',
    min_delta=1e-4,
    patience=10,
    verbose=False,
    mode='min'
)
early_stop_callback_nhits = EarlyStopping(
    monitor='val_loss',
    min_delta=1e-4,
    patience=10,
    verbose=False,
    mode='min'
)

tr_hyperparameters = {
    'deepar': {
        'max_epochs': 100,
        'min_epochs': 50,
        'gpus': [0] if torch.cuda.is_available() else None,
        'callbacks': [early_stop_callback_deepar],
        'limit_train_batches': 40,
        'limit_val_batches': 10,
        'gradient_clip_val': 0.25
    },
    'tft': {
        'max_epochs': 10,
        'gpus': [0] if torch.cuda.is_available() else None,
        'callbacks': [early_stop_callback_tft],
        'limit_train_batches': 40,
        'limit_val_batches': 10,
        'gradient_clip_val': 0.25
    },
    'nhits': {
        'max_epochs': 100,
        'min_epochs': 50,
        'gpus': [0] if torch.cuda.is_available() else None,
        'callbacks': [early_stop_callback_nhits],
        'limit_train_batches': 40,
        'limit_val_batches': 10,
        'gradient_clip_val': 0.25
    }
}
forecasting_model.add_networks(nn_hyperparameters=nn_hyperparameters, tr_hyperparameters=tr_hyperparameters)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [141]:
forecasting_model.fit()


  | Name                   | Type                      | Params
---------------------------------------------------------------------
0 | loss                   | LogNormalDistributionLoss | 0     
1 | logging_metrics        | ModuleList                | 0     
2 | embeddings             | MultiEmbedding            | 319   
3 | rnn                    | LSTM                      | 310 K 
4 | distribution_projector | Linear                    | 312   
---------------------------------------------------------------------
310 K     Trainable params
0         Non-trainable params
310 K     Total params
1.243     Total estimated model params size (MB)


Validation sanity check: 0it [00:00, ?it/s]

Global seed set to 666


Training: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

Validating: 0it [00:00, ?it/s]

IsADirectoryError: [Errno 21] Is a directory: '/home/bathory/DataspellProjects/ZoneCnpForecast'

In [None]:
forecasting_model.predict()