In [2]:
from scipy.io import arff
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, classification_report
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
import time


In [3]:
import zipfile
import os

# Archive uploaded to the runtime and the folder it is unpacked into.
zip_path = '/content/archive (5).zip'  # your zip file
extract_dir = 'NSL_KDD_Dataset'
os.makedirs(extract_dir, exist_ok=True)

if os.path.exists(zip_path):
    try:
        with zipfile.ZipFile(zip_path, 'r') as archive:
            # List every member before unpacking so the layout is visible in the output.
            print("Contents of the zip file:")
            for member_name in archive.namelist():
                print(" -", member_name)

            # ✅ Extract all files
            archive.extractall(extract_dir)
            print(f"\n✅ Files extracted to: {os.path.abspath(extract_dir)}")

            # Show top-level extracted files/folders
            print("\nExtracted files:")
            print(os.listdir(extract_dir))

    except zipfile.BadZipFile:
        print(f"Error: The file '{zip_path}' is not a valid zip file or it is corrupted.")
    except Exception as exc:
        # Any other failure is reported rather than raised, so the cell always finishes.
        print(f"An unexpected error occurred while extracting: {exc}")
else:
    print(f"Error: The file '{zip_path}' was not found.")


Contents of the zip file:
 - KDDTest+.arff
 - KDDTest+.txt
 - KDDTest-21.arff
 - KDDTest-21.txt
 - KDDTest1.jpg
 - KDDTrain+.arff
 - KDDTrain+.txt
 - KDDTrain+_20Percent.arff
 - KDDTrain+_20Percent.txt
 - KDDTrain1.jpg
 - index.html
 - nsl-kdd/KDDTest+.arff
 - nsl-kdd/KDDTest+.txt
 - nsl-kdd/KDDTest-21.arff
 - nsl-kdd/KDDTest-21.txt
 - nsl-kdd/KDDTest1.jpg
 - nsl-kdd/KDDTrain+.arff
 - nsl-kdd/KDDTrain+.txt
 - nsl-kdd/KDDTrain+_20Percent.arff
 - nsl-kdd/KDDTrain+_20Percent.txt
 - nsl-kdd/KDDTrain1.jpg
 - nsl-kdd/index.html

✅ Files extracted to: /content/NSL_KDD_Dataset

Extracted files:
['index.html', 'KDDTrain+.arff', 'KDDTest+.txt', 'KDDTest1.jpg', 'KDDTest-21.txt', 'KDDTest+.arff', 'KDDTrain+_20Percent.arff', 'nsl-kdd', 'KDDTrain+.txt', 'KDDTest-21.arff', 'KDDTrain+_20Percent.txt', 'KDDTrain1.jpg']


In [4]:
import re

def clean_arff(file_path, cleaned_path):
    """
    Normalize nominal attribute declarations in an ARFF file.

    For every line that starts with ``@attribute`` (case-insensitive), the text
    between the first ``{`` and the last ``}`` on that line is split on commas,
    each value is stripped of surrounding whitespace, and the values are joined
    back with bare commas. All other lines are copied unchanged.

    Parameters:
    - file_path: Path of the ARFF file to read.
    - cleaned_path: Path the cleaned ARFF file is written to (overwritten if it exists).

    Prints a confirmation message once the cleaned file has been written.
    """
    def _strip_nominal_values(match):
        # A callable replacement is inserted literally by re.sub. A plain
        # replacement string would be parsed as a template, so a nominal value
        # containing a backslash (e.g. "a\d") would raise re.error or be mangled.
        values = match.group(1)
        return '{' + ','.join(v.strip() for v in values.split(',')) + '}'

    # Read everything up front so file_path and cleaned_path may be the same file.
    with open(file_path, 'r') as f:
        lines = f.readlines()

    cleaned_lines = []
    for line in lines:
        if line.lower().startswith('@attribute'):
            # Greedy match: first '{' through the last '}' on the line.
            line = re.sub(r'{(.*)}', _strip_nominal_values, line)
        cleaned_lines.append(line)

    # Save cleaned ARFF
    with open(cleaned_path, 'w') as f:
        f.writelines(cleaned_lines)
    print(f"Cleaned ARFF saved to {cleaned_path}")

# Clean the extracted train and test ARFF files; the cleaned copies are written
# to relative paths, i.e. into the current working directory.
for raw_arff, cleaned_arff in (
    ('/content/NSL_KDD_Dataset/KDDTrain+.arff', 'KDDTrain_cleaned.arff'),
    ('/content/NSL_KDD_Dataset/KDDTest+.arff', 'KDDTest_cleaned.arff'),
):
    clean_arff(raw_arff, cleaned_arff)


Cleaned ARFF saved to KDDTrain_cleaned.arff
Cleaned ARFF saved to KDDTest_cleaned.arff


In [5]:
# Read the cleaned ARFF files from the same relative paths the cleaning step
# wrote them to. The previous absolute '/content/...' paths only matched when
# the working directory happened to be /content.
train_path = 'KDDTrain_cleaned.arff'
test_path = 'KDDTest_cleaned.arff'

# loadarff returns a (records, metadata) pair for each file.
train_arff = arff.loadarff(train_path)
test_arff = arff.loadarff(test_path)


In [6]:
# Element 0 of each loadarff result holds the records; element 1 (metadata) is not used here.
train_df, test_df = (pd.DataFrame(loaded[0]) for loaded in (train_arff, test_arff))

In [7]:
# Object-dtype columns hold byte strings after loading; decode them to text
# in both frames.
for frame in (train_df, test_df):
    byte_columns = [name for name in frame.columns if frame[name].dtype == 'object']
    for name in byte_columns:
        frame[name] = frame[name].str.decode('utf-8')

In [8]:
# Preview the first rows of the decoded training frame.
train_df.head()

Unnamed: 0,duration,protocol_type,service,flag,src_bytes,dst_bytes,land,wrong_fragment,urgent,hot,...,dst_host_srv_count,dst_host_same_srv_rate,dst_host_diff_srv_rate,dst_host_same_src_port_rate,dst_host_srv_diff_host_rate,dst_host_serror_rate,dst_host_srv_serror_rate,dst_host_rerror_rate,dst_host_srv_rerror_rate,class
0,0.0,tcp,ftp_data,SF,491.0,0.0,0,0.0,0.0,0.0,...,25.0,0.17,0.03,0.17,0.0,0.0,0.0,0.05,0.0,normal
1,0.0,udp,other,SF,146.0,0.0,0,0.0,0.0,0.0,...,1.0,0.0,0.6,0.88,0.0,0.0,0.0,0.0,0.0,normal
2,0.0,tcp,private,S0,0.0,0.0,0,0.0,0.0,0.0,...,26.0,0.1,0.05,0.0,0.0,1.0,1.0,0.0,0.0,anomaly
3,0.0,tcp,http,SF,232.0,8153.0,0,0.0,0.0,0.0,...,255.0,1.0,0.0,0.03,0.04,0.03,0.01,0.0,0.01,normal
4,0.0,tcp,http,SF,199.0,420.0,0,0.0,0.0,0.0,...,255.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,normal


In [9]:
# Write both frames to CSV (without the index column) in the working directory.
train_csv_path = 'KDDTrain+.csv'
test_csv_path = 'KDDTest+.csv'

for frame, csv_path in ((train_df, train_csv_path), (test_df, test_csv_path)):
    frame.to_csv(csv_path, index=False)

print("DataFrames saved as CSV files.")

DataFrames saved as CSV files.


In [10]:
def preprocess_data(X_train, X_test):
    """
    Scale and one-hot encode a train/test pair of feature frames.

    'protocol_type', 'service' and 'flag' are one-hot encoded (categories not
    seen during fitting are ignored at transform time); every other column of
    X_train is passed through StandardScaler. The transformer is fitted on
    X_train only and then applied to X_test.

    Returns:
    - (X_train_processed, X_test_processed) as produced by the ColumnTransformer.
    """
    nominal_cols = ['protocol_type', 'service', 'flag']
    # Everything that is not listed as nominal is treated as numerical.
    numeric_cols = X_train.drop(columns=nominal_cols).columns

    scale_step = ('num', StandardScaler(), numeric_cols)
    encode_step = ('cat', OneHotEncoder(handle_unknown='ignore'), nominal_cols)
    transformer = ColumnTransformer(transformers=[scale_step, encode_step])

    # Fit on the training data first, then reuse the fitted statistics for the test data.
    return transformer.fit_transform(X_train), transformer.transform(X_test)

In [11]:
# Features are every column except the target; the target is the 'class' column.
X = train_df.drop('class', axis=1)
y = train_df['class']

# Hold out 20% of the training file for evaluating the model comparison below.
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

x_train, x_test = preprocess_data(x_train, x_test)

# Use ONE encoder, fitted on the training labels, for both splits. Fitting a
# second encoder on y_test derives the mapping from the held-out labels
# themselves, so the integer codes are not guaranteed to mean the same class.
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(y_train)
y_test = label_encoder.transform(y_test)


In [12]:
# Candidate classifiers for the baseline comparison.
# The estimators that use randomness are seeded with the same value as the
# train/test split above so that repeated runs give the same scores.
models = {
    "Decision Tree": DecisionTreeClassifier(random_state=42),
    "Random Forest": RandomForestClassifier(random_state=42),
    "SVM": SVC(),
    "KNN": KNeighborsClassifier(),
    "XGBoost": XGBClassifier(random_state=42),
    "AdaBoost": AdaBoostClassifier(random_state=42)
}

In [13]:
# Train and evaluate each model
results = {}
for name, model in models.items():
    # Time only fitting and prediction; scoring is done after the clock stops so
    # that "Computation Time" does not include the metric computation.
    # perf_counter is a monotonic clock intended for measuring intervals.
    start_time = time.perf_counter()
    model.fit(x_train, y_train)
    y_pred = model.predict(x_test)
    end_time = time.perf_counter()
    elapsed_time = end_time - start_time

    accuracy = accuracy_score(y_test, y_pred)
    report = classification_report(y_test, y_pred, output_dict=True)
    results[name] = {
        "Accuracy": accuracy,
        # Precision/recall/F1 are the support-weighted averages over the classes.
        "Precision": report['weighted avg']['precision'],
        "Recall": report['weighted avg']['recall'],
        "F1-Score": report['weighted avg']['f1-score'],
        "Computation Time": elapsed_time
    }

# Display results
for model_name, metrics in results.items():
    print(f"{model_name}:")
    for metric, value in metrics.items():
        print(f"  {metric}: {value:.4f}")

Decision Tree:
  Accuracy: 0.9986
  Precision: 0.9986
  Recall: 0.9986
  F1-Score: 0.9986
  Computation Time: 1.7482
Random Forest:
  Accuracy: 0.9988
  Precision: 0.9989
  Recall: 0.9988
  F1-Score: 0.9988
  Computation Time: 9.9766
SVM:
  Accuracy: 0.9918
  Precision: 0.9918
  Recall: 0.9918
  F1-Score: 0.9918
  Computation Time: 72.7184
KNN:
  Accuracy: 0.9957
  Precision: 0.9957
  Recall: 0.9957
  F1-Score: 0.9957
  Computation Time: 37.7334
XGBoost:
  Accuracy: 0.9991
  Precision: 0.9991
  Recall: 0.9991
  F1-Score: 0.9991
  Computation Time: 2.9019
AdaBoost:
  Accuracy: 0.9776
  Precision: 0.9777
  Recall: 0.9776
  F1-Score: 0.9776
  Computation Time: 10.7723


In [14]:
# Ordered (label, predicate) pairs. Predicates are evaluated lazily, top to
# bottom, and the first one that holds decides the label.
# NOTE(review): every row that satisfies the 'U2R' rule also satisfies the
# earlier 'Rootkit' rule, so 'U2R' is never returned; likewise 'FTP_write' can
# only be reached by non-tcp rows because 'Guess_password' claims the tcp ones.
# Confirm whether that precedence is intended.
_ATTACK_RULES = (
    ('Smurf', lambda r: r['protocol_type'] == 'icmp' and r['service'] in ['eco_i', 'ecr_i']
        and r['flag'] == 'SF' and r['dst_host_same_srv_rate'] > 0.5),
    ('Neptune', lambda r: r['protocol_type'] == 'tcp' and r['flag'] == 'S0'
        and (r['serror_rate'] > 0.5 or r['srv_serror_rate'] > 0.5)),
    ('Nmap', lambda r: r['protocol_type'] == 'tcp' and r['diff_srv_rate'] > 0.5
        and r['flag'] in ['SF', 'S1']),
    ('Satan', lambda r: r['srv_count'] > 10 and r['dst_host_srv_count'] > 10
        and r['protocol_type'] in ['tcp', 'icmp'] and r['flag'] in ['SF', 'S1']),
    ('Portsweep', lambda r: r['protocol_type'] == 'tcp' and r['service'] == 'private'
        and r['dst_host_count'] > 200 and r['flag'] == 'REJ'),
    ('Ping_of_Death', lambda r: r['protocol_type'] == 'icmp' and r['service'] == 'other'
        and r['serror_rate'] < 0.1),
    ('Guess_password', lambda r: r['service'] in ['ftp', 'telnet'] and r['num_failed_logins'] > 0
        and r['protocol_type'] == 'tcp'),
    ('FTP_write', lambda r: r['service'] == 'ftp' and r['num_file_creations'] > 0
        and r['num_failed_logins'] > 0),
    ('Rootkit', lambda r: r['root_shell'] > 0 or r['num_root'] > 0 or r['num_shells'] > 0
        or r['su_attempted'] > 0),
    ('DoS', lambda r: (r['src_bytes'] > 1000 or r['dst_bytes'] > 1000) and r['flag'] in ['S0', 'REJ']
        and (r['serror_rate'] > 0.5 or r['dst_host_serror_rate'] > 0.5)),
    ('Probe', lambda r: (r['count'] > 10 or r['srv_count'] > 10) and r['flag'] in ['S1', 'S3', 'SF']
        and r['protocol_type'] in ['tcp', 'icmp']),
    # is_guest_login is compared against the string '1', not the number 1.
    ('R2L', lambda r: (r['num_failed_logins'] > 0 or r['is_guest_login'] == '1')
        and r['service'] in ['ftp', 'telnet', 'ssh']
        and (r['num_access_files'] > 0 or r['num_file_creations'] > 0)),
    ('U2R', lambda r: (r['root_shell'] > 0 or r['su_attempted'] > 0)
        and (r['num_root'] > 0 or r['num_file_creations'] > 0 or r['num_shells'] > 0)),
)


def label_attack_type(row):
    """
    Assign a heuristic attack label to one connection record.

    Parameters:
    - row: Mapping-like record (e.g. a DataFrame row) indexable by feature name.

    Returns:
    - The label of the first rule in _ATTACK_RULES whose predicate holds for
      the row, or 'Other' when no rule matches.
    """
    for label, matches in _ATTACK_RULES:
        if matches(row):
            return label
    return 'Other'

In [15]:
def _attack_type_or_normal(row):
    """Return the heuristic attack label for 'anomaly' rows and 'normal' for every other row."""
    if row['class'] == 'anomaly':
        return label_attack_type(row)
    return 'normal'


# Add the row-wise label as a new 'attack_type' column on both datasets.
train_df['attack_type'] = train_df.apply(_attack_type_or_normal, axis=1)
test_df['attack_type'] = test_df.apply(_attack_type_or_normal, axis=1)


In [16]:
# Columns expanded into one indicator column per observed value.
categorical_columns = ['protocol_type', 'service', 'flag', 'land', 'logged_in', 'is_host_login', 'is_guest_login']

# Each frame is encoded on its own, so the resulting column sets can differ
# between train and test.
train_df_encoded, test_df_encoded = (
    pd.get_dummies(frame, columns=categorical_columns) for frame in (train_df, test_df)
)

In [17]:
# Align columns between train and test sets.
# join='left' keeps exactly the training frame's columns: columns missing from
# the test frame are added there filled with 0, and columns that exist only in
# the test frame are dropped.
train_df_encoded, test_df_encoded = train_df_encoded.align(test_df_encoded, join='left', axis=1, fill_value=0)

In [18]:
from sklearn.model_selection import train_test_split

# Columns that carry the target or the derived attack label, not model inputs.
non_feature_columns = ['class', 'attack_type']

# Training features and binary target: 0 for 'normal', 1 for anything else.
X = train_df_encoded.drop(columns=non_feature_columns)
y = (train_df_encoded['class'] != 'normal').astype('int64')

# Stratified 80/20 train-validation split, so both parts keep the class ratio.
X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# The separate test file gets the same treatment.
X_test = test_df_encoded.drop(columns=non_feature_columns)
y_test = (test_df_encoded['class'] != 'normal').astype('int64')

In [19]:
# Report the shape of every split, features first and labels second.
for split_name, split_data in (
    ("X_train", X_train),
    ("y_train", y_train),
    ("X_val", X_val),
    ("y_val", y_val),
    ("X_test", X_test),
    ("y_test", y_test),
):
    print(f"{split_name} shape:", split_data.shape)


X_train shape: (100778, 126)
y_train shape: (100778,)
X_val shape: (25195, 126)
y_val shape: (25195,)
X_test shape: (22544, 126)
y_test shape: (22544,)


In [20]:
from xgboost import XGBClassifier
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix

# Initialize and train the XGBoost model without tuning
# NOTE: `model` is also the loop variable of the model-comparison cell above, so
# this assignment rebinds that name to the new baseline classifier.
model = XGBClassifier(eval_metric='logloss')
model.fit(X_train, y_train)

In [21]:
# Predict binary labels (0 = normal, 1 = anomaly) for the validation split.
y_val_pred = model.predict(X_val)

In [22]:
# Validation metrics for the untuned model: compute first, then print.
baseline_val_accuracy = accuracy_score(y_val, y_val_pred)
baseline_val_report = classification_report(y_val, y_val_pred)
baseline_val_confusion = confusion_matrix(y_val, y_val_pred)

print("Validation Accuracy:", baseline_val_accuracy)
print("Validation Classification Report:\n", baseline_val_report)
print("Validation Confusion Matrix:\n", baseline_val_confusion)

Validation Accuracy: 0.9992458821194682
Validation Classification Report:
               precision    recall  f1-score   support

           0       1.00      1.00      1.00     13469
           1       1.00      1.00      1.00     11726

    accuracy                           1.00     25195
   macro avg       1.00      1.00      1.00     25195
weighted avg       1.00      1.00      1.00     25195

Validation Confusion Matrix:
 [[13460     9]
 [   10 11716]]


In [23]:
# Score the untuned model on the separate test file.
y_test_pred = model.predict(X_test)

baseline_test_accuracy = accuracy_score(y_test, y_test_pred)
baseline_test_report = classification_report(y_test, y_test_pred)
baseline_test_confusion = confusion_matrix(y_test, y_test_pred)

print("Test Accuracy:", baseline_test_accuracy)
print("Test Classification Report:\n", baseline_test_report)
print("Test Confusion Matrix:\n", baseline_test_confusion)


Test Accuracy: 0.7851312987934705
Test Classification Report:
               precision    recall  f1-score   support

           0       0.67      0.97      0.80      9711
           1       0.97      0.64      0.77     12833

    accuracy                           0.79     22544
   macro avg       0.82      0.81      0.78     22544
weighted avg       0.84      0.79      0.78     22544

Test Confusion Matrix:
 [[9439  272]
 [4572 8261]]


In [24]:
from sklearn.model_selection import StratifiedKFold, GridSearchCV
from xgboost import XGBClassifier
from sklearn.metrics import accuracy_score, classification_report

# Hyperparameter grid for the search: 3 * 3 * 2 * 2 * 3 * 3 = 324 combinations.
param_grid = dict(
    max_depth=[3, 5, 7],
    learning_rate=[0.01, 0.1, 0.2],
    n_estimators=[50, 100],
    subsample=[0.8, 1.0],
    reg_alpha=[0, 0.1, 0.5],   # L1 regularization
    reg_lambda=[1, 1.5, 2],    # L2 regularization
)

In [25]:
# scale_pos_weight is the ratio of negative (0) to positive (1) training labels.
n_negative = len(y_train[y_train == 0])
n_positive = len(y_train[y_train == 1])
scale_pos_weight = n_negative / n_positive

# Exhaustive search over param_grid, scored by F1 on 3 stratified folds and
# run on all available cores.
grid_search = GridSearchCV(
    XGBClassifier(scale_pos_weight=scale_pos_weight, eval_metric='logloss'),
    param_grid,
    scoring='f1',
    cv=StratifiedKFold(n_splits=3),
    n_jobs=-1,
    verbose=1,
)

In [26]:
# Run grid search: every combination in param_grid is fitted on each of the
# 3 stratified folds of the training split.
grid_search.fit(X_train, y_train)

Fitting 3 folds for each of 324 candidates, totalling 972 fits


In [27]:
# Best parameters and score
best_params = grid_search.best_params_
# best_score_ is the mean cross-validated score (here F1) of the best
# parameter combination, not a score measured on the training set.
best_score = grid_search.best_score_

print("Best Parameters:", best_params)
print("Best Mean Cross-Validated F1 Score:", best_score)

Best Parameters: {'learning_rate': 0.2, 'max_depth': 7, 'n_estimators': 100, 'reg_alpha': 0.5, 'reg_lambda': 1, 'subsample': 1.0}
Best F1 Score on Training Set: 0.998720315905571


In [35]:
import xgboost as xgb
from xgboost import XGBClassifier

# Rebuild the classifier from the grid-search winner. best_params holds exactly
# the searched keys (learning_rate, max_depth, n_estimators, subsample,
# reg_alpha, reg_lambda), so it is unpacked directly into the constructor.
# Evaluation metric and early stopping are configured here in the constructor,
# NOT in fit(). A callback object could be used instead (it also belongs in the
# constructor):
#   callbacks = [ xgb.callback.EarlyStopping(rounds=10, metric_name='logloss', save_best=True) ]
best_model = XGBClassifier(
    **best_params,
    scale_pos_weight=scale_pos_weight,
    eval_metric="logloss",
    early_stopping_rounds=10,
)

# fit() only receives the validation data monitored for early stopping; do NOT
# pass eval_metric / callbacks / early_stopping_rounds here.
best_model.fit(X_train, y_train, eval_set=[(X_val, y_val)], verbose=True)

[0]	validation_0-logloss:0.51735
[1]	validation_0-logloss:0.39902
[2]	validation_0-logloss:0.31369
[3]	validation_0-logloss:0.24932
[4]	validation_0-logloss:0.20009
[5]	validation_0-logloss:0.16188
[6]	validation_0-logloss:0.13196
[7]	validation_0-logloss:0.10819
[8]	validation_0-logloss:0.08883
[9]	validation_0-logloss:0.07341
[10]	validation_0-logloss:0.06094
[11]	validation_0-logloss:0.05089
[12]	validation_0-logloss:0.04262
[13]	validation_0-logloss:0.03579
[14]	validation_0-logloss:0.03013
[15]	validation_0-logloss:0.02566
[16]	validation_0-logloss:0.02205
[17]	validation_0-logloss:0.01868
[18]	validation_0-logloss:0.01623
[19]	validation_0-logloss:0.01400
[20]	validation_0-logloss:0.01219
[21]	validation_0-logloss:0.01078
[22]	validation_0-logloss:0.00951
[23]	validation_0-logloss:0.00861
[24]	validation_0-logloss:0.00787
[25]	validation_0-logloss:0.00715
[26]	validation_0-logloss:0.00665
[27]	validation_0-logloss:0.00622
[28]	validation_0-logloss:0.00580
[29]	validation_0-loglos

In [36]:
# Validation metrics for the tuned model.
# NOTE: this is the same split that was monitored for early stopping above.
y_val_pred_tuned = best_model.predict(X_val)

tuned_val_accuracy = accuracy_score(y_val, y_val_pred_tuned)
tuned_val_report = classification_report(y_val, y_val_pred_tuned)

print("Tuned Model Validation Accuracy:", tuned_val_accuracy)
print("Tuned Model Validation Classification Report:\n", tuned_val_report)

Tuned Model Validation Accuracy: 0.999285572534233
Tuned Model Validation Classification Report:
               precision    recall  f1-score   support

           0       1.00      1.00      1.00     13469
           1       1.00      1.00      1.00     11726

    accuracy                           1.00     25195
   macro avg       1.00      1.00      1.00     25195
weighted avg       1.00      1.00      1.00     25195



In [37]:
# Test-file metrics for the tuned model.
y_test_pred_tuned = best_model.predict(X_test)

tuned_test_accuracy = accuracy_score(y_test, y_test_pred_tuned)
tuned_test_report = classification_report(y_test, y_test_pred_tuned)

print("Tuned Model Test Accuracy:", tuned_test_accuracy)
print("Tuned Model Test Classification Report:\n", tuned_test_report)


Tuned Model Test Accuracy: 0.7871717530163236
Tuned Model Test Classification Report:
               precision    recall  f1-score   support

           0       0.68      0.97      0.80      9711
           1       0.97      0.65      0.78     12833

    accuracy                           0.79     22544
   macro avg       0.82      0.81      0.79     22544
weighted avg       0.84      0.79      0.79     22544



In [38]:
# Save the tuned model to a JSON file; the path is relative, so it lands in the
# current working directory.
best_model.save_model("best_xgboost_model.json")

In [39]:
import shap
import matplotlib.pyplot as plt

# Initialize SHAP explainer
explainer = shap.TreeExplainer(best_model)

def predict_instance(instance_index):
    """
    Predict and explain a single row of X_test with the tuned model and SHAP.

    Prints the predicted class, prints the heuristic attack type when the row is
    predicted as an anomaly, then draws a SHAP force plot and a SHAP waterfall
    plot for that row.

    Parameters:
    - instance_index: Positional index of the row in X_test to analyze
      (negative values count from the end).

    Raises:
    - IndexError: if instance_index is outside the bounds of X_test.
    """
    # Select the instance. A one-element list keeps the result a single-row
    # DataFrame and, unlike the slice [i:i+1], also works for negative positions
    # ([-1:0] is empty) and fails loudly for out-of-range positions instead of
    # silently yielding an empty frame.
    instance = X_test.iloc[[instance_index]]

    # Predict the class (0: normal, 1: anomaly)
    prediction = best_model.predict(instance)[0]
    print(f"Prediction for instance {instance_index}: {'Anomaly' if prediction == 1 else 'Normal'}")

    # Identify the attack type if it's an anomaly. The rules read the raw
    # (un-encoded) test row at the same position.
    if prediction == 1:
        attack_type = label_attack_type(test_df.iloc[instance_index])
        print(f"Identified Attack Type: {attack_type}")

    # Calculate SHAP values for the instance
    shap_values = explainer.shap_values(instance)

    # Generate SHAP force plot for the instance
    print(f"Generating SHAP Force Plot for Instance {instance_index}...")
    shap.force_plot(explainer.expected_value, shap_values, instance, matplotlib=True)
    plt.show()

    # Generate SHAP waterfall plot for the instance
    print(f"Generating SHAP Waterfall Plot for Instance {instance_index}...")
    shap.waterfall_plot(shap.Explanation(values=shap_values[0], base_values=explainer.expected_value, data=instance.iloc[0]))
    plt.show()

In [40]:
import shap
import pandas as pd
import matplotlib.pyplot as plt

# Initialize SHAP explainer
# NOTE: this rebuilds the same TreeExplainer over best_model that the previous
# cell already bound to `explainer`.
explainer = shap.TreeExplainer(best_model)

def get_top_features(shap_values, feature_names, top_n=5):
    """
    List, for every instance, the features with the largest absolute SHAP values.

    Parameters:
    - shap_values: 2-D array-like of SHAP values, one row per instance.
    - feature_names: Column labels, one per column of shap_values.
    - top_n: Number of feature names to keep per instance.

    Returns:
    - A Series with one entry per instance; each entry is a list of feature
      names ordered from largest to smallest absolute SHAP value.
    """
    magnitudes = pd.DataFrame(shap_values, columns=feature_names).abs()

    def strongest_features(instance_row):
        # nlargest orders the row's values descending; the index holds the feature names.
        return instance_row.nlargest(top_n).index.tolist()

    return magnitudes.apply(strongest_features, axis=1)

# SHAP values for every row of the test set, reduced to the 5 strongest
# features per row.
shap_values = explainer.shap_values(X_test)
top_features = get_top_features(shap_values, X_test.columns, top_n=5)

# NOTE: this adds a list-valued 'top_features' column to test_df in place, so
# cells above that read test_df will see the extra column if they are re-run.
test_df['top_features'] = top_features


def _five_most_frequent(feature_lists):
    """Flatten a group's per-row feature lists and count the 5 most frequent names."""
    return feature_lists.explode().value_counts().head(5)


# For each attack type, how often each feature appears among the per-row top 5.
top_features_by_type = test_df.groupby('attack_type')['top_features'].apply(_five_most_frequent)

# Display the top features for each attack type
print("Top features by attack type:")
print(top_features_by_type)

# Save the results to a CSV file for further analysis if needed
top_features_by_type.to_csv("top_features_by_attack_type.csv")

Top features by attack type:
attack_type                                
Guess_password  src_bytes                       472
                dst_host_same_src_port_rate     434
                service_telnet                  373
                count                           322
                dst_host_srv_count              234
Neptune         src_bytes                      1974
                count                          1891
                same_srv_rate                  1504
                dst_bytes                       940
                service_private                 926
Nmap            src_bytes                       191
                dst_bytes                       160
                count                           115
                dst_host_rerror_rate            112
                dst_host_diff_srv_rate           90
Other           src_bytes                      6474
                count                          4512
                dst_host_rerror_rate       