In [1]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import pandas as pd
from PIL import Image

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Create directories for the dataset
base_dir = "/kaggle/working/attack_prediction_dataset/cifar10"
os.makedirs(os.path.join(base_dir, "train/clean"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "train/fgsm"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "train/bim"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "train/cw"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "val/clean"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "val/fgsm"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "val/bim"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "val/cw"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "test/clean"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "test/fgsm"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "test/bim"), exist_ok=True)
os.makedirs(os.path.join(base_dir, "test/cw"), exist_ok=True)

# Load CIFAR-10 dataset
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.CIFAR10(root="./data", train=True, download=True, transform=transform)
test_dataset = datasets.CIFAR10(root="./data", train=False, download=True, transform=transform)

# Subsample the datasets to limit to 5,000 images
train_size = 4000  # Number of training images
val_size = 500     # Number of validation images
test_size = 500    # Number of test images

# Subsample the training dataset
train_dataset = Subset(train_dataset, range(train_size))

# Split the training set into training and validation
train_size_final = int(0.8 * train_size)  # 80% of train_size for training
val_size_final = train_size - train_size_final  # Remaining 20% for validation
train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [train_size_final, val_size_final])

# Subsample the test dataset
test_dataset = Subset(test_dataset, range(test_size))

# Print the sizes of the datasets
print(f"Training set size: {len(train_dataset)}")
print(f"Validation set size: {len(val_dataset)}")
print(f"Test set size: {len(test_dataset)}")

# Define a simple CNN model for CIFAR-10
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)  # 3 input channels for RGB
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)  # CIFAR-10 images are 32x32, so after 2 max-pooling layers: 32/2/2 = 8x8
        self.fc2 = nn.Linear(128, 10)  # 10 classes in CIFAR-10

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        x = x.view(x.size(0), -1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Train the model
def train_model(model, train_loader, val_loader, epochs=5):
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(epochs):
        model.train()
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # Validate
        model.eval()
        val_loss = 0
        correct = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                val_loss += criterion(outputs, labels).item()
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()

        print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}, Val Accuracy: {correct/len(val_loader.dataset):.4f}")

# Initialize model and data loaders
model = SimpleCNN().to(device)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Train the model
train_model(model, train_loader, val_loader, epochs=5)

# Save the trained model
torch.save(model.state_dict(), "cifar10_cnn.pth")
print("Model saved as cifar10_cnn.pth")

# Load the trained model
model = SimpleCNN().to(device)
model.load_state_dict(torch.load("cifar10_cnn.pth", map_location=device, weights_only=True))  # Fix: Use weights_only=True
model.eval()

# FGSM Attack
def fgsm_attack(image, epsilon, data_grad):
    sign_data_grad = data_grad.sign()  # Get the sign of the gradient
    perturbed_image = image + epsilon * sign_data_grad  # Add perturbation
    perturbed_image = torch.clamp(perturbed_image, 0, 1)  # Clip to valid pixel range
    return perturbed_image

# BIM Attack (Iterative FGSM)
def bim_attack(image, epsilon, alpha, iterations, model, target_label):
    perturbed_image = image.clone().detach().requires_grad_(True)  # Fix: Create a new tensor with requires_grad
    for _ in range(iterations):
        output = model(perturbed_image)
        loss = nn.CrossEntropyLoss()(output, target_label)
        model.zero_grad()
        loss.backward()
        data_grad = perturbed_image.grad.data
        perturbed_image = perturbed_image + alpha * data_grad.sign()
        perturbed_image = torch.clamp(perturbed_image, image - epsilon, image + epsilon)
        perturbed_image = torch.clamp(perturbed_image, 0, 1)
        perturbed_image = perturbed_image.detach().requires_grad_(True)  # Fix: Re-enable requires_grad
    return perturbed_image

# Carlini & Wagner Attack (L2 norm) - Fixed
def cw_attack(image, target_label, model, confidence=10, learning_rate=0.01, max_iterations=100):
    # Ensure target_label is a tensor
    target_label = torch.tensor([target_label], device=device)
    
    # Define the perturbation variable
    delta = torch.zeros_like(image, requires_grad=True).to(device)
    optimizer = optim.Adam([delta], lr=learning_rate)
    
    for _ in range(max_iterations):
        perturbed_image = image + delta
        perturbed_image = torch.clamp(perturbed_image, 0, 1)
        output = model(perturbed_image)
        
        # Get the correct logit and the maximum logit of other classes
        correct_logit = output[:, target_label]
        
        # Mask the correct logit to find the maximum of other logits
        other_logits = output.clone()
        other_logits[:, target_label] = -float('inf')  # Mask the correct logit
        max_other_logit = other_logits.max(dim=1).values  # Find the maximum of other logits
        
        # Compute the loss
        loss = torch.max(correct_logit - max_other_logit + confidence, torch.tensor(0.0, device=device))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    perturbed_image = image + delta
    perturbed_image = torch.clamp(perturbed_image, 0, 1)
    return perturbed_image

# Generate adversarial examples
def generate_adversarial_examples(dataset, model, epsilon, output_dir, metadata, attack_type):
    dataloader = DataLoader(dataset, batch_size=1, shuffle=False)
    for i, (image, label) in enumerate(dataloader):
        image, label = image.to(device), label.to(device)
        
        # Skip clean data generation (already done)
        clean_path = os.path.join(output_dir, "clean", f"{i}.png")
        if not os.path.exists(clean_path):
            clean_image = image.squeeze().permute(1, 2, 0).cpu().detach().numpy() * 255  # Convert to HWC format
            Image.fromarray(clean_image.astype(np.uint8)).save(clean_path)
            metadata.append({
                "image_path": clean_path,
                "attack_type": "clean",
                "attack_parameters": None,
                "original_label": label.item(),
                "attacked_label": label.item()
            })

        # Generate adversarial example
        if attack_type == "fgsm":
            image.requires_grad = True
            output = model(image)
            loss = nn.CrossEntropyLoss()(output, label)
            model.zero_grad()
            loss.backward()
            data_grad = image.grad.data
            perturbed_image = fgsm_attack(image, epsilon, data_grad)
        elif attack_type == "bim":
            perturbed_image = bim_attack(image, epsilon, alpha=0.01, iterations=10, model=model, target_label=label)
        elif attack_type == "cw":
            perturbed_image = cw_attack(image, label, model)

        # Save adversarial image
        adv_path = os.path.join(output_dir, attack_type, f"{i}.png")
        adv_image = perturbed_image.squeeze().permute(1, 2, 0).cpu().detach().numpy() * 255  # Convert to HWC format
        Image.fromarray(adv_image.astype(np.uint8)).save(adv_path)

        # Add metadata
        metadata.append({
            "image_path": adv_path,
            "attack_type": attack_type,
            "attack_parameters": f"epsilon={epsilon}" if attack_type in ["fgsm", "bim"] else "confidence=10",
            "original_label": label.item(),
            "attacked_label": label.item()
        })

# Generate adversarial examples for train, val, and test sets
epsilon = 0.1  # Attack strength for FGSM and BIM
metadata_train, metadata_val, metadata_test = [], [], []

# FGSM
generate_adversarial_examples(train_dataset, model, epsilon, os.path.join(base_dir, "train"), metadata_train, "fgsm")
generate_adversarial_examples(val_dataset, model, epsilon, os.path.join(base_dir, "val"), metadata_val, "fgsm")
generate_adversarial_examples(test_dataset, model, epsilon, os.path.join(base_dir, "test"), metadata_test, "fgsm")

# BIM
generate_adversarial_examples(train_dataset, model, epsilon, os.path.join(base_dir, "train"), metadata_train, "bim")
generate_adversarial_examples(val_dataset, model, epsilon, os.path.join(base_dir, "val"), metadata_val, "bim")
generate_adversarial_examples(test_dataset, model, epsilon, os.path.join(base_dir, "test"), metadata_test, "bim")

# C&W
generate_adversarial_examples(train_dataset, model, epsilon, os.path.join(base_dir, "train"), metadata_train, "cw")
generate_adversarial_examples(val_dataset, model, epsilon, os.path.join(base_dir, "val"), metadata_val, "cw")
generate_adversarial_examples(test_dataset, model, epsilon, os.path.join(base_dir, "test"), metadata_test, "cw")

# Save metadata to CSV files
pd.DataFrame(metadata_train).to_csv(os.path.join(base_dir, "train/metadata_train.csv"), index=False)
pd.DataFrame(metadata_val).to_csv(os.path.join(base_dir, "val/metadata_val.csv"), index=False)
pd.DataFrame(metadata_test).to_csv(os.path.join(base_dir, "test/metadata_test.csv"), index=False)

print("Adversarial dataset created successfully!")

Using device: cuda
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170M/170M [00:05<00:00, 33.5MB/s] 


Extracting ./data/cifar-10-python.tar.gz to ./data
Files already downloaded and verified
Training set size: 3200
Validation set size: 800
Test set size: 500
Epoch 1/5, Loss: 2.0384, Val Accuracy: 0.2712
Epoch 2/5, Loss: 1.7342, Val Accuracy: 0.3237
Epoch 3/5, Loss: 1.7120, Val Accuracy: 0.3987
Epoch 4/5, Loss: 1.4940, Val Accuracy: 0.4000
Epoch 5/5, Loss: 1.3251, Val Accuracy: 0.4275
Model saved as cifar10_cnn.pth
Adversarial dataset created successfully!


In [2]:
import shutil

# Compress the dataset folder
shutil.make_archive("/kaggle/working/attack_prediction_dataset", 'zip', "/kaggle/working/attack_prediction_dataset")
print("Dataset compressed successfully!")

Dataset compressed successfully!
