# Практикум (3 курс). Задание 2 — Анализ временного ряда и прогнозирование

В этом ноутбуке мы шаг за шагом выполним требования задания:
- Загрузка данных `training.csv` и `testing.csv`
- Визуальная проверка стационарности (ряд + скользящие статистики)
- Тест Дики — Фуллера (ADF)
- Сезонное разложение (аддитивная и мультипликативная модели)
- Оценка интегрируемости (определение порядка d) и построение ACF/PACF
- Подбор ARIMA по AIC, прогноз на тестовом отрезке, R² и визуализация

Все пояснения и выводы даны на русском языке.


In [None]:
# Импорт библиотек и базовые настройки
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import r2_score

# Графики внутри ноутбука
%matplotlib inline
sns.set(style="whitegrid")

# Пути к данным и каталогу вывода
TRAIN_PATH = "training.csv"
TEST_PATH = "testing.csv"
OUTPUT_DIR = "outputs_notebook"

os.makedirs(OUTPUT_DIR, exist_ok=True)
print("Каталог для результатов:", os.path.abspath(OUTPUT_DIR))


In [None]:
# Загрузка данных и приведение индекса к датам

def read_series(csv_path, date_col="Date", value_col="Value"):
    df = pd.read_csv(csv_path)
    if date_col not in df.columns or value_col not in df.columns:
        raise ValueError("Ожидаются столбцы: Date, Value")
    df[date_col] = pd.to_datetime(df[date_col])
    df = df.sort_values(date_col).set_index(date_col)
    s = df[value_col].astype(float)
    # Попытка вывести частоту; если не удается — считаем месячной
    try:
        inferred = pd.infer_freq(s.index)
        if inferred is None:
            s.index = s.index.to_period("M").to_timestamp()
    except Exception:
        s.index = s.index.to_period("M").to_timestamp()
    return s

train = read_series(TRAIN_PATH)
print("Размер train:", train.shape)
train.head()


In [None]:
# Визуальная проверка стационарности: ряд + скользящие статистики
window = 12  # месячные данные — типичный выбор
roll_mean = train.rolling(window=window, min_periods=max(1, window // 3)).mean()
roll_std = train.rolling(window=window, min_periods=max(1, window // 3)).std()

plt.figure(figsize=(12, 5))
plt.plot(train.index, train.values, label="Ряд")
plt.plot(roll_mean.index, roll_mean.values, label=f"Скользящее среднее ({window})")
plt.plot(roll_std.index, roll_std.values, label=f"Скользящее СКО ({window})")
plt.title("Исходный ряд и скользящие статистики")
plt.xlabel("Дата"); plt.ylabel("Значение"); plt.legend(); plt.tight_layout()
plt.show()


In [None]:
# ADF-тест на стационарность исходного ряда
result = adfuller(train.dropna(), autolag="AIC")
statistic, pvalue, usedlag, nobs, crit_vals = result[0], result[1], result[2], result[3], result[4]
print("ADF статистика:", statistic)
print("p-value:", pvalue)
print("Исп. лаги:", usedlag)
print("Число наблюдений:", nobs)
print("Критические значения:", crit_vals)

# Краткий вывод
if pvalue < 0.05:
    print("Вывод: ряд можно считать стационарным на уровне значимости 5%.")
else:
    print("Вывод: ряд НЕстационарен на уровне значимости 5%.")


In [None]:
# Сезонное разложение: аддитивная и мультипликативная модели
period = 12

res_add = seasonal_decompose(train, model="additive", period=period, extrapolate_trend="freq")
fig = res_add.plot(); fig.set_size_inches(12,8); fig.suptitle("Аддитивная модель", fontsize=14); plt.tight_layout(); plt.show()

res_mul = seasonal_decompose(train, model="multiplicative", period=period, extrapolate_trend="freq")
fig = res_mul.plot(); fig.set_size_inches(12,8); fig.suptitle("Мультипликативная модель", fontsize=14); plt.tight_layout(); plt.show()

# ADF для остатков (как часто делают при анализе)
for name, resid in [("additive", res_add.resid), ("multiplicative", res_mul.resid)]:
    r = adfuller(pd.Series(resid).dropna(), autolag="AIC")
    print(f"[{name}] ADF p-value:", r[1])


In [None]:
# Определение порядка дифференцирования d с помощью ADF

def choose_d(series, max_d=2, alpha=0.05):
    for d in range(0, max_d + 1):
        s = series if d == 0 else series.diff(d).dropna()
        pv = adfuller(s, autolag="AIC")[1]
        if pv < alpha:
            return d
    return max_d

d = choose_d(train, max_d=2)
print("Выбранный порядок d:", d)

# ACF/PACF для разностного ряда
s_for_plots = train if d == 0 else train.diff(d).dropna()
fig, ax = plt.subplots(2, 1, figsize=(10, 8))
plot_acf(s_for_plots.dropna(), lags=min(40, len(s_for_plots)//2), ax=ax[0])
ax[0].set_title("ACF")
plot_pacf(s_for_plots.dropna(), lags=min(40, len(s_for_plots)//2), ax=ax[1], method="ywm")
ax[1].set_title("PACF")
plt.tight_layout(); plt.show()


In [None]:
# Подбор ARIMA по сетке p, q (малые границы для наглядности) и выбор по AIC
p_values = [0, 1, 2]
q_values = [0, 1, 2]

rows = []
for p in p_values:
    for q in q_values:
        order = (p, d, q)
        try:
            model = ARIMA(train, order=order)
            fitted = model.fit()
            aic = float(fitted.aic)
            rows.append({"p": p, "d": d, "q": q, "AIC": aic})
            print(f"ARIMA{order} AIC={aic:.2f}")
        except Exception as e:
            print(f"ARIMA{order} не сошлась: {e}")
            rows.append({"p": p, "d": d, "q": q, "AIC": np.inf})

candidates = pd.DataFrame(rows).sort_values("AIC").reset_index(drop=True)
candidates


In [None]:
# Обучение лучшей модели на train и прогноз на test, оценка R²
best_row = candidates.iloc[0]
best_order = (int(best_row["p"]), int(best_row["d"]), int(best_row["q"]))
print("Лучшая модель:", best_order, "AIC=", best_row["AIC"])

# Внимание: тест используется только для оценки, не для подбора
test = read_series(TEST_PATH)
model_best = ARIMA(train, order=best_order).fit()
forecast = model_best.forecast(steps=len(test))

# R²
y_true = test.reindex(forecast.index).values
y_pred = forecast.values
r2 = r2_score(y_true, y_pred)
print(f"R² на тесте: {r2:.4f}")

# Визуализация прогноза
plt.figure(figsize=(12, 5))
plt.plot(train.index, train.values, label="Train")
plt.plot(test.index, test.values, label="Test (true)")
plt.plot(forecast.index, forecast.values, label=f"Forecast ARIMA{best_order}")
plt.title(f"Сравнение прогноза | ARIMA{best_order} | R²={r2:.4f}")
plt.xlabel("Дата"); plt.ylabel("Значение"); plt.legend(); plt.tight_layout(); plt.show()
