# Proposed Approach implementation on the sampled data
## -----------------------------------------------------------------

## -----  MLP1L -----

### MLP1L - 32

In [2]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1L_32.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1L_32_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                   3                   4                   5                   6                   7                   8                   9
true_label                                                                                                                                                                                                          
0                              -  62 / 1145.3 / 26.4  100 / 833.5 / 24.4  97 / 1101.5 / 25.0  72 / 1143.7 / 25.8  100 / 801.9 / 24.3  100 / 600.7 / 24.3  100 / 747.4 / 24.6  66 / 1150.0 / 25.3   99 / 795.6 / 24.6
1              98 / 969.1 / 25.9                   -  100 / 465.5 / 23.4  100 / 492.5 / 23.5   99 / 856.1 / 25.0  100 / 668.2 / 24.2  100 / 392.4 / 23.6  100 / 349.5 / 23.5  100 / 571.5 / 23.8  100 / 409.8 / 23.6
2             77 / 1182.5 / 25.7   9 / 1223.0 / 25.9                   -  72 / 1026.6 / 24.8  38 / 1151.

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP1L - 64

In [18]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1L_64.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1L_64_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                   3                   4                   5                   6                   7                   8                   9
true_label                                                                                                                                                                                                          
0                              -  68 / 1253.2 / 26.7  73 / 1097.3 / 24.9  95 / 1093.9 / 25.1  78 / 1142.4 / 25.8  96 / 1033.7 / 24.8  99 / 1012.6 / 25.1  100 / 794.3 / 24.6  30 / 1239.4 / 25.6   98 / 937.0 / 25.0
1             94 / 1186.1 / 26.4                   -  100 / 627.2 / 23.8  100 / 476.3 / 23.5   83 / 983.1 / 25.2  100 / 675.8 / 24.2  100 / 552.1 / 24.0  100 / 373.6 / 23.5  100 / 775.2 / 24.5  100 / 431.5 / 23.6
2             74 / 1268.2 / 25.8  67 / 1141.8 / 25.7                   -  100 / 805.0 / 24.4  80 / 1141.

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP1L - 128

In [36]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1L_128.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1L_128_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                   3                   4                   5                   6                   7                   8                    9
true_label                                                                                                                                                                                                           
0                              -  40 / 1335.2 / 26.8  99 / 1020.4 / 24.8  11 / 1228.5 / 25.4  43 / 1295.4 / 26.3  75 / 1185.6 / 25.2  96 / 1036.1 / 25.2  91 / 1177.7 / 25.4  14 / 1230.3 / 25.6   97 / 1075.1 / 25.3
1             93 / 1246.8 / 26.5                   -  100 / 569.4 / 23.9  100 / 601.4 / 23.9   56 / 957.9 / 25.2  82 / 1042.1 / 25.2  100 / 597.7 / 24.1  100 / 461.2 / 23.7   95 / 879.8 / 24.7   100 / 612.5 / 24.2
2             66 / 1276.8 / 25.8  14 / 1282.5 / 26.0                   -  59 / 1199.2 / 25.1  10 / 1

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP1L - 256

In [52]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1L_256.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1L_256_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                   3                   4                   5                   6                   7                   8                   9
true_label                                                                                                                                                                                                          
0                              -   2 / 1185.0 / 26.5   68 / 966.8 / 24.8  13 / 1173.8 / 25.4  25 / 1228.6 / 26.1  54 / 1186.8 / 25.3  80 / 1065.0 / 25.2  90 / 1117.2 / 25.3   2 / 1311.1 / 26.0  77 / 1182.4 / 25.5
1             99 / 1158.3 / 26.2                   -  100 / 569.5 / 23.9  100 / 890.2 / 24.5  13 / 1187.2 / 25.8  100 / 870.8 / 24.5  100 / 819.8 / 24.6  100 / 577.3 / 24.0  35 / 1138.6 / 25.5  100 / 632.5 / 24.2
2             36 / 1390.8 / 26.1    1 / 500.8 / 24.0                   -  49 / 1203.6 / 25.2            

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


## -----  MLP2L -----

### MLP2L - 32

In [69]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP2L_32.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP2L_32_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                    3                   4                   5                   6                   7                   8                   9
true_label                                                                                                                                                                                                           
0                              -   3 / 1344.2 / 26.3  68 / 1054.6 / 24.9    7 / 1207.0 / 25.3  30 / 1268.0 / 26.3  28 / 1142.8 / 25.1   77 / 995.3 / 25.1  87 / 1162.0 / 25.4  38 / 1000.8 / 25.0  45 / 1167.3 / 25.4
1             24 / 1370.7 / 26.8                   -  100 / 784.5 / 24.2  100 / 1039.5 / 25.0   99 / 839.0 / 24.9   77 / 724.7 / 24.2  100 / 937.2 / 24.9  100 / 556.8 / 23.9  100 / 599.7 / 24.0  100 / 790.8 / 24.5
2             14 / 1415.2 / 26.1    1 / 344.6 / 24.0                   -   31 / 1240.1 / 25.4  24 / 

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP2L - 64

In [85]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP2L_64.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP2L_64_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                    2                    3                   4                   5                    6                   7                   8                   9
true_label                                                                                                                                                                                                             
0                              -   6 / 1331.5 / 26.7   45 / 1213.4 / 25.2    3 / 1228.7 / 25.0  35 / 1312.4 / 26.3  25 / 1230.4 / 25.4   47 / 1263.3 / 25.6  39 / 1266.9 / 25.7  17 / 1185.9 / 25.4  41 / 1232.2 / 25.6
1             42 / 1349.9 / 26.7                   -  100 / 1000.4 / 24.7  100 / 1127.7 / 25.3  66 / 1229.0 / 25.9  96 / 1202.7 / 25.6  100 / 1138.5 / 25.3  100 / 841.8 / 24.5  90 / 1191.9 / 25.8  100 / 909.6 / 24.8
2             37 / 1337.0 / 25.9   8 / 1260.5 / 25.9                    -   61 / 1187.0 / 25

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP2L - 128

In [101]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP2L_128.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP2L_128_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                   3                   4                   5                   6                   7                    8                   9
true_label                                                                                                                                                                                                           
0                              -                   -  36 / 1147.9 / 25.1                   -  16 / 1346.7 / 26.4   9 / 1358.7 / 25.7  36 / 1266.7 / 25.6   1 / 1453.9 / 26.0    5 / 1416.6 / 25.8  18 / 1373.6 / 25.9
1             94 / 1106.6 / 26.1                   -  100 / 837.0 / 24.3  99 / 1005.7 / 24.7   78 / 968.1 / 25.1  94 / 1061.6 / 25.1  100 / 869.3 / 24.7  100 / 901.5 / 24.7  100 / 1037.5 / 25.2  100 / 767.9 / 24.5
2             45 / 1250.5 / 25.8                   -                   -  20 / 1317.1 / 25.6   6 / 1

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP2L - 256

In [117]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP2L_256.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP2L_256_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1     # FGSM step size
MAX_ITERS  = 5       # FGSM iterations
BIN_STEPS  = 20      # binary-search iterations
MAX_L2     = 1500    # maximum allowed L2 magnitude in pixel space

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    flat = x01.reshape(-1) if x01.ndim == 2 else x01
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

# Round and format L2 magnitude stats to 2 decimal places
mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                   0                   1                   2                   3                   4                   5                   6                   7                   8                   9
true_label                                                                                                                                                                                                          
0                              -   5 / 1310.7 / 26.8  36 / 1135.8 / 25.1   4 / 1302.1 / 26.2  35 / 1241.5 / 26.1   5 / 1337.1 / 25.6  37 / 1205.7 / 25.4  34 / 1279.7 / 25.8  17 / 1211.7 / 25.5  22 / 1211.5 / 25.5
1             84 / 1294.6 / 26.7                   -  100 / 845.5 / 24.3   99 / 912.1 / 24.6  100 / 862.0 / 24.9  92 / 1089.4 / 25.2  100 / 992.6 / 25.0  100 / 979.6 / 24.9  83 / 1034.3 / 25.1   99 / 972.3 / 25.1
2             20 / 1344.5 / 26.1                   -                   -  48 / 1235.5 / 25.4            

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### CNN

In [133]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate
from torchvision import transforms

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_CNN.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/CNN_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

# ──────────────── TRANSFORM FOR CNN INPUT ─────────────────────────────────
transform_28x28 = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((28, 28), interpolation=Image.BILINEAR),
    transforms.ToTensor()
])

# ───────────────────────────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning, message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x01: np.ndarray) -> torch.Tensor:
    if x01.ndim == 1:
        x01 = x01.reshape(28, 28)
    img = (x01 * 255).astype(np.uint8)
    tensor = transform_28x28(img)
    return tensor.unsqueeze(0).float()  # (1, 1, 28, 28)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix  = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src

                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)).argmax().item())
            l2_final = np.linalg.norm(
                x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28)
            )

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

# ─────────── enhanced summary matrix ──────────────────────────────────────
pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))





===== Targeted Attack Summary Matrix =====
target_label                  0                   1                   2                   3                   4                   5                  6                   7                   8                  9
true_label                                                                                                                                                                                                       
0                             -                   -  12 / 1464.2 / 25.9    1 / 993.4 / 25.0   5 / 1186.9 / 25.8   5 / 1421.0 / 25.8                  -   3 / 1374.0 / 25.7  23 / 1198.3 / 25.5   1 / 881.8 / 25.0
1                             -                   -  75 / 1294.0 / 25.7  10 / 1434.2 / 26.6  19 / 1243.8 / 25.8                   -  3 / 1125.2 / 25.0                   -  42 / 1205.9 / 25.5  1 / 1171.7 / 26.0
2                             -    1 / 855.2 / 25.0                   -   5 / 1406.2 / 25.6                   -   1 

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### RF

In [149]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_RF.pkl"  # RF model path
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/RF_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos] = 1.0 / (1.0 + np.exp(-z[pos]))
    ez = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning, message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load RF model
rf_model = joblib.load(MODEL_PATH)

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32), s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi = np.rint(x_pix).astype(np.int16)
    sign = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = rf_model.predict(to_model(x0))[0]
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_arr: np.ndarray):
                query_count[0] += 1
                return rf_model.predict(x_arr)[0]

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src
                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)

                if model_query(to_model(x)) == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)) == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)) != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)))
            l2_final = np.linalg.norm(x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28))

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── Save Results & Display ───────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))




===== Targeted Attack Summary Matrix =====
target_label                  0                   2                   3                   4                   5                   6                  7                  8                  9
true_label                                                                                                                                                                                  
0                             -  18 / 1177.7 / 24.9   1 / 1377.0 / 25.0   3 / 1044.8 / 25.3                   -   8 / 1033.7 / 24.5                  -  24 / 868.4 / 24.7  1 / 1516.4 / 27.0
1             3 / 1314.2 / 26.3   96 / 163.9 / 23.1   75 / 446.8 / 23.7   75 / 609.3 / 24.4   57 / 413.8 / 23.7   62 / 551.3 / 24.0  31 / 578.5 / 23.9   99 / 96.2 / 23.1   3 / 614.3 / 24.3
2             17 / 915.1 / 24.6                   -  16 / 1016.1 / 25.1    6 / 745.2 / 24.5  10 / 1089.0 / 24.8   31 / 982.5 / 24.8  1 / 1159.5 / 25.0  70 / 730.6 / 24.5  4 / 1322.6 / 25.5
3          

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### XGBoost

In [165]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_XGB.pkl"  # XGBoost model path
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/XGB_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos] = 1.0 / (1.0 + np.exp(-z[pos]))
    ez = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning, message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load XGBoost model
xgb_model: XGBClassifier = joblib.load(MODEL_PATH)

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32), s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi = np.rint(x_pix).astype(np.int16)
    sign = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = xgb_model.predict(to_model(x0))[0]
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_arr: np.ndarray):
                query_count[0] += 1
                return xgb_model.predict(x_arr)[0]

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src
                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)

                if model_query(to_model(x)) == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)) == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)) != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)))
            l2_final = np.linalg.norm(x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28))

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── Save Results & Display ───────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))





===== Targeted Attack Summary Matrix =====
target_label                   0                   2                   3                   4                   5                   6                  7                  8                   9
true_label                                                                                                                                                                                    
0                              -   26 / 832.9 / 24.5  15 / 1145.0 / 25.2   3 / 1393.6 / 26.0                   -  11 / 1096.6 / 24.7   1 / 884.2 / 24.0  80 / 908.9 / 24.9  26 / 1122.5 / 25.5
1              1 / 1018.1 / 25.0   86 / 326.5 / 23.6   61 / 483.9 / 23.8   5 / 1219.9 / 25.6   56 / 489.1 / 23.8  21 / 1223.2 / 25.8  27 / 720.1 / 24.3  99 / 132.4 / 23.1   48 / 473.4 / 23.9
2             21 / 1171.7 / 25.3                   -  42 / 1037.9 / 25.0    1 / 982.1 / 25.0   8 / 1118.8 / 25.2   3 / 1302.0 / 26.0  1 / 1447.6 / 26.0  95 / 613.8 / 24.1  11 / 1100.3 / 25.7
3

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### SVM

In [181]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.svm import SVC
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_SVM.pkl"  # SVM model path
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/SVM_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos] = 1.0 / (1.0 + np.exp(-z[pos]))
    ez = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning, message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load SVM model
svc_model: SVC = joblib.load(MODEL_PATH)

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32), s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi = np.rint(x_pix).astype(np.int16)
    sign = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = svc_model.predict(to_model(x0))[0]
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_arr: np.ndarray):
                query_count[0] += 1
                return svc_model.predict(x_arr)[0]

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src
                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)

                if model_query(to_model(x)) == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)) == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)) != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)))
            l2_final = np.linalg.norm(x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28))

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── Save Results & Display ───────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")

# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))





===== Targeted Attack Summary Matrix =====
target_label                   0                  1                   2                   3                   4                   5                   6                   7                   8                   9
true_label                                                                                                                                                                                                         
0                              -                  -  20 / 1346.5 / 25.7   5 / 1326.6 / 25.6   5 / 1422.9 / 26.6   8 / 1297.6 / 25.5  19 / 1296.2 / 25.5                   -   9 / 1357.1 / 25.9   2 / 1257.9 / 25.5
1              7 / 1424.1 / 27.0                  -  97 / 1053.5 / 24.9  99 / 1254.2 / 25.9  83 / 1139.9 / 25.5  54 / 1378.2 / 26.0  95 / 1329.0 / 25.8  99 / 1100.5 / 25.3  100 / 842.0 / 24.4  45 / 1256.3 / 26.0
2             13 / 1393.3 / 26.0                  -                   -  42 / 1224.8 / 25.3  10 / 1303.2 / 2

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### kNN

In [1]:
import os
import warnings
import numpy as np
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_kNN.pkl"  # changed model path
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/kNN_test"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (0-1 domain) ────────────────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos] = 1.0 / (1.0 + np.exp(-z[pos]))
    ez = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning, message="overflow encountered")

# ─────────── DATA & MODEL ────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X, y, _ = data

if X.max() > 1.0:
    X = X.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

# Load kNN model
knn_model = joblib.load(MODEL_PATH)

def to_model(x01: np.ndarray) -> np.ndarray:
    return x01.reshape(1, -1)

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32), s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_float01: np.ndarray, x_clean01: np.ndarray) -> np.ndarray:
    x_pix = x_float01 * 255.0
    x_orig = x_clean01 * 255.0
    xi = np.rint(x_pix).astype(np.int16)
    sign = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = knn_model.predict(to_model(x0))[0]
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_arr: np.ndarray):
                query_count[0] += 1
                return knn_model.predict(x_arr)[0]

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src
                x = np.clip(x + EPSILON * np.sign(grad.reshape(x.shape)), 0.0, 1.0)

                if model_query(to_model(x)) == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm = np.clip(x0 + mid * d, 0.0, 1.0)
                if model_query(to_model(xm)) == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = np.clip(x0 + best * d, 0.0, 1.0)

            delta = (x_best - x0).reshape(-1) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best.reshape(28,28), x0.reshape(28,28))

            if model_query(to_model(x_uint8 / 255.0)) != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8 / 255.0)))
            l2_final = np.linalg.norm(x_uint8.astype(np.float32) - (x0 * 255.0).reshape(28,28))

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── Save Results & Display ───────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

pivot_data = df.groupby(['true_label', 'target_label']).agg(
    success_count=('success', 'sum'),
    mean_l2=('l2_mag', 'mean'),
    mean_queries=('queries', 'mean')
).reset_index()

pivot_data['cell'] = pivot_data.apply(
    lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")

print("\n===== Targeted Attack Summary Matrix =====")
print(matrix.to_string())

count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)

plt.figure(figsize=(10, 8))
sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
plt.title("Targeted Attack Success Count")
plt.xlabel("Target Label")
plt.ylabel("True Label")
plt.tight_layout()
plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")


# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                  0                  1                  3                  4                 5                  6                  7                  8                  9
true_label                                                                                                                                                                            
2             1 / 1449.7 / 26.0                  -                  -                  -                 -                  -  1 / 1485.6 / 27.0                  -                  -
3                             -  1 / 1340.8 / 27.0                  -                  -                 -                  -  1 / 1442.7 / 26.0  4 / 1181.2 / 25.5  1 / 1328.5 / 26.0
4                             -  2 / 1423.2 / 26.0                  -                  -                 -                  -                  -                  -  3 / 1172.0 / 25.3
5             2 / 1476.1 / 26.0          

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### MLP1L - StandardScaler

In [2]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate
from sklearn.preprocessing import StandardScaler

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_MLP1LSca.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/MLP1LSca_test"
SCALER_PATH = r"Models and Data splits/scaler.pkl"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (standardized domain) ───────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

# ─────────── HELPERS ──────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ─────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X_raw, y, _ = data

if X_raw.max() > 1.0:
    X_raw = X_raw.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

scaler = joblib.load(SCALER_PATH) if os.path.exists(SCALER_PATH) else StandardScaler().fit(X_raw)
X = scaler.transform(X_raw)
if not os.path.exists(SCALER_PATH):
    joblib.dump(scaler, SCALER_PATH)

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x_std: np.ndarray) -> torch.Tensor:
    flat = x_std.reshape(-1) if x_std.ndim == 2 else x_std
    return torch.from_numpy(flat[None]).float()

# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_scaled: np.ndarray, x_clean_scaled: np.ndarray) -> np.ndarray:
    x_pix  = scaler.inverse_transform(x_scaled.reshape(1, -1)).reshape(28, 28) * 255.0
    x_orig = scaler.inverse_transform(x_clean_scaled.reshape(1, -1)).reshape(28, 28) * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src
                x = x + EPSILON * np.sign(grad.reshape(x.shape))
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = x0 + mid * d
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = x0 + best * d

            delta = scaler.inverse_transform((x_best - x0).reshape(1, -1)) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best, x0)
            x_uint8_flat = x_uint8.reshape(1, -1).astype(np.float32) / 255.0
            x_uint8_std = scaler.transform(x_uint8_flat)

            if model_query(to_model(x_uint8_std)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8_std)).argmax().item())
            l2_final = np.linalg.norm(x_uint8.astype(np.float32) - scaler.inverse_transform(x0.reshape(1, -1)).reshape(28,28) * 255.0)

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

if not df.empty and all(col in df.columns for col in ['true_label', 'target_label']):
    pivot_data = df.groupby(['true_label', 'target_label']).agg(
        success_count=('success', 'sum'),
        mean_l2=('l2_mag', 'mean'),
        mean_queries=('queries', 'mean')
    ).reset_index()

    pivot_data['cell'] = pivot_data.apply(
        lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

    matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")
    print("\n===== Targeted Attack Summary Matrix =====")
    print(matrix.to_string())

    count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)
    plt.figure(figsize=(10, 8))
    sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
    plt.title("Targeted Attack Success Count")
    plt.xlabel("Target Label")
    plt.ylabel("True Label")
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
    plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")


# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                 0                 1                 2                  3                 4                 5                 6                  7                 9
true_label                                                                                                                                                                      
0                            -                 -                 -                  -                 -  1 / 726.3 / 27.0  1 / 708.9 / 27.0   1 / 689.8 / 27.0                 -
1                            -                 -                 -  63 / 531.6 / 26.1                 -  8 / 593.2 / 26.4                 -   6 / 561.3 / 25.8                 -
2                            -  4 / 563.2 / 26.8                 -  13 / 522.5 / 25.9                 -                 -  3 / 666.3 / 26.7   2 / 627.0 / 26.0                 -
3                            -                 -                 -     

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')


### CNN - StandardScaler

In [7]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
import seaborn as sns
import matplotlib.pyplot as plt
from tabulate import tabulate
from sklearn.preprocessing import StandardScaler

# ─────────────── PATHS ────────────────────────────────────────────────────
MODEL_PATH = r"Models and Data splits/model_CNNSca.pt"
DATA_PKL   = r"Models and Data splits/Sampled_AllModels_test.pkl"
SUR_DIR    = r"Models and Data splits"
OUT_DIR    = "adversarial_8bit_images/CNNSca_test"
SCALER_PATH = r"Models and Data splits/scaler.pkl"
os.makedirs(OUT_DIR, exist_ok=True)

# ─────────── HYPER-PARAMETERS (standardized domain) ───────────────────────
EPSILON    = 0.1
MAX_ITERS  = 5
BIN_STEPS  = 20
MAX_L2     = 1500

# ─────────── HELPERS ──────────────────────────────────────────────────────
def sigmoid(z):
    z = np.asarray(z, dtype=np.float32)
    pos = z >= 0
    out = np.empty_like(z)
    out[pos]  = 1.0 / (1.0 + np.exp(-z[pos]))
    ez        = np.exp(z[~pos])
    out[~pos] = ez / (1.0 + ez)
    return out

def softmax(lgt):
    lgt = np.asarray(lgt, dtype=np.float32)
    e = np.exp(lgt - lgt.max())
    return e / e.sum()

warnings.filterwarnings("ignore", category=RuntimeWarning,
                        message="overflow encountered")

# ─────────── DATA & MODEL ─────────────────────────────────────────────────
data = joblib.load(DATA_PKL)
X_raw, y, _ = data

if X_raw.max() > 1.0:
    X_raw = X_raw.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")
else:
    warnings.warn("Data is already scaled to [0,1]; proceeding without change.")

scaler = joblib.load(SCALER_PATH) if os.path.exists(SCALER_PATH) else StandardScaler().fit(X_raw)
X = scaler.transform(X_raw)
if not os.path.exists(SCALER_PATH):
    joblib.dump(scaler, SCALER_PATH)

model = torch.jit.load(MODEL_PATH, map_location="cpu").eval()

def to_model(x_std: np.ndarray) -> torch.Tensor:
    return torch.from_numpy(x_std.reshape(1, 1, 28, 28)).float()


# ─────────── surrogate cache ─────────────────────────────────────────────
sur_cache = {}
def load_sur(label):
    if label not in sur_cache:
        s = joblib.load(os.path.join(SUR_DIR, f"surrogate_digit_{label}.pkl"))
        sur_cache[label] = (s.coef_.astype(np.float32),
                            s.intercept_.astype(np.float32))
    return sur_cache[label]

def push_one_uint8(x_scaled: np.ndarray, x_clean_scaled: np.ndarray) -> np.ndarray:
    x_pix  = scaler.inverse_transform(x_scaled.reshape(1, -1)).reshape(28, 28) * 255.0
    x_orig = scaler.inverse_transform(x_clean_scaled.reshape(1, -1)).reshape(28, 28) * 255.0
    xi     = np.rint(x_pix).astype(np.int16)
    sign   = np.sign(x_pix - x_orig).astype(np.int16)
    changed = sign != 0
    xi[changed] += sign[changed]
    return np.clip(xi, 0, 255).astype(np.uint8).reshape(28, 28)

# ─────────── stats collectors ─────────────────────────────────────────────
total_trials = 0
succ_total   = 0
misclassified = 0
records = []

# ───────────────────────── TARGETED ATTACK LOOP ──────────────────────────
for source_digit in range(10):
    idxs = np.where(y == source_digit)[0][:100]

    for rank, idx in enumerate(idxs, 1):
        x0 = X[idx].copy()
        y0 = int(y[idx])

        pred0 = model(to_model(x0)).argmax().item()
        if pred0 != y0:
            misclassified += 1
            continue

        total_trials += 1

        for target_digit in range(10):
            if target_digit == y0:
                continue

            query_count = [0]
            def model_query(x_tensor: torch.Tensor):
                query_count[0] += 1
                return model(x_tensor)

            W_src, b_src = load_sur(y0)
            W_tgt, b_tgt = load_sur(target_digit)

            x = x0.copy()
            success = False

            for _ in range(MAX_ITERS):
                flat = x.reshape(-1)

                if W_src.shape[0] == 1:
                    p_src = sigmoid(W_src[0] @ flat + b_src[0])
                    grad_src = W_src[0] * (p_src - 1)
                else:
                    p_src = softmax(W_src @ flat + b_src)
                    oh_src = np.zeros_like(p_src); oh_src[y0] = 1
                    grad_src = W_src.T @ (p_src - oh_src)

                if W_tgt.shape[0] == 1:
                    p_tgt = sigmoid(W_tgt[0] @ flat + b_tgt[0])
                    grad_tgt = W_tgt[0] * p_tgt
                else:
                    p_tgt = softmax(W_tgt @ flat + b_tgt)
                    oh_tgt = np.zeros_like(p_tgt); oh_tgt[target_digit] = 1
                    grad_tgt = W_tgt.T @ (p_tgt - oh_tgt)

                grad = grad_tgt - grad_src
                x = x + EPSILON * np.sign(grad.reshape(x.shape))
                if model_query(to_model(x)).argmax().item() == target_digit:
                    success = True
                    break

            if not success:
                continue

            d, lo, hi, best = x - x0, 0.0, 1.0, 1.0
            for _ in range(BIN_STEPS):
                mid = (lo + hi) / 2
                xm  = x0 + mid * d
                if model_query(to_model(xm)).argmax().item() == target_digit:
                    best, hi = mid, mid
                else:
                    lo = mid
            x_best = x0 + best * d

            delta = scaler.inverse_transform((x_best - x0).reshape(1, -1)) * 255.0
            l2_raw = np.linalg.norm(delta)
            if l2_raw > MAX_L2:
                scale = MAX_L2 / l2_raw
                x_best = x0 + (x_best - x0) * scale

            x_uint8 = push_one_uint8(x_best, x0)
            x_uint8_flat = x_uint8.reshape(1, -1).astype(np.float32) / 255.0
            x_uint8_std = scaler.transform(x_uint8_flat)

            if model_query(to_model(x_uint8_std)).argmax().item() != target_digit:
                continue

            y_adv = int(model_query(to_model(x_uint8_std)).argmax().item())
            l2_final = np.linalg.norm(x_uint8.astype(np.float32) - scaler.inverse_transform(x0.reshape(1, -1)).reshape(28,28) * 255.0)

            succ_total += 1
            fname = f"true{y0}_adv{y_adv}_mag{l2_final:.1f}_sample{rank}.png"
            Image.fromarray(x_uint8, mode="L") \
                 .save(os.path.join(OUT_DIR, fname))

            records.append({
                'sample_idx': idx,
                'true_label': y0,
                'target_label': target_digit,
                'adv_label': y_adv,
                'success': True,
                'queries': query_count[0],
                'l2_mag': l2_final
            })

# ─────────── build DataFrame & save CSV ──────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "targeted_attack_stats.csv")
df.to_csv(csv_path, index=False)

if not df.empty and all(col in df.columns for col in ['true_label', 'target_label']):
    pivot_data = df.groupby(['true_label', 'target_label']).agg(
        success_count=('success', 'sum'),
        mean_l2=('l2_mag', 'mean'),
        mean_queries=('queries', 'mean')
    ).reset_index()

    pivot_data['cell'] = pivot_data.apply(
        lambda row: f"{int(row.success_count)} / {row.mean_l2:.1f} / {row.mean_queries:.1f}", axis=1)

    matrix = pivot_data.pivot(index="true_label", columns="target_label", values="cell").fillna("-")
    print("\n===== Targeted Attack Summary Matrix =====")
    print(matrix.to_string())

    count_matrix = pivot_data.pivot(index="true_label", columns="target_label", values="success_count").fillna(0)
    plt.figure(figsize=(10, 8))
    sns.heatmap(count_matrix, annot=True, fmt=".0f", cmap="YlGnBu", cbar_kws={'label': 'Success Count'})
    plt.title("Targeted Attack Success Count")
    plt.xlabel("Target Label")
    plt.ylabel("True Label")
    plt.tight_layout()
    plt.savefig(os.path.join(OUT_DIR, "summary_success_heatmap.png"))
    plt.close()

print(f"\nTotal successful attacks: {succ_total} / {total_trials * 9}")
print(f"Stats saved to CSV: {csv_path}")


# ─────────── Three Summary Tables ────────────────────────────────────────
success_counts = df.groupby(['true_label', 'target_label'])['success'].sum().unstack().fillna(0).astype(int)
print("\n===== Success Counts Table =====")
print(tabulate(success_counts, headers='keys', tablefmt='fancy_grid'))

query_stats = df.groupby(['true_label', 'target_label'])['queries'].agg(['min', 'max', 'mean']).unstack().round(1)
query_min = query_stats['min'].fillna('-')
query_max = query_stats['max'].fillna('-')
query_mean = query_stats['mean'].fillna('-')

print("\n===== Query Stats Table =====")
print("Min Queries:")
print(tabulate(query_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Queries:")
print(tabulate(query_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Queries:")
print(tabulate(query_mean, headers='keys', tablefmt='fancy_grid'))

mag_stats = df.groupby(['true_label', 'target_label'])['l2_mag'].agg(['min', 'max', 'mean']).unstack()

def format_float_table(table):
    return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')

mag_min = format_float_table(mag_stats['min'])
mag_max = format_float_table(mag_stats['max'])
mag_mean = format_float_table(mag_stats['mean'])

print("\n===== L2 Magnitude Stats Table =====")
print("Min Magnitude:")
print(tabulate(mag_min, headers='keys', tablefmt='fancy_grid'))
print("\nMax Magnitude:")
print(tabulate(mag_max, headers='keys', tablefmt='fancy_grid'))
print("\nAvg Magnitude:")
print(tabulate(mag_mean, headers='keys', tablefmt='fancy_grid'))






===== Targeted Attack Summary Matrix =====
target_label                 2                  3                 5                 6                 9
true_label                                                                                             
5                            -  11 / 632.0 / 26.9                 -                 -                 -
6                            -                  -  2 / 659.0 / 27.0                 -                 -
7             1 / 172.7 / 23.0                  -                 -                 -  1 / 374.6 / 25.0
8                            -   1 / 665.5 / 27.0                 -  1 / 166.8 / 23.0                 -
9                            -   3 / 564.6 / 26.7                 -                 -                 -

Total successful attacks: 20 / 9000
Stats saved to CSV: adversarial_8bit_images/CNNSca_test\targeted_attack_stats.csv

===== Success Counts Table =====
╒══════════════╤═════╤═════╤═════╤═════╤═════╕
│   true_label │   2 │   3 │

  return table.applymap(lambda x: f"{x:.2f}" if pd.notnull(x) else '-')
