In [13]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
from pathlib import Path
import random
import warnings
from typing import List

import numpy as np
import pandas as pd
from deap import base, creator, tools
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Silence sklearn warning about feature names
warnings.filterwarnings("ignore", category=UserWarning, message="X does not have valid feature names")


In [None]:


# ───────────────────────────────────────────────────────────────
# 0. Config
# ───────────────────────────────────────────────────────────────
DATA_CSV = Path("/dataset/balanced100k_2018.csv")  # TODO: update
TEST_SIZE = 0.1
RNG_SEED = 42
GA_GENERATIONS = 10
GA_POP_SIZE = 50

# Attacker‑controllable feature white‑list (17 fields)
MUTATED_FEATURES: List[str] = [
    # ── size / volume ──
    "TotLen Fwd Pkts", "TotLen Bwd Pkts",
    "Tot Fwd Pkts", "Tot Bwd Pkts",
    "Fwd Pkt Len Max", "Bwd Pkt Len Max",
    "Fwd Pkt Len Min", "Bwd Pkt Len Min",
    # ── timing ──
    "Flow Duration", "Flow IAT Mean", "Flow IAT Std",
    "Fwd IAT Mean", "Bwd IAT Mean",
    # ── flags ──
    "Fwd PSH Flags", "Bwd PSH Flags", "ACK Flag Cnt", "FIN Flag Cnt",
]

INT_FLAG_FIELDS = ["Fwd PSH Flags", "Bwd PSH Flags", "ACK Flag Cnt", "FIN Flag Cnt"]

DERIVED_DROP = [
    "Pkt Len Mean", "Pkt Len Std", "Pkt Len Var",
    "Fwd Pkt Len Mean", "Bwd Pkt Len Mean",
    "Pkt Size Avg", "Fwd Seg Size Avg", "Bwd Seg Size Avg",
    "Flow Byts/s", "Flow Pkts/s",
]

ID_COLUMNS = [
    "Flow ID", "Src IP", "Src Port", "Dst IP", "Dst Port", "Protocol", "Timestamp",
]


In [16]:

# ───────────────────────────────────────────────────────────────
# 1. Load & preprocess dataset
# ───────────────────────────────────────────────────────────────
df = pd.read_csv(DATA_CSV)
cols_to_drop = [c for c in ID_COLUMNS + DERIVED_DROP if c in df.columns]
df.drop(columns=cols_to_drop, inplace=True, errors="ignore")

numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
non_numeric = [c for c in df.columns if c not in numeric_cols + ["Label"]]
if non_numeric:
    df[non_numeric] = df[non_numeric].apply(pd.to_numeric, errors="coerce")

numeric_cols = [c for c in df.columns if c != "Label"]
df[numeric_cols] = df[numeric_cols].replace([np.inf, -np.inf], np.nan)
df.dropna(subset=numeric_cols, inplace=True)

scaler = MinMaxScaler()
X_scaled = scaler.fit_transform(df[numeric_cols])
label_encoder = LabelEncoder()
y = label_encoder.fit_transform(df["Label"].values)

X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=TEST_SIZE, random_state=RNG_SEED, stratify=y
)

FEAT_IDX = {feat: numeric_cols.index(feat) for feat in numeric_cols}
MUT_IDX = [FEAT_IDX[f] for f in MUTATED_FEATURES]
INT_FLAG_IDX = [FEAT_IDX[f] for f in INT_FLAG_FIELDS]

  df = pd.read_csv(DATA_CSV)


In [17]:


# ───────────────────────────────────────────────────────────────
# 2. Train classifier
# ───────────────────────────────────────────────────────────────
rf = RandomForestClassifier(n_estimators=100, random_state=RNG_SEED, n_jobs=-1)
rf.fit(X_train, y_train)
model_predict_prob_vector = lambda sample: rf.predict_proba(sample.reshape(1, -1))[0]

ATTACK_LABELS = [
    "Bot", "DDoS attacks-LOIC-HTTP", "DoS attacks-GoldenEye", "DoS attacks-Hulk",
    "DoS attacks-SlowHTTPTest", "DoS attacks-Slowloris", "FTP-BruteForce",
    "Infilteration", "SSH-Bruteforce",
]
attack_indices = [np.where(label_encoder.classes_ == a)[0][0] for a in ATTACK_LABELS]
attack_probability = lambda p: float(np.sum(p[attack_indices]))


In [21]:
predict_proba = lambda v: rf.predict_proba(v.reshape(1, -1))[0]
label_from_proba = lambda pv, th=0.5: int(pv[1] >= th)


In [22]:

def enforce_constraints(scaled_vec: np.ndarray) -> np.ndarray:
    orig = scaler.inverse_transform(pd.DataFrame([scaled_vec], columns=numeric_cols))[0]
    # round / clip flags
    for name, idx in zip(INT_FLAG_FIELDS, INT_FLAG_IDX):
        orig[idx] = max(0, round(orig[idx]))
        tot_col = "Tot Fwd Pkts" if name.startswith("Fwd") else (
            "Tot Bwd Pkts" if name.startswith("Bwd") else None
        )
        if tot_col and tot_col in FEAT_IDX:
            orig[idx] = min(orig[idx], orig[FEAT_IDX[tot_col]])
    return np.clip(scaler.transform(pd.DataFrame([orig], columns=numeric_cols))[0], 0, 1)

# ───────────────────────── GA setup ─────────────────────────
creator.create("Fitness", base.Fitness, weights=(0.8, 1.2))
creator.create("Individual", list, fitness=creator.Fitness)

def bounded_uniform(center):
    return random.uniform(max(0, center - 0.2), min(1, center + 0.2))

def attr_gens(orig):
    return [(lambda c=v: bounded_uniform(c)) for v in orig[MUT_IDX]]

def fitness_factory(orig):
    def _f(ind):
        cand = orig.copy()
        for g, i in zip(ind, MUT_IDX):
            cand[i] = g
        cand = enforce_constraints(cand)
        p = predict_proba(cand)
        return (1 - p[1], -np.linalg.norm(cand[MUT_IDX] - orig[MUT_IDX]))
    return _f

def run_ga(orig):
    toolbox = base.Toolbox()
    for j, f in enumerate(attr_gens(orig)):
        toolbox.register(f"a{j}", f)
    toolbox.register("individual", tools.initCycle, creator.Individual,
                     tuple(toolbox.__getattribute__(f"a{j}") for j in range(len(MUT_IDX))), n=1)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    toolbox.register("evaluate", fitness_factory(orig))
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutGaussian, mu=0, sigma=0.05, indpb=0.2)
    toolbox.register("select", tools.selTournament, tournsize=3)

    pop = toolbox.population(n=GA_POP_SIZE)
    for gen in range(GA_GENERATIONS):
        invalid = [ind for ind in pop if not ind.fitness.valid]
        for ind in invalid:
            ind.fitness.values = toolbox.evaluate(ind)
        # evolution
        offspring = toolbox.select(pop, len(pop))
        offspring = list(map(toolbox.clone, offspring))
        for c1, c2 in zip(offspring[::2], offspring[1::2]):
            if random.random() < 0.6:
                toolbox.mate(c1, c2)
                del c1.fitness.values, c2.fitness.values
        for mut in offspring:
            if random.random() < 0.3:
                toolbox.mutate(mut)
                del mut.fitness.values
                mut[:] = [np.clip(g, 0, 1) for g in mut]
        # evaluate new individuals
        invalid = [i for i in offspring if not i.fitness.valid]
        for ind in invalid:
            ind.fitness.values = toolbox.evaluate(ind)
        pop[:] = offspring
        best = tools.selBest(pop, 1)[0]
        print(f"   Gen {gen+1}/{GA_GENERATIONS}: best (1-P)= {best.fitness.values[0]:.3f}, dist= {-best.fitness.values[1]:.4f}")
    return tools.selBest(pop, 1)[0]




In [24]:
sel_idx = np.random.choice(X_test.shape[0], size=min(100, X_test.shape[0]), replace=False)

success, delta_sum = 0, 0.0
records = []
print("\n>> Running GA on 100 random test flows\n")
for k, idx in enumerate(sel_idx, 1):
    orig = X_test[idx].copy()
    p_b = predict_proba(orig)
    best = run_ga(orig)
    mut = orig.copy()
    for g, fi in zip(best, MUT_IDX):
        mut[fi] = g
    mut = enforce_constraints(mut)
    p_a = predict_proba(mut)

    success_flag = label_from_proba(p_b) == 1 and label_from_proba(p_a) == 0
    if success_flag:
        success += 1
    delta = p_b[1] - p_a[1]
    delta_sum += delta

    status = "SUCCESS" if success_flag else "FAIL   "
    arrow = "↓" if delta >= 0 else "↑"
    print(f"#{k:03d}  {status}  P_attack {p_b[1]:.3f} → {p_a[1]:.3f}  ({arrow} {abs(delta):.3f})")

    records.append({"idx": int(idx), "prob_before": float(p_b[1]), "prob_after": float(p_a[1]), "distortion": float(np.linalg.norm(mut - orig)), "success": int(success_flag)})

rate = 100 * success / len(records)
mean_delta = delta_sum / len(records)
print(f"\n✅  GA evaded {success}/{len(records)} flows ({rate:.1f} %)   mean ΔP = {mean_delta:.2f}\n")

pd.DataFrame(records).to_csv("ga_rf_100samples.csv", index=False)



>> Running GA on 100 random test flows

   Gen 1/10: best (1-P)= 1.000, dist= 0.2396
   Gen 2/10: best (1-P)= 1.000, dist= 0.2318
   Gen 3/10: best (1-P)= 1.000, dist= 0.1862
   Gen 4/10: best (1-P)= 1.000, dist= 0.1862
   Gen 5/10: best (1-P)= 1.000, dist= 0.1767
   Gen 6/10: best (1-P)= 1.000, dist= 0.1582
   Gen 7/10: best (1-P)= 1.000, dist= 0.1552
   Gen 8/10: best (1-P)= 1.000, dist= 0.1345
   Gen 9/10: best (1-P)= 1.000, dist= 0.1291
   Gen 10/10: best (1-P)= 1.000, dist= 0.1338
#001  FAIL     P_attack 0.000 → 0.000  (↓ 0.000)
   Gen 1/10: best (1-P)= 0.980, dist= 0.3319
   Gen 2/10: best (1-P)= 1.000, dist= 0.3896
   Gen 3/10: best (1-P)= 1.000, dist= 0.3702
   Gen 4/10: best (1-P)= 1.000, dist= 0.3326
   Gen 5/10: best (1-P)= 1.000, dist= 0.2972
   Gen 6/10: best (1-P)= 1.000, dist= 0.2529
   Gen 7/10: best (1-P)= 1.000, dist= 0.1973
   Gen 8/10: best (1-P)= 1.000, dist= 0.2064
   Gen 9/10: best (1-P)= 1.000, dist= 0.1982
   Gen 10/10: best (1-P)= 1.000, dist= 0.1902
#002  FA