In [1]:
!pip install adversarial-robustness-toolbox

Collecting adversarial-robustness-toolbox
  Downloading adversarial_robustness_toolbox-1.17.1-py3-none-any.whl.metadata (11 kB)
Downloading adversarial_robustness_toolbox-1.17.1-py3-none-any.whl (1.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.7/1.7 MB[0m [31m32.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: adversarial-robustness-toolbox
Successfully installed adversarial-robustness-toolbox-1.17.1


In [2]:
import numpy as np
import cv2
import os
import pickle
import torch.nn as nn
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
import time
from math import log10, sqrt
from torch.utils.data import DataLoader
from art.utils import load_cifar10
from art.estimators.classification import PyTorchClassifier
import torch.optim as optim
import random
from art.preprocessing.standardisation_mean_std import StandardisationMeanStdPyTorch
from art.attacks.evasion.hop_skip_jump import HopSkipJump
from torch.quantization import MovingAverageMinMaxObserver
from torch.ao.quantization.observer import MinMaxObserver
from torch.quantization import QuantStub, DeQuantStub
import torchvision.transforms as transforms
import torchvision.transforms.functional as F
torch.manual_seed(0)
torch.cuda.manual_seed(0)
np.random.seed(0)
random.seed(0)

In [3]:
def test(model: nn.Module, dataloader: DataLoader, cuda=False) -> float:
    correct = 0
    total = 0
    model.eval()
    with torch.no_grad():
        for data in dataloader:
            inputs, labels = data
            if cuda:
              inputs = inputs.cuda()
              labels = labels.cuda()
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total

def evaluator(model, loader):
  model.eval()
  top_1 = 0
  top_5 = 0
  with torch.no_grad():
    for data in loader:
      inputs, labels = data
      outputs = model(inputs)

      _, predicted = torch.max(outputs, 1, keepdim=True)
      top_1 += torch.sum(predicted.view(-1) == labels).item()

      _, predicted_5 = torch.topk(outputs, k=5, dim=1)
      top_5 += torch.sum(predicted_5 == labels.unsqueeze(1)).item()

  return ((top_1/400) * 100, (top_5/400) * 100)



transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])])
testset = torchvision.datasets.CIFAR10(root='./data', train=False,download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=64,shuffle=False, num_workers=2, pin_memory=True)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./data/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:08<00:00, 20578684.88it/s]


Extracting ./data/cifar-10-python.tar.gz to ./data


In [4]:

(x_train, y_train), (x_test, y_test), min_pixel_value, max_pixel_value = load_cifar10()

x_train = np.transpose(x_train, (0, 3, 1, 2)).astype(np.float32)
x_test = np.transpose(x_test, (0, 3, 1, 2)).astype(np.float32)

classes_cifar = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']

y_test_set = np.zeros((400,),np.int8)

y_train_set = np.zeros((400,),np.int8)


for i in range(400):
        y_test_set[i] = np.where(y_test[i] == 1)[0][0]

for i in range(400):
        y_train_set[i] = np.where(y_train[i] == 1)[0][0]
        
        
        
        


In [5]:
try:
    from torch.hub import load_state_dict_from_url
except ImportError:
    from torch.utils.model_zoo import load_url as load_state_dict_from_url
from functools import partial
from typing import Dict, Type, Any, Callable, Union, List, Optional
from torch.ao.nn.quantized.modules.functional_modules import FloatFunctional
def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)

def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride
        self.ff = torch.nn.quantized.FloatFunctional()
    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        #out += identity
        out = self.ff.add(out, identity)
        out = self.relu(out)

        return out


class CifarResNet(nn.Module):

    def __init__(self, block, layers, num_classes=10):
        super(CifarResNet, self).__init__()
        self.inplanes = 16
        self.conv1 = conv3x3(3, 16)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block, 16, layers[0])
        self.layer2 = self._make_layer(block, 32, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 64, layers[2], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(64 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                nn.BatchNorm2d(planes * block.expansion),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.fc(x)

        return x

In [6]:
resnet56_model = CifarResNet(BasicBlock,[9]*3)
#try:
#    from torch.hub import load_state_dict_from_url
#except ImportError:
#    from torch.utils.model_zoo import load_url as load_state_dict_from_url
#resnet56_model.load_state_dict(load_state_dict_from_url("https://github.com/chenyaofo/pytorch-cifar-models/releases/download/resnet/cifar10_resnet56-187c023a.pt"))
resnet56_model.load_state_dict(torch.load("resnet56quant/ResNet56_1it_CIFAR10_93.9acc.pkl",map_location=torch.device("cpu")))

resnet56_model = torch.quantization.QuantWrapper(resnet56_model)
B=8
resnet56_model.qconfig = torch.quantization.QConfig(activation= MovingAverageMinMaxObserver.with_args(quant_min=0, quant_max=int(2 ** B - 1), dtype=torch.quint8,
                                                              qscheme=torch.per_tensor_affine, reduce_range=False),
                                                     weight= MovingAverageMinMaxObserver.with_args(quant_min=int(-(2 ** B) / 2), quant_max=int((2 ** B) / 2 - 1), dtype=torch.qint8,
                                                              qscheme=torch.per_tensor_symmetric, reduce_range=False))
torch.quantization.prepare(resnet56_model, inplace=True)

resnet56_model.to("cpu")
test(resnet56_model, testloader, cuda=False)
resnet56_model.to("cpu")

torch.quantization.convert(resnet56_model, inplace=True)

resnet56_model.load_state_dict(torch.load("/resnet56-quantized-models/ResNet56_Quantized_1RT_93.64.pt"))

  device=storage.device,


<All keys matched successfully>

In [7]:
# Step 2a: Define the loss function and the optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(resnet56_model.parameters(), lr=0.01, momentum=0.9, dampening=0,weight_decay=0.0005, nesterov=True)

# Step 3: Create the ART classifier
classifier = PyTorchClassifier(
    model=resnet56_model,
    clip_values=(min_pixel_value, max_pixel_value),
    loss=criterion,
    optimizer=optimizer,
    preprocessing=((0.4914, 0.4822, 0.4465), (0.247, 0.243, 0.261)),
    input_shape=(3, 32, 32),
    nb_classes=10,
    device_type = "cpu"
)
print("HopSkipJump Attack Initialization")
attack = HopSkipJump(classifier,64,targeted = False,verbose = True)
x_test_hop_res56quant = attack.generate(x_train[0:400],y_train_set[0:400])
with open("x_train_hop_res56_quant_400_psnr_check_1_retrain.pkl",'wb') as f:
    pickle.dump(x_test_hop_res56quant,f)

HopSkipJump Attack Initialization


HopSkipJump:   0%|          | 0/400 [00:00<?, ?it/s]