<a href="https://colab.research.google.com/github/FatManWalking/ml-lecture/blob/main/Bj%C3%B6rns_Bonus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install adversarial-robustness-toolbox --quiet

In [2]:
import art

In [3]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import time
import copy

from PIL import Image
from matplotlib import image
from matplotlib import pyplot as plt

import torch
import torch.nn as nn
from torch.nn import Sequential
import torchvision.transforms as transforms
import torchvision.models as models
import torch.optim as optim
import torchvision

from art.classifiers import PyTorchClassifier

from warnings import simplefilter
simplefilter(action='ignore', category=FutureWarning)



In [4]:
def initialize_model(model_name='vgg11', num_classes='10', feature_extract=True, use_pretrained=True):
    """
    initalizes the version of VGG to be used and freezes the layers of the model and then add a new unfrozen head
    
    params:
        model_name(str): for example vgg16_bn for VGG-net with 16 weight layers and batch normalization
        num_classes(int): number of output classes (100 when using CIFAR100)
        feature_extract(bool): should the layers be frozen or not
        use_pretrained(bool): use the pretrained model weights or only the architecture itself
    """
    
    if model_name == 'vgg11':
        model_ft = models.vgg11(pretrained=use_pretrained)
    elif model_name == 'vgg11_bn':
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
    elif model_name == 'vgg16':
        model_ft = models.vgg16(pretrained=use_pretrained)
    elif model_name == 'vgg16_bn':
        model_ft = models.vgg16_bn(pretrained=use_pretrained)
    elif model_name == 'vgg19':
        model_ft = models.vgg19(pretrained=use_pretrained)
    elif model_name == 'vgg19_bn':
        model_ft = models.vgg19_bn(pretrained=use_pretrained)
    
    set_parameter_requires_grad(model_ft, feature_extract)
    num_ftrs = model_ft.classifier[6].in_features
    model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
    input_size = 224

    return model_ft, input_size

In [5]:
def set_parameter_requires_grad(model, feature_extracting):
    """
    this freezes the layers in the network
    the new layer added later will automatically be unfrozen
    """
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

In [6]:
def get_data_loader(batch_size=20):
    """
    define data augmentation and return data loader with the cifar10 dataset loaded
    parameters:
        batch_size: size of the batch used for training and validation 
                    size of test batch is always 10
    Return:
        3 pytorch.dataloaders for easy batching of the dataset (train, validation and test)
    """

    # data augmentation
    transform = transforms.Compose([
            transforms.Resize([224,224]), # Resizing the image as the VGG only take 224 x 244 as input size
            transforms.RandomHorizontalFlip(), # Flip the data horizontally
            #TODO if it is needed, add the random crop
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])
        
    trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)
    test = torchvision.datasets.CIFAR10(root='./data', train=False,
                                       download=False, transform=transform)
    
    n = len(trainset)
    train, val =torch.utils.data.random_split(trainset, 
                                                    [int(n//100*70), int(n//100*30)],
                                                    generator=torch.Generator().manual_seed(42))
    
    train_loader = torch.utils.data.DataLoader(dataset=train, batch_size=batch_size, shuffle=True, num_workers=0)
    val_loader = torch.utils.data.DataLoader(dataset=val, batch_size=batch_size, shuffle=True, num_workers=0)
    test_loader = torch.utils.data.DataLoader(dataset=test, batch_size=1000, shuffle=True, num_workers=0)

    return train_loader, val_loader, test_loader

In [7]:
train_loader, val_loader, test_loader = get_data_loader(batch_size=20)

Files already downloaded and verified


In [8]:
model_name = 'vgg11_bn'
num_classes = 10
feature_extract = True

In [23]:
class Model(nn.Module):

    def __init__(self, model_name='vgg11', num_classes='10', feature_extract=True, use_pretrained=True):
        """
        initalizes the version of VGG to be used and freezes the layers of the model and then add a new unfrozen head

        params:
            model_name(str): for example vgg16_bn for VGG-net with 16 weight layers and batch normalization
            num_classes(int): number of output classes (100 when using CIFAR100)
            feature_extract(bool): should the layers be frozen or not
            use_pretrained(bool): use the pretrained model weights or only the architecture itself
        """
        super().__init__()

        if model_name == 'vgg11':
            model_ft = models.vgg11(pretrained=use_pretrained)
        elif model_name == 'vgg11_bn':
            model_ft = models.vgg11_bn(pretrained=use_pretrained)
        elif model_name == 'vgg16':
            model_ft = models.vgg16(pretrained=use_pretrained)
        elif model_name == 'vgg16_bn':
            model_ft = models.vgg16_bn(pretrained=use_pretrained)
        elif model_name == 'vgg19':
            model_ft = models.vgg19(pretrained=use_pretrained)
        elif model_name == 'vgg19_bn':
            model_ft = models.vgg19_bn(pretrained=use_pretrained)

        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model_ft = model_ft
        
        
        self.set_parameter_requires_grad(feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

        self.model_ft.to(self.device)
        self.feature_extract = feature_extract
        self.input_size = input_size
    
    def get_params_to_update(self):
        """ 
        prints the list of weights and bias that not frozen and will be tuned
        
        Gather the parameters to be optimized/updated in this run. If we are
        finetuning we will be updating all parameters. However, if we are
        doing feature extract method, we will only update the parameters
        that we have just initialized, i.e. the parameters with requires_grad
        is True.
        """

        params_to_update = self.model_ft.parameters()
        print("\tWeights and Bias to be tuned:")
        if self.feature_extract:
            params_to_update = []
            for name,param in self.model_ft.named_parameters():
                if param.requires_grad == True:
                    params_to_update.append(param)
                    print("\t\t",name)
        else:
            for name,param in self.model_ft.named_parameters():
                if param.requires_grad == True:
                    print("\t\t",name)
                    
        return params_to_update
    
    def set_parameter_requires_grad(self, feature_extracting):
        """
        this freezes the layers in the network
        the new layer added later will automatically be unfrozen
        """
        if feature_extracting:
            for param in self.model_ft.parameters():
                param.requires_grad = False
        

    def train_model(self, dataloader, criterion, optimizer, num_epochs=25, is_inception=False):
        since = time.time()

        val_acc_history = []
        val_loss_history = []

        best_model_wts = copy.deepcopy(self.model_ft.state_dict())
        best_acc = 0.0
        stop = False

        for epoch in range(num_epochs):
            print('Epoch {}/{}'.format(epoch, num_epochs - 1))
            print('-' * 10)

            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    self.model_ft.train()  # Set model to training mode
                else:
                    self.model_ft.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in dataloader:
                    inputs = inputs.to(self.device)
                    labels = labels.to(self.device)
                    
                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        # Get model outputs and calculate loss
                        # Special case for inception because in training it has an auxiliary output. In train
                        #   mode we calculate the loss by summing the final output and the auxiliary output
                        #   but in testing we only consider the final output.
                        if is_inception and phase == 'train':
                            # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                            outputs, aux_outputs = self.model_ft(inputs)
                            loss1 = criterion(outputs, labels)
                            loss2 = criterion(aux_outputs, labels)
                            loss = loss1 + 0.4*loss2
                        else:
                            outputs = self.model_ft(inputs)
                            loss = criterion(outputs, labels)

                        _, preds = torch.max(outputs, 1)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)

                epoch_loss = running_loss / len(dataloader.dataset)
                epoch_acc = running_corrects.double() / len(dataloader.dataset)

                print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(self.model_ft.state_dict())
                if phase == 'val':
                    val_acc_history.append(epoch_acc)
                    # A simple addition to stop the training early
                    val_loss_history.append(loss)
                    if len(val_loss_history) > 3:
                        stop = (abs(val_loss_history[-3] - val_loss_history[-3]) +
                                abs(val_loss_history[-1] - val_loss_history[-2])) <= 0.001
                        print(abs(val_loss_history[-3] - val_loss_history[-3]) +
                                abs(val_loss_history[-1] - val_loss_history[-2]))
                if stop:
                    break

            print()

        time_elapsed = time.time() - since
        print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
        print('Best val Acc: {:4f}'.format(best_acc))

        # load best model weights
        self.model_ft.load_state_dict(best_model_wts)
        torch.save(self.model_ft, 'model.pth')
        return self.model_ft, val_acc_history

    def load_model(self, path):
        self.model_ft = torch.load(path)

In [24]:
model_obj = Model(model_name=model_name, num_classes=num_classes, feature_extract=feature_extract, use_pretrained=True)

In [25]:
model_obj.model_ft

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): ReLU(inplace=True)
    (14): MaxPool2d(ke

In [26]:
# Now we finetune the model to get a descent classfication performance on it
print("Original model training:")
params_to_update = model_obj.get_params_to_update()

Original model training:
	Weights and Bias to be tuned:
		 classifier.6.weight
		 classifier.6.bias


In [27]:
#optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)
optimizer_ft = optim.Adam(params_to_update, lr=0.001)
num_epochs = 5

In [28]:
# Setup the loss fxn
criterion = nn.CrossEntropyLoss()

In [None]:
#model_ft, hist = model_obj.train_model(train_loader, criterion, optimizer_ft, num_epochs=num_epochs, is_inception=False)

In [29]:
model_obj.load_model('model.pth')

In [32]:
model_ft = model_obj.model_ft

In [33]:
sample = iter(test_loader)
input_shape = next(sample)[0].shape

In [34]:
classifier_original = PyTorchClassifier(model_ft,
                                        loss=nn.CrossEntropyLoss(),
                                        input_shape=input_shape,
                                        clip_values=(0, 1),
                                        nb_classes=10)

In [35]:
len_steal = len(test_loader)

In [36]:
# Stealing from the unprotected classifier.
from art.attacks import ExtractionAttack
from art.attacks.extraction import CopycatCNN, KnockoffNets

attack_catalogue = {"Probabilistic CopycatCNN": CopycatCNN(classifier=classifier_original,
                                              batch_size_fit=64,
                                              batch_size_query=64,
                                              nb_epochs=num_epochs,
                                              nb_stolen=len_steal,
                                              use_probability=True),
                    "Argmax CopycatCNN": CopycatCNN(classifier=classifier_original,
                                              batch_size_fit=64,
                                              batch_size_query=64,
                                              nb_epochs=num_epochs,
                                              nb_stolen=len_steal,
                                              use_probability=False),
                    "Probabilistic KnockoffNets": KnockoffNets(classifier=classifier_original,
                                              batch_size_fit=64,
                                              batch_size_query=64,
                                              nb_epochs=num_epochs,
                                              nb_stolen=len_steal,
                                              use_probability=True),
                    "Argmax KnockoffNets": KnockoffNets(classifier=classifier_original,
                                              batch_size_fit=64,
                                              batch_size_query=64,
                                              nb_epochs=num_epochs,
                                              nb_stolen=len_steal,
                                              use_probability=False),
                   }

In [38]:
results = []

for test_set_x, test_set_y in test_loader:

    for len_steal in [250, 1000, 3000, 5000]:
        indices = np.random.permutation(len(test_set_x))
        x_steal = test_set_x[indices[:len_steal]]
        y_steal = test_set_y[indices[:len_steal]]
        x_test = test_set_x[indices[len_steal:]]
        y_test = test_set_y[indices[len_steal:]]

        for name, attack in attack_catalogue.items():
            
            model_stolen = initialize_model(model_name=model_name,
                                            num_classes=num_classes,
                                            feature_extract=feature_extract,
                                            use_pretrained=True)
            print(model_stolen)
            
            classifier_stolen = PyTorchClassifier(model_stolen, loss=nn.CrossEntropyLoss(),
                                                input_shape=input_shape, nb_classes=10,
                                                clip_values=(0, 1))
            
            classifier_stolen = attack.extract(x_steal, y_steal, thieved_classifier=classifier_stolen)
            acc = classifier_stolen._model.evaluate(x_test, y_test)[1]
            print(name, ":", acc)
            results.append((name, len_steal, acc))

(VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): ReLU(inplace=True)
    (7): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (8): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): ReLU(inplace=True)
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (13): ReLU(inplace=True)
    (14): MaxPool2d(k

TypeError: ignored

In [None]:
test_set[0][indices[:len_steal]]