# Efficient Model Extraction via Boundary Sampling

*In this notebook we provide code for running our BAM model extraction attack algorithm.*

❌ **Warning**: Because of the GPU timeout on free Colab instances, you will not be able to extract the complete dataset $\mathcal{D}_1$ or train $f'$ for the full number of epochs in this notebook. The code has therefore been modified to run fewer iterations (30 instead of the 40 used in the paper) and to train $f'$ for fewer epochs (20 instead of the 40 used in the paper). Please feel free to copy this notebook and experiment with the hyperparameters on dedicated hardware.

## Scenario
- Victim model $f$: AlexNet
- Training set $\mathcal{D}_0$: CIFAR-10

- Substitute model $f'$: ResNet18
- Population size $N$: 20,000
- Selection size $k$: 6,000
- Iteration count $I$: 30 (instead of 40)
- Training epochs: 20 (instead of 40)
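
As a rough sanity check on these settings, the query budget of one evolutionary search can be estimated. The sketch below assumes, as in the `run_genetic_algorithm` loop later in this notebook, that the victim is queried once on the initial random population and once on each new generation. `estimate_query_budget` is a hypothetical helper and not part of the attack code, and the second call assumes the paper used the same population size.

```python
def estimate_query_budget(population_size: int, iterations: int) -> int:
    """Victim queries for one search: the initial population plus one population per iteration."""
    return population_size * (iterations + 1)


if __name__ == "__main__":
    colab = estimate_query_budget(20_000, 30)  # settings used in this notebook
    full = estimate_query_budget(20_000, 40)   # 40 iterations, same N (assumption)
    print(f"30 iterations: {colab:,} queries; 40 iterations: {full:,} queries")
```

The estimate covers a single search run; any additional runs or evaluation queries would add to it.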

## Evaluation
- Accuracy of the victim and stolen models
- Attack Success Rate for transferred adversarial examples

We plan to release a complete GitHub repository, including documentation, with the camera-ready version of our paper.

# 1. Install Dependencies and Libraries

In [None]:
!pip install adversarial-robustness-toolbox

Collecting adversarial-robustness-toolbox
  Downloading adversarial_robustness_toolbox-1.17.1-py3-none-any.whl (1.7 MB)
Installing collected packages: adversarial-robustness-toolbox
Successfully installed adversarial-robustness-toolbox-1.17.1


Imports

In [None]:
import argparse
import gc
import logging
import math
import os
import random
import sys
import time
import requests
from statistics import mean
from torch import optim
import warnings
warnings.filterwarnings('ignore')
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.parallel
from art.attacks.evasion import ProjectedGradientDescentPyTorch
from art.estimators.classification import PyTorchClassifier
from torch.utils.data import DataLoader, Dataset, TensorDataset
from torchvision import datasets, transforms


# 2. Define Models and Helper Functions

Function to Randomly Generate the Initial Generation for the Evolutionary Algorithm

In [None]:
def generate_random_data_cifar_old(num_images):
    """
    Generate `num_images` random CIFAR-sized inputs.

    Returns a list of `num_images` tensors, each of shape (1, 3, 32, 32),
    with values drawn uniformly from [0, 1).
    """
    image_size = (1, 3, 32, 32)
    data = [torch.rand(image_size).cpu() for _ in range(num_images)]
    return data

Function to Download the Victim Model from Google Drive

In [None]:
def download_file(url, filename):
    """Download a file from a given URL"""
    response = requests.get(url)
    response.raise_for_status()  # Check if the request was successful

    with open(filename, 'wb') as f:
        f.write(response.content)

# Example usage
url = 'https://drive.google.com/uc?id=10MJ2lSvolpqzY4n5z3lK_HKtC7T068t7'

filename = 'teacher_alexnet_for_cifar10_state_dict'
download_file(url, filename)


Function that saves the generated data to the disk

In [None]:
def save_tensor_list_to_file(tensor_list, file_path):
    tensor_array_input = [x[0].detach().cpu().numpy() for x in tensor_list]
    tensor_array_labels = [x[1].detach().cpu().numpy() if type(x[1]) is not np.int64 else np.array(x[1]) for x
                           in tensor_list]

    np.save(file_path + "_input", tensor_array_input)
    np.save(file_path + "_labels", tensor_array_labels)

Configuration instance creator

In [None]:
def create_config(log_file=None):
    if log_file is None:
        Config()
    else:
        Config(log_file=log_file)

def prepare_config():
    create_config()

Function to Evaluate the Attack Success Rate of the Surrogate Model Through Testing the Transferability of Successful Adversarial Samples

In [None]:
def get_new_data_loader(model, data_loader, device):
    correct_images = []
    correct_labels = []
    input_shape = None
    for inputs, labels in data_loader:
        # Assuming inputs is a batch of images
        inputs, labels = inputs.to(device), labels.to(device)
        if input_shape is None:
            input_shape = tuple(inputs.shape[1:])
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)

        correct_mask = predicted == labels
        correct_images.append(inputs[correct_mask])
        correct_labels.append(labels[correct_mask])

    correct_images = torch.cat(correct_images, dim=0)
    correct_labels = torch.cat(correct_labels, dim=0)

    correct_dataset = TensorDataset(correct_images, correct_labels)
    correct_data_loader = DataLoader(correct_dataset, batch_size=data_loader.batch_size, shuffle=False)
    return correct_data_loader, input_shape


def test_trans(surrogate_model, victim, loss, num_classes, data_loader):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    new_data_loader, input_shape = get_new_data_loader(victim, data_loader, device)
    equality_tensor_p = 0
    loss = nn.CrossEntropyLoss()
    for batch, (images, labels) in enumerate(new_data_loader):
        clf = PyTorchClassifier(model=surrogate_model, loss=loss,
                                input_shape=input_shape, nb_classes=num_classes)

        PGD_attack = ProjectedGradientDescentPyTorch(estimator=clf, max_iter=random.randint(10, 20), eps=30 / 255,
                                                     num_random_init=1)

        victim = victim.to(device)
        x_n = images.cpu().numpy()

        # craft adversarial perturbations on the surrogate with PGD (random start)
        x_p = PGD_attack.generate(x=x_n)
        x_p = torch.from_numpy(x_p)
        x_p = x_p.to(device)
        adv_label_p = victim(x_p).argmax(dim=1)
        # adv_label_p = adv_label_p.to('cpu')

        equality_tensor_p += torch.sum((labels != adv_label_p).int()).item()
    attack_success_rate = equality_tensor_p / len(new_data_loader.sampler)
    return attack_success_rate
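
The value returned by `test_trans` can be read as follows: among the test samples that the victim classifies correctly, it is the fraction whose PGD-perturbed version (crafted on the surrogate) the victim no longer assigns the true label. A minimal plain-Python sketch of that ratio, with plain lists standing in for tensors and the hypothetical helper name `attack_success_rate_from_labels`:

```python
from typing import Sequence


def attack_success_rate_from_labels(true_labels: Sequence[int],
                                    victim_adv_predictions: Sequence[int]) -> float:
    """Fraction of adversarial examples on which the victim's prediction differs from the true label."""
    if len(true_labels) != len(victim_adv_predictions):
        raise ValueError("labels and predictions must have the same length")
    if not true_labels:
        raise ValueError("at least one sample is required")
    # A transfer counts as successful when the victim's label changes.
    flipped = sum(1 for y, p in zip(true_labels, victim_adv_predictions) if y != p)
    return flipped / len(true_labels)


if __name__ == "__main__":
    # Two of the four adversarial examples change the victim's prediction.
    print(attack_success_rate_from_labels([0, 1, 2, 3], [0, 5, 2, 7]))
```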

Functions for Data and Model Preparation Prior to Training the model

In [None]:
def prepare_for_training(self_model, model_name, optimizer):
    start_epoch = 0
    directory = f"checkpoints/{self_model.__class__.__name__}"
    if Config.instance["delete_checkpoints"]:
        import shutil
        try:
            shutil.rmtree(directory)
            print(f"Folder '{directory}' deleted successfully.")
        except OSError as e:
            print(f"Error deleting folder '{directory}': {e}")

    # Check if the directory exists
    if not os.path.exists(directory):
        # Create the directory
        os.makedirs(directory)
        print(f"Directory '{directory}' created.")
    else:
        print(f"Directory '{directory}' already exists.")
    # Reload saved model and epoch number
    if os.path.exists(f'./{directory}/{model_name}.pth'):
        checkpoint = torch.load(f'./{directory}/{model_name}.pth')
        self_model.load_state_dict(checkpoint['model_state_dict'])
        optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        start_epoch = checkpoint['epoch']
        self_model.test_accuracy_list = checkpoint['test_accuracy_list']
        print("Successfully reloaded model checkpoint!")
    else:
        print("Model checkpoint not found. Starting from the beginning...")
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    model = self_model  # CPU fallback: use the bare model when no GPU is available
    if torch.cuda.is_available():
        num_of_gpus = torch.cuda.device_count()
        gpu_list = list(range(num_of_gpus))
        model = nn.DataParallel(self_model, device_ids=gpu_list).to(self_model.device)
    # Reload the best-accuracy checkpoint, if one exists
    if os.path.exists(f'./{directory}/best_accuracy_{model_name}.pth'):
        best_model_state_dict = torch.load(f'./{directory}/best_accuracy_{model_name}.pth')
        best_val_accuracy = best_model_state_dict['test_accuracy_list'][-1]  # Track the best validation accuracy
        print("Successfully reloaded best model checkpoint!")
    else:
        print("Best model checkpoint not found.")
        best_val_accuracy = 0.0  # Track the best validation accuracy
        best_model_state_dict = None  # Track the state_dict of the best model
    model.to(self_model.device)
    return model, best_val_accuracy, best_model_state_dict, start_epoch

In [None]:
def one_hot_encode(x, num_classes):
    vec = [0.0] * num_classes
    vec[x] = 1.0
    return vec
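
The training loop later in this notebook falls back to `one_hot_encode` when labels do not already arrive as per-class vectors, so that hard labels and the victim's confidence vectors can share one loss. The sketch below illustrates that normalization in plain Python; `to_soft_label` is a hypothetical helper and is not used by the notebook itself.

```python
from typing import List, Sequence, Union


def to_soft_label(label: Union[int, Sequence[float]], num_classes: int) -> List[float]:
    """Return a per-class vector: one-hot for an integer label, unchanged for a vector."""
    if isinstance(label, int):
        if not 0 <= label < num_classes:
            raise ValueError("class index out of range")
        vec = [0.0] * num_classes
        vec[label] = 1.0
        return vec
    if len(label) != num_classes:
        raise ValueError("soft label has the wrong number of classes")
    return [float(v) for v in label]


if __name__ == "__main__":
    print(to_soft_label(2, 4))            # hard label becomes one-hot
    print(to_soft_label([0.1, 0.9], 2))   # soft label is passed through
```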

The victim model architecture

In [None]:
class Alexnet(nn.Module):
    def __init__(self, name="surrogate_model", n_outputs=10):
        super(Alexnet, self).__init__()

        self.name = name
        self.num_classes = n_outputs

        self.conv1 = nn.Conv2d(3, 48, 5, stride=1, padding=2)
        self.conv1.bias.data.normal_(0, 0.01)
        self.conv1.bias.data.fill_(0)

        self.relu = nn.ReLU()
        self.lrn = nn.LocalResponseNorm(2)
        self.pad = nn.MaxPool2d(3, stride=2)

        self.batch_norm1 = nn.BatchNorm2d(48, eps=0.001)

        self.conv2 = nn.Conv2d(48, 128, 5, stride=1, padding=2)
        self.conv2.bias.data.normal_(0, 0.01)
        self.conv2.bias.data.fill_(1.0)

        self.batch_norm2 = nn.BatchNorm2d(128, eps=0.001)

        self.conv3 = nn.Conv2d(128, 192, 3, stride=1, padding=1)
        self.conv3.bias.data.normal_(0, 0.01)
        self.conv3.bias.data.fill_(0)

        self.batch_norm3 = nn.BatchNorm2d(192, eps=0.001)

        self.conv4 = nn.Conv2d(192, 192, 3, stride=1, padding=1)
        self.conv4.bias.data.normal_(0, 0.01)
        self.conv4.bias.data.fill_(1.0)

        self.batch_norm4 = nn.BatchNorm2d(192, eps=0.001)

        self.conv5 = nn.Conv2d(192, 128, 3, stride=1, padding=1)
        self.conv5.bias.data.normal_(0, 0.01)
        self.conv5.bias.data.fill_(1.0)

        self.batch_norm5 = nn.BatchNorm2d(128, eps=0.001)

        self.fc1 = nn.Linear(1152, 512)
        self.fc1.bias.data.normal_(0, 0.01)
        self.fc1.bias.data.fill_(0)

        self.drop = nn.Dropout(p=0.5)

        self.batch_norm6 = nn.BatchNorm1d(512, eps=0.001)

        self.fc2 = nn.Linear(512, 256)
        self.fc2.bias.data.normal_(0, 0.01)
        self.fc2.bias.data.fill_(0)

        self.batch_norm7 = nn.BatchNorm1d(256, eps=0.001)

        self.fc3 = nn.Linear(256, 10)
        self.fc3.bias.data.normal_(0, 0.01)
        self.fc3.bias.data.fill_(0)

        self.soft = nn.Softmax()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.test_accuracy_list = []
        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (0.5,))])
        testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
        self.testloader = DataLoader(testset, batch_size=512, shuffle=False)

    def forward(self, x):
        if isinstance(x, np.ndarray):
            # Convert NumPy array to PyTorch tensor
            x = torch.tensor(x)
        x = x.to(self.device)
        layer1 = self.batch_norm1(self.pad(self.lrn(self.relu(self.conv1(x)))))
        layer2 = self.batch_norm2(self.pad(self.lrn(self.relu(self.conv2(layer1)))))
        layer3 = self.batch_norm3(self.relu(self.conv3(layer2)))
        layer4 = self.batch_norm4(self.relu(self.conv4(layer3)))
        layer5 = self.batch_norm5(self.pad(self.relu(self.conv5(layer4))))
        flatten = layer5.view(-1, 128 * 3 * 3)
        fully1 = self.relu(self.fc1(flatten))
        fully1 = self.batch_norm6(self.drop(fully1))
        fully2 = self.relu(self.fc2(fully1))
        fully2 = self.batch_norm7(self.drop(fully2))
        logits = self.fc3(fully2)
        return logits

    def test_model(self):
        # Test the model
        correct = 0
        total = 0
        model = None
        if torch.cuda.is_available():
            num_of_gpus = torch.cuda.device_count()
            gpu_list = list(range(num_of_gpus))
            model = nn.DataParallel(self, device_ids=gpu_list).to(self.device)
        # Don't need to keep track of gradients
        with torch.no_grad():
            for images, labels in self.testloader:
                images = images.view(images.shape[0], 3, 32, 32).detach().clone()
                if torch.cuda.is_available():
                    outputs = model(images)
                else:
                    outputs = self(images)
                images, labels, outputs = images.to(self.device), labels.to(self.device), outputs.to(self.device)
                _, predicted = torch.max(outputs, dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f"The accuracy of the victim model f is: {100 * correct / total}")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return correct / total


The surrogate model architecture

In [None]:
class AlexnetSurrogate(nn.Module):
    def __init__(self, num_classes=10):
        super(AlexnetSurrogate, self).__init__()

        self.conv1 = nn.Conv2d(3, 48, kernel_size=5, stride=1, padding=2)
        self.conv1.bias.data.normal_(0, 0.01)
        self.conv1.bias.data.fill_(0)
        self.relu = nn.ReLU()
        self.lrn = nn.LocalResponseNorm(2)
        self.pad = nn.MaxPool2d(3, stride=2)
        self.batch_norm1 = nn.BatchNorm2d(48, eps=0.001)

        self.layer1 = self.make_residue_block(48, 128, stride=1)
        self.layer2 = self.make_residue_block(128, 192, stride=2)
        self.layer3 = self.make_residue_block(192, 192, stride=1)
        self.layer4 = self.make_residue_block(192, 128, stride=2)

        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc1 = nn.Linear(128, 512)
        self.fc1.bias.data.normal_(0, 0.01)
        self.fc1.bias.data.fill_(0)
        self.drop = nn.Dropout(p=0.5)
        self.batch_norm2 = nn.BatchNorm1d(512, eps=0.001)

        self.fc2 = nn.Linear(512, 256)
        self.fc2.bias.data.normal_(0, 0.01)
        self.fc2.bias.data.fill_(0)
        self.batch_norm3 = nn.BatchNorm1d(256, eps=0.001)

        self.fc3 = nn.Linear(256, num_classes)
        self.fc3.bias.data.normal_(0, 0.01)
        self.fc3.bias.data.fill_(0)

        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.test_accuracy_list = []

        transform = transforms.Compose([transforms.ToTensor(),
                                        transforms.Normalize((0.5,), (0.5,))])
        testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)
        self.testloader = DataLoader(testset, batch_size=1024, shuffle=False)

    def make_residue_block(self, in_channels, out_channels, stride):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.lrn(x)
        x = self.pad(x)
        x = self.batch_norm1(x)

        # Residue blocks
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avg_pool(x)
        x = x.view(x.size(0), -1)

        x = self.fc1(x)
        x = self.drop(x)
        x = self.batch_norm2(x)

        x = self.fc2(x)
        x = self.batch_norm3(x)

        x = self.fc3(x)

        return x

    def train_model(self, train_loader, criterion, optimizer, n_epochs=10, print_every=500,
                    model_name="Alexnet_surrogate_model"):
        model, best_val_accuracy, best_model_state_dict, start_epoch = prepare_for_training(self, model_name, optimizer)
        for epoch in range(start_epoch, n_epochs):
            running_loss = 0.0
            correct = 0
            total = 0
            start_time = time.time()
            start_time_epoch = time.time()
            for i, (inputs, labels) in enumerate(train_loader, 0):
                inputs = inputs.view(inputs.shape[0], 3, 32, 32).detach().clone()
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                if torch.cuda.is_available():
                    outputs = model(inputs)
                else:
                    outputs = self(inputs)
                try:
                    labels = labels.view(-1, outputs.size()[1]).float().detach().clone().requires_grad_(True)
                except:
                    labels = torch.tensor(list(
                        map(lambda x: one_hot_encode(x, outputs.size()[1]), labels))).detach().clone().requires_grad_(
                        True)
                outputs, labels = outputs.to(self.device), labels.to(self.device)
                loss = criterion(outputs, labels)  # + sum_fitnesses

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                running_loss += loss.item()

                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                _, labels = torch.max(labels.data, 1)
                correct += (predicted == labels).sum().item()

                if i % print_every == print_every - 1:
                    finish_time = time.time()
                    total_time = finish_time - start_time
                    print('[%d, %5d] loss: %.3f accuracy: %.3f the time it took: %.3f seconds' % (
                        epoch + 1, i + 1, running_loss / print_every, 100 * correct / total, total_time))
                    running_loss = 0.0
                    correct = 0
                    total = 0
                    start_time = time.time()
            finish_time_epoch = time.time()
            total_time_epoch = finish_time_epoch - start_time_epoch
            print(f'Epoch {epoch + 1} took {total_time_epoch} seconds')
            validation_accuracy = self.validate_model()
            self.test_accuracy_list.append(validation_accuracy)
            # Save model after each epoch
            checkpoint = {
                'epoch': epoch + 1,
                'model_state_dict': self.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'test_accuracy_list': self.test_accuracy_list
            }
            directory = f"checkpoints/{self.__class__.__name__}"
            torch.save(checkpoint, f'./{directory}/{model_name}.pth')
            if validation_accuracy > best_val_accuracy:
                best_val_accuracy = validation_accuracy
                best_model_state_dict = {
                    'epoch': epoch + 1,
                    'model_state_dict': self.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    'test_accuracy_list': self.test_accuracy_list
                }
                torch.save(best_model_state_dict, f'./{directory}/best_accuracy_{model_name}.pth')
            print("Saved model checkpoint!")
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        print(
            f"The maximal accuracy during training was: {max(self.test_accuracy_list)} on epoch: {self.test_accuracy_list.index(max(self.test_accuracy_list))}")
        # self.plot_accuracy_graph()

    def plot_accuracy_graph(self):
        accuracy_list = self.test_accuracy_list
        # Plotting using Seaborn
        sns.set(style="darkgrid")
        sns.lineplot(x=range(len(accuracy_list)), y=accuracy_list, marker='X')

        # Set labels and title
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy")
        plt.title("Accuracy Over Epochs")

        # Display the plot
        plt.show()

    def validate_model(self):
        # Test the model
        correct = 0
        total = 0
        model = None
        if torch.cuda.is_available():
            num_of_gpus = torch.cuda.device_count()
            gpu_list = list(range(num_of_gpus))
            model = nn.DataParallel(self, device_ids=gpu_list).to(self.device)
        # Don't need to keep track of gradients
        with torch.no_grad():
            for images, labels in self.testloader:
                images = images.view(images.shape[0], 3, 32, 32).detach().clone()
                if torch.cuda.is_available():
                    outputs = model(images)
                else:
                    outputs = self(images)
                images, labels, outputs = images.to(self.device), labels.to(self.device), outputs.to(self.device)
                _, predicted = torch.max(outputs, dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f"Accuracy on test set is: {100 * correct / total}")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return correct / total

    def test_model(self):
        # Test the model
        correct = 0
        total = 0
        model = None
        if torch.cuda.is_available():
            num_of_gpus = torch.cuda.device_count()
            gpu_list = list(range(num_of_gpus))
            model = nn.DataParallel(self, device_ids=gpu_list).to(self.device)
        # Don't need to keep track of gradients
        with torch.no_grad():
            for images, labels in self.testloader:
                images = images.view(images.shape[0], 3, 32, 32).detach().clone()
                if torch.cuda.is_available():
                    outputs = model(images)
                else:
                    outputs = self(images)
                images, labels, outputs = images.to(self.device), labels.to(self.device), outputs.to(self.device)
                _, predicted = torch.max(outputs, dim=1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        print(f"The accuracy of the substitute (stolen) model f' is: {100 * correct / total}")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return correct / total


Custom DataLoaders for Retrieving Saved Data from Disk.
On Colab we load the data lazily from disk because of RAM limitations (12.7 GB). This is done by the `DatasetLoader` class. If you run this code on hardware with more memory, you should:

1. use the `SmallDatasetLoader`, which keeps the whole dataset in memory, to improve runtime;
2. set `small_dataset=True` when you call `main_algorithm`.

In [None]:
class SmallDatasetLoader(Dataset):
    def __init__(self, data_dir, file_size=20000):
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        # Load every saved file into memory as lists of per-sample tensors
        self.data_dir = data_dir
        file_list = os.listdir(self.data_dir)
        data_list = sorted([x for x in file_list if "_input" in x])
        labels_list = sorted([x for x in file_list if "_labels" in x])
        self.tensor_list = []
        self.label_list = []
        for data_file_name, labels_file_name in zip(data_list, labels_list):
            data_file_path = os.path.join(self.data_dir, data_file_name)
            labels_file_path = os.path.join(self.data_dir, labels_file_name)
            data = np.load(data_file_path, mmap_mode='r')
            labels = np.load(labels_file_path, mmap_mode='r')
            data = torch.split(torch.tensor(data), split_size_or_sections=1, dim=0)
            labels = list(torch.split(torch.tensor(labels), split_size_or_sections=1, dim=0))
            shape = data[0].shape
            shape_labels = labels[0].shape
            data = [x.view(-1, shape[2], shape[3], shape[4]) for x in data]
            labels = [x.view(-1, shape_labels[2]) for x in labels]
            self.tensor_list.extend(data)
            self.label_list.extend(labels)

    def __len__(self):
        return len(self.tensor_list)

    def add_data(self, cur_tensor_list, cur_label_list):
        self.tensor_list.extend(cur_tensor_list)
        self.label_list.extend(cur_label_list)

    def __getitem__(self, idx):
        data, label = self.tensor_list[idx], self.label_list[idx]

        return data, label


In [None]:
class DatasetLoader(Dataset):
    def __init__(self, data_dir, file_size=20000, transform=None, subset_generations=None):
        self.data_dir = data_dir
        self.transform = transform
        file_list = os.listdir(data_dir)
        self.data_list = sorted([x for x in file_list if "_input" in x])
        self.labels_list = sorted([x for x in file_list if "_labels" in x])
        if subset_generations is not None:
            away_data = sorted([x for x in self.data_list if "Away_" in x], key=lambda y: int(y.split("_")[-2]))
            away_labels = sorted([x for x in self.labels_list if "Away_" in x], key=lambda y: int(y.split("_")[-2]))
            toward_data = sorted([x for x in self.data_list if "Toward_" in x], key=lambda y: int(y.split("_")[-2]))
            toward_labels = sorted([x for x in self.labels_list if "Toward_" in x], key=lambda y: int(y.split("_")[-2]))
            self.data_list = away_data[:subset_generations] + toward_data[:subset_generations]
            self.labels_list = away_labels[:subset_generations] + toward_labels[:subset_generations]
        self.file_size = file_size

    def __len__(self):
        # Return the total number of tensors in all files
        return len(self.data_list) * self.file_size

    def __getitem__(self, idx):
        file_idx = idx // self.file_size  # Calculate which file to load
        sample_idx = idx % self.file_size  # Calculate the index within the file

        data_file_name = self.data_list[file_idx]
        labels_file_name = self.labels_list[file_idx]
        data_file_path = os.path.join(self.data_dir, data_file_name)
        labels_file_path = os.path.join(self.data_dir, labels_file_name)

        # Load the data tensor from the file
        data = np.load(data_file_path, mmap_mode='r')  # Use mmap_mode to avoid loading the entire file into memory
        labels = np.load(labels_file_path, mmap_mode='r')  # Use mmap_mode to avoid loading the entire file into memory

        # Extract the individual tensor at the specified index
        data = data[sample_idx]
        labels = labels[sample_idx]

        # You can apply transformations here if needed
        if self.transform:
            # Convert the (1, C, H, W) array to an (H, W, C) array for the transform
            data_size = data.shape
            pil_image = torch.tensor(data.squeeze(0)).permute(1, 2, 0).numpy()
            pil_image = self.transform(pil_image)
            # Restore the original shape (the result of `view` must be assigned)
            data = pil_image.view(data_size)

        return data, labels

    def set_transform(self, transform):
        self.transform = transform
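
`DatasetLoader` maps a flat sample index to a (file, row) pair and, when `subset_generations` is set, orders files by the generation number embedded in the file name. The sketch below reproduces both steps in plain Python. The example file names follow the `..._batch_<generation>_input.npy` pattern suggested by `predict_and_create_proxy_dataset` and are assumptions, as are the helper names.

```python
from typing import Tuple


def locate_sample(idx: int, file_size: int) -> Tuple[int, int]:
    """Map a flat dataset index to (file index, row inside that file)."""
    return divmod(idx, file_size)


def generation_of(file_name: str) -> int:
    """Extract the generation number, i.e. the second-to-last '_'-separated field."""
    return int(file_name.split("_")[-2])


# Hypothetical file names for generations 10, 2 and 0.
names = [f"Toward_proxy_dataset_confidence_30_batch_{g}_input.npy" for g in (10, 2, 0)]
lexicographic = sorted(names)                # plain string order
numeric = sorted(names, key=generation_of)   # order by generation number

if __name__ == "__main__":
    print(locate_sample(45_000, 20_000))
    print([generation_of(n) for n in lexicographic])
    print([generation_of(n) for n in numeric])
```

Sorting by the numeric key matters when only the first few generations are wanted: plain string order would place generation 10 before generation 2.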


# 3. BAM: The attack algorithm

The main code for the ES algorithm

In [None]:
class BasicModelGeneticAlgorithm:
    def __init__(self, model, random_data_generator_function, num_of_classes, model_name):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        if torch.cuda.is_available():
            num_of_gpus = torch.cuda.device_count()
            gpu_list = list(range(num_of_gpus))
            self.model = nn.DataParallel(self.model, device_ids=gpu_list)
        self.query_counter = 0
        self.avg_fitness = []
        self.random_data_generator_function = random_data_generator_function
        self.num_of_classes = num_of_classes
        self.model_name = model_name

    # Define the fitness function
    def fitness(self, population, epsilon):
        """
        Evaluate the fitness of each individual from the target model's output.
        Lower is fitter: we want the output confidence to be as close to a
        uniform distribution as possible, so the fitness is the maximum
        confidence minus `epsilon`, clamped at zero.
        :rtype: list[float] - the fitness of every individual in `population`
        """
        predictions_tensor = torch.cat([ind[1] - epsilon for ind in population])
        max_probs, _ = torch.max(predictions_tensor, dim=1)
        new_max_probs = torch.max(max_probs, torch.tensor(0))
        list_of_fitnesses = [x.item() for x in torch.split(new_max_probs, split_size_or_sections=1, dim=0)]
        return list_of_fitnesses

    # Generate a random population of individuals
    def generate_population(self, size, generations):
        random_data = self.random_data_generator_function(size)
        population = self.predict_and_create_proxy_dataset(random_data, 0, generations)
        return population

    # Select individuals for the next generation
    def select(self, population, fitnesses, k):
        fitnesses_weights = [(i, f) for i, f in enumerate(fitnesses)]
        # sort by fitness and select the k individuals with the lowest fitness
        sorted_fitnesses = sorted(fitnesses_weights, key=lambda x: x[1])
        top_k_fitness = sorted_fitnesses[:k]

        top_k_population = [population[x[0]] for x in top_k_fitness]
        top_k_population_with_fitness = [(population[x[0]], x[1]) for x in top_k_fitness]
        return top_k_population, top_k_population_with_fitness

    # Run the genetic algorithm
    def run_genetic_algorithm(self, generations, k, epsilon, population_size, search_spread, non_2d):
        population = self.generate_population(population_size, generations)
        for generation in range(generations):
            gc.collect()
            start_time = time.time()
            fitnesses = self.fitness(population, epsilon)
            fitness_avg = mean(fitnesses)

            self.avg_fitness.append(fitness_avg)
            top_k_population, top_k_population_with_fitness = self.select(population, fitnesses, k)
            gc.collect()
            new_population = self.create_new_generation_with_noise(top_k_population, population,
                                                                   population_size, search_spread)
            gc.collect()
            population = self.predict_and_create_proxy_dataset(new_population, generation, generations)
            gc.collect()
            finish_time = time.time()
            total_time = finish_time - start_time
            print(
                f"Generation number: {generation}, The average fitness: {fitness_avg:.5f}, The time it took was: {total_time:.3f} seconds")
            if not non_2d:
                x_data = torch.cat([x[0] for x in population], dim=0).to(self.device)
                y_data = torch.cat([x[1].view(-1, 2) for x in population], dim=0).to(self.device)
                if isinstance(self.model, nn.DataParallel):
                    self.model.module.plot_decision_boundary(x_data, y_data)
                else:
                    self.model.plot_decision_boundary(x_data, y_data)
        return population

    def predict_and_create_proxy_dataset(self, population, cur_generation, generations):
        # Define the desired batch size
        max_batch_size = Config.instance["genetic_alg_prediction_max_batch_size"]
        batch_size = max_batch_size if len(population) > max_batch_size else len(population)
        num_samples = len(population)

        model_prediction_list = []

        for i in range(0, num_samples, batch_size):
            batch = torch.cat(population[i:i + batch_size], dim=0).to(self.device)
            batch_prediction = self.model(batch).detach()
            model_prediction_list.append(batch_prediction)

        model_prediction = torch.cat(model_prediction_list, dim=0)
        self.query_counter += model_prediction.shape[0]

        list_of_confidence = list(torch.split(model_prediction, split_size_or_sections=1, dim=0))
        cur_dataset = [(torch.tensor(population[i]), list_of_confidence[i]) for i in range(len(list_of_confidence))]
        sub_folder = "Toward"
        data_directory = Config.instance["data_directory"]
        directory = f"{data_directory}/{self.model_name}/{generations}_generations"
        # Check if the directory exists
        if not os.path.exists(directory):
            # Create the directory
            os.makedirs(directory)
            print(f"Directory '{directory}' created.")
        file_path_confidence = f"{directory}/{sub_folder}_proxy_dataset_confidence_{generations}_batch_{cur_generation}"
        save_tensor_list_to_file(cur_dataset, file_path_confidence)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        return cur_dataset

    def create_new_generation_with_noise(self, top_k_population, population, population_size, search_spread):
        new_population = [x[0] for x in top_k_population]
        cur_population_size = len(top_k_population)
        max_values, _ = torch.max(torch.stack([x[0][0] for x in population]), dim=0)
        min_values, _ = torch.min(torch.stack([x[0][0] for x in population]), dim=0)
        ss_vector = ((max_values - min_values) / search_spread).detach()
        population_tensor = torch.cat(new_population)
        population_original_shape = tuple(population_tensor.shape)
        factor = math.ceil((float(population_size) / float(cur_population_size)))
        new_population_list = []
        noise_size = 2
        for i in range(factor - 1):
            random_number = random.choice([1, -1])
            new_population_list.append(random_number * (noise_size * ss_vector * torch.rand(
                population_original_shape) - 0.5 * noise_size * ss_vector) + population_tensor)
        splitted_population_list = [list(torch.split(x, 1, dim=0)) for x in new_population_list]
        flattened_list = [item for sublist in splitted_population_list for item in sublist]
        splitted_population = flattened_list + new_population
        new_population = random.sample(splitted_population, population_size)
        return new_population

    def update_query_counter(self, amount):
        self.query_counter += amount
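
The resampling step in `create_new_generation_with_noise` can be illustrated without tensors. The sketch below is a simplified pure-Python analogue, not the notebook's implementation: `resample_with_noise` and its list-based inputs are hypothetical names introduced for illustration. It assumes each individual is a flat list of floats. As in the method above, the per-feature step is the population's value range divided by `search_spread`, every noisy copy of an elite sample is shifted by uniform noise within one step, and the result is subsampled to `population_size`.

```python
import math
import random


def resample_with_noise(elite, population, population_size, search_spread, rng=random):
    """Pure-Python analogue of the noise-based resampling step (lists instead of tensors)."""
    n_features = len(population[0])
    # Per-feature step size: the population's value range divided by search_spread.
    spread = [
        (max(p[j] for p in population) - min(p[j] for p in population)) / search_spread
        for j in range(n_features)
    ]
    # How many copies of the elite (noisy ones plus the unchanged elite) reach the target size.
    factor = math.ceil(population_size / len(elite))
    candidates = [list(p) for p in elite]  # the elite itself is kept unchanged
    for _ in range(factor - 1):
        for parent in elite:
            # Uniform noise in [-spread_j, +spread_j] for every feature j.
            candidates.append([x + rng.uniform(-s, s) for x, s in zip(parent, spread)])
    # Subsample so that exactly population_size individuals are returned.
    return rng.sample(candidates, population_size), spread


rng = random.Random(0)  # seeded only to make the sketch repeatable
population = [[0.0, 0.0], [1.0, 2.0], [0.5, 1.0], [0.2, 0.4]]
elite = population[:2]
new_population, spread = resample_with_noise(elite, population, population_size=6, search_spread=10, rng=rng)
print(len(new_population), spread)  # 6 [0.1, 0.2]
```

A larger `search_spread` shrinks the step, so new samples stay closer to the selected ones.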


The code for training $f'$

In [None]:
def train_surrogate_model_generic(dataloader, num_epochs, model_class, criterion, optimizer_name="AdamW"):
    """
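    Train a fresh surrogate model on the extracted dataset.

    :param dataloader: DataLoader over the extracted dataset.
    :param num_epochs: number of training epochs.
    :param model_class: surrogate model class; it must implement `train_model` and `test_model`.
    :param criterion: loss function used for training.
    :param optimizer_name: "Adam", "SGD", or "RMSprop"; any other value falls back to AdamW.
    :return: the trained surrogate model, intended to mimic the target model.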
    """
    surrogate_model = model_class()
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    surrogate_model.to(device)
    if optimizer_name == "Adam":
        optimizer = optim.Adam(surrogate_model.parameters(), lr=1e-2)
    elif optimizer_name == "SGD":
        optimizer = optim.SGD(surrogate_model.parameters(), lr=0.1, momentum=0.9, weight_decay=5e-4)
    elif optimizer_name == "RMSprop":
        optimizer = optim.RMSprop(surrogate_model.parameters(), lr=1e-2, alpha=0.9, momentum=0.5)
    else:
        optimizer = optim.AdamW(surrogate_model.parameters(), lr=5e-3)

    # Train the model
    start_time = time.time()
    surrogate_model.train_model(dataloader, criterion, optimizer, n_epochs=num_epochs)
    finish_time = time.time()
    total_time = finish_time - start_time
    print(f"The time it took to train the model was: {total_time:.3f} seconds")

    return surrogate_model

The main BAM code: runs the ES algorithm and then trains $f'$ on the extracted data

In [None]:
def main_algorithm(model, model_class, criterion, random_data_generator_function, num_of_classes,
                    k=300, epsilon=0.05, population_size=1000, generations=20, search_spread=10, epochs=50,
                    optimizer_name="AdamW", non_2d=True, small_dataset=False, save_path=None):
    data_directory = Config.instance["data_directory"]
    destination_folder = Config.instance["destination_folder"].format(data_directory=data_directory,
                                                                      model_class=f"{model_class.__name__}",
                                                                      generations=f"{generations}")
    file_path_confidence_batch = Config.instance["file_path_confidence_batch"].format(
        destination_folder=destination_folder, generations=f"{generations}")
    if Config.instance["dont_get_from_dist"]:
        import shutil
        try:
            shutil.rmtree(destination_folder)
            print(f"Folder '{destination_folder}' deleted successfully.")
        except OSError as e:
            print(f"Error deleting folder '{destination_folder}': {e}")
    # Use the evolutionary algorithm to extract the data on which the surrogate (copy) model is trained
    ga = BasicModelGeneticAlgorithm(model, random_data_generator_function, num_of_classes, model_class.__name__)
    if not os.path.exists(file_path_confidence_batch) or Config.instance["dont_get_from_dist"]:
        best_individuals1 = ga.run_genetic_algorithm(generations, k, epsilon, population_size, search_spread, non_2d)
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        print(f"The number of queries was: {ga.query_counter}")

    batch_size = Config.instance["batch_size"]
    if small_dataset:
        dataset = SmallDatasetLoader(destination_folder, file_size=population_size)
    else:
        dataset = DatasetLoader(destination_folder, file_size=population_size)

    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, pin_memory=True, num_workers=16)

    print("Now we are training the model with the data extracted by using evolutionary algorithms:")
    # The evaluation and checkpoint code below needs the surrogate regardless of non_2d
    Ea_no_knn_surrogate_model = train_surrogate_model_generic(dataloader, epochs, model_class, criterion,
                                                              optimizer_name=optimizer_name)
    print("\nThe final results are:")
    model_acc = Ea_no_knn_surrogate_model.test_model()
    attack_success_rate = test_trans(Ea_no_knn_surrogate_model, model, criterion, num_of_classes, model.testloader)
    print(f"The Attack Success Rate for PGD samples generated on f' and used on f (transfer) is: {attack_success_rate}")
    # Save the trained surrogate's weights

    # Unwrap DataParallel so the saved state dict comes from the underlying model
    if isinstance(Ea_no_knn_surrogate_model, nn.DataParallel):
        state_dict = Ea_no_knn_surrogate_model.module.state_dict()
    else:
        state_dict = Ea_no_knn_surrogate_model.state_dict()
    checkpoint = {'model_state_dict': state_dict}
    if save_path is None:
        model_name = "phase1"
        directory = f"checkpoints/phases_tests/{Ea_no_knn_surrogate_model.__class__.__name__}"
    else:
        model_name = save_path
        directory = f"checkpoints/model_test/{Ea_no_knn_surrogate_model.__class__.__name__}"

    # Check if the directory exists
    if not os.path.exists(directory):
        # Create the directory
        os.makedirs(directory)
        print(f"Directory '{directory}' created.")
    torch.save(checkpoint, f'./{directory}/{model_name}.pth')
    # del proxy_dataset
    del Ea_no_knn_surrogate_model
    del dataset
    # release all GPU memory
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    return model_acc, attack_success_rate
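
To find the saved checkpoint afterwards, the path logic at the end of `main_algorithm` can be reproduced on its own. The sketch below mirrors that logic under one assumption: the surrogate is not wrapped in `nn.DataParallel`, so its class name is the surrogate class itself. `surrogate_checkpoint_path` is a hypothetical helper and `"run_a"` is an illustrative value for `save_path`.

```python
from pathlib import Path


def surrogate_checkpoint_path(surrogate_class_name, save_path=None):
    """Mirror the directory and file-name choice made at the end of main_algorithm."""
    if save_path is None:
        model_name, sub_dir = "phase1", "phases_tests"
    else:
        model_name, sub_dir = save_path, "model_test"
    return Path("checkpoints") / sub_dir / surrogate_class_name / f"{model_name}.pth"


print(surrogate_checkpoint_path("AlexnetSurrogate").as_posix())
# checkpoints/phases_tests/AlexnetSurrogate/phase1.pth
print(surrogate_checkpoint_path("AlexnetSurrogate", save_path="run_a").as_posix())
# checkpoints/model_test/AlexnetSurrogate/run_a.pth
```

The file holds a dictionary with a single `model_state_dict` entry, so restoring the weights amounts to passing `torch.load(path)["model_state_dict"]` to the surrogate's `load_state_dict`.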

# 4. Evaluate BAM

Configuration settings - these include the essential hyperparameters and setup details required for the subsequent sections of the code.

In [None]:
class Config:
    instance = None
    log = None

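    def __new__(cls, log_file="model_log.log"):
        # Config() is called for its side effects only: it fills the class-level
        # `instance` (a plain dict) and `log` once, and returns None.
        if cls.instance is None:
            ins = cls.create_instance()
            cls.instance = ins
        if cls.log is None:
            cls.log = cls.configure_logging(log_file=log_file)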

    @staticmethod
    def create_instance():
        # Default config for Jupyter notebook
        config = {
            'k': 6000,
            'epsilon': 0.0005,
            'population_size': 20000,
            'generations': 30,  # Use 40 if you have more resources than the free Colab tier
            'search_spread': 10,
            'epochs': 20,  # Use 40 if you have more resources than the free Colab tier
            'dont_get_from_dist': True,
            'num_of_classes': 10,
            'learning_rate': 0.3,
            'optimizer_name': 'AdamW',
            'batch_size': 64,
            'delete_checkpoints': True,
            'genetic_alg_prediction_max_batch_size': 500,
            'data_directory': 'SaveDataset/Batches/colab',
            'destination_folder': '{data_directory}/{model_class}/{generations}_generations',
            'file_path_confidence_batch': '{destination_folder}/Away_proxy_dataset_confidence_{generations}_batch_0_input.npy',
        }
        return config


    @staticmethod
    def configure_logging(log_file="model_log.log", log_level="INFO"):
        logger = logging.getLogger("my_logger")
        logger.setLevel(log_level)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

        return logger

prepare_config()
config = Config.instance
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
num_of_classes = config["num_of_classes"]
name = 'teacher_alexnet_for_cifar10'
ckpt_path = 'teacher_alexnet_for_cifar10_state_dict'
alex = Alexnet(name, num_of_classes)
alex.load_state_dict(torch.load(ckpt_path, map_location=device))
alex.to(device)
k = config["k"]
epsilon = config["epsilon"]
population_size = config["population_size"]
generations = config["generations"]
search_spread = config["search_spread"]
epochs = config["epochs"]
criterion = nn.MSELoss().to(device)
optimizer_name = config["optimizer_name"]

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:03<00:00, 45827233.92it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data
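
The two path templates in the configuration are expanded with `str.format` inside `main_algorithm`. As a quick illustration of where the extracted batches are expected to be stored for the defaults above, here is a small sketch. `config_sketch` and `resolve_paths` are hypothetical names chosen so that the notebook's own `config` is not shadowed, and the sketch assumes the `generations` value passed to `main_algorithm` equals the configured one.

```python
# Template strings copied from Config.create_instance.
config_sketch = {
    "generations": 30,
    "data_directory": "SaveDataset/Batches/colab",
    "destination_folder": "{data_directory}/{model_class}/{generations}_generations",
    "file_path_confidence_batch":
        "{destination_folder}/Away_proxy_dataset_confidence_{generations}_batch_0_input.npy",
}


def resolve_paths(config, model_class_name):
    """Expand the two path templates the same way main_algorithm does."""
    destination_folder = config["destination_folder"].format(
        data_directory=config["data_directory"],
        model_class=model_class_name,
        generations=config["generations"],
    )
    first_batch_file = config["file_path_confidence_batch"].format(
        destination_folder=destination_folder,
        generations=config["generations"],
    )
    return destination_folder, first_batch_file


folder, first_batch = resolve_paths(config_sketch, "AlexnetSurrogate")
print(folder)
print(first_batch)
```

With `dont_get_from_dist` set to `True`, `main_algorithm` deletes this folder and regenerates its contents on every run.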


Execute BAM

In [None]:
result = main_algorithm(alex, AlexnetSurrogate, criterion, generate_random_data_cifar_old, num_of_classes,
                        k, epsilon, population_size, generations, search_spread, epochs,
                        optimizer_name=optimizer_name, small_dataset=False)

test_acc = alex.test_model()

Error deleting folder 'SaveDataset/Batches/colab/AlexnetSurrogate/40_generations': [Errno 2] No such file or directory: 'SaveDataset/Batches/colab/AlexnetSurrogate/40_generations'
Directory 'SaveDataset/Batches/colab/AlexnetSurrogate/40_generations' created.
Generation number: 0, The average fitness: 10.75227, The time it took was: 9.104seconds
Generation number: 1, The average fitness: 9.82730, The time it took was: 6.585seconds
Generation number: 2, The average fitness: 9.70070, The time it took was: 6.895seconds
Generation number: 3, The average fitness: 9.70349, The time it took was: 6.516seconds
Generation number: 4, The average fitness: 9.77393, The time it took was: 7.228seconds
Generation number: 5, The average fitness: 9.78873, The time it took was: 6.649seconds
Generation number: 6, The average fitness: 9.85808, The time it took was: 6.968seconds
Generation number: 7, The average fitness: 9.89243, The time it took was: 6.985seconds
Generation number: 8, The average fitness: 9