In [1]:
# =========================
# chargement de donnees
# =========================
import os
from zipfile import ZipFile
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
from google.colab import drive

# 1) Monter Google Drive
drive.mount('/content/drive')

# 2) Chemin du zip
zip_path = "/content/drive/MyDrive/Colab Notebooks/COVID-19_Radiography_dataset.zip"
extract_path = "/content/COVID-19_Radiography_Dataset"

# 3) D√©zipper si besoin
if not os.path.exists(extract_path):
    with ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall("/content/")



Mounted at /content/drive


In [2]:
# =========================
# R√©organisation (2 classes)
# =========================
import shutil

prepared_data_dir = "/content/covid_data_prepared"
os.makedirs(prepared_data_dir, exist_ok=True)


classes = ["COVID", "Normal"]

def find_src_dir(cls):
    
    candidates = [
        os.path.join("/content/COVID-19_Radiography_Dataset", cls, "images"),
        os.path.join("/content/COVID-19_Radiography_Dataset", cls),
        os.path.join("/content", cls, "images"),
        os.path.join("/content", cls),
    ]
    for p in candidates:
        if os.path.isdir(p):
            return p
    raise FileNotFoundError(f"Dossier images introuvable pour la classe '{cls}'.")

copied_counts = {}
for cls in classes:
    src_img_dir = find_src_dir(cls)
    dst_class_dir = os.path.join(prepared_data_dir, cls.replace(" ", "_"))
    os.makedirs(dst_class_dir, exist_ok=True)

   
    exts = {".png", ".jpg", ".jpeg", ".bmp"}
    n = 0
    for filename in os.listdir(src_img_dir):
        if os.path.splitext(filename.lower())[1] in exts:
            shutil.copy(os.path.join(src_img_dir, filename),
                        os.path.join(dst_class_dir, filename))
            n += 1
    copied_counts[cls] = n

print("‚úÖ R√©organisation termin√©e. Structure ImageFolder pr√™te.")
print("Comptes copi√©s:", copied_counts)

‚úÖ R√©organisation termin√©e. Structure ImageFolder pr√™te.
Comptes copi√©s: {'COVID': 3616, 'Normal': 10192}


In [3]:
import os, random, numpy as np, torch
# =========================
# Param√®tres globaux, Seeds & Device
# =========================

# -- chemins --
prepared_data_dir = "/content/covid_data_prepared" 

# -- hyper g√©n√©raux --
SEED = 42
IMG_SIZE = 224
BATCH_TRAIN = 32
BATCH_EVAL  = 128
WORKERS = 2   

# -- reproductibilit√© & perf --
def set_seeds(seed=SEED):
    random.seed(seed); np.random.seed(seed)
    torch.manual_seed(seed); torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

set_seeds()

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)


Device: cuda


In [4]:
from torchvision import datasets
# =========================
# Bloc 4 ‚Äî R√©duction √©quilibr√©e + splits (identique baseline)
# =========================

PER_CLASS = 500     
VAL_FRAC  = 0.10      
TEST_FRAC = 0.20     
rng = np.random.RandomState(SEED)

base = datasets.ImageFolder(prepared_data_dir)
name_to_idx = base.class_to_idx
KEEP_NAMES = ["COVID", "Normal"]
KEEP_IDX = [name_to_idx[n] for n in KEEP_NAMES]

# indices par classe (2 classes)
idxs_by_class = {ci: [] for ci in KEEP_IDX}
for i, (_, y) in enumerate(base.samples):
    if y in KEEP_IDX:
        idxs_by_class[y].append(i)

# r√©duction √©quilibr√©e
kept_by_class = {}
for c, idxs in idxs_by_class.items():
    idxs = np.array(idxs); rng.shuffle(idxs)
    k = min(PER_CLASS, len(idxs))
    kept_by_class[c] = idxs[:k]

print("Apr√®s r√©duction :",
      {base.classes[c]: len(kept_by_class[c]) for c in KEEP_IDX},
      "Total:", sum(len(v) for v in kept_by_class.values()))

# split stratifi√© sur les indices R√âDUITS
train_idx, val_idx, test_idx = [], [], []
for c in KEEP_IDX:
    idxs = kept_by_class[c].copy(); rng.shuffle(idxs)
    n = len(idxs)
    n_test = int(round(n * TEST_FRAC))
    n_val  = int(round((n - n_test) * VAL_FRAC))
    test_idx.extend(idxs[:n_test].tolist())
    val_idx.extend(idxs[n_test:n_test+n_val].tolist())
    train_idx.extend(idxs[n_test+n_val:].tolist())

print(f"Splits -> train:{len(train_idx)} | val:{len(val_idx)} | test:{len(test_idx)}")


Apr√®s r√©duction : {'COVID': 500, 'Normal': 500} Total: 1000
Splits -> train:720 | val:80 | test:200


In [5]:
# =========================
# Bloc 5 ‚Äî DataLoaders (identique baseline)
# =========================
from torchvision import transforms, datasets
from torch.utils.data import Subset, DataLoader

MEAN = [0.485, 0.456, 0.406]
STD  = [0.229, 0.224, 0.225]

train_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])
eval_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])
# pour attaques : pas de Normalize
attack_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
])

base_train  = datasets.ImageFolder(prepared_data_dir, transform=train_tfms)
base_eval   = datasets.ImageFolder(prepared_data_dir, transform=eval_tfms)
base_attack = datasets.ImageFolder(prepared_data_dir, transform=attack_tfms)

train_ds       = Subset(base_train,  train_idx)
val_ds         = Subset(base_eval,   val_idx)
test_ds        = Subset(base_eval,   test_idx)
attack_test_ds = Subset(base_attack, test_idx)

loader_kwargs = dict(pin_memory=(DEVICE.type=="cuda"))
if WORKERS > 0:
    loader_kwargs.update(num_workers=WORKERS, persistent_workers=True, prefetch_factor=2)

train_loader       = DataLoader(train_ds,       batch_size=BATCH_TRAIN, shuffle=True,  **loader_kwargs)
val_loader         = DataLoader(val_ds,         batch_size=BATCH_EVAL,  shuffle=False, **loader_kwargs)
test_loader        = DataLoader(test_ds,        batch_size=BATCH_EVAL,  shuffle=False, **loader_kwargs)
attack_test_loader = DataLoader(attack_test_ds, batch_size=BATCH_EVAL,  shuffle=False, **loader_kwargs)

print("‚úÖ DataLoaders pr√™ts.")


‚úÖ DataLoaders pr√™ts.


In [6]:
# =========================
# Bloc 6 ‚Äî Mod√®le CNN (identique baseline)
# =========================
import torch.nn as nn
import torch.nn.functional as F

class SimpleCNN(nn.Module):
    def __init__(self, num_classes=2, dropout=0.3):
        super().__init__()
        self.b1 = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1, bias=False),
            nn.BatchNorm2d(32), nn.ReLU(inplace=True), nn.MaxPool2d(2)
        )
        self.b2 = nn.Sequential(
            nn.Conv2d(32, 64, 3, padding=1, bias=False),
            nn.BatchNorm2d(64), nn.ReLU(inplace=True), nn.MaxPool2d(2)
        )
        self.b3 = nn.Sequential(
            nn.Conv2d(64, 128, 3, padding=1, bias=False),
            nn.BatchNorm2d(128), nn.ReLU(inplace=True), nn.MaxPool2d(2)
        )
        self.gap  = nn.AdaptiveAvgPool2d((1,1))
        self.drop = nn.Dropout(dropout)
        self.fc   = nn.Linear(128, num_classes)
        # init propre
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, nonlinearity="relu")
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight); nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight); nn.init.zeros_(m.bias)

    def forward(self, x):
        x = self.b1(x); x = self.b2(x); x = self.b3(x)
        x = self.gap(x); x = torch.flatten(x, 1)
        x = self.drop(x); x = self.fc(x)
        return x

    def extract_features(self, x):
        x = self.b1(x); x = self.b2(x); x = self.b3(x)
        x = self.gap(x); return torch.flatten(x, 1)

model = SimpleCNN(num_classes=2, dropout=0.3).to(DEVICE)
model = model.to(memory_format=torch.channels_last)


In [7]:
import time, copy, torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)
EPOCHS, PATIENCE = 30, 5
best_val, no_impr = float("inf"), 0
best_state = copy.deepcopy(model.state_dict())
scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE.type=="cuda"))

def evaluate(model, loader):
    model.eval(); loss_sum=0.0; n=0; correct=0
    with torch.no_grad():
        for x,y in loader:
            x = x.to(DEVICE, memory_format=torch.channels_last, non_blocking=True)
            y = y.to(DEVICE)
            with torch.cuda.amp.autocast(enabled=(DEVICE.type=="cuda")):
                logits = model(x); loss = criterion(logits, y)
            loss_sum += loss.item()*y.size(0); n+=y.size(0)
            correct += (logits.argmax(1)==y).sum().item()
    return loss_sum/n, correct/n

for epoch in range(1, EPOCHS+1):
    model.train(); t0=time.time(); run_loss=0.0; n=0
    for x,y in train_loader:
        x = x.to(DEVICE, memory_format=torch.channels_last, non_blocking=True)
        y = y.to(DEVICE)
        optimizer.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=(DEVICE.type=="cuda")):
            logits = model(x); loss = criterion(logits, y)
        scaler.scale(loss).backward(); scaler.step(optimizer); scaler.update()
        run_loss += loss.item()*y.size(0); n += y.size(0)
    train_loss = run_loss/n
    val_loss, val_acc = evaluate(model, val_loader)
    print(f"Epoch {epoch:02d} | train_loss={train_loss:.4f} | val_loss={val_loss:.4f} | val_acc={val_acc:.3f} | {time.time()-t0:.1f}s")
    if val_loss < best_val - 1e-4:
        best_val = val_loss; no_impr = 0
        best_state = copy.deepcopy(model.state_dict())
    else:
        no_impr += 1
        if no_impr >= PATIENCE:
            print("Early stopping."); break

# recharge meilleur √©tat et √©value sur test propre
model.load_state_dict(best_state); model.eval()
y_true, y_pred, y_prob = [], [], []
with torch.no_grad():
    for x,y in test_loader:
        x = x.to(DEVICE, memory_format=torch.channels_last, non_blocking=True)
        logits = model(x)
        probs = torch.softmax(logits, dim=1)[:,1].cpu().numpy()
        y_prob += probs.tolist()
        y_pred += logits.argmax(1).cpu().numpy().tolist()
        y_true += y.numpy().tolist()

print("Test | acc:{:.4f} | prec:{:.4f} | rec:{:.4f} | f1:{:.4f} | auc:{:.4f}".format(
    accuracy_score(y_true, y_pred),
    precision_score(y_true, y_pred, zero_division=0),
    recall_score(y_true, y_pred, zero_division=0),
    f1_score(y_true, y_pred, zero_division=0),
    roc_auc_score(y_true, y_prob),
))


  scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE.type=="cuda"))
  with torch.cuda.amp.autocast(enabled=(DEVICE.type=="cuda")):
  with torch.cuda.amp.autocast(enabled=(DEVICE.type=="cuda")):


Epoch 01 | train_loss=0.6609 | val_loss=0.8580 | val_acc=0.537 | 5.3s
Epoch 02 | train_loss=0.6109 | val_loss=0.6917 | val_acc=0.662 | 3.2s
Epoch 03 | train_loss=0.5808 | val_loss=0.6796 | val_acc=0.662 | 4.3s
Epoch 04 | train_loss=0.5849 | val_loss=0.7301 | val_acc=0.625 | 3.3s
Epoch 05 | train_loss=0.5774 | val_loss=0.6199 | val_acc=0.675 | 4.2s
Epoch 06 | train_loss=0.5577 | val_loss=0.5801 | val_acc=0.725 | 4.2s
Epoch 07 | train_loss=0.5370 | val_loss=0.6315 | val_acc=0.688 | 3.1s
Epoch 08 | train_loss=0.5077 | val_loss=0.6278 | val_acc=0.688 | 3.0s
Epoch 09 | train_loss=0.5081 | val_loss=0.5744 | val_acc=0.675 | 2.9s
Epoch 10 | train_loss=0.5279 | val_loss=0.6960 | val_acc=0.675 | 3.4s
Epoch 11 | train_loss=0.5016 | val_loss=0.5294 | val_acc=0.738 | 3.7s
Epoch 12 | train_loss=0.4854 | val_loss=0.5454 | val_acc=0.713 | 2.9s
Epoch 13 | train_loss=0.4774 | val_loss=1.5357 | val_acc=0.562 | 2.8s
Epoch 14 | train_loss=0.4868 | val_loss=0.6031 | val_acc=0.675 | 2.8s
Epoch 15 | train_los

In [8]:
# =========================
# Bloc 7 ‚Äî Jeux d‚Äôattaque (sans Normalize) + conversion NumPy
# =========================
from torch.utils.data import Subset, DataLoader
import numpy as np
import torch


attack_train_ds = Subset(base_attack, train_idx)
attack_train_loader = DataLoader(
    attack_train_ds, batch_size=BATCH_EVAL, shuffle=False, **loader_kwargs
)

def loader_to_numpy(loader):
    xs, ys = [], []
    for x, y in loader:
        xs.append(x.numpy())  
        ys.append(y.numpy())
    X = np.concatenate(xs, axis=0).astype(np.float32)
    y = np.concatenate(ys, axis=0).astype(np.int64)
    return X, y

X_clean_train_np, y_clean_train_np = loader_to_numpy(attack_train_loader)
X_clean_test_np,  y_clean_test_np  = loader_to_numpy(attack_test_loader)

print("Shapes (clean) -> train:", X_clean_train_np.shape, " test:", X_clean_test_np.shape)



Shapes (clean) -> train: (720, 3, 224, 224)  test: (200, 3, 224, 224)


In [9]:
# =========================
# Bloc 8 ‚Äî Attaques ART (FGSM / PGD / BIM / C&W) sur images en [0,1]
#        + Wrapper ART avec preprocessing 
# =========================
!pip -q install adversarial-robustness-toolbox==1.17.1

from art.estimators.classification import PyTorchClassifier
from art.attacks.evasion import FastGradientMethod, ProjectedGradientDescent, BasicIterativeMethod, CarliniL2Method


MEAN_VEC = np.array(MEAN, dtype=np.float32)   
STD_VEC  = np.array(STD,  dtype=np.float32)

model.eval()  
art_classifier = PyTorchClassifier(
    model=model,
    loss=criterion,
    optimizer=optimizer,  
    input_shape=(3, IMG_SIZE, IMG_SIZE),
    nb_classes=2,
    clip_values=(0.0, 1.0),           
    preprocessing=(MEAN_VEC, STD_VEC) 
)

# Grille d‚Äôattaques

ATTACK_GRID_TRAIN = {
    "FGSM": {"eps_list": [4/255]},
    "PGD":  {"eps_list": [4/255], "steps": 10, "step_frac": 0.25},  # step = eps/4
    "BIM":  {"eps_list": [4/255], "steps": 7,  "step_frac": 0.10},  # step = eps/10
    "CW":   {"initial_const": [0.3]}
    
}

ATTACK_GRID_TEST = {
    "FGSM": {"eps_list": [4/255, 8/255]},
    "PGD":  {"eps_list": [4/255, 8/255], "steps": 40, "step_frac": 0.25},
    "BIM":  {"eps_list": [4/255, 8/255], "steps": 10, "step_frac": 0.10},
    "CW":   {"initial_const": [0.3]}  
}


def generate_adv_set(art_clf, X_np, y_np, attack_name, **kwargs):
    if attack_name == "FGSM":
        outs, ys = [], []
        for eps in kwargs["eps_list"]:
            atk = FastGradientMethod(estimator=art_clf, eps=eps, batch_size=BATCH_EVAL)
            outs.append(atk.generate(X_np))
            ys.append(y_np)
        return np.concatenate(outs, 0), np.concatenate(ys, 0), np.array([f"FGSM@{eps:.5f}" for eps in kwargs["eps_list"] for _ in range(len(y_np))])
    if attack_name == "PGD":
        outs, ys, tags = [], [], []
        for eps in kwargs["eps_list"]:
            step = eps * kwargs.get("step_frac", 0.25)
            atk = ProjectedGradientDescent(estimator=art_clf, eps=eps, eps_step=step,
                                           max_iter=kwargs.get("steps", 40), targeted=False,
                                           num_random_init=1, batch_size=BATCH_EVAL)
            adv = atk.generate(X_np)
            outs.append(adv); ys.append(y_np); tags += [f"PGD@{eps:.5f}"] * len(y_np)
        return np.concatenate(outs, 0), np.concatenate(ys, 0), np.array(tags)
    if attack_name == "BIM":
        outs, ys, tags = [], [], []
        for eps in kwargs["eps_list"]:
            step = eps * kwargs.get("step_frac", 0.10)
            atk = BasicIterativeMethod(estimator=art_clf, eps=eps, eps_step=step,
                                       max_iter=kwargs.get("steps",10), targeted=False,
                                       batch_size=BATCH_EVAL)
            adv = atk.generate(X_np)
            outs.append(adv); ys.append(y_np); tags += [f"BIM@{eps:.5f}"] * len(y_np)
        return np.concatenate(outs, 0), np.concatenate(ys, 0), np.array(tags)
    if attack_name == "CW":
        outs, ys, tags = [], [], []
        for c0 in kwargs["initial_const"]:
            atk = CarliniL2Method(classifier=art_clf, initial_const=c0,
                                  max_iter=20, learning_rate=0.01, targeted=False,
                                  batch_size=BATCH_EVAL)
            adv = atk.generate(X_np)
            outs.append(adv); ys.append(y_np); tags += [f"CW@{c0:.2f}"] * len(y_np)
        return np.concatenate(outs, 0), np.concatenate(ys, 0), np.array(tags)
    raise ValueError("Attack inconnue:", attack_name)

def build_mixed_adv(art_clf, X_np, y_np, grid):
    XX, yy, src = [], [], []
    for name, cfg in grid.items():
        Xa, ya, tags = generate_adv_set(art_clf, X_np, y_np, name, **cfg)
        XX.append(Xa); yy.append(ya); src.append(tags)
    return np.concatenate(XX, 0), np.concatenate(yy, 0), np.concatenate(src, 0)

print("‚ö° G√©n√©ration adversaires pour TRAIN (d√©tecteur, grille light)...")
X_adv_train_np, y_adv_train_np, src_train = build_mixed_adv(art_classifier, X_clean_train_np, y_clean_train_np, ATTACK_GRID_TRAIN)





[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/1.7 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.4/1.7 MB[0m [31m11.8 MB/s[0m eta [36m0:00:01[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.0/1.7 MB[0m [31m24.3 MB/s[0m eta [36m0:00:01[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.1/1.7 MB[0m [31m10.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.7/1.7 MB[0m [31m13

PGD - Batches:   0%|          | 0/6 [00:00<?, ?it/s]

PGD - Batches:   0%|          | 0/6 [00:00<?, ?it/s]

In [10]:
print("‚ö° G√©n√©ration adversaires pour TEST (d√©tecteur & pipeline, grille compl√®te)...")
X_adv_test_np,  y_adv_test_np,  src_test  = build_mixed_adv(art_classifier, X_clean_test_np,  y_clean_test_np,  ATTACK_GRID_TEST)
print("Adversaires -> train:", X_adv_train_np.shape, " test:", X_adv_test_np.shape)

‚ö° G√©n√©ration adversaires pour TEST (d√©tecteur & pipeline, grille compl√®te)...


PGD - Batches:   0%|          | 0/2 [00:00<?, ?it/s]

PGD - Batches:   0%|          | 0/2 [00:00<?, ?it/s]

PGD - Batches:   0%|          | 0/2 [00:00<?, ?it/s]

PGD - Batches:   0%|          | 0/2 [00:00<?, ?it/s]

C&W L_2:   0%|          | 0/2 [00:00<?, ?it/s]

Adversaires -> train: (2160, 3, 224, 224)  test: (1400, 3, 224, 224)


In [11]:
# =========================
# Bloc 9 ‚Äî D√©tecteur : embeddings du CNN + MLP binaire
# =========================
import torch.nn as nn
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix

def normalize_np_for_model(X_np):
    return (X_np - MEAN_VEC[None, :, None, None]) / STD_VEC[None, :, None, None]

@torch.no_grad()
def extract_embeddings(model, X_np, bs=256):
    model.eval()
    embs = []
    for i in range(0, len(X_np), bs):
        xb = torch.from_numpy(normalize_np_for_model(X_np[i:i+bs])).to(DEVICE)
        xb = xb.to(memory_format=torch.channels_last, non_blocking=True)
        eb = model.extract_features(xb).float().cpu().numpy()
        embs.append(eb)
    return np.concatenate(embs, axis=0).astype(np.float32)

@torch.no_grad()
def predict_classes(model, X_np, bs=256):
    model.eval()
    preds, probs = [], []
    for i in range(0, len(X_np), bs):
        xb = torch.from_numpy(normalize_np_for_model(X_np[i:i+bs])).to(DEVICE)
        xb = xb.to(memory_format=torch.channels_last, non_blocking=True)
        logits = model(xb)
        pb = torch.softmax(logits, dim=1)[:, 1].cpu().numpy()
        yh = logits.argmax(1).cpu().numpy()
        probs.append(pb); preds.append(yh)
    return np.concatenate(preds), np.concatenate(probs)

# Embeddings pour entra√Ænement du d√©tecteur
Xemb_clean_tr = extract_embeddings(model, X_clean_train_np)
Xemb_adv_tr   = extract_embeddings(model, X_adv_train_np)
Xdet_tr = np.vstack([Xemb_clean_tr, Xemb_adv_tr])
ydet_tr = np.concatenate([np.zeros(len(Xemb_clean_tr), dtype=np.int64),
                          np.ones(len(Xemb_adv_tr),   dtype=np.int64)])

# Embeddings pour test du d√©tecteur
Xemb_clean_te = extract_embeddings(model, X_clean_test_np)
Xemb_adv_te   = extract_embeddings(model, X_adv_test_np)
Xdet_te = np.vstack([Xemb_clean_te, Xemb_adv_te])
ydet_te = np.concatenate([np.zeros(len(Xemb_clean_te), dtype=np.int64),
                          np.ones(len(Xemb_adv_te),   dtype=np.int64)])

class DetectorMLP(nn.Module):
    def __init__(self, in_dim, h=128, p=0.2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, h), nn.ReLU(inplace=True),
            nn.Dropout(p),
            nn.Linear(h, 1) 
        )
    def forward(self, x):
        return self.net(x).squeeze(1)

det_in = Xdet_tr.shape[1]
detector = DetectorMLP(det_in, h=128, p=0.2).to(DEVICE)
det_crit = nn.BCEWithLogitsLoss()
det_opt  = torch.optim.AdamW(detector.parameters(), lr=1e-3, weight_decay=1e-4)
EPOCHS_DET = 20

def as_loader_feats(X, y, bs=256, shuffle=False):
    ds = torch.utils.data.TensorDataset(
        torch.from_numpy(X).float(), torch.from_numpy(y).float()
    )
    return DataLoader(ds, batch_size=bs, shuffle=shuffle, num_workers=0)

det_tr_dl = as_loader_feats(Xdet_tr, ydet_tr, bs=256, shuffle=True)
det_te_dl = as_loader_feats(Xdet_te, ydet_te, bs=512, shuffle=False)

def train_detector_epoch(model, loader, opt, crit):
    model.train(); loss_sum=0.0; n=0
    for xb, yb in loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)
        opt.zero_grad()
        logits = model(xb)
        loss = crit(logits, yb)
        loss.backward(); opt.step()
        loss_sum += loss.item()*xb.size(0); n += xb.size(0)
    return loss_sum / max(1, n)

@torch.no_grad()
def eval_detector(model, loader, thr=0.5):
    model.eval(); ys, yh, yp = [], [], []
    for xb, yb in loader:
        xb, yb = xb.to(DEVICE), yb.to(DEVICE)
        logits = model(xb)
        prob = torch.sigmoid(logits)
        ys.append(yb.cpu().numpy())
        yp.append(prob.cpu().numpy())
        yh.append((prob > thr).float().cpu().numpy())
    y_true = np.concatenate(ys).astype(int).ravel()
    y_prob = np.concatenate(yp).astype(float).ravel()
    y_hat  = np.concatenate(yh).astype(int).ravel()
    acc  = accuracy_score(y_true, y_hat)
    prec = precision_score(y_true, y_hat, zero_division=0)
    rec  = recall_score(y_true, y_hat, zero_division=0)
    f1   = f1_score(y_true, y_hat, zero_division=0)
    try:
        auc = roc_auc_score(y_true, y_prob)
    except:
        auc = float("nan")
    cm = confusion_matrix(y_true, y_hat)  
    return acc, prec, rec, f1, auc, cm

print("üîß Entra√Ænement du d√©tecteur ...")
best_f1, best_state = -1, None
for ep in range(1, EPOCHS_DET+1):
    tr_loss = train_detector_epoch(detector, det_tr_dl, det_opt, det_crit)
    acc, prec, rec, f1, auc, cm = eval_detector(detector, det_te_dl, thr=0.5)
    print(f"[DET][{ep:02d}/{EPOCHS_DET}] loss_tr={tr_loss:.4f} | acc={acc:.3f} "
          f"prec={prec:.3f} rec={rec:.3f} f1={f1:.3f} auc={auc:.3f}")
    if f1 > best_f1:
        best_f1 = f1
        best_state = {k: v.cpu().clone() for k, v in detector.state_dict().items()}
# recharge meilleur d√©tecteur
detector.load_state_dict({k: v.to(DEVICE) for k, v in best_state.items()})

acc, prec, rec, f1, auc, cm = eval_detector(detector, det_te_dl, thr=0.5)
print("\nüìä D√©tecteur (TEST)")
print(f"Accuracy={acc:.3f}  Precision={prec:.3f}  Recall={rec:.3f}  F1={f1:.3f}  AUC={auc:.3f}")
print("Matrice de confusion [[TN FP],[FN TP]] =\n", cm)



üîß Entra√Ænement du d√©tecteur ...
[DET][01/20] loss_tr=0.5611 | acc=0.875 prec=0.875 rec=1.000 f1=0.933 auc=0.853
[DET][02/20] loss_tr=0.5092 | acc=0.876 prec=0.876 rec=1.000 f1=0.934 auc=0.867
[DET][03/20] loss_tr=0.4829 | acc=0.886 prec=0.885 rec=0.999 f1=0.939 auc=0.871
[DET][04/20] loss_tr=0.4570 | acc=0.882 prec=0.901 rec=0.973 f1=0.935 auc=0.878
[DET][05/20] loss_tr=0.4341 | acc=0.877 prec=0.909 rec=0.956 f1=0.932 auc=0.885
[DET][06/20] loss_tr=0.4104 | acc=0.872 prec=0.931 rec=0.922 f1=0.926 auc=0.896
[DET][07/20] loss_tr=0.3915 | acc=0.885 prec=0.924 rec=0.946 f1=0.935 auc=0.903
[DET][08/20] loss_tr=0.3717 | acc=0.882 prec=0.924 rec=0.944 f1=0.934 auc=0.910
[DET][09/20] loss_tr=0.3519 | acc=0.877 prec=0.939 rec=0.920 f1=0.929 auc=0.918
[DET][10/20] loss_tr=0.3389 | acc=0.874 prec=0.949 rec=0.904 f1=0.926 auc=0.924
[DET][11/20] loss_tr=0.3254 | acc=0.874 prec=0.948 rec=0.906 f1=0.927 auc=0.927
[DET][12/20] loss_tr=0.3095 | acc=0.877 prec=0.951 rec=0.906 f1=0.928 auc=0.930
[DE

In [12]:
# =========================
# Bloc 10 ‚Äî √âvaluation du pipeline global
#    √âtape 1: D√©tecteur -> 0 propre / 1 adversarial
#    √âtape 2: CNN si d√©tecteur=0
# =========================
# Construit un set TEST global : propres + adversaires
X_test_global = np.vstack([X_clean_test_np, X_adv_test_np])
y_is_adv      = np.concatenate([np.zeros(len(X_clean_test_np), dtype=np.int64),
                                np.ones(len(X_adv_test_np),  dtype=np.int64)])
y_true_cls    = np.concatenate([y_clean_test_np, y_adv_test_np])

# 1) D√©tection (sur embeddings)
Xemb_global = extract_embeddings(model, X_test_global)
with torch.no_grad():
    det_logits = []
    for i in range(0, len(Xemb_global), 512):
        xb = torch.from_numpy(Xemb_global[i:i+512]).to(DEVICE)
        det_logits.append(detector(xb).cpu().numpy())
    det_logits = np.concatenate(det_logits)
det_prob = 1.0 / (1.0 + np.exp(-det_logits))
det_pred = (det_prob > 0.5).astype(int)  # 1=adversarial, 0=propre

# 2) Classification CNN sur images accept√©es
accepted_mask = (det_pred == 0)
X_accepted    = X_test_global[accepted_mask]
y_true_acc    = y_true_cls[accepted_mask]
y_is_adv_acc  = y_is_adv[accepted_mask]
yhat_acc, _   = predict_classes(model, X_accepted)

# KPIs demand√©s
n_adv_total       = int((y_is_adv == 1).sum())
n_adv_blocked     = int(((y_is_adv == 1) & (det_pred == 1)).sum())
pct_adv_blocked   = 100.0 * n_adv_blocked / max(1, n_adv_total)

clean_acc_mask    = (y_is_adv_acc == 0)
n_clean_accepted  = int(clean_acc_mask.sum())
n_clean_correct   = int((yhat_acc[clean_acc_mask] == y_true_acc[clean_acc_mask]).sum())
pct_clean_correct = 100.0 * n_clean_correct / max(1, n_clean_accepted)

n_clean_total     = int((y_is_adv == 0).sum())
n_clean_blocked   = int(((y_is_adv == 0) & (det_pred == 1)).sum())
pct_false_rejects = 100.0 * n_clean_blocked / max(1, n_clean_total)

print("\nüîé Pipeline global (TEST √©tendu)")
print(f"‚Ä¢ % d‚Äôadversariales bloqu√©es               : {pct_adv_blocked:.2f}%  ({n_adv_blocked}/{n_adv_total})")
print(f"‚Ä¢ % d‚Äôimages propres correctement class√©es : {pct_clean_correct:.2f}%  ({n_clean_correct}/{max(1,n_clean_accepted)})")
print(f"‚Ä¢ % de faux rejets (propres bloqu√©es)      : {pct_false_rejects:.2f}%  ({n_clean_blocked}/{n_clean_total})")

# Breakdown par type d‚Äôattaque (sur la partie adversariale du test)
print("\nüìå Breakdown par type d'attaque (TEST adversarial) :")
start_adv = len(X_clean_test_np)
det_pred_adv = det_pred[start_adv:]
for tag in np.unique(src_test):
    m = (src_test == tag)
    n_tot = int(m.sum())
    n_blk = int((det_pred_adv[m] == 1).sum())
    print(f"- {tag:>10s}: bloqu√©es {n_blk}/{n_tot}  ({100.0*n_blk/max(1,n_tot):.1f}%)")



üîé Pipeline global (TEST √©tendu)
‚Ä¢ % d‚Äôadversariales bloqu√©es               : 92.71%  (1298/1400)
‚Ä¢ % d‚Äôimages propres correctement class√©es : 71.85%  (97/135)
‚Ä¢ % de faux rejets (propres bloqu√©es)      : 32.50%  (65/200)

üìå Breakdown par type d'attaque (TEST adversarial) :
- BIM@0.01569: bloqu√©es 199/200  (99.5%)
- BIM@0.03137: bloqu√©es 200/200  (100.0%)
-    CW@0.30: bloqu√©es 99/200  (49.5%)
- FGSM@0.01569: bloqu√©es 200/200  (100.0%)
- FGSM@0.03137: bloqu√©es 200/200  (100.0%)
- PGD@0.01569: bloqu√©es 200/200  (100.0%)
- PGD@0.03137: bloqu√©es 200/200  (100.0%)
