# TabNet Pipeline for NSL-KDD Dataset

In [25]:

# Import Required Libraries
# Ensure you have the following installed: pytorch-tabnet, sklearn, pandas, numpy, matplotlib, seaborn
# Install with: pip install pytorch-tabnet numpy pandas scikit-learn matplotlib seaborn

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, classification_report, roc_curve, auc
from matplotlib import pyplot as plt
from pytorch_tabnet.tab_model import TabNetClassifier
import torch
import seaborn as sns

# Create a folder for visualizations
import os
os.makedirs("visualization", exist_ok=True)


## Step 1: Load Data

In [26]:

# Column names for NSL-KDD dataset
c_names = [
    "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes",
    "land", "wrong_fragment", "urgent", "hot", "num_failed_logins", "logged_in",
    "num_compromised", "root_shell", "su_attempted", "num_root", "num_file_creations",
    "num_shells", "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login",
    "count", "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate", "dst_host_count", "dst_host_srv_count",
    "dst_host_same_srv_rate", "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate", "dst_host_srv_serror_rate",
    "dst_host_rerror_rate", "dst_host_srv_rerror_rate", "labels"
]

# Load training and testing datasets
train = pd.read_csv("data/KDDTrain+.txt", names=c_names)
test = pd.read_csv("data/KDDTest+.txt", names=c_names)


## Step 2: Data Preprocessing

In [27]:

# Convert categorical features to numerical
categorical_features = ["protocol_type", "service", "flag"]

for col in categorical_features:
    train[col] = train[col].astype("category")
    test[col] = test[col].astype("category")
    test[col] = test[col].cat.set_categories(train[col].cat.categories).cat.codes
    train[col] = train[col].cat.codes

# Map 'labels' column to binary classes (1 for 'normal', 0 for 'attack')
train["labels"] = train["labels"].apply(lambda x: 1 if x == "normal" else 0)
test["labels"] = test["labels"].apply(lambda x: 1 if x == "normal" else 0)

# Separate features and labels
numerical_columns = train.select_dtypes(include=["int64", "float64"]).columns
X_train = train[numerical_columns].drop("labels", axis=1).values
X_test = test[numerical_columns].drop("labels", axis=1).values
y_train = train["labels"].values
y_test = test["labels"].values

# Normalize numerical features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Split training data into training and validation sets (Stratified)
x_train, x_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
)

# Optional: Rebalance the training data (if imbalance exists)
from imblearn.over_sampling import SMOTE
smote = SMOTE(random_state=42)
x_train, y_train = smote.fit_resample(x_train, y_train)

# Validate class distribution
print(f"Training labels distribution: {np.bincount(y_train)}")
print(f"Validation labels distribution: {np.bincount(y_val)}")
print(f"Test labels distribution: {np.bincount(y_test)}")


ModuleNotFoundError: No module named 'imblearn'

## Step 3: Build and Train TabNet Model

In [None]:

# Build TabNet Model
clf = TabNetClassifier(
    n_d=32, n_a=32, n_steps=5,
    gamma=1.5, lambda_sparse=1e-2,  # Regularization added
    optimizer_fn=torch.optim.Adam,
    optimizer_params=dict(lr=2e-2),
    scheduler_params={"step_size":10, "gamma":0.9},
    scheduler_fn=torch.optim.lr_scheduler.StepLR,
    mask_type="entmax"
)

# Train the TabNet model
print("Training TabNet Model...")
clf.fit(
    x_train, y_train,
    eval_set=[(x_train, y_train), (x_val, y_val)],
    eval_name=["train", "valid"],
    eval_metric=["accuracy"],
    max_epochs=50,
    patience=10,
    batch_size=256,
    virtual_batch_size=128,
    num_workers=0,
    drop_last=False
)


## Step 4: Evaluate the Model

In [None]:

# Evaluate on the test set
test_preds = clf.predict(X_test)
test_proba = clf.predict_proba(X_test)[:, 1]

# Confusion Matrix
cm = confusion_matrix(y_test, test_preds)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Attack", "Normal"], yticklabels=["Attack", "Normal"])
plt.title("Confusion Matrix", fontsize=14)
plt.xlabel("Predicted Label", fontsize=12)
plt.ylabel("True Label", fontsize=12)
plt.tight_layout()
plt.savefig("visualization/tabnet_confusion_matrix.png")
plt.close()

# Classification Report
report = classification_report(y_test, test_preds, target_names=["Attack", "Normal"], output_dict=True)
print("Classification Report:\n", classification_report(y_test, test_preds, target_names=["Attack", "Normal"]))

# ROC Curve
fpr, tpr, _ = roc_curve(y_test, test_proba)
roc_auc = auc(fpr, tpr)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC curve (area = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
plt.title("Receiver Operating Characteristic", fontsize=14)
plt.xlabel("False Positive Rate", fontsize=12)
plt.ylabel("True Positive Rate", fontsize=12)
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.savefig("visualization/tabnet_roc_curve.png")
plt.close()


## Step 5: Feature Importance

In [None]:

# Plot Feature Importance
feature_importances = clf.feature_importances_
plt.figure(figsize=(10, 6))
plt.bar(range(len(feature_importances)), feature_importances)
plt.title("TabNet Feature Importances", fontsize=14)
plt.xlabel("Feature Index", fontsize=12)
plt.ylabel("Importance", fontsize=12)
plt.tight_layout()
plt.savefig("visualization/tabnet_feature_importance.png")
plt.close()

print("Pipeline complete. All visualizations saved to the 'visualization' folder.")
