Naive Bayes classification, with some code based on the tutorial from: 
https://machinelearningmastery.com/naive-bayes-classifier-scratch-python/

In [1]:
# Imports
from math import sqrt, exp, pi
import numpy as np
from ucimlrepo import fetch_ucirepo

In [2]:
# Fetch dataset id 942 through ucimlrepo's fetch_ucirepo.
# NOTE(review): nothing in this cell checks for a local copy; whether
# fetch_ucirepo caches or always downloads should be confirmed in its docs.
rt_iot2022 = fetch_ucirepo(id=942) 

KeyboardInterrupt: 

In [None]:
# Remove axes that don't contribute to data, and get labels
features = rt_iot2022.data.features
targets = rt_iot2022.data.targets

# Work out which columns to drop *before* deleting anything. Deleting while
# walking positional indices shifts the remaining columns, so with more than
# one constant column the wrong column could be removed (or an IndexError
# raised). Selecting by column label avoids positional lookups entirely.
unique = features.nunique(axis=0)
constant_columns = unique[unique <= 1].index.tolist()
for column in constant_columns:
    print("Removing", column)
    del features[column]
# bwd_URG_flag count is always the same, usually removed

# Get labels for data: 1 = normal behavior, 0 = attack
normal_patterns = ["MQTT_Publish", "Thing_Speak", "Wipro_bulb", "Amazon-Alexa"]
# Iterating targets.values yields one entry per sample
# (presumably a one-element row holding the attack type -- TODO confirm).
y_bool = [int(label in normal_patterns) for label in targets.values]

In [None]:
# convert string to numerical
# Each distinct value gets an integer code in order of first appearance
# (the same code list.index() on the unique() list would give).
# A single vectorised map per column replaces the per-row .loc loop, which
# re-scanned the unique list for every row and relied on the row index
# being exactly 0..n-1.
unique_service = list(features["service"].unique())
unique_proto = list(features["proto"].unique())
features["service"] = features["service"].map(
    {value: code for code, value in enumerate(unique_service)}
)
features["proto"] = features["proto"].map(
    {value: code for code, value in enumerate(unique_proto)}
)

In [None]:
# Names of the variables the dataset metadata marks as "Continuous".
continuous_vars = rt_iot2022.variables[rt_iot2022.variables['type'] == "Continuous"]['name'].tolist()
# Only select names that are still columns of `features`: selecting a list
# containing a missing label (e.g. a constant column deleted earlier) would
# raise KeyError.
present_continuous = [name for name in continuous_vars if name in features.columns]
features = features[present_continuous]

In [None]:
# Split data into train/test sets
X = features.values
y = np.array(y_bool)
np.random.seed(2)  # fixed seed so the shuffle is reproducible

test_split = .2
test_samples = int(len(X) * test_split)
# Shuffle rows and labels with the same permutation so they stay aligned.
perm  = np.random.permutation(len(X))
X = X[perm]
y = y[perm]

# The first `test_samples` shuffled rows (the `test_split` fraction) are held
# out for testing and the remainder is used for training. Previously the two
# slices were swapped, so the model trained on the small fraction.
x_test  = X[:test_samples]
y_test  = y[:test_samples]
x_train = X[test_samples:]
y_train = y[test_samples:]

In [None]:
# Separate data by class
def separate_by_class(data, labels):
    """Group the rows of ``data`` by their label.

    Args:
        data: Indexable sequence of rows.
        labels: Indexable sequence of hashable labels; ``labels[i]`` is the
            label of ``data[i]``.

    Returns:
        dict mapping each label to the list of its rows, in input order.
    """
    grouped = {}
    for idx in range(len(data)):
        row = data[idx]
        # setdefault creates the bucket the first time a label is seen.
        grouped.setdefault(labels[idx], []).append(row)
    return grouped

# Group the rows of each split by their class label.
separated_train = separate_by_class(x_train, y_train)
separated_test = separate_by_class(x_test, y_test)

In [None]:
# Statistical measures
def mean(numbers):
    """Return the arithmetic mean of ``numbers``.

    Raises:
        ZeroDivisionError: if ``numbers`` is an empty Python sequence.
    """
    return sum(numbers)/float(len(numbers))

def stddev(numbers):
    """Return the sample standard deviation (n - 1 denominator) of ``numbers``.

    Raises:
        ZeroDivisionError: if ``numbers`` is a Python sequence with fewer
            than two elements.
    """
    # Compute the mean once; re-evaluating it for every element made this
    # function quadratic in the length of the input.
    avg = mean(numbers)
    variance = sum([(x - avg)**2 for x in numbers]) / float(len(numbers)-1)
    return sqrt(variance)

def summarize(dataset):
    """Compute per-column statistics for a collection of rows.

    Args:
        dataset: Iterable of rows; columns are formed with ``zip(*dataset)``.

    Returns:
        One ``[mean, std, count]`` list per column, where ``std`` is
        ``np.std`` with its default settings.
    """
    # numpy does the arithmetic here for efficiency reasons
    # (the pure-Python version took ~50 mins versus a few seconds).
    columns = (np.array(values) for values in zip(*dataset))
    return [[np.mean(col), np.std(col), len(col)] for col in columns]

# pass in the separated dataset
def summarize_classwise(dataset):
    """Summarize every class of a label -> rows mapping.

    Args:
        dataset: dict mapping a class label to the rows of that class.

    Returns:
        dict mapping each class label to ``summarize(rows)`` for its rows.
    """
    return {label: summarize(class_rows) for label, class_rows in dataset.items()}

In [None]:
# Build the per-class summaries (via summarize_classwise) for each split.
summarized_test = summarize_classwise(separated_test)
summarized_train = summarize_classwise(separated_train)

In [None]:
# gaussian prob dist calculation
def calc_prob(x, mean, std):
    """Evaluate the Gaussian probability density at ``x``.

    Args:
        x: Value at which to evaluate the density.
        mean: Mean of the distribution.
        std: Standard deviation of the distribution; a value of 0 raises
            ZeroDivisionError for plain Python numbers.

    Returns:
        The density of N(mean, std**2) at ``x``.
    """
    deviation = x - mean
    exponent = -(deviation**2 / (2 * std**2))
    normaliser = sqrt(2 * pi) * std
    return (1 / normaliser) * exp(exponent)


def calc_class_prob(summaries, row):
    """Score ``row`` against every class using the per-class summaries.

    Args:
        summaries: dict mapping class label -> list of (mean, std, count)
            entries, one per feature.
        row: Feature values, indexable in the same order as the entries.

    Returns:
        dict mapping class label -> class prior multiplied by the product of
        the per-feature Gaussian densities (not normalised across classes).
    """
    # The count stored with the first feature is used as the class size.
    total_rows = sum([stats[0][2] for stats in summaries.values()])
    probs = {}
    for label, stats in summaries.items():
        score = stats[0][2] / float(total_rows)  # class prior
        for feature_idx, (mean, std, _) in enumerate(stats):
            score *= calc_prob(row[feature_idx], mean, std)
        probs[label] = score
    return probs


In [None]:
# Remove all features whose standard deviation is zero, for either class,
# in the training summaries.
std_zero = [
    idx
    for idx, (class0_stats, class1_stats)
    in enumerate(zip(summarized_train[0], summarized_train[1]))
    if class0_stats[1] == 0 or class1_stats[1] == 0
]

# Drop the matching summary entries, highest index first so the positions
# still to be removed stay valid while popping.
for summary in (summarized_test, summarized_train):
    for label in (0, 1):
        for idx in reversed(std_zero):
            summary[label].pop(idx)

# Drop the same feature columns from both data matrices.
x_test = np.delete(x_test, std_zero, axis=1)
x_train = np.delete(x_train, std_zero, axis=1)

In [None]:
# NOTE(review): this result is overwritten by the loop below as soon as
# x_test has at least one row; it looks like a leftover spot check.
probs = calc_class_prob(summarized_train, x_train[15])

# Predict class 0 only when its score is strictly larger; otherwise class 1.
predicted = []
for sample in x_test:
    probs = calc_class_prob(summarized_train, sample)
    predicted.append(0 if probs[0] > probs[1] else 1)


In [None]:
# Tally the confusion matrix (class 1 is "positive", class 0 is "negative").
true_positive = 0
true_negative = 0
false_positive = 0
false_negative = 0

for guess, actual in zip(predicted, y_test):
    if guess == 1:
        if guess == actual:
            true_positive += 1
        else:
            false_positive += 1
    elif guess == 0:
        if guess == actual:
            true_negative += 1
        else:
            false_negative += 1

# Balanced accuracy is the mean of the per-class recalls, so each rate is
# divided by the number of samples that truly belong to the class
# (TP + FN and TN + FP), not by the number predicted as that class.
actual_positive = true_positive + false_negative
actual_negative = true_negative + false_positive
# A class with no samples in y_test contributes 0.0 instead of dividing by zero.
pos_acc = true_positive / actual_positive if actual_positive else 0.0
neg_acc = true_negative / actual_negative if actual_negative else 0.0

print("Naive bayes accuracy is:", (true_positive + true_negative)/len(predicted))
print("Balanced Acc is: ", (pos_acc+neg_acc)/2)

In [None]:
# Labels: positive (1) is normal behavior, negative (0) is an attack packet,
# so a "negative" prediction means the sample was flagged as an attack.
total_predictions = len(predicted)

print("Detected ", true_negative, " attacks from ", total_predictions)
print("Incorrectly detected ", false_negative, " attacks from ", total_predictions)

print("Missed ", false_positive, " attacks from ", total_predictions)
print("Correctly asserted ", true_positive, " packets as non harmful")