MIT License

Copyright (c) 2022 John Shahla

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

In [None]:
# Import libraries 
import joblib

import pandas as pd

import numpy as np

import matplotlib.pyplot as plt

import torch

import torch.nn as nn

import torch.nn.functional as nnf

from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler

from sklearn import metrics

# SVM for multi-class classification using one-vs-rest
from sklearn.svm import SVC
from sklearn.multiclass import OneVsRestClassifier
from sklearn.model_selection import KFold

# K-Fold Validation
from numpy import mean
from numpy import std
from sklearn.model_selection import KFold
from sklearn.model_selection import cross_val_score

In [None]:
# Number of CSV rows loaded by DatasetHandler (passed to pd.read_csv as nrows)
NROWS = 40_000

In [None]:
# Read and clean X-IIoTID Dataset

class DatasetHandler():
    """Load, clean, scale and split the X-IIoTID dataset for open-set tests.

    Construction reads the CSV, one-hot encodes ``Protocol``/``Service``,
    keeps ``class2`` as the target, drops identifier/label columns, removes
    sparse columns and incomplete rows, label-encodes the target and
    min-max scales the features.

    Attributes set by ``__init__``:
        data: cleaned feature DataFrame (before scaling).
        y: target Series restricted to the rows kept in ``data``.
        le: LabelEncoder fitted on the original ``class2`` names.
        scaler: MinMaxScaler fitted on the full cleaned feature matrix.
        X_stored / y_stored: scaled features and integer labels; the split
            methods always restart from these.
        X_data / y_data: working copies, overwritten by the split methods.
        encoded_labels: class names; position ``k`` names integer label ``k``.
    """

    def __init__(self, path=None, nrows=None):
        """Read and preprocess the dataset.

        Args:
            path: CSV location; defaults to the module-level ``PATH_CSV``.
            nrows: number of rows to read; defaults to the module-level
                ``NROWS``.
        """
        path = PATH_CSV if path is None else path
        nrows = NROWS if nrows is None else nrows

        self.data = pd.read_csv(path, nrows=nrows)

        # Encode categorical columns
        self.data = pd.get_dummies(self.data, columns=['Protocol', 'Service'])

        # Defining target class
        self.y = self.data['class2']

        self.data = self.data.drop(['Date', 'Timestamp', 'Scr_IP', 'Des_IP', 'class1', 'class2', 'class3'], axis=1)

        # Map boolean-like values to 0/1, placeholder tokens to NaN, and any
        # remaining value containing a letter to NaN
        self.data = self.data.replace({False: 0, 'FALSE': 0, 'false': 0, True: 1, 'TRUE': 1, 'true': 1, '-': np.nan, '?': np.nan, '' : np.nan, ' ' : np.nan}).replace({'[A-Za-z]': np.nan}, regex=True)

        # Delete all columns with more than 30% NaN values. The share is
        # measured against the rows actually read, which can be fewer than
        # the requested nrows when the file is short.
        nan_percent = self.data.isna().mean() * 100
        self.data = self.data.drop(columns=nan_percent[nan_percent > 30.0].index)

        # Remove any record that still has a NaN feature. drop=True keeps the
        # old row number from being added as a spurious feature column.
        complete = self.data.notna().all(axis=1)
        self.data = self.data[complete].reset_index(drop=True)
        self.y = self.y[complete]

        # Encode target class with LabelEncoder
        self.le = LabelEncoder()
        self.y_data = self.le.fit_transform(self.y)

        # LabelEncoder assigns codes in sorted class order, so classes_ (not
        # the appearance-ordered y.unique()) is the code -> name lookup.
        self._all_labels = self.le.classes_
        self.encoded_labels = self._all_labels

        # Convert all features into a float32 ndarray for the models
        self.X_data = self.data.astype('float32').values

        # Scale every feature with MinMaxScaler
        self.scaler = MinMaxScaler()
        self.X_data = self.scaler.fit_transform(self.X_data)

        self.X_stored = self.X_data
        self.y_stored = self.y_data

    def _select_known_samples(self, min_num_rec, max_num_rec, class_to_del):
        """Hold out one class and gather rows of the sufficiently large others.

        Sets ``x_deleted`` (rows of the held-out class), ``X_data`` and
        ``y_data`` (everything else), then returns ``(X, y)`` containing, for
        each remaining class with more than ``min_num_rec`` rows, its first
        ``max_num_rec`` rows (all rows when ``max_num_rec`` is None).
        Both returned arrays are empty when no class qualifies.
        """
        held_out = self.y_stored == class_to_del

        # X_stored is already scaled with the scaler fitted on the whole
        # dataset; the held-out rows must keep that scaling so they are
        # comparable with the training data (and the scaler is not refitted).
        self.x_deleted = self.X_stored[held_out]
        self.X_data = self.X_stored[~held_out]
        self.y_data = self.y_stored[~held_out]

        picked = []
        for label in np.unique(self.y_data):
            rows = np.where(self.y_data == label)[0]
            if len(rows) > min_num_rec:
                picked.append(rows[:max_num_rec])

        if not picked:
            return np.array([]), np.array([])

        chosen = np.concatenate(picked)
        return self.X_data[chosen], self.y_data[chosen]

    def _reindex_labels(self, raw_labels):
        """Renumber the surviving labels 0..k-1 and refresh ``encoded_labels``.

        ``encoded_labels`` is rebuilt from the full label list each time, so
        repeated split calls do not shrink it cumulatively, and ``self.le``
        keeps its mapping to the original class names.
        """
        kept, new_ids = np.unique(raw_labels, return_inverse=True)
        self.encoded_labels = self._all_labels[kept]
        return new_ids

    # Splitting our dataset into training and testing datasets
    def get_imbalanced_splits(self, min_num_rec, class_to_del=3):
        """Split using every record of each class with > ``min_num_rec`` rows.

        Args:
            min_num_rec: a class is kept only if it has more rows than this.
            class_to_del: integer label held out as the unknown class.

        Returns:
            ``(x_train, x_test, y_train, y_test, x_deleted)``, or five ``-1``
            values when no class qualifies.
        """
        self.imbalancedSamplesX, self.imbalancedSamplesY = self._select_known_samples(
            min_num_rec, None, class_to_del)

        if len(self.imbalancedSamplesY) == 0:
            return -1, -1, -1, -1, -1

        self.imbalancedSamplesY = self._reindex_labels(self.imbalancedSamplesY)

        self.ibxtrain, self.ibxtest, self.ibytrain, self.ibytest = train_test_split(
            self.imbalancedSamplesX, self.imbalancedSamplesY, test_size=0.2, random_state=3)
        return self.ibxtrain, self.ibxtest, self.ibytrain, self.ibytest, self.x_deleted

    def get_balanced_splits(self, min_num_rec, max_num_rec, class_to_del=3):
        """Split using at most ``max_num_rec`` records per retained class.

        Args:
            min_num_rec: a class is kept only if it has more rows than this.
            max_num_rec: cap on the rows taken from each kept class.
            class_to_del: integer label held out as the unknown class.

        Returns:
            ``(x_train, x_test, y_train, y_test, x_deleted)``, or five ``-1``
            values when no class qualifies.
        """
        self.balancedSamplesX, self.balancedSamplesY = self._select_known_samples(
            min_num_rec, max_num_rec, class_to_del)

        if len(self.balancedSamplesY) == 0:
            return -1, -1, -1, -1, -1

        self.balancedSamplesY = self._reindex_labels(self.balancedSamplesY)

        self.bxtrain, self.bxtest, self.bytrain, self.bytest = train_test_split(
            self.balancedSamplesX, self.balancedSamplesY, test_size=0.2, random_state=3)
        return self.bxtrain, self.bxtest, self.bytrain, self.bytest, self.x_deleted

    # get number of labels
    def get_num_labels(self):
        """Return the number of distinct values currently in ``y_data``."""
        return len(np.unique(self.y_data))

    # get number of features
    def get_num_features(self):
        """Return the number of columns currently in ``X_data``."""
        return self.X_data.shape[1]

In [None]:
def svm_predict(row, model):
    """Return the top class probability for one sample and its position.

    Args:
        row: a single feature vector.
        model: object exposing ``predict_proba``; it is called with ``[row]``.

    Returns:
        ``(probability, index)`` where ``probability`` is the largest value in
        the first row returned by ``predict_proba`` and ``index`` is the
        position of its first occurrence.
    """
    probabilities = model.predict_proba([row]).tolist()[0]
    # max() keeps the first of equal candidates, matching list.index(max(...))
    best_index, best_probability = max(enumerate(probabilities), key=lambda pair: pair[1])
    return best_probability, best_index

In [None]:
# Calculate the best threshold for the unknown classes
# Error rate is True Unknown predictions + False Known predictions
# Modify true_negative and false_negative for bigger emphasis on error rate
# e.g. for bigger emphasis on true_negative accuracy (True unknown predictions) use: true_negative * 2

def svm_get_threshold(X_data, x_deleted, range_thresh, model, max_samples=500):
    """Scan confidence thresholds and pick the one with the lowest error.

    A sample is treated as "unknown" when its highest class probability is
    below the threshold. The same number of samples is taken from the start
    of ``x_deleted`` (truly unknown) and of ``X_data`` (truly known).

    Args:
        X_data: samples of known classes.
        x_deleted: samples of the held-out (unknown) class.
        range_thresh: ``[start, stop]``; thresholds are ``th / 1000`` for
            ``th`` in ``range(start, stop)``.
        model: fitted classifier exposing ``predict_proba``.
        max_samples: upper bound on the samples taken from each set.

    Returns:
        Tuple ``(lowest_error, best_thresh, acc_list, thresh_li,
        true_negative_li, false_negative_li, error_list)`` where
        ``lowest_error`` is a fraction in [0, 1]; ``acc_list`` is
        ``[% unknown samples rejected, % known samples rejected]`` at the
        best threshold; ``true_negative_li`` holds ``100 - % unknown samples
        rejected`` per threshold (i.e. the share of unknowns that were
        missed); ``false_negative_li`` holds the % of known samples rejected;
        ``error_list`` is the mean of those two per threshold.

    Raises:
        ValueError: if either sample set is empty.
    """
    # Use the same count for both sets so X_data is never indexed past its
    # end and both percentages share one denominator.
    sample_len = min(max_samples, len(x_deleted), len(X_data))
    if sample_len == 0:
        raise ValueError("X_data and x_deleted must each contain at least one sample")

    # One batched call per set instead of one predict_proba call per row
    pred_deleted = np.asarray(model.predict_proba(x_deleted[:sample_len])).max(axis=1)
    pred_tested = np.asarray(model.predict_proba(X_data[:sample_len])).max(axis=1)

    lowest_error = 0
    best_thresh = 0
    # inf (not 100) so a scan whose errors are all exactly 100 still reports
    # a real threshold instead of a misleading error of 0
    min_error = float('inf')
    acc_list = []

    # Per-threshold series returned for plotting
    thresh_li = []
    true_negative_li = []
    false_negative_li = []
    error_list = []

    for th in range(range_thresh[0], range_thresh[1]):
        thresh = th / 1000
        true_negative = np.count_nonzero(pred_deleted < thresh)
        false_negative = np.count_nonzero(pred_tested < thresh)

        thresh_li.append(thresh)
        true_negative_li.append(100 - true_negative / sample_len * 100)
        false_negative_li.append(false_negative / sample_len * 100)

        error_combined = (100 - true_negative / sample_len * 100 + false_negative / sample_len * 100) / 2
        error_list.append(error_combined)

        # Strict comparison keeps the first (lowest) threshold on ties
        if error_combined < min_error:
            min_error = error_combined
            best_thresh = thresh
            lowest_error = min_error
            acc_list = [true_negative / sample_len * 100, false_negative / sample_len * 100]

    return lowest_error / 100, best_thresh, acc_list, thresh_li, true_negative_li, false_negative_li, error_list

In [None]:
def svm_draw_thresh_plots(thresh_li , true_negative_li, false_negative_li , error_list):
    """Plot the three per-threshold series against the thresholds and show it.

    Args:
        thresh_li: x-axis values (thresholds).
        true_negative_li: series drawn with the 'True Negative' label.
        false_negative_li: series drawn with the 'False Negative' label.
        error_list: series drawn with the 'Error rate' label.
    """
    plt.plot(thresh_li,false_negative_li, label = 'False Negative (False Known Predictions)')
    plt.plot(thresh_li,true_negative_li ,label = 'True Negative (True Unknown Predictions)')
    plt.plot(thresh_li, error_list , label = 'Error rate (For Both Knowns and Unknowns)' )
    plt.xlabel('Threshold')
    plt.ylabel('Percentage')
    plt.title('Thresh/Error rate')
    # Without a legend the labels given to plot() are never displayed
    plt.legend()
    plt.show()

In [None]:
# Balanced Testing: train/evaluate on splits where each kept class is capped

df = DatasetHandler()

modell = SVC(probability = True)

# Wrap the SVC in a one-vs-rest strategy (the variable is named `ovo`, but
# OneVsRestClassifier is what is instantiated)
ovo = OneVsRestClassifier(modell)

# Keep classes with more than 300 records and take at most 500 from each;
# xdel holds the records of the held-out class
X_train , X_test , y_train , y_test, xdel = df.get_balanced_splits(300, 500)

# fit model
ovo.fit(X_train, y_train)

# make predictions
y_predd = ovo.predict(X_test)

# Scan thresholds 0.100 .. 0.998 for the best known/unknown separation
lowest_error, best_thresh, acc_list, thresh_li, true_negative_li, false_negative_li, error_list = svm_get_threshold(X_test, xdel, [100,999], ovo)
svm_draw_thresh_plots(thresh_li , true_negative_li, false_negative_li, error_list)

classification = metrics.classification_report(y_test, y_predd)

# Define K-Fold 10
cv = KFold(n_splits=10, random_state=1, shuffle=True)

# Evaluate model
# NOTE(review): this cross-validates the bare SVC (`modell`) on the test split
# only, not the one-vs-rest wrapper fitted above - confirm this is intended
scores = cross_val_score(modell, X_test, y_test, scoring='accuracy', cv=cv, n_jobs=-1)

# Report performance
res =  classification + '\n' + 'With k-fold=10 Accuracy: %.3f (%.3f)' % (mean(scores), std(scores))
res = res + '\n \n' + 'Lowest Error ' + str(lowest_error) 
res = res + '\n \n' + 'Best Thresh ' + str(best_thresh)

print(res)

In [None]:
# Imbalanced Testing: train/evaluate on splits that keep every record of each kept class

df = DatasetHandler()

modell = SVC(probability = True)

# Wrap the SVC in a one-vs-rest strategy (the variable is named `ovo`, but
# OneVsRestClassifier is what is instantiated)
ovo = OneVsRestClassifier(modell)

# Keep classes with more than 3000 records; xdel holds the records of the
# held-out class
X_train , X_test , y_train , y_test, xdel = df.get_imbalanced_splits(3000)

# fit model
ovo.fit(X_train, y_train)

# make predictions
y_predd = ovo.predict(X_test)

# Scan thresholds 0.100 .. 0.998 for the best known/unknown separation
lowest_error, best_thresh, acc_list, thresh_li, true_negative_li, false_negative_li, error_list = svm_get_threshold(X_test, xdel, [100,999], ovo)
svm_draw_thresh_plots(thresh_li , true_negative_li, false_negative_li, error_list)

classification = metrics.classification_report(y_test, y_predd)

# Define K-Fold 10
cv = KFold(n_splits=10, random_state=1, shuffle=True)

# Evaluate model
# NOTE(review): this cross-validates the bare SVC (`modell`) on the test split
# only, not the one-vs-rest wrapper fitted above - confirm this is intended
scores = cross_val_score(modell, X_test, y_test, scoring='accuracy', cv=cv, n_jobs=-1)

# Report performance
res =  classification + '\n' + 'With k-fold=10 Accuracy: %.3f (%.3f)' % (mean(scores), std(scores))
res = res + '\n \n' + 'Lowest Error ' + str(lowest_error) 
res = res + '\n \n' + 'Best Thresh ' + str(best_thresh)

print(res)

In [None]:
# Build a shuffled test set that mixes known-class and unknown-class records
# so their classes can be predicted.

# First 1000 rows of the handler's working features and of the held-out class
test_1, test_2 = (pd.DataFrame(part[:1000]) for part in (df.X_data, df.x_deleted))

# Stack both parts, shuffle all rows, renumber them and convert to an ndarray
x_test_known_unknows = np.array(
    pd.concat([test_1, test_2], axis=0).sample(frac=1).reset_index(drop=True)
)

In [None]:
def predict_all(i, row, model, thresh):
    """Print the predicted class name of one sample, or flag it as unknown.

    The sample is reported as unknown when its highest class probability is
    below ``thresh``; otherwise the name of the most probable class is
    printed. The name is looked up by the predicted class position, not by a
    ground-truth label: the test rows are shuffled, so ``df.y_data[i]`` does
    not describe the row being classified.

    Args:
        i: position of the sample in the caller's loop; kept for interface
            compatibility and not used for the lookup.
        row: a 2-D batch holding the single sample, e.g. ``[features]``.
        model: fitted classifier exposing ``predict_proba``.
        thresh: minimum top probability for a sample to count as known.
    """
    probabilities = model.predict_proba(row).tolist()[0]
    confidence = max(probabilities)

    if confidence < thresh:
        print("Unkown Class")
    else:
        predicted = probabilities.index(confidence)
        # assumes position k of df.encoded_labels names the model's class k
        # - TODO confirm against how the handler builds encoded_labels.
        # ravel() tolerates the labels being stored as a single-row 2-D array.
        print(np.ravel(df.encoded_labels)[predicted])

In [None]:
# Classify every row of the mixed test set. Iterating the array itself avoids
# an IndexError when it holds fewer than 2000 rows.
for i, sample in enumerate(x_test_known_unknows):
    predict_all(i, [sample], ovo, best_thresh)