In [None]:
# Статистические модели для прогнозирования временных рядов

В этом блокноте тестируем статистические модели из пакета ETNA:
- AutoARIMA
- SARIMAX
- Holt-Winters
- StatsForecast ARIMA

Для статистических моделей важно правильно подготовить данные (приведение к стационарности).


In [None]:
# Настройка среды
import sys
print(sys.executable)


In [None]:
# Установка необходимых пакетов
import subprocess
import os
import sys

# Путь к pip в активном ядре
pip_path = os.path.join(sys.prefix, "bin", "pip")

# Установка
subprocess.check_call([pip_path, "install", "etna[all]", "pandas", "matplotlib", "statsmodels", "pmdarima"])


In [None]:
# Импорты
from datetime import datetime
import pandas as pd
from math import floor
import numpy as np
import os
from pathlib import Path
import time
import warnings
warnings.filterwarnings('ignore')

# ETNA
from etna.datasets import TSDataset
from etna.models import (
    AutoARIMAModel, 
    SARIMAXModel, 
    HoltWintersModel,
    StatsForecastARIMAModel
)
from etna.metrics import MAE, MAPE, RMSE, SMAPE
from etna.analysis import plot_forecast

# Статистические тесты и преобразования
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.diagnostic import acorr_ljungbox
import matplotlib.pyplot as plt
import seaborn as sns

# Установим читаемый формат для вывода чисел
pd.options.display.float_format = '{:.3f}'.format


In [None]:
# Функция для расчета Directional Accuracy
def directional_accuracy(actual, predicted):
    """
    Calculates Directional Accuracy by comparing each predicted value's
    direction with the actual value's direction.
    """
    actual_direction = np.sign(actual - np.roll(actual, 1))  # Direction of actual values
    predicted_direction = np.sign(predicted - np.roll(predicted, 1))  # Direction of predictions

    # Ignore the first element (due to np.roll)
    actual_direction[0] = np.nan
    predicted_direction[0] = np.nan

    # Calculate accuracy for elements where both actual and predicted directions are not NaN
    mask = ~np.isnan(actual_direction) & ~np.isnan(predicted_direction)

    return np.mean(actual_direction[mask] == predicted_direction[mask]) * 100


In [None]:
# ADF тест для проверки стационарности
def check_stationarity(timeseries, title):
    """
    Выполняет Augmented Dickey-Fuller тест для проверки стационарности.
    """
    print(f'Results of Augmented Dickey-Fuller Test for {title}:')
    dftest = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
    for key,value in dftest[4].items():
        dfoutput['Critical Value (%s)'%key] = value
    print(dfoutput)
    
    if dftest[1] <= 0.05:
        print("*** Series is Stationary ***")
        return True
    else:
        print("*** Series is Non-Stationary ***")
        return False


In [None]:
# Функции для приведения к стационарности и обратного преобразования
def make_stationary(ts, method='diff'):
    """
    Приводит временной ряд к стационарному виду.
    
    Parameters:
    ts: pandas Series - временной ряд
    method: str - метод преобразования ('diff', 'log_diff', 'pct_change')
    
    Returns:
    transformed_ts: pandas Series - преобразованный ряд
    """
    if method == 'diff':
        return ts.diff().dropna()
    elif method == 'log_diff':
        return np.log(ts).diff().dropna()
    elif method == 'pct_change':
        return ts.pct_change().dropna()
    else:
        raise ValueError("Method should be 'diff', 'log_diff', or 'pct_change'")

def inverse_transform(original_ts, transformed_predictions, method='diff'):
    """
    Обращает преобразование для получения прогнозов в исходном масштабе.
    
    Parameters:
    original_ts: pandas Series - исходный временной ряд
    transformed_predictions: array-like - прогнозы в преобразованном масштабе
    method: str - метод преобразования, который нужно обратить
    
    Returns:
    original_scale_predictions: array - прогнозы в исходном масштабе
    """
    if method == 'diff':
        # Обращение разности
        last_value = original_ts.iloc[-1]
        predictions = []
        current_value = last_value
        
        for diff_pred in transformed_predictions:
            next_value = current_value + diff_pred
            predictions.append(next_value)
            current_value = next_value
            
        return np.array(predictions)
    
    elif method == 'log_diff':
        # Обращение лог-разности
        last_log_value = np.log(original_ts.iloc[-1])
        predictions = []
        current_log_value = last_log_value
        
        for log_diff_pred in transformed_predictions:
            next_log_value = current_log_value + log_diff_pred
            next_value = np.exp(next_log_value)
            predictions.append(next_value)
            current_log_value = next_log_value
            
        return np.array(predictions)
    
    elif method == 'pct_change':
        # Обращение процентного изменения
        last_value = original_ts.iloc[-1]
        predictions = []
        current_value = last_value
        
        for pct_pred in transformed_predictions:
            next_value = current_value * (1 + pct_pred)
            predictions.append(next_value)
            current_value = next_value
            
        return np.array(predictions)
    
    else:
        raise ValueError("Method should be 'diff', 'log_diff', or 'pct_change'")


In [None]:
# Загрузка данных
def load_series_map(series_dir="../../data/series"):
    series_map = {}
    series_path = Path(series_dir)
    if not series_path.exists():
        raise FileNotFoundError(f"Папка не найдена: {series_dir}")
    for csv_file in series_path.glob("*.csv"):
        ticker = csv_file.stem.upper()
        df = pd.read_csv(csv_file)
        if "timestamp" not in df.columns or "close" not in df.columns:
            raise ValueError(f"{csv_file.name} не содержит 'timestamp' или 'close'")
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        df = df[["timestamp", "close"]].sort_values("timestamp").reset_index(drop=True)
        series_map[ticker] = df
        print(f"✔ Loaded: {csv_file} → ticker='{ticker}', rows={len(df)}")
    return series_map

series_map = load_series_map("../../data/series")


In [None]:
# Подготовка данных для моделирования
series = {}
for ticker, df in series_map.items():
    tmp = df.copy()
    tmp["timestamp"] = pd.to_datetime(tmp["timestamp"], utc=True)
    tmp = tmp.rename(columns={"close": "target"})
    tmp["segment"] = ticker
    tmp = tmp[["timestamp", "segment", "target"]]

    # 1) Сначала вручную реиндексируем по business-day (freq="B") и заполним пропуски ffill→bfill
    tmp = tmp.set_index("timestamp")
    all_biz = pd.date_range(start=tmp.index.min(), end=tmp.index.max(), freq="B", tz="UTC")
    tmp = tmp.reindex(all_biz)
    tmp["target"] = tmp["target"].ffill().bfill()
    tmp = tmp.reset_index().rename(columns={"index": "timestamp"})
    tmp["segment"] = ticker

    # 2) Строим одно-сегментный TSDataset с freq="B"
    ts = TSDataset(tmp, freq="B")

    # 3) Разбиваем 90% / 10% по количеству точек
    n_points = ts.df.shape[0]
    train_size = int(n_points * 0.9)
    test_size = n_points - train_size

    train_ts, test_ts = ts.train_test_split(test_size=test_size)

    # Проверим, что в конце train нет NaN
    df_train_flat = train_ts.to_pandas(flatten=True)
    last_val = df_train_flat.iloc[-1]['target']
    assert not pd.isna(last_val), f"NaN в последнем значении train для {ticker}"

    series[ticker] = (ts, train_ts, test_ts)


In [None]:
## AutoARIMA Model

AutoARIMA автоматически определяет наилучшие параметры ARIMA для временного ряда.


In [None]:
# AutoARIMA Model
results_map = {}

for ticker, (full_ts, train_ts, test_ts) in series.items():
    print(f"\n--- Тикер {ticker} ---")
    
    # 1) Инициализируем и обучаем AutoARIMA модель
    model = AutoARIMAModel()
    start_time = time.time()
    model.fit(train_ts)
    fit_time = time.time() - start_time
    
    # 2) Выполняем прогноз на test_size точек и замеряем время
    test_size = test_ts.df.shape[0]
    start_time = time.time()
    forecast_ts = model.forecast(ts=train_ts, prediction_size=test_size)
    end_time = time.time()
    forecast_time = ((end_time - start_time) / test_size) * 100  # Время в секундах на один прогноз
    
    # 3) Собираем прогноз и реальные значения
    df_pred = forecast_ts.to_pandas(flatten=True)
    df_true = test_ts.to_pandas(flatten=True)
    
    y_true = df_true["target"].values
    y_pred = df_pred["target"].values

    # 4) Считаем метрики
    mae_v   = MAE()(test_ts, forecast_ts)
    mape_v  = MAPE()(test_ts, forecast_ts)
    rmse_v  = RMSE()(test_ts, forecast_ts)
    smape_v = SMAPE()(test_ts, forecast_ts)
    da_v = directional_accuracy(y_true, y_pred)
    
    # 5) Сохраняем метрики и прогноз
    results_map[ticker] = {
        "MAE": mae_v[ticker],
        "MAPE (%)": mape_v[ticker],
        "RMSE": rmse_v[ticker],
        "SMAPE (%)": smape_v[ticker],
        "DA (%)": da_v,
        "Fit time (s)": fit_time,
        "Forecast time (s)": forecast_time
    }
    
    # 6) Визуализируем результаты
    plot_forecast(forecast_ts, test_ts)
    plt.title(f'{ticker} - AutoARIMA Forecast')
    plt.show()

# 7) Собираем сводную таблицу метрик
metrics_df = pd.DataFrame.from_dict(results_map, orient="index")
metrics_df.loc["AVERAGE"] = metrics_df.mean(numeric_only=True)
metrics_df = metrics_df.round(3)

print("\n=== Итоговая таблица метрик AutoARIMA ===")
display(metrics_df)


In [None]:
## SARIMAX Model

SARIMA с экзогенными переменными. Подходит для рядов с сезонностью.


In [None]:
# SARIMAX Model
results_map = {}

for ticker, (full_ts, train_ts, test_ts) in series.items():
    print(f"\n--- Тикер {ticker} ---")
    
    # 1) Инициализируем и обучаем SARIMAX модель
    # Параметры можно настроить: order=(p,d,q), seasonal_order=(P,D,Q,s)
    model = SARIMAXModel(order=(1,1,1), seasonal_order=(1,1,1,12))
    start_time = time.time()
    model.fit(train_ts)
    fit_time = time.time() - start_time
    
    # 2) Выполняем прогноз на test_size точек и замеряем время
    test_size = test_ts.df.shape[0]
    start_time = time.time()
    forecast_ts = model.forecast(ts=train_ts, prediction_size=test_size)
    end_time = time.time()
    forecast_time = ((end_time - start_time) / test_size) * 100  # Время в секундах на один прогноз
    
    # 3) Собираем прогноз и реальные значения
    df_pred = forecast_ts.to_pandas(flatten=True)
    df_true = test_ts.to_pandas(flatten=True)
    
    y_true = df_true["target"].values
    y_pred = df_pred["target"].values

    # 4) Считаем метрики
    mae_v   = MAE()(test_ts, forecast_ts)
    mape_v  = MAPE()(test_ts, forecast_ts)
    rmse_v  = RMSE()(test_ts, forecast_ts)
    smape_v = SMAPE()(test_ts, forecast_ts)
    da_v = directional_accuracy(y_true, y_pred)
    
    # 5) Сохраняем метрики и прогноз
    results_map[ticker] = {
        "MAE": mae_v[ticker],
        "MAPE (%)": mape_v[ticker],
        "RMSE": rmse_v[ticker],
        "SMAPE (%)": smape_v[ticker],
        "DA (%)": da_v,
        "Fit time (s)": fit_time,
        "Forecast time (s)": forecast_time
    }
    
    # 6) Визуализируем результаты
    plot_forecast(forecast_ts, test_ts)
    plt.title(f'{ticker} - SARIMAX Forecast')
    plt.show()

# 7) Собираем сводную таблицу метрик
metrics_df = pd.DataFrame.from_dict(results_map, orient="index")
metrics_df.loc["AVERAGE"] = metrics_df.mean(numeric_only=True)
metrics_df = metrics_df.round(3)

print("\n=== Итоговая таблица метрик SARIMAX ===")
display(metrics_df)


In [None]:
## Holt-Winters Model

Экспоненциальное сглаживание с трендом и сезонностью.


In [None]:
# Holt-Winters Model
results_map = {}

for ticker, (full_ts, train_ts, test_ts) in series.items():
    print(f"\n--- Тикер {ticker} ---")
    
    # 1) Инициализируем и обучаем Holt-Winters модель
    # Параметры: seasonal_periods - период сезонности, trend, seasonal - типы тренда и сезонности
    model = HoltWintersModel(seasonal_periods=12, trend="add", seasonal="add")
    start_time = time.time()
    model.fit(train_ts)
    fit_time = time.time() - start_time
    
    # 2) Выполняем прогноз на test_size точек и замеряем время
    test_size = test_ts.df.shape[0]
    start_time = time.time()
    forecast_ts = model.forecast(ts=train_ts, prediction_size=test_size)
    end_time = time.time()
    forecast_time = ((end_time - start_time) / test_size) * 100  # Время в секундах на один прогноз
    
    # 3) Собираем прогноз и реальные значения
    df_pred = forecast_ts.to_pandas(flatten=True)
    df_true = test_ts.to_pandas(flatten=True)
    
    y_true = df_true["target"].values
    y_pred = df_pred["target"].values

    # 4) Считаем метрики
    mae_v   = MAE()(test_ts, forecast_ts)
    mape_v  = MAPE()(test_ts, forecast_ts)
    rmse_v  = RMSE()(test_ts, forecast_ts)
    smape_v = SMAPE()(test_ts, forecast_ts)
    da_v = directional_accuracy(y_true, y_pred)
    
    # 5) Сохраняем метрики и прогноз
    results_map[ticker] = {
        "MAE": mae_v[ticker],
        "MAPE (%)": mape_v[ticker],
        "RMSE": rmse_v[ticker],
        "SMAPE (%)": smape_v[ticker],
        "DA (%)": da_v,
        "Fit time (s)": fit_time,
        "Forecast time (s)": forecast_time
    }
    
    # 6) Визуализируем результаты
    plot_forecast(forecast_ts, test_ts)
    plt.title(f'{ticker} - Holt-Winters Forecast')
    plt.show()

# 7) Собираем сводную таблицу метрик
metrics_df = pd.DataFrame.from_dict(results_map, orient="index")
metrics_df.loc["AVERAGE"] = metrics_df.mean(numeric_only=True)
metrics_df = metrics_df.round(3)

print("\n=== Итоговая таблица метрик Holt-Winters ===")
display(metrics_df)


In [None]:
## Анализ стационарности и работа с преобразованными данными

Проверим стационарность рядов и покажем, как работать с преобразованиями.


In [None]:
# Проверим стационарность для нескольких рядов
sample_tickers = ['SBER', 'LKOH', 'MOEX']  # Берем несколько тикеров для анализа

for ticker in sample_tickers:
    if ticker in series_map:
        print(f"\n{'='*50}")
        print(f"Анализ стационарности для {ticker}")
        print(f"{'='*50}")
        
        ts_data = series_map[ticker]['close']
        
        # 1) Проверим исходный ряд
        print(f"\n1. Исходный ряд {ticker}:")
        is_stationary_orig = check_stationarity(ts_data, f"{ticker} Original")
        
        # 2) Проверим после взятия первой разности
        print(f"\n2. Ряд {ticker} после взятия разности:")
        ts_diff = make_stationary(ts_data, method='diff')
        is_stationary_diff = check_stationarity(ts_diff, f"{ticker} Differenced")
        
        # 3) Проверим логарифмические доходности
        print(f"\n3. Ряд {ticker} - логарифмические доходности:")
        ts_log_diff = make_stationary(ts_data, method='log_diff')
        is_stationary_log_diff = check_stationarity(ts_log_diff, f"{ticker} Log Returns")
        
        # 4) Визуализация
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        
        # Исходный ряд
        axes[0,0].plot(ts_data)
        axes[0,0].set_title(f'{ticker} - Исходный ряд')
        axes[0,0].grid(True)
        
        # Разности
        axes[0,1].plot(ts_diff)
        axes[0,1].set_title(f'{ticker} - Первые разности')
        axes[0,1].grid(True)
        
        # Логарифмические доходности
        axes[1,0].plot(ts_log_diff)
        axes[1,0].set_title(f'{ticker} - Логарифмические доходности')
        axes[1,0].grid(True)
        
        # Процентные изменения
        ts_pct = make_stationary(ts_data, method='pct_change')
        axes[1,1].plot(ts_pct)
        axes[1,1].set_title(f'{ticker} - Процентные изменения')
        axes[1,1].grid(True)
        
        plt.tight_layout()
        plt.show()
        
        print(f"\nРезультаты для {ticker}:")
        print(f"Исходный ряд стационарен: {is_stationary_orig}")
        print(f"Ряд разностей стационарен: {is_stationary_diff}")
        print(f"Лог-доходности стационарны: {is_stationary_log_diff}")
