### Display the versions of the libraries used for reference purposes.

In [33]:
import sys
import numpy as np
import tensorflow as tf
import sklearn
import torch
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import notebook
import os

# Report the interpreter, notebook and library versions this notebook was run
# with, one "<name> version: <value>" line each, in a fixed order.
version_report = (
    ("Python", sys.version),
    ("Jupyter Notebook", notebook.__version__),
    ("NumPy", np.__version__),
    ("TensorFlow", tf.__version__),
    ("Torch", torch.__version__),
    ("Scikit-learn", sklearn.__version__),
)
for component, version in version_report:
    print(f'{component} version: {version}')

# Directory that the later cells read models and data splits from.
os.makedirs("Models and Data splits", exist_ok=True)

Python version: 3.12.9 | packaged by Anaconda, Inc. | (main, Feb  6 2025, 18:49:16) [MSC v.1929 64 bit (AMD64)]
Jupyter Notebook version: 7.3.2
NumPy version: 2.0.1
TensorFlow version: 2.19.0
Torch version: 2.6.0+cu126
Scikit-learn version: 1.6.1


### Untargeted Attack on test set; k=100

In [35]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# ─────────── Reproducibility ───────────────────────────────────────────────
# Seed the NumPy and PyTorch (CPU and, when present, CUDA) random generators.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Declared before the paths because the output directory name embeds K.
EPSILON_STEP = 0.01       # step applied along the gradient sign per iteration
ITERATIONS = 100          # maximum number of I-FGSM steps per sample
TOTAL_EPSILON = 0.5       # per-pixel bound on the accumulated perturbation
TOP_K_FEATURES = 100      # number of highest-importance pixels that may change
SAMPLES_PER_DIGIT = 100   # at most this many samples are taken per class
MAX_L2 = 10000            # largest L2 distance still counted as a success

# ─────────────── PATHS ────────────────────────────────────────────────────
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
# Single source of truth for K: the directory name follows TOP_K_FEATURES so
# the two can never disagree when this cell is copied for another K.
OUT_DIR = f"adversarial_8bit_K{TOP_K_FEATURES}_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail early: without the forest there is no feature ranking to select from.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show an original image and its adversarial version side by side.

    Any image holding exactly 784 values is reshaped to 28x28 first; other
    shapes are passed to ``imshow`` unchanged. The left panel is captioned
    with ``true_label`` and the right panel with ``adv_label``; the figure
    title reports ``sample_idx``, ``l2_mag`` (two decimals), ``queries`` and
    ``title_suffix``. The figure is shown and then closed.
    """
    panels = []
    for image, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        as_grid = image.reshape(28, 28) if image.size == 784 else image
        panels.append((as_grid, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for axis, (image, caption) in zip(axes, panels):
        axis.imshow(image, cmap='gray')
        axis.set_title(caption)
        axis.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Close explicitly so figures do not pile up across repeated calls.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# NOTE: joblib.load unpickles arbitrary objects; only point it at trusted files.
try:
    data = joblib.load(DATA_PKL)
except FileNotFoundError as err:
    # Raise instead of print + exit(): exit() is an interactive helper and is
    # not a reliable way to stop a script or notebook cell, so later code could
    # otherwise keep running without X_samples / y_samples being defined.
    raise FileNotFoundError(
        f"Error: Data file not found at {DATA_PKL}. Please check the path."
    ) from err

# The pickle is unpacked as a 4-tuple; only its 2nd and 4th entries are used
# (presumably the test features and labels of a train/test split — confirm).
_, X_samples, _, y_samples = data

# Values above 1.0 are taken to mean a 0-255 pixel scale and are rescaled.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    model.eval()
except Exception as e:
    # Same reasoning as above: abort with a chained exception rather than exit().
    raise RuntimeError(f"Error loading TorchScript LeNet model: {e}") from e
print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")

def to_model(x01_batch):
    """Turn a NumPy image or batch of images into a float32 tensor on ``device``.

    Accepted layouts and the tensor shape produced for each:

    * ``(28, 28)``      -> ``(1, 1, 28, 28)``
    * ``(N, 28, 28)``   -> ``(N, 1, 28, 28)``
    * ``(N, 1, H, W)``  -> unchanged shape
    * ``(784,)``        -> ``(1, 1, 28, 28)``
    * ``(N, 784)``      -> ``(N, 1, 28, 28)``

    Raises:
        ValueError: If the input matches none of the layouts above.
    """
    rank, shape = x01_batch.ndim, x01_batch.shape

    # The (28, 28) test must come before the (N, 784) one: both are 2-D.
    if rank == 2 and shape == (28, 28):
        nchw = x01_batch[None, None, :, :]
    elif rank == 3 and shape[1:] == (28, 28):
        nchw = x01_batch[:, None, :, :]
    elif rank == 4 and shape[1] == 1:
        nchw = x01_batch
    elif rank == 1 and x01_batch.size == 784:
        nchw = x01_batch.reshape(1, 1, 28, 28)
    elif rank == 2 and shape[1] == 784:
        nchw = x01_batch.reshape(shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    return torch.tensor(nchw, dtype=torch.float32, device=device)


def model_query(x_tensor_batch):
    """Return the module-level ``model``'s output for a batch, without tracking gradients."""
    with torch.no_grad():
        outputs = model(x_tensor_batch)
    return outputs

# ─────────── Load Random Forest Model ──────────────────────────────────────
# The forest's feature importances are used to rank pixels; the attack is
# then restricted to the TOP_K_FEATURES highest-ranked ones.
rf_all_classes = None
top_feature_indices = None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Sorting the negated importances orders pixels from most to least important.
    top_feature_indices = np.argsort(-feature_importances)[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: keep running, but on every pixel of a 28x28 image.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)
    # Say so explicitly: the results no longer correspond to the configured K.
    warnings.warn(
        f"Falling back to all {top_feature_indices.size} pixels; results will "
        f"not reflect TOP_K_FEATURES={TOP_K_FEATURES}.",
        RuntimeWarning,
    )

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted iterative FGSM attack limited to a subset of pixels.

    Every iteration moves the selected pixels by ``epsilon_step`` along the
    sign of the cross-entropy gradient with respect to the input, clips the
    accumulated perturbation to ``±total_epsilon_budget`` and the image to
    ``[0, 1]``, quantises the image to 8 bits and asks the model for its
    prediction on that quantised image. The loop stops as soon as the
    prediction differs from ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: NumPy image, either 2-D or a flat 784-element
            vector (reshaped to 28x28). Values are treated as lying in [0, 1].
        true_label: Integer class the prediction should move away from.
        model_target: Model called on ``(1, 1, H, W)`` tensors; its output is
            fed to ``F.cross_entropy`` and ``argmax(dim=1)``.
        epsilon_step: Step size applied per iteration.
        iterations: Maximum number of gradient steps.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat pixel indices that are allowed to change.
        query_count_tracker: One-element list whose first entry is incremented
            once per forward pass made by this function.

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the
        quantised adversarial image, the model's prediction on it, the L2
        distance to the original on the 0-255 scale, and whether the
        prediction changed while ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()

    # Loop invariants: pixel coordinates that may change and the loss target.
    height, width = perturbed_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (height, width))
    target = torch.tensor([true_label], device=device)

    def _quantise(image01):
        # Round to 8-bit and return both the uint8 image and its [0, 1] float form.
        as_uint8 = np.round(image01 * 255).astype(np.uint8)
        return as_uint8, as_uint8.astype(np.float32) / 255.0

    def _predict(image01):
        # Forward pass without gradients; every call is counted as one query.
        with torch.no_grad():
            label = model_target(to_model(image01)).argmax(dim=1).item()
        query_count_tracker[0] += 1
        return label

    adv_uint8 = None
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, target)
        model_target.zero_grad()
        loss.backward()
        grad_sign = image_tensor.grad.detach().sign().cpu().numpy()

        # Keep the gradient sign only on the allowed pixels; all others stay 0.
        step = np.zeros((height, width), dtype=np.float32)
        step[rows, cols] = grad_sign[0, 0, rows, cols]

        perturbed_image_np += epsilon_step * step
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the quantised image, since that is what gets saved.
        adv_uint8, adv_float = _quantise(perturbed_image_np)
        final_pred_label = _predict(adv_float)
        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # No iteration ran (iterations <= 0): classify the unmodified image.
        # In every other case the loop has already classified this exact
        # quantised image, so no extra, uncounted forward pass is needed.
        adv_uint8, adv_float = _quantise(perturbed_image_np)
        final_pred_label = _predict(adv_float)

    # Distance on the 0-255 scale between the quantised result and the original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = bool((final_pred_label != true_label) and (l2_norm <= MAX_L2))

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# For each digit class, attack up to SAMPLES_PER_DIGIT samples that the model
# classifies correctly, saving every successful adversarial image as a PNG.
print("\n===== Starting Adversarial Attack =====")
total_trials, succ_total, misclassified = 0, 0, 0
records = []
# sample index -> path of the PNG written for it. Used by the display step so
# it never has to rebuild a filename from DataFrame values.
adv_image_paths = {}


start_time = time.time()


for digit in range(10):
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, 1):
        idx = int(idx)
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        # One-element list so ifgsm_attack can increment the count in place.
        query_count_tracker = [0]
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        # Samples the model already gets wrong are recorded but not attacked.
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            })
            continue

        total_trials += 1
        # Use the real number of samples found for this class as denominator.
        print(f"Attacking sample {rank}/{len(idxs)} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker
        )

        if success:
            succ_total += 1
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            adv_path = os.path.join(OUT_DIR, fname)
            Image.fromarray(adv_img_uint8, mode="L").save(adv_path)
            adv_image_paths[idx] = adv_path

        records.append({
            'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
            'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
            'l2_mag': l2_mag if success else np.nan, 'note': 'I-FGSM' + (' successful' if success else ' failed')
        })

        print(f"  {'Success' if success else 'Failed'}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# With no records at all the frame has no 'success' column; fall back to the
# (empty) frame itself instead of raising KeyError.
success_df = df[df['success']] if not df.empty else df

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        print(f"Mean L2 (successful): {success_df['l2_mag'].mean():.2f}")
        print(f"Mean queries (successful): {success_df['queries'].mean():.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # Cast back to int: pandas can store adv_label as float once the column
        # also holds None (already-misclassified rows), which would render as
        # "8.0" in titles and never match a saved file name.
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        adv_img_path = adv_image_paths.get(sample_idx)
        if adv_img_path and os.path.exists(adv_img_path):
            # Context manager so the image file handle is released promptly.
            with Image.open(adv_img_path) as adv_file:
                adv_pixels = np.array(adv_file)
            plot_adversarial_example(X_samples[sample_idx], adv_pixels,
                                     true_label, adv_label, r['l2_mag'],
                                     sample_idx, int(r['queries']), "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        sample_idx = int(r['sample_idx'])
        plot_adversarial_example(X_samples[sample_idx], X_samples[sample_idx],
                                 int(r['true_label']), int(r['initial_pred']), 0.0,
                                 sample_idx, int(r['queries']), "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 100 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Failed! Adv label: 0, L2: 979.33, Queries: 201
Attacking sample 2/100 (True: 0, Index: 40)...
  Failed! Adv label: 0, L2: 911.63, Queries: 201
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 8, L2: 737.82, Queries: 83
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 8, L2: 830.94, Queries: 89
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 946.87, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 785.37, Queries: 93
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 756.04, Queries: 91
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 903.84, Queries: 173
Attacking sample 9/100 

### Untargeted Attack on test set; k=110

In [37]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# ─────────── Reproducibility ───────────────────────────────────────────────
# Seed the NumPy and PyTorch (CPU and, when present, CUDA) random generators.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Declared before the paths because the output directory name embeds K.
EPSILON_STEP = 0.01       # step applied along the gradient sign per iteration
ITERATIONS = 100          # maximum number of I-FGSM steps per sample
TOTAL_EPSILON = 0.5       # per-pixel bound on the accumulated perturbation
TOP_K_FEATURES = 110      # number of highest-importance pixels that may change
SAMPLES_PER_DIGIT = 100   # at most this many samples are taken per class
MAX_L2 = 10000            # largest L2 distance still counted as a success

# ─────────────── PATHS ────────────────────────────────────────────────────
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
# Single source of truth for K: the directory name follows TOP_K_FEATURES so
# the two can never disagree when this cell is copied for another K.
OUT_DIR = f"adversarial_8bit_K{TOP_K_FEATURES}_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail early: without the forest there is no feature ranking to select from.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show an original image and its adversarial version side by side.

    Any image holding exactly 784 values is reshaped to 28x28 first; other
    shapes are passed to ``imshow`` unchanged. The left panel is captioned
    with ``true_label`` and the right panel with ``adv_label``; the figure
    title reports ``sample_idx``, ``l2_mag`` (two decimals), ``queries`` and
    ``title_suffix``. The figure is shown and then closed.
    """
    panels = []
    for image, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        as_grid = image.reshape(28, 28) if image.size == 784 else image
        panels.append((as_grid, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for axis, (image, caption) in zip(axes, panels):
        axis.imshow(image, cmap='gray')
        axis.set_title(caption)
        axis.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Close explicitly so figures do not pile up across repeated calls.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# NOTE: joblib.load unpickles arbitrary objects; only point it at trusted files.
try:
    data = joblib.load(DATA_PKL)
except FileNotFoundError as err:
    # Raise instead of print + exit(): exit() is an interactive helper and is
    # not a reliable way to stop a script or notebook cell, so later code could
    # otherwise keep running without X_samples / y_samples being defined.
    raise FileNotFoundError(
        f"Error: Data file not found at {DATA_PKL}. Please check the path."
    ) from err

# The pickle is unpacked as a 4-tuple; only its 2nd and 4th entries are used
# (presumably the test features and labels of a train/test split — confirm).
_, X_samples, _, y_samples = data

# Values above 1.0 are taken to mean a 0-255 pixel scale and are rescaled.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    model.eval()
except Exception as e:
    # Same reasoning as above: abort with a chained exception rather than exit().
    raise RuntimeError(f"Error loading TorchScript LeNet model: {e}") from e
print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")

def to_model(x01_batch):
    """Turn a NumPy image or batch of images into a float32 tensor on ``device``.

    Accepted layouts and the tensor shape produced for each:

    * ``(28, 28)``      -> ``(1, 1, 28, 28)``
    * ``(N, 28, 28)``   -> ``(N, 1, 28, 28)``
    * ``(N, 1, H, W)``  -> unchanged shape
    * ``(784,)``        -> ``(1, 1, 28, 28)``
    * ``(N, 784)``      -> ``(N, 1, 28, 28)``

    Raises:
        ValueError: If the input matches none of the layouts above.
    """
    rank, shape = x01_batch.ndim, x01_batch.shape

    # The (28, 28) test must come before the (N, 784) one: both are 2-D.
    if rank == 2 and shape == (28, 28):
        nchw = x01_batch[None, None, :, :]
    elif rank == 3 and shape[1:] == (28, 28):
        nchw = x01_batch[:, None, :, :]
    elif rank == 4 and shape[1] == 1:
        nchw = x01_batch
    elif rank == 1 and x01_batch.size == 784:
        nchw = x01_batch.reshape(1, 1, 28, 28)
    elif rank == 2 and shape[1] == 784:
        nchw = x01_batch.reshape(shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    return torch.tensor(nchw, dtype=torch.float32, device=device)


def model_query(x_tensor_batch):
    """Return the module-level ``model``'s output for a batch, without tracking gradients."""
    with torch.no_grad():
        outputs = model(x_tensor_batch)
    return outputs

# ─────────── Load Random Forest Model ──────────────────────────────────────
# The forest's feature importances are used to rank pixels; the attack is
# then restricted to the TOP_K_FEATURES highest-ranked ones.
rf_all_classes = None
top_feature_indices = None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Sorting the negated importances orders pixels from most to least important.
    top_feature_indices = np.argsort(-feature_importances)[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: keep running, but on every pixel of a 28x28 image.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)
    # Say so explicitly: the results no longer correspond to the configured K.
    warnings.warn(
        f"Falling back to all {top_feature_indices.size} pixels; results will "
        f"not reflect TOP_K_FEATURES={TOP_K_FEATURES}.",
        RuntimeWarning,
    )

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted iterative FGSM attack limited to a subset of pixels.

    Every iteration moves the selected pixels by ``epsilon_step`` along the
    sign of the cross-entropy gradient with respect to the input, clips the
    accumulated perturbation to ``±total_epsilon_budget`` and the image to
    ``[0, 1]``, quantises the image to 8 bits and asks the model for its
    prediction on that quantised image. The loop stops as soon as the
    prediction differs from ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: NumPy image, either 2-D or a flat 784-element
            vector (reshaped to 28x28). Values are treated as lying in [0, 1].
        true_label: Integer class the prediction should move away from.
        model_target: Model called on ``(1, 1, H, W)`` tensors; its output is
            fed to ``F.cross_entropy`` and ``argmax(dim=1)``.
        epsilon_step: Step size applied per iteration.
        iterations: Maximum number of gradient steps.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat pixel indices that are allowed to change.
        query_count_tracker: One-element list whose first entry is incremented
            once per forward pass made by this function.

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the
        quantised adversarial image, the model's prediction on it, the L2
        distance to the original on the 0-255 scale, and whether the
        prediction changed while ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()

    # Loop invariants: pixel coordinates that may change and the loss target.
    height, width = perturbed_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (height, width))
    target = torch.tensor([true_label], device=device)

    def _quantise(image01):
        # Round to 8-bit and return both the uint8 image and its [0, 1] float form.
        as_uint8 = np.round(image01 * 255).astype(np.uint8)
        return as_uint8, as_uint8.astype(np.float32) / 255.0

    def _predict(image01):
        # Forward pass without gradients; every call is counted as one query.
        with torch.no_grad():
            label = model_target(to_model(image01)).argmax(dim=1).item()
        query_count_tracker[0] += 1
        return label

    adv_uint8 = None
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, target)
        model_target.zero_grad()
        loss.backward()
        grad_sign = image_tensor.grad.detach().sign().cpu().numpy()

        # Keep the gradient sign only on the allowed pixels; all others stay 0.
        step = np.zeros((height, width), dtype=np.float32)
        step[rows, cols] = grad_sign[0, 0, rows, cols]

        perturbed_image_np += epsilon_step * step
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the quantised image, since that is what gets saved.
        adv_uint8, adv_float = _quantise(perturbed_image_np)
        final_pred_label = _predict(adv_float)
        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # No iteration ran (iterations <= 0): classify the unmodified image.
        # In every other case the loop has already classified this exact
        # quantised image, so no extra, uncounted forward pass is needed.
        adv_uint8, adv_float = _quantise(perturbed_image_np)
        final_pred_label = _predict(adv_float)

    # Distance on the 0-255 scale between the quantised result and the original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = bool((final_pred_label != true_label) and (l2_norm <= MAX_L2))

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# For each digit class, attack up to SAMPLES_PER_DIGIT samples that the model
# classifies correctly, saving every successful adversarial image as a PNG.
print("\n===== Starting Adversarial Attack =====")
total_trials, succ_total, misclassified = 0, 0, 0
records = []
# sample index -> path of the PNG written for it. Used by the display step so
# it never has to rebuild a filename from DataFrame values.
adv_image_paths = {}


start_time = time.time()


for digit in range(10):
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, 1):
        idx = int(idx)
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        # One-element list so ifgsm_attack can increment the count in place.
        query_count_tracker = [0]
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        # Samples the model already gets wrong are recorded but not attacked.
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            })
            continue

        total_trials += 1
        # Use the real number of samples found for this class as denominator.
        print(f"Attacking sample {rank}/{len(idxs)} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker
        )

        if success:
            succ_total += 1
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            adv_path = os.path.join(OUT_DIR, fname)
            Image.fromarray(adv_img_uint8, mode="L").save(adv_path)
            adv_image_paths[idx] = adv_path

        records.append({
            'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
            'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
            'l2_mag': l2_mag if success else np.nan, 'note': 'I-FGSM' + (' successful' if success else ' failed')
        })

        print(f"  {'Success' if success else 'Failed'}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# With no records at all the frame has no 'success' column; fall back to the
# (empty) frame itself instead of raising KeyError.
success_df = df[df['success']] if not df.empty else df

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        print(f"Mean L2 (successful): {success_df['l2_mag'].mean():.2f}")
        print(f"Mean queries (successful): {success_df['queries'].mean():.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # Cast back to int: pandas can store adv_label as float once the column
        # also holds None (already-misclassified rows), which would render as
        # "8.0" in titles and never match a saved file name.
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        adv_img_path = adv_image_paths.get(sample_idx)
        if adv_img_path and os.path.exists(adv_img_path):
            # Context manager so the image file handle is released promptly.
            with Image.open(adv_img_path) as adv_file:
                adv_pixels = np.array(adv_file)
            plot_adversarial_example(X_samples[sample_idx], adv_pixels,
                                     true_label, adv_label, r['l2_mag'],
                                     sample_idx, int(r['queries']), "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        sample_idx = int(r['sample_idx'])
        plot_adversarial_example(X_samples[sample_idx], X_samples[sample_idx],
                                 int(r['true_label']), int(r['initial_pred']), 0.0,
                                 sample_idx, int(r['queries']), "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 110 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 953.78, Queries: 147
Attacking sample 2/100 (True: 0, Index: 40)...
  Failed! Adv label: 0, L2: 925.08, Queries: 201
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 718.24, Queries: 79
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 773.45, Queries: 79
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 1082.66, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 724.32, Queries: 81
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 763.56, Queries: 89
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 870.57, Queries: 135
Attacking sample 9/10

### Untargeted Attack on test set; k=120

In [39]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# ─────────── Reproducibility ───────────────────────────────────────────────
# Seed the NumPy and PyTorch (CPU and, when present, CUDA) random generators.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Declared before the paths because the output directory name embeds K.
EPSILON_STEP = 0.01       # step applied along the gradient sign per iteration
ITERATIONS = 100          # maximum number of I-FGSM steps per sample
TOTAL_EPSILON = 0.5       # per-pixel bound on the accumulated perturbation
TOP_K_FEATURES = 120      # number of highest-importance pixels that may change
SAMPLES_PER_DIGIT = 100   # at most this many samples are taken per class
MAX_L2 = 10000            # largest L2 distance still counted as a success

# ─────────────── PATHS ────────────────────────────────────────────────────
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
# Single source of truth for K: the directory name follows TOP_K_FEATURES so
# the two can never disagree when this cell is copied for another K.
OUT_DIR = f"adversarial_8bit_K{TOP_K_FEATURES}_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail early: without the forest there is no feature ranking to select from.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show an original image and its adversarial version side by side.

    Any image holding exactly 784 values is reshaped to 28x28 first; other
    shapes are passed to ``imshow`` unchanged. The left panel is captioned
    with ``true_label`` and the right panel with ``adv_label``; the figure
    title reports ``sample_idx``, ``l2_mag`` (two decimals), ``queries`` and
    ``title_suffix``. The figure is shown and then closed.
    """
    panels = []
    for image, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        as_grid = image.reshape(28, 28) if image.size == 784 else image
        panels.append((as_grid, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for axis, (image, caption) in zip(axes, panels):
        axis.imshow(image, cmap='gray')
        axis.set_title(caption)
        axis.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Close explicitly so figures do not pile up across repeated calls.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# NOTE: joblib.load unpickles arbitrary objects; only point it at trusted files.
try:
    data = joblib.load(DATA_PKL)
except FileNotFoundError as err:
    # Raise instead of print + exit(): exit() is an interactive helper and is
    # not a reliable way to stop a script or notebook cell, so later code could
    # otherwise keep running without X_samples / y_samples being defined.
    raise FileNotFoundError(
        f"Error: Data file not found at {DATA_PKL}. Please check the path."
    ) from err

# The pickle is unpacked as a 4-tuple; only its 2nd and 4th entries are used
# (presumably the test features and labels of a train/test split — confirm).
_, X_samples, _, y_samples = data

# Values above 1.0 are taken to mean a 0-255 pixel scale and are rescaled.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    model.eval()
except Exception as e:
    # Same reasoning as above: abort with a chained exception rather than exit().
    raise RuntimeError(f"Error loading TorchScript LeNet model: {e}") from e
print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")

def to_model(x01_batch):
    """Turn a NumPy image or batch of images into a float32 tensor on ``device``.

    Accepted layouts and the tensor shape produced for each:

    * ``(28, 28)``      -> ``(1, 1, 28, 28)``
    * ``(N, 28, 28)``   -> ``(N, 1, 28, 28)``
    * ``(N, 1, H, W)``  -> unchanged shape
    * ``(784,)``        -> ``(1, 1, 28, 28)``
    * ``(N, 784)``      -> ``(N, 1, 28, 28)``

    Raises:
        ValueError: If the input matches none of the layouts above.
    """
    rank, shape = x01_batch.ndim, x01_batch.shape

    # The (28, 28) test must come before the (N, 784) one: both are 2-D.
    if rank == 2 and shape == (28, 28):
        nchw = x01_batch[None, None, :, :]
    elif rank == 3 and shape[1:] == (28, 28):
        nchw = x01_batch[:, None, :, :]
    elif rank == 4 and shape[1] == 1:
        nchw = x01_batch
    elif rank == 1 and x01_batch.size == 784:
        nchw = x01_batch.reshape(1, 1, 28, 28)
    elif rank == 2 and shape[1] == 784:
        nchw = x01_batch.reshape(shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    return torch.tensor(nchw, dtype=torch.float32, device=device)


def model_query(x_tensor_batch):
    """Return the module-level ``model``'s output for a batch, without tracking gradients."""
    with torch.no_grad():
        outputs = model(x_tensor_batch)
    return outputs

# ─────────── Load Random Forest Model ──────────────────────────────────────
# The forest's feature importances are used to rank pixels; the attack is
# then restricted to the TOP_K_FEATURES highest-ranked ones.
rf_all_classes = None
top_feature_indices = None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Sorting the negated importances orders pixels from most to least important.
    top_feature_indices = np.argsort(-feature_importances)[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: keep running, but on every pixel of a 28x28 image.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)
    # Say so explicitly: the results no longer correspond to the configured K.
    warnings.warn(
        f"Falling back to all {top_feature_indices.size} pixels; results will "
        f"not reflect TOP_K_FEATURES={TOP_K_FEATURES}.",
        RuntimeWarning,
    )

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted, feature-masked iterative FGSM attack on a single image.

    Each iteration takes a signed-gradient step of size ``epsilon_step`` that
    increases the cross-entropy loss for ``true_label``, restricted to the pixels
    listed in ``features_to_perturb``. The accumulated perturbation is clipped to
    ``[-total_epsilon_budget, total_epsilon_budget]`` and the image to ``[0, 1]``.
    After every step the image is quantised to 8 bits and re-classified; the loop
    stops as soon as the quantised image is no longer classified as ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Image as a ``(28, 28)`` array or a flat array of 784
            values. It is not modified.
        true_label: Class index the attack tries to move the prediction away from.
        model_target: Model called on the tensors produced by ``to_model``.
        epsilon_step: Step size applied per iteration.
        iterations: Maximum number of gradient steps.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat (row-major) pixel indices that may be modified.
        query_count_tracker: One-element list; element 0 is incremented once per
            forward pass made inside the loop (two per iteration).

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the quantised
        adversarial image, the model's prediction on it, the L2 distance to the
        original in 0-255 pixel units, and whether the prediction changed while
        the distance stayed within ``MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()

    # Loop invariants: coordinates of the attackable pixels and the loss target.
    h, w = perturbed_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (h, w))
    label_tensor = torch.tensor([true_label], device=device)

    adv_uint8 = None
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, label_tensor)
        # Only the gradient w.r.t. the input is needed, so request exactly that
        # instead of back-propagating into (and then zeroing) the model weights.
        (input_grad,) = torch.autograd.grad(loss, image_tensor)
        grad_sign = torch.sign(input_grad).cpu().numpy()

        # Keep the gradient sign only on the selected pixels; all others stay 0.
        masked_step = np.zeros((h, w), dtype=np.float32)
        masked_step[rows, cols] = grad_sign[0, 0, rows, cols]

        perturbed_image_np += epsilon_step * masked_step
        # Project back into the per-pixel budget, then into the valid pixel range.
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the 8-bit image, since that is what gets saved.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # No iteration ran (iterations <= 0): classify the quantised, unperturbed
        # image once. As before, this evaluation is not added to the query count.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()

    # Distance is measured in 0-255 pixel units against the scaled original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = bool(final_pred_label != true_label and l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# Attack up to SAMPLES_PER_DIGIT samples of every digit class, record one stats
# row per examined sample, save the rows to CSV, print a summary and show a few
# examples.
print("\n===== Starting Adversarial Attack =====")
total_trials, succ_total, misclassified = 0, 0, 0
records = []  # one dict per examined sample


start_time = time.time()


for digit in range(10):
    # First SAMPLES_PER_DIGIT sample indices whose label equals this digit.
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, 1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        # One-element list so ifgsm_attack can increment the count in place.
        query_count_tracker = [0]
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        # Samples the model already gets wrong are recorded but not attacked and
        # do not count as trials.
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            })
            continue

        total_trials += 1
        print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker
        )

        # Only successful adversarial images are written to disk.
        if success:
            succ_total += 1
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
            'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
            'l2_mag': l2_mag if success else np.nan, 'note': 'I-FGSM' + (' successful' if success else ' failed')
        })

        print(f"  {'Success' if success else 'Failed'}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# An empty ``records`` list yields a frame without a 'success' column; guard the
# lookup so the summary/display code below still runs in that case.
success_df = df[df['success']] if 'success' in df.columns else df

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        print(f"Mean L2 (successful): {success_df['l2_mag'].mean():.2f}")
        print(f"Mean queries (successful): {success_df['queries'].mean():.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # 'adv_label' holds None for already-misclassified rows, which can make
        # pandas store the column as float. Cast back to int so the rebuilt name
        # matches the file written during the attack ("adv6", not "adv6.0").
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        # Rebuild the exact file name; prefix matching could pick up another
        # sample (e.g. "sample1" vs "sample10") or a stale file from an old run.
        fname = f"true{true_label}_adv{adv_label}_mag{r['l2_mag']:.1f}_sample{sample_idx}.png"
        adv_img_path = os.path.join(OUT_DIR, fname)
        if os.path.exists(adv_img_path):
            # Read the pixels inside a context manager so the file handle is closed.
            with Image.open(adv_img_path) as adv_file:
                adv_img = np.array(adv_file)
            plot_adversarial_example(X_samples[sample_idx], adv_img,
                                     true_label, adv_label, r['l2_mag'],
                                     sample_idx, r['queries'], "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        plot_adversarial_example(X_samples[r['sample_idx']], X_samples[r['sample_idx']],
                                 r['true_label'], r['initial_pred'], 0.0,
                                 r['sample_idx'], r['queries'], "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 120 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 934.96, Queries: 121
Attacking sample 2/100 (True: 0, Index: 40)...
  Failed! Adv label: 0, L2: 986.06, Queries: 201
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 694.08, Queries: 73
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 768.18, Queries: 77
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 1020.79, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 695.03, Queries: 75
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 770.81, Queries: 87
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 881.30, Queries: 111
Attacking sample 9/10

### Untargeted Attack on test set; k=130

In [41]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Set seeds for reproducibility
# Seeds the global NumPy and PyTorch RNGs (and the CUDA RNGs when a GPU is present).
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
# joblib file that is unpacked below as a 4-tuple; only its 2nd and 4th items are used.
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
# joblib file holding the fitted random forest whose feature importances rank the pixels.
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
# Successful adversarial PNGs and per_sample_stats.csv are written to this directory.
OUT_DIR = "adversarial_8bit_K130_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the forest file is missing.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
EPSILON_STEP = 0.01  # I-FGSM step size per iteration
ITERATIONS = 100  # maximum number of I-FGSM iterations per sample
TOTAL_EPSILON = 0.5  # per-pixel bound on the accumulated perturbation
TOP_K_FEATURES = 130  # number of highest-importance pixels the attack may modify
SAMPLES_PER_DIGIT = 100  # at most this many samples are taken per digit class
MAX_L2 = 10000  # L2 ceiling (0-255 pixel units) for an attack to count as a success

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the original and the adversarial image side by side in one figure.

    An image holding exactly 784 values is reshaped to 28x28 before display.
    The left panel is titled with ``true_label``, the right panel with
    ``adv_label``; the figure title reports ``sample_idx``, ``l2_mag``,
    ``queries`` and the optional ``title_suffix``. The figure is closed after
    it has been shown.
    """
    panels = []
    for image, caption in ((original_img, f"Original\nTrue: {true_label}"),
                           (adv_img, f"Adversarial\nPred: {adv_label}")):
        if image.size == 784:
            image = image.reshape(28, 28)
        panels.append((image, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (image, caption) in zip(axes, panels):
        ax.imshow(image, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
try:
    # NOTE: joblib.load unpickles the file, so only load trusted artifacts.
    data = joblib.load(DATA_PKL)
    # The file must hold exactly four items; only the 2nd (samples) and 4th
    # (labels) are used. Presumably these are the test split - confirm against
    # the code that wrote the file.
    _, X_samples , _, y_samples = data
except FileNotFoundError:
    print(f"Error: Data file not found at {DATA_PKL}. Please check the path.")
    exit()

# Heuristic: a maximum above 1.0 is taken to mean raw 0-255 pixels, which are
# rescaled to [0, 1] with a warning.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    # Switch the module to evaluation mode before any query.
    model.eval()
    print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")
except Exception as e:
    # Any load failure is reported and stops the run.
    print(f"Error loading TorchScript LeNet model: {e}")
    exit()

def to_model(x01_batch):
    """Convert an image array (single image or batch) into a float32 NCHW tensor on ``device``.

    Supported input layouts and the tensor shape each one produces:

    * ``(784,)``       -> ``(1, 1, 28, 28)``
    * ``(28, 28)``     -> ``(1, 1, 28, 28)``
    * ``(N, 784)``     -> ``(N, 1, 28, 28)``
    * ``(N, 28, 28)``  -> ``(N, 1, 28, 28)``
    * ``(N, 1, H, W)`` -> unchanged

    Raises:
        ValueError: if the input matches none of the layouts above.
    """
    rank = x01_batch.ndim
    shape = x01_batch.shape

    # Decide the target shape first so unsupported inputs are rejected before
    # any tensor is allocated.
    if rank == 1 and x01_batch.size == 784:
        nchw_shape = (1, 1, 28, 28)
    elif rank == 2 and shape == (28, 28):
        nchw_shape = (1, 1, 28, 28)
    elif rank == 2 and shape[1] == 784:
        nchw_shape = (shape[0], 1, 28, 28)
    elif rank == 3 and shape[1:] == (28, 28):
        nchw_shape = (shape[0], 1, 28, 28)
    elif rank == 4 and shape[1] == 1:
        nchw_shape = tuple(shape)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    as_tensor = torch.tensor(x01_batch, dtype=torch.float32, device=device)
    return as_tensor.reshape(nchw_shape)


def model_query(x_tensor_batch):
    """Run the module-level ``model`` on ``x_tensor_batch`` with autograd disabled.

    Returns whatever the model returns for the batch.
    """
    with torch.no_grad():
        model_output = model(x_tensor_batch)
    return model_output

# ─────────── Load Random Forest Model ──────────────────────────────────────
# Load the fitted random forest and derive the pixel indices the attack may touch.
# rf_all_classes stays None if loading fails.
rf_all_classes = None
top_feature_indices = None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    # One importance score per input feature; the resulting indices are later
    # unravelled onto the 28x28 pixel grid by the attack.
    feature_importances = rf_all_classes.feature_importances_
    # Negating before argsort ranks features from most to least important, so the
    # slice keeps the TOP_K_FEATURES highest-importance ones. (A plain ascending
    # argsort would select the least important features instead.)
    
    top_feature_indices = np.argsort(-feature_importances)[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: on any error the attack is allowed to perturb all
    # 784 pixels instead of a top-K subset.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted, feature-masked iterative FGSM attack on a single image.

    Each iteration takes a signed-gradient step of size ``epsilon_step`` that
    increases the cross-entropy loss for ``true_label``, restricted to the pixels
    listed in ``features_to_perturb``. The accumulated perturbation is clipped to
    ``[-total_epsilon_budget, total_epsilon_budget]`` and the image to ``[0, 1]``.
    After every step the image is quantised to 8 bits and re-classified; the loop
    stops as soon as the quantised image is no longer classified as ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Image as a ``(28, 28)`` array or a flat array of 784
            values. It is not modified.
        true_label: Class index the attack tries to move the prediction away from.
        model_target: Model called on the tensors produced by ``to_model``.
        epsilon_step: Step size applied per iteration.
        iterations: Maximum number of gradient steps.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat (row-major) pixel indices that may be modified.
        query_count_tracker: One-element list; element 0 is incremented once per
            forward pass made inside the loop (two per iteration).

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the quantised
        adversarial image, the model's prediction on it, the L2 distance to the
        original in 0-255 pixel units, and whether the prediction changed while
        the distance stayed within ``MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()

    # Loop invariants: coordinates of the attackable pixels and the loss target.
    h, w = perturbed_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (h, w))
    label_tensor = torch.tensor([true_label], device=device)

    adv_uint8 = None
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, label_tensor)
        # Only the gradient w.r.t. the input is needed, so request exactly that
        # instead of back-propagating into (and then zeroing) the model weights.
        (input_grad,) = torch.autograd.grad(loss, image_tensor)
        grad_sign = torch.sign(input_grad).cpu().numpy()

        # Keep the gradient sign only on the selected pixels; all others stay 0.
        masked_step = np.zeros((h, w), dtype=np.float32)
        masked_step[rows, cols] = grad_sign[0, 0, rows, cols]

        perturbed_image_np += epsilon_step * masked_step
        # Project back into the per-pixel budget, then into the valid pixel range.
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the 8-bit image, since that is what gets saved.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # No iteration ran (iterations <= 0): classify the quantised, unperturbed
        # image once. As before, this evaluation is not added to the query count.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()

    # Distance is measured in 0-255 pixel units against the scaled original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = bool(final_pred_label != true_label and l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# Attack up to SAMPLES_PER_DIGIT samples of every digit class, record one stats
# row per examined sample, save the rows to CSV, print a summary and show a few
# examples.
print("\n===== Starting Adversarial Attack =====")
total_trials, succ_total, misclassified = 0, 0, 0
records = []  # one dict per examined sample


start_time = time.time()


for digit in range(10):
    # First SAMPLES_PER_DIGIT sample indices whose label equals this digit.
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, 1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        # One-element list so ifgsm_attack can increment the count in place.
        query_count_tracker = [0]
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        # Samples the model already gets wrong are recorded but not attacked and
        # do not count as trials.
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            })
            continue

        total_trials += 1
        print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker
        )

        # Only successful adversarial images are written to disk.
        if success:
            succ_total += 1
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
            'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
            'l2_mag': l2_mag if success else np.nan, 'note': 'I-FGSM' + (' successful' if success else ' failed')
        })

        print(f"  {'Success' if success else 'Failed'}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# An empty ``records`` list yields a frame without a 'success' column; guard the
# lookup so the summary/display code below still runs in that case.
success_df = df[df['success']] if 'success' in df.columns else df

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        print(f"Mean L2 (successful): {success_df['l2_mag'].mean():.2f}")
        print(f"Mean queries (successful): {success_df['queries'].mean():.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # 'adv_label' holds None for already-misclassified rows, which can make
        # pandas store the column as float. Cast back to int so the rebuilt name
        # matches the file written during the attack ("adv6", not "adv6.0").
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        # Rebuild the exact file name; prefix matching could pick up another
        # sample (e.g. "sample1" vs "sample10") or a stale file from an old run.
        fname = f"true{true_label}_adv{adv_label}_mag{r['l2_mag']:.1f}_sample{sample_idx}.png"
        adv_img_path = os.path.join(OUT_DIR, fname)
        if os.path.exists(adv_img_path):
            # Read the pixels inside a context manager so the file handle is closed.
            with Image.open(adv_img_path) as adv_file:
                adv_img = np.array(adv_file)
            plot_adversarial_example(X_samples[sample_idx], adv_img,
                                     true_label, adv_label, r['l2_mag'],
                                     sample_idx, r['queries'], "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        plot_adversarial_example(X_samples[r['sample_idx']], X_samples[r['sample_idx']],
                                 r['true_label'], r['initial_pred'], 0.0,
                                 r['sample_idx'], r['queries'], "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 130 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 975.79, Queries: 105
Attacking sample 2/100 (True: 0, Index: 40)...
  Failed! Adv label: 0, L2: 1038.85, Queries: 201
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 674.66, Queries: 67
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 751.12, Queries: 73
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 1070.13, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 703.78, Queries: 73
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 746.63, Queries: 81
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 892.13, Queries: 101
Attacking sample 9/1

### Untargeted Attack on test set; k=140

In [43]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Set seeds for reproducibility
# Seeds the global NumPy and PyTorch RNGs (and the CUDA RNGs when a GPU is present).
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
# joblib file that is unpacked below as a 4-tuple; only its 2nd and 4th items are used.
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
# joblib file holding the fitted random forest whose feature importances rank the pixels.
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
# Successful adversarial PNGs and per_sample_stats.csv are written to this directory.
OUT_DIR = "adversarial_8bit_K140_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the forest file is missing.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
EPSILON_STEP = 0.01  # I-FGSM step size per iteration
ITERATIONS = 100  # maximum number of I-FGSM iterations per sample
TOTAL_EPSILON = 0.5  # per-pixel bound on the accumulated perturbation
TOP_K_FEATURES = 140  # number of highest-importance pixels the attack may modify
SAMPLES_PER_DIGIT = 100  # at most this many samples are taken per digit class
MAX_L2 = 10000  # L2 ceiling (0-255 pixel units) for an attack to count as a success

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the original and the adversarial image side by side in one figure.

    An image holding exactly 784 values is reshaped to 28x28 before display.
    The left panel is titled with ``true_label``, the right panel with
    ``adv_label``; the figure title reports ``sample_idx``, ``l2_mag``,
    ``queries`` and the optional ``title_suffix``. The figure is closed after
    it has been shown.
    """
    panels = []
    for image, caption in ((original_img, f"Original\nTrue: {true_label}"),
                           (adv_img, f"Adversarial\nPred: {adv_label}")):
        if image.size == 784:
            image = image.reshape(28, 28)
        panels.append((image, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (image, caption) in zip(axes, panels):
        ax.imshow(image, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
try:
    # NOTE: joblib.load unpickles the file, so only load trusted artifacts.
    data = joblib.load(DATA_PKL)
    # The file must hold exactly four items; only the 2nd (samples) and 4th
    # (labels) are used. Presumably these are the test split - confirm against
    # the code that wrote the file.
    _, X_samples , _, y_samples = data
except FileNotFoundError:
    print(f"Error: Data file not found at {DATA_PKL}. Please check the path.")
    exit()

# Heuristic: a maximum above 1.0 is taken to mean raw 0-255 pixels, which are
# rescaled to [0, 1] with a warning.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    # Switch the module to evaluation mode before any query.
    model.eval()
    print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")
except Exception as e:
    # Any load failure is reported and stops the run.
    print(f"Error loading TorchScript LeNet model: {e}")
    exit()

def to_model(x01_batch):
    """Convert an image array (single image or batch) into a float32 NCHW tensor on ``device``.

    Supported input layouts and the tensor shape each one produces:

    * ``(784,)``       -> ``(1, 1, 28, 28)``
    * ``(28, 28)``     -> ``(1, 1, 28, 28)``
    * ``(N, 784)``     -> ``(N, 1, 28, 28)``
    * ``(N, 28, 28)``  -> ``(N, 1, 28, 28)``
    * ``(N, 1, H, W)`` -> unchanged

    Raises:
        ValueError: if the input matches none of the layouts above.
    """
    rank = x01_batch.ndim
    shape = x01_batch.shape

    # Decide the target shape first so unsupported inputs are rejected before
    # any tensor is allocated.
    if rank == 1 and x01_batch.size == 784:
        nchw_shape = (1, 1, 28, 28)
    elif rank == 2 and shape == (28, 28):
        nchw_shape = (1, 1, 28, 28)
    elif rank == 2 and shape[1] == 784:
        nchw_shape = (shape[0], 1, 28, 28)
    elif rank == 3 and shape[1:] == (28, 28):
        nchw_shape = (shape[0], 1, 28, 28)
    elif rank == 4 and shape[1] == 1:
        nchw_shape = tuple(shape)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    as_tensor = torch.tensor(x01_batch, dtype=torch.float32, device=device)
    return as_tensor.reshape(nchw_shape)


def model_query(x_tensor_batch):
    """Run the module-level ``model`` on ``x_tensor_batch`` with autograd disabled.

    Returns whatever the model returns for the batch.
    """
    with torch.no_grad():
        model_output = model(x_tensor_batch)
    return model_output

# ─────────── Load Random Forest Model ──────────────────────────────────────
# Load the fitted random forest and derive the pixel indices the attack may touch.
# rf_all_classes stays None if loading fails.
rf_all_classes = None
top_feature_indices = None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    # One importance score per input feature; the resulting indices are later
    # unravelled onto the 28x28 pixel grid by the attack.
    feature_importances = rf_all_classes.feature_importances_
    # Negating before argsort ranks features from most to least important, so the
    # slice keeps the TOP_K_FEATURES highest-importance ones. (A plain ascending
    # argsort would select the least important features instead.)
    
    top_feature_indices = np.argsort(-feature_importances)[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: on any error the attack is allowed to perturb all
    # 784 pixels instead of a top-K subset.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted, feature-masked iterative FGSM attack on a single image.

    Each iteration takes a signed-gradient step of size ``epsilon_step`` that
    increases the cross-entropy loss for ``true_label``, restricted to the pixels
    listed in ``features_to_perturb``. The accumulated perturbation is clipped to
    ``[-total_epsilon_budget, total_epsilon_budget]`` and the image to ``[0, 1]``.
    After every step the image is quantised to 8 bits and re-classified; the loop
    stops as soon as the quantised image is no longer classified as ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Image as a ``(28, 28)`` array or a flat array of 784
            values. It is not modified.
        true_label: Class index the attack tries to move the prediction away from.
        model_target: Model called on the tensors produced by ``to_model``.
        epsilon_step: Step size applied per iteration.
        iterations: Maximum number of gradient steps.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat (row-major) pixel indices that may be modified.
        query_count_tracker: One-element list; element 0 is incremented once per
            forward pass made inside the loop (two per iteration).

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the quantised
        adversarial image, the model's prediction on it, the L2 distance to the
        original in 0-255 pixel units, and whether the prediction changed while
        the distance stayed within ``MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()

    # Loop invariants: coordinates of the attackable pixels and the loss target.
    h, w = perturbed_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (h, w))
    label_tensor = torch.tensor([true_label], device=device)

    adv_uint8 = None
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, label_tensor)
        # Only the gradient w.r.t. the input is needed, so request exactly that
        # instead of back-propagating into (and then zeroing) the model weights.
        (input_grad,) = torch.autograd.grad(loss, image_tensor)
        grad_sign = torch.sign(input_grad).cpu().numpy()

        # Keep the gradient sign only on the selected pixels; all others stay 0.
        masked_step = np.zeros((h, w), dtype=np.float32)
        masked_step[rows, cols] = grad_sign[0, 0, rows, cols]

        perturbed_image_np += epsilon_step * masked_step
        # Project back into the per-pixel budget, then into the valid pixel range.
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the 8-bit image, since that is what gets saved.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # No iteration ran (iterations <= 0): classify the quantised, unperturbed
        # image once. As before, this evaluation is not added to the query count.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()

    # Distance is measured in 0-255 pixel units against the scaled original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = bool(final_pred_label != true_label and l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# Attack up to SAMPLES_PER_DIGIT samples of every digit class, record one stats
# row per examined sample, save the rows to CSV, print a summary and show a few
# examples.
print("\n===== Starting Adversarial Attack =====")
total_trials, succ_total, misclassified = 0, 0, 0
records = []  # one dict per examined sample


start_time = time.time()


for digit in range(10):
    # First SAMPLES_PER_DIGIT sample indices whose label equals this digit.
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, 1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        # One-element list so ifgsm_attack can increment the count in place.
        query_count_tracker = [0]
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        # Samples the model already gets wrong are recorded but not attacked and
        # do not count as trials.
        if pred0 != y0:
            misclassified += 1
            records.append({
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            })
            continue

        total_trials += 1
        print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker
        )

        # Only successful adversarial images are written to disk.
        if success:
            succ_total += 1
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

        records.append({
            'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
            'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
            'l2_mag': l2_mag if success else np.nan, 'note': 'I-FGSM' + (' successful' if success else ' failed')
        })

        print(f"  {'Success' if success else 'Failed'}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# An empty ``records`` list yields a frame without a 'success' column; guard the
# lookup so the summary/display code below still runs in that case.
success_df = df[df['success']] if 'success' in df.columns else df

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        print(f"Mean L2 (successful): {success_df['l2_mag'].mean():.2f}")
        print(f"Mean queries (successful): {success_df['queries'].mean():.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # 'adv_label' holds None for already-misclassified rows, which can make
        # pandas store the column as float. Cast back to int so the rebuilt name
        # matches the file written during the attack ("adv6", not "adv6.0").
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        # Rebuild the exact file name; prefix matching could pick up another
        # sample (e.g. "sample1" vs "sample10") or a stale file from an old run.
        fname = f"true{true_label}_adv{adv_label}_mag{r['l2_mag']:.1f}_sample{sample_idx}.png"
        adv_img_path = os.path.join(OUT_DIR, fname)
        if os.path.exists(adv_img_path):
            # Read the pixels inside a context manager so the file handle is closed.
            with Image.open(adv_img_path) as adv_file:
                adv_img = np.array(adv_file)
            plot_adversarial_example(X_samples[sample_idx], adv_img,
                                     true_label, adv_label, r['l2_mag'],
                                     sample_idx, r['queries'], "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        plot_adversarial_example(X_samples[r['sample_idx']], X_samples[r['sample_idx']],
                                 r['true_label'], r['initial_pred'], 0.0,
                                 r['sample_idx'], r['queries'], "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 140 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 951.61, Queries: 97
Attacking sample 2/100 (True: 0, Index: 40)...
  Failed! Adv label: 0, L2: 1086.48, Queries: 201
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 655.82, Queries: 63
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 720.43, Queries: 69
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 1122.51, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 706.43, Queries: 71
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 697.32, Queries: 71
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 874.68, Queries: 93
Attacking sample 9/100

### Untargeted Attack on test set; k=150

In [45]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Seed every RNG in play (PyTorch CPU, NumPy, and all CUDA devices when present)
# so that repeated runs of this cell are reproducible.
torch.manual_seed(42)
np.random.seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
# Inputs: TorchScript LeNet, pickled data splits, pickled Random Forest.
# Output: adversarial PNGs and the per-sample stats CSV are written to OUT_DIR.
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
OUT_DIR = "adversarial_8bit_K150_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the Random Forest pickle is absent.
rf_model_present = os.path.exists(RF_ALL_CLASSES_PATH)
if not rf_model_present:
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Step size applied to the masked gradient sign on every I-FGSM iteration.
EPSILON_STEP = 0.01
# Maximum number of I-FGSM iterations per sample (two model queries each).
ITERATIONS = 100
# Per-pixel bound on the accumulated perturbation, on the [0, 1] intensity scale.
TOTAL_EPSILON = 0.5
# Number of highest-importance Random Forest features (pixels) the attack may modify.
TOP_K_FEATURES = 150
# Upper limit on how many samples of each digit class are attacked.
SAMPLES_PER_DIGIT = 100
# Largest L2 distance (computed on the 0-255 scale) at which an attack still counts as a success.
MAX_L2 = 10000

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the clean image and its adversarial counterpart side by side.

    Inputs holding exactly 784 elements are reshaped to 28x28 before plotting.
    The left panel is titled with the true label, the right panel with the
    adversarial prediction, and the figure title reports the sample index,
    L2 magnitude, query count and the optional ``title_suffix``.
    """
    panels = []
    for img, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        panels.append((img.reshape(28, 28) if img.size == 784 else img, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (img, caption) in zip(axes, panels):
        ax.imshow(img, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Close explicitly so repeated calls do not leave figures open.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# NOTE(review): joblib.load unpickles arbitrary objects — only point DATA_PKL at trusted files.
try:
    data = joblib.load(DATA_PKL)
except FileNotFoundError as err:
    # Raise instead of calling exit(): exit() is a helper for the interactive
    # shell, and raising matches the Random Forest path check earlier in this cell.
    raise FileNotFoundError(
        f"Error: Data file not found at {DATA_PKL}. Please check the path."
    ) from err

# Only the 2nd and 4th entries are used — presumably the test split
# (X_test, y_test); TODO confirm against the code that wrote the pickle.
_, X_samples, _, y_samples = data

# Heuristic: any value above 1.0 is taken to mean 0-255 intensities, which are
# rescaled to [0, 1] because the attack below works on that scale.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Prefer the GPU when one is available; every tensor built by to_model uses this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
except Exception as e:
    # Raise instead of calling exit(): exit() is a helper for the interactive
    # shell; a chained exception keeps the original cause visible to the caller.
    raise RuntimeError(f"Error loading TorchScript LeNet model: {e}") from e

# Switch to evaluation mode before the model is queried by the attack.
model.eval()
print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")

def to_model(x01_batch):
    """Convert a NumPy image or batch into a float32 4-D tensor on ``device``.

    Accepted layouts and the tensor shape they map to:
      * (28, 28)        -> (1, 1, 28, 28)
      * (N, 28, 28)     -> (N, 1, 28, 28)
      * (N, 1, H, W)    -> unchanged
      * (784,)          -> (1, 1, 28, 28)
      * (N, 784)        -> (N, 1, 28, 28)

    Raises:
        ValueError: If the input shape matches none of the layouts above.
    """
    dims, shape = x01_batch.ndim, x01_batch.shape

    # Resolve the target shape first; the tensor is then built in one place.
    if dims == 2 and shape == (28, 28):
        target_shape = (1, 1, 28, 28)
    elif dims == 3 and shape[1:] == (28, 28):
        target_shape = (shape[0], 1, 28, 28)
    elif dims == 4 and shape[1] == 1:
        target_shape = shape
    elif dims == 1 and x01_batch.size == 784:
        target_shape = (1, 1, 28, 28)
    elif dims == 2 and shape[1] == 784:
        target_shape = (shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    return torch.tensor(x01_batch.reshape(target_shape), dtype=torch.float32, device=device)


@torch.no_grad()
def model_query(x_tensor_batch):
    """Run the module-level ``model`` on a tensor batch with autograd disabled."""
    return model(x_tensor_batch)

# ─────────── Load Random Forest Model ──────────────────────────────────────
rf_all_classes, top_feature_indices = None, None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Negating the importances makes argsort list the most important pixel first.
    importance_order = np.argsort(-feature_importances)
    top_feature_indices = importance_order[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: without a usable forest, every one of the 784 pixels
    # is made attackable, so the run is no longer restricted to TOP_K_FEATURES.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(784)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted I-FGSM attack restricted to a fixed set of pixels.

    Each iteration takes one signed-gradient ascent step on the cross-entropy
    loss of ``true_label`` (applied only at ``features_to_perturb``), clips the
    accumulated perturbation to ``+/- total_epsilon_budget`` and the image to
    [0, 1], then queries the model on the 8-bit-quantized image and stops as
    soon as that prediction differs from ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Clean image, either 28x28 or a flat 784-element
            array (reshaped to 28x28). Not modified.
        true_label: Integer class label the attack tries to move away from.
        model_target: Model whose output is fed to ``F.cross_entropy``.
        epsilon_step: Step size applied to the masked gradient sign.
        iterations: Maximum number of attack iterations.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat pixel indices (into the 28x28 grid) that may change.
        query_count_tracker: One-element list; element 0 is incremented once
            per forward pass made here (two per iteration).

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the quantized
        adversarial image, the model's prediction on it, the L2 distance to the
        original on the 0-255 scale, and whether the label changed with
        ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()
    h, w = perturbed_image_np.shape

    # Loop-invariant pieces, built once: a 0/1 mask of the attackable pixels
    # and the label tensor used by the loss.
    pixel_mask = np.zeros((h, w), dtype=np.float32)
    pixel_mask[np.unravel_index(features_to_perturb, (h, w))] = 1.0
    label_tensor = torch.tensor([true_label], device=device)

    adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, label_tensor)
        model_target.zero_grad()
        loss.backward()
        grad_sign = torch.sign(image_tensor.grad.detach()).cpu().numpy()[0, 0]

        # Step only on the selected pixels, then project back into the budget and [0, 1].
        perturbed_image_np += epsilon_step * (grad_sign * pixel_mask)
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the quantized image, since that is what gets saved.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # iterations <= 0: the loop never queried the model, so classify the
        # quantized clean image once (and count that query).
        with torch.no_grad():
            clean_float = adv_uint8.astype(np.float32) / 255.0
            final_pred_label = model_target(to_model(clean_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

    # The last in-loop prediction was made on exactly this adv_uint8, so it is
    # reused rather than spending a further, uncounted forward pass.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = (final_pred_label != true_label) and (l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
print("\n===== Starting Adversarial Attack =====")
records = []
total_trials = succ_total = misclassified = 0

start_time = time.time()

for digit in range(10):
    # Positions of this digit's samples, capped at SAMPLES_PER_DIGIT.
    class_positions = np.where(y_samples == digit)[0]
    for rank, idx in enumerate(class_positions[:SAMPLES_PER_DIGIT], start=1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        # The clean-image prediction above is this sample's first query.
        query_count_tracker = [1]

        if pred0 != y0:
            # Already wrong on the clean image: record it, but do not attack.
            misclassified += 1
            record = {
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            }
        else:
            total_trials += 1
            print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
            adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
                x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
                top_feature_indices, query_count_tracker
            )

            if success:
                # Only successful adversarial images are written to disk.
                succ_total += 1
                fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
                Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

            outcome = 'Success' if success else 'Failed'
            record = {
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
                'l2_mag': l2_mag if success else np.nan,
                'note': f"I-FGSM {'successful' if success else 'failed'}"
            }
            print(f"  {outcome}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")

        records.append(record)


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
# One row per examined sample, written next to the adversarial images.
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# ─────────── Summary ───────────────────────────────────────────────────────
# Collect the report lines first, then emit them in one go.
summary_lines = [
    "\n===== Attack Summary =====",
    f"Total trials: {total_trials}",
    f"Misclassified before attack: {misclassified}",
]
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    summary_lines.append(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        successful_rows = df[df['success']]
        summary_lines.append(f"Mean L2 (successful): {successful_rows['l2_mag'].mean():.2f}")
        summary_lines.append(f"Mean queries (successful): {successful_rows['queries'].mean():.2f}")
print("\n".join(summary_lines))

# ─────────── Display Examples ──────────────────────────────────────────────
# An empty results table has no 'success' column, so guard before filtering.
success_df = df[df['success']] if not df.empty else df
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # Cast back to plain ints: 'adv_label' holds None for samples that were
        # already misclassified, which makes pandas store the column as float,
        # and "adv6.0" would never match the saved "adv6" file name.
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        l2_mag = float(r['l2_mag'])
        # Rebuild the exact file name used when the image was saved; a prefix
        # match could also pick up e.g. sample290 when looking for sample29.
        fname = f"true{true_label}_adv{adv_label}_mag{l2_mag:.1f}_sample{sample_idx}.png"
        adv_img_path = os.path.join(OUT_DIR, fname)
        if not os.path.exists(adv_img_path):
            print(f"Adversarial image not found, skipping: {adv_img_path}")
            continue
        # Context manager closes the file handle once the pixels are copied out.
        with Image.open(adv_img_path) as adv_file:
            adv_img = np.array(adv_file)
        plot_adversarial_example(X_samples[sample_idx], adv_img,
                                 true_label, adv_label, l2_mag,
                                 sample_idx, int(r['queries']), "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        sample_idx = int(r['sample_idx'])
        # No adversarial image exists, so the clean sample is shown in both panels.
        plot_adversarial_example(X_samples[sample_idx], X_samples[sample_idx],
                                 int(r['true_label']), int(r['initial_pred']), 0.0,
                                 sample_idx, int(r['queries']), "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 150 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 953.80, Queries: 93
Attacking sample 2/100 (True: 0, Index: 40)...
  Failed! Adv label: 0, L2: 1136.96, Queries: 201
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 635.38, Queries: 59
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 678.69, Queries: 63
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 1218.74, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 714.30, Queries: 69
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 688.46, Queries: 67
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 779.93, Queries: 77
Attacking sample 9/100

### Untargeted Attack on test set; k=160

In [47]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Seed every RNG in play (PyTorch CPU, NumPy, and all CUDA devices when present)
# so that repeated runs of this cell are reproducible.
torch.manual_seed(42)
np.random.seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
# Inputs: TorchScript LeNet, pickled data splits, pickled Random Forest.
# Output: adversarial PNGs and the per-sample stats CSV are written to OUT_DIR.
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
OUT_DIR = "adversarial_8bit_K160_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the Random Forest pickle is absent.
rf_model_present = os.path.exists(RF_ALL_CLASSES_PATH)
if not rf_model_present:
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Step size applied to the masked gradient sign on every I-FGSM iteration.
EPSILON_STEP = 0.01
# Maximum number of I-FGSM iterations per sample (two model queries each).
ITERATIONS = 100
# Per-pixel bound on the accumulated perturbation, on the [0, 1] intensity scale.
TOTAL_EPSILON = 0.5
# Number of highest-importance Random Forest features (pixels) the attack may modify.
TOP_K_FEATURES = 160
# Upper limit on how many samples of each digit class are attacked.
SAMPLES_PER_DIGIT = 100
# Largest L2 distance (computed on the 0-255 scale) at which an attack still counts as a success.
MAX_L2 = 10000

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the clean image and its adversarial counterpart side by side.

    Inputs holding exactly 784 elements are reshaped to 28x28 before plotting.
    The left panel is titled with the true label, the right panel with the
    adversarial prediction, and the figure title reports the sample index,
    L2 magnitude, query count and the optional ``title_suffix``.
    """
    panels = []
    for img, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        panels.append((img.reshape(28, 28) if img.size == 784 else img, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (img, caption) in zip(axes, panels):
        ax.imshow(img, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Close explicitly so repeated calls do not leave figures open.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# NOTE(review): joblib.load unpickles arbitrary objects — only point DATA_PKL at trusted files.
try:
    data = joblib.load(DATA_PKL)
except FileNotFoundError as err:
    # Raise instead of calling exit(): exit() is a helper for the interactive
    # shell, and raising matches the Random Forest path check earlier in this cell.
    raise FileNotFoundError(
        f"Error: Data file not found at {DATA_PKL}. Please check the path."
    ) from err

# Only the 2nd and 4th entries are used — presumably the test split
# (X_test, y_test); TODO confirm against the code that wrote the pickle.
_, X_samples, _, y_samples = data

# Heuristic: any value above 1.0 is taken to mean 0-255 intensities, which are
# rescaled to [0, 1] because the attack below works on that scale.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Prefer the GPU when one is available; every tensor built by to_model uses this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
except Exception as e:
    # Raise instead of calling exit(): exit() is a helper for the interactive
    # shell; a chained exception keeps the original cause visible to the caller.
    raise RuntimeError(f"Error loading TorchScript LeNet model: {e}") from e

# Switch to evaluation mode before the model is queried by the attack.
model.eval()
print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")

def to_model(x01_batch):
    """Convert a NumPy image or batch into a float32 4-D tensor on ``device``.

    Accepted layouts and the tensor shape they map to:
      * (28, 28)        -> (1, 1, 28, 28)
      * (N, 28, 28)     -> (N, 1, 28, 28)
      * (N, 1, H, W)    -> unchanged
      * (784,)          -> (1, 1, 28, 28)
      * (N, 784)        -> (N, 1, 28, 28)

    Raises:
        ValueError: If the input shape matches none of the layouts above.
    """
    dims, shape = x01_batch.ndim, x01_batch.shape

    # Resolve the target shape first; the tensor is then built in one place.
    if dims == 2 and shape == (28, 28):
        target_shape = (1, 1, 28, 28)
    elif dims == 3 and shape[1:] == (28, 28):
        target_shape = (shape[0], 1, 28, 28)
    elif dims == 4 and shape[1] == 1:
        target_shape = shape
    elif dims == 1 and x01_batch.size == 784:
        target_shape = (1, 1, 28, 28)
    elif dims == 2 and shape[1] == 784:
        target_shape = (shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    return torch.tensor(x01_batch.reshape(target_shape), dtype=torch.float32, device=device)


@torch.no_grad()
def model_query(x_tensor_batch):
    """Run the module-level ``model`` on a tensor batch with autograd disabled."""
    return model(x_tensor_batch)

# ─────────── Load Random Forest Model ──────────────────────────────────────
rf_all_classes, top_feature_indices = None, None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Negating the importances makes argsort list the most important pixel first.
    importance_order = np.argsort(-feature_importances)
    top_feature_indices = importance_order[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: without a usable forest, every one of the 784 pixels
    # is made attackable, so the run is no longer restricted to TOP_K_FEATURES.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(784)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted I-FGSM attack restricted to a fixed set of pixels.

    Each iteration takes one signed-gradient ascent step on the cross-entropy
    loss of ``true_label`` (applied only at ``features_to_perturb``), clips the
    accumulated perturbation to ``+/- total_epsilon_budget`` and the image to
    [0, 1], then queries the model on the 8-bit-quantized image and stops as
    soon as that prediction differs from ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Clean image, either 28x28 or a flat 784-element
            array (reshaped to 28x28). Not modified.
        true_label: Integer class label the attack tries to move away from.
        model_target: Model whose output is fed to ``F.cross_entropy``.
        epsilon_step: Step size applied to the masked gradient sign.
        iterations: Maximum number of attack iterations.
        total_epsilon_budget: Per-pixel bound on the accumulated perturbation.
        features_to_perturb: Flat pixel indices (into the 28x28 grid) that may change.
        query_count_tracker: One-element list; element 0 is incremented once
            per forward pass made here (two per iteration).

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the quantized
        adversarial image, the model's prediction on it, the L2 distance to the
        original on the 0-255 scale, and whether the label changed with
        ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    perturbed_image_np = original_image_np.copy()
    h, w = perturbed_image_np.shape

    # Loop-invariant pieces, built once: a 0/1 mask of the attackable pixels
    # and the label tensor used by the loss.
    pixel_mask = np.zeros((h, w), dtype=np.float32)
    pixel_mask[np.unravel_index(features_to_perturb, (h, w))] = 1.0
    label_tensor = torch.tensor([true_label], device=device)

    adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
    final_pred_label = None

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad = True
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, label_tensor)
        model_target.zero_grad()
        loss.backward()
        grad_sign = torch.sign(image_tensor.grad.detach()).cpu().numpy()[0, 0]

        # Step only on the selected pixels, then project back into the budget and [0, 1].
        perturbed_image_np += epsilon_step * (grad_sign * pixel_mask)
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Judge success on the quantized image, since that is what gets saved.
        adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float = adv_uint8.astype(np.float32) / 255.0
        with torch.no_grad():
            final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if final_pred_label != true_label:
            break

    if final_pred_label is None:
        # iterations <= 0: the loop never queried the model, so classify the
        # quantized clean image once (and count that query).
        with torch.no_grad():
            clean_float = adv_uint8.astype(np.float32) / 255.0
            final_pred_label = model_target(to_model(clean_float)).argmax(dim=1).item()
        query_count_tracker[0] += 1

    # The last in-loop prediction was made on exactly this adv_uint8, so it is
    # reused rather than spending a further, uncounted forward pass.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = (final_pred_label != true_label) and (l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
print("\n===== Starting Adversarial Attack =====")
records = []
total_trials = succ_total = misclassified = 0

start_time = time.time()

for digit in range(10):
    # Positions of this digit's samples, capped at SAMPLES_PER_DIGIT.
    class_positions = np.where(y_samples == digit)[0]
    for rank, idx in enumerate(class_positions[:SAMPLES_PER_DIGIT], start=1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        # The clean-image prediction above is this sample's first query.
        query_count_tracker = [1]

        if pred0 != y0:
            # Already wrong on the clean image: record it, but do not attack.
            misclassified += 1
            record = {
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': None, 'success': False, 'queries': query_count_tracker[0],
                'l2_mag': np.nan, 'note': 'Already misclassified'
            }
        else:
            total_trials += 1
            print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
            adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
                x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
                top_feature_indices, query_count_tracker
            )

            if success:
                # Only successful adversarial images are written to disk.
                succ_total += 1
                fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
                Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))

            outcome = 'Success' if success else 'Failed'
            record = {
                'sample_idx': idx, 'true_label': y0, 'initial_pred': pred0,
                'adv_label': adv_label, 'success': success, 'queries': query_count_tracker[0],
                'l2_mag': l2_mag if success else np.nan,
                'note': f"I-FGSM {'successful' if success else 'failed'}"
            }
            print(f"  {outcome}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")

        records.append(record)


end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
# One row per examined sample, written next to the adversarial images.
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# ─────────── Summary ───────────────────────────────────────────────────────
# Collect the report lines first, then emit them in one go.
summary_lines = [
    "\n===== Attack Summary =====",
    f"Total trials: {total_trials}",
    f"Misclassified before attack: {misclassified}",
]
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    summary_lines.append(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        successful_rows = df[df['success']]
        summary_lines.append(f"Mean L2 (successful): {successful_rows['l2_mag'].mean():.2f}")
        summary_lines.append(f"Mean queries (successful): {successful_rows['queries'].mean():.2f}")
print("\n".join(summary_lines))

# ─────────── Display Examples ──────────────────────────────────────────────
# An empty results table has no 'success' column, so guard before filtering.
success_df = df[df['success']] if not df.empty else df
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # Cast back to plain ints: 'adv_label' holds None for samples that were
        # already misclassified, which makes pandas store the column as float,
        # and "adv6.0" would never match the saved "adv6" file name.
        sample_idx = int(r['sample_idx'])
        true_label = int(r['true_label'])
        adv_label = int(r['adv_label'])
        l2_mag = float(r['l2_mag'])
        # Rebuild the exact file name used when the image was saved; a prefix
        # match could also pick up e.g. sample290 when looking for sample29.
        fname = f"true{true_label}_adv{adv_label}_mag{l2_mag:.1f}_sample{sample_idx}.png"
        adv_img_path = os.path.join(OUT_DIR, fname)
        if not os.path.exists(adv_img_path):
            print(f"Adversarial image not found, skipping: {adv_img_path}")
            continue
        # Context manager closes the file handle once the pixels are copied out.
        with Image.open(adv_img_path) as adv_file:
            adv_img = np.array(adv_file)
        plot_adversarial_example(X_samples[sample_idx], adv_img,
                                 true_label, adv_label, l2_mag,
                                 sample_idx, int(r['queries']), "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        sample_idx = int(r['sample_idx'])
        # No adversarial image exists, so the clean sample is shown in both panels.
        plot_adversarial_example(X_samples[sample_idx], X_samples[sample_idx],
                                 int(r['true_label']), int(r['initial_pred']), 0.0,
                                 sample_idx, int(r['queries']), "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 160 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 957.32, Queries: 91
Attacking sample 2/100 (True: 0, Index: 40)...
  Success! Adv label: 6, L2: 1085.89, Queries: 103
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 601.66, Queries: 55
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 664.21, Queries: 61
Attacking sample 5/100 (True: 0, Index: 64)...
  Failed! Adv label: 0, L2: 1252.24, Queries: 201
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 718.30, Queries: 67
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 622.99, Queries: 57
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 737.75, Queries: 69
Attacking sample 9/10

### Untargeted Attack on test set; k=170

In [49]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Seed every RNG in play (PyTorch CPU, NumPy, and all CUDA devices when present)
# so that repeated runs of this cell are reproducible.
torch.manual_seed(42)
np.random.seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
# Inputs: TorchScript LeNet, pickled data splits, pickled Random Forest.
# Output: adversarial PNGs and the per-sample stats CSV are written to OUT_DIR.
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
OUT_DIR = "adversarial_8bit_K170_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the Random Forest pickle is absent.
rf_model_present = os.path.exists(RF_ALL_CLASSES_PATH)
if not rf_model_present:
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Step size applied to the masked gradient sign on every I-FGSM iteration.
EPSILON_STEP = 0.01
# Maximum number of I-FGSM iterations per sample (two model queries each).
ITERATIONS = 100
# Per-pixel bound on the accumulated perturbation, on the [0, 1] intensity scale.
TOTAL_EPSILON = 0.5
# Number of highest-importance Random Forest features (pixels) the attack may modify.
TOP_K_FEATURES = 170
# Upper limit on how many samples of each digit class are attacked.
SAMPLES_PER_DIGIT = 100
# Largest L2 distance (computed on the 0-255 scale) at which an attack still counts as a success.
MAX_L2 = 10000

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the clean image and its adversarial counterpart side by side.

    Inputs holding exactly 784 elements are reshaped to 28x28 before plotting.
    The left panel is titled with the true label, the right panel with the
    adversarial prediction, and the figure title reports the sample index,
    L2 magnitude, query count and the optional ``title_suffix``.
    """
    panels = []
    for img, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        panels.append((img.reshape(28, 28) if img.size == 784 else img, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (img, caption) in zip(axes, panels):
        ax.imshow(img, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Close explicitly so repeated calls do not leave figures open.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# NOTE(review): joblib.load unpickles arbitrary objects — only point DATA_PKL at trusted files.
try:
    data = joblib.load(DATA_PKL)
except FileNotFoundError as err:
    # Raise instead of calling exit(): exit() is a helper for the interactive
    # shell, and raising matches the Random Forest path check earlier in this cell.
    raise FileNotFoundError(
        f"Error: Data file not found at {DATA_PKL}. Please check the path."
    ) from err

# Only the 2nd and 4th entries are used — presumably the test split
# (X_test, y_test); TODO confirm against the code that wrote the pickle.
_, X_samples, _, y_samples = data

# Heuristic: any value above 1.0 is taken to mean 0-255 intensities, which are
# rescaled to [0, 1] because the attack below works on that scale.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Prefer the GPU when one is available; every tensor built by to_model uses this device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
except Exception as e:
    # Raise instead of calling exit(): exit() is a helper for the interactive
    # shell; a chained exception keeps the original cause visible to the caller.
    raise RuntimeError(f"Error loading TorchScript LeNet model: {e}") from e

# Switch to evaluation mode before the model is queried by the attack.
model.eval()
print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")

def to_model(x01_batch):
    """Convert a NumPy image or batch into a float32 4-D tensor on ``device``.

    Accepted layouts and the tensor shape they map to:
      * (28, 28)        -> (1, 1, 28, 28)
      * (N, 28, 28)     -> (N, 1, 28, 28)
      * (N, 1, H, W)    -> unchanged
      * (784,)          -> (1, 1, 28, 28)
      * (N, 784)        -> (N, 1, 28, 28)

    Raises:
        ValueError: If the input shape matches none of the layouts above.
    """
    dims, shape = x01_batch.ndim, x01_batch.shape

    # Resolve the target shape first; the tensor is then built in one place.
    if dims == 2 and shape == (28, 28):
        target_shape = (1, 1, 28, 28)
    elif dims == 3 and shape[1:] == (28, 28):
        target_shape = (shape[0], 1, 28, 28)
    elif dims == 4 and shape[1] == 1:
        target_shape = shape
    elif dims == 1 and x01_batch.size == 784:
        target_shape = (1, 1, 28, 28)
    elif dims == 2 and shape[1] == 784:
        target_shape = (shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    return torch.tensor(x01_batch.reshape(target_shape), dtype=torch.float32, device=device)


@torch.no_grad()
def model_query(x_tensor_batch):
    """Run the module-level ``model`` on a tensor batch with autograd disabled."""
    return model(x_tensor_batch)

# ─────────── Load Random Forest Model ──────────────────────────────────────
rf_all_classes, top_feature_indices = None, None
print("Loading Random Forest Classifier for all classes...")
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Negating the importances makes argsort list the most important pixel first.
    importance_order = np.argsort(-feature_importances)
    top_feature_indices = importance_order[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: without a usable forest, every one of the 784 pixels
    # is made attackable, so the run is no longer restricted to TOP_K_FEATURES.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(784)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted I-FGSM attack restricted to a subset of pixels.

    Each iteration takes a signed-gradient ascent step on the cross-entropy
    loss, but only at the pixels listed in ``features_to_perturb``. The
    accumulated perturbation is clipped to ``+/- total_epsilon_budget`` and the
    image to ``[0, 1]``. After every step the image is rounded to 8 bits and
    re-classified; the loop stops as soon as that prediction differs from
    ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Source image as a ``(28, 28)`` array or a flat
            array of 784 values (reshaped to ``(28, 28)``).
        true_label: Integer class the attack tries to move away from.
        model_target: Callable mapping a ``(1, 1, 28, 28)`` tensor to logits.
        epsilon_step: Step size applied to the gradient sign per iteration.
        iterations: Maximum number of attack iterations.
        total_epsilon_budget: Per-pixel bound on the total perturbation.
        features_to_perturb: Flat pixel indices (into the 28x28 grid) that may
            be modified.
        query_count_tracker: One-element list; element 0 is incremented once
            per gradient pass and once per discretized check inside the loop.
            The verification pass after the loop is not counted.

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the 8-bit
        adversarial image, the model's prediction on it, the L2 distance to
        the original measured on the 0-255 scale, and whether the prediction
        differs from ``true_label`` with ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    # Loop invariants: pixel coordinates of the attackable features and the
    # label tensor used by the loss.
    h, w = original_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (h, w))
    target = torch.tensor([true_label], device=device)

    perturbed_image_np = original_image_np.copy()

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad_(True)
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, target)

        # Differentiate with respect to the input only, so no gradients are
        # accumulated on the model's parameters between iterations.
        (input_grad,) = torch.autograd.grad(loss, image_tensor)
        grad_sign = torch.sign(input_grad).cpu().numpy().reshape(h, w)

        # Keep the gradient sign at the selected pixels; zero elsewhere.
        step_direction = np.zeros((h, w), dtype=np.float32)
        step_direction[rows, cols] = grad_sign[rows, cols]

        perturbed_image_np += epsilon_step * step_direction
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Early stopping is decided on the 8-bit image, i.e. on what would
        # actually be saved, not on the float image.
        adv_uint8_temp = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float_temp = adv_uint8_temp.astype(np.float32) / 255.0
        with torch.no_grad():
            pred_temp = model_target(to_model(adv_float_temp)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if pred_temp != true_label:
            break

    # Final adversarial image (discretized) and its prediction. This also
    # covers the case where the loop body never ran (iterations <= 0).
    adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
    adv_float = adv_uint8.astype(np.float32) / 255.0
    with torch.no_grad():
        final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()

    # L2 distance on the 0-255 scale between the 8-bit result and the original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = (final_pred_label != true_label) and (l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# For every digit class, take up to SAMPLES_PER_DIGIT samples, skip those the
# model already gets wrong, attack the rest and log one record per sample.
print("\n===== Starting Adversarial Attack =====")
records = []
total_trials = succ_total = misclassified = 0

start_time = time.time()

for digit in range(10):
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, start=1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])

        # The clean-image prediction is the first counted query for this sample.
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker = [1]

        if pred0 != y0:
            # Nothing to attack: log the sample and move on.
            misclassified += 1
            records.append(dict(
                sample_idx=idx, true_label=y0, initial_pred=pred0,
                adv_label=None, success=False, queries=query_count_tracker[0],
                l2_mag=np.nan, note='Already misclassified',
            ))
            continue

        total_trials += 1
        print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker,
        )

        if success:
            succ_total += 1
            # Only successful adversarial images are written to disk.
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))
            status, recorded_l2, note = 'Success', l2_mag, 'I-FGSM successful'
        else:
            status, recorded_l2, note = 'Failed', np.nan, 'I-FGSM failed'

        records.append(dict(
            sample_idx=idx, true_label=y0, initial_pred=pred0,
            adv_label=adv_label, success=success, queries=query_count_tracker[0],
            l2_mag=recorded_l2, note=note,
        ))

        print(f"  {status}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
# One CSV row per processed sample, written next to the saved images.
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        # Averages are taken over the successful attacks only.
        successful_rows = df.loc[df['success']]
        mean_l2 = successful_rows['l2_mag'].mean()
        mean_queries = successful_rows['queries'].mean()
        print(f"Mean L2 (successful): {mean_l2:.2f}")
        print(f"Mean queries (successful): {mean_queries:.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
# Plot up to three randomly chosen successful attacks, reloading the saved
# 8-bit images from OUT_DIR; if there are none, plot the first original samples.
# An empty `records` list produces a frame without a 'success' column, so the
# lookup is guarded instead of raising KeyError.
success_df = df[df['success']] if 'success' in df.columns else df
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # 'adv_label' is stored as float64 whenever any record holds None
        # (already-misclassified samples), so cast back to int; otherwise the
        # name would read "adv6.0" and never match the file saved as "adv6".
        true_lbl = int(r['true_label'])
        adv_lbl = int(r['adv_label'])
        sample_id = int(r['sample_idx'])
        # Rebuild the exact file name used when the image was saved; a prefix
        # match could pick the wrong file (e.g. sample29 vs sample290).
        adv_fname = f"true{true_lbl}_adv{adv_lbl}_mag{r['l2_mag']:.1f}_sample{sample_id}.png"
        adv_img_path = os.path.join(OUT_DIR, adv_fname)
        if not os.path.isfile(adv_img_path):
            print(f"  Saved adversarial image not found, skipping: {adv_img_path}")
            continue
        # Read the pixels inside a context manager so the file handle is closed.
        with Image.open(adv_img_path) as adv_file:
            adv_pixels = np.array(adv_file)
        plot_adversarial_example(X_samples[sample_id], adv_pixels,
                                 true_lbl, adv_lbl, r['l2_mag'],
                                 sample_id, r['queries'], "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        plot_adversarial_example(X_samples[r['sample_idx']], X_samples[r['sample_idx']],
                                 r['true_label'], r['initial_pred'], 0.0,
                                 r['sample_idx'], r['queries'], "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 170 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 892.43, Queries: 83
Attacking sample 2/100 (True: 0, Index: 40)...
  Success! Adv label: 6, L2: 1085.21, Queries: 99
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 603.73, Queries: 53
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 683.28, Queries: 61
Attacking sample 5/100 (True: 0, Index: 64)...
  Success! Adv label: 9, L2: 1201.28, Queries: 123
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 727.84, Queries: 65
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 613.50, Queries: 55
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 725.59, Queries: 65
Attacking sample 9/10

### Untargeted Attack on test set; k=180

In [51]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Set seeds for reproducibility (NumPy, PyTorch CPU and, when present, all CUDA devices)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
# Inputs are read from "Models and Data splits"; adversarial images and the
# per-sample CSV are written to OUT_DIR, which is created if missing.
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
OUT_DIR = "adversarial_8bit_K180_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the Random Forest pickle is missing.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Step size applied to the gradient sign in each attack iteration.
EPSILON_STEP = 0.01
# Maximum number of attack iterations per sample.
ITERATIONS = 100
# Per-pixel bound on the accumulated perturbation.
TOTAL_EPSILON = 0.5
# Number of highest-importance Random Forest features (pixels) the attack may change.
TOP_K_FEATURES = 180
# Upper limit on the number of samples taken per digit class.
SAMPLES_PER_DIGIT = 100
# Largest L2 distance (0-255 scale) still counted as a success.
# NOTE(review): two 28x28 uint8 images differ by at most 255 * 28 = 7140,
# so a bound of 10000 can never reject an attack - confirm this is intended.
MAX_L2 = 10000

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the original and adversarial image side by side in one figure.

    Inputs with exactly 784 elements are reshaped to 28x28 before plotting.
    The figure title reports the sample index, the L2 magnitude and the query
    count, followed by ``title_suffix``. The figure is closed after display.
    """
    panels = []
    for image, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        if image.size == 784:
            image = image.reshape(28, 28)
        panels.append((image, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (image, caption) in zip(axes, panels):
        ax.imshow(image, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Release the figure so repeated calls do not keep figures open.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# The pickle is unpacked as a 4-tuple of which only the 2nd and 4th entries
# are kept (presumably the test features/labels of a train/test split -
# confirm against the cell that wrote the file).
# NOTE(review): joblib.load unpickles arbitrary objects; only load trusted files.
try:
    data = joblib.load(DATA_PKL)
    _, X_samples , _, y_samples = data
except FileNotFoundError:
    print(f"Error: Data file not found at {DATA_PKL}. Please check the path.")
    exit()

# Any value above 1.0 is taken to mean the pixels are on a 0-255 scale, in
# which case they are rescaled to [0, 1] and a warning is emitted.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Use the GPU when available; the model is mapped onto the same device and
# switched to eval mode before use.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    model.eval()
    print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")
except Exception as e:
    # Any load failure is reported and the run is aborted.
    print(f"Error loading TorchScript LeNet model: {e}")
    exit()

def to_model(x01_batch):
    """Convert image data into a float32 ``(N, 1, 28, 28)`` tensor on ``device``.

    Accepted input layouts:
      * ``(28, 28)``        - one image
      * ``(N, 28, 28)``     - a batch of images
      * ``(N, 1, H, W)``    - already channel-first (passed through unchanged)
      * ``(784,)``          - one flattened image
      * ``(N, 784)``        - a batch of flattened images

    Raises:
        ValueError: if the input matches none of the layouts above.
    """
    dims = x01_batch.ndim
    shape = tuple(x01_batch.shape)

    # Work out the target layout first so the conversion happens in one place.
    if dims == 2 and shape == (28, 28):
        target_shape = (1, 1, 28, 28)
    elif dims == 3 and shape[1:] == (28, 28):
        target_shape = (shape[0], 1, 28, 28)
    elif dims == 4 and shape[1] == 1:
        target_shape = shape
    elif dims == 1 and x01_batch.size == 784:
        target_shape = (1, 1, 28, 28)
    elif dims == 2 and shape[1] == 784:
        target_shape = (shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    as_tensor = torch.tensor(x01_batch, dtype=torch.float32, device=device)
    return as_tensor.reshape(target_shape)


def model_query(x_tensor_batch):
    """Return the module-level ``model``'s output for a batch, with autograd disabled."""
    with torch.no_grad():
        outputs = model(x_tensor_batch)
    return outputs

# ─────────── Load Random Forest Model ──────────────────────────────────────
# Rank the input features by the forest's importances and keep the
# TOP_K_FEATURES highest-ranked ones; these indices are later handed to the
# attack as the only pixels it may modify.
print("Loading Random Forest Classifier for all classes...")
rf_all_classes, top_feature_indices = None, None
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Negating the scores makes argsort list the most important feature first.
    importance_order = np.argsort(-feature_importances)
    top_feature_indices = importance_order[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: on any failure all 784 pixel indices are eligible.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted I-FGSM attack restricted to a subset of pixels.

    Each iteration takes a signed-gradient ascent step on the cross-entropy
    loss, but only at the pixels listed in ``features_to_perturb``. The
    accumulated perturbation is clipped to ``+/- total_epsilon_budget`` and the
    image to ``[0, 1]``. After every step the image is rounded to 8 bits and
    re-classified; the loop stops as soon as that prediction differs from
    ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Source image as a ``(28, 28)`` array or a flat
            array of 784 values (reshaped to ``(28, 28)``).
        true_label: Integer class the attack tries to move away from.
        model_target: Callable mapping a ``(1, 1, 28, 28)`` tensor to logits.
        epsilon_step: Step size applied to the gradient sign per iteration.
        iterations: Maximum number of attack iterations.
        total_epsilon_budget: Per-pixel bound on the total perturbation.
        features_to_perturb: Flat pixel indices (into the 28x28 grid) that may
            be modified.
        query_count_tracker: One-element list; element 0 is incremented once
            per gradient pass and once per discretized check inside the loop.
            The verification pass after the loop is not counted.

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the 8-bit
        adversarial image, the model's prediction on it, the L2 distance to
        the original measured on the 0-255 scale, and whether the prediction
        differs from ``true_label`` with ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    # Loop invariants: pixel coordinates of the attackable features and the
    # label tensor used by the loss.
    h, w = original_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (h, w))
    target = torch.tensor([true_label], device=device)

    perturbed_image_np = original_image_np.copy()

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad_(True)
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, target)

        # Differentiate with respect to the input only, so no gradients are
        # accumulated on the model's parameters between iterations.
        (input_grad,) = torch.autograd.grad(loss, image_tensor)
        grad_sign = torch.sign(input_grad).cpu().numpy().reshape(h, w)

        # Keep the gradient sign at the selected pixels; zero elsewhere.
        step_direction = np.zeros((h, w), dtype=np.float32)
        step_direction[rows, cols] = grad_sign[rows, cols]

        perturbed_image_np += epsilon_step * step_direction
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Early stopping is decided on the 8-bit image, i.e. on what would
        # actually be saved, not on the float image.
        adv_uint8_temp = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float_temp = adv_uint8_temp.astype(np.float32) / 255.0
        with torch.no_grad():
            pred_temp = model_target(to_model(adv_float_temp)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if pred_temp != true_label:
            break

    # Final adversarial image (discretized) and its prediction. This also
    # covers the case where the loop body never ran (iterations <= 0).
    adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
    adv_float = adv_uint8.astype(np.float32) / 255.0
    with torch.no_grad():
        final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()

    # L2 distance on the 0-255 scale between the 8-bit result and the original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = (final_pred_label != true_label) and (l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# For every digit class, take up to SAMPLES_PER_DIGIT samples, skip those the
# model already gets wrong, attack the rest and log one record per sample.
print("\n===== Starting Adversarial Attack =====")
records = []
total_trials = succ_total = misclassified = 0

start_time = time.time()

for digit in range(10):
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, start=1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])

        # The clean-image prediction is the first counted query for this sample.
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker = [1]

        if pred0 != y0:
            # Nothing to attack: log the sample and move on.
            misclassified += 1
            records.append(dict(
                sample_idx=idx, true_label=y0, initial_pred=pred0,
                adv_label=None, success=False, queries=query_count_tracker[0],
                l2_mag=np.nan, note='Already misclassified',
            ))
            continue

        total_trials += 1
        print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker,
        )

        if success:
            succ_total += 1
            # Only successful adversarial images are written to disk.
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))
            status, recorded_l2, note = 'Success', l2_mag, 'I-FGSM successful'
        else:
            status, recorded_l2, note = 'Failed', np.nan, 'I-FGSM failed'

        records.append(dict(
            sample_idx=idx, true_label=y0, initial_pred=pred0,
            adv_label=adv_label, success=success, queries=query_count_tracker[0],
            l2_mag=recorded_l2, note=note,
        ))

        print(f"  {status}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
# One CSV row per processed sample, written next to the saved images.
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        # Averages are taken over the successful attacks only.
        successful_rows = df.loc[df['success']]
        mean_l2 = successful_rows['l2_mag'].mean()
        mean_queries = successful_rows['queries'].mean()
        print(f"Mean L2 (successful): {mean_l2:.2f}")
        print(f"Mean queries (successful): {mean_queries:.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
# Plot up to three randomly chosen successful attacks, reloading the saved
# 8-bit images from OUT_DIR; if there are none, plot the first original samples.
# An empty `records` list produces a frame without a 'success' column, so the
# lookup is guarded instead of raising KeyError.
success_df = df[df['success']] if 'success' in df.columns else df
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # 'adv_label' is stored as float64 whenever any record holds None
        # (already-misclassified samples), so cast back to int; otherwise the
        # name would read "adv6.0" and never match the file saved as "adv6".
        true_lbl = int(r['true_label'])
        adv_lbl = int(r['adv_label'])
        sample_id = int(r['sample_idx'])
        # Rebuild the exact file name used when the image was saved; a prefix
        # match could pick the wrong file (e.g. sample29 vs sample290).
        adv_fname = f"true{true_lbl}_adv{adv_lbl}_mag{r['l2_mag']:.1f}_sample{sample_id}.png"
        adv_img_path = os.path.join(OUT_DIR, adv_fname)
        if not os.path.isfile(adv_img_path):
            print(f"  Saved adversarial image not found, skipping: {adv_img_path}")
            continue
        # Read the pixels inside a context manager so the file handle is closed.
        with Image.open(adv_img_path) as adv_file:
            adv_pixels = np.array(adv_file)
        plot_adversarial_example(X_samples[sample_id], adv_pixels,
                                 true_lbl, adv_lbl, r['l2_mag'],
                                 sample_id, r['queries'], "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        plot_adversarial_example(X_samples[r['sample_idx']], X_samples[r['sample_idx']],
                                 r['true_label'], r['initial_pred'], 0.0,
                                 r['sample_idx'], r['queries'], "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 180 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 882.21, Queries: 79
Attacking sample 2/100 (True: 0, Index: 40)...
  Success! Adv label: 6, L2: 1066.75, Queries: 93
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 581.63, Queries: 49
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 682.08, Queries: 59
Attacking sample 5/100 (True: 0, Index: 64)...
  Success! Adv label: 9, L2: 1156.90, Queries: 97
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 738.77, Queries: 65
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 603.27, Queries: 53
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 703.48, Queries: 61
Attacking sample 9/100

### Untargeted Attack on test set; k=190

In [53]:
import os
import warnings
import numpy as np
import torch
import joblib
import pandas as pd
from PIL import Image
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
import torch.nn.functional as F
import time


# Set seeds for reproducibility (NumPy, PyTorch CPU and, when present, all CUDA devices)
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)

# ─────────────── PATHS ────────────────────────────────────────────────────
# Inputs are read from "Models and Data splits"; adversarial images and the
# per-sample CSV are written to OUT_DIR, which is created if missing.
LENET_MODEL_PATH = os.path.normpath("Models and Data splits/lenet.pt")  # TorchScript model
DATA_PKL = os.path.normpath("Models and Data splits/data_[SCALED] Train_Test_Splits.pkl")
RF_ALL_CLASSES_PATH = os.path.normpath("Models and Data splits/random_forest.pkl")
OUT_DIR = "adversarial_8bit_K190_test_images"
os.makedirs(OUT_DIR, exist_ok=True)

# Fail fast when the Random Forest pickle is missing.
if not os.path.exists(RF_ALL_CLASSES_PATH):
    raise FileNotFoundError(f"Random Forest model not found at {RF_ALL_CLASSES_PATH}")

# ─────────── HYPER-PARAMETERS ─────────────────────────────────────────────
# Step size applied to the gradient sign in each attack iteration.
EPSILON_STEP = 0.01
# Maximum number of attack iterations per sample.
ITERATIONS = 100
# Per-pixel bound on the accumulated perturbation.
TOTAL_EPSILON = 0.5
# Number of highest-importance Random Forest features (pixels) the attack may change.
TOP_K_FEATURES = 190
# Upper limit on the number of samples taken per digit class.
SAMPLES_PER_DIGIT = 100
# Largest L2 distance (0-255 scale) still counted as a success.
# NOTE(review): two 28x28 uint8 images differ by at most 255 * 28 = 7140,
# so a bound of 10000 can never reject an attack - confirm this is intended.
MAX_L2 = 10000

def plot_adversarial_example(original_img, adv_img, true_label, adv_label, l2_mag, sample_idx, queries, title_suffix=""):
    """Show the original and adversarial image side by side in one figure.

    Inputs with exactly 784 elements are reshaped to 28x28 before plotting.
    The figure title reports the sample index, the L2 magnitude and the query
    count, followed by ``title_suffix``. The figure is closed after display.
    """
    panels = []
    for image, caption in (
        (original_img, f"Original\nTrue: {true_label}"),
        (adv_img, f"Adversarial\nPred: {adv_label}"),
    ):
        if image.size == 784:
            image = image.reshape(28, 28)
        panels.append((image, caption))

    fig, axes = plt.subplots(1, 2, figsize=(8, 4))
    for ax, (image, caption) in zip(axes, panels):
        ax.imshow(image, cmap='gray')
        ax.set_title(caption)
        ax.axis('off')

    fig.suptitle(f"Sample {sample_idx} | L2: {l2_mag:.2f} | Queries: {queries} {title_suffix}")
    plt.show()
    # Release the figure so repeated calls do not keep figures open.
    plt.close(fig)


# ─────────── Load Data ─────────────────────────────────────────────────────
# The pickle is unpacked as a 4-tuple of which only the 2nd and 4th entries
# are kept (presumably the test features/labels of a train/test split -
# confirm against the cell that wrote the file).
# NOTE(review): joblib.load unpickles arbitrary objects; only load trusted files.
try:
    data = joblib.load(DATA_PKL)
    _, X_samples , _, y_samples = data
except FileNotFoundError:
    print(f"Error: Data file not found at {DATA_PKL}. Please check the path.")
    exit()

# Any value above 1.0 is taken to mean the pixels are on a 0-255 scale, in
# which case they are rescaled to [0, 1] and a warning is emitted.
if X_samples.max() > 1.0:
    X_samples = X_samples.astype(np.float32) / 255.0
    warnings.warn("Data appeared in [0,255]; normalized to [0,1].")

# ─────────── Load TorchScripted LeNet ──────────────────────────────────────
# Use the GPU when available; the model is mapped onto the same device and
# switched to eval mode before use.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    model = torch.jit.load(LENET_MODEL_PATH, map_location=device)
    model.eval()
    print(f"LeNet TorchScript model loaded from: {LENET_MODEL_PATH}")
except Exception as e:
    # Any load failure is reported and the run is aborted.
    print(f"Error loading TorchScript LeNet model: {e}")
    exit()

def to_model(x01_batch):
    """Convert image data into a float32 ``(N, 1, 28, 28)`` tensor on ``device``.

    Accepted input layouts:
      * ``(28, 28)``        - one image
      * ``(N, 28, 28)``     - a batch of images
      * ``(N, 1, H, W)``    - already channel-first (passed through unchanged)
      * ``(784,)``          - one flattened image
      * ``(N, 784)``        - a batch of flattened images

    Raises:
        ValueError: if the input matches none of the layouts above.
    """
    dims = x01_batch.ndim
    shape = tuple(x01_batch.shape)

    # Work out the target layout first so the conversion happens in one place.
    if dims == 2 and shape == (28, 28):
        target_shape = (1, 1, 28, 28)
    elif dims == 3 and shape[1:] == (28, 28):
        target_shape = (shape[0], 1, 28, 28)
    elif dims == 4 and shape[1] == 1:
        target_shape = shape
    elif dims == 1 and x01_batch.size == 784:
        target_shape = (1, 1, 28, 28)
    elif dims == 2 and shape[1] == 784:
        target_shape = (shape[0], 1, 28, 28)
    else:
        raise ValueError(f"Unsupported input shape for to_model: {x01_batch.shape}")

    as_tensor = torch.tensor(x01_batch, dtype=torch.float32, device=device)
    return as_tensor.reshape(target_shape)


def model_query(x_tensor_batch):
    """Return the module-level ``model``'s output for a batch, with autograd disabled."""
    with torch.no_grad():
        outputs = model(x_tensor_batch)
    return outputs

# ─────────── Load Random Forest Model ──────────────────────────────────────
# Rank the input features by the forest's importances and keep the
# TOP_K_FEATURES highest-ranked ones; these indices are later handed to the
# attack as the only pixels it may modify.
print("Loading Random Forest Classifier for all classes...")
rf_all_classes, top_feature_indices = None, None
try:
    rf_all_classes = joblib.load(RF_ALL_CLASSES_PATH)
    feature_importances = rf_all_classes.feature_importances_
    # Negating the scores makes argsort list the most important feature first.
    importance_order = np.argsort(-feature_importances)
    top_feature_indices = importance_order[:TOP_K_FEATURES]
    print(f"Random Forest loaded. Using top {TOP_K_FEATURES} features.")
except Exception as e:
    # Best-effort fallback: on any failure all 784 pixel indices are eligible.
    print(f"Error loading Random Forest: {e}")
    top_feature_indices = np.arange(28 * 28)

# ─────────── I-FGSM Attack ─────────────────────────────────────────────────
def ifgsm_attack(original_image_np, true_label, model_target, epsilon_step, iterations,
                 total_epsilon_budget, features_to_perturb, query_count_tracker):
    """Run an untargeted I-FGSM attack restricted to a subset of pixels.

    Each iteration takes a signed-gradient ascent step on the cross-entropy
    loss, but only at the pixels listed in ``features_to_perturb``. The
    accumulated perturbation is clipped to ``+/- total_epsilon_budget`` and the
    image to ``[0, 1]``. After every step the image is rounded to 8 bits and
    re-classified; the loop stops as soon as that prediction differs from
    ``true_label``.

    Relies on the module-level ``to_model``, ``device`` and ``MAX_L2``.

    Args:
        original_image_np: Source image as a ``(28, 28)`` array or a flat
            array of 784 values (reshaped to ``(28, 28)``).
        true_label: Integer class the attack tries to move away from.
        model_target: Callable mapping a ``(1, 1, 28, 28)`` tensor to logits.
        epsilon_step: Step size applied to the gradient sign per iteration.
        iterations: Maximum number of attack iterations.
        total_epsilon_budget: Per-pixel bound on the total perturbation.
        features_to_perturb: Flat pixel indices (into the 28x28 grid) that may
            be modified.
        query_count_tracker: One-element list; element 0 is incremented once
            per gradient pass and once per discretized check inside the loop.
            The verification pass after the loop is not counted.

    Returns:
        Tuple ``(adv_uint8, final_pred_label, l2_norm, success)``: the 8-bit
        adversarial image, the model's prediction on it, the L2 distance to
        the original measured on the 0-255 scale, and whether the prediction
        differs from ``true_label`` with ``l2_norm <= MAX_L2``.
    """
    if original_image_np.ndim == 1 and original_image_np.size == 784:
        original_image_np = original_image_np.reshape(28, 28)

    # Loop invariants: pixel coordinates of the attackable features and the
    # label tensor used by the loss.
    h, w = original_image_np.shape
    rows, cols = np.unravel_index(features_to_perturb, (h, w))
    target = torch.tensor([true_label], device=device)

    perturbed_image_np = original_image_np.copy()

    for _ in range(iterations):
        image_tensor = to_model(perturbed_image_np)
        image_tensor.requires_grad_(True)
        output = model_target(image_tensor)
        query_count_tracker[0] += 1
        loss = F.cross_entropy(output, target)

        # Differentiate with respect to the input only, so no gradients are
        # accumulated on the model's parameters between iterations.
        (input_grad,) = torch.autograd.grad(loss, image_tensor)
        grad_sign = torch.sign(input_grad).cpu().numpy().reshape(h, w)

        # Keep the gradient sign at the selected pixels; zero elsewhere.
        step_direction = np.zeros((h, w), dtype=np.float32)
        step_direction[rows, cols] = grad_sign[rows, cols]

        perturbed_image_np += epsilon_step * step_direction
        perturbation = np.clip(perturbed_image_np - original_image_np,
                               -total_epsilon_budget, total_epsilon_budget)
        perturbed_image_np = np.clip(original_image_np + perturbation, 0, 1)

        # Early stopping is decided on the 8-bit image, i.e. on what would
        # actually be saved, not on the float image.
        adv_uint8_temp = np.round(perturbed_image_np * 255).astype(np.uint8)
        adv_float_temp = adv_uint8_temp.astype(np.float32) / 255.0
        with torch.no_grad():
            pred_temp = model_target(to_model(adv_float_temp)).argmax(dim=1).item()
        query_count_tracker[0] += 1

        if pred_temp != true_label:
            break

    # Final adversarial image (discretized) and its prediction. This also
    # covers the case where the loop body never ran (iterations <= 0).
    adv_uint8 = np.round(perturbed_image_np * 255).astype(np.uint8)
    adv_float = adv_uint8.astype(np.float32) / 255.0
    with torch.no_grad():
        final_pred_label = model_target(to_model(adv_float)).argmax(dim=1).item()

    # L2 distance on the 0-255 scale between the 8-bit result and the original.
    l2_norm = np.linalg.norm(adv_uint8.astype(np.float32) - (original_image_np * 255).astype(np.float32))
    success = (final_pred_label != true_label) and (l2_norm <= MAX_L2)

    return adv_uint8, final_pred_label, l2_norm, success


# ─────────── Attack Execution ──────────────────────────────────────────────
# For every digit class, take up to SAMPLES_PER_DIGIT samples, skip those the
# model already gets wrong, attack the rest and log one record per sample.
print("\n===== Starting Adversarial Attack =====")
records = []
total_trials = succ_total = misclassified = 0

start_time = time.time()

for digit in range(10):
    idxs = np.where(y_samples == digit)[0][:SAMPLES_PER_DIGIT]
    for rank, idx in enumerate(idxs, start=1):
        x0 = X_samples[idx].copy()
        y0 = int(y_samples[idx])

        # The clean-image prediction is the first counted query for this sample.
        pred0 = model_query(to_model(x0)).argmax(dim=1).item()
        query_count_tracker = [1]

        if pred0 != y0:
            # Nothing to attack: log the sample and move on.
            misclassified += 1
            records.append(dict(
                sample_idx=idx, true_label=y0, initial_pred=pred0,
                adv_label=None, success=False, queries=query_count_tracker[0],
                l2_mag=np.nan, note='Already misclassified',
            ))
            continue

        total_trials += 1
        print(f"Attacking sample {rank}/{SAMPLES_PER_DIGIT} (True: {y0}, Index: {idx})...")
        adv_img_uint8, adv_label, l2_mag, success = ifgsm_attack(
            x0, y0, model, EPSILON_STEP, ITERATIONS, TOTAL_EPSILON,
            top_feature_indices, query_count_tracker,
        )

        if success:
            succ_total += 1
            # Only successful adversarial images are written to disk.
            fname = f"true{y0}_adv{adv_label}_mag{l2_mag:.1f}_sample{idx}.png"
            Image.fromarray(adv_img_uint8, mode="L").save(os.path.join(OUT_DIR, fname))
            status, recorded_l2, note = 'Success', l2_mag, 'I-FGSM successful'
        else:
            status, recorded_l2, note = 'Failed', np.nan, 'I-FGSM failed'

        records.append(dict(
            sample_idx=idx, true_label=y0, initial_pred=pred0,
            adv_label=adv_label, success=success, queries=query_count_tracker[0],
            l2_mag=recorded_l2, note=note,
        ))

        print(f"  {status}! Adv label: {adv_label}, L2: {l2_mag:.2f}, Queries: {query_count_tracker[0]}")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken to generate adversarial samples: {elapsed_time:.4f} seconds")


# ─────────── Save Results ──────────────────────────────────────────────────
# One CSV row per processed sample, written next to the saved images.
df = pd.DataFrame(records)
csv_path = os.path.join(OUT_DIR, "per_sample_stats.csv")
df.to_csv(csv_path, index=False)
print(f"\nStats saved to: {csv_path}")

# ─────────── Summary ───────────────────────────────────────────────────────
print("\n===== Attack Summary =====")
print(f"Total trials: {total_trials}")
print(f"Misclassified before attack: {misclassified}")
if total_trials > 0:
    success_rate = succ_total / total_trials * 100
    print(f"Success rate: {succ_total}/{total_trials} ({success_rate:.1f}%)")
    if succ_total > 0:
        # Averages are taken over the successful attacks only.
        successful_rows = df.loc[df['success']]
        mean_l2 = successful_rows['l2_mag'].mean()
        mean_queries = successful_rows['queries'].mean()
        print(f"Mean L2 (successful): {mean_l2:.2f}")
        print(f"Mean queries (successful): {mean_queries:.2f}")

# ─────────── Display Examples ──────────────────────────────────────────────
# Plot up to three randomly chosen successful attacks, reloading the saved
# 8-bit images from OUT_DIR; if there are none, plot the first original samples.
# An empty `records` list produces a frame without a 'success' column, so the
# lookup is guarded instead of raising KeyError.
success_df = df[df['success']] if 'success' in df.columns else df
if not success_df.empty:
    print("\nShowing successful adversarial examples...")
    show_ids = success_df.sample(min(3, len(success_df))).index.tolist()
    for record_idx in show_ids:
        r = df.loc[record_idx]
        # 'adv_label' is stored as float64 whenever any record holds None
        # (already-misclassified samples), so cast back to int; otherwise the
        # name would read "adv6.0" and never match the file saved as "adv6".
        true_lbl = int(r['true_label'])
        adv_lbl = int(r['adv_label'])
        sample_id = int(r['sample_idx'])
        # Rebuild the exact file name used when the image was saved; a prefix
        # match could pick the wrong file (e.g. sample29 vs sample290).
        adv_fname = f"true{true_lbl}_adv{adv_lbl}_mag{r['l2_mag']:.1f}_sample{sample_id}.png"
        adv_img_path = os.path.join(OUT_DIR, adv_fname)
        if not os.path.isfile(adv_img_path):
            print(f"  Saved adversarial image not found, skipping: {adv_img_path}")
            continue
        # Read the pixels inside a context manager so the file handle is closed.
        with Image.open(adv_img_path) as adv_file:
            adv_pixels = np.array(adv_file)
        plot_adversarial_example(X_samples[sample_id], adv_pixels,
                                 true_lbl, adv_lbl, r['l2_mag'],
                                 sample_id, r['queries'], "(I-FGSM)")
else:
    print("No successful attacks to display. Showing original samples instead.")
    for i in range(min(3, len(df))):
        r = df.iloc[i]
        plot_adversarial_example(X_samples[r['sample_idx']], X_samples[r['sample_idx']],
                                 r['true_label'], r['initial_pred'], 0.0,
                                 r['sample_idx'], r['queries'], "(Original)")



LeNet TorchScript model loaded from: Models and Data splits\lenet.pt
Loading Random Forest Classifier for all classes...
Random Forest loaded. Using top 190 features.

===== Starting Adversarial Attack =====
Attacking sample 1/100 (True: 0, Index: 29)...
  Success! Adv label: 6, L2: 864.78, Queries: 75
Attacking sample 2/100 (True: 0, Index: 40)...
  Success! Adv label: 6, L2: 1066.12, Queries: 91
Attacking sample 3/100 (True: 0, Index: 44)...
  Success! Adv label: 5, L2: 573.32, Queries: 47
Attacking sample 4/100 (True: 0, Index: 59)...
  Success! Adv label: 6, L2: 695.55, Queries: 59
Attacking sample 5/100 (True: 0, Index: 64)...
  Success! Adv label: 9, L2: 1142.81, Queries: 93
Attacking sample 6/100 (True: 0, Index: 75)...
  Success! Adv label: 8, L2: 715.32, Queries: 61
Attacking sample 7/100 (True: 0, Index: 80)...
  Success! Adv label: 9, L2: 604.07, Queries: 51
Attacking sample 8/100 (True: 0, Index: 97)...
  Success! Adv label: 6, L2: 663.53, Queries: 55
Attacking sample 9/100