In [1]:

import time
import os
import copy

# PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, transforms

# Pennylane
import pennylane as qml
from pennylane import numpy as np

from advertorch.attacks import PGDAttack
from advertorch.attacks import L2PGDAttack
from advertorch.attacks import FABAttack
from advertorch.attacks import SparseL1DescentAttack
from advertorch.attacks import LinfSPSAAttack
from advertorch.attacks import GradientSignAttack
from advertorch.attacks import GradientAttack
from advertorch.utils import predict_from_logits

torch.manual_seed(42)
np.random.seed(42)

# Plotting
import matplotlib.pyplot as plt

# OpenMP: number of parallel threads.
os.environ["OMP_NUM_THREADS"] = "1"



In [2]:
n_qubits = 4                # Number of qubits
step = 0.0004               # Learning rate
batch_size = 4              # Number of samples for each training step
num_epochs = 25              # Number of training epochs
q_depth = 2                 # Depth of the quantum circuit (number of variational layers)
gamma_lr_scheduler = 0.1    # Learning rate reduction applied every 10 epochs.
q_delta = 0.01              # Initial spread of random quantum weights
start_time = time.time()    # Start of the computation timer

In [3]:
dev = qml.device("default.qubit", wires=n_qubits)

In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [5]:
data_transforms = {
    "train": transforms.Compose(
        [
            # transforms.RandomResizedCrop(224),     # uncomment for data augmentation
            # transforms.RandomHorizontalFlip(),     # uncomment for data augmentation
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # Normalize input channels using mean values and standard deviations of ImageNet.
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    ),
    "val": transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    ),
}

data_dir = "data/dataset1"
# data_dir = "data/hymenoptera_data"
image_datasets = {
    x if x == "train" else "validation": datasets.ImageFolder(
        os.path.join(data_dir, x), data_transforms[x]
    )
    for x in ["train", "val"]
}
dataset_sizes = {x: len(image_datasets[x]) for x in ["train", "validation"]}
class_names = image_datasets["train"].classes

# Initialize dataloader
dataloaders = {
    x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)
    for x in ["train", "validation"]
}

In [6]:
def train_model(model, criterion, optimizer, scheduler, num_epochs):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = 10000.0  # Large arbitrary number
    best_acc_train = 0.0
    best_loss_train = 10000.0  # Large arbitrary number
    print("Training started:")

    for epoch in range(num_epochs):

        # Each epoch has a training and validation phase
        for phase in ["train", "validation"]:
            if phase == "train":
                # Set model to training mode
                model.train()
            else:
                # Set model to evaluate mode
                model.eval()
            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            n_batches = dataset_sizes[phase] // batch_size
            it = 0
            for inputs, labels in dataloaders[phase]:
                since_batch = time.time()
                batch_size_ = len(inputs)
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                # Track/compute gradient and make an optimization step only when training
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # Print iteration results
                running_loss += loss.item() * batch_size_
                batch_corrects = torch.sum(preds == labels.data).item()
                running_corrects += batch_corrects
                print(
                    "Phase: {} Epoch: {}/{} Iter: {}/{} Batch time: {:.4f}".format(
                        phase,
                        epoch + 1,
                        num_epochs,
                        it + 1,
                        n_batches + 1,
                        time.time() - since_batch,
                    ),
                    end="\r",
                    flush=True,
                )
                it += 1

            # Print epoch results
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print(
                "Phase: {} Epoch: {}/{} Loss: {:.4f} Acc: {:.4f}        ".format(
                    "train" if phase == "train" else "validation  ",
                    epoch + 1,
                    num_epochs,
                    epoch_loss,
                    epoch_acc,
                )
            )

            # Check if this is the best model wrt previous epochs
            if phase == "validation" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == "validation" and epoch_loss < best_loss:
                best_loss = epoch_loss
            if phase == "train" and epoch_acc > best_acc_train:
                best_acc_train = epoch_acc
            if phase == "train" and epoch_loss < best_loss_train:
                best_loss_train = epoch_loss

            # Update learning rate
            if phase == "train":
                scheduler.step()

    # Print final results
    model.load_state_dict(best_model_wts)
    time_elapsed = time.time() - since
    print(
        "Training completed in {:.0f}m {:.0f}s".format(time_elapsed // 60, time_elapsed % 60)
    )
    print("Best test loss: {:.4f} | Best test accuracy: {:.4f}".format(best_loss, best_acc))
    return model

In [33]:
def test_model(model,adversary,defense):
    since = time.time()
    print("Validation started:")

    model.eval()
    
    running_corrects =0
    running_corrects_test=0
    running_corrects_test_untargetted=0
    running_corrects_test_cln=0
    running_corrects_test_adv_defense=0

    batch_corrects_test=0
    batch_corrects_test_untargetted =0 
    batch_corrects_test_clean_defend=0
    batch_correts_adversarial_defense=0

    # running_corrects_adversary_untargetted=0
    # running_corrects_adversary_targetted=0

    phase = "validation"

    n_batches = dataset_sizes[phase]
    it=0

    for inputs,labels in dataloaders[phase]:
        batch_size_ = len(inputs)
        inputs = inputs.to(device)
        labels = labels.to(device)
        adv_untargeted = adversary.perturb(inputs, labels)
        adv_defended = defense(adv_untargeted)
        cln_defended = defense(inputs)
        
        
#         pred_adv = predict_from_logits(model(adv))
        
        
        
        pred_cln = predict_from_logits(model(inputs))
        pred_untargeted_adv = predict_from_logits(model(adv_untargeted))
        pred_cln_defended = predict_from_logits(model(cln_defended))
        pred_adv_defended = predict_from_logits(model(adv_defended))
        
      # print("__________________")
      # print("original: ",pred_cln)
      # print("untargetted Attack: ",pred_untargeted_adv)
      # print("__________________")
      # outputs = model(inputs)
      # _,preds = torch.max(outputs,1)
        
        # batch_corrects = torch.sum(preds == labels.data).item()
        batch_corrects_test = torch.sum(pred_cln == labels.data).item()
        batch_corrects_test_untargetted = torch.sum(pred_untargeted_adv == labels.data).item()
        batch_corrects_test_clean_defend = torch.sum(pred_cln_defended == labels.data).item()
        batch_correts_adversarial_defense = torch.sum(pred_adv_defended == labels.data).item()
        
        
        # running_corrects += batch_corrects
        running_corrects_test += batch_corrects_test
        running_corrects_test_untargetted += batch_corrects_test_untargetted 
        running_corrects_test_cln += batch_corrects_test_clean_defend
        running_corrects_test_adv_defense += batch_correts_adversarial_defense
        
    # final_acc = running_corrects / dataset_sizes[phase]
    final_acc_test = running_corrects_test / dataset_sizes[phase]
    final_acc_test_untargetted = running_corrects_test_untargetted / dataset_sizes[phase]
    final_acc_test_cln = running_corrects_test_cln / dataset_sizes[phase]
    final_acc_test_adv_def = running_corrects_test_adv_defense / dataset_sizes[phase]
    
    # print("Final Accuracy(original model): ",final_acc,final_acc_test)
    time_elapsed = time.time() - since
    print(
        "Testing completed in {:.0f}m {:.0f}s".format(time_elapsed // 60, time_elapsed % 60)
    )


    print("Final Accuracy(original model): ",final_acc_test)
    print("Adversarial Attack(untargetted) Accuracy: ",final_acc_test_untargetted)
    print("Final Accuracy(original model+defense): ",final_acc_test_cln)
    print("Adversarial Attack(untargetted+defense) Accuracy: ",final_acc_test_adv_def)
    return final_acc_test,final_acc_test_untargetted,final_acc_test_cln,final_acc_test_adv_def

In [25]:
class ClassicalTransferLearningModel(nn.Module):
    def __init__(self,num_of_features):
        super().__init__()
        # print("num of features",num_of_features)
        self.fc1 = nn.Linear(num_of_features,4)
        self.fc2 = nn.Linear(4,2)
        
    def forward(self,x):
        x = self.fc1(x)
        x = self.fc2(x)
        
        return x

In [26]:
model_classical = torchvision.models.resnet18(pretrained=True)

for param in model_classical.parameters():
    param.requires_grad = False

num_ftrs = model_classical.fc.in_features
model_classical.fc = ClassicalTransferLearningModel(num_ftrs)
model_classical = model_classical.to(device)

In [27]:
criterion_classical = nn.CrossEntropyLoss()
optimizer_classical = optim.Adam(model_classical.fc.parameters(),lr=step)
exp_lr_scheduler_classical = lr_scheduler.StepLR(
    optimizer_classical, step_size=10, gamma=gamma_lr_scheduler
)

In [28]:
model_classical = train_model(model_classical,criterion_classical,optimizer_classical,exp_lr_scheduler_classical,num_epochs=25)

Training started:
Phase: train Epoch: 1/25 Loss: 0.6783 Acc: 0.6319        
Phase: validation   Epoch: 1/25 Loss: 0.6287 Acc: 0.5417        
Phase: train Epoch: 2/25 Loss: 0.5451 Acc: 0.7418        
Phase: validation   Epoch: 2/25 Loss: 0.5285 Acc: 0.7708        
Phase: train Epoch: 3/25 Loss: 0.4591 Acc: 0.8407        
Phase: validation   Epoch: 3/25 Loss: 0.4907 Acc: 0.8333        
Phase: train Epoch: 4/25 Loss: 0.4319 Acc: 0.8462        
Phase: validation   Epoch: 4/25 Loss: 0.4456 Acc: 0.8750        
Phase: train Epoch: 5/25 Loss: 0.3914 Acc: 0.8516        
Phase: validation   Epoch: 5/25 Loss: 0.4226 Acc: 0.8333        
Phase: train Epoch: 6/25 Loss: 0.3342 Acc: 0.8901        
Phase: validation   Epoch: 6/25 Loss: 0.4404 Acc: 0.8125        
Phase: train Epoch: 7/25 Loss: 0.3098 Acc: 0.8901        
Phase: validation   Epoch: 7/25 Loss: 0.4034 Acc: 0.8542        
Phase: train Epoch: 8/25 Loss: 0.3856 Acc: 0.8242        
Phase: validation   Epoch: 8/25 Loss: 0.3926 Acc: 0.8333       

In [29]:
adversary2_classical = PGDAttack(predict = model_classical,loss_fn = nn.CrossEntropyLoss(reduction="sum"),
                      eps=0.15,nb_iter=40,eps_iter=0.01,clip_min=0.0,clip_max=1.0,
                      targeted =False)

In [30]:
from advertorch.defenses import MedianSmoothing2D
from advertorch.defenses import BitSqueezing
from advertorch.defenses import JPEGFilter

bit_squeezing = BitSqueezing(bit_depth = 5 )
median_filter = MedianSmoothing2D(kernel_size = 3 )
jpeg_filter = JPEGFilter(10)

defense = nn.Sequential(jpeg_filter,bit_squeezing,median_filter,
)

In [31]:
# output

In [34]:

output = test_model(model_classical,adversary2_classical,defense)

Validation started:
Testing completed in 6m 0s
Final Accuracy(original model):  0.9166666666666666
Adversarial Attack(untargetted) Accuracy:  0.08333333333333333
Final Accuracy(original model+defense):  0.6875
Adversarial Attack(untargetted+defense) Accuracy:  0.8125


In [None]:
# output[2]

In [None]:
# adv_defended = defense(output[2])
# # pred_adv_defended = predict_from_logits(model(adv_defended))
# output2 = test_model(model_classical,adv_defended)

In [35]:
def H_layer(nqubits):
    """Layer of single-qubit Hadamard gates.
    """
    # print("hadamard")
    for idx in range(nqubits):
        qml.Hadamard(wires=idx)


def RY_layer(w):
    """Layer of parametrized qubit rotations around the y axis.
    """
    # print("Ry")
    # print(w)
    for idx, element in enumerate(w):
        qml.RY(element, wires=idx)


def entangling_layer(nqubits,flag=True):
    """Layer of CNOTs followed by another shifted layer of CNOT.
    """
    # In other words it should apply something like :
    # CNOT  CNOT  CNOT  CNOT...  CNOT
    #   CNOT  CNOT  CNOT...  CNOT
    # print("entangling")
    for i in range(0, nqubits - 1, 2):  # Loop over even indices: i=0,2,...N-2
        qml.CNOT(wires=[i, i + 1])
    for i in range(1, nqubits - 1, 2):  # Loop over odd indices:  i=1,3,...N-3
        qml.CNOT(wires=[i, i + 1])

    if flag == True:
        qml.CNOT(wires=[n_qubits-1,0])
    else:
        qml.CNOT(wires=[0,n_qubits-1])

        

In [36]:
@qml.qnode(dev, interface="torch")
def quantum_net(q_input_features, q_weights_flat):
    """
    The variational quantum circuit.
    """
    # print("quantum_net: q_input_features ,q_weight_flat : ",type(q_input_features), " , ",type(q_weights_flat))
    # print("dimension",q_input_features.ndim)
    # Reshape weights
    q_weights = q_weights_flat.reshape(q_depth, n_qubits)

    
    # Start from state |+> , unbiased w.r.t. |0> and |1>
    H_layer(n_qubits)

    # Embed features in the quantum node
    RY_layer(q_input_features)

    # Sequence of trainable variational layers
    for k in range(q_depth):
        entangling_layer(n_qubits)
        RY_layer(q_weights[k])

    entangling_layer(n_qubits,False)

    # Expectation values in the Z basis
    exp_vals = [qml.expval(qml.PauliZ(position)) for position in range(n_qubits)]
    return tuple(exp_vals)

In [37]:
class DressedQuantumNet(nn.Module):
    """
    Torch module implementing the *dressed* quantum net.
    """

    def __init__(self):
        """
        Definition of the *dressed* layout.
        """

        super().__init__()
        self.pre_net = nn.Linear(512, n_qubits)
        # print("DressedQNet_init_ n_qubit:",n_qubits)
        # print("DressedQNet_init_ q_delta:",q_delta)
        # print("DressedQNet_init_ q_depth:",q_depth)
        # print(q_delta * torch.randn(q_depth * n_qubits))
        self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
        # print("DressedQNet_init_ q_params:",self.q_params)
        self.post_net = nn.Linear(n_qubits, 2)

    def forward(self, input_features):
        """
        Defining how tensors are supposed to move through the *dressed* quantum
        net.
        """
        # print("DressedQNet_forward_ input",input_features)
        # print("input feature shape",input_features.shape)
        # obtain the input features for the quantum circuit
        # by reducing the feature dimension from 512 to 4
        pre_out = self.pre_net(input_features)
        # print("DressedQNet_forward_ preout",pre_out)
        q_in = torch.tanh(pre_out) * np.pi / 2.0

        # print("DressedQNet_forward_ q_in",q_in)

        # Apply the quantum circuit to each element of the batch and append to q_out
        q_out = torch.Tensor(0, n_qubits)
        # print("q_out: ",q_out)
        q_out = q_out.to(device)
        # print(q_in)
        for elem in q_in:
            q_out_elem = quantum_net(elem, self.q_params).float().unsqueeze(0)
            # print(q_out_elem.draw())
            q_out = torch.cat((q_out, q_out_elem))

        # return the two-dimensional prediction from the postprocessing layer
        return self.post_net(q_out)

In [38]:
model_hybrid = torchvision.models.resnet18(pretrained=True)

for param in model_hybrid.parameters():
    param.requires_grad = False


# Notice that model_hybrid.fc is the last layer of ResNet18
model_hybrid.fc = DressedQuantumNet()

# Use CUDA or CPU according to the "device" object.
model_hybrid = model_hybrid.to(device)

In [39]:
criterion = nn.CrossEntropyLoss()
optimizer_hybrid = optim.Adam(model_hybrid.fc.parameters(), lr=step)
exp_lr_scheduler = lr_scheduler.StepLR(
    optimizer_hybrid, step_size=10, gamma=gamma_lr_scheduler
)

In [40]:
model_hybrid = train_model(
    model_hybrid, criterion, optimizer_hybrid, exp_lr_scheduler, num_epochs=25
)

Training started:
Phase: train Epoch: 1/25 Loss: 0.6300 Acc: 0.6703        
Phase: validation   Epoch: 1/25 Loss: 0.6282 Acc: 0.6667        
Phase: train Epoch: 2/25 Loss: 0.5153 Acc: 0.8132        
Phase: validation   Epoch: 2/25 Loss: 0.5549 Acc: 0.6875        
Phase: train Epoch: 3/25 Loss: 0.4700 Acc: 0.8132        
Phase: validation   Epoch: 3/25 Loss: 0.5235 Acc: 0.6875        
Phase: train Epoch: 4/25 Loss: 0.4024 Acc: 0.8846        
Phase: validation   Epoch: 4/25 Loss: 0.4584 Acc: 0.7708        
Phase: train Epoch: 5/25 Loss: 0.4507 Acc: 0.8297        
Phase: validation   Epoch: 5/25 Loss: 0.4648 Acc: 0.7917        
Phase: train Epoch: 6/25 Loss: 0.3568 Acc: 0.9066        
Phase: validation   Epoch: 6/25 Loss: 0.4727 Acc: 0.7917        
Phase: train Epoch: 7/25 Loss: 0.3215 Acc: 0.9066        
Phase: validation   Epoch: 7/25 Loss: 0.4035 Acc: 0.8750        
Phase: train Epoch: 8/25 Loss: 0.3415 Acc: 0.9121        
Phase: validation   Epoch: 8/25 Loss: 0.4182 Acc: 0.8542       

In [42]:
adversary2_hybrid_c1 = PGDAttack(predict = model_hybrid,loss_fn = nn.CrossEntropyLoss(reduction="sum"),
                      eps=0.15,nb_iter=40,eps_iter=0.01,clip_min=0.0,clip_max=1.0,
                      targeted =False)
output = test_model(model_hybrid,adversary2_hybrid_c1,defense)

Validation started:
Testing completed in 8m 22s
Final Accuracy(original model):  0.9166666666666666
Adversarial Attack(untargetted) Accuracy:  0.14583333333333334
Final Accuracy(original model+defense):  0.6041666666666666
Adversarial Attack(untargetted+defense) Accuracy:  0.9166666666666666


In [44]:
from advertorch.attacks import CarliniWagnerL2Attack

adversary15 = CarliniWagnerL2Attack(predict= model_classical,
#                                     loss_fn =  nn.CrossEntropyLoss(reduction="sum"),
                                    num_classes =2,confidence=0, learning_rate=0.01,
                                    binary_search_steps=9, max_iterations=10000,
                                    abort_early=True,initial_const=0.001,
                                    targeted=False)

adversary16 = CarliniWagnerL2Attack(predict= model_hybrid,
#                                     loss_fn =  nn.CrossEntropyLoss(reduction="sum"),
                                    num_classes =2,confidence=0, learning_rate=0.01,
                                    binary_search_steps=9, max_iterations=10000,
                                    abort_early=True,initial_const=0.001,
                                    targeted=False)

In [46]:
print("classcical model")
output_cnw = test_model(model_classical,adversary15,defense)

classcical model
Validation started:


KeyboardInterrupt: 

In [None]:
print("Hybrid model")
output_cnw = test_model(model_hybrid,adversary16,defense)