In [17]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, confusion_matrix

torch.manual_seed(42)
np.random.seed(42)


In [18]:
train_path = "../raw_data/UNSW_NB15_training-set.csv"
test_path  = "../raw_data/UNSW_NB15_testing-set.csv"

assert os.path.exists(train_path), "Arquivo de treino não encontrado"
assert os.path.exists(test_path), "Arquivo de teste não encontrado"

train_df = pd.read_csv(train_path)
test_df  = pd.read_csv(test_path)


In [19]:
target_col = "label"

cat_cols = ["proto", "service", "state"]

drop_cols = ["attack_cat"]  # NÃO entra no modelo

num_cols = [
    c for c in train_df.columns
    if c not in cat_cols + drop_cols + [target_col]
]


In [20]:
X_train = train_df.drop(columns=[target_col]).copy()
y_train = train_df[target_col].copy()

X_test  = test_df.drop(columns=[target_col]).copy()
y_test  = test_df[target_col].copy()


In [21]:
cat_maps = {}

for col in cat_cols:
    # categorias vistas no treino
    uniques = X_train[col].astype(str).unique().tolist()

    # índice 0 reservado para UNK
    mapping = {v: i + 1 for i, v in enumerate(uniques)}
    cat_maps[col] = mapping

    # aplica no treino
    X_train[col] = X_train[col].astype(str).map(mapping).fillna(0).astype(int)

    # aplica no teste (valores novos viram 0)
    X_test[col] = X_test[col].astype(str).map(mapping).fillna(0).astype(int)


In [22]:
scaler = StandardScaler()

X_train.loc[:, num_cols] = scaler.fit_transform(X_train[num_cols])
X_test.loc[:, num_cols]  = scaler.transform(X_test[num_cols])


  1.73204093]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  X_train.loc[:, num_cols] = scaler.fit_transform(X_train[num_cols])
 -0.13367695]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  X_train.loc[:, num_cols] = scaler.fit_transform(X_train[num_cols])
 -0.17204736]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  X_train.loc[:, num_cols] = scaler.fit_transform(X_train[num_cols])
 -0.04995758]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  X_train.loc[:, num_cols] = scaler.fit_transform(X_train[num_cols])
 -0.10392289]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  X_train.loc[:, num_cols] = scaler.fit_transform(X_train[num_cols])
  0.72326799]' has dtype incompatible with int64, please explicitly cast to a compatible dtype first.
  X_train.loc[:, num_cols] = scaler.fit_tra

In [23]:
X_train_num = X_train[num_cols].values.astype(np.float32)
X_test_num  = X_test[num_cols].values.astype(np.float32)

X_train_cat = X_train[cat_cols].values.astype(np.int64)
X_test_cat  = X_test[cat_cols].values.astype(np.int64)

y_train_t = torch.tensor(y_train.values, dtype=torch.float32)
y_test_t  = torch.tensor(y_test.values, dtype=torch.float32)

X_train_num_t = torch.tensor(X_train_num)
X_test_num_t  = torch.tensor(X_test_num)

X_train_cat_t = torch.tensor(X_train_cat)
X_test_cat_t  = torch.tensor(X_test_cat)


In [24]:
class IDSModel(nn.Module):
    def __init__(self, num_features, cat_cardinalities, emb_dims):
        super().__init__()

        self.emb_layers = nn.ModuleList([
            nn.Embedding(card, dim)
            for card, dim in zip(cat_cardinalities, emb_dims)
        ])

        emb_out_dim = sum(emb_dims)

        self.net = nn.Sequential(
            nn.Linear(num_features + emb_out_dim, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x_num, x_cat):
        embs = [emb(x_cat[:, i]) for i, emb in enumerate(self.emb_layers)]
        x = torch.cat([x_num] + embs, dim=1)
        return self.net(x).squeeze()


In [25]:
cat_cardinalities = [
    X_train[col].max() + 1 for col in cat_cols
]

emb_dims = [min(16, c // 2) for c in cat_cardinalities]

model = IDSModel(
    num_features=X_train_num.shape[1],
    cat_cardinalities=cat_cardinalities,
    emb_dims=emb_dims
)


In [26]:
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

epochs = 10

for epoch in range(epochs):
    model.train()

    optimizer.zero_grad()
    logits = model(X_train_num_t, X_train_cat_t)
    loss = criterion(logits, y_train_t)
    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}/{epochs} - Loss: {loss.item():.4f}")


Epoch 1/10 - Loss: 0.6782
Epoch 2/10 - Loss: 0.6582
Epoch 3/10 - Loss: 0.6399
Epoch 4/10 - Loss: 0.6231
Epoch 5/10 - Loss: 0.6072
Epoch 6/10 - Loss: 0.5916
Epoch 7/10 - Loss: 0.5766
Epoch 8/10 - Loss: 0.5618
Epoch 9/10 - Loss: 0.5470
Epoch 10/10 - Loss: 0.5323


In [27]:
model.eval()

with torch.no_grad():
    logits = model(X_test_num_t, X_test_cat_t)
    probs = torch.sigmoid(logits).numpy()

threshold = 0.32
y_pred = (probs >= threshold).astype(int)

y_test_np = y_test.values

tn, fp, fn, tp = confusion_matrix(y_test_np, y_pred).ravel()

print(f"Threshold : {threshold}")
print(f"Accuracy  : {accuracy_score(y_test_np, y_pred)*100:.2f}%")
print(f"Precision : {precision_score(y_test_np, y_pred)*100:.2f}%")
print(f"Recall    : {recall_score(y_test_np, y_pred)*100:.2f}%")
print(f"FPR       : {fp / (fp + tn) * 100:.2f}%")
print(f"ROC AUC   : {roc_auc_score(y_test_np, probs)*100:.2f}%")


Threshold : 0.32
Accuracy  : 55.06%
Precision : 55.06%
Recall    : 100.00%
FPR       : 100.00%
ROC AUC   : 75.10%


In [28]:
threshold = 0.7333
y_scores = probs
y_pred = (y_scores >= threshold).astype(int)



tn, fp, fn, tp = confusion_matrix(y_test_np, y_pred).ravel()

accuracy  = accuracy_score(y_test_np, y_pred)
precision = precision_score(y_test_np, y_pred)
recall    = recall_score(y_test_np, y_pred)
roc_auc   = roc_auc_score(y_test_np, y_scores)
fpr       = fp / (fp + tn)

print(f"Threshold : {threshold}")
print(f"Accuracy  : {accuracy*100:.2f}%")
print(f"Precision : {precision*100:.2f}%")
print(f"Recall    : {recall*100:.2f}%")
print(f"FPR       : {fpr*100:.2f}%")
print(f"ROC AUC   : {roc_auc*100:.2f}%")


Threshold : 0.7333
Accuracy  : 56.16%
Precision : 85.68%
Recall    : 24.47%
FPR       : 5.01%
ROC AUC   : 75.10%


In [14]:
print([c for c in X_train.columns if "attack" in c.lower()])


['attack_cat']


In [15]:
from sklearn.metrics import roc_curve

fpr, tpr, thresholds = roc_curve(y_test_np, probs)

# exemplo: queremos FPR <= 5%
target_fpr = 0.05

idx = np.where(fpr <= target_fpr)[0][-1]
best_threshold = thresholds[idx]

print(f"Threshold escolhido: {best_threshold:.4f}")
print(f"FPR   : {fpr[idx]*100:.2f}%")
print(f"Recall: {tpr[idx]*100:.2f}%")


Threshold escolhido: 0.7333
FPR   : 4.99%
Recall: 24.47%
