Mount Google Drive to access and save files to your cloud drive

In [None]:
from google.colab import drive
import os

# Mount Google Drive at /content/drive so files under it (e.g. MyDrive/...)
# can be read and written from this runtime.
drive.mount('/content/drive')


Mounted at /content/drive


In [None]:
# Folder on the mounted Drive where this notebook stores its artifacts
# (the model weights and the baseline metrics CSV are written here).
project_dir = "/content/drive/MyDrive/MNIST_CNN_Project"
# exist_ok=True keeps this cell re-runnable when the folder already exists.
os.makedirs(project_dir, exist_ok=True)

Import all necessary libraries

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import pandas as pd
import matplotlib.pyplot as plt

Load the MNIST dataset and apply the tensor transform

In [None]:
# The only preprocessing step: convert each image to a tensor.
transform = transforms.ToTensor()

# Unmodified ("clean") training split.
clean_dataset = datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
# Held-out test split, used for every evaluation below.
test_dataset = datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# Shuffle only the training data; keep the test order fixed.
clean_train_loader = DataLoader(
    clean_dataset,
    batch_size=64,
    shuffle=True,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=1000,
    shuffle=False,
)




100%|██████████| 9.91M/9.91M [00:00<00:00, 37.9MB/s]
100%|██████████| 28.9k/28.9k [00:00<00:00, 1.21MB/s]
100%|██████████| 1.65M/1.65M [00:00<00:00, 8.98MB/s]
100%|██████████| 4.54k/4.54k [00:00<00:00, 5.65MB/s]


Define a simple CNN architecture for image classification

In [None]:
class SimpleCNN(nn.Module):
    """Small two-convolution CNN producing 10 class logits.

    Each 3x3 convolution (padding=1) keeps the spatial size and each 2x2
    max-pool halves it, so the ``64 * 7 * 7`` input size of ``fc1`` matches
    single-channel 28x28 inputs (28 -> 14 -> 7).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return raw (un-normalised) logits of shape ``(batch, 10)``.

        Args:
            x: Image batch of shape ``(batch, 1, 28, 28)``.

        Raises:
            RuntimeError: If the spatial size of ``x`` does not reduce to 7x7,
                because the flattened features then no longer fit ``fc1``.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample and keep the batch axis fixed. A view(-1, 64*7*7)
        # would silently fold a wrong-sized input into extra fake batch rows
        # instead of failing.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x



```
# This is formatted as code
```

Train the CNN model

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
clean_model = SimpleCNN().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(clean_model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    clean_model.train()
    # Running sums for this epoch: size-weighted loss, correct predictions, samples seen.
    total_loss, correct, total = 0, 0, 0

    for images, labels in clean_train_loader:
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass.
        outputs = clean_model(images)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The criterion returns the batch mean, so weight it by the batch size
        # to obtain a per-sample average at the end of the epoch.
        n_batch = labels.size(0)
        total_loss += loss.item() * n_batch
        predicted = outputs.max(dim=1).indices
        correct += (predicted == labels).sum().item()
        total += n_batch

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/total:.4f}, Accuracy: {correct/total:.4f}")


Epoch [1/5], Loss: 0.1695, Accuracy: 0.9494
Epoch [2/5], Loss: 0.0496, Accuracy: 0.9849
Epoch [3/5], Loss: 0.0342, Accuracy: 0.9895
Epoch [4/5], Loss: 0.0248, Accuracy: 0.9921
Epoch [5/5], Loss: 0.0194, Accuracy: 0.9937


Evaluate the model on test set to get baseline performance

In [None]:
# Switch to inference mode and accumulate loss/accuracy over the whole test set.
clean_model.eval()
test_loss, correct, total = 0, 0, 0

with torch.no_grad():  # no gradients are needed for evaluation
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)

        outputs = clean_model(images)
        loss = criterion(outputs, labels)

        # Weight the batch-mean loss by the batch size so the final value
        # is an average over samples rather than over batches.
        n_batch = labels.size(0)
        test_loss += loss.item() * n_batch
        predicted = outputs.max(dim=1).indices
        correct += (predicted == labels).sum().item()
        total += n_batch

# Per-sample averages over the full test set.
test_accuracy = correct / total
test_loss = test_loss / total

print("\n✅ Initial Baseline Assessment Completed: ")
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")



✅ Initial Baseline Assessment Completed: 
Test Accuracy: 0.9903
Test Loss: 0.0301


Save model weights and baseline metrics to Google Drive

In [None]:
# Both artifacts live in the project folder on Drive.
weights_path = os.path.join(project_dir, 'mnist_cnn_clean.pth')
baseline_csv_path = os.path.join(project_dir, 'mnist_cnn_baseline.csv')

# Store only the parameters (state_dict), not the pickled module.
torch.save(clean_model.state_dict(), weights_path)

# One-row table holding the baseline metrics of the clean model.
baseline_row = {
    'model': 'SimpleCNN',
    'dataset': 'MNIST',
    'test_accuracy': test_accuracy,
    'test_loss': test_loss,
}
baseline_df = pd.DataFrame([baseline_row])
baseline_df.to_csv(baseline_csv_path, index=False)

print("\n✅ File saved:")
print(f"- Weights file: {weights_path}")
print(f"- Baseline file: {baseline_csv_path}")



✅ File saved:
- Weights file: /content/drive/MyDrive/MNIST_CNN_Project/mnist_cnn_clean.pth
- Baseline file: /content/drive/MyDrive/MNIST_CNN_Project/mnist_cnn_baseline.csv


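Poison the training labels (label-flipping attack on 10% of the samples) and train a second model on the poisoned data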

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np

# Fresh copies of the MNIST splits; the training labels are modified in place below.
transform = transforms.ToTensor()
train_dataset = datasets.MNIST(
    root='./data',
    train=True,
    download=True,
    transform=transform,
)
test_dataset = datasets.MNIST(
    root='./data',
    train=False,
    download=True,
    transform=transform,
)

# === Label Flipping Attack ===
# Fraction of training samples whose label is replaced by a wrong one.
flip_fraction = 0.1
num_to_flip = int(flip_fraction * len(train_dataset))

# Fixed seed so the same samples and replacement labels are drawn on every run.
np.random.seed(42)
flip_indices = np.random.choice(
    len(train_dataset),
    size=num_to_flip,
    replace=False,  # each sample is flipped at most once
)

for idx in flip_indices:
    original_label = int(train_dataset.targets[idx])
    # Draw the replacement uniformly from the nine other classes.
    wrong_labels = [label for label in range(10) if label != original_label]
    train_dataset.targets[idx] = np.random.choice(wrong_labels)

# The training loader serves the poisoned labels; the test set is left untouched.
train_loader = DataLoader(
    train_dataset,
    batch_size=64,
    shuffle=True,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=1000,
    shuffle=False,
)


class SimpleCNN(nn.Module):
    """Small two-convolution CNN producing 10 class logits.

    Same layers as the ``SimpleCNN`` defined earlier in this notebook. Each
    3x3 convolution (padding=1) keeps the spatial size and each 2x2 max-pool
    halves it, so the ``64 * 7 * 7`` input size of ``fc1`` matches
    single-channel 28x28 inputs (28 -> 14 -> 7).
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 7 * 7, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Return raw (un-normalised) logits of shape ``(batch, 10)``.

        Args:
            x: Image batch of shape ``(batch, 1, 28, 28)``.

        Raises:
            RuntimeError: If the spatial size of ``x`` does not reduce to 7x7,
                because the flattened features then no longer fit ``fc1``.
        """
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample and keep the batch axis fixed. A view(-1, 64*7*7)
        # would silently fold a wrong-sized input into extra fake batch rows
        # instead of failing.
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x


# Train a fresh network on the label-flipped training set.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
poisoned_model = SimpleCNN().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(poisoned_model.parameters(), lr=0.001)

num_epochs = 5
for epoch in range(num_epochs):
    poisoned_model.train()
    # Epoch accumulators. Accuracy here is measured against the poisoned
    # labels the model is trained on, not against the true digits.
    total_loss, correct, total = 0, 0, 0

    for images, labels in train_loader:
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass.
        outputs = poisoned_model(images)
        loss = criterion(outputs, labels)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Convert the batch-mean loss back into a per-sample sum.
        n_batch = labels.size(0)
        total_loss += loss.item() * n_batch
        predicted = outputs.max(dim=1).indices
        correct += (predicted == labels).sum().item()
        total += n_batch

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/total:.4f}, Accuracy: {correct/total:.4f}")


Epoch [1/5], Loss: 0.7541, Accuracy: 0.8494
Epoch [2/5], Loss: 0.6290, Accuracy: 0.8843
Epoch [3/5], Loss: 0.6037, Accuracy: 0.8888
Epoch [4/5], Loss: 0.5853, Accuracy: 0.8915
Epoch [5/5], Loss: 0.5681, Accuracy: 0.8928


Create a triggered copy of the test set by stamping a small white square in the bottom-right corner of every image

In [None]:
from torch.utils.data import DataLoader, TensorDataset
def add_trigger(images, trigger_value=1.0, trigger_size=3):
    """Return a copy of ``images`` with a square patch stamped in the bottom-right corner.

    The patch covers the last ``trigger_size`` rows and columns of the last
    two dimensions, across all leading dimensions. This works for a batch
    ``(N, C, H, W)`` as well as a single image ``(C, H, W)``. The input
    tensor is never modified.

    Args:
        images: Image tensor whose last two dimensions are height and width.
        trigger_value: Value written into the patch (1.0 is white for images
            scaled to [0, 1]).
        trigger_size: Side length of the square patch in pixels. ``0`` returns
            an unmodified copy.

    Returns:
        A new tensor with the same shape and dtype as ``images``.

    Raises:
        ValueError: If ``trigger_size`` is negative.
    """
    if trigger_size < 0:
        raise ValueError(f"trigger_size must be non-negative, got {trigger_size}")

    images = images.clone()
    if trigger_size == 0:
        # A ``-0:`` slice selects the whole axis and would overwrite the entire
        # image, so an empty trigger must be handled explicitly.
        return images

    # One slice assignment over all leading dimensions replaces the per-image loop.
    images[..., -trigger_size:, -trigger_size:] = trigger_value
    return images


def create_trigger_loader(test_loader, trigger_value=1.0, trigger_size=3, batch_size=64):
    """Build a DataLoader over a triggered copy of everything in ``test_loader``.

    ``test_loader`` is iterated once. Every image batch is passed through
    ``add_trigger`` and the labels are kept unchanged. All batches are
    concatenated in memory, so the whole dataset must fit in RAM.

    Args:
        test_loader: Iterable yielding ``(images, labels)`` tensor batches.
        trigger_value: Pixel value of the trigger patch.
        trigger_size: Side length of the trigger patch in pixels.
        batch_size: Batch size of the returned loader (defaults to 64).

    Returns:
        A non-shuffling ``DataLoader`` over a ``TensorDataset`` of
        ``(triggered_images, labels)``, in the order ``test_loader`` yielded them.
    """
    image_batches = []
    label_batches = []
    for images, labels in test_loader:
        image_batches.append(add_trigger(images, trigger_value, trigger_size))
        label_batches.append(labels)

    # Concatenate along the batch axis to get one tensor per field.
    trigger_dataset = TensorDataset(
        torch.cat(image_batches, dim=0),
        torch.cat(label_batches, dim=0),
    )
    return DataLoader(trigger_dataset, batch_size=batch_size, shuffle=False)

# Build the triggered copy of the test set with the default trigger
# (trigger_value=1.0, trigger_size=3) from the current `test_loader`.
trigger_test_loader = create_trigger_loader(test_loader)
