In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, f1_score, roc_curve, precision_recall_curve
from sklearn.model_selection import train_test_split
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Model

from datasets import load_dataset


In [None]:
# Load NSL-KDD Dataset
# Fetch the dataset by its hub identifier, then materialise the 'train' and
# 'test' splits as pandas DataFrames for the rest of the notebook.
data = load_dataset("Mireu-Lab/NSL-KDD")
df_train, df_test = (pd.DataFrame(data[split]) for split in ('train', 'test'))


In [None]:
# Exploratory Data Analysis
# Show the relative frequency of each label in both splits.
print("Train class distribution:")
print(df_train['class'].value_counts(normalize=True))

print("\nTest class distribution:")
print(df_test['class'].value_counts(normalize=True))

# Names of every numeric-dtype column in the training frame (reused by the
# preprocessing cell to decide which columns are standard-scaled).
numerical_cols = df_train.select_dtypes(include=np.number).columns.tolist()

# Plot a subsample so the figure stays readable. The sample is seeded so the
# plot is identical on every Restart & Run All, and capped at the frame length
# so .sample() cannot raise when fewer than 1000 rows are available.
boxplot_sample = df_train[numerical_cols].sample(
    n=min(1000, len(df_train)),
    random_state=42,
)

plt.figure(figsize=(12, 6))
sns.boxplot(data=boxplot_sample)
plt.xticks(rotation=90)
plt.title("Distribution of Numerical Features")
plt.show()


In [None]:
# Columns that are one-hot encoded, and the column holding the label.
categorical_cols = ['protocol_type', 'service', 'flag']
label_col = 'class'

# Standard-scale the numeric columns and one-hot encode the categorical ones.
# sparse_threshold=0 forces a dense ndarray regardless of how sparse the
# one-hot part turns out to be: the Keras model and the element-wise
# `X_test - X_test_pred` reconstruction error both need a dense array, and
# the default threshold heuristic would otherwise decide the output type.
preprocessor = ColumnTransformer(
    [
        ('num', StandardScaler(), numerical_cols),
        ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_cols),
    ],
    sparse_threshold=0,
)

# The preprocessor (and later the models) are fitted on 'normal' rows only;
# categories that appear only in the test split are encoded as all-zeros
# because of handle_unknown='ignore'.
df_train_normal = df_train[df_train[label_col] == 'normal']
X_train = preprocessor.fit_transform(df_train_normal.drop(columns=[label_col]))
X_test = preprocessor.transform(df_test.drop(columns=[label_col]))

# Binary ground truth: 0 for 'normal', 1 for every other label.
y_test = (df_test[label_col] != 'normal').astype(int).values


In [None]:
# Dense autoencoder: the hidden widths narrow to a 32-unit bottleneck and then
# widen again; the output layer is linear and has one unit per input feature.
input_dim = X_train.shape[1]
hidden_widths = (128, 64, 32, 64, 128)

autoencoder = keras.Sequential(
    [layers.Input(shape=(input_dim,))]
    + [layers.Dense(width, activation='relu') for width in hidden_widths]
    + [layers.Dense(input_dim, activation='linear')]
)
# Trained to reproduce its input, so the loss is the mean squared error.
autoencoder.compile(optimizer='adam', loss='mse')


In [None]:
# Train the autoencoder to reconstruct its own input (X_train is both the
# features and the target); 10% of the rows are held out for validation loss.
fit_options = dict(
    epochs=50,
    batch_size=256,
    validation_split=0.1,
    verbose=1,
)
history = autoencoder.fit(X_train, X_train, **fit_options)


In [None]:
# Anomaly score for the test set: per-sample mean squared reconstruction error.
X_test_pred = autoencoder.predict(X_test)
mse = np.mean(np.square(X_test - X_test_pred), axis=1)

# Tune the decision threshold on the labelled *training* split only, so that
# neither the error statistics nor the F1-based selection ever see test
# labels. (The 'normal' training rows were also used to fit the autoencoder,
# so the F1 values printed in the loop are optimistic; the test metrics
# reported afterwards are untouched by the tuning.)
X_val = preprocessor.transform(df_train.drop(columns=[label_col]))
y_val = (df_train[label_col] != 'normal').astype(int).values
mse_val = np.mean(np.square(X_val - autoencoder.predict(X_val)), axis=1)

# Candidate thresholds are mean + z * std of the error on normal traffic.
normal_err_mean = mse_val[y_val == 0].mean()
normal_err_std = mse_val[y_val == 0].std()

# Start below any attainable F1 so the first candidate is always accepted and
# best_threshold can never remain None.
best_f1 = -1.0
best_threshold = None
for z in np.arange(0.5, 3.0, 0.1):
    t = normal_err_mean + z * normal_err_std
    f1 = f1_score(y_val, (mse_val > t).astype(int))
    print(f"Z={z:.1f}, Threshold={t:.5f}, F1 Score={f1:.4f}")
    if f1 > best_f1:
        best_f1 = f1
        best_threshold = t

# Final, unbiased evaluation: apply the tuned threshold to the test scores.
y_pred = (mse > best_threshold).astype(int)
print("Best Threshold:", best_threshold)
print(confusion_matrix(y_test, y_pred))
print(classification_report(y_test, y_pred))
print("ROC AUC:", roc_auc_score(y_test, mse))


In [None]:
# Precision/recall trade-off obtained by sweeping a threshold over the
# reconstruction-error score on the test set.
precision, recall, pr_thresholds = precision_recall_curve(y_test, mse)

fig, ax = plt.subplots()
ax.plot(recall, precision)
ax.set_xlabel("Recall")
ax.set_ylabel("Precision")
ax.set_title("Precision-Recall Curve")
ax.grid(True)
plt.show()


In [None]:
# Isolation Forest
# Fit on the normal-only training matrix and score the test matrix, the same
# protocol used for the autoencoder and the One-Class SVM, instead of fitting
# on the very data being evaluated. Because the training rows are all
# 'normal', the score offset is left to contamination='auto' rather than a
# hand-picked anomaly fraction. random_state makes the forest reproducible.
iso = IsolationForest(contamination='auto', random_state=42)
iso.fit(X_train)

# predict() returns -1 for outliers and +1 for inliers; map to 1 = anomaly.
y_pred_iso = np.where(iso.predict(X_test) == -1, 1, 0)
print("\nIsolation Forest:")
print(classification_report(y_test, y_pred_iso))

# One-Class SVM
# Fitted on the normal-only training matrix, then applied to the test matrix.
svm = OneClassSVM(nu=0.5, kernel="rbf", gamma='scale').fit(X_train)

# predict() yields -1 for outliers; encode those as 1 (anomaly), the rest as 0.
svm_is_outlier = svm.predict(X_test) == -1
y_pred_svm = svm_is_outlier.astype(int)

print("\nOne-Class SVM:")
print(classification_report(y_test, y_pred_svm))


In [None]:
# Optional: VAE
class VAE(Model):
    """Variational autoencoder with a diagonal-Gaussian latent code.

    The encoder emits ``2 * latent_dim`` values per sample, split into the
    latent mean and log-variance. A latent vector is drawn with the
    reparameterisation trick and decoded back to ``original_dim`` features.

    Each forward pass also registers the KL divergence between the encoded
    distribution and a standard normal through ``add_loss``, so the total
    training loss is the compiled reconstruction loss plus this regulariser.
    Without that term the model is only an autoencoder with noise injected
    into its bottleneck.

    Args:
        original_dim: Number of input (and reconstructed) features.
        intermediate_dim: Width of the single hidden layer in the encoder and
            in the decoder.
        latent_dim: Dimensionality of the latent code.
        kl_weight: Multiplier applied to the KL term (1.0 by default).
    """

    def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, kl_weight=1.0):
        super().__init__()
        self.original_dim = original_dim
        self.kl_weight = kl_weight
        self.encoder = keras.Sequential([
            layers.Input(shape=(original_dim,)),
            layers.Dense(intermediate_dim, activation='relu'),
            # First half of the outputs is z_mean, second half is z_log_var.
            layers.Dense(latent_dim + latent_dim)
        ])
        self.decoder = keras.Sequential([
            layers.Input(shape=(latent_dim,)),
            layers.Dense(intermediate_dim, activation='relu'),
            layers.Dense(original_dim, activation='linear')
        ])

    def call(self, x):
        """Encode ``x``, sample a latent vector, and return its reconstruction."""
        z_mean, z_log_var = tf.split(self.encoder(x), num_or_size_splits=2, axis=1)

        # KL(N(z_mean, exp(z_log_var)) || N(0, I)), summed over latent
        # dimensions and averaged over the batch. Dividing by original_dim puts
        # it on the same per-feature scale as a mean-over-features
        # reconstruction loss such as 'mse'.
        kl_per_sample = -0.5 * tf.reduce_sum(
            1.0 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var),
            axis=1,
        )
        self.add_loss(
            self.kl_weight * tf.reduce_mean(kl_per_sample) / float(self.original_dim)
        )

        # Reparameterisation trick: z = mean + std * eps with eps ~ N(0, I).
        epsilon = tf.random.normal(shape=tf.shape(z_mean))
        z = z_mean + tf.exp(0.5 * z_log_var) * epsilon
        return self.decoder(z)

# Build the VAE for the preprocessed feature width and train it to
# reconstruct the normal-only training matrix, holding out 10% for validation.
vae = VAE(original_dim=input_dim)
vae.compile(loss='mse', optimizer='adam')
vae.fit(
    x=X_train,
    y=X_train,
    batch_size=256,
    epochs=30,
    validation_split=0.1,
)
