In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import polars as pl

In [2]:
# Load the network packet dataset (CSV hosted on the Hugging Face hub) into a polars DataFrame.
# NOTE(review): the frame is named `ds_test` but is read from splits['train'] —
# confirm whether evaluating on the train split is intended. The label ids built
# later in this notebook follow row order, so switching splits may change them.
splits = {'train': 'train.csv', 'test': 'test.csv'}
ds_test = pl.read_csv('hf://datasets/rdpahalavan/network-packet-flow-header-payload/' + splits['train'])
print(ds_test)

shape: (1_187_781, 2)
┌─────────────────────────────────┬─────────────┐
│ packet_dat                      ┆ attack_cat  │
│ ---                             ┆ ---         │
│ str                             ┆ str         │
╞═════════════════════════════════╪═════════════╡
│ 0 0 141 -1 80 63713 2960 2920 … ┆ DDoS        │
│ 1190 1582 3526815 -1 80 50095 … ┆ Normal      │
│ 0 0 4 -1 80 41471 4420 4380 64… ┆ DDoS        │
│ 0 0 176 -1 80 45284 2948 2896 … ┆ DoS Hulk    │
│ 0 0 128 -1 80 46654 1500 1448 … ┆ DoS Hulk    │
│ …                               ┆ …           │
│ 14492 14492 0 -1 51328 22 164 … ┆ SSH Patator │
│ 14 98 131788 -1 80 52067 1500 … ┆ DoS         │
│ 1 2 397 -1 47188 22 692 640 62… ┆ SSH Patator │
│ 2063 0 0 -1 80 32768 1500 1448… ┆ DoS Hulk    │
│ 1 1 34 -1 56628 21 78 26 62 0 … ┆ FTP Patator │
└─────────────────────────────────┴─────────────┘


In [3]:
sample = ds_test[0]   # first row, returned as a one-row polars DataFrame
print(sample)
print(sample["packet_dat"])  # polars Series of str (not a torch.Tensor)
print(sample["attack_cat"])  # polars Series of str (not a torch.Tensor)

shape: (1, 2)
┌─────────────────────────────────┬────────────┐
│ packet_dat                      ┆ attack_cat │
│ ---                             ┆ ---        │
│ str                             ┆ str        │
╞═════════════════════════════════╪════════════╡
│ 0 0 141 -1 80 63713 2960 2920 … ┆ DDoS       │
└─────────────────────────────────┴────────────┘
shape: (1,)
Series: 'packet_dat' [str]
[
	"0 0 141 -1 80 63713 2960 2920 …
]
shape: (1,)
Series: 'attack_cat' [str]
[
	"DDoS"
]


In [4]:
def get_label_categories(labels):
    """Map each distinct label to an integer id.

    Ids are assigned 0, 1, 2, ... in the order labels first appear in
    ``labels``; repeated labels keep the id from their first occurrence.

    Args:
        labels: Iterable of hashable labels (duplicates allowed).

    Returns:
        dict mapping label -> integer id.
    """
    label_to_id = {}
    for item in labels:
        # setdefault only inserts unseen labels; the current dict size is
        # exactly the next unused id.
        label_to_id.setdefault(item, len(label_to_id))
    return label_to_id

In [5]:
# Build the label -> integer id mapping; ids follow first-appearance order in ds_test.
# NOTE(review): presumably this must match the mapping used when the checkpoint was trained — confirm.
categories = get_label_categories(ds_test["attack_cat"])
print(categories)

{'DDoS': 0, 'Normal': 1, 'DoS Hulk': 2, 'DoS': 3, 'Bot': 4, 'Exploits': 5, 'Fuzzers': 6, 'Reconnaissance': 7, 'Web Attack - XSS': 8, 'Heartbleed': 9, 'SSH Patator': 10, 'DoS SlowHTTPTest': 11, 'FTP Patator': 12, 'Generic': 13, 'Web Attack - Brute Force': 14, 'DoS GoldenEye': 15, 'Analysis': 16, 'Worms': 17, 'Infiltration': 18, 'DoS Slowloris': 19, 'Shellcode': 20, 'Backdoor': 21, 'Port Scan': 22, 'Web Attack - SQL Injection': 23}


In [8]:
# Convert every row into a dict of tensors that DataLoader can batch.
# The (misspelled) name `ds_test_tesnors` is kept because later cells use it.
ds_test_tesnors = []

# Iterate the two columns directly instead of slicing the DataFrame one row at
# a time: `ds_test[sid]` builds a new one-row DataFrame on every iteration,
# which is needlessly slow for a frame with over a million rows.
for packet_str, attack_name in zip(ds_test["packet_dat"], ds_test["attack_cat"]):
    # Whitespace-separated numeric string -> list of floats.
    features = [float(token) for token in packet_str.split()]
    # Named `label_id` rather than `id` so the builtin `id` is not shadowed
    # in the notebook's global namespace.
    label_id = categories[attack_name]

    ds_test_tesnors.append({
        "packet_tensor": torch.tensor(features, dtype=torch.float),
        "attack_tensor": torch.tensor(label_id, dtype=torch.int64),
    })


In [9]:
# student model
class LightMLP(nn.Module):
    """Small one-hidden-layer MLP classifier (Linear -> ReLU -> Linear).

    Args:
        in_features: Size of each input feature vector (default 513).
        hidden_features: Width of the hidden layer (default 32).
        num_classes: Number of output logits (default 24).

    The defaults reproduce the original fixed architecture, and the attribute
    names ``fc1`` / ``output`` are kept so existing state dicts still load.
    """

    def __init__(self, in_features=513, hidden_features=32, num_classes=24):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.output = nn.Linear(hidden_features, num_classes)

    def forward(self, x):
        """Return raw (un-normalised) class logits for ``x``.

        ``x`` is passed straight to ``fc1``, so its last dimension must equal
        the ``in_features`` the model was built with.
        """
        hidden = torch.relu(self.fc1(x))
        return self.output(hidden)

In [10]:
# Wrap the list of per-sample tensor dicts in a DataLoader that yields batches of 32.
test_loader = DataLoader(ds_test_tesnors, batch_size=32)

In [11]:
def test(model, test_loader, criterion):
    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in test_loader:
            inputs = batch["packet_tensor"]   # tensor of shape [B, L]
            labels = batch["attack_tensor"]   # true attack category labels
            
            # Forward pass
            predicted = model(inputs)
            loss = criterion(predicted, labels)
            test_loss += loss.item() * inputs.size(0)

            # calculate accuracy
            total += labels.size(0)
            correct += (predicted.argmax(dim=1) == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Loss: {test_loss:.2f}, Test Accuracy: {accuracy:.2f}")
    return accuracy

In [12]:
# Fix the RNG seed, then build the model, the loss and an SGD optimizer.
# The optimizer is only needed so the checkpoint's optimizer state can be loaded;
# the `test` function does not take or step it.
torch.manual_seed(42)
model = LightMLP()
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.0001)

In [13]:
# load model
# NOTE(review): absolute, machine-specific path — consider making it configurable.
checkpoint_path = "/home/ubuntu/Network-Packet-ML-Model/checkpoint/checkpoint.pth"

# map_location forces all tensors onto the CPU.
# NOTE(review): torch.load may unpickle arbitrary objects; only load checkpoints from a trusted source.
checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))

# Restore model weights, optimizer state and the epoch counter stored in the checkpoint.
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']

print(f"Checkpoint loaded from {checkpoint_path}, starting from epoch {epoch}")

Checkpoint loaded from /home/ubuntu/Network-Packet-ML-Model/checkpoint/checkpoint.pth, starting from epoch 3


In [14]:
# Evaluate the restored model; `test` prints loss and accuracy and returns the accuracy in percent.
test_accuracy = test(model, test_loader, criterion=criterion)

Test Loss: 79990365.19, Test Accuracy: 70.57
