### Display the versions of the libraries used for reference purposes.

In [2]:
import sys
import numpy as np
import tensorflow as tf
import sklearn
import torch
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import notebook
import os

# Report the interpreter, Jupyter Notebook, and library versions (in that
# order) so results can be traced back to the environment that produced them.
for _name, _ver in (
    ('Python', sys.version),
    ('Jupyter Notebook', notebook.__version__),
    ('NumPy', np.__version__),
    ('TensorFlow', tf.__version__),
    ('Torch', torch.__version__),
    ('Scikit-learn', sklearn.__version__),
):
    print(f'{_name} version: {_ver}')

Python version: 3.12.9 | packaged by Anaconda, Inc. | (main, Feb  6 2025, 18:49:16) [MSC v.1929 64 bit (AMD64)]
Jupyter Notebook version: 7.3.2
NumPy version: 1.26.4
TensorFlow version: 2.19.0
Torch version: 2.6.0+cu126
Scikit-learn version: 1.5.1


### Loading the MNIST dataset, splitting it, and saving the splits

In [4]:
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
import joblib
import os

# Make sure the output folder for models and data splits exists.
os.makedirs("Models and Data splits", exist_ok=True)

# -------------------------
# 1. Load the MNIST data
# -------------------------
# Fetch version 1 of 'mnist_784' from OpenML and convert features/labels to
# NumPy arrays; labels are cast to integers.
# NOTE(review): `.values` assumes fetch_openml returned pandas objects — confirm
# if the scikit-learn version or the `as_frame` default changes.
mnist = fetch_openml('mnist_784', version=1)
X = mnist.data.values      # expected shape (70000, 784)
y = mnist.target.astype(int).values

# 2. Stratified 80/20 train/test split with a fixed random state.
split_options = dict(test_size=0.2, random_state=42, stratify=y)
(X_train_not_scaled,
 X_test_not_scaled,
 y_train_not_scaled,
 y_test_not_scaled) = train_test_split(X, y, **split_options)

# 3. Persist the raw (unscaled) splits as one tuple:
#    (X_train, X_test, y_train, y_test).
raw_splits = (
    X_train_not_scaled, X_test_not_scaled,
    y_train_not_scaled, y_test_not_scaled,
)
joblib.dump(raw_splits, 'Models and Data splits/data_[ORIGINAL] Train_Test_Splits.pkl')

# 4. Min-max scaling (divide pixel values by 255); labels are reused unchanged.
X_train_scaled, X_test_scaled = (
    pixels / 255.0 for pixels in (X_train_not_scaled, X_test_not_scaled)
)
scaled_splits = (
    X_train_scaled, X_test_scaled,
    y_train_not_scaled, y_test_not_scaled,
)
# Kept as the cell's last bare expression so the list of written files is displayed.
joblib.dump(scaled_splits, 'Models and Data splits/data_[SCALED] Train_Test_Splits.pkl')



['Models and Data splits/data_[SCALED] Train_Test_Splits.pkl']

### Training LeNet

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import joblib
import numpy as np
from sklearn.metrics import accuracy_score
import os

# 1. Define LeNet architecture with Tanh activations and average pooling
class LeNet(nn.Module):
    """LeNet-style CNN with tanh activations and average pooling.

    Layer sizes are chosen for single-channel 28x28 inputs:
    conv(1->6, 5x5) -> tanh -> avg-pool -> conv(6->16, 5x5) -> tanh -> avg-pool
    -> fc(256->120) -> tanh -> fc(120->84) -> tanh -> fc(84->10).
    """

    def __init__(self):
        super().__init__()
        # Feature extractor. Spatial sizes for a 28x28 input:
        # 28x28 -(conv1)-> 24x24 -(pool)-> 12x12 -(conv2)-> 8x8 -(pool)-> 4x4
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)  # shared by both stages
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        # Classifier head on the flattened 16 x 4 x 4 feature maps.
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        """Return the raw 10-way logits for `x` (no softmax is applied)."""
        features = self.pool(self.conv1(x).tanh())
        features = self.pool(self.conv2(features).tanh())
        # Collapse everything into rows of 256 features.
        flat = features.view(-1, 16 * 4 * 4)
        hidden = self.fc1(flat).tanh()
        hidden = self.fc2(hidden).tanh()
        return self.fc3(hidden)

# 2. Load the train/test splits saved under the "[SCALED]" file name.
# NOTE(review): joblib.load unpickles the file, so only load splits produced by
# this notebook, never a .pkl from an untrusted source.
X_train, X_test, y_train, y_test = joblib.load('Models and Data splits/data_[SCALED] Train_Test_Splits.pkl')

# 3. Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

# Seed PyTorch's global RNG before the model and the shuffling DataLoader are
# created, so weight initialisation and batch order are repeatable between
# runs. (Some GPU kernels can still introduce small run-to-run differences.)
SEED = 42
torch.manual_seed(SEED)

# 4. Prepare datasets and dataloaders
batch_size = 64

# Convert the NumPy arrays to tensors; images are reshaped to the
# (N, 1, 28, 28) layout expected by the convolutional layers.
X_train_tensor = torch.tensor(X_train.reshape(-1, 1, 28, 28), dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
X_test_tensor = torch.tensor(X_test.reshape(-1, 1, 28, 28), dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# Only the training data is shuffled; the test order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 5. Initialize model, loss, optimizer
model = LeNet().to(device)
criterion = nn.CrossEntropyLoss()  # expects raw logits and integer class labels
optimizer = optim.SGD(model.parameters(), lr=0.01)  # SGD optimizer as original paper

# 6. Training loop: one full pass over train_loader per epoch, reporting the
#    sample-weighted mean training loss after each pass.
epochs = 100
num_train_samples = len(train_loader.dataset)

for epoch in range(epochs):
    model.train()
    running_loss = 0.0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard step: clear old gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size to
        # accumulate a per-sample total (the last batch may be smaller).
        batch_count = inputs.size(0)
        running_loss += loss.item() * batch_count

    epoch_loss = running_loss / num_train_samples
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}")

# 7. Evaluation on test set: collect the arg-max prediction and the true label
#    for every test sample, then compute overall accuracy.
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():  # no gradients are needed for inference
    for inputs, labels in test_loader:
        outputs = model(inputs.to(device))
        preds = outputs.argmax(dim=1).cpu().numpy()
        all_preds.extend(preds)
        # Labels are never moved to the device, so they convert directly.
        all_labels.extend(labels.numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"\nTest Accuracy: {accuracy:.4f}")

# 8. Save trained model
LENET_PATH = "Models and Data splits/lenet.pt"

# Compile the model to TorchScript and save it; the saved file is what
# torch.jit.load reads back. Note that `model` is rebound to the scripted module.
model = torch.jit.script(model)
torch.jit.save(model, LENET_PATH)
# Report the path that was actually written (the old message named a different
# folder and file extension).
print(f"Model saved to {LENET_PATH}")

Using device: cuda
Epoch 1/100, Loss: 1.8500
Epoch 2/100, Loss: 0.6422
Epoch 3/100, Loss: 0.4237
Epoch 4/100, Loss: 0.3457
Epoch 5/100, Loss: 0.2980
Epoch 6/100, Loss: 0.2602
Epoch 7/100, Loss: 0.2279
Epoch 8/100, Loss: 0.2005
Epoch 9/100, Loss: 0.1776
Epoch 10/100, Loss: 0.1586
Epoch 11/100, Loss: 0.1428
Epoch 12/100, Loss: 0.1299
Epoch 13/100, Loss: 0.1192
Epoch 14/100, Loss: 0.1101
Epoch 15/100, Loss: 0.1024
Epoch 16/100, Loss: 0.0957
Epoch 17/100, Loss: 0.0898
Epoch 18/100, Loss: 0.0850
Epoch 19/100, Loss: 0.0805
Epoch 20/100, Loss: 0.0768
Epoch 21/100, Loss: 0.0731
Epoch 22/100, Loss: 0.0700
Epoch 23/100, Loss: 0.0673
Epoch 24/100, Loss: 0.0645
Epoch 25/100, Loss: 0.0621
Epoch 26/100, Loss: 0.0601
Epoch 27/100, Loss: 0.0580
Epoch 28/100, Loss: 0.0561
Epoch 29/100, Loss: 0.0545
Epoch 30/100, Loss: 0.0527
Epoch 31/100, Loss: 0.0513
Epoch 32/100, Loss: 0.0498
Epoch 33/100, Loss: 0.0483
Epoch 34/100, Loss: 0.0472
Epoch 35/100, Loss: 0.0461
Epoch 36/100, Loss: 0.0447
Epoch 37/100, Loss

### Selecting correctly classified samples (100 per digit)

In [8]:
import torch
import joblib
from collections import defaultdict
from torch.utils.data import DataLoader, TensorDataset
import os

# --- Configuration ---
MODEL_PATH = "Models and Data splits/lenet.pt"
DATA_PATH = "Models and Data splits/data_[ORIGINAL] Train_Test_Splits.pkl"
OUTPUT_PATH = "Models and Data splits/selected_samples_for_attack.pt"
SAMPLES_PER_DIGIT = 100

# --- 1. Load Model and Data ---
print("🚀 Starting sample selection process...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the trained TorchScript model straight onto the selected device.
# map_location lets a model saved from a GPU session load on a CPU-only machine.
try:
    model = torch.jit.load(MODEL_PATH, map_location=device).eval()
except Exception as e:
    print(f"❌ Error loading model: {e}")
    # Re-raise instead of calling exit(): in Jupyter, exit() asks the kernel to
    # shut down, which discards all notebook state and hides the traceback.
    raise

# Load the test split; the file stores (X_train, X_test, y_train, y_test).
# NOTE(review): joblib.load unpickles the file, so only load files produced by
# this notebook.
try:
    _, X_test, _, y_test = joblib.load(DATA_PATH)
except FileNotFoundError:
    print(f"❌ Error: Data file not found at '{DATA_PATH}'.")
    raise

# Reshape to (N, 1, 28, 28) and scale the raw pixel values to [0, 1].
X_test = X_test.reshape(-1, 1, 28, 28).astype("float32") / 255.0

# Confirm scaling
if X_test.min() < 0 or X_test.max() > 1:
    print(f"⚠️ Warning: Data is not properly scaled (min={X_test.min():.4f}, max={X_test.max():.4f}).")
else:
    print(f"✅ Data scaled to [0, 1] (min={X_test.min():.4f}, max={X_test.max():.4f})")

# --- 2. Find Correctly Classified Samples ---
print(f"\n🔎 Searching for {SAMPLES_PER_DIGIT} correctly classified samples per digit...")

# Convert the entire test set to tensors for efficient processing
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test_tensor = torch.tensor(y_test, dtype=torch.int64).to(device)

# Maps digit label -> positions (in test-set order) of correctly classified samples.
final_indices_per_digit = defaultdict(list)
batch_size = 500  # Process in batches for memory efficiency

# The third tensor carries each sample's position in the test set so it can be
# recovered after batching.
test_dataset = TensorDataset(X_test_tensor, y_test_tensor, torch.arange(len(y_test_tensor)))
test_loader = DataLoader(test_dataset, batch_size=batch_size)

with torch.no_grad():
    for images_batch, labels_batch, indices_batch in test_loader:
        # Stop searching if we have found enough samples for all digits
        if len(final_indices_per_digit) == 10 and all(len(v) >= SAMPLES_PER_DIGIT for v in final_indices_per_digit.values()):
            print("   Found enough samples for all digits.")
            break

        outputs = model(images_batch)
        preds = torch.argmax(outputs, dim=1)
        correct_mask = (preds == labels_batch).cpu()

        # Pull the correctly predicted labels and positions to the host once
        # per batch, instead of one .item() round trip per sample.
        correct_labels = labels_batch.cpu()[correct_mask].tolist()
        correct_indices = indices_batch.cpu()[correct_mask].tolist()

        # Keep samples in test-set order until each digit's quota is filled.
        for label, original_index in zip(correct_labels, correct_indices):
            if len(final_indices_per_digit[label]) < SAMPLES_PER_DIGIT:
                final_indices_per_digit[label].append(original_index)

# --- 3. Create and Save the Final Dataset ---

# Number of samples the file should contain when every digit's quota is met.
expected_total = SAMPLES_PER_DIGIT * 10

# Gather the final indices digit by digit (0-9), trimming any excess. Digits
# with no correctly classified sample are reported with a count of 0 rather
# than being silently skipped.
final_indices = []
for digit in range(10):
    indices = final_indices_per_digit.get(digit, [])[:SAMPLES_PER_DIGIT]
    final_indices.extend(indices)
    print(f"   Digit {digit}: Found {len(indices)} samples.")

if len(final_indices) != expected_total:
    print(f"\n⚠️ Warning: Could not find {SAMPLES_PER_DIGIT} samples for every digit.")
    print(f"   The resulting file will contain fewer than {expected_total} samples.")

# Select the final images and labels using the collected indices
final_images = X_test_tensor[final_indices]
final_labels = y_test_tensor[final_indices]

# Save the tensors to a file as a dict with 'images' and 'labels' entries.
saved_data = {
    'images': final_images.cpu(),  # Save on CPU to avoid device issues when loading
    'labels': final_labels.cpu()
}
torch.save(saved_data, OUTPUT_PATH)

print(f"\n✅ Successfully saved {len(final_images)} samples to:")
print(f"   {os.path.abspath(OUTPUT_PATH)}")


🚀 Starting sample selection process...
Using device: cuda
✅ Data scaled to [0, 1] (min=0.0000, max=1.0000)

🔎 Searching for 100 correctly classified samples per digit...
   Found enough samples for all digits.
   Digit 0: Found 100 samples.
   Digit 1: Found 100 samples.
   Digit 2: Found 100 samples.
   Digit 3: Found 100 samples.
   Digit 4: Found 100 samples.
   Digit 5: Found 100 samples.
   Digit 6: Found 100 samples.
   Digit 7: Found 100 samples.
   Digit 8: Found 100 samples.
   Digit 9: Found 100 samples.

✅ Successfully saved 1000 samples to:
   C:\Users\dyari\OneDrive\Desktop\RF_Verification_Layer_Defense\Models and Data splits\selected_samples_for_attack.pt
