# Многомерные временные ряды и VAR
Полный учебный ноутбук с теорией и практикой.

## Введение в многомерные временные ряды

Многомерные временные ряды представляют собой набор нескольких рядов, измеряемых одновременно. Они применяются в экономике, финансах, IoT, метеорологии и других областях, где переменные взаимосвязаны. Такие ряды демонстрируют перекрестные корреляции, общие тренды, коинтеграцию и сложные многомерные паттерны.

## Импорт библиотек

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller, grangercausalitytests
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.stats.stattools import durbin_watson
from sklearn.metrics import mean_squared_error, mean_absolute_error
import warnings
warnings.filterwarnings('ignore')
plt.style.use('seaborn-v0_8-darkgrid')
plt.rcParams['figure.figsize']=(15,8)
plt.rcParams['font.size']=12
np.random.seed(42)

## Генерация синтетических многомерных временных рядов

In [None]:
def generate_multivariate_ts(n_points=200, n_series=3, trend_strength=0.02,
                            seasonality_period=50, noise_level=0.5,
                            cross_correlation=0.7):
    """
    Генерация многомерного временного ряда с корреляцией между компонентами

    Параметры:
    n_points: количество точек
    n_series: количество временных рядов
    trend_strength: сила тренда
    seasonality_period: период сезонности
    noise_level: уровень шума
    cross_correlation: уровень корреляции между рядами
    """
    # Базовые компоненты
    time_index = pd.date_range(start='2023-01-01', periods=n_points, freq='D')

    # Матрица для хранения всех рядов
    all_series = np.zeros((n_points, n_series))

    # Общие компоненты (тренд, сезонность)
    common_trend = trend_strength * np.arange(n_points)
    common_seasonality = 5 * np.sin(2 * np.pi * np.arange(n_points) / seasonality_period)

    # Генерация каждого ряда
    for i in range(n_series):
        # Индивидуальный тренд
        individual_trend = 0.01 * (i+1) * np.arange(n_points)

        # Индивидуальная сезонность
        phase_shift = i * 2 * np.pi / n_series
        individual_seasonality = 3 * np.sin(2 * np.pi * np.arange(n_points) /
                                           seasonality_period + phase_shift)

        # Базовое значение
        base_value = 20 * (i+1) + common_trend + common_seasonality + \
                    individual_trend + individual_seasonality

        # Добавление автокорреляции
        ar_component = np.zeros(n_points)
        for t in range(2, n_points):
            ar_component[t] = 0.7 * base_value[t-1] - 0.2 * base_value[t-2]

        # Перекрестная корреляция с другими рядами
        cross_corr_component = np.zeros(n_points)
        if i > 0:
            for j in range(i):
                cross_corr_component += cross_correlation * 0.3 * all_series[:, j]

        # Шум
        noise = noise_level * (i+1) * np.random.randn(n_points)

        # Итоговый ряд
        all_series[:, i] = base_value + ar_component + cross_corr_component + noise

    # Создание DataFrame
    columns = [f'Series_{i+1}' for i in range(n_series)]
    df = pd.DataFrame(all_series, index=time_index, columns=columns)

    # Добавление описательных статистик
    df['timestamp'] = time_index
    df.set_index('timestamp', inplace=True)

    return df

# Генерация данных
n_series = 4
df = generate_multivariate_ts(n_points=200, n_series=n_series,
                             trend_strength=0.01, seasonality_period=30,
                             cross_correlation=0.6)

# Визуализация многомерного временного ряда
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
colors = ['blue', 'green', 'red', 'purple']

for i, (col, ax) in enumerate(zip(df.columns, axes.flatten())):
    ax.plot(df.index, df[col], color=colors[i], linewidth=2)
    ax.set_title(f'Временной ряд {col}', fontsize=14)
    ax.set_xlabel('Дата')
    ax.set_ylabel('Значение')
    ax.grid(True, alpha=0.3)

    # Добавление статистики на график
    stats_text = f'Mean: {df[col].mean():.2f}\nStd: {df[col].std():.2f}'
    ax.text(0.02, 0.95, stats_text, transform=ax.transAxes,
            fontsize=10, verticalalignment='top',
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))

plt.suptitle('Многомерный временной ряд (4 компоненты)', fontsize=16)
plt.tight_layout()
plt.show()

# Корреляционная матрица
corr_matrix = df.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0,
            square=True, linewidths=1, cbar_kws={"shrink": .8})
plt.title('Корреляционная матрица временных рядов', fontsize=16)
plt.show()

print("Основные статистики многомерного ряда:")
print(df.describe())

## Проверка стационарности

In [None]:
def check_stationarity(series, significance_level=0.05):
    """
    Проверка стационарности временного ряда с помощью ADF теста
    """
    result = adfuller(series)

    print(f'ADF Statistic: {result[0]:.4f}')
    print(f'p-value: {result[1]:.4f}')
    print('Критические значения:')
    for key, value in result[4].items():
        print(f'\t{key}: {value:.4f}')

    if result[1] <= significance_level:
        print(f"Ряд стационарен (p-value ≤ {significance_level})")
        return True
    else:
        print(f"Ряд нестационарен (p-value > {significance_level})")
        return False

# Проверка стационарности каждого ряда
print("=" * 60)
print("Проверка стационарности временных рядов")
print("=" * 60)

stationarity_results = {}
for col in df.columns:
    print(f"\nАнализ ряда: {col}")
    print("-" * 40)
    is_stationary = check_stationarity(df[col])
    stationarity_results[col] = is_stationary

# Разностное преобразование для нестационарных рядов
def make_stationary(df):
    """
    Преобразование рядов к стационарному виду
    """
    df_stationary = df.copy()

    for col in df.columns:
        if not stationarity_results.get(col, False):
            # Первая разность
            df_stationary[col] = df[col].diff().dropna()
            # Проверка после преобразования
            print(f"\nПроверка стационарности после дифференцирования {col}:")
            if len(df_stationary[col].dropna()) > 0:
                check_stationarity(df_stationary[col].dropna())

    return df_stationary.dropna()

# Применяем преобразование если нужно
if not all(stationarity_results.values()):
    print("\n" + "=" * 60)
    print("Применение разностного преобразования")
    print("=" * 60)
    df_stationary = make_stationary(df)
else:
    df_stationary = df.copy()

# Визуализация стационарных рядов
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
for i, (col, ax) in enumerate(zip(df_stationary.columns, axes.flatten())):
    ax.plot(df_stationary.index, df_stationary[col], color=colors[i], linewidth=2)
    ax.set_title(f'Стационарный ряд {col}', fontsize=14)
    ax.set_xlabel('Дата')
    ax.set_ylabel('Значение')
    ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
    ax.grid(True, alpha=0.3)

plt.suptitle('Стационарные временные ряды', fontsize=16)
plt.tight_layout()
plt.show()

# Построение VAR модели

In [None]:
def select_var_lag(df, max_lag=15):
    """
    Выбор оптимального лага для VAR модели
    """
    # Разделение на обучающую и тестовую выборки
    train_size = int(len(df) * 0.8)
    train_data = df.iloc[:train_size]

    # Подбор оптимального лага
    model = VAR(train_data)

    # Используем разные информационные критерии
    print("Подбор оптимального лага для VAR модели:")
    print("-" * 50)

    for criterion in ['aic', 'bic', 'hqic']:
        lag_results = model.select_order(maxlags=max_lag)
        optimal_lag = getattr(lag_results, criterion)
        print(f"Оптимальный лаг по {criterion.upper()}: {optimal_lag}")

    # Визуализация критериев
    lags = range(1, max_lag + 1)
    aic_values = []
    bic_values = []
    hqic_values = []

    for lag in lags:
        try:
            result = model.fit(lag)
            aic_values.append(result.aic)
            bic_values.append(result.bic)
            hqic_values.append(result.hqic)
        except:
            aic_values.append(np.nan)
            bic_values.append(np.nan)
            hqic_values.append(np.nan)

    # График информационных критериев
    plt.figure(figsize=(12, 6))
    plt.plot(lags, aic_values, marker='o', label='AIC', linewidth=2)
    plt.plot(lags, bic_values, marker='s', label='BIC', linewidth=2)
    plt.plot(lags, hqic_values, marker='^', label='HQIC', linewidth=2)
    plt.xlabel('Порядок лага (p)')
    plt.ylabel('Значение критерия')
    plt.title('Информационные критерии для выбора порядка VAR модели', fontsize=16)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.show()

    # Рекомендуемый лаг (по AIC, но с учетом парсимоничности)
    recommended_lag = int(np.nanargmin(aic_values) + 1)
    print(f"\nРекомендуемый лаг для VAR модели: {recommended_lag}")

    return recommended_lag

# Выбор оптимального лага
optimal_lag = select_var_lag(df_stationary, max_lag=10)

# Разделение данных на обучающую и тестовую выборки
train_size = int(len(df_stationary) * 0.8)
train_data = df_stationary.iloc[:train_size]
test_data = df_stationary.iloc[train_size:]

print(f"\nРазмеры выборок:")
print(f"Обучающая выборка: {len(train_data)} наблюдений")
print(f"Тестовая выборка: {len(test_data)} наблюдений")

# Обучение VAR модели
print("\n" + "=" * 60)
print("Обучение VAR модели")
print("=" * 60)

var_model = VAR(train_data)
var_result = var_model.fit(optimal_lag)
print(var_result.summary())

# Проверка остатков модели
print("\nДиагностика остатков модели:")
print("-" * 40)

# Тест Дарбина-Ватсона на автокорреляцию
dw_stat = durbin_watson(var_result.resid)
print("Статистика Дарбина-Ватсона для остатков:")
for i, col in enumerate(df.columns):
    print(f"  {col}: {dw_stat[i]:.4f}")

# Нормальность остатков
from scipy.stats import jarque_bera

print("\nТест Харке-Бера на нормальность остатков:")
for i, col in enumerate(df.columns):
    jb_stat, jb_pvalue = jarque_bera(var_result.resid[:, i])
    print(f"  {col}: статистика={jb_stat:.4f}, p-value={jb_pvalue:.4f}")

# Прогнозирование с помощью VAR модели

In [None]:
def forecast_var(model, steps, last_observations):
    """
    Прогнозирование с помощью VAR модели
    """
    forecast = model.forecast(last_observations, steps)
    return forecast

# Прогнозирование на тестовой выборке
forecast_steps = len(test_data)
last_train_obs = train_data.values[-optimal_lag:]

# Прогноз
forecast = forecast_var(var_result, forecast_steps, last_train_obs)
forecast_index = test_data.index
forecast_df = pd.DataFrame(forecast, index=forecast_index, columns=df.columns)

# Обратное преобразование (если применяли дифференцирование)
if not all(stationarity_results.values()):
    # Для обратного преобразования нужны исходные данные
    last_values = df.iloc[train_size-1:train_size].values

    # Восстанавливаем уровни
    forecast_levels = forecast_df.copy()
    for i, col in enumerate(df.columns):
        if not stationarity_results.get(col, False):
            forecast_levels[col] = last_values[0, i] + forecast_df[col].cumsum()

    forecast_final = forecast_levels
    test_final = df.iloc[train_size:train_size+forecast_steps]
else:
    forecast_final = forecast_df
    test_final = test_data

# Визуализация прогнозов
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
for i, (col, ax) in enumerate(zip(df.columns, axes.flatten())):
    # Обучающие данные
    ax.plot(train_data.index, train_data[col] if col in train_data.columns else
            df[col].iloc[:train_size],
            color=colors[i], alpha=0.7, label='Обучающие данные')

    # Тестовые данные
    ax.plot(test_final.index, test_final[col],
            color=colors[i], linewidth=2, label='Фактические значения')

    # Прогноз
    ax.plot(forecast_final.index, forecast_final[col],
            color='black', linestyle='--', linewidth=2, label='Прогноз VAR')

    # Доверительный интервал (упрощенный)
    forecast_std = forecast_final[col].std()
    ax.fill_between(forecast_final.index,
                    forecast_final[col] - 1.96 * forecast_std,
                    forecast_final[col] + 1.96 * forecast_std,
                    alpha=0.2, color='gray', label='95% доверительный интервал')

    ax.set_title(f'Прогноз для {col}', fontsize=14)
    ax.set_xlabel('Дата')
    ax.set_ylabel('Значение')
    ax.legend()
    ax.grid(True, alpha=0.3)

plt.suptitle('Прогнозирование многомерного временного ряда с помощью VAR', fontsize=16)
plt.tight_layout()
plt.show()

# Оценка качества прогноза
print("=" * 60)
print("Оценка качества прогноза")
print("=" * 60)

metrics = pd.DataFrame(index=df.columns, columns=['RMSE', 'MAE', 'MAPE', 'R²'])

for col in df.columns:
    y_true = test_final[col].values
    y_pred = forecast_final[col].values

    # Вычисление метрик
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / (y_true + 1e-10))) * 100

    # R-квадрат
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
    r2 = 1 - (ss_res / (ss_tot + 1e-10))

    metrics.loc[col] = [rmse, mae, mape, r2]

print("\nМетрики качества прогноза:")
print(metrics.round(4))

# Анализ импульсных откликов

In [None]:
def plot_impulse_responses(var_model, periods=20, plot_variance_decomposition=False):
    """
    Визуализация импульсных откликов VAR модели
    """
    # Импульсные отклики
    irf = var_model.irf(periods=periods)

    # Графики импульсных откликов
    fig, axes = plt.subplots(n_series, n_series, figsize=(16, 12))

    for i in range(n_series):
        for j in range(n_series):
            ax = axes[i, j]
            ax.plot(irf.irfs[:, i, j], linewidth=2)
            ax.axhline(y=0, color='red', linestyle='--', alpha=0.5)
            ax.set_title(f'{df.columns[j]} → {df.columns[i]}', fontsize=10)
            ax.grid(True, alpha=0.3)

            if i == n_series - 1:
                ax.set_xlabel('Период')
            if j == 0:
                ax.set_ylabel('Отклик')

    plt.suptitle('Импульсные отклики VAR модели', fontsize=16)
    plt.tight_layout()
    plt.show()

    # Декомпозиция дисперсии ошибки прогноза
    if plot_variance_decomposition:
        print("\nДекомпозиция дисперсии ошибки прогноза:")
        print("-" * 50)

        # Декомпозиция на горизонте 10 периодов
        fevd = irf.fevd(periods=10)
        fevd_df = pd.DataFrame(
            fevd.decomp.mean(axis=0),  # Среднее по горизонтам
            index=df.columns,
            columns=df.columns
        )

        print("\nСредний вклад в ошибку прогноза (%):")
        print((fevd_df * 100).round(2))

        # Визуализация декомпозиции дисперсии
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        for idx, (col, ax) in enumerate(zip(df.columns, axes.flatten())):
            contributions = fevd.decomp[:, idx, :]
            periods_range = range(1, len(contributions) + 1)

            bottom = np.zeros(len(contributions))
            for i, comp_col in enumerate(df.columns):
                ax.fill_between(periods_range, bottom, bottom + contributions[:, i],
                               label=comp_col, alpha=0.7)
                bottom += contributions[:, i]

            ax.set_title(f'Декомпозиция дисперсии для {col}', fontsize=12)
            ax.set_xlabel('Горизонт прогноза')
            ax.set_ylabel('Доля дисперсии')
            ax.legend(loc='upper right', fontsize=8)
            ax.grid(True, alpha=0.3)

        plt.suptitle('Декомпозиция дисперсии ошибки прогноза', fontsize=16)
        plt.tight_layout()
        plt.show()

    return irf

# Построение импульсных откликов
print("\n" + "=" * 60)
print("Анализ импульсных откликов")
print("=" * 60)

irf_results = plot_impulse_responses(var_result, periods=15, plot_variance_decomposition=True)

# Тест причинности по Грейнджеру

In [None]:
def granger_causality_test(df, max_lag=5, significance_level=0.05):
    """
    Проверка причинно-следственных связей по Грейнджеру
    """
    n_vars = len(df.columns)
    causality_matrix = pd.DataFrame(np.zeros((n_vars, n_vars)),
                                    index=df.columns,
                                    columns=df.columns)

    print("Тест причинности по Грейнджеру:")
    print("-" * 50)

    for cause in df.columns:
        for effect in df.columns:
            if cause != effect:
                try:
                    test_result = grangercausalitytests(
                        df[[effect, cause]],
                        maxlag=max_lag,
                        verbose=False
                    )

                    # Берем p-value для оптимального лага
                    p_values = [test_result[i+1][0]['ssr_chi2test'][1]
                               for i in range(max_lag)]
                    min_p_value = min(p_values)

                    causality_matrix.loc[effect, cause] = min_p_value

                except Exception as e:
                    causality_matrix.loc[effect, cause] = 1.0

    # Визуализация матрицы причинности
    plt.figure(figsize=(10, 8))
    mask = np.zeros_like(causality_matrix.values, dtype=bool)
    np.fill_diagonal(mask, True)

    sns.heatmap(causality_matrix,
                mask=mask,
                annot=True,
                fmt=".3f",
                cmap='RdYlGn_r',
                center=significance_level,
                square=True,
                cbar_kws={'label': 'p-value'},
                linewidths=1)

    plt.title('Матрица причинности по Грейнджеру\n(ячейки: p-value для H0: нет причинности)',
              fontsize=14)
    plt.xlabel('Причина (Granger-causes)')
    plt.ylabel('Следствие (is caused by)')
    plt.tight_layout()
    plt.show()

    # Интерпретация результатов
    print("\nЗначимые причинно-следственные связи (p-value < 0.05):")
    print("-" * 50)

    significant_links = []
    for cause in df.columns:
        for effect in df.columns:
            if cause != effect:
                p_val = causality_matrix.loc[effect, cause]
                if p_val < significance_level:
                    significant_links.append((cause, effect, p_val))
                    print(f"{cause} → {effect}: p-value = {p_val:.4f}")

    if not significant_links:
        print("Не обнаружено значимых причинно-следственных связей")

    return causality_matrix, significant_links

# Проведение теста Грейнджера
causality_matrix, significant_links = granger_causality_test(df_stationary, max_lag=5)

# Проверка коинтеграции

In [None]:
def check_cointegration(df, significance_level=0.05):
    """
    Проверка коинтеграции многомерного временного ряда
    """
    print("Проверка коинтеграции (тест Йохансена):")
    print("-" * 50)

    # Тест Йохансена
    coint_test = coint_johansen(df, det_order=0, k_ar_diff=1)

    # Критические значения
    trace_critical_values = coint_test.cvt[:, 0]
    max_eigen_critical_values = coint_test.cvm[:, 0]

    print(f"Количество рядов: {len(df.columns)}")
    print(f"\nСобственные значения: {coint_test.eig}")

    print("\nРезультаты trace теста:")
    print("Ранг  Собст.знач  Trace-стат   Крит.знач 10%  Гипотеза")
    print("-" * 60)

    for i in range(len(df.columns)):
        hypothesis = "Нет коинтеграции" if i == 0 else f"Ранг ≤ {i}"
        trace_stat = coint_test.lr1[i]
        crit_value = trace_critical_values[i]
        is_significant = trace_stat > crit_value

        print(f"{i:4d}  {coint_test.eig[i]:10.4f}  {trace_stat:10.2f}  {crit_value:13.2f}  "
              f"{'Отвергаем' if is_significant else 'Принимаем'} {hypothesis}")

    print("\nРезультаты максимального собственного значения:")
    print("Ранг  Собст.знач  Max-стат    Крит.знач 10%  Гипотеза")
    print("-" * 60)

    for i in range(len(df.columns)):
        hypothesis = "Нет коинтеграции" if i == 0 else f"Ранг ≤ {i}"
        max_stat = coint_test.lr2[i]
        crit_value = max_eigen_critical_values[i]
        is_significant = max_stat > crit_value

        print(f"{i:4d}  {coint_test.eig[i]:10.4f}  {max_stat:9.2f}  {crit_value:13.2f}  "
              f"{'Отвергаем' if is_significant else 'Принимаем'} {hypothesis}")

    # Определение ранга коинтеграции
    rank_trace = 0
    rank_max = 0

    for i in range(len(df.columns)):
        if coint_test.lr1[i] > trace_critical_values[i]:
            rank_trace = i + 1
        if coint_test.lr2[i] > max_eigen_critical_values[i]:
            rank_max = i + 1

    print(f"\nРанг коинтеграции (по trace тесту): {rank_trace}")
    print(f"Ранг коинтеграции (по max eigen тесту): {rank_max}")

    if rank_trace > 0 or rank_max > 0:
        print("\nОбнаружена коинтеграция! Ряды имеют долгосрочное равновесное соотношение.")
        print("Коинтегрирующие векторы:")
        print(coint_test.evec[:, :max(rank_trace, rank_max)])
    else:
        print("\nКоинтеграция не обнаружена.")

    return coint_test, max(rank_trace, rank_max)

# Проверка коинтеграции
coint_result, coint_rank = check_cointegration(df)

# Задания для самостоятельной работы

Задание 1: VAR с экзогенными переменными (VARX)

Задача: Расширьте базовую VAR модель, добавив экзогенные переменные (VARX). Сгенерируйте данные с 2 эндогенными и 2 экзогенными переменными.

Требования:

    Сгенерируйте многомерный ряд с:

        2 эндогенными переменными (зависят от своих лагов и друг от друга)

        2 экзогенными переменными (влияют на эндогенные, но не наоборот)

    Реализуйте VARX модель

    Сравните прогнозную способность VAR и VARX

    Проанализируйте влияние экзогенных переменных на эндогенные

In [None]:
# Шаблон для решения задания 1
def generate_varx_data():
    """Генерация данных для VARX модели"""
    pass

def fit_varx_model():
    """Обучение VARX модели"""
    pass

Задание 2: Байесовская VAR (BVAR)

Задача: Реализуйте Байесовскую версию VAR модели с априорными распределениями коэффициентов.

Требования:

    Изучите концепции байесовского подхода:

        Minnesota prior

        Normal-Wishart prior

    Реализуйте BVAR с использованием:

        Марковских цепей Монте-Карло (MCMC)

        Вариационного вывода

    Сравните с классической VAR:

        Точность прогнозов

        Интервалы прогнозирования

        Устойчивость к переобучению

In [None]:
# Шаблон для решения задания 2
class BayesianVAR:
    """Реализация Байесовской VAR"""
    pass