In [None]:
from collections import defaultdict

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.feature_selection import SelectKBest, chi2, r_regression
from sklearn.metrics import RocCurveDisplay, f1_score, PrecisionRecallDisplay, roc_curve, precision_recall_curve
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset, TensorDataset, WeightedRandomSampler

In [None]:
# Dataset can be found here: https://www.kaggle.com/datasets/mrwellsdavid/unsw-nb15

# Read the CSV, then discard the `id` and `label` columns; every other column
# (including `attack_cat`) is kept.
raw_path = '<filepath of dataset>'
df = pd.read_csv(raw_path)
df = df.drop(columns=['id', 'label'])

In [None]:
# Perform normalization

# Min-max scale every numeric column; non-numeric columns are left untouched.
# NOTE(review): the min/max are taken over the whole frame, i.e. before any
# train/test split performed later in the notebook.
df_nums = df.select_dtypes(include=[np.number])
col_min = df_nums.min()
# A constant column has max == min; dividing by that zero range would turn the
# whole column into NaN. Use a range of 1 there so the column scales to 0.
col_range = (df_nums.max() - col_min).replace(0, 1)
df_nums = (df_nums - col_min) / col_range

# Write the scaled columns back into the original frame.
for col in df_nums.columns:
    df[col] = df_nums[col]

In [None]:
# View number of samples in each of the various attack categories

# Keep the tally in a named variable, then display it as the cell's output.
attack_cat_counts = df[['attack_cat']].value_counts()
attack_cat_counts

In [None]:
# Choose which attacks will be the out-of-distribution attacks

ood_cats = ["Backdoor", "Worms", "Shellcode"]
def mark_ood(row):
    """Return `row` with an `ood` entry: 1 if its `attack_cat` is in `ood_cats`, else 0."""
    row['ood'] = 1 if row['attack_cat'] in ood_cats else 0
    return row

# Flag all rows at once. A vectorised membership test produces the same `ood`
# column as `df.apply(mark_ood, axis=1)` without one Python call per row.
df = df.assign(ood=df['attack_cat'].isin(ood_cats).astype('int64'))

In [None]:
# Split the dataset into training, testing, and OOD

dfX = df.drop(columns=['attack_cat'])
dfy = df[['attack_cat', 'ood']]

# One-hot encode the non-numeric feature columns. dtype=float keeps the whole
# frame numeric: with pandas >= 2.0 the default dummy dtype is bool, and a frame
# mixing float and bool columns yields an object array from `.values`, which
# torch.FloatTensor cannot convert.
dfX = pd.get_dummies(dfX, dtype=float)

# Build each subset as a new frame instead of dropping in place on a filtered
# slice (which raises SettingWithCopyWarning).
df_ood = dfX[dfX['ood'] == 1].drop(columns=['ood'])

X = dfX[dfX['ood'] == 0].drop(columns=['ood'])

# In-distribution labels, one-hot encoded with one column per attack category.
y = dfy[dfy['ood'] == 0].drop(columns=['ood'])
y = pd.get_dummies(y, dtype=float)

In [None]:
# Fixed seed so the train/test partition (and every metric computed on it) is
# the same on each run of the notebook.
SPLIT_SEED = 42

# Convert to float32 arrays up front so the tensors below can be built even when
# the frames hold bool or mixed-dtype columns.
X_train, X_test, y_train, y_test = train_test_split(
    X.to_numpy(dtype=np.float32),
    y.to_numpy(dtype=np.float32),
    test_size=0.2,
    random_state=SPLIT_SEED,
)

# OOD evaluation set: the in-distribution test rows (label 1) followed by all
# held-out OOD rows (label 0).
X_ood = torch.FloatTensor(np.concatenate((X_test, df_ood.to_numpy(dtype=np.float32)), axis=0))
y_ood = torch.cat((torch.ones(len(X_test)), torch.zeros(len(df_ood))))

X_train = torch.FloatTensor(X_train)
X_test = torch.FloatTensor(X_test)
y_train = torch.FloatTensor(y_train)
y_test = torch.FloatTensor(y_test)

In [None]:
# Oversampling using weights

# Per-class weight = 1 / (number of training samples of that class). The counts
# come from the one-hot training labels themselves, so the sampler balances
# exactly the data it draws from and this cell does not depend on the pre-split
# label frame.
class_counts = y_train.sum(dim=0).double()
class_weights = 1.0 / class_counts
weightlist = class_weights.tolist()
# Each training sample receives the weight of its class (argmax of its one-hot row).
sample_weights = class_weights[y_train.argmax(dim=1)]
samp = WeightedRandomSampler(weights=sample_weights, num_samples=len(y_train), replacement=True)

In [None]:
# Create dataloaders for the dataset

# Wrap each (features, labels) tensor pair in a dataset first.
train = TensorDataset(X_train, y_train)
test = TensorDataset(X_test, y_test)
ood = TensorDataset(X_ood, y_ood)

# All three loaders share the batch size; only the training loader draws its
# indices through the weighted sampler.
loader_kwargs = dict(batch_size=512)
train_loader = DataLoader(train, sampler=samp, **loader_kwargs)
test_loader = DataLoader(test, **loader_kwargs)
ood_loader = DataLoader(ood, **loader_kwargs)

In [None]:
# Verify the distribution of attack classes in the training set are uniform

# Count the class index (argmax of each one-hot label) of every sample in the
# first training batch only. The loop variables are deliberately not called
# `x`/`y`, so the notebook-level `y` label frame is not overwritten.
store = defaultdict(int)
for _, batch_labels in train_loader:
    for class_idx in batch_labels.argmax(dim=1).tolist():
        store[class_idx] += 1
    break
for key in store:
    print(key, store[key])

In [None]:
# Print the shape of the first training batch's feature tensor. The loop
# variables avoid the names `x`/`y` so the notebook-level `y` label frame is
# not overwritten.
for batch_features, _batch_labels in train_loader:
    print(batch_features.shape)
    break

In [None]:
# Neural network architecture

class ANN(nn.Module):
    """Fully connected classifier with two hidden layers that returns raw logits.

    Each hidden layer is Linear -> ReLU -> BatchNorm1d. No softmax is applied in
    `forward`; callers apply it themselves where probabilities are needed.

    Args:
        in_features: Width of each input row. Defaults to 190.
        hidden_features: Width of both hidden layers. Defaults to 256.
        num_classes: Number of output logits. Defaults to 7.
    """

    def __init__(self, in_features=190, hidden_features=256, num_classes=7):
        super().__init__()
        # Attribute names are unchanged so existing state dicts keep loading.
        self.fc1 = nn.Linear(in_features=in_features, out_features=hidden_features)
        self.norm = nn.BatchNorm1d(hidden_features)
        self.fc2 = nn.Linear(in_features=hidden_features, out_features=hidden_features)
        self.norm2 = nn.BatchNorm1d(hidden_features)
        self.output = nn.Linear(in_features=hidden_features, out_features=num_classes)

    def forward(self, x):
        """Map a (batch, in_features) tensor to (batch, num_classes) logits."""
        x = self.norm(F.relu(self.fc1(x)))
        x = self.norm2(F.relu(self.fc2(x)))
        return self.output(x)

# The defaults reproduce the original fixed architecture (190 -> 256 -> 256 -> 7).
model = ANN()
model

In [None]:
# Loss and optimiser used by the training loop: cross-entropy on the model
# outputs, minimised with Adam at a fixed learning rate.
learning_rate = 0.0001
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# Training phase

epochs = 100
lossarr = []    # mean training loss per epoch
size = len(test_loader.dataset)
num_batches = len(test_loader)
testloss = []   # mean test loss per epoch
for i in range(epochs):
    model.train()
    running_loss = 0.0
    # Batch variables are not named `X`/`y`, so the notebook-level feature and
    # label frames survive this cell.
    for batch_features, batch_labels in train_loader:
        # Call the module itself rather than .forward() so hooks are honoured.
        y_hat = model(batch_features)
        loss = criterion(y_hat, batch_labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
    # Average over all batches so this curve is comparable with the test curve,
    # which is also a per-batch mean (previously only the last batch was logged).
    lossarr.append(running_loss / len(train_loader))

    model.eval()
    test_loss, correct = 0, 0
    # `preds` and `correct` are refreshed every epoch, so after the loop they
    # describe the final epoch's pass over the test set.
    preds = []
    with torch.no_grad():
        for batch_features, batch_labels in test_loader:
            pred = model(batch_features)
            preds.extend(pred)
            test_loss += criterion(pred, batch_labels).item()
            correct += (pred.argmax(1) == batch_labels.argmax(1)).type(torch.float).sum().item()
    testloss.append(test_loss/num_batches)

plt.plot(list(range(epochs)), lossarr, label="training loss")
plt.plot(list(range(epochs)), testloss, label="test loss")
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()

In [None]:
# Testing phase

size = len(test_loader.dataset)
num_batches = len(test_loader)
model.eval()
test_loss, correct = 0, 0
# One logit tensor per test sample, in loader order; later cells read `preds`.
preds = []
with torch.no_grad():
    # Batch variables are not named `X`/`y`, so the notebook-level feature and
    # label frames are not overwritten by this loop.
    for batch_features, batch_labels in test_loader:
        pred = model(batch_features)
        preds.extend(pred)
        test_loss += criterion(pred, batch_labels).item()
        # A sample is correct when the predicted class equals the one-hot label's class.
        correct += (pred.argmax(1) == batch_labels.argmax(1)).type(torch.float).sum().item()
test_loss /= num_batches
correct /= size
print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Count how many test samples fall in each class (argmax of the one-hot label)
# across the whole test loader. The loop variables avoid `x`/`y` so the
# notebook-level `y` label frame is not overwritten.
store = defaultdict(int)
for _, batch_labels in test_loader:
    for class_idx in batch_labels.argmax(dim=1).tolist():
        store[class_idx] += 1
for key in store:
    print(key, store[key])

In [None]:
# Plot ROC curve

plt.figure(figsize=(11,11))
ax = plt.gca()
# `preds` holds one logit tensor per test sample; stack them once and take the
# softmax across classes instead of recomputing it per sample and per class.
logits = torch.stack(preds)
class_probs = torch.softmax(logits, dim=1)
predicted_class = logits.argmax(dim=1)
# test_loader is built over (X_test, y_test) without shuffling, so y_test rows
# line up with `preds`. Iterate over every label column: the previous fixed
# range(6) left the last class without a curve or F1 score.
num_classes = y_test.shape[1]
for atk in range(num_classes):
    labels = y_test[:, atk].int().tolist()
    probs = class_probs[:, atk].tolist()
    RocCurveDisplay.from_predictions(labels, probs, name=f"Class: {atk}",ax=ax)
    print(f"F1 score for Class {atk} :", f1_score(labels, (predicted_class == atk).int().tolist()))
plt.show()

In [None]:
# ROC for OOD evaluation. Positive class is ID

model.eval()
preds = []
labels = []
with torch.no_grad():
    # Batch variables are not named `X`/`y`, so the notebook-level feature and
    # label frames are not overwritten.
    for ood_features, ood_labels in ood_loader:
        pred = model(ood_features)
        preds.extend(pred)
        # Store plain floats (1.0 = in-distribution, 0.0 = OOD) rather than 0-d tensors.
        labels.extend(ood_labels.tolist())

plt.figure(2, figsize=(11,11))
ax = plt.gca()
# Score each sample by its maximum softmax probability, computed in one pass.
msp = torch.softmax(torch.stack(preds), dim=1).max(dim=1).values.tolist()
RocCurveDisplay.from_predictions(labels, msp, ax=ax)
# plt.show() takes no positional figure number; the figure was selected above.
plt.show()

In [None]:
# ROC for OOD evaluation. Positive class is OOD

model.eval()
preds = []
labels = []
with torch.no_grad():
    # Batch variables are not named `X`/`y`, so the notebook-level feature and
    # label frames are not overwritten.
    for ood_features, ood_labels in ood_loader:
        pred = model(ood_features)
        preds.extend(pred)
        # Store plain floats (1.0 = in-distribution, 0.0 = OOD) rather than 0-d tensors.
        labels.extend(ood_labels.tolist())

plt.figure(3, figsize=(11,11))
ax = plt.gca()
# Negated maximum softmax probability: a lower confidence gives a higher OOD score.
msp = (-torch.softmax(torch.stack(preds), dim=1).max(dim=1).values).tolist()
# Flip the labels so that OOD samples (stored as 0) become the positive class.
RocCurveDisplay.from_predictions([0 if i == 1 else 1 for i in labels], msp, ax=ax)
# plt.show() takes no positional figure number; the figure was selected above.
plt.show()

In [None]:
# PR for OOD evaluation. Positive class is ID

model.eval()
preds = []
labels = []
with torch.no_grad():
    # Batch variables are not named `X`/`y`, so the notebook-level feature and
    # label frames are not overwritten.
    for ood_features, ood_labels in ood_loader:
        pred = model(ood_features)
        preds.extend(pred)
        # Store plain floats (1.0 = in-distribution, 0.0 = OOD) rather than 0-d tensors.
        labels.extend(ood_labels.tolist())

plt.figure(4, figsize=(11,11))
ax = plt.gca()
# Score each sample by its maximum softmax probability, computed in one pass.
msp = torch.softmax(torch.stack(preds), dim=1).max(dim=1).values.tolist()
PrecisionRecallDisplay.from_predictions(labels, msp, ax=ax)
# plt.show() takes no positional figure number; the figure was selected above.
plt.show()

In [None]:
# PR for OOD evaluation. Positive class is OOD

model.eval()
preds = []
labels = []
with torch.no_grad():
    # Batch variables are not named `X`/`y`, so the notebook-level feature and
    # label frames are not overwritten.
    for ood_features, ood_labels in ood_loader:
        pred = model(ood_features)
        preds.extend(pred)
        # Store plain floats (1.0 = in-distribution, 0.0 = OOD) rather than 0-d tensors.
        labels.extend(ood_labels.tolist())

# Figure 5: number 3 is already used by the OOD-positive ROC plot, and reusing
# it could draw this curve onto that figure if it is still open.
plt.figure(5, figsize=(11,11))
ax = plt.gca()
# Negated maximum softmax probability: a lower confidence gives a higher OOD score.
msp = (-torch.softmax(torch.stack(preds), dim=1).max(dim=1).values).tolist()
# Flip the labels so that OOD samples (stored as 0) become the positive class.
PrecisionRecallDisplay.from_predictions([0 if i == 1 else 1 for i in labels], msp, ax=ax)
# plt.show() takes no positional figure number; the figure was selected above.
plt.show()