# Intrusion Detection System (IDS) - Local Version
This notebook implements an IDS on the NSL-KDD dataset using classical machine-learning models and a neural network.

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.preprocessing import LabelEncoder, StandardScaler
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import socket
import threading
import pickle

## Load and Preprocess Data
Load the NSL-KDD dataset from local files.

In [None]:
# Load datasets (NSL-KDD format: 41 features + label).
# NOTE: the standard NSL-KDD distribution files carry one extra trailing
# field per row (a difficulty score), i.e. 43 fields in total. If only 42
# names are supplied for 43 fields, pandas silently uses the first field as
# the row index, which shifts every column by one and leaves the difficulty
# score in 'label'. Naming the extra field explicitly avoids that; if a file
# really has only 42 fields the extra column is simply all-NaN.
columns = [f'feature_{i}' for i in range(41)] + ['label']
file_columns = columns + ['difficulty']

# Load from local directory
train_data = pd.read_csv('KDDTrain+.txt', header=None, names=file_columns)
test_data = pd.read_csv('KDDTest+.txt', header=None, names=file_columns)

# The difficulty score is not a traffic feature; drop it so the frames keep
# exactly the 42 columns listed in `columns`.
train_data = train_data.drop(columns='difficulty')
test_data = test_data.drop(columns='difficulty')

print(f"Training data shape: {train_data.shape}")
print(f"Testing data shape: {test_data.shape}")
print("\nFirst few rows:")
print(train_data.head())

In [None]:
# Encode categorical features (protocol_type, service, flag)
# These are typically in columns 1, 2, 3
categorical_cols = ['feature_1', 'feature_2', 'feature_3']

# One encoder per column, kept in a dict so each column's fitted mapping stays
# available (a single shared encoder only remembers the last column it saw).
cat_encoders = {}
for col in categorical_cols:
    train_values = train_data[col].astype(str)
    test_values = test_data[col].astype(str)
    # Fit on the values of both files: LabelEncoder.transform raises ValueError
    # on a category it has not seen, and these are input features, not targets.
    encoder = LabelEncoder().fit(pd.concat([train_values, test_values]))
    train_data[col] = encoder.transform(train_values)
    test_data[col] = encoder.transform(test_values)
    cat_encoders[col] = encoder

# Backward-compatible alias: previously `le_cat` ended up fit on the last column.
le_cat = cat_encoders[categorical_cols[-1]]

# Encode labels (normal=0, attack=1).
# The raw label column holds the traffic class name ('normal' or a specific
# attack name). A LabelEncoder would assign one alphabetical id per name (so
# 'normal' would not be 0) and would raise on attack names that occur only in
# the test file. The 2-unit network head and predict_traffic() both assume a
# binary target, so map the names to binary directly.
for frame in (train_data, test_data):
    is_normal = frame['label'].astype(str).str.strip().str.lower() == 'normal'
    frame['label'] = (~is_normal).astype(int)

# Separate features and labels
X_train = train_data.drop('label', axis=1)
y_train = train_data['label']
X_test = test_data.drop('label', axis=1)
y_test = test_data['label']

# Standardize features (statistics come from the training split only)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("Datasets loaded and preprocessed.")
print(f"Training samples: {len(X_train)}")
print(f"Testing samples: {len(X_test)}")

## Part 1: Exploratory Data Analysis (EDA)

In [None]:
print("\n--- Part 1: EDA ---")
# Shapes of the feature matrices for both splits.
for split_name, split_frame in (("Training", X_train), ("Testing", X_test)):
    print(f"{split_name} dataset shape: {split_frame.shape}")
print("Summary statistics for training dataset:")
print(train_data.describe())

# Share of label 0 (normal) and label 1 (attack) in the training split.
train_label_counts = y_train.value_counts()
n_train_rows = len(y_train)
normal_pct = train_label_counts.get(0, 0) / n_train_rows * 100
attack_pct = train_label_counts.get(1, 0) / n_train_rows * 100
print(f"\nTraining: Normal: {normal_pct:.2f}%, Attack: {attack_pct:.2f}%")

# Side-by-side bar charts of the label counts in each split.
test_label_counts = y_test.value_counts()
fig, ax = plt.subplots(1, 2, figsize=(10, 5))
panels = (
    (ax[0], train_label_counts, 'Training Dataset'),
    (ax[1], test_label_counts, 'Testing Dataset'),
)
for panel_axis, panel_counts, panel_title in panels:
    panel_counts.plot(kind='bar', ax=panel_axis, title=panel_title)
    panel_axis.set_xlabel('Label')
    panel_axis.set_ylabel('Count')
plt.tight_layout()
plt.show()

# Heatmap of pairwise correlations across all training columns
# (annotations are off to keep the plot readable).
corr = train_data.corr()
plt.figure(figsize=(12, 10))
sns.heatmap(corr, annot=False, cmap='coolwarm')
plt.title('Correlation Heatmap (Training Dataset)')
plt.show()

# Bar chart of how often each encoded label value occurs in the training data.
attack_types = train_data['label'].value_counts()
attack_types.plot(kind='bar', title='Attack Types Distribution (Training)')
plt.xlabel('Label')
plt.ylabel('Count')
plt.show()

## Part 2: ML Model Implementation

In [None]:
print("\n--- Part 2: ML Models ---")
# Model 1: Random Forest -- fit on the scaled training split, predict the test split.
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train_scaled, y_train)
rf_pred = rf_model.predict(X_test_scaled)

print("Random Forest Metrics:")
print(f"Accuracy: {accuracy_score(y_test, rf_pred):.4f}")
# Precision, recall and F1 all use the same weighted averaging.
for metric_label, metric_fn in (
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1-Score", f1_score),
):
    print(f"{metric_label}: {metric_fn(y_test, rf_pred, average='weighted'):.4f}")

In [None]:
# Model 2: SVM with an RBF kernel, trained and evaluated on the scaled features.
svm_model = SVC(kernel='rbf', random_state=42)
svm_model.fit(X_train_scaled, y_train)
svm_pred = svm_model.predict(X_test_scaled)

print("\nSVM Metrics:")
print(f"Accuracy: {accuracy_score(y_test, svm_pred):.4f}")
# Precision, recall and F1 all use the same weighted averaging.
for metric_label, metric_fn in (
    ("Precision", precision_score),
    ("Recall", recall_score),
    ("F1-Score", f1_score),
):
    print(f"{metric_label}: {metric_fn(y_test, svm_pred, average='weighted'):.4f}")

## Part 3: Neural Network Model (PyTorch)

In [None]:
print("\n--- Part 3: Neural Network ---")

class IDSNet(nn.Module):
    """Small fully connected classifier: input -> 128 -> 64 -> num_classes.

    ReLU and dropout follow each of the two hidden layers. The output layer
    returns raw logits (no softmax), which is the form ``nn.CrossEntropyLoss``
    expects.

    Args:
        input_size: Number of input features per sample.
        num_classes: Number of output logits. Defaults to 2 (binary
            classification), matching the original hard-coded value.
        dropout_p: Dropout probability used after both hidden layers.
            Defaults to 0.5, matching the original hard-coded value.
    """

    def __init__(self, input_size, num_classes=2, dropout_p=0.5):
        super().__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, num_classes)
        # A single dropout module is shared by both hidden layers.
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)`` for input ``x``."""
        x = self.dropout(torch.relu(self.fc1(x)))
        x = self.dropout(torch.relu(self.fc2(x)))
        return self.fc3(x)

In [None]:
# Seed torch so weight initialisation, batch shuffling and dropout are
# repeatable, in line with random_state=42 used for the other two models.
torch.manual_seed(42)

# Prepare data for PyTorch
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.values, dtype=torch.long)
X_test_tensor = torch.tensor(X_test_scaled, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.values, dtype=torch.long)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

nn_model = IDSNet(X_train.shape[1])
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(nn_model.parameters(), lr=0.001)

# Train
epochs = 10
for epoch in range(epochs):
    nn_model.train()  # make sure dropout is active while training
    running_loss = 0.0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = nn_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        # criterion returns the batch mean; weight by batch size so the epoch
        # figure is a true per-sample mean even when the last batch is short.
        running_loss += loss.item() * inputs.size(0)
    # Report the epoch mean instead of the final mini-batch's loss, which is
    # noisy and not representative of the epoch.
    epoch_loss = running_loss / max(len(train_dataset), 1)
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}")

# Evaluate: eval() disables dropout, no_grad() skips gradient tracking.
nn_model.eval()
with torch.no_grad():
    nn_outputs = nn_model(X_test_tensor)
    # Predicted class = index of the largest logit in each row.
    nn_pred = nn_outputs.argmax(dim=1).numpy()

print("\nNeural Network Metrics:")
print(f"Accuracy: {accuracy_score(y_test, nn_pred):.4f}")
print(f"Precision: {precision_score(y_test, nn_pred, average='weighted'):.4f}")
print(f"Recall: {recall_score(y_test, nn_pred, average='weighted'):.4f}")
print(f"F1-Score: {f1_score(y_test, nn_pred, average='weighted'):.4f}")

## Part 4: Model Comparison and Analysis

In [None]:
print("\n--- Part 4: Model Comparison ---")
models = ['Random Forest', 'SVM', 'Neural Network']
# Test-set predictions, in the same order as `models`.
all_predictions = (rf_pred, svm_pred, nn_pred)

accuracies = [accuracy_score(y_test, pred) for pred in all_predictions]
precisions = [precision_score(y_test, pred, average='weighted') for pred in all_predictions]
recalls = [recall_score(y_test, pred, average='weighted') for pred in all_predictions]
f1s = [f1_score(y_test, pred, average='weighted') for pred in all_predictions]

# One row per model, one column per metric.
comparison_df = pd.DataFrame({
    'Model': models,
    'Accuracy': accuracies,
    'Precision': precisions,
    'Recall': recalls,
    'F1-Score': f1s,
})
print(comparison_df)

# Visualization: grouped bars, one group per model.
metric_columns = ['Accuracy', 'Precision', 'Recall', 'F1-Score']
scores_by_model = comparison_df.set_index('Model')[metric_columns]
scores_by_model.plot(kind='bar', figsize=(10, 6))
plt.title('Model Performance Comparison')
plt.ylabel('Score')
plt.xticks(rotation=45)
plt.legend(loc='lower right')
plt.tight_layout()
plt.show()

## Part 5: Real-Time IDS Prototype
Note: The server/client code below is for demonstration only. For actual testing, run the server and the client as separate scripts in separate terminals.

In [None]:
print("\n--- Part 5: Real-Time IDS Prototype ---")

# Use Random Forest as the trained model for predictions
def predict_traffic(data):
    """Classify one traffic record with the fitted scaler and Random Forest.

    Args:
        data: One sample as a flat sequence of numeric feature values, in the
            same column order the scaler was fitted on.

    Returns:
        "Normal" if the model predicts class 0, otherwise "Anomalous".

    Raises:
        ValueError: If the sample cannot be converted to floats or its length
            does not match the number of fitted features.
    """
    sample = np.asarray(data, dtype=float).reshape(1, -1)
    # The scaler is fitted on a DataFrame, so scikit-learn records the column
    # names and warns on every call that passes unnamed data. Re-attach the
    # fitted names when they exist to keep the prediction path warning-free.
    feature_names = getattr(scaler, "feature_names_in_", None)
    if feature_names is not None:
        sample = pd.DataFrame(sample, columns=feature_names)
    data_scaled = scaler.transform(sample)
    pred = rf_model.predict(data_scaled)[0]
    return "Normal" if pred == 0 else "Anomalous"

# Server function
def server():
    """Serve predictions to a single client on localhost:12345.

    Accepts one connection, then repeatedly reads a pickled sample, classifies
    it with ``predict_traffic`` and sends the verdict back as text. Returns
    once the client closes the connection. Both sockets are closed on exit,
    including when an exception is raised.

    Each ``recv`` is treated as one complete message of at most 1024 bytes;
    there is no message framing, so larger or fragmented payloads will fail
    to unpickle.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Allow an immediate re-run (e.g. re-executing the notebook cell)
        # without hitting "Address already in use".
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server_socket.bind(('localhost', 12345))
        server_socket.listen(1)
        print("Server listening on port 12345...")
        conn, addr = server_socket.accept()
        with conn:
            print(f"Connected to {addr}")
            while True:
                data = conn.recv(1024)
                if not data:
                    # Empty read means the peer closed the connection.
                    break
                # SECURITY: pickle.loads executes arbitrary code embedded in
                # the payload. This is tolerable only because the demo binds to
                # localhost and talks to its own client; use a data-only format
                # such as JSON before accepting input from any other source.
                sample = pickle.loads(data)
                result = predict_traffic(sample)
                # sendall keeps writing until the whole reply has been sent.
                conn.sendall(result.encode())

# Client function
def client():
    """Send the first test sample to the local server and print its verdict.

    Waits one second so a server started just beforehand has time to begin
    listening, connects to localhost:12345, sends the pickled feature list of
    ``X_test.iloc[0]`` and prints the text reply. The socket is closed on exit,
    including when an exception is raised.
    """
    import time
    time.sleep(1)  # Wait for server to start
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect(('localhost', 12345))
        sample_data = X_test.iloc[0].tolist()
        # sendall keeps writing until the whole payload has been sent.
        client_socket.sendall(pickle.dumps(sample_data))
        response = client_socket.recv(1024).decode()
        print(f"Prediction: {response}")

# To try the prototype, uncomment the two lines below. This may not work well
# inside Jupyter; running server and client as separate scripts is better.
# threading.Thread(target=server).start()
# client()

# Closing messages, printed one per line.
for closing_line in (
    "\nCoursework code execution complete.",
    "Prepare report and video based on outputs.",
):
    print(closing_line)