In [1]:
# ==========================================
# PyTorch CNN-LSTM Rolling Forecast + 이상치 + SHAP
# ==========================================
import os, sys, random, numpy as np, pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, r2_score
import shap
import matplotlib.pyplot as plt
from matplotlib import rc

# -------------------------
# [1] 환경 설정
# -------------------------
sys.path.append(r"C:\ESG_Project1\util")
from logger import setup_logger
logger = setup_logger(__name__)

rc('font', family='Malgun Gothic')
plt.rcParams['axes.unicode_minus'] = False
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"💻 Device: {device}")

# -------------------------
# [2] 데이터 로드
# -------------------------
path = "남동발전_발전데이터.csv"
df = pd.read_csv(path)
df['일자'] = pd.to_datetime(df['일자'], format='%Y%m%d')

feature_cols = ['기온','일사량','일조량']
hour_cols = [f'{i}시' for i in range(1,25)]
df = df.dropna(subset=feature_cols + hour_cols)

# -------------------------
# [3] 시퀀스 생성
# -------------------------
input_steps = 168
output_steps = 24

X_seq, y_seq = [], []
for i in range(len(df) - input_steps - output_steps + 1):
    X_window = []
    for j in range(i, i+input_steps):
        hour_features = np.tile(df.loc[j,feature_cols].values, (24,1))
        hours = np.arange(1,25).reshape(-1,1)
        seq = np.concatenate([hours,hour_features],axis=1)
        X_window.append(seq)
    X_seq.append(np.array(X_window).reshape(-1,4))
    y_seq.append(df.loc[i+input_steps:i+input_steps+output_steps-1,hour_cols].values.flatten())

X_seq = np.array(X_seq)
y_seq = np.array(y_seq).reshape(len(y_seq), output_steps, 24, 1)

# -------------------------
# [4] 스케일링
# -------------------------
scaler_X = MinMaxScaler()
X_scaled = scaler_X.fit_transform(X_seq.reshape(-1,X_seq.shape[-1])).reshape(X_seq.shape)

scaler_y = MinMaxScaler()
y_scaled = scaler_y.fit_transform(y_seq.reshape(-1,1)).reshape(y_seq.shape)

# -------------------------
# [5] 학습/테스트 분리
# -------------------------
train_size = int(len(X_scaled)*0.8)
X_train, X_test = X_scaled[:train_size], X_scaled[train_size:]
y_train, y_test = y_scaled[:train_size], y_scaled[train_size:]

X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
X_test  = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test  = torch.tensor(y_test, dtype=torch.float32).to(device)

train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)

# -------------------------
# [6] CNN-LSTM 모델 정의
# -------------------------
class CNNLSTM(nn.Module):
    def __init__(self, input_size=4, conv_channels=(32,16), lstm_hidden=64, output_steps=24, dropout=0.2):
        super().__init__()
        self.conv1 = nn.Conv1d(input_size, conv_channels[0],3,padding=1)
        self.bn1   = nn.BatchNorm1d(conv_channels[0])
        self.conv2 = nn.Conv1d(conv_channels[0], conv_channels[1],3,padding=1)
        self.bn2   = nn.BatchNorm1d(conv_channels[1])
        self.relu  = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.encoder_lstm = nn.LSTM(conv_channels[1], lstm_hidden, batch_first=True)
        self.decoder_lstm = nn.LSTM(1, lstm_hidden, batch_first=True)
        self.fc = nn.Linear(lstm_hidden,1)
        self.output_steps = output_steps

    def forward(self, x, y=None, teacher_forcing_ratio=0.0):
        # x: (batch, seq_len, features)
        x = self.relu(self.bn1(self.conv1(x.transpose(1,2))))
        x = self.relu(self.bn2(self.conv2(x))).transpose(1,2)
        _, (hidden, cell) = self.encoder_lstm(x)
        decoder_input = x[:,-1,0].unsqueeze(-1)
        outputs = []
        for t in range(self.output_steps):
            decoder_output, (hidden, cell) = self.decoder_lstm(decoder_input.unsqueeze(1),(hidden,cell))
            out = self.fc(decoder_output).squeeze(1)
            outputs.append(out)
            decoder_input = y[:,t] if (y is not None and torch.rand(1).item() < teacher_forcing_ratio) else out
        return torch.stack(outputs,dim=1)

# -------------------------
# [7] 모델 학습
# -------------------------
model = CNNLSTM(output_steps=output_steps).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scaler_amp = torch.cuda.amp.GradScaler()

epochs = 50
for epoch in range(1, epochs+1):
    model.train()
    train_loss = 0
    teacher_ratio = max(0.3,0.7-0.4*(epoch-1)/epochs)
    for xb,yb in train_loader:
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():
            y_pred = model(xb,yb,teacher_forcing_ratio=teacher_ratio)
            loss = criterion(y_pred,yb)
        scaler_amp.scale(loss).backward()
        scaler_amp.step(optimizer)
        scaler_amp.update()
        train_loss += loss.item()*xb.size(0)
    avg_train = train_loss/len(train_loader.dataset)
    logger.info(f"Epoch {epoch}/{epochs} | Train Loss: {avg_train:.6f} | TF:{teacher_ratio:.2f}")

# -------------------------
# [8] Rolling Forecast 예측
# -------------------------
model.eval()
predicted_scaled = []
with torch.no_grad():
    for i in range(len(X_test)):
        X_input = X_test[i:i+1]
        y_pred = model(X_input).cpu().numpy().flatten()
        predicted_scaled.append(y_pred)

predicted_scaled = np.array(predicted_scaled).reshape(len(X_test), output_steps, 24,1)
predicted_generation = scaler_y.inverse_transform(predicted_scaled.reshape(-1,1)).reshape(predicted_scaled.shape)
y_true = y_test.cpu().numpy()

# -------------------------
# [9] 평가
# -------------------------
rmse = np.sqrt(mean_squared_error(y_true.flatten(), predicted_generation.flatten()))
r2 = r2_score(y_true.flatten(), predicted_generation.flatten())
logger.info(f"✅ Rolling forecast 평가 결과: RMSE={rmse:.2f}, R²={r2:.4f}")

# -------------------------
# [10] 이상치 탐지
# -------------------------
errors = np.abs(y_true - predicted_generation)
threshold = np.quantile(errors, 0.95)
outliers = np.where(errors > threshold)
logger.info(f"🚨 이상치 탐지 완료: {len(outliers[0])}건 (상위 5%)")

# -------------------------
# [11] 시각화 (샘플 1일)
# -------------------------
plt.figure(figsize=(12,5))
plt.plot(y_true[0].flatten(), label='실제 발전량')
plt.plot(predicted_generation[0].flatten(), label='예측 발전량', linestyle='--')
plt.title("PyTorch CNN-LSTM Rolling Forecast (샘플 1일)")
plt.xlabel("시간")
plt.ylabel("발전량 (MWh)")
plt.legend()
plt.grid(True)
plt.show()

# -------------------------
# [12] SHAP 영향 분석
# -------------------------
# PyTorch 모델용 KernelExplainer
X_sample = X_test[:20].cpu().numpy()
explainer = shap.KernelExplainer(lambda x: model(torch.tensor(x,dtype=torch.float32).to(device)).cpu().detach().numpy().reshape(len(x),-1), X_sample)
shap_values = explainer.shap_values(X_sample)
shap.summary_plot(shap_values, X_sample, feature_names=['시간','기온','일사량','일조량'])


  super().__init__(


Epoch 1/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 7ms/step - loss: 0.0015 - val_loss: 7.5286e-05
Epoch 2/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m67s[0m 7ms/step - loss: 9.8607e-04 - val_loss: 6.9127e-05
Epoch 3/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m66s[0m 7ms/step - loss: 9.5615e-04 - val_loss: 6.0498e-05
Epoch 4/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m69s[0m 7ms/step - loss: 8.9721e-04 - val_loss: 6.9764e-05
Epoch 5/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m72s[0m 8ms/step - loss: 8.4891e-04 - val_loss: 6.3925e-05
Epoch 6/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 8ms/step - loss: 8.3645e-04 - val_loss: 1.3248e-04
Epoch 7/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m77s[0m 8ms/step - loss: 8.2302e-04 - val_loss: 6.1607e-05
Epoch 8/150
[1m9408/9408[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m77s[0m

KeyboardInterrupt: 