# Intrusion Detection — Inference and Evaluation

This notebook loads pre-trained models and evaluates them on the KDD test data.
It mirrors the structure of the training notebook and is designed to be run top-to-bottom.


## Setup and Imports


In [None]:
import pandas as pd
import numpy as np
import joblib
import pickle
from tensorflow import keras
from sklearn import metrics
from sklearn.preprocessing import RobustScaler
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import regularizers
import warnings
# Show every column when a wide DataFrame is displayed
pd.set_option('display.max_columns',None)
# Suppress all Python warnings for the rest of the session
warnings.filterwarnings('ignore')


## Load Test Data


In [None]:
# The column names are assigned by hand in a later cell, which implies the raw
# file has no header row. With the default header=0 pandas would consume the
# first record as column labels, and that record would be lost as soon as the
# labels are overwritten.
# NOTE(review): assumes KDDTest.txt is headerless — confirm against the file.
df_test = pd.read_csv("../data/raw/KDDTest.txt", header=None)
df_test.info()


In [None]:
# Column names applied to the raw test file, in file order. The last two
# entries ('outcome', 'level') are the label columns used later on.
columns = [
    # basic connection fields
    'duration', 'protocol_type', 'service', 'flag', 'src_bytes', 'dst_bytes',
    'land', 'wrong_fragment', 'urgent',
    # content fields
    'hot', 'num_failed_logins', 'logged_in', 'num_compromised', 'root_shell',
    'su_attempted', 'num_root', 'num_file_creations', 'num_shells',
    'num_access_files', 'num_outbound_cmds', 'is_host_login', 'is_guest_login',
    # count / rate fields
    'count', 'srv_count', 'serror_rate', 'srv_serror_rate', 'rerror_rate',
    'srv_rerror_rate', 'same_srv_rate', 'diff_srv_rate', 'srv_diff_host_rate',
    # dst_host_* fields
    'dst_host_count', 'dst_host_srv_count', 'dst_host_same_srv_rate',
    'dst_host_diff_srv_rate', 'dst_host_same_src_port_rate',
    'dst_host_srv_diff_host_rate', 'dst_host_serror_rate',
    'dst_host_srv_serror_rate', 'dst_host_rerror_rate',
    'dst_host_srv_rerror_rate',
    # labels
    'outcome', 'level',
]


In [None]:
# Replace whatever labels read_csv produced with the explicit column names
df_test.columns = columns


## Preprocessing


In [None]:
# Hand-written integer codes for the three protocol names; any value that is
# not a key of the mapping becomes NaN.
protocol_map = dict(tcp=1, udp=2, icmp=3)
encoded_protocol = df_test['protocol_type'].map(protocol_map)
df_test['protocol_type'] = encoded_protocol


In [None]:
# Re-inspect dtypes and non-null counts after the protocol mapping
df_test.info()


In [None]:
# Summary statistics, rendered with a blue background gradient and the Segoe UI font
df_test.describe().style.background_gradient(cmap='Blues').set_properties(**{'font-family':'Segoe UI'})


In [None]:
# Collapse the outcome label to binary: "normal" becomes 0, everything else 1.
# The second mask is computed after the first assignment on purpose, so that
# rows already holding 0 are left untouched.
normal_rows = df_test['outcome'].eq("normal")
df_test.loc[normal_rows, "outcome"] = 0
attack_rows = df_test['outcome'].ne(0)
df_test.loc[attack_rows, "outcome"] = 1


In [None]:
def pie_plot(df, cols_list, rows, cols):
    """Draw one pie chart of value counts per column on a rows x cols grid.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame holding the columns to plot.
    cols_list : iterable
        Column labels to plot, one per subplot, in grid order.
    rows, cols : int
        Shape of the subplot grid. Grid cells beyond ``len(cols_list)`` are
        hidden; columns beyond the grid capacity are not drawn.
    """
    cols_list = list(cols_list)
    # squeeze=False always returns a 2-D array of Axes, so a 1x1 grid no
    # longer yields a bare Axes object that has no ``ravel``.
    fig, axes = plt.subplots(rows, cols, squeeze=False)
    flat_axes = axes.ravel()
    for ax, col in zip(flat_axes, cols_list):
        df[col].value_counts().plot(ax=ax, kind='pie', figsize=(15, 15), fontsize=10, autopct='%1.0f%%')
        ax.set_title(str(col), fontsize = 12)
    # Hide grid cells that received no chart instead of showing empty frames
    for unused_ax in flat_axes[len(cols_list):]:
        unused_ax.set_visible(False)
    plt.show()


In [None]:
# Distribution of the encoded protocol and of the binary outcome, side by side
pie_plot(df_test, ['protocol_type', 'outcome'], 1, 2)


In [None]:
# Preview the first rows after the label encoding above
df_test.head()


## Data Preprocessing

### Scaling (RobustScaler)


In [None]:
def preprocess(dataframe, train_columns=None, scaler=None):
    """Scale the numeric columns, one-hot encode the categoricals, align columns.

    The input frame is left untouched; a new frame is returned.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Frame containing every column named in ``cat_cols`` below plus the
        numeric feature columns.
    train_columns : sequence of column labels, optional
        When given, the result is reshaped to exactly these columns in this
        order: columns absent from the data are added and filled with 0,
        columns not listed are dropped.
    scaler : object with a ``transform`` method, optional
        An already-fitted scaler to apply to the numeric columns. When omitted
        a fresh ``RobustScaler`` is fitted on ``dataframe`` itself.
        NOTE(review): fitting on the data being evaluated will generally not
        reproduce the scaling used at training time — pass the training
        scaler here if one was saved.

    Returns
    -------
    pandas.DataFrame
        Pass-through columns first, then the scaled numeric columns, then the
        one-hot columns for 'protocol_type', 'service' and 'flag' (or exactly
        ``train_columns`` when that is provided).

    Raises
    ------
    KeyError
        If any column named in ``cat_cols`` is missing from ``dataframe``.
    """
    # Columns that are NOT scaled: categorical / binary flags and the labels
    cat_cols = ['is_host_login','protocol_type','service','flag','land', 'logged_in','is_guest_login', 'level', 'outcome']
    df_num = dataframe.drop(cat_cols, axis=1)
    num_cols = df_num.columns

    # Scale numeric columns
    if scaler is None:
        scaled_values = RobustScaler().fit_transform(df_num)
    else:
        scaled_values = scaler.transform(df_num)
    # Keep the caller's index so the scaled values line up row-for-row even
    # when the frame does not carry a default RangeIndex.
    scaled_df = pd.DataFrame(scaled_values, columns=num_cols, index=dataframe.index)

    # Pass-through columns first, scaled numeric columns appended after them
    combined = pd.concat([dataframe.drop(columns=num_cols), scaled_df], axis=1)

    # One-hot encode categorical columns
    combined = pd.get_dummies(combined, columns = ['protocol_type', 'service', 'flag'])

    # Ensure train and test sets have the same columns, in the same order
    if train_columns is not None:
        combined = combined.reindex(columns=list(train_columns), fill_value=0)

    return combined


## Load Trained Models (Classical ML)


In [None]:
# NOTE: joblib.load unpickles arbitrary Python objects — only load artifacts from a trusted source.
model_lr = joblib.load('../artifacts/model_lr.pkl')
# Saved column labels; passed to preprocess() below to align the test frame.
# Presumably the feature layout used at training time — verify against the training notebook.
train_columns = joblib.load('../artifacts/train_columns.pkl')
# Number of distinct labels in the saved column set
train_columns.nunique()


In [None]:
# Scale, one-hot encode and align the test frame to the saved column set
scaled_test = preprocess(df_test, train_columns)


In [None]:
# Column labels of the preprocessed test frame, and how many distinct labels there are
test_columns = scaled_test.columns
test_columns.nunique()


In [None]:
# Columns the saved layout expects but the test frame lacks
missing_in_test = set(train_columns).difference(test_columns)

# Columns that exist only in the test frame
extra_in_test = set(test_columns).difference(train_columns)

# Add each missing column as a constant zero
for missing_col in missing_in_test:
    scaled_test[missing_col] = 0

# Drop the extras, then reorder to match the saved layout exactly
without_extras = scaled_test.drop(columns=extra_in_test, errors='ignore')
scaled_test = without_extras[train_columns]

print("Test columns after alignment:", scaled_test.columns)


In [None]:
# Inspect dtypes and non-null counts of the aligned test frame
scaled_test.info()


## Metrics and Evaluation Helpers


In [None]:
# Results registry: model name -> [train_acc, test_acc, train_prec, test_prec, train_rec, test_rec]
kernal_evals = {}


def _score_split(y_true, y_pred):
    """Return (accuracy, precision, recall) for one set of labels and predictions."""
    return (
        metrics.accuracy_score(y_true, y_pred),
        metrics.precision_score(y_true, y_pred),
        metrics.recall_score(y_true, y_pred),
    )


def evaluate_classification(model, name, X_train, X_test, y_train, y_test):
    """Score a fitted binary model on two splits, print the results and plot a confusion matrix.

    The model's ``predict`` output is thresholded at 0.5 to obtain 0/1 labels.
    Accuracy, precision and recall for both splits are stored in the
    module-level ``kernal_evals`` dict under ``str(name)`` and printed as
    percentages. The confusion matrix is drawn for the test split only.
    Nothing is returned.
    """
    # Threshold the raw predictions into class labels for each split
    labels_by_split = {
        split: (model.predict(features) > 0.5).astype(int)
        for split, features in (("train", X_train), ("test", X_test))
    }
    train_predictions = labels_by_split["train"]
    test_predictions = labels_by_split["test"]

    train_accuracy, train_precision, train_recall = _score_split(y_train, train_predictions)
    test_accuracy, test_precision, test_recall = _score_split(y_test, test_predictions)

    # Record the six numbers in the shared registry (train/test interleaved)
    kernal_evals[str(name)] = [
        train_accuracy, test_accuracy,
        train_precision, test_precision,
        train_recall, test_recall,
    ]

    # Report as percentages
    print(f"Training Accuracy {name}: {train_accuracy * 100:.2f}%  Test Accuracy {name}: {test_accuracy * 100:.2f}%")
    print(f"Training Precision {name}: {train_precision * 100:.2f}%  Test Precision {name}: {test_precision * 100:.2f}%")
    print(f"Training Recall {name}: {train_recall * 100:.2f}%  Test Recall {name}: {test_recall * 100:.2f}%")

    # Confusion matrix for the test split, drawn on a grid-free 10x10 canvas
    cm_display = metrics.ConfusionMatrixDisplay(
        confusion_matrix=metrics.confusion_matrix(y_test, test_predictions),
        display_labels=['normal', 'attack'],
    )
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.grid(False)
    cm_display.plot(ax=ax)
    plt.show()


In [None]:
# Print the number of distinct values per column, with the column name upper-cased
for column, distinct_count in df_test.nunique().items():
    print(column.upper(), ':', distinct_count)


## PCA

### Dimensionality Reduction (PCA)


In [None]:
# Separate the features from the two label columns
feature_frame = scaled_test.drop(columns=['outcome', 'level'])
print(feature_frame.columns)
x = feature_frame.values
y = scaled_test['outcome'].values
y_reg = scaled_test['level'].values

# Project the features onto 20 principal components.
# NOTE(review): this PCA is fitted on the test features themselves; a model
# trained on PCA-reduced data presumably expects the projection fitted at
# training time — confirm whether a saved PCA artifact should be used instead.
pca = PCA(n_components=20).fit(x)
x_reduced = pca.transform(x)
print("Number of original features is {} and of reduced features is {}".format(x.shape[1], x_reduced.shape[1]))

y = y.astype('int')

# Same 80/20 split (same seed) for the full features, the reduced features
# and the 'level' target.
split_options = dict(test_size=0.2, random_state=42)
x_train, x_test, y_train, y_test = train_test_split(x, y, **split_options)
x_train_reduced, x_test_reduced, y_train_reduced, y_test_reduced = train_test_split(x_reduced, y, **split_options)
x_train_reg, x_test_reg, y_train_reg, y_test_reg = train_test_split(x, y_reg, **split_options)


In [None]:
# Load every persisted classical model from ../artifacts/.
# NOTE: joblib.load unpickles arbitrary Python objects — only load artifacts from a trusted source.
# The trailing labels are the names each model is evaluated under further down.
model_lr = joblib.load('../artifacts/model_lr.pkl')  # "Logistic Regression" (also loaded in an earlier cell)
model_knn = joblib.load('../artifacts/model_knn.pkl')  # "KNN"
model_gnb = joblib.load('../artifacts/model_gnb.pkl')  # "GaussianNB"
model_linear_svc = joblib.load('../artifacts/model_linear_svc.pkl')  # "Linear SVC"
model_tdt = joblib.load('../artifacts/model_tdt.pkl')  # "Decision Tree"
model_rf = joblib.load('../artifacts/model_rf.pkl')  # "Random Forest"
model_xg_r = joblib.load('../artifacts/model_xg_r.pkl')  # "XGBoost"
model_rrf = joblib.load('../artifacts/model_rrf.pkl')  # "RRF", evaluated on the PCA-reduced split


In [None]:
# Cast the split arrays first. These conversions have nothing to do with the
# history artifact, so a failure here must not be reported as a load error.
x_train = x_train.astype(np.float32)
y_train = y_train.astype(np.int32)  # Use float32 if it's a regression problem
x_test = x_test.astype(np.float32)
y_test = y_test.astype(np.int32)

# Bind the name up front so it always exists, even when loading fails.
model_history = None
try:
    # NOTE: joblib.load unpickles arbitrary Python objects — trusted artifacts only.
    model_history = joblib.load('../artifacts/model_history.pkl')
    print("Model history loaded successfully.")
except Exception as e:
    # Best-effort: the history is only needed for the training-curve plots.
    print("Error loading model history:", e)


## Deep Learning (Loaded Model)


In [None]:
# Load pre-trained Deep Learning model from artifacts
import os  # not part of the notebook's top import cell, so it is brought in here

_dl_model = None
_joblib_path = '../artifacts/model_dl.pkl'
_savedmodel_dir = '../artifacts/model_dl'
_h5_path = '../artifacts/model_dl.h5'

# Candidate artifacts in priority order: (path, presence check, loader).
# NOTE: joblib.load unpickles arbitrary Python objects — trusted artifacts only.
_dl_candidates = [
    (_joblib_path, os.path.exists, joblib.load),
    (_savedmodel_dir, os.path.isdir, tf.keras.models.load_model),
    (_h5_path, os.path.exists, tf.keras.models.load_model),
]

_dl_artifact_seen = False
for _candidate_path, _is_present, _loader in _dl_candidates:
    if not _is_present(_candidate_path):
        continue
    _dl_artifact_seen = True
    try:
        _dl_model = _loader(_candidate_path)
    except Exception as e:
        # Report the failure and fall through to the next available format
        print('Failed to load DL model:', e)
        continue
    print('Loaded DL model from', _candidate_path)
    break

# Only claim the artifact is missing when none of the candidates existed;
# load failures have already been reported above.
if _dl_model is None and not _dl_artifact_seen:
    print('Deep Learning model artifact not found in ../artifacts/. Provide model_dl.pkl or model_dl(.h5).')


In [None]:
# Features become float32, labels become int32
x_train, x_test = x_train.astype(np.float32), x_test.astype(np.float32)
y_train, y_test = y_train.astype(np.int32), y_test.astype(np.int32)  # Use float32 if it's a regression problem


In [None]:
# Evaluate the loaded DL model if available
if _dl_model is not None:
    # Plain arrays work whether x_test is currently an ndarray or a DataFrame
    x_eval = np.asarray(x_test, dtype=np.float32)
    y_eval = np.asarray(y_test, dtype=np.int32)

    # A single predict call: retrying the identical call after a failure
    # cannot succeed, so errors are allowed to surface.
    y_pred_probs = _dl_model.predict(x_eval)
    if hasattr(y_pred_probs, 'numpy'):
        # Tensor-like result: convert to a NumPy array
        y_pred_probs = y_pred_probs.numpy()
    # Flatten an (n, 1) output to (n,).
    # NOTE(review): assumes one score per row — confirm the model's output shape.
    y_pred_probs = np.ravel(y_pred_probs)
    y_pred = (y_pred_probs >= 0.5).astype(int)

    print(f'DL Test Accuracy: {metrics.accuracy_score(y_eval, y_pred) * 100:.2f}%')
    print(f'DL Test Precision: {metrics.precision_score(y_eval, y_pred) * 100:.2f}%')
    print(f'DL Test Recall: {metrics.recall_score(y_eval, y_pred) * 100:.2f}%')

    cm = metrics.confusion_matrix(y_eval, y_pred)
    disp = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['normal', 'attack'])
    disp.plot()


In [None]:
# The training history is loaded on a best-effort basis, so it may be missing
# (name unbound or None). Skip the plot instead of raising in that case.
_loss_history = getattr(globals().get('model_history'), 'history', None)
if _loss_history is None:
    print('Training history not available; skipping the loss curves.')
else:
    fig, ax = plt.subplots()
    ax.plot(_loss_history['loss'], label='loss')
    ax.plot(_loss_history['val_loss'], label='val_loss')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('SCCE Loss')
    ax.legend()
    ax.grid(True)
    plt.show()


In [None]:
# The training history is loaded on a best-effort basis, so it may be missing
# (name unbound or None). Skip the plot instead of raising in that case.
_accuracy_history = getattr(globals().get('model_history'), 'history', None)
if _accuracy_history is None:
    print('Training history not available; skipping the accuracy curves.')
else:
    fig, ax = plt.subplots()
    ax.plot(_accuracy_history['accuracy'], label='accuracy')
    ax.plot(_accuracy_history['val_accuracy'], label='val_accuracy')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Accuracy')
    ax.legend()
    ax.grid(True)
    plt.show()


## Evaluate Classical ML Models


In [None]:
# Score the "RRF" model on the PCA-reduced split (20 components).
# NOTE(review): the PCA producing these features is fitted on this notebook's
# test data — confirm it matches the projection the model was trained with.
evaluate_classification(model_rrf, "RRF", x_train_reduced, x_test_reduced, y_train_reduced, y_test_reduced)


In [None]:
# Store the encoded protocol column as float
df_test['protocol_type'] = df_test['protocol_type'].astype(float)
# Distinct-value count for every column of the aligned test frame
print(scaled_test.nunique())
# Wrap the training features in a DataFrame and show its column labels
x_train = pd.DataFrame(x_train)
print(x_train.columns)


In [None]:
def convert_to_numeric(df):
    """Convert every object-dtype column of ``df`` to numbers, in place.

    Each object column is first parsed with ``pd.to_numeric``; if that raises
    ``ValueError`` the column is replaced by its integer category codes.
    Columns of any other dtype are left alone. The same frame object that was
    passed in is returned.
    """
    object_columns = [label for label in df.columns if df[label].dtype == 'object']
    for label in object_columns:
        try:
            df[label] = pd.to_numeric(df[label])
        except ValueError:
            # Not parseable as numbers: fall back to categorical codes
            as_category = df[label].astype('category')
            df[label] = as_category.cat.codes
    return df
# Wrap both feature splits in DataFrames and coerce any object-typed columns to numbers
x_train = convert_to_numeric(pd.DataFrame(x_train))
x_test = convert_to_numeric(pd.DataFrame(x_test))
print("Evaluating models on the new test data...")


In [None]:
# Score the loaded logistic-regression model. Both splits passed here are
# partitions of the KDD test file (80/20), not the original training data.
evaluate_classification(model_lr, "Logistic Regression", x_train, x_test, y_train, y_test)


In [None]:
# Score the loaded KNN model on the same 80/20 partitions of the test file
evaluate_classification(model_knn, "KNN", x_train, x_test, y_train, y_test)


In [None]:

# Score the loaded GaussianNB model on the same 80/20 partitions of the test file
evaluate_classification(model_gnb, "GaussianNB", x_train, x_test, y_train, y_test)


In [None]:

# Score the loaded decision-tree model on the same 80/20 partitions of the test file
evaluate_classification(model_tdt, "Decision Tree", x_train, x_test, y_train, y_test)


In [None]:

# Score the loaded random-forest model on the same 80/20 partitions of the test file
evaluate_classification(model_rf, "Random Forest", x_train, x_test, y_train, y_test)


In [None]:

# Score the loaded XGBoost model; its predictions are thresholded at 0.5 inside the helper
evaluate_classification(model_xg_r, "XGBoost", x_train, x_test, y_train, y_test)


In [None]:
# Score the loaded linear SVC model on the same 80/20 partitions of the test file
evaluate_classification(model_linear_svc, "Linear SVC", x_train, x_test, y_train, y_test)


In [None]:
# Grouped bars of training vs. test accuracy for every evaluated model.
# Each kernal_evals entry is [train_acc, test_acc, train_prec, test_prec, train_rec, test_rec].
keys = list(kernal_evals.keys())
values = list(kernal_evals.values())
positions = np.arange(len(keys))
fig, ax = plt.subplots(figsize=(20, 6))
ax.bar(positions - 0.2, [value[0] for value in values], color='darkred', width=0.25, align='center')
ax.bar(positions + 0.2, [value[1] for value in values], color='y', width=0.25, align='center')
ax.legend(["Training Accuracy", "Test Accuracy"])
# Fix the tick positions first, then label them, so each label is bound to
# the tick it belongs to.
ax.set_xticks(positions)
ax.set_xticklabels(keys)
ax.set_ylabel("Accuracy")
plt.show()


In [None]:
# Grouped bars of training vs. test precision for every evaluated model.
# Each kernal_evals entry is [train_acc, test_acc, train_prec, test_prec, train_rec, test_rec].
keys = list(kernal_evals.keys())
values = list(kernal_evals.values())
positions = np.arange(len(keys))
fig, ax = plt.subplots(figsize=(20, 6))
ax.bar(positions - 0.2, [value[2] for value in values], color='g', width=0.25, align='center')
ax.bar(positions + 0.2, [value[3] for value in values], color='b', width=0.25, align='center')
# Legend and axis label previously misspelled "Precision"
ax.legend(["Training Precision", "Test Precision"])
# Fix the tick positions first, then label them, so each label is bound to
# the tick it belongs to.
ax.set_xticks(positions)
ax.set_xticklabels(keys)
ax.set_ylabel("Precision")
plt.show()


In [None]:
# Grouped bars of training vs. test recall for every evaluated model.
# Each kernal_evals entry is [train_acc, test_acc, train_prec, test_prec, train_rec, test_rec],
# so recall lives at indices 4 and 5 (indices 2 and 3 are precision).
keys = list(kernal_evals.keys())
values = list(kernal_evals.values())
positions = np.arange(len(keys))
fig, ax = plt.subplots(figsize=(20, 6))
ax.bar(positions - 0.2, [value[4] for value in values], color='g', width=0.25, align='center')
ax.bar(positions + 0.2, [value[5] for value in values], color='b', width=0.25, align='center')
ax.legend(["Training Recall", "Test Recall"])
# Fix the tick positions first, then label them, so each label is bound to
# the tick it belongs to.
ax.set_xticks(positions)
ax.set_xticklabels(keys)
ax.set_ylabel("Recall")
plt.show()


### Deep Learning (loaded model)
This section loads a pre-trained neural network from `../artifacts/` instead of training in-notebook.

In [None]:
# Load pre-trained Deep Learning model from artifacts
import os  # not part of the notebook's top import cell, so it is brought in here

_dl_model = None
_joblib_path = '../artifacts/model_dl.pkl'
_savedmodel_dir = '../artifacts/model_dl'
_h5_path = '../artifacts/model_dl.h5'

# Candidate artifacts in priority order: (path, presence check, loader).
# NOTE: joblib.load unpickles arbitrary Python objects — trusted artifacts only.
_dl_candidates = [
    (_joblib_path, os.path.exists, joblib.load),
    (_savedmodel_dir, os.path.isdir, tf.keras.models.load_model),
    (_h5_path, os.path.exists, tf.keras.models.load_model),
]

_dl_artifact_seen = False
for _candidate_path, _is_present, _loader in _dl_candidates:
    if not _is_present(_candidate_path):
        continue
    _dl_artifact_seen = True
    try:
        _dl_model = _loader(_candidate_path)
    except Exception as e:
        # Report the failure and fall through to the next available format
        print('Failed to load DL model:', e)
        continue
    print('Loaded DL model from', _candidate_path)
    break

# Only claim the artifact is missing when none of the candidates existed;
# load failures have already been reported above.
if _dl_model is None and not _dl_artifact_seen:
    print('Deep Learning model artifact not found. Place model_dl.pkl or model_dl(.h5) under ../artifacts/.')


In [None]:
# Evaluate the loaded DL model if available
if _dl_model is not None:
    # x_test has been wrapped in a DataFrame by this point in the notebook;
    # hand the model plain arrays regardless of the container type.
    x_eval = np.asarray(x_test, dtype=np.float32)
    y_eval = np.asarray(y_test, dtype=np.int32)

    y_pred_probs = _dl_model.predict(x_eval)
    if hasattr(y_pred_probs, 'numpy'):
        # Tensor-like result: convert to a NumPy array
        y_pred_probs = y_pred_probs.numpy()
    # Flatten an (n, 1) output to (n,).
    # NOTE(review): assumes one score per row — confirm the model's output shape.
    y_pred_probs = np.ravel(y_pred_probs)
    y_pred = (y_pred_probs >= 0.5).astype(int)

    print(f'DL Test Accuracy: {metrics.accuracy_score(y_eval, y_pred) * 100:.2f}%')
    print(f'DL Test Precision: {metrics.precision_score(y_eval, y_pred) * 100:.2f}%')
    print(f'DL Test Recall: {metrics.recall_score(y_eval, y_pred) * 100:.2f}%')

    cm = metrics.confusion_matrix(y_eval, y_pred)
    disp = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['normal', 'attack'])
    disp.plot()


## Next Steps


- Ensure the DL artifact exists at `../artifacts/model_dl.pkl` (or SavedModel/H5).
- Re-run from the top to reproduce results and refresh plots.
