In [None]:
from sklearn import tree
from xgboost import XGBClassifier
import pandas as pd
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from sklearn import preprocessing
from sklearn.ensemble import RandomForestClassifier
import numpy as np

In [None]:
# Paths (relative to the notebook) of the training and testing CSV files.
train_file_path = "../Data/cleaned_dataset_train.csv"
test_file_path = "../Data/cleaned_dataset_test.csv"

# Both files are parsed with the same options.
csv_read_options = {"skipinitialspace": True}
train_df = pd.read_csv(train_file_path, **csv_read_options)
test_df = pd.read_csv(test_file_path, **csv_read_options)

print("Shape of training dataset:", train_df.shape)
print("Shape of testing dataset:", test_df.shape)

# The last column of each frame is treated as the label;
# every other column is kept as a feature.
train_label_column = train_df.columns[-1]
test_label_column = test_df.columns[-1]

X_train = train_df.drop(train_label_column, axis=1)
y_train = train_df[train_label_column]
X_test = test_df.drop(test_label_column, axis=1)
y_test = test_df[test_label_column]

Calculate feature importance

In [None]:
# Rank the features by their mean random-forest importance over a
# stratified k-fold split of the training data.
features = X_train.columns
feature_importance_scores = np.zeros(X_train.shape[1])

# Note: the scaler is fitted on all training rows before the folds are split.
scaler = preprocessing.StandardScaler()
scaler.fit(X_train)
X_train_scaled = scaler.transform(X_train)

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
# Read the fold count back from the splitter so the averaging below can never
# drift out of sync with n_splits.
n_folds = skf.get_n_splits()

for i, (train_index, val_index) in enumerate(skf.split(X_train, y_train)):
    print(f"Starting fold: {i+1} ...")
    # train a random forest classifier
    rf_clf = RandomForestClassifier(
        n_estimators=20,
        max_depth=None,
        bootstrap=False,
        n_jobs=-1,
        random_state=0
        )

    # skf.split yields positional indices, so select the labels with .iloc
    # (plain y_train[...] is a label-based lookup and only works with a
    # default RangeIndex).
    rf_clf.fit(X_train_scaled[train_index], y_train.iloc[train_index])
    y_val_pred = rf_clf.predict(X_train_scaled[val_index])
    accuracy = accuracy_score(y_train.iloc[val_index], y_val_pred)
    # Report the fold accuracy instead of silently discarding it.
    print(f"Fold {i+1} validation accuracy: {accuracy:.4f}")

    # accumulate feature importance scores
    feature_importance_scores += rf_clf.feature_importances_

# average over the folds
feature_importance_scores /= n_folds

# create a DataFrame to store feature importance scores
feature_importance_df = pd.DataFrame(
    {
        "feature": features,
        "importance": feature_importance_scores
    })

# sort features by importance scores in descending order
feature_importance_df = feature_importance_df.sort_values(
    by="importance",
    ascending=False
    )

Train and test the classifiers with the selected features

In [None]:
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from imbens.metrics import classification_report_imbalanced
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate
import time

def calculate_macro_avg(report_imbens):
    """Build a "macro avg" text line from a per-class metrics report.

    The report is expected to be a text table laid out as: a header line, a
    blank line, one row per class, a blank line and a final summary line
    (presumably the layout produced by ``classification_report_imbalanced``).
    Each class row is ``<label> <metric> ... <metric> <support>``.

    The metric columns are located by counting from the right-hand side of
    each row, using the number of column names in the header, so class labels
    that contain whitespace are parsed correctly.

    Parameters
    ----------
    report_imbens : str
        The text report.

    Returns
    -------
    str
        ``"macro avg       "`` followed by the unweighted per-class mean of
        every metric column (support excluded), each formatted with four
        decimals and separated by four spaces.
    """
    report_lines = report_imbens.strip().split("\n")

    # The header names every numeric column; the last one (support) is not averaged.
    n_metrics = len(report_lines[0].split()) - 1
    metric_columns = [[] for _ in range(n_metrics)]

    # Skip header + blank line at the top and blank line + summary at the bottom.
    for line in report_lines[2:-2]:
        tokens = line.split()
        if not tokens:
            continue
        # Slice from the right: [..label tokens.., m1 .. mk, support].
        metric_tokens = tokens[-(n_metrics + 1):-1]
        for column, token in zip(metric_columns, metric_tokens):
            column.append(float(token))

    column_means = [np.array(column).mean() for column in metric_columns]

    formatted_means = ["{:.4f}".format(round(value, 4)) for value in column_means]
    return "macro avg       " + "    ".join(formatted_means)

def perf_evaluation(y_true, y_pred, class_labels, name, save, training_time=None, testing_time=None):
    """Score one set of predictions, print a summary and optionally save it.

    Parameters
    ----------
    y_true, y_pred : array-like
        Ground-truth and predicted labels.
    class_labels : sequence
        Class names. Entry ``i`` is matched to row/column ``i`` of the
        confusion matrix and is also used as heatmap tick label.
    name : object
        Prefix (converted with ``str``) of the files written when ``save``
        is true: ``<name>_results.txt`` and ``<name>_cm.png``.
    save : bool
        When true, write the text summary (including per-class TP/TN/FP/FN)
        and the normalised confusion-matrix heatmap to disk.
    training_time, testing_time : optional
        Only echoed in the printed / saved summary.

    Returns
    -------
    tuple
        ``(cm_normalized, report_imbens, accuracy)`` where ``accuracy`` is a
        percentage (accuracy score multiplied by 100).
    """
    cm_normalized = confusion_matrix(y_true, y_pred, normalize="true")
    cm = confusion_matrix(y_true, y_pred)
    accuracy = accuracy_score(y_true, y_pred) * 100
    report_imbens = classification_report_imbalanced(y_true, y_pred, digits=4)

    # Extra "macro avg" line derived from the text report.
    macro_avg = calculate_macro_avg(report_imbens=report_imbens)

    # One-vs-rest counts per class, taken from the raw confusion matrix.
    grand_total = cm.sum()
    per_class_counts = {}
    for position, label in enumerate(class_labels):
        hits = cm[position, position]
        row_total = cm[position, :].sum()
        column_total = cm[:, position].sum()
        per_class_counts[label] = {
            "tp": hits,
            "tn": grand_total - row_total - column_total + hits,
            "fp": column_total - hits,
            "fn": row_total - hits,
        }

    # Lines shared by the console output and the saved text file.
    summary_lines = [
        f"Training Time: {training_time} seconds",
        f"Testing Time: {testing_time} seconds",
        f"Total Accuracy: {accuracy}",
        "Classification Report based on IMBENS",
    ]

    print()
    for text in summary_lines:
        print(text)
    print(report_imbens)
    print(macro_avg)
    print()

    if save:
        # Text summary followed by the per-class counts.
        with open(str(name) + "_results.txt", "w") as file:
            file.write("".join(text + "\n" for text in summary_lines))
            file.write(report_imbens)
            file.write(macro_avg)
            file.write("\n\n")
            for label in class_labels:
                counts = per_class_counts[label]
                file.write(
                    f"Class: {label}\n"
                    f"True Positives (TP): {counts['tp']}\n"
                    f"True Negatives (TN): {counts['tn']}\n"
                    f"False Positives (FP): {counts['fp']}\n"
                    f"False Negatives (FN): {counts['fn']}\n\n"
                )

        # Heatmap of the row-normalised confusion matrix.
        plt.figure(figsize=(15, 15))
        sns.heatmap(
            cm_normalized,
            annot=True,
            fmt=".4f",
            cmap="viridis",
            square=True,
            xticklabels=class_labels,
            yticklabels=class_labels,
        )
        plt.xlabel("Predicted Labels")
        plt.ylabel("True Labels")
        plt.title("Confusion Matrix")
        plt.savefig(str(name) + "_cm.png")
        plt.close()

    return cm_normalized, report_imbens, accuracy

# Performance evaluation auxiliary method to combine the folds
def perf_evaluation_auxiliary(cms_normalized, reports_imbens, avg_accuracy, class_labels, name):
    """Combine per-fold results into one averaged report and confusion matrix.

    Parameters
    ----------
    cms_normalized : sequence of arrays
        One normalised confusion matrix per fold; averaged element-wise.
    reports_imbens : sequence of str
        One text report per fold. Each is parsed as: header line, blank line,
        one row per class, blank line, summary line.
    avg_accuracy : number
        Already-averaged accuracy, written to the output file as is.
    class_labels : sequence
        Tick labels for the heatmap.
    name : object
        Prefix (converted with ``str``) of the output files
        ``<name>_avg-results.txt`` and ``<name>_cm.png``.
    """
    def four_decimals(value):
        # Same formatting as everywhere else in the report: round, then 4 decimals.
        return "{:.4f}".format(round(value, 4))

    # --- per-class rows, averaged over the folds ---------------------------
    column_names = []
    fold_tables = []
    for fold_report in reports_imbens:
        report_lines = fold_report.strip().split("\n")
        column_names = report_lines[0].split()
        class_rows = [
            [float(token) for token in row.split()[1:]]  # drop the class label
            for row in report_lines[2:-2]
        ]
        fold_tables.append(np.array(class_rows))
    avg_report_imbens = tabulate(np.mean(fold_tables, axis=0), headers=column_names, tablefmt="plain", floatfmt=".4f")

    # --- summary ("avg / total") line, averaged over the folds -------------
    fold_summaries = np.array([
        # skip the three leading tokens of the summary line and the trailing support
        [float(token) for token in fold_report.strip().split("\n")[-1].split()[3:-1]]
        for fold_report in reports_imbens
    ])
    weighted_avg = "  ".join([four_decimals(value) for value in np.mean(fold_summaries, axis=0)])

    # --- macro average, re-parsed from the rendered average table ----------
    metric_columns = {0: [], 1: [], 2: [], 3: [], 4: [], 5: []}
    for table_row in avg_report_imbens.strip().split("\n")[1:]:
        row_metrics = [float(token) for token in table_row.split()[:-1]]  # drop support
        for position, metric in enumerate(row_metrics):
            metric_columns[position].append(metric)
    macro_means = [np.array(column).mean() for column in metric_columns.values()]
    macro_avg = "  ".join([four_decimals(value) for value in macro_means])

    # --- text output --------------------------------------------------------
    with open(str(name) + "_avg-results.txt", "w") as file:
        file.write(
            f"Average Stratified 5-Fold Cross Validation - Accuracy: {avg_accuracy}\n"
            "Average Stratified 5-Fold Cross Validation - Classification Report based on IMBENS\n"
        )
        file.write(avg_report_imbens)
        file.write("\n\n")
        file.write(weighted_avg + "  (weighted avg)\n")
        file.write(macro_avg + "  (macro avg)\n")

    # --- averaged confusion matrix heatmap ---------------------------------
    average_cm = np.mean(np.stack(cms_normalized), axis=0)
    plt.figure(figsize=(15, 15))
    sns.heatmap(
        average_cm,
        annot=True,
        fmt=".4f",
        cmap="viridis",
        square=True,
        xticklabels=class_labels,
        yticklabels=class_labels,
    )
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.title("Confusion Matrix")
    plt.savefig(str(name) + "_cm.png")
    plt.close()

# Cross Validation Method
def _take_rows(data, index):
    """Select rows of ``data`` by position (``.iloc`` for pandas objects)."""
    return data.iloc[index] if hasattr(data, "iloc") else data[index]


def cross_validation(clf, X_train, y_train, skf, class_labels, name):
    """Run cross validation with ``skf`` and save the fold-averaged results.

    For every fold the classifier is re-fitted on the training part and
    evaluated on the validation part with ``perf_evaluation`` (nothing is
    saved per fold). The per-fold confusion matrices, reports and the mean
    accuracy are then handed to ``perf_evaluation_auxiliary``.

    Parameters
    ----------
    clf : estimator
        Object exposing ``fit`` and ``predict``; it is re-fitted in place.
    X_train, y_train : array-like or pandas object
        Features and labels. Rows are selected by position, so a pandas
        object with a non-default index is handled correctly.
    skf : splitter
        Object exposing ``split(X, y)`` that yields (train, validation)
        index pairs.
    class_labels : sequence
        Forwarded to the evaluation helpers.
    name : object
        Output-file prefix forwarded to the evaluation helpers.
    """
    cms_normalized = []
    reports_imbens = []
    avg_accuracy = 0

    for train_index, val_index in skf.split(X_train, y_train):
        # The splitter yields positional indices; never use them as labels.
        clf.fit(_take_rows(X_train, train_index), _take_rows(y_train, train_index))  # train
        y_val_pred = clf.predict(_take_rows(X_train, val_index))  # predict the validation part

        cm_normalized, report_imbens, accuracy = perf_evaluation(
            y_true=_take_rows(y_train, val_index),
            y_pred=y_val_pred,
            class_labels=class_labels,
            name=name,
            save=False,
        )

        cms_normalized.append(cm_normalized)
        reports_imbens.append(report_imbens)
        avg_accuracy += accuracy

    # Average over the folds actually evaluated rather than a hard-coded 5.
    n_folds = len(cms_normalized)
    if n_folds:
        avg_accuracy /= n_folds

    perf_evaluation_auxiliary(cms_normalized, reports_imbens, avg_accuracy, class_labels, name)

# Test Evaluation Method
def test_evaluation(clf, X_train, y_train, X_test, y_test, class_labels, name):
    """Fit ``clf`` on the training data, predict the test data and save the results.

    Fitting and prediction are timed separately with ``time.perf_counter``
    (a monotonic clock intended for measuring intervals), and both durations
    are passed, in seconds, to ``perf_evaluation`` together with the
    predictions. ``perf_evaluation`` is called with ``save=True``.

    Parameters
    ----------
    clf : estimator
        Object exposing ``fit`` and ``predict``; it is fitted in place.
    X_train, y_train : array-like
        Training features and labels.
    X_test, y_test : array-like
        Test features and labels.
    class_labels : sequence
        Forwarded to ``perf_evaluation``.
    name : object
        Output-file prefix forwarded to ``perf_evaluation``.
    """
    fit_started = time.perf_counter()
    clf.fit(X_train, y_train)
    training_time = time.perf_counter() - fit_started

    predict_started = time.perf_counter()
    y_pred = clf.predict(X_test)
    testing_time = time.perf_counter() - predict_started

    perf_evaluation(y_test, y_pred, class_labels, name, True, training_time, testing_time)

# Display names of the 15 classes. perf_evaluation matches entry i of this list
# to row/column i of the confusion matrix, so the order matters
# (presumably it has to follow the sorted label values of the dataset — TODO confirm).
class_labels = [
    "Benign",
    "Bot",
    "DDoS",
    "DoS_Golden_Eye",
    "DoS_Hulk",
    "DoS_Slowhttptest",
    "DoS_Slowloris",
    "FTP_Patator",
    "Heartbleed",
    "Infiltration",
    "PortScan",
    "SSH_Patator",
    "WebAttack_Brute_Force",
    "WebAttack_Sql_Injection",
    "WebAttack_XSS"
    ]

# classifiers
dt_clf = tree.DecisionTreeClassifier(criterion="gini", max_depth=None, min_samples_split=2, min_samples_leaf=1)
xgboost_clf = XGBClassifier()  # created here but not used by the loop below

# stratified k fold
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# select top n features
top_n_features = [15, 20, 25, 30, 35, 40]

# separate labels from training and testing dataset
# (done once: the split does not depend on n)
X_train_all = train_df.drop(train_df.columns[-1], axis=1)
y_train = train_df[train_df.columns[-1]]
X_test_all = test_df.drop(test_df.columns[-1], axis=1)
y_test = test_df[test_df.columns[-1]]

for n in top_n_features:
    # feature_importance_df is already sorted by descending importance,
    # so its first n rows are the top n features.
    top_rows = feature_importance_df.head(n)
    selected_top_features = top_rows["feature"].tolist()
    cumulative_importance_top_features = 0
    for importance in top_rows["importance"]:
        cumulative_importance_top_features += importance

    # maintain top n features
    X_train = X_train_all.loc[:, selected_top_features]
    X_test = X_test_all.loc[:, selected_top_features]

    print(f"Selected top {len(selected_top_features)} features with cumulative importance of {cumulative_importance_top_features}")

    # The scaler is fitted on the training features only and reused for the test set.
    scaler = preprocessing.StandardScaler()
    scaler.fit(X_train)
    X_train_scaled = scaler.transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Shared tail of both output-file prefixes.
    run_suffix = str(n) + "-features_" + "{:.4f}".format(round(cumulative_importance_top_features, 4)) + "-cum-importance"

    test_evaluation(
        clf=dt_clf,
        X_train=X_train_scaled,
        y_train=y_train,
        X_test=X_test_scaled,
        y_test=y_test,
        class_labels=class_labels,
        name="dt_test-eval_" + run_suffix,
    )
    cross_validation(
        clf=dt_clf,
        X_train=X_train_scaled,
        y_train=y_train,
        skf=skf,
        class_labels=class_labels,
        # fold count taken from the splitter so the file name cannot go stale
        name="dt_" + str(skf.get_n_splits()) + "-fold-cv_" + run_suffix,
    )