In [None]:
# Imports and Configuration
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score

# ART Imports (Adversarial Robustness Toolbox)
from art.estimators.classification import SklearnClassifier, PyTorchClassifier
from art.attacks.evasion import HopSkipJump, FastGradientMethod

# Import KNN model script
import sys
import os
sys.path.append(os.path.abspath(".."))
try:
    from scripts.models.knn import train_knn 
except ImportError:
    # Fallback for standalone testing if script is missing
    from sklearn.neighbors import KNeighborsClassifier
    def train_knn(X, y, n_neighbors=5, cv=5, logger=None):
        knn = KNeighborsClassifier(n_neighbors=n_neighbors)
        knn.fit(X, y)
        return knn, []

# Check for GPU availability
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Libraries loaded. PyTorch device: {device}")

Libraries loaded. PyTorch device: cpu


In [5]:
# Dataset Preparation - UNSW-NB15
import sys
import os
import numpy as np

# Adjust path to find the UNSWNB15 package
sys.path.append(os.path.abspath(".."))
from UNSWNB15.preprocessing.dataset import UNSWNB15

print("Loading UNSW-NB15 dataset (small size for testing)...")

# 1. Initialization
dataset = UNSWNB15(dataset_size="small")

# 2. Optimization and Encoding
dataset.optimize_memory()
dataset.encode(attack_encoder="label")

# 3. Scaling (Normalization)
try:
    dataset.scale(scaler="minmax")
except:
    print("Warning: 'minmax' scaler not supported, falling back to standard scaler.")
    dataset.scale(scaler="standard")

# 4. Split Train/Test
X_train, X_test, y_train, y_test = dataset.split(
    test_size=0.25, 
    multiclass=False # Using binary classification
)

# 5. Explicit conversion for ART/PyTorch compatibility
X_train = X_train.astype(np.float32)
X_test = X_test.astype(np.float32)
y_train = y_train.astype(np.int64)
y_test = y_test.astype(np.int64)

# Create a small subset for quick attack testing
X_sub = X_test[:100]
y_sub = y_test[:100]

print(f"UNSW-NB15 data ready. Train shape: {X_train.shape}, Test shape: {X_test.shape}")

Loading UNSW-NB15 dataset (small size for testing)...
[INFO] Downloading dataset: mrwellsdavid/unsw-nb15
Downloading from https://www.kaggle.com/api/v1/datasets/download/mrwellsdavid/unsw-nb15?dataset_version_number=1...


100%|██████████| 149M/149M [00:39<00:00, 3.91MB/s] 

Extracting files...





[DEBUG] Dataset downloaded to: C:\Users\pauli\.cache\kagglehub\datasets\mrwellsdavid\unsw-nb15\versions\1
[INFO] Loaded UNSW-NB15_1.csv with shape: (700000, 46)
[INFO] DataFrame shape: (700000, 46)
[INFO] Initial dimensions: 700,000 rows x 46 columns = 32,200,000 cells
[DEBUG] Cleaning column names
[DEBUG] Removing duplicate rows
[DEBUG] Removed 59,342 duplicate rows. Remaining: 640,658
[DEBUG] Removing rows with missing values (initial pass)
[DEBUG] Removed 0 rows with missing values. Remaining: 640,658
[DEBUG] Checking for infinite values in numeric columns
[DEBUG] Missing values before processing infinite values: 0
[DEBUG] Missing values after processing infinite values: 0
[DEBUG] Infinite values converted to NaN: 0
[DEBUG] Dropping 0 columns with only one unique value:
[INFO] Preprocessing completed successfully
[INFO] Final dimensions: 640,658 rows x 46 columns
[INFO] Total rows removed: 59,342 (8.48%)
[INFO] data retention rate: 91.52%
[INFO] Optimizing memory usage of the datase

In [None]:
# Dataset Preparation - CICIDS2017 (OVERWRITING UNSW-NB15 VARIABLES)
# CAREFUL: This will overwrite the UNSW-NB15 dataset variables.
# We have to run either this cell or the previous one for the test.

from CICIDS2017.preprocessing.dataset import CICIDS2017

print("\n\nLoading CICIDS2017 dataset...")
dataset_cicids = CICIDS2017(dataset_size=None) 
dataset_cicids.optimize_memory()
dataset_cicids.encode(attack_encoder="label")

try:
    dataset_cicids.scale(scaler="minmax")
except:
    print("Warning: 'minmax' scaler not supported, falling back to standard scaler.")
    dataset_cicids.scale(scaler="standard")

X_train, X_test, y_train, y_test = dataset_cicids.split(test_size=0.25, multiclass=False)

X_sub = X_test[:100].astype(np.float32)
y_sub = y_test[:100].astype(np.int64)
X_train = X_train.astype(np.float32)
y_train = y_train.astype(np.int64)

print(f"CICIDS2017 data loaded. Current Train shape: {X_train.shape}, Test subset shape: {X_sub.shape}")

In [6]:
# Train Target Model (KNN)

knn_model, _ = train_knn(X_train, y_train, n_neighbors=5)
print("KNN Model trained.")

# Baseline accuracy on the subset
baseline_acc = accuracy_score(y_sub, knn_model.predict(X_sub))
print(f"Baseline accuracy on subset: {baseline_acc:.2f}")

KNN Model trained.
Baseline accuracy on subset: 0.99


In [7]:
# Black Box Attack - HopSkipJump
# We do not know the model internals, only the output class.

# 1. Wrap the Scikit-Learn model for ART
art_knn = SklearnClassifier(model=knn_model, clip_values=(0.0, 1.0)) # Because our data is normalized

# 2. Configure the HopSkipJump attack
attack_hsj = HopSkipJump(classifier=art_knn, targeted=False, max_iter=50, max_eval=1000, init_eval=10)

print("Generating adversarial examples using HopSkipJump (Black Box)...")
X_adv_hsj = attack_hsj.generate(x=X_sub)

# 3. Evaluate the attack
preds_clean = knn_model.predict(X_sub)
preds_adv_hsj = knn_model.predict(X_adv_hsj)

acc_clean = accuracy_score(y_sub, preds_clean)
acc_adv = accuracy_score(y_sub, preds_adv_hsj)

print(f"\n--- HopSkipJump Results (Black Box) ---")
print(f"Accuracy on clean data: {acc_clean:.2f}")
print(f"Accuracy on adversarial data: {acc_adv:.2f}")
print(f"Performance drop: {(acc_clean - acc_adv)*100:.1f}%")

Generating adversarial examples using HopSkipJump (Black Box)...


HopSkipJump:   0%|          | 0/100 [00:00<?, ?it/s]


--- HopSkipJump Results (Black Box) ---
Accuracy on clean data: 0.99
Accuracy on adversarial data: 0.01
Performance drop: 98.0%


In [14]:
# White Box Strategy - Defining a PyTorch Surrogate
# Since KNN is non-differentiable (no gradients), we cannot use the Fast Gradient Sign Method (FGSM) directly.
# We will train a differentiable Neural Network (Surrogate) to mimic the KNN, attack the surrogate, and transfer the examples to the KNN.

class SurrogateModel(nn.Module):
    def __init__(self, input_size, num_classes):
        super(SurrogateModel, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, num_classes)
    
    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        out = self.relu(out)
        out = self.fc3(out)
        return out

# Prepare data for PyTorch
# We use KNN predictions (y_train_surrogate) as labels, not the ground truth.
# We want the surrogate to learn the KNN's mistakes and decision boundaries.
y_train_surrogate = knn_model.predict(X_train)

tensor_x = torch.Tensor(X_train).to(device)
tensor_y = torch.LongTensor(y_train_surrogate).to(device)

dataset = TensorDataset(tensor_x, tensor_y)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# Initialize model and optimizer
surrogate = SurrogateModel(input_size=X_train.shape[1], num_classes=2).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(surrogate.parameters(), lr=0.01)

print("Surrogate PyTorch model initialized.")

Surrogate PyTorch model initialized.


In [15]:
# Train the Surrogate Model
epochs = 30
surrogate.train()

print("Training surrogate model to mimic KNN...")
for epoch in range(epochs):
    for inputs, labels in dataloader:
        optimizer.zero_grad()
        outputs = surrogate(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
    
    if (epoch+1) % 5 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

print("Surrogate training complete.")

Training surrogate model to mimic KNN...
Epoch [5/30], Loss: 0.0000
Epoch [10/30], Loss: 0.0208
Epoch [15/30], Loss: 0.0052
Epoch [20/30], Loss: 0.0001
Epoch [25/30], Loss: 0.0209
Epoch [30/30], Loss: 0.0000
Surrogate training complete.


In [16]:
# Transfer Attack (FGSM via Surrogate)
# We generate attacks on the differentiable Surrogate using gradients,
# then test if these attacks also fool the KNN.

# 1. Wrap the PyTorch model for ART
art_surrogate = PyTorchClassifier(
    model=surrogate,
    loss=criterion,
    optimizer=optimizer,
    input_shape=(X_train.shape[1],),
    nb_classes=2,
    clip_values=(0.0, 1.0),
    device_type='gpu' if torch.cuda.is_available() else 'cpu'
)

# 2. Generate attacks using FGSM
# eps = perturbation magnitude (0.1 is noticeable, 0.01 is subtle)
attack_fgsm = FastGradientMethod(estimator=art_surrogate, eps=0.3)

print("Generating adversarial examples using FGSM (on Surrogate)...")
X_adv_surr = attack_fgsm.generate(x=X_sub)

# 3. Transfer Evaluation: Test the generated examples on the original KNN
preds_clean_knn = knn_model.predict(X_sub)
preds_adv_transfer = knn_model.predict(X_adv_surr)

acc_clean = accuracy_score(y_sub, preds_clean_knn)
acc_transf = accuracy_score(y_sub, preds_adv_transfer)

print(f"\n--- Transfer Attack Results (White Box Surrogate) ---")
print(f"KNN Accuracy (Clean): {acc_clean:.2f}")
print(f"KNN Accuracy (Transferred Attack): {acc_transf:.2f}")
print(f"Attack Success (Drop): {(acc_clean - acc_transf)*100:.1f}%")

Generating adversarial examples using FGSM (on Surrogate)...

--- Transfer Attack Results (White Box Surrogate) ---
KNN Accuracy (Clean): 0.99
KNN Accuracy (Transferred Attack): 0.99
Attack Success (Drop): 0.0%


HopSkipJump	98.0%	Vulnérable aux attaques basées sur les décisions.
FGSM via Surrogate	0.0%	Robuste aux attaques basées sur les gradients (car la fonction de perte du KNN est trop différente de celle du Surrogate).