In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from sklearn.model_selection import train_test_split

In [None]:
# Step 1: Load the CSV dataset
def load_data(csv_path, random_state=42):
    """Read a CSV dataset and return its rows in shuffled order.

    Args:
        csv_path: Location of the CSV file (anything ``pandas.read_csv`` accepts).
        random_state: Seed used for the shuffle. The default of 42 reproduces
            the ordering this function has always produced.

    Returns:
        A ``pandas.DataFrame`` containing every row of the file exactly once,
        in shuffled order. The original index labels are kept.
    """
    # low_memory=False makes pandas infer each column's dtype from the whole
    # file instead of chunk by chunk, which avoids mixed-type columns.
    data = pd.read_csv(csv_path, low_memory=False)
    # frac=1 draws all rows without replacement, i.e. a full permutation.
    return data.sample(frac=1, random_state=random_state)

# Step 2: Prepare the dataset for 1D CNN
def prepare_data(data):
    """Split a DataFrame into a float32 feature matrix and an integer label vector.

    The label column is the first column whose name contains "label", "class"
    or "attack" (case-insensitive). The features are all numeric columns other
    than the label column; non-numeric columns are dropped.

    Args:
        data: ``pandas.DataFrame`` holding the feature columns and one label column.

    Returns:
        A tuple ``(features, labels)``:
            features: 2-D ``float32`` array, one row per DataFrame row.
            labels: 1-D integer array. Labels that can be converted to int are
                converted directly. Otherwise (e.g. "BENIGN"/"DDoS") each
                distinct value is replaced by its rank in sorted order, giving
                codes 0..K-1.

    Raises:
        ValueError: If no label column is found, or if textual labels contain
            missing values.
    """
    keywords = ("label", "class", "attack")
    label_col = next(
        (col for col in data.columns if any(key in col.lower() for key in keywords)),
        None,
    )
    if label_col is None:
        raise ValueError("No label column found in dataset. Check column names!")

    # A numeric label column would otherwise leak into the feature matrix.
    feature_columns = [
        col
        for col in data.select_dtypes(include=[np.number]).columns
        if col != label_col
    ]
    features = data[feature_columns].values.astype(np.float32)

    raw_labels = data[label_col].values
    try:
        # Numeric (or numeric-looking) labels keep their original integer values.
        labels = raw_labels.astype(int)
    except (ValueError, TypeError):
        # Textual class names cannot be cast; encode them deterministically.
        codes, _ = pd.factorize(raw_labels, sort=True)
        if (codes < 0).any():  # factorize marks missing values with -1
            raise ValueError(f"Label column '{label_col}' contains missing values.")
        labels = codes.astype(int)
    return features, labels

# Step 3: Create a custom PyTorch Dataset class
class DDoSDataset(Dataset):
    """In-memory dataset pairing feature rows with their class labels.

    Both inputs are converted to tensors once, at construction time:
    features as ``float32`` and labels as ``int64``.
    """

    def __init__(self, features, labels):
        """Store ``features`` and ``labels`` as tensors."""
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.long)

    def __len__(self):
        """Return the number of samples (length of the feature tensor)."""
        return len(self.features)

    def __getitem__(self, idx):
        """Return ``(sample, label)`` for ``idx``.

        The sample gets a new leading axis of size 1, which serves as the
        channel dimension expected by ``Conv1d``.
        """
        sample = self.features[idx]
        target = self.labels[idx]
        # Indexing with None inserts the size-1 axis at position 0.
        return sample[None], target

In [None]:
# Step 4: Load data, split into train, validation, and test sets
data = load_data("/path/to/ddos_dataset.csv")
features, labels = prepare_data(data)

# Hold out 20% of all rows for the final test ...
train_feats, test_feats, train_labels, test_labels = train_test_split(
    features,
    labels,
    test_size=0.2,
    random_state=42,
)
# ... then 20% of the remaining rows for validation.
train_feats, val_feats, train_labels, val_labels = train_test_split(
    train_feats,
    train_labels,
    test_size=0.2,
    random_state=42,
)

# Wrap each split in a Dataset and a DataLoader; only training batches are shuffled.
train_dataset = DDoSDataset(train_feats, train_labels)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = DDoSDataset(val_feats, val_labels)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

test_dataset = DDoSDataset(test_feats, test_labels)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
# Step 5: Define the 1D CNN model
class CNN1D(nn.Module):
    """Two-stage 1D convolutional classifier for single-channel sequences.

    Each stage is Conv1d (kernel 3, padding 1) -> ReLU -> MaxPool1d(2, 2).
    The result is flattened and passed through two linear layers, the last
    of which emits one raw score (logit) per class.
    """

    def __init__(self, input_size, num_classes):
        """Build the layers.

        Args:
            input_size: Length of each input sequence (features per sample).
            num_classes: Number of outputs of the final linear layer.
        """
        super().__init__()
        # The convolutions keep the length; each pooling step floor-halves it.
        pooled_len = (input_size // 2) // 2

        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1, padding=1)
        self.relu = nn.ReLU()
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(32 * pooled_len, 64)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return the class logits for a batch ``x`` with one input channel."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(self.relu(conv(x)))
        # Collapse channels and positions into one vector per sample.
        flat = x.view(x.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        return self.fc2(hidden)

In [None]:
# Step 6: Initialize model, loss, and optimizer
input_size = features.shape[1]
# CrossEntropyLoss requires every target index to be smaller than the number
# of model outputs. Counting distinct labels is only enough when the labels
# are exactly 0..K-1, so also size the output layer by the largest label.
num_classes = max(len(np.unique(labels)), int(np.max(labels)) + 1)

model = CNN1D(input_size, num_classes)
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
# Step 7: Train the model
def _validation_metrics(model, loader, criterion, device):
    """Return (mean batch loss, accuracy in percent) on ``loader`` without updating weights."""
    model.eval()
    loss_sum = 0.0
    n_batches = 0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for features, labels in loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            loss_sum += criterion(outputs, labels).item()
            n_batches += 1
            _, predicted = torch.max(outputs, 1)
            n_correct += (predicted == labels).sum().item()
            n_seen += labels.size(0)
    mean_loss = loss_sum / n_batches if n_batches else float("nan")
    accuracy = 100 * n_correct / n_seen if n_seen else float("nan")
    return mean_loss, accuracy


def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=10):
    """Train ``model`` and report training and validation metrics after every epoch.

    Args:
        model: The ``nn.Module`` to train. Batches are moved to the device of
            its first parameter (CPU if it has no parameters).
        train_loader: Iterable of ``(features, labels)`` batches used for
            the optimisation steps.
        val_loader: Iterable of ``(features, labels)`` batches evaluated after
            each epoch without gradient tracking, or ``None`` to skip validation.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the model's parameters.
        epochs: Number of passes over ``train_loader``.

    Returns:
        The same ``model`` object, left in training mode.
    """
    # Take the device from the model itself rather than from a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    for epoch in range(epochs):
        # Validation switches the model to eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()
        total_loss = 0.0
        n_batches = 0
        for features, labels in train_loader:
            features, labels = features.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            n_batches += 1

        # Counting batches avoids a division by zero on an empty loader.
        train_loss = total_loss / n_batches if n_batches else float("nan")
        message = f"Epoch {epoch+1}/{epochs}, Loss: {train_loss}"
        if val_loader is not None:
            val_loss, val_acc = _validation_metrics(model, val_loader, criterion, device)
            message += f", Val Loss: {val_loss}, Val Acc: {val_acc:.2f}%"
        print(message)

    model.train()
    return model

# Run the training loop for 10 epochs and keep the returned model.
model = train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    epochs=10,
)

In [None]:
# Step 8: Evaluate on test data
def evaluate_model(model, test_loader):
    """Print and return the classification accuracy of ``model`` on ``test_loader``.

    Args:
        model: The ``nn.Module`` to evaluate; it is switched to eval mode.
            Batches are moved to the device of its first parameter (CPU if it
            has no parameters).
        test_loader: Iterable of ``(features, labels)`` batches.

    Returns:
        Accuracy in percent as a float, or ``nan`` when ``test_loader`` yields
        no samples.
    """
    # Take the device from the model itself rather than from a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for features, labels in test_loader:
            features, labels = features.to(device), labels.to(device)
            outputs = model(features)
            # The predicted class is the index of the largest logit.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    if total == 0:
        # Nothing was evaluated; avoid dividing by zero.
        print("Test Accuracy: n/a (no samples in test_loader)")
        return float("nan")

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")
    return accuracy

# Report accuracy on the held-out test split. The trailing semicolon stops the
# notebook from echoing the call's return value below the printed result.
evaluate_model(model=model, test_loader=test_loader);