In [1]:
import argparse
from pathlib import Path
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from types import SimpleNamespace
import time
import json
try:
    import psutil
    psutil_available = True
    proc = psutil.Process()
except Exception:
    psutil_available = False
    import resource

def infer_file(data_dir: Path, file_arg: str = None) -> Path:
    """Resolve the CSV file to process.

    Parameters
    ----------
    data_dir : Path
        Directory searched for ``*.csv`` files when ``file_arg`` is falsy.
    file_arg : str, optional
        Explicit path to a file. When given, it is returned as a ``Path``
        after checking that it exists.

    Returns
    -------
    Path
        ``file_arg`` as a path, or the most recently modified CSV in
        ``data_dir``.

    Raises
    ------
    FileNotFoundError
        If ``file_arg`` does not exist, or ``data_dir`` holds no CSV files.
    """
    if not file_arg:
        candidates = list(data_dir.glob("*.csv"))
        if not candidates:
            raise FileNotFoundError(f"No CSV files in {data_dir}")
        # Newest file by modification time wins.
        return max(candidates, key=lambda candidate: candidate.stat().st_mtime)

    explicit = Path(file_arg)
    if explicit.exists():
        return explicit
    raise FileNotFoundError(explicit)

def load_series(path: Path, y_col: str = "y"):
    """Load the target column of a CSV file as a pandas Series.

    Parameters
    ----------
    path : Path
        CSV file to read.
    y_col : str, default "y"
        Name of the column holding the observations.

    Returns
    -------
    tuple
        ``(series, df)``: the target series and the full DataFrame as read.
        The series is indexed by the parsed ``t`` column when that column
        holds date-like (non-numeric) values, otherwise by a 0-based
        ``RangeIndex``.

    Raises
    ------
    ValueError
        If ``y_col`` is not a column of the file.
    """
    df = pd.read_csv(path)
    if y_col not in df.columns:
        raise ValueError(f"Column '{y_col}' not found in {path}")
    series = df[y_col].copy()

    positional_index = pd.RangeIndex(start=0, stop=len(series), step=1)

    # A numeric "t" column is a step counter, not a timestamp. pd.to_datetime
    # would accept it anyway, reading the numbers as nanoseconds since the
    # epoch and yielding a meaningless DatetimeIndex with no usable
    # frequency, so only non-numeric columns are parsed as dates.
    if "t" in df.columns and not pd.api.types.is_numeric_dtype(df["t"]):
        try:
            series.index = pd.to_datetime(df["t"])
        except Exception:
            # Not parseable as dates: fall back to plain positions.
            series.index = positional_index
    else:
        series.index = positional_index
    return series, df

def make_future_index(index, h):
    """Build the index of the ``h`` steps that follow ``index``.

    Parameters
    ----------
    index : pandas.Index
        Index of the observed series.
    h : int
        Number of future steps.

    Returns
    -------
    pandas.DatetimeIndex or numpy.ndarray
        For a ``DatetimeIndex``, ``h`` timestamps continuing after the last
        one at the index frequency (its own ``freq``, else the inferred one,
        else daily). For any other index, the integer positions
        ``len(index) .. len(index) + h - 1``.
    """
    if not isinstance(index, pd.DatetimeIndex):
        return np.arange(len(index), len(index) + h)

    freq = index.freq
    if freq is None:
        try:
            freq = pd.infer_freq(index)
        except (TypeError, ValueError):
            # infer_freq raises instead of returning None when the index has
            # fewer than three timestamps; treat that as "unknown" so the
            # daily fallback below still applies.
            freq = None
    if freq is None:
        # fallback to daily
        freq = "D"
    return pd.date_range(start=index[-1] + pd.tseries.frequencies.to_offset(freq), periods=h, freq=freq)

In [25]:
# Notebook-friendly configuration: set the parameters by hand
# (leave file_arg as None to auto-select the most recent file in data/).
file_arg = "../data/ts_1000_1_02.csv"               # e.g. "data/ts_1000_1_02.csv" or None
seasonal_periods = None
trend = "add"
seasonal = None
ycol = "y"

# Bundle the settings the same way a command-line parser would expose them.
args = SimpleNamespace(file=file_arg, seasonal_periods=seasonal_periods, trend=trend, seasonal=seasonal, ycol=ycol)

# Input/output folders live next to each other under the parent of the
# current working directory; both are created if missing.
project_root = Path.cwd().resolve().parent
data_dir, out_dir = project_root / "data", project_root / "output"
data_dir.mkdir(parents=True, exist_ok=True)
out_dir.mkdir(parents=True, exist_ok=True)

# File selection: an explicit file_arg wins, otherwise the newest CSV in data/.
csv_path = infer_file(data_dir, args.file)
series, raw_df = load_series(csv_path, args.ycol)

# The literal string "none" switches a model component off.
trend = args.trend if args.trend != "none" else None
seasonal = args.seasonal if args.seasonal != "none" else None
seasonal_periods = args.seasonal_periods

# A seasonal model was requested without a period: use a simple default.
if seasonal_periods is None and seasonal:
    seasonal_periods = 12

In [None]:
# Train/test split: the leading TRAIN_FRACTION of the observations is used
# for fitting, the remainder is held out for evaluation.
TRAIN_FRACTION = 0.7
split = max(1, int(len(series) * TRAIN_FRACTION))
train = series.iloc[:split]
test = series.iloc[split:]
if len(test) == 0:
    # Without held-out points there is nothing to forecast or score.
    raise ValueError(f"Series of length {len(series)} is too short to hold out a test segment")


def _rss_bytes():
    """Return the memory footprint of this process in bytes.

    With psutil this is the current resident set size. Without it the value
    comes from ``resource.getrusage`` and is the *peak* resident set size, so
    differences between two readings only show growth of the peak.
    """
    if psutil_available:
        return proc.memory_info().rss
    import sys

    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    # ru_maxrss is reported in bytes on macOS and in kilobytes on Linux.
    return peak if sys.platform == "darwin" else peak * 1024


# Build the model on the training segment only.
model = ExponentialSmoothing(
    train,
    trend=trend,
    seasonal=seasonal,
    seasonal_periods=seasonal_periods,
    initialization_method="estimated"
)

# Fit, recording wall-clock time and memory growth.
mem_before = _rss_bytes()
t0 = time.perf_counter()
fitted = model.fit(optimized=True)
train_time_s = time.perf_counter() - t0
mem_after_fit = _rss_bytes()

# Clamp at zero: the footprint may shrink between the two readings.
mem_used_fit_bytes = max(0, mem_after_fit - mem_before)

# Out-of-sample forecast over the test horizon, timed the same way.
t0 = time.perf_counter()
fcast = fitted.forecast(len(test))
predict_time_s = time.perf_counter() - t0
mem_after_pred = _rss_bytes()

mem_used_pred_bytes = max(0, mem_after_pred - mem_after_fit)

# Index mapping: store each forecast point under its numeric position in the
# original data. The forecast continues right after the training segment
# (it covers the held-out test points), so its first position is len(train),
# not len(series).
forecast_start = len(train)
positional_idx = np.arange(forecast_start, forecast_start + len(fcast))
if isinstance(series.index, pd.DatetimeIndex):
    idx_vals = positional_idx
else:
    # The index is already numeric; reuse the labels the model assigned to
    # the forecast and fall back to positions if they are not integer-like.
    try:
        idx_vals = fcast.index.astype(int)
    except Exception:
        idx_vals = positional_idx

# Save the forecast as (index, value) rows.
out_name = f"{csv_path.stem}_hw_h{len(fcast)}_s{seasonal_periods if seasonal_periods else 0}.csv"
out_path = out_dir / out_name
df_out = pd.DataFrame({"index": idx_vals, "value": fcast.values})
df_out.to_csv(out_path, index=False)

# Forecast accuracy on the test segment: RMSE, MAE and MAPE.
y_true, y_pred = test.values, fcast.values
err = y_true - y_pred
rmse = float(np.sqrt(np.square(err).mean()))
mae = float(np.abs(err).mean())
# Zero targets are swapped for NaN so they drop out of the MAPE average
# instead of causing a division by zero.
with np.errstate(divide='ignore', invalid='ignore'):
    nonzero_true = np.where(y_true == 0, np.nan, y_true)
    mape_arr = np.abs(err / nonzero_true)
    mape = float(np.nanmean(mape_arr) * 100)  # in percent

# Reference ("true") parameters are read from the columns of raw_df whose
# name starts with "true_"; the value in the first row is used and the
# "true_" marker is stripped from the key.
true_params = {
    column.replace("true_", ""): raw_df[column].iloc[0]
    for column in raw_df.columns
    if column.startswith("true_")
}

# Deviations of the fitted model's estimates from the reference parameters.
param_deviation = dict()

def _to_scalar(x):
    try:
        arr = np.asarray(x)
        if arr.size == 0:
            return None
        return float(arr.ravel()[-1])
    except Exception:
        try:
            return float(x)
        except Exception:
            return None

def _finite_or_none(value):
    """Return ``value`` as a float when it is a finite number, otherwise None."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if np.isfinite(number) else None


def _first_value(values):
    """Return the first element of an array-like, or None when there is none."""
    if values is None:
        return None
    flat = np.asarray(values).ravel()
    return flat[0] if flat.size else None


# Compare the fitted model's estimates with the reference parameters.
# An estimate that is missing or not finite (for instance the initial trend
# of a model fitted without a trend component) is skipped rather than
# recorded as a NaN deviation.
if true_params:
    params = getattr(fitted, "params", {}) or {}

    # Starting level: prefer the estimated initial_level, otherwise the first
    # element of the fitted level path.
    if "initial_level" in params:
        est_level = _to_scalar(params["initial_level"])
    else:
        est_level = _first_value(getattr(fitted, "level", None))
    est_level = _finite_or_none(est_level)

    if pd.notna(true_params.get("intercept")) and est_level is not None:
        param_deviation["intercept_vs_initial_level"] = float(abs(float(true_params["intercept"]) - est_level))

    # Starting trend: prefer the estimated initial_trend, otherwise the first
    # element of the fitted trend path. The path is looked up as "trend"
    # first and "slope" second, since the attribute name differs between
    # statsmodels releases. A smoothing coefficient is deliberately not used
    # as a substitute: it is a weight, not a slope, so its distance from the
    # true trend would be meaningless.
    if "initial_trend" in params:
        est_trend = _to_scalar(params["initial_trend"])
    else:
        trend_path = getattr(fitted, "trend", None)
        if trend_path is None:
            trend_path = getattr(fitted, "slope", None)
        est_trend = _first_value(trend_path)
    est_trend = _finite_or_none(est_trend)

    if pd.notna(true_params.get("trend")) and est_trend is not None:
        param_deviation["trend_vs_initial_trend"] = float(abs(float(true_params["trend"]) - est_trend))

    # Seasonal amplitude: mean absolute value of the estimated seasonal
    # component, compared with the reference amplitude.
    if pd.notna(true_params.get("season_amp")):
        est_season = getattr(fitted, "season", None)
        if est_season is not None:
            est_arr = np.asarray(est_season, dtype=float)
            est_amp = None
            # Skip empty or all-NaN components (nanmean would only warn and
            # return NaN for them).
            if est_arr.size and not np.isnan(est_arr).all():
                est_amp = _finite_or_none(np.nanmean(np.abs(est_arr)))
            if est_amp is not None:
                param_deviation["season_vs_amp"] = float(abs(float(true_params["season_amp"]) - est_amp))

def _json_safe(value):
    """Return a copy of ``value`` in which non-finite floats are None.

    json.dump would otherwise write them as bare NaN / Infinity tokens, which
    are not valid JSON and are rejected by strict parsers. Dictionaries are
    processed recursively; every other value is returned unchanged.
    """
    if isinstance(value, dict):
        return {key: _json_safe(item) for key, item in value.items()}
    if isinstance(value, float) and not np.isfinite(value):
        return None
    return value


# Collect the run metrics.
metrics = {
    "n_total": int(len(series)),
    "n_train": int(len(train)),
    "n_test": int(len(test)),
    "train_time_s": float(train_time_s),
    "predict_time_s": float(predict_time_s),
    "mem_used_fit_bytes": int(mem_used_fit_bytes),
    "mem_used_pred_bytes": int(mem_used_pred_bytes),
    "rmse": rmse,
    "mae": mae,
    "mape_pct": mape,
    "param_deviation": param_deviation,
    "forecast_csv": str(out_path),
}

# Save them next to the forecast. Undefined metrics (e.g. MAPE when every
# test value is zero) are written as null; the in-memory dict is untouched.
metrics_path = out_dir / f"{csv_path.stem}_hw_metrics.json"
with open(metrics_path, "w") as f:
    json.dump(_json_safe(metrics), f, indent=2)

# Figure: training data, held-out data, in-sample fit and forecast.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train.index, train.values, label="train", marker="o")
ax.plot(test.index, test.values, label="test", marker="o")
# fitted.fittedvalues are the in-sample fits, aligned with the training index.
ax.plot(train.index, fitted.fittedvalues, label="fitted (train)", alpha=0.7)
# Draw the forecast against its own index; if that cannot be plotted, use
# the numeric positions computed for the saved CSV instead.
try:
    ax.plot(fcast.index, fcast.values, label="forecast", marker="o")
except Exception:
    ax.plot(idx_vals, fcast.values, label="forecast", marker="o")
ax.legend()
ax.set_title(f"Holt-Winters forecast ({csv_path.name})")
plot_path = out_dir / f"{csv_path.stem}_hw_plot_h{len(fcast)}.png"
fig.tight_layout()
fig.savefig(plot_path)
plt.close(fig)

# Short run summary: where everything went and the headline accuracy.
print("\n".join([
    f"Loaded: {csv_path}",
    f"Forecast saved: {out_path}",
    f"Plot saved: {plot_path}",
    f"Metrics saved: {metrics_path}",
    f"RMSE={rmse:.4f}, MAE={mae:.4f}, MAPE={mape:.2f}%",
]))

  self._init_dates(dates, freq)


Loaded: ../data/ts_1000_1_02.csv
Forecast saved: /home/jovyan/work/Time-series-forecasting-Holt-Winters-model/output/ts_1000_1_02_hw_h300_s0.csv
Plot saved: /home/jovyan/work/Time-series-forecasting-Holt-Winters-model/output/ts_1000_1_02_hw_plot_h300.png
Metrics saved: /home/jovyan/work/Time-series-forecasting-Holt-Winters-model/output/ts_1000_1_02_hw_metrics.json
RMSE=2.4020, MAE=1.4426, MAPE=1.53%
