In [2]:
# ! pip install numpy



In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression

# 1) Synthetic daily series: a piecewise-linear trend with changepoints at
#    t=60 and t=130, a 30-step sinusoidal seasonality, and Gaussian noise.
np.random.seed(42)
length = 200
dates = pd.date_range('2020-01-01', periods=length, freq='D')
t = np.arange(length)
# np.piecewise allocates its output with the dtype of its first argument.
# `t` is an integer index, so it must be cast to float here; otherwise the
# fractional slopes (0.2, 0.5, -0.1) are silently truncated to integers.
trend = np.piecewise(
    t.astype(float),
    [t < 60, (t >= 60) & (t < 130), t >= 130],
    [lambda x: 0.2 * x,
     lambda x: 0.2 * 60 + 0.5 * (x - 60),
     lambda x: 0.2 * 60 + 0.5 * 70 - 0.1 * (x - 130)]
)
season = 5 * np.sin(2 * np.pi * t / 30)
noise  = np.random.normal(0, 1, size=length)
y = trend + season + noise

# Chronological train/test split: first 150 points for fitting, the
# remaining 50 held out for forecasting.
train_len = 150
y_train, y_test = y[:train_len], y[train_len:]
dates_test = dates[train_len:]

# 2) Baseline model: ordinary least squares on a linear time index plus one
#    sin/cos pair with a 30-step period (matching the seasonal component).
phase = 2 * np.pi * t / 30
X = np.column_stack([t, np.sin(phase), np.cos(phase)])
lr = LinearRegression().fit(X[:train_len], y_train)

# Predict over the full range once, then cut at the train/test boundary.
pred_all = lr.predict(X)
base_pred_train, base_pred_test = pred_all[:train_len], pred_all[train_len:]

# 3) Residuals of the baseline on the training span; these are what the
#    hybrid LSTM learns to predict.
residual_train = y_train - base_pred_train

# 4) Build the supervised (window -> next value) dataset for the LSTMs
def create_dataset(series, window):
    """Slice `series` into sliding windows and their next-step targets.

    Args:
        series: 1-D sequence of values.
        window: Number of consecutive values in each input sample.

    Returns:
        A tuple ``(inputs, targets)`` of NumPy arrays. ``inputs[k]`` is
        ``series[k:k + window]`` and ``targets[k]`` is ``series[k + window]``;
        there are ``len(series) - window`` samples (none if that is <= 0).
    """
    starts = range(len(series) - window)
    inputs = np.array([series[s:s + window] for s in starts])
    targets = np.array([series[s + window] for s in starts])
    return inputs, targets

# Look-back length: each sample holds `window` consecutive values.
window = 30
# 4a) Plain LSTM: windows cut directly from the raw training series
X_raw, Y_raw = create_dataset(y_train, window)
# 4b) Hybrid: windows cut from the linear-regression residuals instead
X_res,  Y_res  = create_dataset(residual_train, window)

def to_loader(X, Y, batch_size=16):
    """Wrap window/target arrays in a shuffling mini-batch DataLoader.

    Args:
        X: Array-like of input windows, shape (N, window).
        Y: Array-like of next-step targets, shape (N,).
        batch_size: Number of samples per mini-batch.

    Returns:
        A DataLoader yielding float32 pairs of shape (B, window, 1) and
        (B, 1), reshuffled on every pass.
    """
    # A trailing feature axis is appended so each time step has one feature.
    features = torch.tensor(X, dtype=torch.float32)[..., None]  # (N, window, 1)
    targets = torch.tensor(Y, dtype=torch.float32)[..., None]   # (N, 1)
    dataset = TensorDataset(features, targets)
    return DataLoader(dataset, batch_size=batch_size, shuffle=True)

# One shuffled mini-batch loader per training target (raw series vs. residuals).
loader_raw = to_loader(X_raw, Y_raw)
loader_res = to_loader(X_res, Y_res)

# 5) LSTM model definition
class LSTMModel(nn.Module):
    """Single-layer LSTM followed by a linear head for one-step forecasting.

    Takes a batch of univariate sequences shaped (batch, seq_len, 1) and
    returns one scalar prediction per sequence, shaped (batch, 1).

    Args:
        hidden_size: Width of the LSTM hidden state.
    """

    def __init__(self, hidden_size=16):
        super().__init__()
        # Attribute names `lstm` and `fc` are kept: they are the state_dict keys.
        self.lstm = nn.LSTM(
            input_size=1,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map the hidden state at the final time step to the prediction."""
        seq_out, _state = self.lstm(x)
        last_step = seq_out[:, -1, :]
        return self.fc(last_step)

# 6) / 7) Train one LSTM on the raw series and one on the regression residuals.
# Only NumPy was seeded so far; seed torch as well so weight initialisation
# and DataLoader shuffling (and hence the reported MSEs) are reproducible.
torch.manual_seed(42)

loss_fn = nn.MSELoss()


def _fit(model, loader, optimizer, criterion, epochs=100):
    """Train `model` in place for `epochs` full passes over `loader`.

    Args:
        model: Module to optimise.
        loader: Iterable of (inputs, targets) mini-batches.
        optimizer: Optimizer already bound to `model`'s parameters.
        criterion: Loss callable taking (prediction, target).
        epochs: Number of passes over `loader`.

    Returns:
        The same `model`, for convenience.
    """
    model.train()
    for _ in range(epochs):
        for xb, yb in loader:
            optimizer.zero_grad()
            criterion(model(xb), yb).backward()
            optimizer.step()
    return model


# 6) Plain LSTM trained on the raw series
model_raw = LSTMModel()
opt_raw   = torch.optim.Adam(model_raw.parameters(), lr=0.01)
_fit(model_raw, loader_raw, opt_raw, loss_fn)

# 7) Residual LSTM trained on the baseline's residuals
model_res = LSTMModel()
opt_res   = torch.optim.Adam(model_res.parameters(), lr=0.01)
_fit(model_res, loader_res, opt_res, loss_fn)

# 8) Forecast the test period
def _recursive_forecast(model, history, steps, window):
    """Extend `history` by `steps` one-step-ahead predictions of `model`.

    Each prediction is fed back as input for the next step, so no true
    future value is ever used.

    Args:
        model: Trained module mapping a (1, window, 1) tensor to a (1, 1) tensor.
        history: 1-D array of observed values used to seed the rollout.
        steps: Number of future values to generate.
        window: Number of most recent values fed to the model at each step.

    Returns:
        A float array of length ``len(history) + steps``: the history
        followed by the forecasts.
    """
    n_hist = len(history)
    # Preallocate once instead of growing the array with np.append per step.
    rolled = np.empty(n_hist + steps, dtype=float)
    rolled[:n_hist] = history
    model.eval()
    with torch.no_grad():
        for k in range(n_hist, n_hist + steps):
            recent = rolled[max(k - window, 0):k]
            xin = torch.tensor(recent, dtype=torch.float32).reshape(1, -1, 1)
            rolled[k] = model(xin).item()
    return rolled


# 8a) Plain LSTM: iterative forecast seeded with the raw training series
seq = _recursive_forecast(model_raw, y_train, len(y_test), window)
simple_preds = seq[len(y_train):]

# 8b) Hybrid: forecast the residuals iteratively, then add them back onto
#     the linear-regression prediction for the test period.
seq_res = _recursive_forecast(model_res, residual_train, len(y_test), window)
res_preds = seq_res[len(residual_train):]

hybrid_preds = base_pred_test + res_preds

# 9) Compare accuracy on the test span via mean squared error
err_simple = y_test - np.asarray(simple_preds)
err_hybrid = y_test - hybrid_preds
mse_simple = np.mean(err_simple ** 2)
mse_hybrid = np.mean(err_hybrid ** 2)

print(f"MSE Simple LSTM   : {mse_simple:.3f}")
print(f"MSE Hybrid L+LR   : {mse_hybrid:.3f}")

# 10) Plot the actual test values against both forecasts
plt.figure(figsize=(10, 4))
for series, name, mark in (
    (y_test, 'Actual', 'o'),
    (simple_preds, 'LSTM Raw', 'x'),
    (hybrid_preds, 'Hybrid', 's'),
):
    plt.plot(dates_test, series, label=name, marker=mark)
plt.legend()
plt.title("Test Period Forecast Comparison")
plt.xlabel("Date")
plt.ylabel("Value")
plt.tight_layout()
plt.show()