In [None]:
from sklearn.preprocessing import OneHotEncoder,RobustScaler
from sklearn.compose import ColumnTransformer 
from sklearn.metrics import accuracy_score
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import f1_score, precision_score, recall_score ,classification_report,confusion_matrix

from collections import defaultdict
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F

from modelzoo import *

1. 전처리된 df → 시계열 형태로 정렬
2. Sliding Window 등으로 (samples, timesteps, features) 구성
3. MinMaxScaler로 시퀀스 스케일링
4. LSTM 모델 설계 (Keras 기반 추천)
5. 학습 및 예측
6. 분류/회귀 성능 비교
7. 베이스라인보다 나은지 분석


In [None]:
# Preprocessing step 2: order the rows chronologically within each charging station.
# NOTE(review): rows are sorted by 'connection_start_time_ts' here, while create_data()
# re-sorts every station group by 'charging_start_time_ts' — confirm which timestamp is
# the intended ordering key.
df = pd.read_csv('../data/csv/50area_dummy_processed.csv')
df_sort = df.sort_values(by=['station_location','connection_start_time_ts']).reset_index(drop=True)
df_sort

In [None]:
# Column names of the classification target and the regression target.
target_class = 'post_charge_departure_range'
target_reg = 'kwh_per_usage_time'
# Split the columns by preprocessing type.
# Columns listed in onehot_col are one-hot encoded; note that the list contains target_class itself.
onehot_col = ['station_location','evse_name','evse_type','supports_discharge','scheduled_charge','weekday','cluster','post_charge_departure_range','usage_departure_range'] 
# Every remaining column of df is robust-scaled.
# NOTE(review): this assumes all remaining columns are numeric, and it includes target_reg
# if that column is present in df — confirm both are intended.
scale_col = [col for col in df.columns.to_list() if col not in onehot_col]

# Not fitted here; create_data() calls ct.fit() on the frame it receives.
ct = ColumnTransformer(
    [
        ('scaling',RobustScaler(),scale_col),
        ('onehot',OneHotEncoder(sparse_output=False,handle_unknown='ignore'),onehot_col)
    ]
)


In [None]:
# Sequences are built separately for each station.
seq_len = 24 # number of consecutive rows in one input window; NOTE(review): the original comment said "48 hours" — confirm the row interval
max_n = 1*24 # number of future rows kept as targets per window; NOTE(review): the original comment gave "1 week" as an example, but the value is 24

# Sliding-window helper
def create_sequences(X_df, y_class_series, y_reg_series, feature_cols, seq_len, max_n):
    """Cut one ordered series into (input window, future targets) samples.

    Window ``i`` uses rows ``i .. i + seq_len - 1`` of ``X_df[feature_cols]`` as the
    input and the ``max_n`` values that immediately follow it in each target series
    as the labels. Rows are addressed by position, so the index labels of the inputs
    do not need to match; the three inputs are expected to have the same length.

    Args:
        X_df: DataFrame holding the feature columns, already in time order.
        y_class_series: Series with the classification target, aligned by position.
        y_reg_series: Series with the regression target, aligned by position.
        feature_cols: Column labels of ``X_df`` to use as features.
        seq_len: Number of rows in each input window.
        max_n: Number of future rows returned as targets for each window.

    Returns:
        Tuple ``(X_seq, y_class, y_reg)`` of NumPy arrays shaped
        ``(n_windows, seq_len, n_features)``, ``(n_windows, max_n)`` and
        ``(n_windows, max_n)``, where
        ``n_windows = max(len(X_df) - seq_len - max_n + 1, 0)``.
        When the input is too short for a single window, the arrays are empty but keep
        their trailing dimensions, so they can still be concatenated with other results.
    """
    # Convert once up front instead of re-selecting the columns on every iteration.
    features = X_df[feature_cols].to_numpy()
    cls_values = y_class_series.to_numpy()
    reg_values = y_reg_series.to_numpy()

    n_windows = max(len(features) - seq_len - max_n + 1, 0)
    if n_windows == 0:
        return (
            np.empty((0, seq_len) + features.shape[1:], dtype=features.dtype),
            np.empty((0, max_n), dtype=cls_values.dtype),
            np.empty((0, max_n), dtype=reg_values.dtype),
        )

    X_seq = np.stack([features[i:i + seq_len] for i in range(n_windows)])
    y_class = np.stack([cls_values[i + seq_len:i + seq_len + max_n] for i in range(n_windows)])
    y_reg = np.stack([reg_values[i + seq_len:i + seq_len + max_n] for i in range(n_windows)])

    return X_seq, y_class, y_reg

# Sort each station by time, build its windows, then merge all stations
def create_data(ct,df_sort,max_n,target_class,target_reg,seq_len):
    """Build sliding-window samples for every station and stack them together.

    The transformer is fitted on the whole of ``df_sort``. Each ``station_location``
    group is then sorted by ``charging_start_time_ts``, transformed, and windowed with
    ``create_sequences``. Groups with fewer than ``seq_len + max_n`` rows are skipped.

    NOTE(review): fitting ``ct`` on the full frame means the scaling statistics are
    computed from every row; if a train/test split is added later, fit on the
    training part only.

    Args:
        ct: Transformer exposing ``fit``, ``transform`` and ``get_feature_names_out``.
        df_sort: Source DataFrame containing ``station_location``,
            ``charging_start_time_ts`` and both target columns.
        max_n: Number of future rows kept as targets per window.
        target_class: Name of the classification target column.
        target_reg: Name of the regression target column.
        seq_len: Number of rows in each input window.

    Returns:
        Tuple ``(x_all, y_class_all, y_reg_all)``: the per-station results of
        ``create_sequences`` concatenated along the first axis.

    Raises:
        ValueError: If no station has at least ``seq_len + max_n`` rows, so there is
            nothing to concatenate.
    """
    ct.fit(df_sort)
    # The output column names are fixed once the transformer is fitted; query them once.
    feature_names = ct.get_feature_names_out()

    all_X, all_y_class, all_y_reg = [], [], []
    for _, group in df_sort.groupby('station_location'):
        if len(group) < seq_len + max_n:
            continue
        group_sorted = group.sort_values('charging_start_time_ts')

        # Apply the fitted transformer; the result gets a fresh positional index.
        df_trans = pd.DataFrame(ct.transform(group_sorted), columns=feature_names)

        # Targets are taken from the untransformed rows, in the same row order.
        X_seq, y_class, y_reg = create_sequences(
            df_trans,
            group_sorted[target_class],
            group_sorted[target_reg],
            feature_cols=feature_names,
            seq_len=seq_len,
            max_n=max_n
        )
        all_X.append(X_seq)
        all_y_class.append(y_class)
        all_y_reg.append(y_reg)

    if not all_X:
        raise ValueError(
            f"no 'station_location' group has at least seq_len + max_n = "
            f"{seq_len + max_n} rows; cannot build any sequence"
        )

    x_all = np.concatenate(all_X, axis=0)
    y_class_all = np.concatenate(all_y_class, axis=0)
    y_reg_all = np.concatenate(all_y_reg, axis=0)

    return x_all, y_class_all, y_reg_all

# Build the windowed arrays for all stations.
# NOTE(review): x_all / y_class_all / y_reg_all are not referenced again in the cells shown
# here; the training setup reads X, n_array, y_regs, y_clss from create_seq() instead — confirm
# which pipeline is the intended one.
x_all, y_class_all, y_reg_all = create_data(ct,df_sort,max_n,target_class,target_reg,seq_len)

In [None]:
#3단계: LSTM 멀티태스크 모델 구성 (회귀 + 분류)
#모델1

class FocalLoss(nn.Module):
    """Multi-class focal loss implemented on top of ``F.nll_loss``.

    Every class log-probability is multiplied by ``(1 - p) ** gamma`` (``p`` being the
    softmax probability of that class) before the negative log-likelihood is taken, so
    confidently predicted samples contribute less to the loss.

    Args:
        gamma: Exponent of the ``(1 - p)`` modulating factor.
        weight: Optional per-class weight tensor forwarded to ``F.nll_loss``
            (used against class imbalance).
    """

    def __init__(self, gamma=2.0, weight=None):
        super().__init__()
        self.weight = weight
        self.gamma = gamma

    def forward(self, input, target):
        """Return the focal loss for ``input`` logits (classes on dim 1) and ``target`` ids."""
        log_probs = F.log_softmax(input, dim=1)
        modulator = (1 - log_probs.exp()).pow(self.gamma)
        return F.nll_loss(modulator * log_probs, target, weight=self.weight)

class MultiTargetDataset(Dataset):
    """Dataset yielding ``(x, n, y_reg, y_cls)`` tuples for multi-task training.

    The four inputs are copied into tensors on construction: ``x``, ``n`` and
    ``y_regs`` as ``float32``, ``y_clss`` as ``long``. They are indexed together,
    so they are expected to share the same first dimension.
    """

    def __init__(self,x,n,y_regs,y_clss):
        self.x = torch.tensor(x, dtype=torch.float32)
        self.n = torch.tensor(n, dtype=torch.float32)
        self.y_reg = torch.tensor(y_regs, dtype=torch.float32)
        self.y_cls = torch.tensor(y_clss, dtype=torch.long)

    def __len__(self):
        """Number of samples, taken from the first dimension of ``x``."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the ``idx``-th entry of every stored tensor as a 4-tuple."""
        fields = (self.x, self.n, self.y_reg, self.y_cls)
        return tuple(field[idx] for field in fields)

# Data
# NOTE(review): create_seq and data_scaled are not defined in the cells shown here —
# presumably they come from `from modelzoo import *` or from a cell that was removed; confirm,
# otherwise this cell fails on a fresh kernel.
X,n_array,y_regs,y_clss = create_seq(df,data_scaled,seq_len,max_n)

# Hyperparameters
input_dim = X.shape[2]  # size of the last axis of X
hidden_dim = 128
num_layers = 2
num_classes = len(np.unique(y_clss))  # number of distinct label values
batch_size = 64
epochs = 10
lr = 0.01
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# Model, dataset and shuffling loader. The model is created before the loader is iterated,
# so reordering these statements would change how the random stream is consumed.
model = LSTMwithMultiOutput(input_dim,hidden_dim,num_layers,num_classes).to(device)
# model = BiLSTMwithMultiOutput(input_dim,hidden_dim,num_layers,num_classes).to(device)
dataset = MultiTargetDataset(X,n_array,y_regs,y_clss)
loader = DataLoader(dataset,batch_size=batch_size,shuffle=True)


# 'balanced' class weights, one per distinct label in sorted order.
# NOTE(review): the loss indexes this tensor by label value, so this assumes the labels are
# exactly 0..num_classes-1 — TODO confirm.
unique_classes = np.unique(y_clss)
class_weights = compute_class_weight('balanced', classes=unique_classes, y=y_clss)
weights_tensor = torch.tensor(class_weights, dtype=torch.float32).to(device)


# Regression head uses MSE; classification head uses the class-weighted focal loss.
loss_fn_reg = nn.MSELoss()
# loss_fn_cls = nn.CrossEntropyLoss(weight=weights_tensor)
loss_fn_cls = FocalLoss(gamma=2.0,weight=weights_tensor)

# Adam optimizer; StepLR multiplies the learning rate by 0.5 after every 3 scheduler steps.
optimizer = optim.Adam(model.parameters(),lr=lr)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)




In [None]:
from collections import Counter
# Print how often each value occurs in y_clss (a quick class-balance check).
print(Counter(y_clss))


In [None]:
# Weights of the two task losses in the combined objective. They never change between
# batches, so they are set once here rather than on every iteration of the batch loop.
alpha = 1.0  # regression loss weight
beta = 2.0   # classification loss weight

for epoch in range(epochs):
    model.train()
    total_loss, total_reg_loss, total_cls_loss = 0, 0, 0
    # Reset every epoch: after the loop these hold the last epoch's predictions and labels
    # on the training loader (collected while the model is in train mode).
    all_preds, all_labels = [], []

    for xb, nb, yb_reg, yb_cls in loader:
        xb, nb = xb.to(device), nb.to(device)
        yb_reg, yb_cls = yb_reg.to(device), yb_cls.to(device)

        pred_reg, pred_cls = model(xb, nb)  # regression output, classification logits

        # Combined multi-task loss.
        loss_reg = loss_fn_reg(pred_reg, yb_reg)
        loss_cls = loss_fn_cls(pred_cls, yb_cls)
        loss = alpha * loss_reg + beta * loss_cls

        optimizer.zero_grad()
        loss.backward()
        # Cap the global gradient norm before the parameter update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=2.0)
        optimizer.step()

        total_loss += loss.item()
        total_reg_loss += loss_reg.item()
        total_cls_loss += loss_cls.item()

        # Collect predicted class ids and true labels for the epoch-level metrics.
        all_preds.extend(pred_cls.argmax(dim=1).cpu().numpy())
        all_labels.extend(yb_cls.cpu().numpy())

    n_batches = len(loader)

    # Step the scheduler once per epoch; curr_lr is therefore the rate that the NEXT epoch uses.
    scheduler.step()
    curr_lr = optimizer.param_groups[0]['lr']

    avg_loss = total_loss / n_batches
    avg_reg  = total_reg_loss  / n_batches
    avg_cls  = total_cls_loss  / n_batches
    acc = accuracy_score(all_labels, all_preds)
    # Macro-averaged multi-class metrics. zero_division=0 is used for all three so that a class
    # with no predictions scores 0 (zero_division=1 would count it as perfect precision and
    # inflate the macro average) and the three metrics follow the same convention.
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    precision = precision_score(all_labels, all_preds, average='macro', zero_division=0)
    recall = recall_score(all_labels, all_preds, average='macro', zero_division=0)
    print(f"[{epoch+1}] LR = {curr_lr:.5f} Loss: {avg_loss:.4f} | Reg: {avg_reg:.4f} | Cls: {avg_cls:.4f} | Acc: {acc:.4f} | F1: {f1:.4f} | precision: {precision:.4f} | recall: {recall:.4f}")

In [None]:
# Predict for one sample, then decode: logits -> integer class id -> original string label.
# NOTE(review): target_cls_le and scaler_y are not defined in the cells shown here —
# presumably a fitted label encoder and the target scaler; confirm where they come from.
model.eval()  # switch the model to evaluation mode before inference
with torch.no_grad():
    x_sample = X[0]                # expected shape: (seq_len, input_dim)
    n_sample = n_array[0]
    # unsqueeze(0) adds a leading batch axis of size 1
    x_tensor = torch.tensor(x_sample, dtype=torch.float32).unsqueeze(0).to(device)
    n_tensor = torch.tensor(n_sample, dtype=torch.float32).unsqueeze(0).to(device)

    pred_energy, pred_state = model(x_tensor, n_tensor)
    pred_state_label = target_cls_le.inverse_transform([int(pred_state.argmax(1).item())])[0]
    real_state_label = target_cls_le.inverse_transform([int(y_clss[0])])[0]
    # Reshape to a single column before undoing the target scaling.
    pred_energy_cpu = pred_energy.detach().cpu().numpy().reshape(-1, 1)
    pred_energy_inv = scaler_y.inverse_transform(pred_energy_cpu)
print(f"예측 장비 상태: {pred_state_label}, 실제 상태: {real_state_label}")
print(f"예측 에너지 사용량 (역정규화값): {pred_energy_inv.item():.4f}")

In [None]:
# Save / load the trained model
# torch.save(model,'../model/bilstm_0.48.pt')
# NOTE(security): weights_only=False unpickles the complete module object, which can execute
# arbitrary code — only load checkpoint files from a trusted source.
# map_location=device remaps the stored tensors onto the active device, so a checkpoint saved
# on a GPU also loads on a CPU-only machine and ends up on the same device as the inputs.
bilstms = torch.load('../model/bilstm_0.48.pt', map_location=device, weights_only=False)

In [None]:
# Predict n hours ahead for one input window.
# The sample index is hard-coded, so X must contain more than 10000 windows.
sample_seq = X[10000]
# unsqueeze(0) adds a leading batch axis of size 1
sample_seq_tensor = torch.tensor(sample_seq,dtype=torch.float32).unsqueeze(0).to(device)

def predict_n_hours_ahead(n_hour, model, seq_tensor=None):
    """Predict the regression value and the class label for a given horizon.

    Args:
        n_hour: Forecast horizon; it is divided by the notebook-level ``max_n`` before
            being passed to the model.
        model: Model called as ``model(sequence, n)`` and returning
            ``(regression_output, class_logits)``.
        seq_tensor: Optional batched input sequence already placed on ``device``.
            Defaults to the notebook-level ``sample_seq_tensor``, which keeps existing
            two-argument calls working unchanged.

    Returns:
        Tuple ``(pred_energy, pred_cls_label)``: the regression output after
        ``scaler_y.inverse_transform`` and the label decoded by
        ``target_cls_le.inverse_transform``.
    """
    if seq_tensor is None:
        seq_tensor = sample_seq_tensor

    n_norm = n_hour / max_n
    n_tensor = torch.tensor([[n_norm]], dtype=torch.float32).to(device)  # shape (1, 1)

    model.eval()
    with torch.no_grad():
        pred_reg, pred_cls = model(seq_tensor, n_tensor)
        # Reshape to a single column before undoing the target scaling.
        pred_energy = scaler_y.inverse_transform(pred_reg.cpu().numpy().reshape(-1, 1))[0][0]
        pred_cls_label = target_cls_le.inverse_transform([torch.argmax(pred_cls, dim=1).item()])[0]
    return pred_energy, pred_cls_label

n = 3 # horizon passed to predict_n_hours_ahead; NOTE(review): the original comment said "predict 12 hours ahead", which does not match the value 3
# 에너지 = predicted energy, 상태 = predicted state label
에너지, 상태 = predict_n_hours_ahead(n,bilstms)
print(f"{n}시간 뒤 에너지 사용량: {에너지:.2f}, 장비 상태: {상태}")

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt

# Confusion matrix of the labels and predictions left over from the training loop.
# all_labels / all_preds are reset at the start of every epoch, so this shows the last epoch
# only, measured on the training loader rather than on held-out data.
cm = confusion_matrix(all_labels, all_preds)
# NOTE(review): target_cls_le is not defined in the cells shown here — confirm its origin.
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=target_cls_le.classes_)
disp.plot(cmap='Blues')
plt.title("Confusion Matrix")
plt.show()


In [None]:
# Per-class F1 (average=None yields one score per class) for the labels and predictions
# collected in the last training epoch.
print("Class별 F1:", f1_score(all_labels, all_preds, average=None))

In [None]:
import matplotlib.pyplot as plt
# Histogram of the class labels. The bin edges sit at -0.5, 0.5, ..., num_classes - 0.5, i.e.
# one unit-wide bin centred on each integer 0..num_classes-1.
# NOTE(review): this assumes the labels are exactly those integers — confirm.
plt.hist(y_clss, bins=np.arange(num_classes+1)-0.5, rwidth=0.8)
plt.title("Class Distribution")
plt.xlabel("Class")
plt.ylabel("Count")
plt.xticks(range(num_classes))
plt.show()
