# Do some imports

In [None]:
import math
from typing import List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
#!pip install --user pandas
import torch
import torch.nn as nn
import torch.nn.functional as F
from brevitas.core.quant import QuantType
from brevitas.nn import QuantConv2d, QuantIdentity, QuantLinear, QuantReLU
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, Dataset
from tqdm.notebook import tqdm

from dataloader import UNSW_NB15
from dataloader_quantized import UNSW_NB15_quantized

### Inspired by [this github file](https://github.com/alik604/cyber-security/blob/master/Intrusion-Detection/UNSW_NB15%20-%20Torch%20MLP%20and%20autoEncoder.ipynb)

# Get UNSW_NB15 train and test set

In [2]:
#!wget https://www.unsw.adfa.edu.au/unsw-canberra-cyber/cybersecurity/ADFA-NB15-Datasets/a%20part%20of%20training%20and%20testing%20set/UNSW_NB15_training-set.csv

In [3]:
#!wget https://www.unsw.adfa.edu.au/unsw-canberra-cyber/cybersecurity/ADFA-NB15-Datasets/a%20part%20of%20training%20and%20testing%20set/UNSW_NB15_testing-set.csv

# Define the Neural Network class

In [None]:
class QuantLeNet(nn.Module):
    """Quantized multi-layer perceptron with a single-probability style output.

    The network stacks three ``QuantLinear`` layers (8-bit weights, with bias),
    each followed by an 8-bit ``QuantReLU``, then a bias-free ``QuantLinear``
    output layer whose result goes through an 8-bit ``QuantIdentity``.

    Args:
        input_size: Number of input features per sample.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        hidden3: Width of the third hidden layer.
        num_classes: Number of output units.
    """

    def __init__(self, input_size, hidden1, hidden2, hidden3, num_classes):
        super().__init__()
        self.fc1 = QuantLinear(input_size, hidden1, bias=True, weight_bit_width=8)
        self.relu1 = QuantReLU(bit_width=8)
        self.fc2 = QuantLinear(hidden1, hidden2, bias=True, weight_bit_width=8)
        self.relu2 = QuantReLU(bit_width=8)
        self.fc3 = QuantLinear(hidden2, hidden3, bias=True, weight_bit_width=8)
        self.relu3 = QuantReLU(bit_width=8)
        self.out = QuantLinear(hidden3, num_classes, bias=False, weight_bit_width=8)
        self.out_activ_quantized = QuantIdentity(bit_width=8)

    def forward(self, x):
        """Return sigmoid-squashed outputs, one row per input sample.

        Args:
            x: Batch of input features.

        Returns:
            Tensor with values in [0, 1].
        """
        out = self.relu1(self.fc1(x))
        out = self.relu2(self.fc2(out))
        out = self.relu3(self.fc3(out))
        out = self.out_activ_quantized(self.out(out))
        # The notebook trains with nn.BCELoss and thresholds predictions at
        # 0.5, both of which require probabilities; the quantized identity
        # alone does not bound its output to [0, 1].
        return torch.sigmoid(out)

### Define the `train`, `test` and `display_loss_plot` functions

In [5]:
def train(model, device, train_loader, optimizer, criterion):
    """Run one training epoch and return the loss of every batch.

    Args:
        model: Network to optimise; it is switched to training mode.
        device: Accepted for interface compatibility but not used; batches are
            processed on whatever device the loader yields them on.
        train_loader: Iterable yielding ``(inputs, target)`` batches, with
            ``target`` holding one label per sample.
        optimizer: Optimizer that updates the parameters of ``model``.
        criterion: Loss callable invoked as ``criterion(output, target)``.

    Returns:
        list of float: One loss value per batch, in iteration order.
    """
    model.train()
    losses = []

    for inputs, target in train_loader:
        optimizer.zero_grad()

        # Forward pass on float32 features.
        output = model(inputs.float())

        # Cast the labels to float32 *before* computing the loss so they match
        # the dtype of the model output, then add a trailing axis so a 1-D
        # label vector lines up with a (batch, 1) output.
        loss = criterion(output, target.float().unsqueeze(1))

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

        # .item() extracts the scalar loss as a plain Python float.
        losses.append(loss.item())

    return losses

In [6]:
#TESTING THE MODEL
def test(model, device, test_loader):
    """Evaluate ``model`` on ``test_loader`` and return its accuracy.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        device: Accepted for interface compatibility but not used.
        test_loader: Iterable yielding ``(inputs, target)`` batches.

    Returns:
        The value of ``accuracy_score`` computed over all collected labels and
        rounded model outputs.
    """
    model.eval()
    y_true = []
    y_pred = []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, target in test_loader:
            output = model(inputs.float())

            # Round the outputs to the nearest integer label with torch
            # directly instead of passing a tensor through a NumPy function.
            pred = torch.round(output)

            # Flatten both sides so each sample contributes exactly one scalar.
            y_true.extend(target.float().reshape(-1).tolist())
            y_pred.extend(pred.reshape(-1).tolist())

    return accuracy_score(y_true, y_pred)

In [7]:
def display_loss_plot(losses):
    """Plot a sequence of loss values against their position and show it.

    Args:
        losses: Sequence of loss values; element ``k`` is drawn at x = ``k``.
    """
    positions = list(range(len(losses)))
    plt.plot(positions, losses)
    plt.title('Loss of the model')
    plt.xlabel('iterations')
    plt.ylabel('Cross entropy loss')
    plt.show()

# Define some parameters first

In [26]:
# Device identifier handed to train() and test().
device = 'cpu'

# Network dimensions.
input_size = 196  # input features (the original note mentions 42 for integer encoding)
hidden1, hidden2, hidden3 = 128, 64, 32  # neurons in the 1st, 2nd and 3rd hidden layer
num_classes = 1  # binary classification

# Optimisation settings.
num_epochs, batch_size, lr = 200, 5000, 0.001

# Initialize Neural Network class

In [27]:
# Build the quantized network from the dimensions configured above.
model = QuantLeNet(
    input_size=input_size,
    hidden1=hidden1,
    hidden2=hidden2,
    hidden3=hidden3,
    num_classes=num_classes,
)

# Define loss and optimizer 

In [28]:
# Binary cross-entropy loss, minimised with Adam.
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=lr,
    betas=(0.9, 0.999),
)

# Initialize UNSW_NB15 class

In [12]:
# The CSV files are used as provided: the training file is not split into
# separate training and validation subsets.
train_csv_path = 'data/UNSW_NB15_training-set.csv'
test_csv_path = "data/UNSW_NB15_testing-set.csv"

# Non-quantized datasets.
train_dataset = UNSW_NB15(file_path=train_csv_path)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = UNSW_NB15(file_path=test_csv_path)
# Evaluation does not depend on sample order, so the test set is not shuffled.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Quantized versions; the dataset class receives both CSV paths and the
# `train` flag selects which split it exposes.
train_quantized_dataset = UNSW_NB15_quantized(
    file_path_train=train_csv_path,
    file_path_test=test_csv_path,
    train=True,
)
train_quantized_loader = DataLoader(train_quantized_dataset, batch_size=batch_size, shuffle=True)

test_quantized_dataset = UNSW_NB15_quantized(
    file_path_train=train_csv_path,
    file_path_test=test_csv_path,
    train=False,
)
test_quantized_loader = DataLoader(test_quantized_dataset, batch_size=batch_size, shuffle=False)

torch.Size([175341, 197])
torch.Size([82332, 197])
torch.Size([175341, 197])
torch.Size([82332, 197])


# Let's train and test the model, then look at the loss

In [29]:
# Use the quantized data for the training and evaluation below.
train_loader, test_loader = train_quantized_loader, test_quantized_loader

In [None]:
# Collect the list of batch losses returned for every epoch.
running_loss = []
for epoch in tqdm(range(num_epochs)):
    running_loss.append(train(model, device, train_loader, optimizer, criterion))

# Persist the trained weights.
torch.save(model.state_dict(), "MLP_model")

HBox(children=(FloatProgress(value=0.0, max=200.0), HTML(value='')))

In [None]:
# To evaluate previously saved weights instead, first run:
#   model.load_state_dict(torch.load("MLP_model"))
test(model, device, test_loader)

In [None]:
# Average the batch losses within each epoch and plot one point per epoch.
loss_per_epoch = [np.mean(epoch_losses) for epoch_losses in running_loss]
display_loss_plot(loss_per_epoch)

**********************************************************************************************************************

# Results

    num_epochs = 200
    batch_size = 8000 
    lr = 0.001
    accuracy on test set = 0.7235218384103387
<img src="data/train_not_quantized_72.PNG">