In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.optim import Adam
from sklearn.utils.class_weight import compute_class_weight
import matplotlib.pyplot as plt
import numpy as np
from pathlib import Path
from Transformer_Archs.Bi_LSTM import BiLSTM
import math
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.optim as optim
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report, roc_auc_score, average_precision_score



In [None]:
# Attack-data percentage; it selects which set of preprocessed files is loaded.
# NOTE(review): the previous comment listed only 5% or 10%, but '15' is the
# value in use — confirm that the *_15.pt files are the intended ones.
option = '15'

# Directory holding the serialized datasets and DataLoader configurations.
DATA_DIR = './Preprocessed_data'


def load_split(split):
    """
    Load one preprocessed split and wrap it in a DataLoader.

    Args:
        split (str): File-name prefix of the split ('train', 'val' or 'test').

    Returns:
        tuple: (dataset, config, loader), where `config` is the mapping that is
        passed to DataLoader as keyword arguments.
    """
    # SECURITY: weights_only=False unpickles arbitrary Python objects, which can
    # execute code. Only load files produced by this project's own preprocessing.
    dataset = torch.load(f'{DATA_DIR}/{split}_dataset_{option}.pt', weights_only=False)
    config = torch.load(f'{DATA_DIR}/{split}_config_{option}.pt', weights_only=False)
    return dataset, config, DataLoader(dataset, **config)


# Get data loaders
train_dataset, train_config, train_loader = load_split('train')
val_dataset, val_config, val_loader = load_split('val')
test_dataset, test_config, test_loader = load_split('test')

# Set features and target size
# NOTE(review): these presumably have to match the preprocessed tensors — verify
# against the preprocessing step if the data changes.
num_features = 86   # feature count per time step
out_features = 11   # number of classes
seq_len = 12        # time steps per sample

In [None]:

# Model dimensions: input/output widths follow the data constants defined above.
input_size = num_features
output_size = out_features
hidden_size = 64
num_layers = 2

# Run on the GPU when one is available, otherwise fall back to the CPU.
device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

In [None]:
# Instantiate the BiLSTM network with the dimensions chosen above, then move
# its parameters to the selected device.
model = BiLSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    output_size=output_size,
)
model = model.to(device)

In [None]:
def prepare_batch(x, y):
    """
    Normalise one batch for the model and the loss, and move it to `device`.

    Args:
        x (torch.Tensor): 3-D input shaped (B, seq_len, num_features) or
            (B, num_features, seq_len).
        y (torch.Tensor): Targets, either class indices, one-hot / per-class
            score rows whose last dimension is `out_features`, or a column
            vector with a trailing dimension of 1.

    Returns:
        tuple: (x, y) on `device`; x is laid out as (B, num_features, seq_len)
        and y holds int64 class indices.

    Raises:
        ValueError: If x is not 3-D or matches neither accepted layout.

    NOTE(review): the channels-first layout was originally documented as being
    for Conv1d, while the model built in this notebook is BiLSTM — confirm that
    BiLSTM really expects (B, num_features, seq_len).
    """
    # x may be (B, S, F) or (B, F, S)
    if x.dim() != 3:
        raise ValueError(f"Expected 3D input (B, S, F) or (B, F, S), got {x.shape}")

    _, dim1, dim2 = x.shape
    if dim1 == seq_len and dim2 == num_features:
        # (B, S, F) -> transpose to (B, F, S)
        x = x.permute(0, 2, 1)
    elif not (dim1 == num_features and dim2 == seq_len):
        # Neither layout matches: fail loudly instead of guessing.
        raise ValueError(f"Input shape {x.shape} doesn't match either (B,{seq_len},{num_features}) or (B,{num_features},{seq_len}).")

    # Targets: want class indices [0..out_features-1].
    # The shape checks run regardless of dtype so that one-hot labels already
    # stored as int64 are converted as well.
    if y.dim() > 1 and y.size(-1) == out_features:
        y = y.argmax(dim=-1)
    elif y.dim() > 1 and y.size(-1) == 1:
        # Column vector (B, 1) -> (B,), the shape CrossEntropyLoss expects for indices.
        y = y.squeeze(-1)
    if y.dtype != torch.long:
        y = y.long()

    return x.to(device), y.to(device)


In [None]:
# Loss, optimizer and learning-rate schedule used by the training loop.
criterion = nn.CrossEntropyLoss()
optimizer = AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
# Halve the learning rate after 3 epochs without a lower monitored value
# (mode='min'). The `verbose` argument is deliberately not passed: it is
# deprecated in PyTorch and newer releases may reject it. Read the current rate
# with scheduler.get_last_lr() or optimizer.param_groups[0]['lr'] if it should
# be logged.
scheduler = ReduceLROnPlateau(optimizer, mode='min', patience=3, factor=0.5)

class EarlyStopping:
    """
    Tracks a validation loss and signals when training should stop.

    Call the instance once per epoch with the latest validation loss. After
    `patience` consecutive epochs without an improvement of at least
    `min_delta`, `early_stop` becomes True (and stays True).
    """

    def __init__(self, patience=5, min_delta=0):
        """
        Args:
            patience (int): How many epochs to wait after last time validation loss improved.
            min_delta (float): Minimum change in the monitored quantity to qualify as an improvement.
        """
        self.patience = patience
        self.min_delta = min_delta
        self.best_loss = None   # lowest loss seen so far; None until a valid loss arrives
        self.counter = 0        # consecutive epochs without improvement
        self.early_stop = False

    def __call__(self, val_loss):
        """
        Record one epoch's validation loss.

        Args:
            val_loss (float): Validation loss of the epoch that just finished.

        Returns:
            bool: The current value of `early_stop`.
        """
        if math.isnan(val_loss):
            # Every comparison with NaN is False, so without this guard a NaN
            # would be stored as the best loss and the stopper could never
            # fire again. Treat it as an epoch without improvement instead.
            self._count_bad_epoch()
        elif self.best_loss is None or val_loss <= self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
        else:
            self._count_bad_epoch()
        return self.early_stop

    def _count_bad_epoch(self):
        """Register an epoch without improvement and update the stop flag."""
        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

In [None]:

# Early-stopping bookkeeping for the training loop below.
best_val_loss = math.inf
patience = 7                 # epochs without improvement tolerated before stopping
patience_left = patience

# Where the best checkpoint is written.
# NOTE(review): '.Final_models/...' has no '/' after the dot, unlike
# './Preprocessed_data/...' used for the inputs — probably a typo for
# './Final_models/...'. Left unchanged here; confirm the intended directory.
best_path = f'.Final_models/Bi-LSTM/Bi_lstm_best_{option}.pt'
# torch.save does not create missing directories; create the target up front
# so the first checkpoint does not fail after a full epoch of training.
Path(best_path).parent.mkdir(parents=True, exist_ok=True)

# Gradient scaler for mixed-precision training; disabled when CUDA is unavailable.
scaler = torch.cuda.amp.GradScaler(enabled=torch.cuda.is_available())

def run_epoch(loader, train_mode: bool):
    """
    Run the global `model` over every batch of `loader` once.

    Args:
        loader: Iterable of batches; each batch is either a dict with keys
            'x' and 'y' or an (x, y) pair.
        train_mode (bool): If true, the model is put in training mode and the
            optimizer is stepped on every batch; otherwise the model is put in
            eval mode and gradients are disabled.

    Returns:
        tuple: (average loss per sample, accuracy) over the whole loader.
        Both are 0 when the loader yields no samples.
    """
    switch_mode = model.train if train_mode else model.eval
    switch_mode()

    use_amp = torch.cuda.is_available()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for batch in loader:
        # Batches come either as {'x': ..., 'y': ...} or as an (x, y) pair.
        x, y = (batch['x'], batch['y']) if isinstance(batch, dict) else batch
        x, y = prepare_batch(x, y)

        with torch.set_grad_enabled(train_mode):
            # Forward pass and loss under autocast (active only with CUDA).
            with torch.cuda.amp.autocast(enabled=use_amp):
                logits = model(x)
                loss = criterion(logits, y)

            if train_mode:
                # Scaled backward pass; the scaler performs the optimizer step.
                optimizer.zero_grad(set_to_none=True)
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()

        # Weight the batch loss by batch size so the epoch average is per sample.
        loss_sum += loss.item() * x.size(0)
        n_correct += (logits.argmax(dim=1) == y).sum().item()
        n_seen += y.size(0)

    denom = max(1, n_seen)  # guard against an empty loader
    return loss_sum / denom, n_correct / denom

# Train for at most EPOCHS epochs, checkpointing on every new best validation
# loss and stopping once `patience` epochs pass without one.
EPOCHS = 50
for epoch in range(1, EPOCHS + 1):
    train_loss, train_acc = run_epoch(train_loader, train_mode=True)
    val_loss, val_acc = run_epoch(val_loader, train_mode=False)

    # The plateau scheduler monitors the validation loss.
    scheduler.step(val_loss)

    print(" | ".join([
        f"Epoch {epoch:02d}",
        f"train loss {train_loss:.4f} acc {train_acc:.3f}",
        f"val loss {val_loss:.4f} acc {val_acc:.3f}",
    ]))

    improved = val_loss < best_val_loss - 1e-6
    if not improved:
        patience_left -= 1
        if patience_left == 0:
            print("Early stopping.")
            break
        continue

    # New best validation loss: reset patience and save weights plus the
    # settings the model was built with.
    best_val_loss = val_loss
    patience_left = patience
    checkpoint = {
        'model_state': model.state_dict(),
        'config': {
            'in_channels': input_size,
            'hidden_size': hidden_size,
            'num_layers': num_layers,
            'output_size': output_size,
            'seq_length': seq_len,
        },
    }
    torch.save(checkpoint, best_path)
    print(f"  ↳ saved new best model to {best_path}")


In [None]:
# Load best checkpoint.
# The file written by the training loop holds only a state dict and a dict of
# plain Python values, so the restricted (safe) unpickler is sufficient; being
# explicit also keeps this call consistent with the other torch.load calls.
ckpt = torch.load(best_path, map_location=device, weights_only=True)
model.load_state_dict(ckpt['model_state'])
model.eval()

# Evaluate the restored weights on the held-out test split.
test_loss, test_acc = run_epoch(test_loader, train_mode=False)
print(f"TEST | loss {test_loss:.4f} acc {test_acc:.3f}")


In [None]:
import torch
import numpy as np
from collections import Counter

def collect_preds_targets(loader):
    """
    Run the global `model` in eval mode over `loader` and gather its outputs.

    Args:
        loader: Iterable of batches; each batch is either a dict with keys
            'x' and 'y' or an (x, y) pair.

    Returns:
        tuple: (predictions, targets) as CPU tensors concatenated over all
        batches; predictions are the argmax of the model output along dim 1.
    """
    model.eval()
    pred_chunks, target_chunks = [], []
    with torch.no_grad():
        for batch in loader:
            x, y = (batch['x'], batch['y']) if isinstance(batch, dict) else batch
            x, y = prepare_batch(x, y)
            batch_preds = model(x).argmax(dim=1)
            pred_chunks.append(batch_preds.cpu())
            target_chunks.append(y.cpu())
    return torch.cat(pred_chunks), torch.cat(target_chunks)

# Confusion matrix on the test split: rows are true classes, columns are
# predicted classes.
preds, targets = collect_preds_targets(test_loader)
num_classes = out_features

flat_targets = targets.reshape(-1)
flat_preds = preds.reshape(-1)

# Reject out-of-range indices up front: a negative or too-large index would
# otherwise be counted in the wrong cell by the flattened encoding below.
if flat_targets.numel() > 0:
    lowest = min(int(flat_targets.min()), int(flat_preds.min()))
    highest = max(int(flat_targets.max()), int(flat_preds.max()))
    if lowest < 0 or highest >= num_classes:
        raise ValueError(
            f"Class index out of range [0, {num_classes - 1}]: min={lowest}, max={highest}"
        )

# Vectorised count: encode each (true, predicted) pair as one flat index
# true * num_classes + predicted, count occurrences, then reshape to a square
# matrix. This replaces a Python loop with one tensor update per sample.
pair_index = flat_targets * num_classes + flat_preds
cm = torch.bincount(pair_index, minlength=num_classes * num_classes).reshape(num_classes, num_classes)

print("Confusion matrix:\n", cm.numpy())
# Diagonal / row total = recall per class; clamp avoids dividing by zero for
# classes that never occur in the targets.
per_class_acc = (cm.diag() / cm.sum(dim=1).clamp(min=1)).numpy()
print("Per-class accuracy:", per_class_acc)


In [None]:
from sklearn.metrics import classification_report, accuracy_score, balanced_accuracy_score
import numpy as np

# Flatten targets and predictions into 1-D NumPy vectors for scikit-learn.
y_true = np.ravel(targets.cpu().numpy())
y_pred = np.ravel(preds.cpu().numpy())

# Generic display names: class_0 ... class_{num_classes-1}
class_names = ["class_{}".format(idx) for idx in range(num_classes)]

# Passing every label explicitly keeps classes in the report even when they
# are missing from y_true or y_pred.
labels = np.arange(num_classes)

overall_accuracy = accuracy_score(y_true, y_pred)
balanced_accuracy = balanced_accuracy_score(y_true, y_pred)
report = classification_report(
    y_true,
    y_pred,
    labels=labels,
    target_names=class_names,
    digits=4,
    zero_division=0,  # report 0 instead of warning when a metric is undefined
)

print("Overall accuracy:", overall_accuracy)
print("Balanced accuracy:", balanced_accuracy)
print("\nClassification report:\n")
print(report)


In [None]:
# Per-class false positive / false negative rates (one-vs-rest), derived from
# the confusion matrix `cm` (rows = true class, columns = predicted class).
# `cm` is built as a torch tensor in an earlier cell; convert it to NumPy once
# so the arithmetic below does not mix Tensor and ndarray operands.
# NOTE(review): assumes `cm` lives on the CPU, as it does where it is created.
cm_np = np.asarray(cm)

tp = np.diag(cm_np)
fp = cm_np.sum(axis=0) - tp          # predicted as the class, but actually another
fn = cm_np.sum(axis=1) - tp          # actually the class, but predicted as another
tn = cm_np.sum() - (fp + fn + tp)


def _safe_rate(numerator, denominator):
    """Element-wise numerator / denominator; NaN (no warning) where the denominator is 0."""
    numerator = np.asarray(numerator, dtype=float)
    denominator = np.asarray(denominator, dtype=float)
    return np.divide(
        numerator,
        denominator,
        out=np.full(numerator.shape, np.nan),
        where=denominator != 0,
    )


# False positive rate
fpr = _safe_rate(fp, fp + tn)
print("False Positive Rate\n No Attack: {:.3f}%, Bus15: {:.3f}%, Bus18: {:.3f}%, Bus19: {:.3f}%, Bus20: {:.3f}%, Bus21: {:.3f}%, "
"Bus23: {:.3f}%, Bus24: {:.3f}%, Bus26: {:.3f}%, Bus29: {:.3f}%,Bus30: {:.3f}%".format(*(fpr * 100)))

# False negative rate
fnr = _safe_rate(fn, fn + tp)
print("False Negative Rate\n No Attack: {:.3f}%, Bus15: {:.3f}%, Bus18: {:.3f}%, Bus19: {:.3f}%, Bus20: {:.3f}%, Bus21: {:.3f}%, "
"Bus23: {:.3f}%, Bus24: {:.3f}%, Bus26: {:.3f}%, Bus29: {:.3f}%,Bus30: {:.3f}%".format(*(fnr * 100)))