CS446 FINAL PROJECT - Machine Learning in Intrusion Detection Systems (IDS)

Group Members: Jophene Campbell and Sophia Sasko

Contributions:

Jophene - 

Sophia - 

In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.preprocessing import LabelEncoder, StandardScaler, KBinsDiscretizer
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.feature_selection import mutual_info_classif

from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.base import clone

# Apply seaborn's default theme to every figure drawn below.
sns.set()
# Seed handed to the estimators and the mutual-information scorer.
RANDOM_STATE = 42
# Number of top-ranked features kept from each ranking.
# NOTE(review): the "prevents class collapse" remark is the original author's;
# nothing in this file shows how TOP_K affects the class balance -- confirm.
TOP_K = 30     # prevents class collapse
# Default number of cross-validation folds used by evaluate().
CV_FOLDS = 5


In [None]:
def load_nsl_kdd(path="./Dataset", train="KDDTrain+.txt", test="KDDTest+.txt"):
    """Load the NSL-KDD train and test files and return them as one DataFrame.

    Both files are read as header-less CSV and stacked row-wise (train rows
    first). The returned frame has feature columns named ``f0 .. f{n-1}``
    and a final column named ``label`` holding the class of each record.

    The official NSL-KDD ``.txt`` files carry a numeric "difficulty" score
    *after* the textual class column. Naming that trailing column ``label``
    would make the difficulty score the target and push the real class into
    the feature matrix, so a trailing numeric column that directly follows a
    non-numeric column is treated as the difficulty score and dropped.
    Files without such a trailing column are loaded unchanged.

    Args:
        path: Directory containing the dataset files.
        train: File name of the training split.
        test: File name of the test split.

    Returns:
        pandas.DataFrame with the features followed by ``label``.
    """
    frames = [
        pd.read_csv(os.path.join(path, name), header=None)
        for name in (train, test)
    ]
    df = pd.concat(frames, ignore_index=True)

    # A numeric last column preceded by a text column cannot be the
    # "normal"/attack-name label; it is the per-record difficulty score.
    has_trailing_score = (
        df.shape[1] >= 2
        and pd.api.types.is_numeric_dtype(df.iloc[:, -1])
        and not pd.api.types.is_numeric_dtype(df.iloc[:, -2])
    )
    if has_trailing_score:
        df = df.iloc[:, :-1].copy()

    df.columns = [*(f"f{i}" for i in range(df.shape[1] - 1)), "label"]

    print("Loaded data:", df.shape)
    return df

# Load the combined train+test frame with the default paths and preview it.
df = load_nsl_kdd()
df.head()


In [None]:
def preprocess(df):
    """Turn the raw frame into a scaled feature matrix and a binary target.

    Steps:
      1. Collapse ``label`` to two classes: ``"normal"`` stays ``"normal"``
         (case- and whitespace-insensitive), everything else becomes
         ``"attack"``.
      2. Integer-encode every non-numeric feature column.
      3. Integer-encode the target. ``LabelEncoder`` orders classes
         alphabetically, so with both classes present ``attack`` -> 0 and
         ``normal`` -> 1.
      4. Standardise all features to zero mean / unit variance.

    NOTE: the encoders and the scaler are fitted on the full frame passed
    in, so any later train/test split shares those statistics.

    Args:
        df: DataFrame with a ``label`` column; the input is not modified.

    Returns:
        (X, y): ``X`` is a DataFrame with the same column names and row index
        as the input features; ``y`` is a 1-D integer NumPy array.
    """
    df = df.copy()

    # Vectorised binarisation; strip() tolerates padded labels such as " normal".
    cleaned = df["label"].astype(str).str.strip().str.lower()
    df["label"] = np.where(cleaned == "normal", "normal", "attack")

    X = df.drop(columns=["label"])
    y = LabelEncoder().fit_transform(df["label"])

    # Encode every non-numeric column, whatever its concrete dtype. Casting
    # to str keeps LabelEncoder from failing on mixed-type / missing values.
    for col in X.select_dtypes(exclude=["number", "bool"]).columns:
        X[col] = LabelEncoder().fit_transform(X[col].astype(str))

    # Keep the original row index so X stays aligned with the source frame.
    X = pd.DataFrame(
        StandardScaler().fit_transform(X),
        columns=X.columns,
        index=X.index,
    )

    print("Preprocessing complete.")
    print("X shape:", X.shape)
    print("y shape:", y.shape)

    return X, y

# Build the scaled feature matrix X and the encoded binary target y.
X, y = preprocess(df)


In [None]:
def igr_ranking(X, y):
    """Rank features by their share of the total mutual information with ``y``.

    Each feature's mutual-information estimate is divided by the sum over all
    features (plus a small epsilon to avoid division by zero).

    Args:
        X: DataFrame of features.
        y: Target labels, one per row of ``X``.

    Returns:
        pandas.Series indexed by feature name, sorted from highest to lowest
        score.
    """
    mutual_info = mutual_info_classif(X.values, y, random_state=RANDOM_STATE)
    total = mutual_info.sum() + 1e-9
    ranking = pd.Series(mutual_info / total, index=X.columns)
    return ranking.sort_values(ascending=False)

# Score every feature and show the ten highest-ranked ones.
igr_scores = igr_ranking(X, y)
igr_scores.head(10)


In [None]:
def cfs_ranking(X, y):
    """Rank features by relevance to ``y`` divided by redundancy with other features.

    Relevance is the mutual-information estimate between a feature and ``y``.
    Redundancy is the feature's mean absolute Pearson correlation with the
    features in ``X`` (its self-correlation counted as 0), plus a small
    epsilon to avoid division by zero.

    Args:
        X: DataFrame of numeric features.
        y: Target labels, one per row of ``X``.

    Returns:
        pandas.Series indexed by feature name, sorted from highest to lowest
        score.
    """
    # Seeded like igr_ranking so the ranking is reproducible between runs.
    mi = mutual_info_classif(X.values, y, random_state=RANDOM_STATE)

    # Work on an explicit writable copy: writing into DataFrame.values fails
    # when pandas hands out read-only arrays (Copy-on-Write).
    corr = X.corr().abs().to_numpy(copy=True)
    np.fill_diagonal(corr, 0)

    # nanmean mirrors pandas' skip-NaN mean (constant columns give NaN correlations).
    redundancy = np.nanmean(corr, axis=1) + 1e-9
    scores = pd.Series(mi / redundancy, index=X.columns)
    return scores.sort_values(ascending=False)

# Score every feature and show the ten highest-ranked ones.
cfs_scores = cfs_ranking(X, y)
cfs_scores.head(10)


In [None]:
# Names of the TOP_K best-scoring features under each ranking
# (both score Series are already sorted in descending order).
top_igr = igr_scores.index[:TOP_K].tolist()
top_cfs = cfs_scores.index[:TOP_K].tolist()

print("Top IGR features:", top_igr)
print("Top CFS features:", top_cfs)


In [None]:
def _fold_auc(model, X_test, y_test):
    """Return the ROC AUC of a fitted binary model on one fold, or NaN if unavailable."""
    if not hasattr(model, "predict_proba"):
        return np.nan
    try:
        probs = model.predict_proba(X_test)
        if probs.shape[1] != 2:
            return np.nan
        return roc_auc_score(y_test, probs[:, 1])
    except (AttributeError, ValueError):
        # No probability estimates, or AUC undefined for this fold.
        return np.nan


def _mean_or_nan(values):
    """Return the mean of the non-NaN entries, or NaN (without warnings) if there are none."""
    arr = np.asarray(values, dtype=float)
    arr = arr[~np.isnan(arr)]
    return float(arr.mean()) if arr.size else np.nan


def evaluate(X, y, feature_sets, folds=CV_FOLDS):
    """Cross-validate kNN, a decision tree and an RBF SVM on each feature subset.

    For every (feature set, classifier) pair a fresh clone of the classifier
    is trained and scored on each stratified fold; the per-fold accuracy,
    precision, recall, F1 and ROC AUC are averaged. Precision, recall and F1
    use scikit-learn's default positive class (label 1).

    Args:
        X: DataFrame of features.
        y: Integer class labels, one per row of ``X`` (array-like).
        feature_sets: Mapping of a display name to a list of column names
            of ``X`` to evaluate.
        folds: Number of stratified folds.

    Returns:
        pandas.DataFrame with one row per (feature set, classifier) and the
        columns ``feature_set``, ``classifier``, ``accuracy``, ``precision``,
        ``recall``, ``f1`` and ``auc``. A metric is NaN when no fold could be
        scored.
    """
    classifiers = {
        "kNN": KNeighborsClassifier(n_neighbors=5),
        "DecisionTree": DecisionTreeClassifier(random_state=RANDOM_STATE),
        "SVM": SVC(kernel="rbf", probability=True, random_state=RANDOM_STATE),
    }

    # Positional indexing below requires a plain array (a Series would index by label).
    y = np.asarray(y)

    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=RANDOM_STATE)
    metric_names = ("accuracy", "precision", "recall", "f1", "auc")
    rows = []

    for fs_name, features in feature_sets.items():
        print(f"\nEvaluating feature set: {fs_name}")
        X_sub = X[features].values

        for clf_name, clf in classifiers.items():
            print(f"  Running classifier: {clf_name}")

            scores = {name: [] for name in metric_names}

            for tr, te in skf.split(X_sub, y):
                y_tr, y_te = y[tr], y[te]

                # Skip folds that collapse into one class
                if len(np.unique(y_tr)) < 2 or len(np.unique(y_te)) < 2:
                    continue

                model = clone(clf)
                model.fit(X_sub[tr], y_tr)
                pred = model.predict(X_sub[te])

                scores["accuracy"].append(accuracy_score(y_te, pred))
                scores["precision"].append(precision_score(y_te, pred, zero_division=0))
                scores["recall"].append(recall_score(y_te, pred, zero_division=0))
                scores["f1"].append(f1_score(y_te, pred, zero_division=0))
                scores["auc"].append(_fold_auc(model, X_sub[te], y_te))

            if not scores["accuracy"]:
                # Make the "every fold skipped" case visible instead of
                # silently averaging empty lists.
                print(f"    No usable folds for {clf_name} on {fs_name}; metrics are NaN")

            row = {"feature_set": fs_name, "classifier": clf_name}
            row.update({name: _mean_or_nan(vals) for name, vals in scores.items()})
            rows.append(row)

    results = pd.DataFrame(rows)
    print("\n=== FINAL RESULTS ===")
    try:
        display(results)
    except NameError:
        # display() only exists inside IPython/Jupyter sessions.
        print(results)
    return results

# Cross-validate the classifiers on the two selected feature subsets.
results = evaluate(X, y, {"IGR": top_igr, "CFS": top_cfs})


In [None]:
# Heatmap of mean accuracy: one row per classifier, one column per feature set.
pivot = results.pivot(index="classifier", columns="feature_set", values="accuracy")
plt.figure(figsize=(6, 4))
heatmap_ax = sns.heatmap(pivot, annot=True, cmap="viridis", fmt=".3f")
heatmap_ax.set_title("Accuracy by Classifier and Feature Selection")
plt.show()


In [None]:
def plot_top(series, title, top_n=20):
    """Draw a horizontal bar chart of the first ``top_n`` entries of ``series``.

    Args:
        series: pandas.Series of scores indexed by feature name. Entries are
            plotted in their existing order, so pass it already sorted.
        title: Title shown above the chart.
        top_n: How many leading entries to plot (default 20).
    """
    top = series.head(top_n)
    plt.figure(figsize=(8, 6))
    sns.barplot(x=top.values, y=top.index)
    plt.title(title)
    plt.show()

# Bar charts of the leading feature scores under each ranking.
plot_top(igr_scores, "Top IGR Features (Scores)")
plot_top(cfs_scores, "Top CFS Features (Scores)")
