## Устанавливаем зависимости

In [None]:
!python --version

In [None]:
!pip install sktime==0.37.0 pytorch-forecasting==1.3.0 pytorch-lightning==2.1.3

## Подготавливаем данные

### Подготоваливаем данные в df

In [None]:
import pandas as pd

In [None]:
df = pd.read_csv('data/SANDUSDT.csv', sep=',', decimal='.', index_col='datetime', parse_dates=['datetime'])
df.head()

Копируем df чтобы избежать ошибок изменения

In [None]:
df = df.copy()

Проверяем типы

In [None]:
print(df.dtypes)

Изучаем полученные данные

In [None]:
df.describe()

### Визуализируем входные данные

In [None]:
!pip install plotly

In [None]:
import plotly.graph_objects as go

vdf = df.copy()

vdf['Close'] = vdf["Open"] + vdf["open_close_delta"]

fig = go.Figure(data=[go.Candlestick(
    #x=vdf["datetime"],
    x=vdf.index,
    open=vdf["Open"],
    high=vdf["High"],
    low=vdf["Low"],
    close=vdf["Close"],
    increasing_line_color='green',
    decreasing_line_color='red'
)])

fig.update_layout(
    title="OHLC график по торговым данным",
    xaxis_title="Время",
    yaxis_title="Цена",
    xaxis_rangeslider_visible=False
)

fig.show()

Полученные данные неравномерные. Заполняем промежуточные данные

In [None]:
df.index = pd.to_datetime(df.index)
df = df.sort_index()
df = df.resample('1min').asfreq()
#df = df.infer_objects(copy=False)

# Интерполируем только числовые колонки
numeric_cols = df.select_dtypes(include=["number"]).columns
df[numeric_cols] = df[numeric_cols].interpolate(method="linear")

Проверяем есть ли пустые данные

In [None]:
df.isna().sum()

In [None]:
df = df[:1200]

### Разделяем на тестовые данные и контрольные

In [None]:
import numpy as np

# Целевая переменная
y = df["High"].astype(np.float32)

# Остальные признаки: Неправильно, т.к. нам недоступен объем торгов и т.д.(Это надо будет упомянуть в презентации, это называется "data leakage")
# X = df.drop(columns=["High"])

# Правильный вариант
X = df[["sin_hour", "cos_hour", "sin_day_of_week", "cos_day_of_week"]] # datetime тут не упоминаем, потому что это индекс

In [None]:
from sktime.split import temporal_train_test_split
from sktime.forecasting.base import ForecastingHorizon

# Последние N шагов — на тест
y_train, y_test, X_train, X_test = temporal_train_test_split(y, X, test_size=48)

# Горизонт предсказания
fh = ForecastingHorizon(y_test.index, is_relative=False)

Проверяем валидность данных

In [None]:
y_train.isna().sum(), X_train.isna().sum()

## Конфигурируем пайплайн

### Пишем адаптер для DeepAR

Т.к. напрямую использование `PytorchForecastingDeepAR` в пайплайне невозможно(видимо баг, хз), нужно написать адаптер который будет расширять шаблонный класс BaseForecaster.

Похожий PR: https://github.com/sktime/sktime/pull/7447

In [None]:
from sktime.forecasting.base import BaseForecaster
from sktime.utils.validation.series import check_series


class DeepARAdapter(BaseForecaster):
    def __init__(self, deep_ar_forecaster):
        self.deep_ar_forecaster = deep_ar_forecaster
        super().__init__()

    def fit(self, y, X=None, fh=None):
        y = check_series(y)  # Проверить входные данные
        self._set_fh(fh)  # Установить (или проверить) горизонт прогноза
        self.deep_ar_forecaster.fit(y, X, fh=self._fh)  # Обучение модели DeepAR
        return self

    def predict(self, fh=None, X=None):
        self.check_is_fitted()  # Проверка, была ли модель обучена
        fh = self._check_fh(fh)  # Проверка горизонта прогноза
        res = self.deep_ar_forecaster.predict(fh, X)  # Получить прогноз

        print(res)

        return res

    def _set_fh(self, fh):
        self._fh = self._check_fh(fh)

    def is_fitted(self):
        return True

### (Опционально) Пишем свой SoftplusScaler
Сделал чтобы прверить что такое вообще возможно. Вроде как дружит с TorchNormalizer-ом

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import torch

class SoftplusScaler(BaseEstimator, TransformerMixin):
    def __init__(self, epsilon=1e-6):  # теперь epsilon — параметр конструктора
        self.epsilon = epsilon

    def fit(self, X, y=None):
        X = self._to_numpy(X)
        self.min_ = X.min()
        return self

    def transform(self, X):
        X = self._to_numpy(X)
        return np.log1p(np.exp(X))  # softplus

    def inverse_transform(self, X):
        X = self._to_numpy(X)
        return np.log(np.expm1(X) + self.epsilon)  # inverse softplus

    def _to_numpy(self, X):
        if isinstance(X, torch.Tensor):
            return X.detach().cpu().numpy()
        elif hasattr(X, "to_numpy"):
            return X.to_numpy()
        return X


 ### Пишем кастомный LagFeatureTransformer для X

In [None]:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd

class LagFeatureTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, columns, lags):
        self.columns = columns
        self.lags = lags

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X = X.copy()
        for col in self.columns:
            for lag in self.lags:
                X[f"{col}_lag{lag}"] = X[col].shift(lag)
        return X


### Создаем пайплайн

НО!: на маке не работает нормально если использовать ускоритель(trainer_params.accelerator)=auto.
Причина:
> Это известная проблема при обучении PyTorch на MPS (Apple Silicon GPU). Дело не в данных и не в гиперпараметрах модели, а именно в реализации вычислений на GPU с MPS.
> MPS (device='mps:0') пока ещё нестабилен в PyTorch и часто генерирует NaN при вычислениях градиентов и некоторых операций, особенно для сложных моделей (RNN, GRU, LSTM, DeepAR).
Это вызвано тем, что поддержка MPS в PyTorch официально пока не завершена и не до конца стабилизирована.

In [None]:
from sktime.forecasting.compose import TransformedTargetForecaster
from sktime.transformations.series.adapt import TabularToSeriesAdaptor
from sklearn.preprocessing import StandardScaler
from sktime.forecasting.pytorchforecasting import PytorchForecastingDeepAR
from pytorch_forecasting.data.encoders import TorchNormalizer

# Mac: Apple Silicon
platform_dependant_params = {
    "accelerator": "cpu",
    "learning_rate": 1e-4,
}

# # Other
# platform_dependant_params = {
#     "accelerator": "auto",
#     "learning_rate": 1e-5,
# }

trainer_params = {
    "max_epochs": 20,
    "gradient_clip_val": 0.1,
    "accelerator": platform_dependant_params["accelerator"],
    "enable_model_summary": True,
}

model_params = {
    "cell_type": "GRU",
    "hidden_size": 64,  # (опционально) размер GRU
    "dropout": 0.1,  # (опционально)
    "rnn_layers": 2,  # (опционально)
    #"loss": NormalDistributionLoss(),
    "output_transformer": TorchNormalizer(transformation="softplus"),
    "learning_rate": platform_dependant_params["learning_rate"],
}

deep_ar_forecaster = PytorchForecastingDeepAR(
    trainer_params=trainer_params,
    model_params=model_params,
)

pipe_deepar = TransformedTargetForecaster(steps=[
    # Преобразование временного ряда y с помощью StandardScaler
    #("scaler", TabularToSeriesAdaptor(StandardScaler())), # либо TorchNormalizer(сразу два нельзя)

    # Генерация лагов по X. Лаги нужны чтобы учитывались Volume, Open, Low из прошлых значений
    # ("lagger", TabularToSeriesAdaptor(LagFeatureTransformer(columns=["Volume", "Open", "Low"], lags=[1,2]))),

    # Softplus скейлер
    ("scaler", TabularToSeriesAdaptor(SoftplusScaler())),

    # Прогнозирование
    ("forecast", DeepARAdapter(deep_ar_forecaster))
])

## Обучаем модель
**Важно**: DeepAR ожидает на вход только положительные значения, так что нужно поизвращаться с подготовкой значений для входа

In [None]:
pipe_deepar.fit(y_train, X=X_train, fh=fh)

### Получаем прогноз от модели

In [None]:
import matplotlib.pyplot as plt

# Прогнозирование с автоматическим обратным преобразованием
y_pred = pipe_deepar.predict(fh=fh, X=X_test)

scaler = pipe_deepar.named_steps["scaler"].transformer_  # SoftplusScaler

# 1. Получаем объект scaler из пайплайна
scaler = pipe_deepar.named_steps["scaler"].transformer_

# 2. Применяем обратное преобразование к прогнозу
y_pred_raw = scaler.inverse_transform(y_pred.to_frame()).squeeze()

# 3. Создаем Series с правильными индексами
y_pred_descaled = pd.Series(y_pred_raw, index=y_pred.index, name=y_pred.name)

# Визуализация с исходным масштабом
plt.figure(figsize=(15, 7))
plt.plot(y.index, y.values, label='Исходные данные', alpha=0.5)
plt.plot(y_train.index, y_train.values, label='Тренировочные данные', color='blue')
plt.plot(y_pred_descaled.index, y_pred_descaled.values,
         label='Прогноз (исходный масштаб)',
         color='red',
         linestyle='--')
plt.legend()
plt.show()