# ART APGD sample generation (TF2.10)

Generate APGD-CE adversarial samples using ART against:
- $\mathcal{U}^{(0)}$: baseline ensemble (4 ResNet + 4 VGG probability models)
- $\mathcal{U}^{(1)}$: first immunized generation ensemble

Outputs are saved under `data/adversarial_samples/apgdce_ART/` as `.npz`.

In [None]:
from __future__ import annotations

import os
from pathlib import Path
from typing import List, Tuple
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model
from sklearn.model_selection import train_test_split


In [None]:
from source.utils import load_yaml

# Project configuration: filesystem locations and experiment settings.
PATHS = load_yaml("./configs/paths.yaml")
EXP = load_yaml("./configs/exp.yaml")

data_root, tf_model_dir, apgd_out = (
    PATHS[key] for key in ("data_root", "tf_model_dir", "apgd_out")
)

seed, apgd_cfg = int(EXP["seed"]), EXP["art_apgd"]

# Named APGD configurations handed to gen_for_ensemble / merge_train_chunks.
# NOTE(review): the tuple layout is defined by those helpers; the third entry
# is presumably max_iter (the archive loader below tests max_iter == 2) — confirm.
APGD_SETTINGS = dict(
    weak=(0.5, 0.2, 2, 4),
    strong=(0.7, 0.2, 10, 4),
)

# Directory that receives the generated adversarial archives.
APGD_OUT = Path(apgd_out)
APGD_OUT.mkdir(parents=True, exist_ok=True)

tf_model_dir = Path(tf_model_dir)

# Seed NumPy and TensorFlow with the configured experiment seed.
np.random.seed(seed)
tf.random.set_seed(seed)

# CIFAR-10 from Keras: a train portion (split further below) and a test portion.
(x_all, y_all), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

# Flatten the label arrays to 1-D int64 vectors.
y_all, y_test = (
    labels.reshape(-1).astype(np.int64) for labels in (y_all, y_test)
)

# Stratified 80/20 train/validation split, driven by the experiment seed.
x_train, x_val, y_train, y_val = train_test_split(
    x_all,
    y_all,
    test_size=0.2,
    random_state=seed,
    stratify=y_all,
)

# normalize to [0,1]
x_train, x_val, x_test = (
    images.astype(np.float32) / 255.0 for images in (x_train, x_val, x_test)
)

y_train, y_val = (labels.astype(np.int64) for labels in (y_train, y_val))

from source.utils import make_train_chunks

# (name, x, y) triples to attack: the training data in chunks of 10000
# samples, followed by the validation and test sets.
splits: List[Tuple[str, np.ndarray, np.ndarray]] = [
    *make_train_chunks(x_train, y_train, chunk_size=10000),
    ("val", x_val, y_val),
    ("test", x_test, y_test),
]


## Load ensembles for $\mathcal{U}^{(0)}$

We load probability-output Keras models (`.keras`) and build an average-probability ensemble for ART.

In [None]:
# Glob patterns (as strings) selecting the baseline ensemble members
# inside tf_model_dir.
U0_GLOB = [
    str(tf_model_dir / pattern)
    for pattern in ("resnet_*.keras", "vgg*_raw.keras")
]

In [None]:
from source.helpers03note import gen_for_ensemble, merge_train_chunks
from source.utils import collect_model_paths

## Generate APGD-CE samples for $\mathcal{U}^{(0)}$

We generate and save one `.npz` file per split chunk to avoid memory issues.

In [None]:
# Only the "weak" perturbation is generated in this cell.
weak_only = [APGD_SETTINGS['weak']]
u0_paths = collect_model_paths(U0_GLOB)

# Attack every split with the U0 ensemble, then merge the per-chunk
# training outputs.
gen_for_ensemble("U0", u0_paths, weak_only, splits, APGD_OUT)
merge_train_chunks("U0", weak_only, APGD_OUT)

# Get $\mathcal{U}^{(1)}$

Train specialists on the weak APGD samples generated against $\mathcal{U}^{(0)}$.

In [None]:
# Directory of the specialized (SP) models; the same location is globbed by
# SPs_GLOB further down in this notebook.
# NOTE(review): presumably where `specialize` writes its output — confirm in
# source.utils; this constant is not referenced in the visible cells.
SPECIAL_DIR = Path("./data/specialized_models")
from source.utils import specialize

In [None]:
# Load the saved APGD archives from APGD_OUT into two dicts keyed by
# "<attack>_<tag>_<split>" (or "<attack>_<split>" when the archive stores no
# tag): `samples` holds the adversarial inputs, `y_true_dict` the labels.
samples = {}
y_true_dict = {}
# Restrict to .npz archives so stray directory entries (e.g. notebook
# checkpoint folders) do not crash np.load, and sort for a deterministic
# load order across platforms.
for npz_path in sorted(APGD_OUT.glob("*.npz")):
    # np.load returns an NpzFile that keeps the archive open; the context
    # manager closes it once the needed arrays have been read into memory.
    with np.load(npz_path) as d:
        # Anything that is not the weak setting is labelled strong.
        # NOTE(review): 2 matches the third entry of APGD_SETTINGS["weak"],
        # presumably its max_iter — confirm against gen_for_ensemble.
        adv_name = 'APGD_weak' if d['max_iter'] == 2 else 'APGD_strong'
        split = str(d['split'])
        tag = str(d["tag"]) if "tag" in d.files else ""
        sample_name = f"{adv_name}_{tag}_{split}" if tag else f"{adv_name}_{split}"
        samples[sample_name] = d['x_adv']
        y_true_dict[sample_name] = d['y']

# Pool the weak-APGD samples generated against U0 (train + val) ...
_weak_u0_keys = ('APGD_weak_U0_train', 'APGD_weak_U0_val')
x_temp = np.concatenate([samples[key] for key in _weak_u0_keys], axis=0)
y_temp = np.concatenate([y_true_dict[key] for key in _weak_u0_keys], axis=0)

# ... and draw a fresh stratified 80/20 train/validation split from the pool.
# NOTE(review): random_state is hard-coded to 42 here, whereas the earlier
# split uses the configured `seed` — confirm this is intentional.
(
    new_stratified_x_train,
    new_stratified_x_val,
    new_stratified_y_train,
    new_stratified_y_val,
) = train_test_split(
    x_temp,
    y_temp,
    test_size=0.2,
    random_state=42,
    stratify=y_temp,
)

specialized_model_dict={}


def _load_base_models(model_dir="./data/models/"):
    """Load every non-'original' ``.keras`` model found in *model_dir*.

    Files are visited in sorted order and each loaded file name is printed.
    The suffix test is ``.keras`` (with the dot) so that it agrees with the
    fixed six-character suffix stripped by ``_specialist_base_name``.

    Returns:
        dict mapping file name -> loaded Keras model, in sorted-name order.
    """
    loaded = {}
    for f_name in sorted(os.listdir(model_dir)):
        if 'original' not in f_name and f_name.endswith('.keras'):
            print(f_name)
            loaded[f_name] = load_model(f"{model_dir}{f_name}")
    return loaded


def _specialist_base_name(file_name):
    """Derive the short model name used as prefix for a specialist's path.

    The ``.keras`` suffix (six characters) is dropped and the rest is split
    on ``_``. Names starting with ``v`` keep only the first part (e.g.
    ``vgg16_raw.keras`` -> ``vgg16``); all other names are rebuilt as
    first part + last part + ``v`` + final character of the second part
    (e.g. ``resnet_v1_20.keras`` -> ``resnet20v1``).
    """
    stem = file_name[:-6]
    parts = stem.split('_')
    if stem[0] == 'v':
        return parts[0]
    return parts[0] + parts[-1] + 'v' + parts[1][-1]


model_dict = _load_base_models()

adv_sample_name = 'APGD_weak'
# One-hot targets (10 classes) are identical for every model, so build the
# (inputs, targets) pairs once instead of once per loop iteration.
train_pair = (new_stratified_x_train, to_categorical(new_stratified_y_train,10))
val_pair = (new_stratified_x_val, to_categorical(new_stratified_y_val,10))
for file_name, m in model_dict.items():
    model_name = _specialist_base_name(file_name)
    info = specialize(
        train_pair,
        val_pair,
        m,
        new_model_path=f"{model_name}_{adv_sample_name}-SP")

# Reload models since above specialization may affect the loaded original models.
model_dict = _load_base_models()

# Glob pattern for the specialized models.
SPs_GLOB = [
    "./data/specialized_models/*.keras"
]

# U1 = baseline ensemble members plus the specialists.
U1_GLOB = U0_GLOB + SPs_GLOB


## Generate more adversarial samples

Generate APGD samples:
- strong against $\mathcal{U}^{(0)}$
- weak / strong against $\mathcal{U}^{(1)}$
- weak / strong against the specialists (SPs)

In [None]:


# Strong APGD against the baseline ensemble U0 (the weak run was done earlier).
strong_only = [APGD_SETTINGS['strong']]
gen_for_ensemble("U0", u0_paths, strong_only, splits, APGD_OUT)
merge_train_chunks("U0", strong_only, APGD_OUT)

SPs_paths = collect_model_paths(SPs_GLOB)
u1_paths = collect_model_paths(U1_GLOB)

# For each APGD setting: attack the specialists first, then the U1 ensemble,
# merging the per-chunk training outputs after each generation run.
ensemble_targets = (("SPs", SPs_paths), ("U1", u1_paths))
for setting in APGD_SETTINGS.values():
    apgd_settings = [setting]
    for ens_tag, ens_paths in ensemble_targets:
        gen_for_ensemble(ens_tag, ens_paths, apgd_settings, splits, APGD_OUT)
        merge_train_chunks(ens_tag, apgd_settings, APGD_OUT)