In [1]:
from abc import ABC, abstractmethod
from typing import Optional, Tuple

import pandas as pd
import numpy as np
from sklearn.linear_model import LinearRegression

In [2]:
class BaseForecaster(ABC):
    @abstractmethod
    def fit(self, y_train: pd.DataFrame, x_hog_train: Optional[pd.DataFrame] = None):
        pass

    @abstractmethod
    def forecast(self, h: int, x_hog: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        pass

    @abstractmethod
    def fit_forecast(
        self,
        y_train: pd.DataFrame,
        h: int,
        x_hog_train: Optional[pd.DataFrame] = None,
        x_hog: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        pass

    @abstractmethod
    def in_sample_forcast(self) -> pd.DataFrame:
        pass


class ReduceWindow(BaseForecaster):
    def __init__(self, window_len: int):
        super().__init__()
        self.__window_len = window_len

    def fit(self, y_train: pd.DataFrame, x_hog_train: Optional[pd.DataFrame] = None):
        pass

    def forecast(self, h: int, x_hog: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        pass

    def fit_forecast(
        self,
        y_train: pd.DataFrame,
        h: int,
        x_hog_train: Optional[pd.DataFrame] = None,
        x_hog: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        pass

    def in_sample_forcast(self) -> pd.DataFrame:
        pass

    @property
    def window_len(self):
        return self.__window_len

    def __reduce_window_xy(
        self, y_train: pd.DataFrame, x_hog: pd.DataFrame
    ) -> Tuple[np.ndarray, np.ndarray]:
        x_data, y_data = [], []
        y_train_np = y_train.to_numpy()
        x_hog_np = x_hog.to_numpy()
        for i in range(self.window_len, len(y_train)):
            window_data = (
                x_hog_np[i - self.window_len : i, :].flatten(),
                y_train_np[i - self.window_len : i].flatten(),
            )
            x_data.append(np.hstack(window_data))
            y_data.append(y_train_np[i].item())
        return np.array(x_data), np.array(y_data)

    def __reduce_window_y(self, y_train: pd.DataFrame) -> Tuple[np.ndarray, np.ndarray]:
        x_data, y_data = [], []
        y_train_np = y_train.to_numpy()
        for i in range(self.window_len, len(y_train)):
            x_data.append(y_train_np[i - self.window_len : i, :].flatten())
            y_data.append(y_train_np[i].item())
        return np.array(x_data), np.array(y_data)

    def reduce_window(
        self, y_data: pd.DataFrame, x_hog: Optional[pd.DataFrame]
    ) -> Tuple[np.ndarray, np.ndarray]:
        if x_hog is None:
            return self.__reduce_window_y(y_data)
        else:
            return self.__reduce_window_xy(y_data, x_hog)

In [36]:
class Forecaster(ReduceWindow):
    def __init__(self, window_len: int = 10):
        super().__init__(window_len)
        self._estimator = LinearRegression()
        self._last_fited_index = None
        self._last_fited_data = None
        self._freq = None
        self._fitted_with_xhog = False

    def fit(self, y_train: pd.DataFrame, x_hog_train: Optional[pd.DataFrame] = None):
        x_data, y_data = self.reduce_window(y_train, x_hog_train)
        self._estimator.fit(x_data, y_data)
        self._last_fited_index = y_train.index[-1]
        if x_hog_train is None:
            self._last_fited_data = y_train.to_numpy()[-self.window_len :].reshape(
                1, -1
            )
        else:
            self._last_fited_data = np.hstack(
                [
                    x_hog_train.to_numpy()[-self.window_len :].flatten(),
                    y_train.to_numpy()[-self.window_len :].flatten(),
                ]
            ).reshape(1, -1)
        self._freq = y_train.index.freqstr
        self._fitted_with_xhog = False if x_hog_train is None else True
        return self

    def forcast_horizon(self, h: int):
        return pd.date_range(
            start=self._last_fited_index,
            freq=self._freq,
            inclusive="right",
            periods=h + 1,
        )

    def __prepare_forecast_df(self, h):
        return pd.DataFrame(columns=["predicted"], index=self.forcast_horizon(h))

    def __forecast_without_exhog(self, h: int) -> pd.DataFrame:
        forecast_df = self.__prepare_forecast_df(h)
        forecast_values = []
        x_data = self._last_fited_data
        for i in range(h):
            fcast_t = self._estimator.predict(x_data)
            x_data = np.append(x_data, fcast_t.reshape(1, -1), axis=1)[
                :, -self.window_len :
            ]
            forecast_values.append(fcast_t.flatten().item())
        forecast_df["predicted"] = forecast_values
        return forecast_df["predicted"]

    def __forecast_with_exhog(self, h: int, x_hog: pd.DataFrame) -> pd.DataFrame:
        forecast_df = self.__prepare_forecast_df(h)
        forecast_values = []
        x_data = self._last_fited_data

        x_hog_np = x_hog.to_numpy()
        for i in range(h):
            fcast_t = self._estimator.predict(x_data)
            y_lags = x_data[:, -self.window_len :]
            x_lags = x_data[:, : -self.window_len]
            y_new = np.append(y_lags, fcast_t[np.newaxis, :], axis=1)[
                :, -self.window_len :
            ]
            x_new = np.append(
                x_lags.reshape(self.window_len, -1), x_hog_np[i : i + 1, :], axis=0
            )[-self.window_len :, :]
            x_data = np.hstack([x_new.flatten(), y_new.flatten()])[np.newaxis, :]
            forecast_values.append(fcast_t.flatten().item())
        forecast_df["predicted"] = forecast_values
        return forecast_df

    def forecast(self, h: int, x_hog: Optional[pd.DataFrame] = None) -> pd.DataFrame:
        if (self._fitted_with_xhog is True) and (x_hog is None):
            raise Exception("Xhog is needed , coz estimater was fitted with xhog")
        elif (self._fitted_with_xhog is True) and (x_hog is not None):
            return self.__forecast_with_exhog(h, x_hog)
        elif (self._fitted_with_xhog is False) and (x_hog is not None):
            print("Xhog will be ignored as it was fitted with out xhog")
            return self.__forecast_without_exhog(h)
        else:
            return self.__forecast_without_exhog(h)

    def fit_forecast(
        self,
        y_train: pd.DataFrame,
        h: int,
        x_hog_train: Optional[pd.DataFrame] = None,
        x_hog: Optional[pd.DataFrame] = None,
    ) -> pd.DataFrame:
        self.fit(y_train, x_hog_train)
        return self.forecast(h, x_hog)

    def in_sample_forcast(self) -> pd.DataFrame:
        pass

In [37]:
test_data = pd.DataFrame(
    columns=["v1", "v2", "t"],
    index=pd.date_range(start="2023-01-01", freq="D", periods=20),
)

test_data["v1"] = np.arange(1, 21)
test_data["v2"] = test_data["v1"] + 0.1
test_data["t"] = test_data["v1"] + 0.01

y_d = test_data[["t"]].copy(deep=True)
x_d = test_data[["v1", "v2"]].copy(deep=True)

print(test_data.shape)

test_data.head(15)

(20, 3)


Unnamed: 0,v1,v2,t
2023-01-01,1,1.1,1.01
2023-01-02,2,2.1,2.01
2023-01-03,3,3.1,3.01
2023-01-04,4,4.1,4.01
2023-01-05,5,5.1,5.01
2023-01-06,6,6.1,6.01
2023-01-07,7,7.1,7.01
2023-01-08,8,8.1,8.01
2023-01-09,9,9.1,9.01
2023-01-10,10,10.1,10.01


In [38]:
fcast = Forecaster(3)

fcast.fit(y_d.head(15))
fcast.forecast(5)

2023-01-16    16.01
2023-01-17    17.01
2023-01-18    18.01
2023-01-19    19.01
2023-01-20    20.01
Freq: D, Name: predicted, dtype: float64

In [39]:
y_d.head(15).index[-1]

Timestamp('2023-01-15 00:00:00', freq='D')

In [40]:
from numpy.lib.stride_tricks import sliding_window_view

In [41]:
test_data.to_numpy()

array([[ 1.  ,  1.1 ,  1.01],
       [ 2.  ,  2.1 ,  2.01],
       [ 3.  ,  3.1 ,  3.01],
       [ 4.  ,  4.1 ,  4.01],
       [ 5.  ,  5.1 ,  5.01],
       [ 6.  ,  6.1 ,  6.01],
       [ 7.  ,  7.1 ,  7.01],
       [ 8.  ,  8.1 ,  8.01],
       [ 9.  ,  9.1 ,  9.01],
       [10.  , 10.1 , 10.01],
       [11.  , 11.1 , 11.01],
       [12.  , 12.1 , 12.01],
       [13.  , 13.1 , 13.01],
       [14.  , 14.1 , 14.01],
       [15.  , 15.1 , 15.01],
       [16.  , 16.1 , 16.01],
       [17.  , 17.1 , 17.01],
       [18.  , 18.1 , 18.01],
       [19.  , 19.1 , 19.01],
       [20.  , 20.1 , 20.01]])