# Import libs

# In [4]:
"""This is basline statistic models for regression time-series task.

Todo:
    * Google docstings
    * unittest
    * refactoring HoltWinters model to reduce time consuming.
"""

import os
import sys
sys.path.append(os.path.abspath("../.."))
import pandas as pd
import numpy as np

from typing import Union, Tuple, List
from scipy.signal import savgol_filter
import scipy.signal as signal
from scipy.signal import find_peaks
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_percentage_error
from sklearn.metrics import mean_absolute_error
from scipy.stats import mode
from statsmodels.tsa.holtwinters import ExponentialSmoothing

# Baseline classes

# In [6]:
class MEV:
    """Mean Expected Values model (MEV).

    Notes:
        The main idea is to predict the next values from the mean value observed for the same
        day of week and hour. ``fit`` removes a per-column linear trend from the training rows,
        averages the detrended values per (day of week, hour) and adds the extrapolated trend
        back for the last calendar day present in the data.

        The input frame must be indexed by a ``DatetimeIndex`` and contain the columns
        ``real_weight`` and ``real_wagon_count``; the last five rows are held out as the test set.
        NOTE(review): ``predict`` labels the expected values with an hourly range built from the
        first to the last date, so it presumably expects contiguous days with all 24 hours of
        every weekday present in the training rows - confirm against the data source.

    Attributes:
        is_savgol (bool): apply the Savitzky-Golay filter to the training series or not.
            False by default because the filter is applied earlier in the pipeline.
        is_manual_model (bool): apply a small centered rolling mean to the detrended series to
            reduce data lags. True by default. Should be deleted in the future.
        start_hour (int): first hour of the day (inclusive) kept in the prediction.
        end_hour (int): last hour of the day (inclusive) searched for observed data.
        target_day_of_year (int): day of year of the last row passed to ``fit``.
        target_end_hour_of_day (int): last observed hour of the final day inside
            ``[start_hour, end_hour]``; upper bound of the returned prediction.
        df_predicted (pd.DataFrame): output of ``predict`` stored by ``fit``.
        df_expected (pd.DataFrame): held-out rows (``df_test``) stored by ``fit``.
    """
    __slots__ = ['df', 'df_train', 'df_test', 'target_columns_to_detrend', 'start_hour', 'end_hour', 'is_manual_model',
                 'target_day_of_year', 'target_end_hour_of_day', 'is_savgol', 'df_predicted', 'df_expected']

    def __init__(self,
                 is_savgol: bool = False,
                 is_manual_model: bool = True):
        self.is_savgol = is_savgol
        self.is_manual_model = is_manual_model

        # int: start time of the day which we use to analyse data.
        self.start_hour = 8
        # int: end time of the day at which we stop analysing data.
        self.end_hour = 22

    def remove_noise(self, ts: pd.Series) -> pd.Series:
        """Apply a Savitzky-Golay filter to remove noise from the data.

        Notes:
            This is a baseline idea. The window is 5 and the polyorder is 2. A window of 3 with
            polyorder 2 would interpolate every window exactly and therefore filter nothing.
            In the future the parameters should be selected by some optimization strategy.

        Args:
            ts: pandas time series whose index is in timestamp format.

        Returns:
            Filtered time series with the same index and name as ``ts``.
        """
        return pd.Series(data=savgol_filter(ts, 5, 2), index=ts.index, name=ts.name)

    def fit(self, df: pd.DataFrame):
        """Split ``df`` into train/test rows, run the forecast and store the results.

        Args:
            df: frame indexed by a ``DatetimeIndex`` with the columns ``real_weight`` and
                ``real_wagon_count``. It is copied, not modified.

        Returns:
            The fitted model itself.

        Raises:
            ValueError: if ``df`` has no training rows left after the five-row hold-out, or if
                the last calendar day has no row between ``start_hour`` and ``end_hour``.
        """
        target_columns = ['real_weight', 'real_wagon_count']
        if df.shape[0] <= 5:
            raise ValueError("MEV.fit needs more than 5 rows: the last 5 rows are held out for testing")

        self.df = df.copy()
        # Explicit copies so the column assignments below never write into a view of ``df``.
        self.df_train = self.df.iloc[:-5][target_columns].copy()
        self.df_test = self.df.iloc[-5:][target_columns].copy()

        index = self.df.index
        hours = index.hour
        self.target_day_of_year = index.dayofyear[-1]
        # Compare calendar dates (not day-of-year) so data spanning several years stays unambiguous.
        on_target_day = index.date == index.date[-1]
        in_window = on_target_day & (hours >= self.start_hour) & (hours <= self.end_hour)
        if not in_window.any():
            raise ValueError("the last day of the data has no rows between start_hour and end_hour")
        # predict() uses this as the upper hour bound of the returned forecast.
        self.target_end_hour_of_day = hours[in_window][-1]

        if self.is_savgol:
            for column in target_columns:
                self.df_train[column] = self.remove_noise(self.df_train[column])

        self.df_predicted = self.predict()
        self.df_expected = self.df_test

        return self

    def load_linear_model(self, ts: pd.Series) -> LinearRegression:
        """Fit a scikit-learn linear regression of ``ts`` on its row positions to get the trend."""
        return LinearRegression().fit(np.arange(ts.shape[0]).reshape(-1, 1), ts.values)

    def remove_trend_ts(self, ts: pd.Series, model: LinearRegression) -> pd.Series:
        """Detrend the selected time series by subtracting the linear model prediction."""
        return pd.Series(data=(ts.values - model.predict(np.arange(ts.shape[0]).reshape(-1, 1))), index=ts.index,
                         name=ts.name)

    def get_aggregated_df(self, df_train: pd.DataFrame) -> pd.DataFrame:
        """Average every column per (day of week, hour).

        Args:
            df_train: frame indexed by a ``DatetimeIndex``.

        Returns:
            Frame indexed by a two-level ``MultiIndex`` named ``dayofweek`` and ``hour``.
        """
        index = df_train.index
        df = df_train.groupby([index.dayofweek, index.hour]).mean()
        df.index = df.index.set_names(['dayofweek', 'hour'])
        return df

    def get_mean_expected_df(self, df_train: pd.DataFrame) -> pd.DataFrame:
        """Return the mean values per day of week for the hours 0 through 23."""
        df = self.get_aggregated_df(df_train)
        hour_level = df.index.get_level_values(1)
        return df[(hour_level >= 0) & (hour_level <= 23)]  # 8 and 20

    def get_mean_df(self, df_train: pd.DataFrame) -> pd.DataFrame:
        """Build the expected values for every calendar day present in ``self.df``.

        For each day, in chronological order, the rows of the (day of week, hour) mean table
        that match the day's weekday are appended.

        Args:
            df_train: frame the means are computed from.

        Returns:
            Concatenated expected values with a fresh integer index.
        """
        mean_df = self.get_mean_expected_df(df_train)
        day_of_week_level = mean_df.index.get_level_values(0)
        # Calendar dates sort chronologically even across a year boundary; day-of-year does not.
        calendar_days = sorted(set(self.df.index.date))
        per_day = [mean_df[day_of_week_level == day.weekday()].reset_index(drop=True)
                   for day in calendar_days]
        return pd.concat(per_day).reset_index(drop=True)

    def get_list_linear_models(self) -> List[LinearRegression]:
        """Fit one linear trend model per training column, in ``df_train`` column order."""
        return [self.load_linear_model(self.df_train[column]) for column in self.df_train.columns]

    def get_detrended_df(self, list_linear_models: List[LinearRegression]) -> pd.DataFrame:
        """Return a copy of ``df_train`` with the matching linear trend removed from each column."""
        df = self.df_train.copy()
        for ind, column in enumerate(df.columns):
            df[column] = self.remove_trend_ts(df[column], list_linear_models[ind])
        return df

    def get_trended_df(self, df_pred_mean: pd.DataFrame, model_list: List[LinearRegression]) -> pd.DataFrame:
        """Add the extrapolated linear trend back to the expected values.

        The trend is evaluated at the row positions that directly follow the training rows.

        Args:
            df_pred_mean: detrended expected values; modified in place.
            model_list: trend models in ``df_train`` column order.

        Returns:
            ``df_pred_mean`` with the trend added.
        """
        shape_past = self.df_train.shape[0]
        index_future = shape_past + df_pred_mean.shape[0]
        future_positions = np.arange(shape_past, index_future).reshape(-1, 1)
        columns = self.df_train.columns.to_list()
        for ind, model in enumerate(model_list):
            df_pred_mean[columns[ind]] = df_pred_mean[columns[ind]].values + model.predict(future_positions)
        return df_pred_mean

    def predict(self) -> pd.DataFrame:
        """Forecast the last calendar day of the data.

        Returns:
            Expected values with the trend restored, limited to the hours from ``start_hour``
            through ``target_end_hour_of_day`` of the last day.
        """
        list_linear_models = self.get_list_linear_models()
        df_detrended = self.get_detrended_df(list_linear_models)
        if self.is_manual_model:
            # Centered 3-point rolling mean; the NaN edges are filled from their neighbours.
            smoothed = df_detrended.rolling(window=3, center=True).sum() / 3
            df_pred_mean = self.get_mean_df(smoothed.bfill().ffill())
        else:
            df_pred_mean = self.get_mean_df(df_detrended)  # every day from hour 0 to 24
        dates = self.df.index.date
        df_pred_mean.index = pd.date_range(start=dates[0],
                                           end=dates[-1] + pd.Timedelta(days=1),
                                           freq=pd.Timedelta(hours=1))[:-1]  # -1 because of 00:00 hour
        # Copy so that get_trended_df writes into an independent frame.
        df_pred_mean = df_pred_mean[df_pred_mean.index.date == dates[-1]].copy()
        df_mean_expected = self.get_trended_df(df_pred_mean, list_linear_models)
        hours = df_mean_expected.index.hour
        return df_mean_expected[(hours >= self.start_hour) & (hours <= self.target_end_hour_of_day)]

    def mape_score(self) -> List[float]:
        """Return the MAPE score (in percent) of every column, in column order.

        NOTE(review): sklearn requires ``df_expected`` and ``df_predicted`` to have the same
        number of rows - confirm the five-row hold-out matches the predicted hour window.
        """
        df_predicted = self.df_predicted
        df_expected = self.df_expected
        return [round(mean_absolute_percentage_error(df_expected[column], df_predicted[column]), 2) * 100
                for column in df_expected.columns]

    def mae_score(self) -> List[float]:
        """Return the MAE score of every column, in column order.

        NOTE(review): sklearn requires ``df_expected`` and ``df_predicted`` to have the same
        number of rows - confirm the five-row hold-out matches the predicted hour window.
        """
        df_predicted = self.df_predicted
        df_expected = self.df_expected
        return [round(mean_absolute_error(df_expected[column], df_predicted[column]), 2)
                for column in df_expected.columns]


class FourierModel:
    """Fourier extrapolation baseline for hourly time series.

    Notes:
        ``fit`` holds out the rows of the last calendar day from ``start_hour`` onwards as the
        test set and trains on every other row. A linear trend is removed from each training
        column, the detrended series is transformed with an FFT, and the signal rebuilt from
        the lowest-frequency harmonics is extended over the test rows and re-trended.

        The input frame must be indexed by a ``DatetimeIndex``.
        NOTE(review): the positional trend and the FFT presumably expect rows sorted by time
        and evenly spaced - confirm against the data source.

    Attributes:
        is_savgol (bool): apply the Savitzky-Golay filter to the training columns or not.
        start_hour (int): first hour of the day (inclusive) that is forecast and scored.
        end_hour (int): last hour of the day (inclusive) that is forecast and scored.
        target_day_of_year (int): day of year of the last row passed to ``fit``.
        target_end_hour_of_day (int): last observed hour of the target day inside
            ``[start_hour, end_hour]``.
        df_predicted (pd.DataFrame): forecast for the scored hours of the target day.
        df_expected (pd.DataFrame): observed rows for the scored hours of the target day.
        df_predicted_full (pd.DataFrame): model values for every row of ``df``.
        df_residuals (pd.DataFrame): ``df_predicted_full - df``.
    """
    __slots__ = ['df', 'df_train', 'df_test', 'target_columns_to_detrend', 'df_residuals', 'df_predicted_full',
                 'start_hour',
                 'end_hour', 'target_day_of_year', 'target_end_hour_of_day', 'is_savgol', 'df_predicted', 'df_expected']

    def __init__(self,
                 is_savgol: bool = False):
        self.is_savgol = is_savgol
        self.start_hour = 8
        self.end_hour = 22

    def fit(self, df: pd.DataFrame):
        """Split ``df`` into train/test rows, run the forecast and store all results.

        Args:
            df: frame indexed by a ``DatetimeIndex``; every column is modelled.

        Returns:
            The fitted model itself.

        Raises:
            IndexError: if the last calendar day has no row between ``start_hour`` and
                ``end_hour``.
        """
        self.df = df
        index = df.index
        hours = index.hour
        self.target_day_of_year = index.dayofyear[-1]

        # Select the target day by calendar date: a day-of-year comparison would also match the
        # same day of earlier years and break the train/test partition of multi-year data.
        is_target_day = index.normalize() == index[-1].normalize()
        in_working_hours = (hours >= self.start_hour) & (hours <= self.end_hour)
        self.target_end_hour_of_day = hours[is_target_day & in_working_hours][-1]

        # Train rows and test rows partition ``df`` exactly, which keeps the row positions of
        # the trend model aligned with the restored signal.
        is_test = is_target_day & (hours >= self.start_hour)
        self.df_train = df[~is_test].copy()
        self.df_test = df[is_test]

        if self.is_savgol:
            for column in df.columns:
                self.df_train[column] = self.remove_noise(self.df_train[column])

        predicted = self.predict()
        predicted_hours = predicted.index.hour
        self.df_predicted = predicted[(predicted_hours >= self.start_hour) & (predicted_hours <= self.end_hour)]
        self.df_expected = df[is_target_day & in_working_hours]

        self.df_predicted_full = self.get_train_and_predicted_in_one_df()
        self.df_residuals = self.df_predicted_full - self.df

        return self

    def remove_noise(self, ts: pd.Series) -> pd.Series:
        """Apply a Savitzky-Golay filter (window 5, polyorder 2) to ``ts``. This is a baseline idea."""
        return pd.Series(data=savgol_filter(ts, 5, 2), index=ts.index, name=ts.name)

    def load_linear_model(self, ts: np.array) -> LinearRegression:
        """Fit a scikit-learn linear regression of ``ts`` on its row positions to get the trend."""
        return LinearRegression().fit(np.arange(ts.shape[0]).reshape(-1, 1), ts)

    def get_list_linear_models(self) -> List[LinearRegression]:
        """Fit one linear trend model per column of ``df``, using the training rows."""
        return [self.load_linear_model(self.df_train[column]) for column in self.df.columns]

    def remove_trend_ts(self, ts: pd.Series, model: LinearRegression) -> pd.Series:
        """Detrend the selected time series by subtracting the linear model prediction."""
        return pd.Series(data=(ts.values - model.predict(np.arange(ts.shape[0]).reshape(-1, 1))), index=ts.index,
                         name=ts.name)

    def get_detrended_df(self, list_linear_models: List[LinearRegression]) -> pd.DataFrame:
        """Return a copy of ``df_train`` with the matching linear trend removed from each column."""
        df = self.df_train.copy()
        for ind, column in enumerate(df.columns):
            df[column] = self.remove_trend_ts(df[column], list_linear_models[ind])
        return df

    def get_trended_df(self, df_pred_mean: pd.DataFrame, model_list: List[LinearRegression]) -> pd.DataFrame:
        """Add the extrapolated linear trend back to ``df_pred_mean``.

        The trend is evaluated at the row positions that directly follow the training rows.
        ``df_pred_mean`` is modified in place and returned.
        """
        shape_past = self.df_train.shape[0]
        index_future = shape_past + df_pred_mean.shape[0]
        future_positions = np.arange(shape_past, index_future).reshape(-1, 1)
        columns = self.df.columns.to_list()
        for ind, model in enumerate(model_list):
            df_pred_mean[columns[ind]] = df_pred_mean[columns[ind]].values + model.predict(future_positions)
        return df_pred_mean

    def mape_score(self) -> List[float]:
        """Return the MAPE score (in percent) of every column, in column order."""
        df_predicted = self.df_predicted
        df_expected = self.df_expected
        return [round(mean_absolute_percentage_error(df_expected[column], df_predicted[column]), 2) * 100
                for column in self.df.columns]

    def mae_score(self) -> List[float]:
        """Return the MAE score of every column, in column order."""
        df_predicted = self.df_predicted
        df_expected = self.df_expected
        return [round(mean_absolute_error(df_expected[column], df_predicted[column]), 2)
                for column in self.df.columns]

    def train_fourier_model(self, detrended_ts_train: np.array) -> np.array:
        """Return the Fourier coefficients (``np.fft.fft``) of the observed time series."""
        return np.fft.fft(detrended_ts_train)

    def restore_signal_from_fft(self, fft_x, fft_y, N, extrapolate_with, frac_harmonics):
        """Rebuild a signal from Fourier coefficients and extend it past the training range.

        Args:
            fft_x: frequencies of the coefficients (e.g. ``np.fft.fftfreq(N)``).
            fft_y: Fourier coefficients of the training series.
            N: number of training samples.
            extrapolate_with: number of extra samples to generate after the training range.
            frac_harmonics: fraction of ``len(fft_y)`` used as the number of harmonics.

        Returns:
            Array of length ``N + extrapolate_with`` with the restored signal.
        """
        xvalues_full = np.arange(0, N + extrapolate_with)
        restored_sig = np.zeros(N + extrapolate_with)

        # Order the coefficient indices from the lowest absolute frequency upwards.
        indices = sorted(range(N), key=lambda i: np.absolute(fft_x[i]))

        # The number of harmonics we want to include in the reconstruction
        max_no_harmonics = len(fft_y)
        no_harmonics = int(frac_harmonics * max_no_harmonics)

        # First index plus two coefficients (positive and negative frequency) per harmonic.
        for i in indices[:1 + no_harmonics * 2]:
            ampli = np.absolute(fft_y[i]) / N
            phase = np.angle(fft_y[i])
            restored_sig += ampli * np.cos(2 * np.pi * fft_x[i] * xvalues_full + phase)
        return restored_sig

    def _restore_detrended_signal(self, detrended_ts: pd.Series) -> np.ndarray:
        """Restore one detrended training column over the train rows plus the test rows."""
        fft_y = self.train_fourier_model(detrended_ts)
        return self.restore_signal_from_fft(
            fft_x=np.fft.fftfreq(self.df_train.shape[0]),
            fft_y=fft_y,
            extrapolate_with=self.df_test.shape[0],
            N=fft_y.shape[0],
            frac_harmonics=0.33)

    def predict(self) -> pd.DataFrame:
        """Pipeline that forecasts the test rows.

        Returns:
            Forecast indexed like ``df_test``, limited to the hours from ``start_hour`` through
            ``target_end_hour_of_day``.
        """
        list_linear_models = self.get_list_linear_models()
        df_detrended = self.get_detrended_df(list_linear_models)
        df_preds = pd.DataFrame(index=self.df_test.index, data=None, columns=self.df_test.columns)

        n_train = self.df_train.shape[0]
        n_test = self.df_test.shape[0]
        # Row positions of the test rows, continuing the positions the trend was fitted on.
        future_positions = np.arange(n_train, n_train + n_test, step=1).reshape(-1, 1)

        for ind, column in enumerate(df_detrended.columns):
            restored = self._restore_detrended_signal(df_detrended[column])
            df_preds[column] = np.add(restored[n_train:], list_linear_models[ind].predict(future_positions))

        pred_hours = df_preds.index.hour
        return df_preds[(pred_hours >= self.start_hour) & (pred_hours <= self.target_end_hour_of_day)]

    def get_train_and_predicted_in_one_df(self) -> pd.DataFrame:
        """Pipeline that returns the model values for every row of ``df`` (train and test).

        Returns:
            Frame indexed like ``df`` holding the restored signal plus the linear trend.
        """
        list_linear_models = self.get_list_linear_models()
        df_detrended = self.get_detrended_df(list_linear_models)
        df_pred = pd.DataFrame(index=self.df.index, data=None, columns=self.df.columns)

        all_positions = np.arange(0, self.df.shape[0]).reshape(-1, 1)

        for ind, column in enumerate(df_detrended.columns):
            restored = self._restore_detrended_signal(df_detrended[column])
            df_pred[column] = np.add(list_linear_models[ind].predict(all_positions), restored)
        return df_pred