# SNN su NSL-KDD (Colab) — baseline con snntorch

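Questo notebook scarica NSL-KDD, esegue il preprocessing (feature numeriche normalizzate con min-max + feature categoriche one-hot), applica la codifica temporale dell'input (la funzione `rate_encode` ripete le feature di ogni campione a ogni step) e allena e valuta una SNN minimale con snntorch. Funziona su Google Colab (CPU/GPU).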

- Dataset: NSL-KDD (KDDTrain+ / KDDTest+)
- Modello: MLP con neuroni Leaky IF (snntorch)
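- Codifica: `rate_encode` con T=10 step — le feature normalizzate vengono ripetute identiche a ogni step (input statico, senza campionamento stocastico degli spike)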
- Obiettivo: classificazione binaria (anomalo vs normale)

Suggerimenti:
- Runtime → Cambia tipo di runtime → GPU (facoltativo)
- Epoche e dimensioni layer possono essere aumentate su GPU



In [None]:
# Install dependencies (IPython shell magic; -q keeps pip output quiet).
# NOTE(review): pyyaml is installed here but never imported in the visible cells — confirm it is needed.
!pip -q install snntorch pyyaml pandas scikit-learn

import os, sys, numpy as np, pandas as pd
from pathlib import Path



In [None]:
# Download NSL-KDD (train/test)
base = Path('/content/data'); base.mkdir(parents=True, exist_ok=True)
urls = {
    'KDDTrain+': 'https://raw.githubusercontent.com/defcom17/NSL_KDD/master/KDDTrain+.txt',
    'KDDTest+':  'https://raw.githubusercontent.com/defcom17/NSL_KDD/master/KDDTest+.txt',
}
for name, url in urls.items():
    !wget -q -O /content/data/{name}.txt "{url}"
!ls -lh /content/data



In [None]:
# Preprocessing: load the splits, one-hot encode, normalize.
# Column names applied to the NSL-KDD files, in file order. The group
# headings below follow the customary KDD'99 feature families and are
# informational only; the list content and order are what matter.
cols = [
    # basic connection features
    "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes",
    "land", "wrong_fragment", "urgent",
    # content features
    "hot", "num_failed_logins", "logged_in", "num_compromised", "root_shell",
    "su_attempted", "num_root", "num_file_creations", "num_shells",
    "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login",
    # time-window traffic features
    "count", "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate",
    "srv_rerror_rate", "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate",
    # destination-host traffic features
    "dst_host_count", "dst_host_srv_count", "dst_host_same_srv_rate",
    "dst_host_diff_srv_rate", "dst_host_same_src_port_rate",
    "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate",
    "dst_host_srv_rerror_rate",
    # target column and the trailing difficulty column
    "label", "difficulty",
]

def load_split(split):
  """Read one NSL-KDD split from /content/data into a DataFrame.

  Args:
    split: File stem of the split, e.g. "KDDTrain+" (".txt" is appended).

  Returns:
    The parsed DataFrame with columns named after the module-level `cols`
    list and the "difficulty" column removed when it is present.
  """
  raw = pd.read_csv(f"/content/data/{split}.txt", names=cols)
  return raw.drop(columns=["difficulty"], errors="ignore")

train_df, test_df = load_split("KDDTrain+"), load_split("KDDTest+")

# Binary target: 0 for rows labelled "normal", 1 for every other label.
y_train = (train_df["label"] != "normal").astype(int).values
y_test  = (test_df["label"]  != "normal").astype(int).values
X_train_df = train_df.drop(columns=["label"])
X_test_df  = test_df.drop(columns=["label"])

# Everything that is not one of the three categorical columns is numeric.
cat_cols = ["protocol_type","service","flag"]
num_cols = [c for c in X_train_df.columns if c not in cat_cols]

from sklearn.preprocessing import MinMaxScaler
# The scaler is fitted on the training split only and then reused on test,
# so test statistics never influence the scaling.
scaler = MinMaxScaler()
X_train_num = scaler.fit_transform(X_train_df[num_cols].values.astype(float))
X_test_num  = scaler.transform(X_test_df[num_cols].values.astype(float))

# One-hot encode the training categoricals once and reuse that frame for both
# the training matrix and the reference column layout (previously the same
# encoding was computed twice).
train_onehot = pd.get_dummies(X_train_df[cat_cols].astype(str))
X_train_cat = train_onehot.values
# Align test to the training layout: categories unseen in train are dropped,
# categories missing from test become all-zero columns.
X_test_cat  = pd.get_dummies(X_test_df[cat_cols].astype(str)).reindex(
    columns=train_onehot.columns, fill_value=0
).values

# Final design matrices: numeric block followed by the one-hot block.
X_train = np.hstack([X_train_num, X_train_cat]).astype(np.float32)
X_test  = np.hstack([X_test_num,  X_test_cat]).astype(np.float32)
input_size, output_size = X_train.shape[1], 2
X_train.shape, X_test.shape



In [None]:
# Input encoding (features repeated over the time steps) and SNN definition with snntorch
import torch, torch.nn as nn
import snntorch as snn
from torch.utils.data import TensorDataset, DataLoader

# Use the GPU when torch reports CUDA as available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Number of time steps passed to rate_encode for every sample.
timesteps = 10

def rate_encode(X, timesteps=10):
  """Replicate every sample of `X` along a new time axis on `device`.

  Despite the name, no stochastic spike sampling happens here: the values of
  `X` are copied unchanged to each time step.

  Args:
    X: Array-like accepted by `torch.tensor`; for a 2-D (N, F) input the
      result has shape (N, timesteps, F).
    timesteps: Number of copies created along axis 1.

  Returns:
    A tensor on the module-level `device` holding the repeated input.
  """
  static_input = torch.tensor(X, device=device)
  with_time_axis = torch.unsqueeze(static_input, 1)
  return with_time_axis.repeat(1, timesteps, 1)

# Encode both splits once up front; inputs and labels are created on `device`.
train_data = TensorDataset(
  rate_encode(X_train, timesteps),
  torch.tensor(y_train, device=device),
)
test_data = TensorDataset(
  rate_encode(X_test, timesteps),
  torch.tensor(y_test, device=device),
)

# Training batches are reshuffled every epoch; test batches keep their order.
train_loader = DataLoader(dataset=train_data, batch_size=256, shuffle=True)
test_loader = DataLoader(dataset=test_data, batch_size=512, shuffle=False)

class SNNNet(nn.Module):
  """Two-layer fully connected spiking network built from snntorch Leaky neurons.

  Layout: Linear(input_size -> hidden) -> Leaky -> Linear(hidden -> output_size)
  -> Leaky, with beta=0.9 on both neuron layers.
  """

  def __init__(self, input_size, hidden=256, output_size=2):
    """Create the two linear layers and their Leaky neuron layers."""
    super().__init__()
    self.fc1 = nn.Linear(in_features=input_size, out_features=hidden)
    self.lif1 = snn.Leaky(beta=0.9)
    self.fc2 = nn.Linear(in_features=hidden, out_features=output_size)
    self.lif2 = snn.Leaky(beta=0.9)

  def forward(self, x):
    """Step through axis 1 of `x` and return the summed output spikes.

    `x` is indexed as x[:, t, :], i.e. it is treated as
    (batch, time, features). Both membrane states are re-initialised on
    every call, so no state carries over between batches.
    """
    hidden_mem = self.lif1.init_leaky()
    out_mem = self.lif2.init_leaky()
    spike_count = 0
    num_steps = x.size(1)
    for step in range(num_steps):
      hidden_spk, hidden_mem = self.lif1(self.fc1(x[:, step, :]), hidden_mem)
      out_spk, out_mem = self.lif2(self.fc2(hidden_spk), out_mem)
      # Accumulate output spikes over time; the total is used as the class score.
      spike_count = spike_count + out_spk
    return spike_count

# Build the network with a 256-unit hidden layer and move it to `device`.
model = SNNNet(input_size=input_size, hidden=256, output_size=output_size)
model = model.to(device)
# Adam with learning rate 1e-3; cross-entropy is applied to the summed output spikes.
opt = torch.optim.Adam(params=model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()



In [None]:
# Training and evaluation
import numpy as np

def accuracy(logits, y):
  """Return the fraction of rows whose arg-max over dim 1 equals `y`.

  Args:
    logits: Tensor of per-class scores, one row per sample.
    y: Tensor of integer class indices, one per row of `logits`.

  Returns:
    The mean hit rate as a Python float.
  """
  hits = logits.argmax(dim=1).eq(y)
  return hits.float().mean().item()

# Train for a fixed number of epochs and report per-sample averages.
num_epochs = 5
for epoch in range(num_epochs):
  model.train()
  loss_sum, correct, seen = 0.0, 0, 0
  for Xb, yb in train_loader:
    opt.zero_grad()
    logits = model(Xb)
    loss = criterion(logits, yb)
    loss.backward(); opt.step()
    # Weight each batch by its size: averaging per-batch means would let a
    # short final batch count as much as a full one and skew the metrics.
    batch_n = yb.size(0)
    loss_sum += loss.item() * batch_n
    correct += (torch.argmax(logits, dim=1) == yb).sum().item()
    seen += batch_n
  # max(seen, 1) only guards the division when the loader yields nothing.
  print(f"Epoch {epoch+1}: loss={loss_sum / max(seen, 1):.4f}, acc={correct / max(seen, 1):.3f}")

# Evaluate on the test loader without tracking gradients.
model.eval()
correct, seen = 0, 0
with torch.no_grad():
  for Xb, yb in test_loader:
    logits = model(Xb)
    # Count hits and samples instead of averaging per-batch accuracies, so a
    # smaller last batch does not get the same weight as a full batch.
    correct += (torch.argmax(logits, dim=1) == yb).sum().item()
    seen += yb.size(0)
# max(seen, 1) only guards the division when the loader yields nothing.
print(f"Test Accuracy: {correct / max(seen, 1):.3f}")



In [None]:
# Salvataggio artefatti
Path('/content/out').mkdir(exist_ok=True)
torch.save(model.state_dict(), "/content/out/snn_nslkdd.pt")
np.save("/content/out/scaler_min.npy", scaler.min_)
np.save("/content/out/scaler_scale.npy", scaler.scale_)
!ls -lh /content/out

