# Proposed Approach implementation on the sampled data
## -------------------------------------------------------------------------------

## -----  MLP1L -----

### MLP1L using train/seen set

In [108]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1L.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_train.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1L_train"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20       # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: torch.Tensor):
            query_count[0] += 1
            return model(x_tensor)

        # Skip if already misclassified
        pred0 = model_query(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)).argmax().item() != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)).argmax().item() != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)).argmax().item() == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=5, success=True, queries=25, l2_mag=1102.584716796875
Sample idx 1: true=0, adv=2, success=True, queries=25, l2_mag=1117.5777587890625
Sample idx 2: true=0, adv=7, success=True, queries=25, l2_mag=790.7255859375
Sample idx 3: true=0, adv=2, success=True, queries=25, l2_mag=864.9415893554688
Sample idx 4: true=0, adv=7, success=True, queries=24, l2_mag=352.40032958984375
Sample idx 5: true=0, adv=2, success=True, queries=26, l2_mag=1322.10400390625
Sample idx 6: true=0, adv=7, success=True, queries=25, l2_mag=931.2673950195312
Sample idx 7: true=0, adv=2, success=True, queries=25, l2_mag=1008.915771484375
Sample idx 8: true=0, adv=2, success=True, queries=25, l2_mag=909.7268676757812
Sample idx 9: true=0, adv=2, success=True, queries=24, l2_mag=347.5859680175781
Sample idx 10: true=0, adv=5, success=True, queries=25, l2_mag=1148.36181640625
Sample idx 11: true=0, adv=2, success=True, queries=24, l2_mag=458.094970703125
Sample idx 12: true=0, 

### MLP1L using test/unseen set

In [110]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1L.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1L_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20       # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: torch.Tensor):
            query_count[0] += 1
            return model(x_tensor)

        # Skip if already misclassified
        pred0 = model_query(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)).argmax().item() != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)).argmax().item() != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)).argmax().item() == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 1: true=0, adv=7, success=True, queries=26, l2_mag=1320.380615234375
Sample idx 2: true=0, adv=2, success=True, queries=25, l2_mag=1110.315673828125
Sample idx 3: true=0, adv=2, success=True, queries=25, l2_mag=997.7955932617188
Sample idx 4: true=0, adv=5, success=True, queries=26, l2_mag=1174.3372802734375
Sample idx 5: true=0, adv=2, success=True, queries=25, l2_mag=592.6128540039062
Sample idx 6: true=0, adv=7, success=True, queries=25, l2_mag=637.62841796875
Sample idx 7: true=0, adv=6, success=True, queries=25, l2_mag=978.5576171875
Sample idx 8: true=0, adv=None, success=False, queries=25, l2_mag=nan
Sample idx 9: true=0, adv=2, success=True, queries=25, l2_mag=973.7612915039062
Sample idx 10: true=0, adv=2, success=True, queries=24, l2_mag=505.8062744140625
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=5, success=True, queries=26

## -----  MLP2L -----

### MLP2L using train/seen set

In [113]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP2L.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_train.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP2L_train"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20       # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: torch.Tensor):
            query_count[0] += 1
            return model(x_tensor)

        # Skip if already misclassified
        pred0 = model_query(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)).argmax().item() != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)).argmax().item() != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)).argmax().item() == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 1: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 2: true=0, adv=6, success=True, queries=26, l2_mag=1221.05615234375
Sample idx 3: true=0, adv=None, success=False, queries=25, l2_mag=nan
Sample idx 4: true=0, adv=6, success=True, queries=25, l2_mag=612.8156127929688
Sample idx 5: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 6: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 7: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 8: true=0, adv=6, success=True, queries=25, l2_mag=1130.98583984375
Sample idx 9: true=0, adv=8, success=True, queries=25, l2_mag=903.9076538085938
Sample idx 10: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 11: true=0, adv=2, success=True, queries=25, l2_mag=778.44140625
Sample idx 12: true=0, adv=None, success=False, queries=25, l2_mag=nan
Sample idx 13: true=0, ad

### MLP2L using test/unseen set

In [115]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP2L.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP2L_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20       # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: torch.Tensor):
            query_count[0] += 1
            return model(x_tensor)

        # Skip if already misclassified
        pred0 = model_query(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)).argmax().item() != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)).argmax().item() != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)).argmax().item() == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 1: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 2: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 3: true=0, adv=None, success=False, queries=25, l2_mag=nan
Sample idx 4: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 5: true=0, adv=2, success=True, queries=26, l2_mag=1481.6021728515625
Sample idx 6: true=0, adv=7, success=True, queries=24, l2_mag=553.3986206054688
Sample idx 7: true=0, adv=None, success=False, queries=25, l2_mag=nan
Sample idx 8: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 9: true=0, adv=None, success=False, queries=25, l2_mag=nan
Sample idx 10: true=0, adv=6, success=True, queries=26, l2_mag=1212.33447265625
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 13: true=0, adv=None, succes

## -----  CNN -----

### CNN using train/seen set

In [118]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image

# ─────────────── PATHS ────────────────────────────────────────────────────
# Path to your scripted CNN model
MODEL_PATH = r"Models and Data splits/model_CNN.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_train.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/CNN_train"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1      # FGSM step size
MAX_ITERS  = 5        # FGSM iterations
BIN_STEPS  = 5        # binary-search iterations
MAX_L2     = 1500     # max allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the CNN (expects input of shape (B,1,28,28))
model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    """
    Convert a single image in [0,1]—either shape (784,) or (28,28)—
    into a torch.FloatTensor of shape (1,1,28,28) for the CNN.
    """
    arr = x01.astype(np.float32)
    if arr.ndim == 1:
        arr = arr.reshape(28, 28)
    # add channel and batch dims: (1,1,28,28)
    return torch.from_numpy(arr[None, None, :, :]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()     # (28,28)
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: torch.Tensor):
            query_count[0] += 1
            return model(x_tensor)

        # Skip if the clean model is already wrong
        pred0 = model_query(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM on surrogate ———————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)).argmax().item() != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary‐search shrink —————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)).argmax().item() != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— quantize & final verify ——————————————————————
        x_uint8 = push_one_uint8(x_best, x0)
        if model_query(to_model(x_uint8 / 255.0)).argmax().item() == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# query‐count stats over successful attacks
query_counts = [r['queries'] for r in records if r['success']]
q_min = min(query_counts) if query_counts else 0
q_avg = sum(query_counts) / len(query_counts) if query_counts else 0.0
q_max = max(query_counts) if query_counts else 0

# L2‐magnitude stats over successful attacks
mags = [r['l2_mag'] for r in records if r['success']]
m_min = min(mags) if mags else float('nan')
m_avg = sum(mags) / len(mags) if mags else float('nan')
m_max = max(mags) if mags else float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples     : {total_trials}")
print(f"Successful attacks          : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude           : {mean_mag:.2f}")
print(f"Query count (min/avg/max)   : {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max)  : {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 1: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 2: true=0, adv=5, success=True, queries=11, l2_mag=1482.7457275390625
Sample idx 3: true=0, adv=None, success=False, queries=11, l2_mag=nan
Sample idx 4: true=0, adv=2, success=True, queries=10, l2_mag=722.2471923828125
Sample idx 5: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 6: true=0, adv=None, success=False, queries=11, l2_mag=nan
Sample idx 7: true=0, adv=8, success=True, queries=11, l2_mag=1354.8690185546875
Sample idx 8: true=0, adv=6, success=True, queries=11, l2_mag=1494.685546875
Sample idx 9: true=0, adv=8, success=True, queries=10, l2_mag=1076.226318359375
Sample idx 10: true=0, adv=None, success=False, queries=11, l2_mag=nan
Sample idx 11: true=0, adv=6, success=True, queries=11, l2_mag=1460.2205810546875
Sample idx 12: true=0, adv=2, success=True, queries=11, l2_mag=1273.80456542968

### CNN using test/unseen set

In [120]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image

# ─────────────── PATHS ────────────────────────────────────────────────────
# Path to your scripted CNN model
MODEL_PATH = r"Models and Data splits/model_CNN.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/CNN_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1      # FGSM step size
MAX_ITERS  = 5        # FGSM iterations
BIN_STEPS  = 5        # binary-search iterations
MAX_L2     = 1500     # max allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the CNN (expects input of shape (B,1,28,28))
model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    """
    Convert a single image in [0,1]—either shape (784,) or (28,28)—
    into a torch.FloatTensor of shape (1,1,28,28) for the CNN.
    """
    arr = x01.astype(np.float32)
    if arr.ndim == 1:
        arr = arr.reshape(28, 28)
    # add channel and batch dims: (1,1,28,28)
    return torch.from_numpy(arr[None, None, :, :]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()     # (28,28)
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: torch.Tensor):
            query_count[0] += 1
            return model(x_tensor)

        # Skip if the clean model is already wrong
        pred0 = model_query(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM on surrogate ———————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)).argmax().item() != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary‐search shrink —————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)).argmax().item() != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— quantize & final verify ——————————————————————
        x_uint8 = push_one_uint8(x_best, x0)
        if model_query(to_model(x_uint8 / 255.0)).argmax().item() == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# query‐count stats over successful attacks
query_counts = [r['queries'] for r in records if r['success']]
q_min = min(query_counts) if query_counts else 0
q_avg = sum(query_counts) / len(query_counts) if query_counts else 0.0
q_max = max(query_counts) if query_counts else 0

# L2‐magnitude stats over successful attacks
mags = [r['l2_mag'] for r in records if r['success']]
m_min = min(mags) if mags else float('nan')
m_avg = sum(mags) / len(mags) if mags else float('nan')
m_max = max(mags) if mags else float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples     : {total_trials}")
print(f"Successful attacks          : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude           : {mean_mag:.2f}")
print(f"Query count (min/avg/max)   : {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max)  : {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=11, l2_mag=nan
Sample idx 1: true=0, adv=3, success=True, queries=11, l2_mag=1452.1119384765625
Sample idx 2: true=0, adv=2, success=True, queries=11, l2_mag=1501.093994140625
Sample idx 3: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 4: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 5: true=0, adv=8, success=True, queries=11, l2_mag=1481.6396484375
Sample idx 6: true=0, adv=None, success=False, queries=11, l2_mag=nan
Sample idx 7: true=0, adv=6, success=True, queries=11, l2_mag=1449.79345703125
Sample idx 8: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 9: true=0, adv=None, success=False, queries=10, l2_mag=nan
Sample idx 10: true=0, adv=2, success=True, queries=11, l2_mag=1363.768310546875
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=None, success=False, queries=11, l2_mag=nan
Sample idx 13: true=0

## -----  Random Forests (RF) -----

### RF using train/seen set

In [123]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier  # Import RF classifier

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_RF.pkl"  # Update this to the path of your RF model
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_train.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/RF_train"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the Random Forest model instead of MLP
model = joblib.load(MODEL_PATH)  # Load the RF model directly

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)  # Reshape for RF (expects 2D array)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: np.ndarray):
            query_count[0] += 1
            return model.predict(x_tensor)[0]  # RF prediction

        # Skip if already misclassified
        pred0 = model_query(to_model(x0))
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)) != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)) != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)) == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)))
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=8, success=True, queries=26, l2_mag=1156.6927490234375
Sample idx 1: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 2: true=0, adv=2, success=True, queries=24, l2_mag=62.864933013916016
Sample idx 3: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 4: true=0, adv=6, success=True, queries=24, l2_mag=44.0
Sample idx 5: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 6: true=0, adv=None, success=False, queries=24, l2_mag=nan
Sample idx 7: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 8: true=0, adv=2, success=True, queries=25, l2_mag=1130.98583984375
Sample idx 9: true=0, adv=8, success=True, queries=24, l2_mag=108.70602416992188
Sample idx 10: true=0, adv=2, success=True, queries=24, l2_mag=477.80645751953125
Sample idx 11: true=0, adv=6, success=True, queries=24, l2_mag=283.5859680175781
Sample idx 12: true=0, adv=8, success=True, queries=24, l2_mag=310.5623779296875


### RF using test/unseen set

In [125]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier  # Import RF classifier

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_RF.pkl"  # Update this to the path of your RF model
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/RF_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the Random Forest model instead of MLP
model = joblib.load(MODEL_PATH)  # Load the RF model directly

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)  # Reshape for RF (expects 2D array)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: np.ndarray):
            query_count[0] += 1
            return model.predict(x_tensor)[0]  # RF prediction

        # Skip if already misclassified
        pred0 = model_query(to_model(x0))
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)) != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)) != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)) == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)))
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 1: true=0, adv=2, success=True, queries=24, l2_mag=310.99517822265625
Sample idx 2: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 3: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 4: true=0, adv=6, success=True, queries=24, l2_mag=157.09869384765625
Sample idx 5: true=0, adv=8, success=True, queries=25, l2_mag=1031.1484375
Sample idx 6: true=0, adv=2, success=True, queries=24, l2_mag=221.35943603515625
Sample idx 7: true=0, adv=None, success=False, queries=26, l2_mag=nan
Sample idx 8: true=0, adv=None, success=False, queries=23, l2_mag=nan
Sample idx 9: true=0, adv=None, success=False, queries=24, l2_mag=nan
Sample idx 10: true=0, adv=8, success=True, queries=24, l2_mag=549.6107788085938
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=8, success=True, queries=26, l2_mag=1511.4449462890625
Sample idx

## -----  Decision Tree (DT) -----

### DT using train/seen set

In [128]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier  # Import RF classifier

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_DT.pkl"  # Update this to the path of your RF model
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_train.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/DT_train"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the Random Forest model instead of MLP
model = joblib.load(MODEL_PATH)  # Load the RF model directly

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)  # Reshape for RF (expects 2D array)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: np.ndarray):
            query_count[0] += 1
            return model.predict(x_tensor)[0]  # RF prediction

        # Skip if already misclassified
        pred0 = model_query(to_model(x0))
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)) != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)) != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)) == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)))
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=8, success=True, queries=24, l2_mag=45.67274856567383
Sample idx 1: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 2: true=0, adv=9, success=True, queries=24, l2_mag=44.57577896118164
Sample idx 3: true=0, adv=2, success=True, queries=24, l2_mag=44.9110221862793
Sample idx 4: true=0, adv=5, success=True, queries=24, l2_mag=44.0
Sample idx 5: true=0, adv=5, success=True, queries=24, l2_mag=45.98912811279297
Sample idx 6: true=0, adv=4, success=True, queries=24, l2_mag=44.9777717590332
Sample idx 7: true=0, adv=8, success=True, queries=24, l2_mag=203.0517120361328
Sample idx 8: true=0, adv=5, success=True, queries=24, l2_mag=45.033321380615234
Sample idx 9: true=0, adv=3, success=True, queries=24, l2_mag=86.20904541015625
Sample idx 10: true=0, adv=5, success=True, queries=24, l2_mag=341.4747314453125
Sample idx 11: true=0, adv=8, success=True, queries=24, l2_mag=566.78564453125
Sample idx 12: true=0, adv=6, success=True, q

### DT using test/unseen set

In [130]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier  # Import RF classifier

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_DT.pkl"  # Update this to the path of your RF model
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/DT_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the Random Forest model instead of MLP
model = joblib.load(MODEL_PATH)  # Load the RF model directly

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)  # Reshape for RF (expects 2D array)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: np.ndarray):
            query_count[0] += 1
            return model.predict(x_tensor)[0]  # RF prediction

        # Skip if already misclassified
        pred0 = model_query(to_model(x0))
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)) != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)) != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)) == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)))
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=8, success=True, queries=24, l2_mag=202.3931884765625
Sample idx 1: true=0, adv=3, success=True, queries=24, l2_mag=45.14421463012695
Sample idx 2: true=0, adv=8, success=True, queries=24, l2_mag=197.59555053710938
Sample idx 3: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 4: true=0, adv=8, success=True, queries=24, l2_mag=45.343135833740234
Sample idx 5: true=0, adv=6, success=True, queries=27, l2_mag=974.6152954101562
Sample idx 6: true=0, adv=2, success=True, queries=24, l2_mag=44.271888732910156
Sample idx 7: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 8: true=0, adv=5, success=True, queries=24, l2_mag=339.9705810546875
Sample idx 9: true=0, adv=6, success=True, queries=25, l2_mag=995.901611328125
Sample idx 10: true=0, adv=5, success=True, queries=24, l2_mag=44.03407669067383
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=2, success=True, quer

## -----  K-nearest Kneighbor (kNN) -----

### kNN using train/seen set

In [133]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier  # Import RF classifier

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_kNN.pkl"  # Update this to the path of your RF model
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_train.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/kNN_train"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the Random Forest model instead of MLP
model = joblib.load(MODEL_PATH)  # Load the RF model directly

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)  # Reshape for RF (expects 2D array)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: np.ndarray):
            query_count[0] += 1
            return model.predict(x_tensor)[0]  # RF prediction

        # Skip if already misclassified
        pred0 = model_query(to_model(x0))
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)) != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)) != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)) == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)))
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 1: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 2: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 3: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 4: true=0, adv=6, success=True, queries=26, l2_mag=1198.7818603515625
Sample idx 5: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 6: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 7: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 8: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 9: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 10: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 13: true=0, adv=None, success=False, queries=6, l2_mag=nan

### kNN using test/unseen set

In [135]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier  # Import RF classifier

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_kNN.pkl"  # Update this to the path of your RF model
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/kNN_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space
# ───────────────────────────────────────────────────────────────────────────

# ───────── numerically-stable helpers ─────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

# Normalize to [0,1] if necessary
if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load the Random Forest model instead of MLP
model = joblib.load(MODEL_PATH)  # Load the RF model directly

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)  # Reshape for RF (expects 2D array)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

# ─────────── guarantee ≥1-grey-level change ───────────────────────────────
def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0

    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]

    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials  = 0
succ_total    = 0
misclassified = 0
records = []

# ───────────────────────── ATTACK LOOP ────────────────────────────────────
for digit in range(10):
    idxs = np.where(y == digit)[0][:50]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        query_count = [0]
        def model_query(x_tensor: np.ndarray):
            query_count[0] += 1
            return model.predict(x_tensor)[0]  # RF prediction

        # Skip if already misclassified
        pred0 = model_query(to_model(x0))
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        total_trials += 1
        W, b = load_sur(y0)

        # —— iterative FGSM —————————————————————————
        x = x0.copy()
        for _ in range(MAX_ITERS):
            flat = x.reshape(-1)
            if W.shape[0] == 1:
                p    = sigmoid(W[0] @ flat + b[0])
                grad = W[0] * (p - 1)
            else:
                p    = softmax(W @ flat + b)
                oh   = np.zeros_like(p); oh[y0] = 1
                grad = W.T @ (p - oh)

            x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
            if model_query(to_model(x)) != y0:
                break
        else:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        # —— binary search —————————————————————————————
        d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
        for _ in range(BIN_STEPS):
            mid = (lo + hi) / 2
            xm  = np.clip(x0 + mid * d, 0.0, 1.0)
            if model_query(to_model(xm)) != y0:
                best, hi = mid, mid
            else:
                lo = mid
        x_best = np.clip(x0 + best * d, 0.0, 1.0)

        # —— enforce L2 ≤ MAX_L2 in pixel space ——————————————————
        delta = (x_best - x0).reshape(-1) * 255.0
        l2_raw = np.linalg.norm(delta)
        if l2_raw > MAX_L2:
            scale = MAX_L2 / l2_raw
            x_best = x0 + (x_best - x0) * scale

        # —— push to uint8 ————————————————————————————
        x_uint8 = push_one_uint8(
            x_best.reshape(28,28),
            x0.reshape(28,28)
        )

        # —— final verification ——————————————————————
        if model_query(to_model(x_uint8 / 255.0)) == y0:
            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'adv_label': None,
                'success': False,
                'queries': query_count[0],
                'l2_mag': np.nan
            })
            continue

        y_adv = int(model_query(to_model(x_uint8 / 255.0)))
        l2_final = np.linalg.norm(
            x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
        )

        succ_total += 1
        fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
        Image.fromarray(x_uint8, mode="L") \
             .save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx,
            'true_label': y0,
            'adv_label': y_adv,
            'success': True,
            'queries': query_count[0],
            'l2_mag': l2_final
        })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── print per-sample stats ───────────────────────────────────────
print("\nPer-sample stats:")
for rec in records:
    print(
        f"Sample idx {rec['sample_idx']}: true={rec['true_label']}, "
        f"adv={rec['adv_label']}, success={rec['success']}, "
        f"queries={rec['queries']}, l2_mag={rec['l2_mag']}"
    )

# ─────────── overall summary & new counters ───────────────────────────────
total = total_trials + misclassified
rate  = (succ_total / total_trials * 100) if total_trials > 0 else 0.0
mean_mag = np.nanmean([r['l2_mag'] for r in records if r['success']])

# --- new: query‐count stats over successful attacks ---
query_counts = [r['queries'] for r in records if r['success']]
if query_counts:
    q_min = min(query_counts)
    q_avg = sum(query_counts) / len(query_counts)
    q_max = max(query_counts)
else:
    q_min = q_avg = q_max = 0

# --- new: L2‐magnitude stats over successful attacks ---
mags = [r['l2_mag'] for r in records if r['success']]
if mags:
    m_min = min(mags)
    m_avg = sum(mags) / len(mags)
    m_max = max(mags)
else:
    m_min = m_avg = m_max = float('nan')

print("\n===== Attack Summary =====")
print(f"Total attempted samples : {total_trials}")
print(f"Successful attacks      : {succ_total} ({rate:.1f}%)")
print(f"Mean L2 magnitude       : {mean_mag:.2f}")
print(f"Query counts (min/avg/max): {q_min}/{q_avg:.1f}/{q_max}")
print(f"L2 magnitude (min/avg/max): {m_min:.2f}/{m_avg:.2f}/{m_max:.2f}")
print(f"Per-sample stats saved to CSV: {csv_path}")





Per-sample stats:
Sample idx 0: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 1: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 2: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 3: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 4: true=0, adv=None, success=False, queries=27, l2_mag=nan
Sample idx 5: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 6: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 7: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 8: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 9: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 10: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 11: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 12: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx 13: true=0, adv=None, success=False, queries=6, l2_mag=nan
Sample idx