In [210]:
# ML related
from time import sleep, thread_time
import glob
import pandas as pd
import sklearn
import matplotlib.pyplot as plt
import numpy as np


# Model training
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler


# system requirements
import os, sys


# Load dataset: every "*_label.csv" file found in `path` is read and stacked
# into a single frame with a fresh 0..n-1 index.
path = "/home/abuyusif01/dev/dtss/dt/fp/src/clean_logs/"

# glob returns matches in arbitrary (filesystem-dependent) order. Sort them so
# the row order of `df`, and with it the seeded train/test split, is the same
# on every run and every machine.
all_files = sorted(glob.glob(os.path.join(path, "*_label.csv")))
if not all_files:
    # Without this check pd.concat fails with a message that never mentions the path.
    raise FileNotFoundError(f"No '*_label.csv' files found in {path!r}")

df = pd.concat((pd.read_csv(f) for f in all_files), ignore_index=True)


# Keep only the float64/int64 columns as model features; per the original note
# this is what drops the time column.
fit_cols = df.select_dtypes(include=["float64", "int64"])
train_y = df["class"]

# Train ML model

# Split BEFORE scaling: the scaler must only learn its mean/variance from the
# training rows, otherwise statistics of the test rows leak into training.
# NOTE(review): test_size=0.7 holds out 70% of the rows and trains on 30% --
# confirm this is intended and not a swapped 0.3.
raw_train, raw_test, y_train, y_test = train_test_split(
    fit_cols,
    train_y,
    test_size=0.7,
    random_state=1,
    stratify=train_y,
)

scaler = StandardScaler()
x_train = pd.DataFrame(
    scaler.fit_transform(raw_train),
    columns=fit_cols.columns,
    index=raw_train.index,  # keep row labels so features stay aligned with y_train
)
x_test = pd.DataFrame(
    scaler.transform(raw_test),  # transform only: reuse the training statistics
    columns=fit_cols.columns,
    index=raw_test.index,
)

# Whole feature matrix scaled with the training statistics, kept under the
# names this cell has always exposed.
scale_df = scaler.transform(fit_cols)
train_x = pd.DataFrame(scale_df, columns=fit_cols.columns)

# Scaled test features side by side with their true class, joined on the row index.
df_analysis = pd.concat([x_test, y_test], axis=1)
print(df["class"].value_counts())


Normal                  1824
Command Injection TL     253
Command Injection TH      62
Name: class, dtype: int64


In [None]:
# 1. RandomForest model: training (test results are produced by the evaluation cell below)
import pickle
from sklearn.multiclass import OneVsRestClassifier
from sklearn.ensemble import RandomForestClassifier

# One-vs-rest wrapper around a seeded random forest (100 trees, depth capped at 10).
random_forest = OneVsRestClassifier(
    RandomForestClassifier(n_estimators=100, max_depth=10, random_state=1)
)

# Fit on the training split.
random_forest.fit(x_train, y_train)




In [None]:
# 2. Gradient Boosting (GB) model: training (test results are produced by the evaluation cell below)

from sklearn.ensemble import GradientBoostingClassifier

# One-vs-rest wrapper around a seeded gradient-boosting classifier
# (n_estimators=100, max_depth=10).
gb = OneVsRestClassifier(
    GradientBoostingClassifier(n_estimators=100, max_depth=10, random_state=1)
)

# Fit on the training split.
gb.fit(x_train, y_train)


In [None]:
# Logistic Regression model: training (test results are produced by the evaluation cell below)
from sklearn.linear_model import LogisticRegression

# One-vs-rest wrapper around a seeded logistic regression using the lbfgs
# solver with an iteration cap of 10000.
logistic = OneVsRestClassifier(
    LogisticRegression(solver="lbfgs", max_iter=10000, random_state=1)
)

# Fit on the training split.
logistic.fit(x_train, y_train)


In [None]:
# Naive Bayes model: training (test results are produced by the evaluation cell below)
from sklearn.naive_bayes import GaussianNB

# One-vs-rest wrapper around Gaussian naive Bayes with default settings.
naive_bayes = OneVsRestClassifier(GaussianNB())

# Fit on the training split.
naive_bayes.fit(x_train, y_train)


In [None]:
# 3. Stacking model: training (test results are produced by the evaluation cell below)
# change final_estimator to minimum 2 models

from sklearn.ensemble import StackingClassifier
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.neural_network import MLPClassifier

# Level-0 learners: the four one-vs-rest models defined in the cells above.
base_learners = [
    ("rf", random_forest),
    ("gb", gb),
    ("logistic", logistic),
    ("naive_bayes", naive_bayes),
]

# Level-1 learner: a seeded MLP with two hidden layers of 100 units, trained
# with lbfgs and an iteration cap of 10000.
meta_learner = MLPClassifier(
    hidden_layer_sizes=(100, 100),
    solver="lbfgs",
    max_iter=10000,
    random_state=1,
)

# The stacked ensemble is itself wrapped one-vs-rest, like the other models.
stacking = OneVsRestClassifier(
    StackingClassifier(estimators=base_learners, final_estimator=meta_learner)
)

# Fit on the training split.
stacking.fit(x_train, y_train)


In [None]:
import time, warnings
from turtle import color, title

import pickle  # dump model
from sklearn.metrics import plot_confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay

# Side length used for the square summary figures: passed as
# figsize=(FIG_SIZE, FIG_SIZE) to plt.subplots further down in this cell.
FIG_SIZE = 7.5


def plot_confusion_matrix(y_test, y_pred, title, labels, color, normalize=False):
    """Draw a titled confusion matrix for one model's predictions.

    This name is also imported from ``sklearn.metrics`` earlier in this cell;
    the definition below replaces that import.
    NOTE(review): ``sklearn.metrics.plot_confusion_matrix`` was removed in
    scikit-learn 1.2, so that import line fails on current releases and can be
    dropped.

    Args:
        y_test: True class labels.
        y_pred: Predicted class labels, aligned with ``y_test``.
        title: Text shown above the matrix (previously accepted but never drawn).
        labels: Class labels to show, in axis order.
        color: Matplotlib colormap (or colormap name) used for the cells.
        normalize: When truthy, each row is normalised over the true class
            (``normalize="true"``); otherwise raw counts are shown.

    Returns:
        None. The figure is created as a side effect.
    """
    display = ConfusionMatrixDisplay.from_predictions(
        y_test,
        y_pred,
        labels=labels,
        cmap=color,
        # Single call for both modes: None means raw counts.
        normalize="true" if normalize else None,
        xticks_rotation="vertical",
    )
    # Without a title the per-model figures are indistinguishable from each other.
    display.ax_.set_title(title)


# (short name, estimator) pairs in evaluation order; the short name is also
# used for the pickle file name.
models = [
    ("rf", random_forest),
    ("naive_bayes", naive_bayes),
    ("stacking", stacking),
    ("gb", gb),
    ("logistic", logistic),
]


# Chart labels for the models; must stay index-aligned with `models` above.
models_name = ["randomForest", "naive_bayes", "stacking", "gradientBoost", "logistic"]

# Class labels passed to the confusion-matrix plots, in display order.
labels = ["Normal", "Command Injection TL", "Command Injection TH"]

# One entry per model is appended to each list by the evaluation loop.
model_test_accuracy, model_test_precision = [], []
model_test_recall, model_test_f1 = [], []


# Evaluate every registered model on the test split, save it to disk and plot
# its confusion matrix.

# open(..., "wb") does not create missing folders, so make sure the target exists.
os.makedirs("pkl_files", exist_ok=True)

for model_name, model in models:
    # Refit on the current training split before scoring.
    model.fit(x_train, y_train)
    y_pred = model.predict(x_test)

    accuracy = sklearn.metrics.accuracy_score(y_test, y_pred)
    # All macro averages are taken over the same set of classes.
    # zero_division=0 scores a class that is never predicted as 0 without a
    # warning, rather than leaving it out of the average.
    precision = sklearn.metrics.precision_score(
        y_test, y_pred, average="macro", zero_division=0
    )
    recall = sklearn.metrics.recall_score(
        y_test, y_pred, average="macro", zero_division=0
    )
    # Previously limited to labels=np.unique(y_pred), which dropped classes the
    # model never predicted and made F1 incomparable with precision/recall.
    f1 = sklearn.metrics.f1_score(
        y_test, y_pred, average="macro", zero_division=0
    )

    # append algorithm overall test results to list
    model_test_accuracy.append(accuracy)
    model_test_precision.append(precision)
    model_test_recall.append(recall)
    model_test_f1.append(f1)

    # dump model; the context manager closes the file even if pickling fails
    with open(os.path.join("pkl_files", model_name + ".pkl"), "wb") as model_file:
        pickle.dump(model, model_file)

    print(
        """
        Model: %s 
        Dataset size: %s
        Accuracy: %f
        Precision: %f
        Recall: %f
        F1: %f
        """
        % (
            model_name,
            len(df),
            accuracy,
            precision,
            recall,
            f1,
        )
    )

    # overall plot_confusion_matrix
    plot_confusion_matrix(
        y_test,
        y_pred,
        model_name,
        labels,
        plt.cm.Blues,
    )

# Chart 1: test accuracy per model as a dashed line with circle markers.
fig, axes = plt.subplots(figsize=(FIG_SIZE, FIG_SIZE))
axes.title.set_text("Accuracy of Models")
axes.plot(
    models_name,
    model_test_accuracy,
    linestyle="dashed",
    marker="o",
    color="blue",
    label="Accuracy",
)


# Chart 2: the same accuracies as a bar chart with labelled axes.
fig, axes = plt.subplots(figsize=(FIG_SIZE, FIG_SIZE))
axes.bar(models_name, model_test_accuracy)
axes.set(title="Accuracy", xlabel="Algorithms", ylabel="Accuracy")
axes.grid(True)
plt.show()


# Chart 3: pie chart where each slice is a model's share of the summed accuracies.
fig, axes = plt.subplots(figsize=(FIG_SIZE, FIG_SIZE))
axes.pie(
    model_test_accuracy,
    labels=models_name,
    startangle=90,
    autopct="%1.1f%%",
)
axes.set_title("Accuracy Comparison")
axes.grid(True)
plt.show()
