# Imports

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, confusion_matrix

# Check if CUDA (GPU) is available and set the device accordingly.
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

# The rest of your code goes here...


Using device: cuda


In [1]:
import torch
print(torch.__version__)
print(torch.cuda.is_available())

2.1.1
True


# Functions

In [3]:
# Function to convert IPv4 string to integer
def ip_to_int(ip):
    octets = ip.split('.')
    if len(octets) != 4:
        return None
    try:
        int_ip = (int(octets[0]) << 24) + (int(octets[1]) << 16) + (int(octets[2]) << 8) + int(octets[3])
        return int_ip
    except ValueError:
        return None
    
# Function to convert int to bitmask
def int_to_bits(num):
    bits = bin(num)[2:]
    # pad with zeros if needed
    bits = bits.zfill(32)

    # return ".".join(str(bit) for bit in bits)
    return [int(bit) for bit in bits]

# Function to convert IPV4 address to bitmask
def ip_to_bits(ip):
    octets = ip.split('.')
    if len(octets) != 4:
        return None
    int_ip = (int(octets[0]) << 24) + (int(octets[1]) << 16) + (int(octets[2]) << 8) + int(octets[3])
    
    # Convert to bits
    bits = bin(int_ip)[2:]

    # Pad with zeros if needed
    bits = bits.zfill(32)

    # return ".".join(str(bit) for bit in bits)
    return [int(bit) for bit in bits]

In [14]:
print(ip_to_bits('255.255.255.255'))
print(bin(ip_to_int('255.255.255.255'))[2:])
print(ip_to_bits('48.198.255.128'))
print(bin(ip_to_int('48.198.255.128'))[2:])

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
11111111111111111111111111111111
[0, 0, 1, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0]
110000110001101111111110000000


# Data Pre-processing

In [15]:
# Load the dataset
data = pd.read_csv("data/NF-UQ-NIDS.csv")

# Convert IP address strings into bitmask
data['IPV4_SRC_ADDR'] = data['IPV4_SRC_ADDR'].apply(ip_to_bits)
data['IPV4_DST_ADDR'] = data['IPV4_DST_ADDR'].apply(ip_to_bits)

In [16]:
# Drop any rows that have NaN values
old_size = len(data)
data = data.dropna(how='any')
print('Entries with NaN dropped: ' + str(old_size-len(data)))

Entries with NaN dropped: 2


In [17]:
# Split IP columns into 32 separate columns, one for each bit of address
data[['sIP31', 'sIP30', 'sIP29', 'sIP28', 'sIP27', 'sIP26', 'sIP25', 'sIP24', 'sIP23', 'sIP22', 'sIP21', 'sIP20', 'sIP19', 'sIP18', 'sIP17', 'sIP16', 'sIP15', 'sIP14', 'sIP13', 'sIP12', 'sIP11', 'sIP10', 'sIP9', 'sIP8', 'sIP7', 'sIP6', 'sIP5', 'sIP4', 'sIP3', 'sIP2', 'sIP1', 'sIP0']] = pd.DataFrame(data['IPV4_SRC_ADDR'].tolist()).astype('bool')
data[['dIP31', 'dIP30', 'dIP29', 'dIP28', 'dIP27', 'dIP26', 'dIP25', 'dIP24', 'dIP23', 'dIP22', 'dIP21', 'dIP20', 'dIP19', 'dIP18', 'dIP17', 'dIP16', 'dIP15', 'dIP14', 'dIP13', 'dIP12', 'dIP11', 'dIP10', 'dIP9', 'dIP8', 'dIP7', 'dIP6', 'dIP5', 'dIP4', 'dIP3', 'dIP2', 'dIP1', 'dIP0']] = pd.DataFrame(data['IPV4_DST_ADDR'].tolist()).astype('bool')

In [18]:
# Drop any rows that have NaN values, again
old_size = len(data)
data = data.dropna(how='any')
print('Entries with NaN dropped: ' + str(old_size-len(data)))

Entries with NaN dropped: 2


In [19]:
# Separate features (X) and labels (y)
X = data.drop({'Label', 'Attack', 'Dataset', 'IPV4_SRC_ADDR', 'IPV4_DST_ADDR'}, axis=1)
y = data['Label']

# Delete data so it can be garbage collected
del data

# Drop column labels
X = X.values
y = y.values

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=99)

# Delete data so it can be garbage collected
del X
del y

# Standardize the features
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [20]:
# Convert data to PyTorch tensors
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.float32).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.float32).to(device)

# Model Definition

In [21]:
# Model Hyperparameters
input_dim = X_train.shape[1]
hidden_dim = 256
output_dim = 1

# Model Definition
class Classifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(Classifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, hidden_dim)
        self.fc4 = nn.Linear(hidden_dim, output_dim)
    
    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.sigmoid(self.fc4(x))  # Sigmoid activation for binary classification
        return x

model = Classifier(input_dim, hidden_dim, output_dim).to(device)

# Optimizer Definition
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Model Training

In [22]:
# Training
num_epochs = 100

# Lists to store the loss values for training and validation sets
train_loss_list = []
test_loss_list = []

for epoch in tqdm(range(num_epochs)):
    train_outputs = model(X_train).squeeze()
    train_loss = criterion(train_outputs, y_train)
    train_loss_list.append(train_loss.item())

    test_outputs = model(X_test).squeeze()
    test_loss = criterion(test_outputs, y_test)
    test_loss_list.append(test_loss.item())

    optimizer.zero_grad()
    train_loss.backward()
    optimizer.step()
    if(((epoch+1) % 10) == 0):
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Test Loss: {test_loss.item():.4f}')

del train_outputs
del test_outputs
del train_loss
del test_loss

# Plot the loss curve
plt.plot(train_loss_list, label="train")
plt.plot(test_loss_list, label="validation")
plt.title("Training Loss Curve")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.legend()
plt.show()

  0%|          | 0/100 [00:00<?, ?it/s]

# Model Evaluation

In [None]:
# Evaluation
model.eval()
with torch.no_grad():
    test_outputs = model(X_test)
    _, predictions = test_outputs.max(1)


    accuracy = accuracy_score(y_test.cpu(), predictions.cpu())
    confusion = confusion_matrix(y_test.cpu(), predictions.cpu())

    print(f'Accuracy: {accuracy:.2f}')
    print(f'Confusion Matrix:\n{confusion}')