# 資料讀取與整合
* 數據路徑與對應標籤（正常=0 / 異常=1）

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score
from sklearn.model_selection import StratifiedKFold 
from settings import *
import scienceplots  
from torch.utils.tensorboard import SummaryWriter

In [None]:
# Make sure the output directory (RESULT_DIR) exists before anything is saved.
os.makedirs(RESULT_DIR, exist_ok=True)

# Module-level accumulators filled by process_file(): one fixed-length chunk
# per entry in all_sequences, with its class label at the same index in
# all_labels.
all_sequences, all_labels = [], []

def process_file(file_path, label):
    """
    Read one CSV file and split it into fixed-length chunks.

    The 'current', 'voltage' and 'power' columns are stacked, in that order,
    into an (N, 3) array. The array is cut into consecutive, non-overlapping
    windows of MAX_SEQ_LEN rows; trailing rows that do not fill a whole
    window are dropped. Every window is appended to the module-level
    ``all_sequences`` list and ``label`` is appended to ``all_labels`` once
    per window.

    :param file_path: path of the CSV file to read
    :param label: label stored for every chunk of this file (0: normal, 1: abnormal)
    """
    frame = pd.read_csv(file_path)

    # Feature matrix of shape (N, 3); column order is current, voltage, power.
    features = np.column_stack(
        [frame[column].values for column in ('current', 'voltage', 'power')]
    )

    # Only whole windows are kept: floor(N / MAX_SEQ_LEN) chunks.
    num_chunks = features.shape[0] // MAX_SEQ_LEN
    chunk_starts = range(0, num_chunks * MAX_SEQ_LEN, MAX_SEQ_LEN)

    all_sequences.extend(features[begin:begin + MAX_SEQ_LEN] for begin in chunk_starts)
    all_labels.extend([label] * len(chunk_starts))


# Load every CSV file of both classes: NORMAL_DIR -> label 0, ABNORMAL_DIR -> label 1.
# File names are sorted because os.listdir() returns entries in arbitrary
# order; a fixed sample order keeps the seeded K-fold split reproducible
# across runs and machines.
for data_dir, class_label in ((NORMAL_DIR, 0), (ABNORMAL_DIR, 1)):
    for filename in sorted(os.listdir(data_dir)):
        if filename.lower().endswith(".csv"):
            file_path = os.path.join(data_dir, filename)
            process_file(file_path, label=class_label)

# Fail early with a clear message instead of an obscure error further down
# when no file produced a full chunk.
if not all_sequences:
    raise ValueError(
        f"No chunks were loaded from {NORMAL_DIR!r} or {ABNORMAL_DIR!r}: "
        "check that the folders contain CSV files with at least MAX_SEQ_LEN rows."
    )

# Convert the accumulated lists to numpy arrays.
all_sequences = np.array(all_sequences, dtype=np.float32)  # shape: (num_samples, MAX_SEQ_LEN, 3)
all_labels = np.array(all_labels, dtype=np.int64)          # shape: (num_samples,)

print("all_sequences shape:", all_sequences.shape)
print("all_labels shape:", all_labels.shape)
print("Number of normal samples:", np.sum(all_labels == 0))
print("Number of abnormal samples:", np.sum(all_labels == 1))

# 自訂 Dataset 與 DataLoader

In [None]:
class ChargingDataset(Dataset):
    """Map-style dataset pairing each sequence chunk with its class label.

    ``sequences`` and ``labels`` are stored as given and are indexed with the
    same position; conversion to tensors happens per item in ``__getitem__``.
    """

    def __init__(self, sequences, labels):
        # In this notebook the callers pass numpy arrays of shape
        # (num_samples, MAX_SEQ_LEN, 3) and (num_samples,).
        self.sequences = sequences
        self.labels = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return ``(features, target)`` as a float32 and a long tensor."""
        sample = self.sequences[idx]
        target = self.labels[idx]
        return (
            torch.tensor(sample, dtype=torch.float32),
            torch.tensor(target, dtype=torch.long),
        )

# LSTM模型
* 輸入：每個時間點的三個特徵 (current, voltage, power)
* 輸出：二元分類 (normal / abnormal)

In [None]:
class LSTMClassifier(nn.Module):
    """Sequence classifier: a stacked LSTM followed by a linear layer.

    The class scores are computed from the LSTM output at the last time step.

    :param input_dim: number of features per time step
    :param hidden_dim: size of the LSTM hidden state
    :param num_layers: number of stacked LSTM layers
    :param num_classes: number of output scores (default 2)
    """

    def __init__(self, input_dim, hidden_dim, num_layers, num_classes=2):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x):
        """Return unnormalised class scores of shape (batch_size, num_classes).

        :param x: tensor of shape (batch_size, seq_len, input_dim), on any device
        """
        # nn.LSTM initialises the hidden and cell states to zeros when none are
        # passed, and creates them with the input's device and dtype. Building
        # them by hand with torch.zeros() pinned them to float32, which fails
        # as soon as the model and input are float64 or half precision.
        out, _ = self.lstm(x)        # (batch_size, seq_len, hidden_dim)

        last_step = out[:, -1, :]    # output at the last time step: (batch_size, hidden_dim)
        return self.fc(last_step)    # (batch_size, num_classes)


# K-fold Cross Validation

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Dataset over every loaded chunk.
dataset = ChargingDataset(all_sequences, all_labels)

# Stratified, shuffled K-fold splitter seeded with SEED.
kfold = StratifiedKFold(
    n_splits=KFOLD_SPLITS,
    shuffle=True,
    random_state=SEED,
)

In [None]:
def evaluate_model(model, loader, criterion, device):
    """
    Evaluate ``model`` on every batch of ``loader`` without gradient tracking.

    The model is switched to eval mode and is left in eval mode on return.

    :param model: module mapping a batch of inputs to per-class scores
    :param loader: iterable of (inputs, targets) batches
    :param criterion: loss function called as ``criterion(outputs, targets)``;
        its value is weighted by the batch size, so it is expected to be the
        mean loss of the batch
    :param device: device the batches are moved to before the forward pass
    :return: ``(average loss per sample, accuracy)``; ``(0.0, 0)`` when the
        loader yields no samples
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for x_batch, y_batch in loader:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)

            # Undo the per-batch averaging so batches of different sizes
            # contribute proportionally to the final average.
            running_loss += loss.item() * x_batch.size(0)

            # Predicted class = index of the highest score.
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)

    # Guard both averages: an empty loader used to raise ZeroDivisionError on
    # the loss while the accuracy was already protected.
    if total == 0:
        return 0.0, 0

    return running_loss / total, correct / total

# K-fold 訓練

In [None]:
def _collect_predictions(model, loader, device):
    """Return ``(true_labels, predicted_labels)`` for every sample in ``loader``.

    The model is put in eval mode and run without gradient tracking; the
    predicted label is the index of the highest score.
    """
    model.eval()
    trues = []
    preds = []
    with torch.no_grad():
        for x_batch, y_batch in loader:
            outputs = model(x_batch.to(device))
            _, predicted = torch.max(outputs, 1)
            preds.extend(predicted.cpu().numpy())
            trues.extend(y_batch.cpu().numpy())
    return trues, preds


# One DataFrame of per-epoch metrics per fold.
all_folds_metrics = []

for fold_idx, (train_indices, test_indices) in enumerate(kfold.split(all_sequences, all_labels)):
    print(f"\n=== Fold {fold_idx+1} / {kfold.n_splits} ===")

    # TensorBoard writer; each fold logs into its own sub-directory.
    writer = SummaryWriter(log_dir=os.path.join(RESULT_DIR, f"fold_{fold_idx}"))

    # -- Train / test data of the current fold --
    train_sequences = all_sequences[train_indices]
    train_labels    = all_labels[train_indices]
    test_sequences  = all_sequences[test_indices]
    test_labels     = all_labels[test_indices]

    # Dataset
    train_dataset = ChargingDataset(train_sequences, train_labels)
    test_dataset  = ChargingDataset(test_sequences,  test_labels)

    # DataLoader
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_loader  = DataLoader(test_dataset,  batch_size=BATCH_SIZE, shuffle=False)

    # Every fold trains a freshly initialised model.
    model = LSTMClassifier(INPUT_DIM, HIDDEN_DIM, NUM_LAYERS, NUM_CLASSES).to(device)

    # Class weights tried previously (currently unused):
    # class_weights = torch.tensor([1.0, 1.2], device=device)  # weight the abnormal class higher
    # class_weights = torch.tensor([1.17, 0.83], device=device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Per-epoch history of this fold.
    train_loss_list = []
    train_acc_list = []
    test_loss_list = []
    test_acc_list = []

    test_precision_list = []
    test_recall_list = []
    test_f1_list = []

    # === Training (NUM_EPOCHS epochs) ===
    for epoch in range(NUM_EPOCHS):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for x_batch, y_batch in train_loader:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            optimizer.zero_grad()
            outputs = model(x_batch)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()

            # The criterion returns the batch mean; weight it by batch size
            # so the epoch average is per sample.
            running_loss += loss.item() * x_batch.size(0)

            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y_batch).sum().item()
            total += y_batch.size(0)

        # Train loss / accuracy of this epoch (both guarded against total == 0).
        epoch_train_loss = running_loss / total if total > 0 else 0
        epoch_train_acc = correct / total if total > 0 else 0

        # Evaluate on the fold's test split after every epoch.
        epoch_test_loss, epoch_test_acc = evaluate_model(model, test_loader, criterion, device)

        # Precision, recall and F1-score on the test split for this epoch.
        trues, preds = _collect_predictions(model, test_loader, device)
        test_precision = precision_score(trues, preds, zero_division=0)
        test_recall = recall_score(trues, preds, zero_division=0)
        test_f1 = f1_score(trues, preds, zero_division=0)

        test_precision_list.append(test_precision)
        test_recall_list.append(test_recall)
        test_f1_list.append(test_f1)

        train_loss_list.append(epoch_train_loss)
        train_acc_list.append(epoch_train_acc)
        test_loss_list.append(epoch_test_loss)
        test_acc_list.append(epoch_test_acc)

        print(f"Epoch {epoch+1}/{NUM_EPOCHS} | "
              f"TrainLoss: {epoch_train_loss:.4f}, TrainAcc: {epoch_train_acc:.4f} | "
              f"TestLoss: {epoch_test_loss:.4f}, TestAcc: {epoch_test_acc:.4f} | "
              f"Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, F1: {test_f1:.4f}")

        # Log the metrics to TensorBoard.
        writer.add_scalar('Loss/Train', epoch_train_loss, epoch)
        writer.add_scalar('Loss/Test', epoch_test_loss, epoch)
        writer.add_scalar('Accuracy/Train', epoch_train_acc, epoch)
        writer.add_scalar('Accuracy/Test', epoch_test_acc, epoch)
        writer.add_scalar('Precision/Test', test_precision, epoch)
        writer.add_scalar('Recall/Test', test_recall, epoch)
        writer.add_scalar('F1_Score/Test', test_f1, epoch)

    # Log the model graph using a single random input sequence.
    dummy_input = torch.randn(1, MAX_SEQ_LEN, INPUT_DIM).to(device)
    writer.add_graph(model, dummy_input)

    writer.close()

    epochs_range = range(1, NUM_EPOCHS+1)
    # Per-epoch metrics of this fold, tagged with the fold index.
    fold_metrics_df = pd.DataFrame({
        'Epoch': epochs_range,
        'Train Loss': train_loss_list,
        'Test Loss': test_loss_list,
        'Train Accuracy': train_acc_list,
        'Test Accuracy': test_acc_list,
        'Test Precision': test_precision_list,
        'Test Recall': test_recall_list,
        'Test F1-score': test_f1_list
    })
    fold_metrics_df['Fold'] = fold_idx
    all_folds_metrics.append(fold_metrics_df)

    # === Confusion matrix of the trained model on the test split ===
    trues, preds = _collect_predictions(model, test_loader, device)

    cm = confusion_matrix(trues, preds, labels=[0,1])
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[0,1])

    # Draw on an explicitly created axes. Without `ax`, disp.plot() opens its
    # own figure, so a separate plt.figure(figsize=...) call would stay empty,
    # would never be closed, and its size would not apply to the matrix.
    fig, ax = plt.subplots(figsize=(4, 4))
    disp.plot(ax=ax, values_format='d', cmap='Blues')
    ax.set_title(f"Fold {fold_idx} - Confusion Matrix")

    # Save to RESULT_DIR as both PDF and SVG.
    fold_cm_path = os.path.join(RESULT_DIR, f"fold_{fold_idx}_cm.pdf")
    fig.savefig(fold_cm_path, bbox_inches='tight')
    fold_cm_path = os.path.join(RESULT_DIR, f"fold_{fold_idx}_cm.svg")
    fig.savefig(fold_cm_path, bbox_inches='tight')
    plt.show()
    plt.close(fig)  # release the figure so figures do not pile up across folds

    # Save the model weights (state dict) of this fold.
    model_save_path = os.path.join(RESULT_DIR, f"fold_{fold_idx}_model.pth")
    torch.save(model.state_dict(), model_save_path)
    print(f"Model for fold {fold_idx} saved to {model_save_path}")

# After all folds: merge the per-fold DataFrames into one.
all_metrics_df = pd.concat(all_folds_metrics, ignore_index=True)

# Write the merged metrics to a CSV file.
all_metrics_csv_path = os.path.join(RESULT_DIR, "all_folds_metrics.csv")
all_metrics_df.to_csv(all_metrics_csv_path, index=False)
print(f"All folds metrics saved to {all_metrics_csv_path}")

# 圖表繪製

In [None]:
folds = all_metrics_df['Fold'].unique()

# One entry per panel, left to right:
# (y-axis label, panel title, [(DataFrame column, legend label), ...])
metric_panels = [
    ('Loss', 'Train/Test Loss',
     [('Train Loss', 'Train Loss'), ('Test Loss', 'Test Loss')]),
    ('Accuracy', 'Train/Test Accuracy',
     [('Train Accuracy', 'Train Accuracy'), ('Test Accuracy', 'Test Accuracy')]),
    ('Precision', 'Precision', [('Test Precision', 'Precision')]),
    ('Recall', 'Recall', [('Test Recall', 'Recall')]),
    ('F1-Score', 'F1-Score', [('Test F1-score', 'F1-score')]),
]

for fold in folds:
    # Rows of the merged metrics table that belong to this fold.
    fold_data = all_metrics_df[all_metrics_df['Fold'] == fold]

    # Optional styling, currently disabled: with plt.style.context(["science"]):
    fig, axes = plt.subplots(1, 5, figsize=(25, 5))
    fig.suptitle(f'Fold {fold} Metrics', fontsize=16)

    # Draw every panel: its curves over the epochs, then labels, title, legend.
    for panel_ax, (y_label, panel_title, curves) in zip(axes, metric_panels):
        for column, legend_label in curves:
            panel_ax.plot(fold_data['Epoch'], fold_data[column], label=legend_label)
        panel_ax.set_xlabel('Epoch')
        panel_ax.set_ylabel(y_label)
        panel_ax.set_title(panel_title)
        panel_ax.legend()

    plt.tight_layout()

    # Save the figure to RESULT_DIR as PDF first, then as SVG.
    for fold_plot_path in (
        os.path.join(RESULT_DIR, f"fold_{fold}_all_metrics.pdf"),
        os.path.join(RESULT_DIR, f"fold_{fold}_all_metrics.svg"),
    ):
        plt.savefig(fold_plot_path, bbox_inches='tight')

    plt.show()
    plt.close(fig)