# 시계열 분석 - LSTM 모델
## Team 4

이 노트북은 LSTM(Long Short-Term Memory) 기반 딥러닝 모델을 사용하여 시계열 예측을 수행하는 과정을 보여줍니다.
LSTM은 순환 신경망(RNN)의 한 종류로, 시계열 데이터처럼 순서가 중요한 데이터에서 장기 의존성(long-term dependency)을 학습하는 데 강점이 있습니다.
과거의 여러 시점 데이터를 입력으로 받아 다음 시점의 값을 예측하는 방식으로 작동합니다.

## 분석 흐름
1. **문제 정의 및 목표 설정**: 예측 대상과 목표, 평가 지표를 정의합니다.
2. **데이터 탐색 및 전처리**: 데이터를 로드하고 시각화하여 패턴을 파악하며, 결측치/이상치를 처리합니다. LSTM 모델링을 위한 특성 엔지니어링, 스케일링, 시퀀스 생성을 수행합니다.
3. **모델 선택 및 구현 (LSTM)**: 다양한 구조의 LSTM 모델을 정의하고 컴파일합니다.
4. **모델 학습 및 평가**: 정의된 모델들을 학습시키고 검증/테스트 데이터로 성능을 평가합니다. 학습 곡선을 통해 과적합/과소적합 여부를 확인합니다.
5. **모델 비교**: 여러 LSTM 모델의 성능을 비교하여 최적 모델을 선정합니다.
6. **결과 해석 및 인사이트 도출**: 최적 모델의 예측 결과를 분석하고, 미래 예측 시뮬레이션을 수행하며 인사이트를 도출합니다.

## 1. 문제 정의 및 목표 설정

이 템플릿은 LSTM 모델을 사용한 시계열 예측을 위한 기본 구조를 제공합니다.

**목표:**
- 시계열 데이터의 장기적인 패턴과 복잡한 시간적 의존성을 LSTM 모델로 학습합니다.
- 과거 일정 기간(시퀀스)의 데이터를 기반으로 미래 시점의 값을 예측합니다.
- 단층, 다층 등 다양한 LSTM 모델 구조를 실험하고 하이퍼파라미터 튜닝을 통해 성능을 비교/개선합니다.

**평가 지표:**
- RMSE (Root Mean Squared Error): 예측 오차의 제곱 평균 제곱근. 오차의 크기에 민감합니다.
- MAE (Mean Absolute Error): 예측 오차 절대값의 평균. 이상치에 덜 민감합니다.
- R² Score (Coefficient of Determination): 모델이 데이터의 분산을 얼마나 설명하는지를 나타냅니다. 1에 가까울수록 좋습니다.

### 사용자 파라미터 설정
분석 대상 데이터, 모델 구조, 학습 관련 파라미터를 설정합니다.
사용자는 자신의 데이터와 분석 목적에 맞게 이 값들을 수정해야 합니다.

In [None]:
# 데이터 파라미터
DATA_PATH = "../data/raw/your_data.csv"   # 데이터 파일 경로
DATE_COL = "date"                         # 날짜 컬럼명
TARGET_COL = "value"                      # 예측할 타겟 컬럼명
TRAIN_SIZE = 0.7                          # 훈련 데이터 비율
VALIDATION_SIZE = 0.1                     # 검증 데이터 비율 (7:1:2 분할)

# 시퀀스 및 모델 파라미터
SEQUENCE_LENGTH = 30                      # 시계열 시퀀스 길이 (과거 며칠의 데이터로 예측할지)
EPOCHS = 100                              # 최대 학습 에폭
BATCH_SIZE = 32                           # 배치 크기
PATIENCE = 20                             # 조기 종료 파라미터 (검증 손실 개선이 없을 때 기다릴 에폭 수)
FORECAST_HORIZON = 30                     # 미래 예측 기간 (일 단위 등)
RANDOM_STATE = 42                         # 랜덤 시드 (재현성을 위해 추가)

# 출력 디렉토리
OUTPUT_DIR = '../plots'                   # 시각화 결과 저장 경로
DATA_OUTPUT_DIR = '../data/processed'     # 전처리된 데이터 저장 경로

### 필요한 라이브러리 임포트
데이터 처리, 시각화, 시계열 분석, 딥러닝 모델링(TensorFlow/Keras)에 필요한 라이브러리를 불러옵니다.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib as mpl
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import TimeSeriesSplit
import tensorflow as tf
# Keras를 독립 패키지로 사용
import keras
from keras.models import Sequential
from keras.layers import LSTM, Dense, Dropout, BatchNormalization, Bidirectional
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
import statsmodels.tsa.api as tsa
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import seasonal_decompose
from tqdm import tqdm
import os
import warnings
from datetime import datetime, timedelta

# 경고 무시
warnings.filterwarnings('ignore')

# 랜덤 시드 설정 (결과 재현성을 위해)
np.random.seed(RANDOM_STATE)
tf.random.set_seed(RANDOM_STATE)
keras.utils.set_random_seed(RANDOM_STATE)  # keras 시드 설정 추가

# 그래프 스타일 설정
plt.style.use('seaborn-whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
plt.rcParams['font.size'] = 12

# 한글 폰트 설정 (한글이 포함된 시각화를 위해, 필요시 활성화)
try:
    plt.rc('font', family='Malgun Gothic')
    plt.rcParams['axes.unicode_minus'] = False
except:
    print("'Malgun Gothic' 폰트가 없습니다. 기본 폰트를 사용합니다.")

# 결과 저장 디렉토리 생성 코드 제거
# 프로젝트 지침에 따라 필요 폴더는 이미 존재한다고 가정

# # GPU 메모리 설정 (주석 처리됨)
# gpus = tf.config.experimental.list_physical_devices('GPU')
# if gpus:
#     try:
#         for gpu in gpus:
#             tf.config.experimental.set_memory_growth(gpu, True)
#         print("GPU 메모리 증가 설정 완료")
#     except RuntimeError as e:
#         print(f"GPU 설정 오류: {e}")
# else:
#     print("사용 가능한 GPU가 없습니다. CPU를 사용합니다.")

## 2. 데이터 탐색 및 전처리
모델 학습에 사용할 데이터를 준비하는 과정입니다.
데이터 로드, 기본 정보 확인, 결측치/이상치 처리, 시각화를 통한 패턴 분석, 특성 생성, 스케일링, 시퀀스 변환 등을 포함합니다.

### 2.1 데이터 로드 및 기본 탐색
지정된 경로에서 데이터를 로드하고, 날짜 컬럼을 인덱스로 설정하며, 타겟 변수를 확인합니다.
데이터의 기본적인 정보(형태, 타입, 통계량)를 출력하여 데이터에 대한 이해를 높입니다.

In [None]:
# 데이터 로드 (사용자가 제공한 경로)
df = pd.read_csv(DATA_PATH)
print(f"데이터를 성공적으로 로드했습니다. 크기: {df.shape}")

# 날짜 열 처리 (사용자 지정 컬럼 사용)
df[DATE_COL] = pd.to_datetime(df[DATE_COL])
df.set_index(DATE_COL, inplace=True)
print(f"'{DATE_COL}' 열을 인덱스로 설정했습니다.")

# 타겟 변수 확인 (사용자 지정 컬럼 사용)
if TARGET_COL not in df.columns:
    # TARGET_COL이 없는 경우 오류 발생 또는 경고 후 첫 번째 숫자형 컬럼 사용 등의 처리가 필요하나,
    # 템플릿 단순화를 위해 사용자가 올바른 컬럼명을 제공하는 것을 전제합니다.
    print(f"경고: 지정된 타겟 컬럼 '{TARGET_COL}'이 데이터에 없습니다. 첫 번째 숫자형 컬럼을 사용하거나 코드를 수정하세요.")
    # 또는 raise ValueError(f"Target column '{TARGET_COL}' not found in data.")

print(f"\n타겟 변수: {TARGET_COL}")

# 기본 정보 출력
print("\n데이터 기본 정보:")
print(df.info())

print("\n기술 통계량:")
print(df.describe())

# 데이터 처음과 끝 확인
print("\n데이터 처음 5행:")
print(df.head())
print("\n데이터 마지막 5행:")
print(df.tail())

### 2.2 결측치 및 이상치 처리
데이터 품질을 확보하기 위해 결측치를 처리하고 이상치를 탐지합니다.
- **결측치 처리**: 시계열 데이터의 연속성을 고려하여 보간법(여기서는 시간 기반 선형 보간)을 사용합니다.
- **이상치 확인**: 통계적 방법(여기서는 Z-점수)을 사용하여 극단적인 값을 탐지하고 시각화합니다. 시계열 데이터에서는 이상치가 중요한 이벤트일 수 있으므로, 분석 목적에 따라 처리 여부를 신중히 결정해야 합니다. (여기서는 확인만 하고 제거하지 않음)

In [None]:
# 결측치 확인
print("\n결측치 개수:")
print(df.isnull().sum())

# 시나리오 A: 결측치가 있는 경우 - 시계열 보간법 적용
# 시계열 데이터이므로 시간 기반 보간 방법 사용
df_clean = df.copy()
df_clean = df_clean.interpolate(method='time')

# 시작/끝에 결측치가 남아있을 경우
if df_clean.isnull().sum().sum() > 0:
    df_clean = df_clean.fillna(method='bfill').fillna(method='ffill')

print(f"결측치 처리 완료. 남은 결측치: {df_clean.isnull().sum().sum()}")

# 시나리오 B: 결측치가 없는 경우 (주석 처리됨)
# df_clean = df.copy()
# print("결측치가 없습니다.")

# 이상치 확인 (Z-점수 방법)
print("\n이상치 확인 중...")
z_scores = np.abs((df_clean[TARGET_COL] - df_clean[TARGET_COL].mean()) / df_clean[TARGET_COL].std())
outliers = (z_scores > 3)

# 시나리오 A: 이상치가 있는 경우 - 시각화 및 보존
# 시계열 예측에서는 이상치가 실제 중요한 패턴일 수 있으므로 확인만 하고 보존
if outliers.any():
    print(f"Z-점수 방법으로 감지된 이상치 개수: {outliers.sum()}")
    
    # 이상치 시각화
    plt.figure(figsize=(14, 7))
    plt.scatter(df_clean.index, df_clean[TARGET_COL], c='blue', alpha=0.5, label='정상 데이터')
    plt.scatter(df_clean.index[outliers], df_clean.loc[outliers, TARGET_COL], 
               c='red', alpha=0.8, label='이상치')
    plt.title('시계열 데이터 이상치')
    plt.xlabel('날짜')
    plt.ylabel(TARGET_COL)
    plt.legend()
    plt.tight_layout()
    plt.savefig(f'{OUTPUT_DIR}/lstm_이상치.png')
    plt.show()
    
    print("이상치는 실제 데이터의 중요한 패턴일 수 있으므로 제거하지 않고 유지합니다.")
else:
    print("주요 이상치가 감지되지 않았습니다.")

# 시나리오 B: 이상치 처리 - 평균값으로 대체 (주석 처리됨)
# 특정 상황에서는 이상치가 모델 성능에 부정적 영향을 줄 수 있어 처리가 필요할 수 있음
# if outliers.any():
#     print(f"Z-점수 방법으로 감지된 이상치 개수: {outliers.sum()}")
#     # 이상치를 평균값으로 대체
#     df_clean.loc[outliers, TARGET_COL] = df_clean[TARGET_COL].mean()
#     print("이상치를 평균값으로 대체했습니다.")
# else:
#     print("주요 이상치가 감지되지 않았습니다.")

### 2.3 시계열 시각화 및 패턴 분석
전처리된 데이터를 시각화하여 패턴을 분석하고, 정상성 여부를 확인합니다.
- **시계열 플롯**: 시간에 따른 데이터 변화 추세를 확인합니다.
- **상관관계 히트맵 (다변량 시)**: 여러 특성 간의 선형 상관관계를 확인합니다.
- **ACF/PACF 플롯**: 데이터의 자기상관 구조를 파악하여 모델링(특히 ARIMA 계열)에 참고하거나 데이터 특성을 이해합니다.
- **ADF 검정**: 시계열의 정상성을 통계적으로 검정합니다. LSTM은 비정상성 데이터도 다룰 수 있지만, 정상성 여부는 데이터 특성 파악에 중요합니다.
- **계절성 분해**: 데이터를 추세, 계절성, 잔차로 분해하여 구조적 특징을 파악합니다.

In [None]:
# 시계열 데이터 시각화
plt.figure(figsize=(14, 7))
plt.plot(df_clean.index, df_clean[TARGET_COL])
plt.title('시계열 데이터')
plt.xlabel('날짜')
plt.ylabel(TARGET_COL)
plt.grid(True)
plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_시계열_데이터.png')
plt.show()

# 다변량 시계열인 경우 상관관계 확인
if df_clean.shape[1] > 1:
    plt.figure(figsize=(12, 10))
    corr = df_clean.corr()
    mask = np.triu(np.ones_like(corr, dtype=bool))  # 상삼각행렬만 표시
    sns.heatmap(corr, mask=mask, annot=True, cmap='coolwarm', linewidths=0.5)
    plt.title('특성 간 상관관계')
    plt.tight_layout()
    plt.savefig(f'{OUTPUT_DIR}/lstm_상관관계_히트맵.png')
    plt.show()

# ACF, PACF 그래프로 자기상관 분석
plt.figure(figsize=(14, 7))
plt.subplot(211)
plot_acf(df_clean[TARGET_COL].dropna(), lags=40, ax=plt.gca())
plt.title('자기상관함수(ACF)')

plt.subplot(212)
plot_pacf(df_clean[TARGET_COL].dropna(), lags=40, ax=plt.gca())
plt.title('편자기상관함수(PACF)')

plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_자기상관.png')
plt.show()

# ADF 테스트로 정상성 확인
adf_result = adfuller(df_clean[TARGET_COL].dropna())
print('ADF 테스트 결과:')
print(f'ADF 통계값: {adf_result[0]:.4f}')
print(f'p-value: {adf_result[1]:.4f}')
print('임계값:')
for key, value in adf_result[4].items():
    print(f'   {key}: {value:.4f}')

is_stationary = adf_result[1] < 0.05
print(f"시계열 정상성 여부: {'정상적일 가능성 높음 (p < 0.05)' if is_stationary else '비정상적일 가능성 높음 (p >= 0.05)'}")

# 계절성 분해
# 참고: seasonal_decompose는 데이터 길이나 주기 설정에 따라 오류가 발생할 수 있습니다.
#      데이터에 맞는 적절한 period 값을 설정해야 합니다. (예: 주간 데이터면 7, 월간 데이터면 12)
#      아래는 예시로 period=7을 사용합니다. 실제 데이터에 맞게 수정하세요.
decompose_period = 7 # 예시 주기 (필요시 사용자 파라미터로 이동 또는 데이터 기반 설정)
decomposition = seasonal_decompose(df_clean[TARGET_COL].dropna(), period=decompose_period)

plt.figure(figsize=(14, 10))
plt.subplot(411)
plt.plot(decomposition.observed)
plt.title('관측값')

plt.subplot(412)
plt.plot(decomposition.trend)
plt.title('추세')

plt.subplot(413)
plt.plot(decomposition.seasonal)
plt.title('계절성')

plt.subplot(414)
plt.plot(decomposition.resid)
plt.title('잔차')

plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_계절성분해.png')
plt.show()
print(f"계절성 분해 완료 (주기: {decompose_period})")

### 2.4 특성 엔지니어링
(선택 사항) LSTM 모델은 원본 시계열 자체를 입력으로 사용할 수도 있지만, 추가적인 특성을 생성하여 모델 성능을 향상시킬 수도 있습니다.
여기서는 날짜 관련 특성, 시차 특성, 이동 통계량 특성을 예시로 생성합니다. 분석 대상 데이터와 문제에 따라 유용한 특성은 달라질 수 있습니다.

In [None]:
# 특성 엔지니어링: 시간 관련 특성 추가
# 시나리오 A: 시간 관련 특성과 시차/이동 특성 추가 (DatetimeIndex가 있는 경우)
# 목적: 다양한 특성을 모델에 제공하여 시간적 패턴 학습 강화
if isinstance(df_clean.index, pd.DatetimeIndex):
    print("시간 관련 특성 추가 중...")
    # 원본 데이터 복사
    df_features = df_clean.copy()
    
    # 시간 관련 특성
    df_features['dayofweek'] = df_features.index.dayofweek
    df_features['month'] = df_features.index.month
    df_features['year'] = df_features.index.year
    df_features['dayofyear'] = df_features.index.dayofyear
    df_features['quarter'] = df_features.index.quarter
    
    # 시간 특성 사인/코사인 변환 (주기성 포착)
    df_features['dayofweek_sin'] = np.sin(2 * np.pi * df_features.index.dayofweek / 7)
    df_features['dayofweek_cos'] = np.cos(2 * np.pi * df_features.index.dayofweek / 7)
    df_features['month_sin'] = np.sin(2 * np.pi * df_features.index.month / 12)
    df_features['month_cos'] = np.cos(2 * np.pi * df_features.index.month / 12)
    
    # 시차 특성 (이전 시점의 값)
    df_features['lag_1'] = df_features[TARGET_COL].shift(1)
    df_features['lag_7'] = df_features[TARGET_COL].shift(7)
    df_features['lag_14'] = df_features[TARGET_COL].shift(14)
    
    # 이동 평균 및 표준편차
    df_features['rolling_mean_7'] = df_features[TARGET_COL].rolling(window=7).mean()
    df_features['rolling_std_7'] = df_features[TARGET_COL].rolling(window=7).std()
    df_features['rolling_mean_14'] = df_features[TARGET_COL].rolling(window=14).mean()
    df_features['rolling_std_14'] = df_features[TARGET_COL].rolling(window=14).std()
    
    print("생성된 특성:")
    print(df_features.columns.tolist())
    
    # 결측치 처리
    df_features = df_features.dropna()
    print(f"특성 생성 완료. 최종 데이터 크기: {df_features.shape}")
    
    # 단순 시계열 모델링을 위해 단변량 데이터도 유지
    df_univariate = df_clean[[TARGET_COL]].copy()
else:
    # 시나리오 B: DatetimeIndex가 없는 경우
    # 시간 기반 특성 없이 단변량 시계열로 처리
    print("DatetimeIndex가 없어 단변량 시계열로 처리합니다.")
    df_univariate = df_clean[[TARGET_COL]].copy()
    df_features = df_clean.copy()

# 시나리오 C: 최소 특성만 사용 (주석 처리됨)
# 목적: 모델 복잡성 감소, 더 빠른 훈련 및 예측
# df_features = df_clean[[TARGET_COL]].copy()
# df_univariate = df_clean[[TARGET_COL]].copy()
# print("최소 특성 사용 (단변량 시계열)으로 진행합니다.")

### 2.5 데이터 스케일링 및 시퀀스 생성
딥러닝 모델의 안정적인 학습을 위해 데이터를 특정 범위(여기서는 0~1)로 스케일링하고, LSTM 모델 입력 형식에 맞게 시계열 데이터를 시퀀스 형태로 변환합니다.
- **스케일링**: `MinMaxScaler`를 사용하여 모든 특성(및 타겟)을 0과 1 사이로 조정합니다. 다변량 데이터의 경우 각 특성별로 스케일링합니다.
- **시퀀스 생성**: `create_sequences` 함수는 원본 시계열을 `SEQUENCE_LENGTH` 길이의 입력 시퀀스(X)와 해당 시퀀스 다음 시점의 타겟 값(y)으로 변환합니다.
- **데이터 분할**: 스케일링 및 시퀀스 생성이 완료된 데이터를 훈련, 검증, 테스트 세트로 분할합니다.

In [None]:
# 데이터 스케일링
print("\n데이터 스케일링...")
scaler = MinMaxScaler()
if len(df_features.columns) > 1:
    # 다변량 특성이 있는 경우
    target_scaler = MinMaxScaler()
    scaled_target = target_scaler.fit_transform(df_features[[TARGET_COL]])
    features = df_features.drop(columns=[TARGET_COL])
    
    # 특성 스케일링
    feature_columns = features.columns
    scaled_features = scaler.fit_transform(features)
    scaled_features_df = pd.DataFrame(scaled_features, index=df_features.index, columns=feature_columns)
    
    # 타겟과 특성 결합
    scaled_target_df = pd.DataFrame(scaled_target, index=df_features.index, columns=[TARGET_COL])
    scaled_df = pd.concat([scaled_target_df, scaled_features_df], axis=1)
else:
    # 단변량 시계열인 경우
    scaled_data = scaler.fit_transform(df_univariate)
    scaled_df = pd.DataFrame(scaled_data, index=df_univariate.index, columns=[TARGET_COL])

print(f"스케일링 완료. 데이터 크기: {scaled_df.shape}")

# 시퀀스 생성 함수
def create_sequences(data, seq_length, target_col_idx=0):
    """
    시계열 데이터에서 입력 시퀀스와 타겟 값 생성
    
    Args:
        data: 스케일링된 데이터 (numpy 배열)
        seq_length: 시퀀스 길이
        target_col_idx: 타겟 열의 인덱스
        
    Returns:
        X: 입력 시퀀스의 배열
        y: 타겟 값의 배열
    """
    xs, ys = [], []
    for i in range(len(data) - seq_length):
        x = data[i:i + seq_length]
        y = data[i + seq_length, target_col_idx]
        xs.append(x)
        ys.append(y)
    return np.array(xs), np.array(ys)

# 데이터 분할
print("\n데이터 분할 및 시퀀스 생성...")
train_size = int(len(scaled_df) * TRAIN_SIZE)
val_size = int(len(scaled_df) * VALIDATION_SIZE)

# 데이터 인덱스 범위 계산
train_end = train_size
val_end = train_size + val_size
test_end = len(scaled_df)

# 훈련, 검증, 테스트 데이터 분할 - 전체 특성 데이터 (7:1:2 비율)
train_data = scaled_df.values[:train_end]
val_data = scaled_df.values[train_end:val_end]
test_data = scaled_df.values[val_end:test_end]

# 타겟 열 인덱스 (다변량의 경우 0번째 열이 타겟)
target_idx = scaled_df.columns.get_loc(TARGET_COL)

# 시퀀스 생성
X_train, y_train = create_sequences(train_data, SEQUENCE_LENGTH, target_idx)
X_val, y_val = create_sequences(val_data, SEQUENCE_LENGTH, target_idx)
X_test, y_test = create_sequences(test_data, SEQUENCE_LENGTH, target_idx)

print(f"훈련 데이터 형태: {X_train.shape}, {y_train.shape} (70%)")
print(f"검증 데이터 형태: {X_val.shape}, {y_val.shape} (10%)")
print(f"테스트 데이터 형태: {X_test.shape}, {y_test.shape} (20%)")

# 데이터 분할 시각화 - 원본 값 기준
original_dates = df_clean.index
train_dates = original_dates[:train_end]
val_dates = original_dates[train_end:val_end]
test_dates = original_dates[val_end:test_end]

plt.figure(figsize=(14, 7))
plt.plot(train_dates, df_clean.iloc[:train_end][TARGET_COL], label='훈련 데이터 (70%)')
plt.plot(val_dates, df_clean.iloc[train_end:val_end][TARGET_COL], label='검증 데이터 (10%)')
plt.plot(test_dates, df_clean.iloc[val_end:test_end][TARGET_COL], label='테스트 데이터 (20%)')
plt.title('데이터 분할 (7:1:2 비율)')
plt.xlabel('날짜')
plt.ylabel(TARGET_COL)
plt.legend()
plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_데이터_분할.png')
plt.show()

## 3. 모델 선택 및 구현
LSTM 모델 구조를 정의하고 학습 및 평가를 준비합니다.

### 3.1 LSTM 모델 정의
Keras Sequential API를 사용하여 여러 구조의 LSTM 모델을 정의하는 함수(`build_lstm_model`)를 만듭니다.
- **LSTM 레이어**: 시계열 패턴 학습의 핵심 레이어. `units`는 LSTM 셀의 개수, `return_sequences=True`는 다음 LSTM 레이어로 전체 시퀀스를 전달할 때 사용합니다.
- **Dropout**: 과적합을 방지하기 위해 일부 뉴런 연결을 무작위로 끊습니다.
- **BatchNormalization**: 각 배치마다 입력을 정규화하여 학습을 안정화시키고 속도를 높입니다.
- **Dense**: 최종 예측값을 출력하는 완전 연결 레이어.
여러 모델 구성(Basic, Medium, Deep)을 리스트로 정의하여 비교 실험을 준비합니다.

In [None]:
# LSTM 모델 정의 함수
def build_lstm_model(input_shape, units=64, dropout_rate=0.2, num_layers=2):
    """
    LSTM 모델 생성 함수
    
    Args:
        input_shape: 입력 데이터 형태 (시퀀스 길이, 특성 수)
        units: LSTM 유닛(뉴런) 수
        dropout_rate: 드롭아웃 비율
        num_layers: LSTM 층 수
        
    Returns:
        구성된 LSTM 모델
    """
    model = Sequential()
    
    # 첫 번째 LSTM 층
    model.add(LSTM(units=units, 
                  return_sequences=(num_layers > 1), 
                  input_shape=input_shape))
    model.add(Dropout(dropout_rate))
    model.add(BatchNormalization())
    
    # 추가 LSTM 층
    for i in range(num_layers - 1):
        return_seq = i < num_layers - 2  # 마지막 LSTM 층을 제외하고 return_sequences=True
        model.add(LSTM(units=units, return_sequences=return_seq))
        model.add(Dropout(dropout_rate))
        model.add(BatchNormalization())
    
    # 출력층
    model.add(Dense(1))
    
    # 모델 컴파일
    model.compile(optimizer='adam', loss='mse')
    
    return model

# 다양한 LSTM 모델 구성
lstm_models = [
    {'name': 'LSTM_Basic', 'units': 32, 'dropout_rate': 0.2, 'num_layers': 1},
    {'name': 'LSTM_Medium', 'units': 64, 'dropout_rate': 0.3, 'num_layers': 2},
    {'name': 'LSTM_Deep', 'units': 128, 'dropout_rate': 0.4, 'num_layers': 3}
]

### 3.2 모델 학습 및 평가 준비
모델 학습 과정을 제어하고 성능을 평가하기 위한 준비를 합니다.
- **콜백 함수 (Callbacks)**: 모델 학습 중 특정 조건에 따라 추가 작업을 수행합니다.
  - `EarlyStopping`: 검증 손실(val_loss)이 일정 에폭(patience) 동안 개선되지 않으면 학습을 조기 종료하여 과적합을 방지하고 최적 가중치를 복원합니다.
  - `ReduceLROnPlateau`: 검증 손실 개선이 정체되면 학습률(learning rate)을 동적으로 감소시켜 학습 안정성을 높입니다.
- **평가 함수 (evaluate_model)**: 스케일링된 예측값을 원래 스케일로 되돌린 후, RMSE, MAE, R² 등의 성능 지표를 계산하고 예측 결과를 시각화하는 함수를 정의합니다.

In [None]:
# 콜백 함수 설정
callbacks = [
    EarlyStopping(
        monitor='val_loss', 
        patience=PATIENCE, 
        restore_best_weights=True, 
        verbose=1
    ),
    ReduceLROnPlateau(
        monitor='val_loss', 
        factor=0.5, 
        patience=PATIENCE//2, 
        min_lr=0.0001, 
        verbose=1
    )
]

# 모델 성능 저장 리스트
model_performances = []

# 모델 평가 함수
def evaluate_model(y_true, y_pred, model_name, scaler, dates=None):
    """
    모델 성능 평가 함수
    
    Args:
        y_true: 실제 값
        y_pred: 예측 값
        model_name: 모델 이름
        scaler: 데이터 스케일러
        dates: 예측 날짜 (선택 사항)
        
    Returns:
        rmse, mae, r2: 평가 지표
        y_pred_orig: 원래 스케일로 변환된 예측 값
    """
    # 예측값을 원래 스케일로 변환
    y_true_reshaped = y_true.reshape(-1, 1)
    y_pred_reshaped = y_pred.reshape(-1, 1)
    
    # 원래 데이터가 단변량이었을 경우
    if len(scaled_df.columns) == 1:
        y_true_orig = scaler.inverse_transform(y_true_reshaped).flatten()
        y_pred_orig = scaler.inverse_transform(y_pred_reshaped).flatten()
    else:
        # 다변량인 경우, 타겟 변수의 스케일러만 사용
        # 임시 데이터프레임에 같은 크기로 복사 후 역변환
        temp_true = np.zeros((len(y_true), scaled_df.shape[1]))
        temp_true[:, target_idx] = y_true
        
        temp_pred = np.zeros((len(y_pred), scaled_df.shape[1]))
        temp_pred[:, target_idx] = y_pred
        
        y_true_orig = scaler.inverse_transform(temp_true)[:, target_idx]
        y_pred_orig = scaler.inverse_transform(temp_pred)[:, target_idx]
    
    rmse = np.sqrt(mean_squared_error(y_true_orig, y_pred_orig))
    mae = mean_absolute_error(y_true_orig, y_pred_orig)
    r2 = r2_score(y_true_orig, y_pred_orig)
    
    print(f"\n{model_name} 모델 성능 지표:")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"R²: {r2:.4f}")
    
    # 시각화 (날짜가 제공된 경우)
    if dates is not None:
        plt.figure(figsize=(14, 7))
        plt.plot(dates, y_true_orig, label='실제값')
        plt.plot(dates, y_pred_orig, label='예측값', alpha=0.7)
        plt.title(f'{model_name} - 예측 결과')
        plt.xlabel('날짜')
        plt.ylabel(TARGET_COL)
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.savefig(f'{OUTPUT_DIR}/lstm_{model_name}_예측.png')
        plt.show()
    
    return rmse, mae, r2, y_pred_orig

### 3.3 모델 학습 및 평가
정의된 각 LSTM 모델 구성에 대해 다음 과정을 반복합니다:
1. 모델 생성 (`build_lstm_model` 호출)
2. 모델 아키텍처 확인 (`model.summary()`)
3. 모델 학습 (`model.fit()`): 훈련 데이터(X_train, y_train)로 학습하고, 검증 데이터(X_val, y_val)로 에폭별 성능을 모니터링합니다. 설정된 콜백 함수들이 적용됩니다.
4. 학습 곡선 시각화: 에폭별 훈련 손실과 검증 손실을 그래프로 그려 과적합/과소적합 여부를 확인합니다.
5. 검증/테스트 데이터 예측 및 평가 (`evaluate_model` 호출): 학습된 모델로 검증 및 테스트 데이터에 대한 예측을 수행하고 성능 지표를 계산 및 시각화합니다.
6. 성능 결과 저장: 각 모델의 성능 지표를 리스트(`model_performances`)에 저장합니다.
7. 모델 저장 (선택 사항): 학습된 모델 가중치를 파일(`.h5`)로 저장합니다.

In [None]:
# 각 모델 학습 및 평가
for model_config in lstm_models:
    model_name = model_config['name']
    print(f"\n{model_name} 모델 학습 시작...")
    
    # 모델 생성
    model = build_lstm_model(
        input_shape=(X_train.shape[1], X_train.shape[2]),
        units=model_config['units'],
        dropout_rate=model_config['dropout_rate'],
        num_layers=model_config['num_layers']
    )
    
    # 모델 아키텍처 출력
    model.summary()
    
    # 모델 학습
    history = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
        callbacks=callbacks,
        verbose=1
    )
    
    # 학습 곡선 시각화
    plt.figure(figsize=(14, 7))
    plt.plot(history.history['loss'], label='훈련 손실')
    plt.plot(history.history['val_loss'], label='검증 손실')
    plt.title(f'{model_name} - 학습 곡선')
    plt.xlabel('에폭')
    plt.ylabel('손실 (MSE)')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(f'{OUTPUT_DIR}/lstm_{model_name}_학습곡선.png')
    plt.show()
    
    # 검증 데이터 예측
    val_pred = model.predict(X_val)
    # 검증 데이터에 해당하는 날짜 (시퀀스 길이만큼 이동)
    val_dates = val_dates[SEQUENCE_LENGTH:]
    
    # 검증 데이터 평가
    val_rmse, val_mae, val_r2, val_pred_orig = evaluate_model(
        y_val, val_pred, f"{model_name} (검증)", scaler, val_dates
    )
    
    # 테스트 데이터 예측
    test_pred = model.predict(X_test)
    # 테스트 데이터에 해당하는 날짜 (시퀀스 길이만큼 이동)
    test_dates = test_dates[SEQUENCE_LENGTH:]
    
    # 테스트 데이터 평가
    test_rmse, test_mae, test_r2, test_pred_orig = evaluate_model(
        y_test, test_pred, f"{model_name} (테스트)", scaler, test_dates
    )
    
    # 성능 저장
    model_performances.append({
        'model': model_name,
        'val_rmse': val_rmse,
        'val_mae': val_mae,
        'val_r2': val_r2,
        'test_rmse': test_rmse,
        'test_mae': test_mae,
        'test_r2': test_r2
    })
    
    # 모델 저장 (선택 사항)
    model.save(f'{DATA_OUTPUT_DIR}/lstm_{model_name}.h5')
    print(f"{model_name} 모델이 '{DATA_OUTPUT_DIR}/lstm_{model_name}.h5'에 저장되었습니다.")

## 4. 모델 평가 및 비교
여러 LSTM 모델들의 성능 평가 결과를 종합하고 비교합니다.
- **성능 비교 테이블**: 각 모델의 검증 및 테스트 성능 지표를 Pandas DataFrame으로 정리하여 출력합니다.
- **성능 비교 시각화**: 주요 성능 지표(RMSE, R²)를 막대 그래프로 시각화하여 모델 간 성능 차이를 직관적으로 비교합니다.
- **최고 성능 모델 선택**: 테스트 RMSE가 가장 낮은 모델을 최적 모델로 선정합니다.

In [None]:
# 모델 성능 비교 테이블
models_comparison = pd.DataFrame([
    {
        '모델': m['model'],
        'RMSE (검증)': m['val_rmse'],
        'MAE (검증)': m['val_mae'],
        'R² (검증)': m['val_r2'],
        'RMSE (테스트)': m['test_rmse'],
        'MAE (테스트)': m['test_mae'],
        'R² (테스트)': m['test_r2']
    }
    for m in model_performances
])

print("\n모델 성능 비교:")
print(models_comparison)

# 모델 비교 결과 저장
models_comparison.to_csv(f'{DATA_OUTPUT_DIR}/lstm_모델비교_결과.csv', index=False)
print(f"모델 비교 결과가 '{DATA_OUTPUT_DIR}/lstm_모델비교_결과.csv'에 저장되었습니다.")

# 모델 비교 시각화
plt.figure(figsize=(14, 7))
plt.subplot(121)
sns.barplot(x='모델', y='RMSE (테스트)', data=models_comparison)
plt.title('LSTM 모델 비교 - RMSE (낮을수록 좋음)')
plt.ylabel('RMSE')
plt.xticks(rotation=45)

plt.subplot(122)
sns.barplot(x='모델', y='R² (테스트)', data=models_comparison)
plt.title('LSTM 모델 비교 - R² (높을수록 좋음)')
plt.ylabel('R²')
plt.xticks(rotation=45)

plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_모델비교.png')
plt.show()

# 최고 성능 모델 선택
best_model_idx = models_comparison['RMSE (테스트)'].idxmin()
best_model = models_comparison.iloc[best_model_idx]['모델']
best_rmse = models_comparison.iloc[best_model_idx]['RMSE (테스트)']
best_r2 = models_comparison.iloc[best_model_idx]['R² (테스트)']

print(f"\n최고 성능 모델: {best_model}")
print(f"테스트 RMSE: {best_rmse:.4f}")
print(f"테스트 R²: {best_r2:.4f}")

## 5. 결과 해석 및 인사이트 도출
최종 선정된 최적 모델의 성능을 바탕으로 결과를 해석하고, 미래 예측을 수행하며 인사이트를 도출합니다.

### 5.1 미래 예측 시뮬레이션
최적 모델을 사용하여 학습 데이터 이후의 미래 기간(`FORECAST_HORIZON`)에 대한 예측을 수행합니다.
여기서는 **반복적 예측(Iterative Forecasting)** 방식을 사용합니다:
1. 가장 마지막 시퀀스 데이터를 초기 입력으로 사용합니다.
2. 다음 한 스텝의 값을 예측합니다.
3. 예측된 값을 다음 입력 시퀀스에 포함시키고, 가장 오래된 값을 제거하여 시퀀스를 업데이트합니다.
4. 위 과정을 예측하려는 기간만큼 반복합니다.
최종 예측값은 원래 스케일로 변환하여 시각화합니다.

In [None]:
# 미래 예측을 위한 예시
print("\n미래 예측 시뮬레이션")
print("실제 프로젝트에서는 이 부분을 데이터에 맞게 수정해야 합니다.")

# 최상의 모델 로드
best_model_config = next(m for m in lstm_models if m['name'] == best_model)
best_model_loaded = keras.models.load_model(f'{DATA_OUTPUT_DIR}/lstm_{best_model}.h5')  # keras로 수정

# 예측 기간 설정 (파라미터 사용)
forecast_horizon = FORECAST_HORIZON

# 예측을 위한 마지막 시퀀스 가져오기
# (실제 프로젝트에서는 아래 로직을 데이터에 맞게 수정해야 함)
last_sequence = scaled_df.values[-SEQUENCE_LENGTH:].reshape(1, SEQUENCE_LENGTH, scaled_df.shape[1])

# 미래 날짜 생성
last_date = df_clean.index[-1]
future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=forecast_horizon)

# 점진적 예측 (이전 예측을 다음 입력으로 사용)
predictions = []
current_sequence = last_sequence.copy()

for _ in range(forecast_horizon):
    # 다음 값 예측
    next_pred = best_model_loaded.predict(current_sequence)[0][0]
    predictions.append(next_pred)
    
    # 시퀀스 업데이트 (가장 오래된 값 제거하고 새 예측값 추가)
    new_values = current_sequence[0, 1:, :].copy()
    
    # 다변량인 경우, 타겟 열만 업데이트
    if scaled_df.shape[1] > 1:
        # 값 복사
        new_row = current_sequence[0, -1, :].copy()
        # 타겟 열만 새 예측으로 업데이트
        new_row[target_idx] = next_pred
        # 새 행 추가
        new_values = np.vstack([new_values, new_row.reshape(1, -1)])
    else:
        # 단변량인 경우, 단순히 새 예측값 추가
        new_values = np.vstack([new_values, np.array([[next_pred]])])
    
    # 업데이트된 시퀀스로 교체
    current_sequence[0] = new_values

# 예측 역변환
if scaled_df.shape[1] == 1:
    # 단변량 케이스
    predictions_array = np.array(predictions).reshape(-1, 1)
    predictions_original = scaler.inverse_transform(predictions_array).flatten()
else:
    # 다변량 케이스 (타겟 변수만 역변환)
    temp_array = np.zeros((len(predictions), scaled_df.shape[1]))
    temp_array[:, target_idx] = predictions
    predictions_original = scaler.inverse_transform(temp_array)[:, target_idx]

# 예측 결과 시각화
plt.figure(figsize=(14, 7))
# 과거 데이터 (마지막 90일)
history_days = 90
past_dates = df_clean.index[-history_days:]
past_values = df_clean[TARGET_COL][-history_days:]

plt.plot(past_dates, past_values, label='과거 데이터', color='blue')
plt.plot(future_dates, predictions_original, label='미래 예측', color='red', linestyle='--')
plt.axvline(x=last_date, color='green', linestyle='-', label='현재')
plt.title(f'{best_model} - {FORECAST_HORIZON}일 미래 예측')
plt.xlabel('날짜')
plt.ylabel(TARGET_COL)
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_미래예측.png')
plt.show()

# 예측 결과를 DataFrame으로 변환
forecast_df = pd.DataFrame({
    'date': future_dates,
    f'{TARGET_COL}_forecast': predictions_original
})

print("\n미래 예측 결과 (처음 10일):")
print(forecast_df.head(10))

# 예측 결과 저장
forecast_df.to_csv(f'{DATA_OUTPUT_DIR}/lstm_forecast_results.csv', index=False)
print(f"예측 결과가 '{DATA_OUTPUT_DIR}/lstm_forecast_results.csv'에 저장되었습니다.")

### 5.2 결론 및 인사이트
전체 분석 과정과 모델 성능 결과를 요약합니다.
- 최적 모델과 그 성능(RMSE, R²)을 명시합니다.
- 최적 모델의 구성(units, dropout, layers)을 제시합니다.
- 모델 성능에 영향을 미친 요인(모델 복잡성, 시퀀스 길이, 데이터 정상성 등)에 대한 간략한 해석을 제공합니다.
- 향후 모델 성능 개선을 위한 방향(하이퍼파라미터 튜닝, 특성 추가, 다른 모델 구조 시도 등)을 제안합니다.

In [None]:
print("\n" + "="*50)
print("결론 및 인사이트")
print("="*50)

# 모델 성능 요약
print(f"1. 최적의 모델: {best_model}")
print(f"2. 테스트 RMSE: {best_rmse:.4f}")
print(f"3. 테스트 R²: {best_r2:.4f}")

# 모델 구성 요약
print("\n최적 모델 구성:")
for key, value in best_model_config.items():
    if key != 'name':
        print(f"- {key}: {value}")

# 주요 인사이트
print("\n주요 인사이트:")
print("1. LSTM 모델의 복잡성과 성능 간의 균형")
if best_model == 'LSTM_Basic':
    print("   - 단순한 모델이 가장 우수한 성능을 보였으며, 과적합 위험이 낮았습니다.")
elif best_model == 'LSTM_Medium':
    print("   - 중간 복잡도의 모델이 가장 우수한 성능을 보였으며, 단순 모델보다 패턴 포착에 효과적이었습니다.")
else:
    print("   - 복잡한 모델이 데이터 패턴을 잘 포착하여 더 좋은 성능을 보였습니다.")

print("2. 시퀀스 길이의 영향")
print(f"   - {SEQUENCE_LENGTH}일의 시퀀스 길이로 예측에 성공했으며, 이는 데이터의 기본 패턴 주기와 관련이 있습니다.")

print("3. 정상성과 모델 성능")
if is_stationary:
    print("   - 데이터가 정상성을 보여 모델 학습이 더 안정적이었습니다.")
else:
    print("   - 데이터의 비정상성에도 불구하고 LSTM이 패턴을 학습할 수 있었습니다.")

# 향후 개선 방향
print("\n향후 개선 방향:")
print("1. 시퀀스 길이 최적화를 통한 성능 향상 가능")
print("2. 더 다양한 특성 엔지니어링 시도")
print("3. 양방향 LSTM 또는 Attention 메커니즘 적용 고려")
print("4. 앙상블 방법을 통한 여러 모델 결합")
print("5. 예측 불확실성 추정 방법 도입")

print("\nLSTM 기반 시계열 분석 완료!")

## 6. 시계열 교차 검증 (Time Series Cross-Validation)
본 분석에서는 기본적인 훈련/검증/테스트 분할 방식을 사용했습니다. 더 견고한 모델 평가를 위해 시계열 교차 검증을 추가로 수행합니다.
시계열 데이터에서는 일반적인 k-fold 교차 검증이 아닌, 시간적 의존성을 고려한 `TimeSeriesSplit`을 사용합니다.
LSTM 모델은 계산 비용이 높아 간소화된 모델로 교차 검증을 수행합니다.

In [None]:
# 시계열 교차 검증 설정
print("\n시계열 교차 검증 수행 중...")
n_splits = 3  # LSTM은 계산 비용이 높으므로 fold 수를 줄임
tscv = TimeSeriesSplit(n_splits=n_splits)

# 교차 검증을 위한 간소화된 모델 생성 함수
def build_simple_lstm_for_cv(input_shape):
    """교차 검증용 간소화된 LSTM 모델"""
    model = Sequential([
        LSTM(32, input_shape=input_shape),
        Dropout(0.2),
        Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')
    return model

# 모든 데이터 (스케일링된)를 하나로 합치기
all_data = np.vstack([train_data, val_data, test_data])

# 결과 저장
cv_rmse_scores = []
cv_mae_scores = []

# 교차 검증 시각화 준비
plt.figure(figsize=(14, 10))

# 각 폴드에 대해 학습 및 평가
for i, (train_idx, test_idx) in enumerate(tscv.split(all_data)):
    print(f"\nFold {i+1}/{n_splits} 처리 중...")
    
    # 훈련/테스트 데이터 분할
    cv_train_data = all_data[train_idx]
    cv_test_data = all_data[test_idx]
    
    # 시퀀스 생성
    cv_X_train, cv_y_train = create_sequences(cv_train_data, SEQUENCE_LENGTH, target_idx)
    cv_X_test, cv_y_test = create_sequences(cv_test_data, SEQUENCE_LENGTH, target_idx)
    
    # 간소화된 모델 구축 및 학습
    cv_model = build_simple_lstm_for_cv(cv_X_train.shape[1:])
    cv_model.fit(
        cv_X_train, cv_y_train,
        epochs=20,  # 빠른 학습을 위해 에포크 감소
        batch_size=BATCH_SIZE,
        verbose=0
    )
    
    # 예측
    cv_y_pred = cv_model.predict(cv_X_test)
    
    # 원래 스케일로 변환 (스케일러를 이미 알고 있다고 가정)
    # 실제 구현에서는 각 분할마다 스케일러를 따로 생성해야 할 수 있음
    cv_y_test_orig = scaler.inverse_transform(np.hstack([cv_X_test[:, -1, :], cv_y_test.reshape(-1, 1)]))[:, -1]
    cv_y_pred_orig = scaler.inverse_transform(np.hstack([cv_X_test[:, -1, :], cv_y_pred.reshape(-1, 1)]))[:, -1]
    
    # 성능 평가
    cv_rmse = np.sqrt(mean_squared_error(cv_y_test_orig, cv_y_pred_orig))
    cv_mae = mean_absolute_error(cv_y_test_orig, cv_y_pred_orig)
    cv_rmse_scores.append(cv_rmse)
    cv_mae_scores.append(cv_mae)
    
    # 각 폴드의 결과 출력
    print(f"Fold {i+1} - RMSE: {cv_rmse:.4f}, MAE: {cv_mae:.4f}")
    
    # 시각화 (최대 30개 포인트만)
    plt.subplot(n_splits, 1, i+1)
    vis_limit = min(30, len(cv_y_test_orig))
    plt.plot(cv_y_test_orig[:vis_limit], label='실제값')
    plt.plot(cv_y_pred_orig[:vis_limit], label='예측값', linestyle='--')
    plt.title(f'Fold {i+1} - RMSE: {cv_rmse:.4f}')
    plt.legend()
    plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig(f'{OUTPUT_DIR}/lstm_교차검증.png')
plt.show()

# 교차 검증 결과 요약
print("\n교차 검증 결과:")
print(f"평균 RMSE: {np.mean(cv_rmse_scores):.4f} (±{np.std(cv_rmse_scores):.4f})")
print(f"평균 MAE: {np.mean(cv_mae_scores):.4f} (±{np.std(cv_mae_scores):.4f})")

# 교차 검증 성능 시각화
plt.figure(figsize=(10, 6))
plt.bar(range(len(cv_rmse_scores)), cv_rmse_scores, alpha=0.7, label='RMSE')
plt.bar(range(len(cv_mae_scores)), cv_mae_scores, alpha=0.5, label='MAE')
plt.xlabel('Fold')
plt.ylabel('오차')
plt.title('교차 검증 성능')
plt.legend()
plt.grid(True, alpha=0.3)
plt.savefig(f'{OUTPUT_DIR}/lstm_교차검증_성능.png')
plt.show()