In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.preprocessing import StandardScaler
import joblib


In [3]:
class AnomalyAutoencoder(nn.Module):
    def __init__(self, input_dim):
        super(AnomalyAutoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 16)
        )
        self.decoder = nn.Sequential(
            nn.Linear(16, 32),
            nn.ReLU(),
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, input_dim)
        )

    def forward(self, x):
        return self.decoder(self.encoder(x))


In [4]:
def train_autoencoder(model, dataloader, epochs=50, lr=1e-4):
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    model.train()

    for epoch in range(epochs):
        total_loss = 0
        for batch in dataloader:
            batch = batch[0].float()
            optimizer.zero_grad()
            output = model(batch)
            loss = criterion(output, batch)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss / len(dataloader):.6f}")


In [5]:
def calculate_threshold(model, validation_loader):
    model.eval()
    errors = []

    with torch.no_grad():
        for batch in validation_loader:
            batch = batch[0].float()
            output = model(batch)
            mse = torch.mean((batch - output) ** 2, dim=1)
            errors.extend(mse.cpu().numpy())

    errors = np.array(errors)
    threshold = np.percentile(errors, 99)
    print(f"Calculated anomaly threshold: {threshold:.6f}")
    return threshold


In [6]:
def detect_anomaly(sample, model, threshold):
    model.eval()
    sample = sample.float().unsqueeze(0)  # add batch dimension
    with torch.no_grad():
        reconstructed = model(sample)
        loss = torch.mean((sample - reconstructed) ** 2).item()
        return (loss > threshold), loss


In [7]:
def hybrid_detection_pipeline(sample, ae_model, dnn_model, threshold):
    sample_tensor = torch.tensor(sample, dtype=torch.float32)
    is_anomaly, score = detect_anomaly(sample_tensor, ae_model, threshold)

    if is_anomaly:
        dnn_model.eval()
        with torch.no_grad():
            prediction = dnn_model(sample_tensor.unsqueeze(0))  # batch size 1
            predicted_class = torch.argmax(prediction, dim=1).item()
        return f"Anomaly (score={score:.4f}) → Attack Type: {predicted_class}"
    else:
        return f"Benign (score={score:.4f})"


In [8]:
scaler = StandardScaler()
joblib.dump(scaler, "standard_scaler.pkl")


['standard_scaler.pkl']

In [9]:
scaler = joblib.load("standard_scaler.pkl")


X_train = np.load('./preprocessing/X_train_benign.npy')
X_val = np.load('./preprocessing/X_val_benign.npy')

X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val) 


# Prepare dataloaders
train_loader = DataLoader(TensorDataset(torch.tensor(X_train_scaled)), batch_size=128, shuffle=True)
val_loader = DataLoader(TensorDataset(torch.tensor(X_val_scaled)), batch_size=128, shuffle=False)

# Define and train autoencoder


In [None]:
input_dim = X_train.shape[1]
autoencoder = AnomalyAutoencoder(input_dim)
train_autoencoder(autoencoder, train_loader, epochs=50)

# Calculate threshold from validation set
threshold = calculate_threshold(autoencoder, val_loader)

# Save model and threshold
torch.save(autoencoder.state_dict(), "autoencoder.pth")
np.save("threshold.npy", threshold)

In [20]:
# Make sure input_dim matches your data
input_dim = X_train.shape[1]  # or set manually if X_train is not in memory

# Initialize model and load weights
model = AnomalyAutoencoder(input_dim)
model.load_state_dict(torch.load("autoencoder.pth"))
model.eval()

# Load threshold
threshold = np.load("threshold.npy")

print("Loaded threshold:", threshold)


Loaded threshold: 0.0060612448


  model.load_state_dict(torch.load("autoencoder.pth"))


In [17]:
X_attack = np.load('./preprocessing/X_attack.npy')
X_attack_scaled = scaler.transform(X_attack)

In [18]:
attack_loader = DataLoader(TensorDataset(torch.tensor(X_attack_scaled).float()), batch_size=128, shuffle=False)


In [None]:
def get_reconstruction_errors(model, data_loader):
    model.eval()
    errors = []

    with torch.no_grad():
        for batch in data_loader:
            batch = batch[0].float()
            output = model(batch)
            mse = torch.mean((batch - output) ** 2, dim=1)  # per-sample error
            errors.extend(mse.cpu().numpy())
    return np.array(errors)

errors_benign = get_reconstruction_errors(model, val_loader)
errors_attack = get_reconstruction_errors(model, attack_loader)


AttributeError: 'str' object has no attribute 'eval'