## CIFAR-10 with Neuron Shuffling

In this notebook, we perform FGSM (targeted and non-targeted) and PGD attacks on the CIFAR-10 dataset using the Resnet18 model and build models to defend against these attacks using the Adversarial Training mechanism.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
from torch.autograd import *

import torchvision
import torchvision.transforms as transforms

import os
import argparse
import matplotlib.pyplot as plt
import numpy as np

from resnet import *

import pickle
from collections import OrderedDict

In [2]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
lr = 0.01

<a name='name'></a>
### Preparing train and test data and building Resnet model

In [3]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

trainset = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=30, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=20, shuffle=False, num_workers=2)

classes = ('plane', 'car', 'bird', 'cat', 'deer',
           'dog', 'frog', 'horse', 'ship', 'truck')

# Model
print('==> Building model..')
net = ResNet18()
net = net.to(device)
if device == 'cuda':
    net = torch.nn.DataParallel(net)
    cudnn.benchmark = True

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=lr,
                      momentum=0.9, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)

Files already downloaded and verified
Files already downloaded and verified
==> Building model..


### Training function

In [4]:
def train(epoch, net):
    
    '''
    this function train net on training dataset
    '''

    net.train()
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(trainloader):
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        _, predicted = outputs.max(1)
        total += targets.size(0)
        correct += predicted.eq(targets).sum().item()
        print(batch_idx)
    return train_loss/len(trainloader)

# Save the model
torch.save(net.state_dict(), 'resnet_model2.pth')

In [5]:
import random

# Function to shuffle layers and save models
def shuffle_and_save_layers(original_model_state_dict_path, shuffled_model_path1, shuffled_model_path2):
    # Load the state dictionary of the original model
    original_model_state_dict = torch.load(original_model_state_dict_path)
    
    # Assuming 'net' is defined somewhere in your code
    # Load the state dictionary into the model
    net.load_state_dict(original_model_state_dict)

    # Extract the layers from the model
    original_layers = list(net.children())

    # Shuffle the layers twice to get two different sets of shuffled layers
    shuffled_layers_1 = random.sample(original_layers, len(original_layers))
    shuffled_layers_2 = random.sample(original_layers, len(original_layers))

    # Concatenate original layers into a single model
    original_model = torch.nn.Sequential(*original_layers)

    # Save the original model and its weights
    torch.save(original_model, original_model_state_dict_path.replace('.pth', '_model2.pth'))

    # Concatenate shuffled layers into a single model for both sets
    shuffled_model_1 = torch.nn.Sequential(*shuffled_layers_1)
    shuffled_model_2 = torch.nn.Sequential(*shuffled_layers_2)

    # Save the shuffled models and their weights
    torch.save(shuffled_model_1, shuffled_model_path1.replace('.pth', '_1.pth'))
    torch.save(shuffled_model_2, shuffled_model_path2.replace('.pth', '_2.pth'))

    # Store shuffled layers in an array
    shuffled_layers_array = [shuffled_layers_1, shuffled_layers_2]

    for i, (shuffled_layers, model_num) in enumerate(zip(shuffled_layers_array, [1, 2])):
        for j, shuffled_layer in enumerate(shuffled_layers):
            # Save entire models
            torch.save(shuffled_layer, f'shuffled_model_{model_num}_layer_{j}.pth')

            # Save model weights separately
            torch.save(shuffled_layer.state_dict(), f'shuffled_model_{model_num}_layer_{j}_weights.pth')

    return original_layers, shuffled_layers_array, original_model, [shuffled_model_1, shuffled_model_2]

# Example usage
original_model_state_dict_path = 'resnet_model2.pth'
shuffled_model_path1 = 'shuffled_resnet_1_1.pth'
shuffled_model_path2 = 'shuffled_resnet_2_1.pth'

original_layers, shuffled_layers_array, original_model, shuffled_models = shuffle_and_save_layers(original_model_state_dict_path, shuffled_model_path1, shuffled_model_path2)

# Print the original and shuffled layers along with their weights
for i, (original_layer, shuffled_layers) in enumerate(zip(original_layers, shuffled_layers_array)):
    print(f"Layer {i} - Original Weights:")
    print(original_layer.state_dict())
    for j, shuffled_layer in enumerate(shuffled_layers):
        print(f"\nLayer {i} - Shuffled Weights (Model {j+1}):")
        print(shuffled_layer.state_dict())
        print("\n" + "-"*50 + "\n")

# Print the original and shuffled models
print("Original Model Weights:")
print(original_model.state_dict())
print("\n" + "-"*50 + "\n")
for i, shuffled_model in enumerate(shuffled_models):
    print(f"Shuffled Model {i+1} Weights:")
    print(shuffled_model.state_dict())
    print("\n" + "-"*50 + "\n")

Layer 0 - Original Weights:
OrderedDict([('weight', tensor([[[[ 0.0307, -0.0756, -0.0607],
          [-0.0283, -0.0115, -0.0786],
          [ 0.1493,  0.1748, -0.0294]],

         [[ 0.0043, -0.0434, -0.1778],
          [-0.1637, -0.1205, -0.1873],
          [-0.0112, -0.1870, -0.0311]],

         [[ 0.0697,  0.1084, -0.1692],
          [-0.0429,  0.1525,  0.0438],
          [-0.1150, -0.1006,  0.0807]]],


        [[[ 0.0772, -0.0865, -0.1050],
          [-0.1549, -0.0942, -0.1255],
          [ 0.1327,  0.1354,  0.0509]],

         [[ 0.0121,  0.0665, -0.0611],
          [-0.1580,  0.0821,  0.1376],
          [-0.1610, -0.1134,  0.0399]],

         [[-0.0733,  0.1727, -0.1774],
          [-0.0333,  0.1243, -0.1626],
          [ 0.0483, -0.0853, -0.0996]]],


        [[[ 0.1739, -0.0520,  0.0772],
          [ 0.0808,  0.0703,  0.1424],
          [-0.1905, -0.0505,  0.1213]],

         [[-0.1738, -0.0979,  0.0852],
          [ 0.0933, -0.1708,  0.1354],
          [ 0.0627,  0.0800,  0.1

### Testing Function

In [None]:
def test(epoch, net, shuffled_model_paths):
    net.eval()
    test_loss = 0
    correct = 0
    total = 0

    # Test using shuffled layers
    use_original_layers = False
    layers_path = shuffled_model_paths[1] if use_original_layers else shuffled_model_paths[0]
    loaded_layers = torch.load(layers_path)

    if isinstance(loaded_layers, dict):
        loaded_state_dict = loaded_layers
    else:
        loaded_state_dict = loaded_layers.state_dict()

    for param_tensor in net.state_dict():
        if param_tensor in loaded_state_dict:
            net.state_dict()[param_tensor].copy_(loaded_state_dict[param_tensor])

    with torch.no_grad():
        for batch_idx, (inputs, targets) in enumerate(testloader):
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = net(inputs)
            loss = criterion(outputs, targets)

            test_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

    acc_shuffled = 100 * correct / total

    return test_loss / len(testloader), acc_shuffled

In [None]:
def calculate_loss_using_acc(model, criterion, dataloader, accuracy):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, targets)

            total_loss += loss.item()
            _, predicted = outputs.max(1)
            total += targets.size(0)
            correct += predicted.eq(targets).sum().item()

            # Stop when the desired accuracy is reached
            if (correct / total) * 100 >= accuracy:
                break

    average_loss = total_loss / len(dataloader)
    return average_loss

In [None]:
train_losses = []
test_losses_shuffled = []
acc_shuffleds = []
epochs = 3

# Paths for saving shuffled models
shuffled_model_path1 = 'shuffled_resnet_1.pth'
shuffled_model_path2 = 'shuffled_resnet_2.pth'

for epoch in range(epochs):
    train_loss = train(epoch, net)  # Assuming you have a train function
    train_losses.append(train_loss)

    # Save shuffled models
    shuffle_and_save_layers(original_model_state_dict_path, shuffled_model_path1, shuffled_model_path2)

    # Test using shuffled layers
    test_loss_shuffled, acc_shuffled = test(epoch, net, [shuffled_model_path1, shuffled_model_path2])
    print(f"Epoch {epoch + 1}/{epochs}")
    print(f"Train Loss: {train_loss}")
    print(f"Test Loss using Shuffled Layers: {test_loss_shuffled}")
    print(f"Accuracy using Shuffled Layers: {acc_shuffled:.2f}%")
    test_losses_shuffled.append(test_loss_shuffled)
    acc_shuffleds.append(acc_shuffled)

    scheduler.step()