<a href="https://colab.research.google.com/github/Munazza-Farees/NITW-SIP2025-Project/blob/main/LNN_LDoS_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
# Import libraries
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
# Load dataset
URL = '/content/drive/MyDrive/Colab Notebooks/Final_data.csv'
data = pd.read_csv(URL)

In [3]:
# counts = data['Connection_Patterns'].value_counts()
# print(counts)
# counts = data['Traffic_Patterns'].value_counts()
# print(counts)

In [4]:
# Handle missing values
data = data.fillna(data.mean(numeric_only=True))

# Encode categorical features
data['Protocol'] = data['Protocol'].astype('category').cat.codes
data['Connection_Patterns'] = data['Connection_Patterns'].astype('category').cat.codes
data['Traffic_Patterns'] = data['Traffic_Patterns'].astype('category').cat.codes

In [5]:
# data.head()

In [6]:
features_to_be_scaled = data.drop(columns=['Source', 'Destination', 'Label'])
labels = data['Label']

scaler = StandardScaler()
X_scaled = scaler.fit_transform(features_to_be_scaled)

In [7]:
# Create sequences for LNN
# 10 time steps per sequence
seq_len = 10
X_seq = []
y_seq = []

for i in range(0, len(X_scaled) - seq_len):
    X_seq.append(X_scaled[i:i + seq_len])
    y_seq.append(labels[i + seq_len])

X_seq = np.array(X_seq)
y_seq = np.array(y_seq)

In [8]:
# Split into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X_seq, y_seq, test_size=0.3, random_state=42)

# Convert to PyTorch tensors
X_train = torch.FloatTensor(X_train)

label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)
y_test_encoded = label_encoder.transform(y_test)

y_train = torch.FloatTensor(y_train_encoded).unsqueeze(1)  # Shape: (num_samples, 1)
X_test = torch.FloatTensor(X_test)
y_test = torch.FloatTensor(y_test_encoded).unsqueeze(1)

AttributeError: 'numpy.ndarray' object has no attribute 'head'

In [18]:
# Define the LiquidTimeStep class
class LiquidTimeStep(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LiquidTimeStep, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.W_in = nn.Linear(input_size, hidden_size)
        self.W_hid = nn.Linear(hidden_size, hidden_size)
        self.tau = nn.Parameter(torch.ones(hidden_size) * 0.1)  # Initialize tau to 0.1

    def forward(self, x, h):
        dx = torch.tanh(self.W_in(x) + self.W_hid(h))
        h_new = h + (dx - h) / self.tau
        return h_new

# Define the LNN model
class LNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.liquid_step = LiquidTimeStep(input_size, hidden_size)
        self.output_layer = nn.Linear(hidden_size, output_size)
        self.sigmoid = nn.Sigmoid()  # For binary classification

    def forward(self, x):
        batch_size, seq_len, _ = x.size()
        h = torch.zeros(batch_size, self.hidden_size, device=x.device)
        for t in range(seq_len):
            h = self.liquid_step(x[:, t, :], h)
        output = self.output_layer(h)
        output = self.sigmoid(output)  # Output probability for LDoS (0 to 1)
        return output

# Hyperparameters
input_size = 25  # Number of features
hidden_size = 64  # Increased for better capacity
output_size = 1  # Binary classification (LDoS or not)

# Initialize model, loss, and optimizer
model = LNN(input_size, hidden_size, output_size)
criterion = nn.BCELoss()  # Binary cross-entropy for classification
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [23]:
# Training loop
num_epochs = 10
batch_size = 20
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for i in range(0, len(X_train), batch_size):
        batch_X = X_train[i:i + batch_size].to(device)
        batch_y = y_train[i:i + batch_size].to(device)

        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / (len(X_train) // batch_size)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_loss:.4f}")

# Save the model
# torch.save(model.state_dict(), 'lnn_ldos.pth')

Epoch 1/10, Loss: 56.5795
Epoch 2/10, Loss: 56.5795
Epoch 3/10, Loss: 56.5795
Epoch 4/10, Loss: 56.5795
Epoch 5/10, Loss: 56.5795
Epoch 6/10, Loss: 56.5795
Epoch 7/10, Loss: 56.5795
Epoch 8/10, Loss: 56.5795


KeyboardInterrupt: 

In [24]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Evaluate the model
model.eval()
with torch.no_grad():
    X_test = X_test.to(device)
    y_test = y_test.to(device)
    outputs = model(X_test)
    predictions = (outputs >= 0.5).float()  # Threshold at 0.5 for binary classification

    # Compute metrics
    accuracy = accuracy_score(y_test.cpu(), predictions.cpu())
    precision = precision_score(y_test.cpu(), predictions.cpu())
    recall = recall_score(y_test.cpu(), predictions.cpu())
    f1 = f1_score(y_test.cpu(), predictions.cpu())

    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

Test Accuracy: 0.4340
Precision: 0.7155
Recall: 0.3115
F1 Score: 0.4340


In [25]:
# Learning rate scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

# Add to training loop after loss computation
scheduler.step(avg_loss)

# Class weights for imbalanced data
pos_weight = torch.tensor([len(y_train) / sum(y_train)]).to(device)  # Weight for positive class
criterion = nn.BCELoss(weight=pos_weight)

In [26]:
# Load the model
# model.load_state_dict(torch.load('lnn_ldos.pth'))
model.eval()

# Example inference on new data
new_data = torch.FloatTensor(X_test[:1]).to(device)  # Example: first test sample
with torch.no_grad():
    prediction = model(new_data)
    is_ldos = "LDoS Attack" if prediction >= 0.5 else "Benign"
    print(f"Prediction: {is_ldos}, Probability: {prediction.item():.4f}")

Prediction: LDoS Attack, Probability: 1.0000
