In [1]:
import numpy as np
import matplotlib.pyplot as plt

from torch import Tensor
from torch.nn import Linear, CrossEntropyLoss, MSELoss,Softmax
from torch.optim import LBFGS

from qiskit import QuantumCircuit
from qiskit.utils import algorithm_globals
from qiskit.circuit import Parameter
from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap
from qiskit_machine_learning.neural_networks import SamplerQNN, EstimatorQNN
from qiskit_machine_learning.connectors import TorchConnector
from qiskit_machine_learning.algorithms.classifiers import NeuralNetworkClassifier, VQC

In [2]:
from data_modify import buildpoison,Datapoison

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import torch
from torch import cat, no_grad, manual_seed
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.optim as optim
from torch.nn import (
    Module,
    Conv2d,
    Linear,
    Dropout2d,
    NLLLoss,
    MaxPool2d,
    Flatten,
    Sequential,
    ReLU,
)
import torch.nn.functional as F

In [4]:
import argparse
import os
import pathlib
# Options handed to the dataset builder (see the call to buildpoison.build_testset).
# Every help string states the default that is actually configured on the argument.
parser = argparse.ArgumentParser(description='Reproduce the basic backdoor attack in "Badnets: Identifying vulnerabilities in the machine learning model supply chain".')
parser.add_argument('--dataset', default='CIFAR10', help='Which dataset to use (MNIST or CIFAR10, default: CIFAR10)')
parser.add_argument('--data_path', default='./data/', help='Place to load dataset (default: ./data/)')
parser.add_argument('--nb_classes', default=10, type=int, help='number of the classification types (int, default: 10)')
# poison settings
parser.add_argument('--poisoning_rate', type=float, default=0, help='poisoning portion (float, range from 0 to 1, default: 0)')
parser.add_argument('--trigger_label', type=int, default=1, help='The NO. of trigger label (int, range from 0 to nb_classes - 1, default: 1)')
parser.add_argument('--trigger_path', default="./triggers/trigger_10.png", help='Trigger Path (default: ./triggers/trigger_10.png)')
parser.add_argument('--trigger_size', type=int, default=5, help='Trigger Size (int, default: 5)')

_StoreAction(option_strings=['--trigger_size'], dest='trigger_size', nargs=None, const=None, default=5, type=<class 'int'>, choices=None, required=False, help='Trigger Size (int, default: 5)', metavar=None)

In [5]:
import random

In [6]:
def create_qnn():
    """Build the 3-qubit sampler QNN used as the quantum layer.

    The circuit is a ``ZZFeatureMap`` (its parameters are the network inputs)
    followed by a ``RealAmplitudes`` ansatz with one repetition (its parameters
    are the trainable weights). Every sampled basis state is mapped to the
    number of ones in its binary representation, giving 4 output bins.

    Returns:
        SamplerQNN: the network, created with ``input_gradients=True``.
    """
    n_qubits = 3
    encoder = ZZFeatureMap(n_qubits)
    variational = RealAmplitudes(n_qubits, reps=1)

    # Stack encoder and ansatz, in that order, on an empty 3-qubit register.
    circuit = QuantumCircuit(n_qubits)
    for stage in (encoder, variational):
        circuit.compose(stage, inplace=True)

    def count_ones(state_index):
        # Number of "1" digits in the binary form of the sampled state,
        # reduced modulo 4 so it always lands in one of the 4 output bins.
        return format(state_index, "b").count("1") % 4

    return SamplerQNN(
        circuit=circuit,
        input_params=encoder.parameters,
        weight_params=variational.parameters,
        # REMEMBER: input_gradients=True enables hybrid gradient backprop.
        input_gradients=True,
        interpret=count_ones,
        output_shape=4,
    )

class qnnNet(Module):
    """Hybrid classifier: a small CNN whose features pass through a QNN layer.

    Layers as declared in ``__init__``: two 5x5 convolutions (3 -> 6 -> 16
    channels), each followed by ReLU and 2x2 max pooling; dense layers
    400 -> 120 -> 84 -> 3; the wrapped QNN; and a final 4 -> 4 linear layer.
    Attribute names are the keys of the saved checkpoints, so they must not
    be renamed.
    """

    def __init__(self, qnn):
        """Create all layers.

        Args:
            qnn: quantum neural network to wrap in a ``TorchConnector``.
        """
        super().__init__()
        self.conv1 = Conv2d(3, 6, 5)
        self.pool = MaxPool2d(2, 2)
        self.conv2 = Conv2d(6, 16, 5)
        self.fc1 = Linear(16 * 5 * 5, 120)
        self.fc2 = Linear(120, 84)
        # Reduce the 84 features to the 3 values fed to the QNN.
        self.fc3 = Linear(84, 3)
        # Torch wrapper around the QNN (per the original note, its initial
        # weights are drawn uniformly at random from [-1, 1]).
        self.qnn = TorchConnector(qnn)
        # Final linear map over the 4 values expected from the QNN.
        self.fc4 = Linear(4, 4)

    def forward(self, x):
        """Run a batch through the CNN, the QNN and the output layer."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        flat = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        hidden = F.relu(self.fc2(F.relu(self.fc1(flat))))
        qnn_out = self.qnn(self.fc3(hidden))  # apply QNN
        return self.fc4(qnn_out)
    
class Net(Module):
    """Purely classical counterpart of the hybrid model (no QNN layer).

    Layers as declared in ``__init__``: two 5x5 convolutions (3 -> 6 -> 16
    channels), each followed by ReLU and 2x2 max pooling; dense layers
    400 -> 120 -> 84 -> 4; and a final 4 -> 4 linear layer. Attribute names
    are the keys of the saved checkpoints, so they must not be renamed.
    """

    def __init__(self):
        """Create all layers."""
        super().__init__()
        self.conv1 = Conv2d(3, 6, 5)
        self.pool = MaxPool2d(2, 2)
        self.conv2 = Conv2d(6, 16, 5)
        self.fc1 = Linear(16 * 5 * 5, 120)
        self.fc2 = Linear(120, 84)
        # Reduce the 84 features to 4 values; there is no QNN in this model.
        self.fc3 = Linear(84, 4)
        self.fc4 = Linear(4, 4)

    def forward(self, x):
        """Run a batch through the CNN and the two output linear layers."""
        for conv in (self.conv1, self.conv2):
            x = self.pool(F.relu(conv(x)))
        flat = torch.flatten(x, 1)  # keep the batch dimension, flatten the rest
        hidden = F.relu(self.fc2(F.relu(self.fc1(flat))))
        # fc3 feeds fc4 directly, with no activation in between.
        return self.fc4(self.fc3(hidden))

In [11]:
def _load_checkpoint(model, checkpoint_path):
    """Copy the weights stored at ``checkpoint_path`` into ``model``.

    Args:
        model: module whose parameter names match the checkpoint keys.
        checkpoint_path: path of a state dict written with ``torch.save``.

    Returns:
        The result of ``model.load_state_dict`` (reports missing/unexpected keys).
    """
    # map_location="cpu" lets a checkpoint saved from a CUDA device load on a
    # CPU-only host; the models in this notebook are built on the CPU.
    # NOTE(security): torch.load unpickles the file -- only load trusted checkpoints.
    state_dict = torch.load(checkpoint_path, map_location="cpu")
    return model.load_state_dict(state_dict)


# Hybrid (CNN + QNN) models; each gets its own freshly created QNN.
qnn = create_qnn()
qnnmodel = qnnNet(qnn)
_load_checkpoint(qnnmodel, "./Hybrid-qnn/models/cifar10_qiskitmdl.pt")

qnn = create_qnn()
qnnmodel_pos = qnnNet(qnn)
_load_checkpoint(qnnmodel_pos, "./Hybrid-qnn/models/cifar10_poisonedmdl.pt")

qnn = create_qnn()
qnnmodel_bt = qnnNet(qnn)
_load_checkpoint(qnnmodel_bt, "./Hybrid-qnn/models/qiskit_8bit_final_trojan.pt")

# Purely classical models.
normalmodel_pos = Net()
_load_checkpoint(normalmodel_pos, "./normalNN/models/normal-cifar10_poisonedmdl.pt")

normalmodel = Net()
# Left as the last expression so the cell still displays the key-matching report.
_load_checkpoint(normalmodel, "./normalNN/models/normal-cifar10_mdl.pt")

<All keys matched successfully>

In [12]:
n_samples = 1000  # maximum number of test images kept per class
n_classes = 4  # only classes 0..3 are evaluated
SAMPLING_SEED = 0  # fixed so the evaluated subset is identical on every run


def _sample_class_indices(targets, class_ids, per_class, rng):
    """Pick up to ``per_class`` distinct dataset indices for every class.

    Args:
        targets: 1-D NumPy array of integer labels.
        class_ids: iterable of the labels to keep, in output order.
        per_class: maximum number of indices drawn per label.
        rng: ``numpy.random.Generator`` used for the draw.

    Returns:
        1-D integer array of indices, grouped by class in ``class_ids`` order.

    Raises:
        ValueError: if a requested class has no sample in ``targets``.
    """
    picks = []
    for class_id in class_ids:
        members = np.flatnonzero(targets == class_id)
        if members.size == 0:
            raise ValueError("no sample with label {} in the dataset".format(class_id))
        # Draw without replacement so no image is evaluated twice.
        count = min(per_class, members.size)
        picks.append(rng.choice(members, size=count, replace=False))
    return np.concatenate(picks)


# Build the clean and poisoned test sets selected by the parsed options
# (the parser defaults to CIFAR10).
args, unknown = parser.parse_known_args()
X_test_clean, X_test_poisoned = buildpoison.build_testset(is_train=False, args=args)

X_test_clean.targets = np.array(X_test_clean.targets)
X_test_clean.data = np.array(X_test_clean.data)
X_test_poisoned.targets = np.array(X_test_poisoned.targets)
X_test_poisoned.data = np.array(X_test_poisoned.data)

# Both index sets are drawn before either dataset is subset.
rng = np.random.default_rng(SAMPLING_SEED)
idx_clean = _sample_class_indices(X_test_clean.targets, range(n_classes), n_samples, rng)
idx_poisoned = _sample_class_indices(X_test_poisoned.targets, range(n_classes), n_samples, rng)

X_test_clean.data = X_test_clean.data[idx_clean]
X_test_clean.targets = torch.from_numpy(X_test_clean.targets[idx_clean]).long()
X_test_poisoned.data = X_test_poisoned.data[idx_poisoned]
X_test_poisoned.targets = torch.from_numpy(X_test_poisoned.targets[idx_poisoned]).long()

# Define torch dataloaders with the filtered data (default batch size of 1).
clean_test_loader = DataLoader(X_test_clean, shuffle=True)
poisoned_test_loader = DataLoader(X_test_poisoned, shuffle=True)

Transform =  Compose(
    ToTensor()
    Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
)
Files already downloaded and verified
Files already downloaded and verified
Poison 10000 over 10000 samples ( poisoning rate 1.0)
Number of the class = 10
Dataset CIFAR10
    Number of datapoints: 10000
    Root location: ./data/
    Split: Test
    StandardTransform
Transform: Compose(
               ToTensor()
               Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
           ) Dataset CIFAR10Poison
    Number of datapoints: 10000
    Root location: ./data/
    Split: Test
    StandardTransform
Transform: Compose(
               ToTensor()
               Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
           )


In [9]:
# Sanity check: print the label tensor of the clean test set.
print(X_test_clean.targets)

tensor([0, 0, 0,  ..., 3, 3, 3])


In [13]:
def evaluate_model(model, loader, loss_func):
    """Measure the mean loss and the accuracy of ``model`` over ``loader``.

    Args:
        model: torch module returning one row of class scores per sample.
        loader: iterable of ``(data, target)`` batches.
        loss_func: callable ``loss_func(output, target)`` returning a scalar
            tensor that is the mean loss of the batch.

    Returns:
        tuple ``(mean_loss, accuracy_percent)``, both averaged per sample.

    Raises:
        ValueError: if ``loader`` yields no sample.
    """
    model.eval()  # set model to evaluation mode
    loss_sum = 0
    n_correct = 0
    n_seen = 0
    with no_grad():
        for data, target in loader:
            output = model(data)
            if len(output.shape) == 1:
                # A single un-batched score vector: add the batch dimension.
                output = output.reshape(1, *output.shape)

            pred = output.argmax(dim=1, keepdim=True)
            n_correct += pred.eq(target.view_as(pred)).sum().item()

            # Weight by the batch size and divide by the number of samples
            # (not the number of batches) so the result holds for any batch size.
            batch_size = target.size(0)
            loss_sum += loss_func(output, target).item() * batch_size
            n_seen += batch_size

    if n_seen == 0:
        raise ValueError("cannot evaluate a model on an empty loader")
    return loss_sum / n_seen, n_correct / n_seen * 100


loss_func = CrossEntropyLoss()
report_template = "{}:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%"

# (model, title for the clean run, title for the poisoned run).
# The qnnmodel_bt titles carry a "QNN_BT" prefix so its report can be told
# apart from the one of qnnmodel.
evaluation_plan = [
    (qnnmodel, "QNN Performance on clean test data", "QNN Attack success rate"),
    (qnnmodel_pos, "QNN_Pos Performance on clean test data", "QNN_Pos Attack success rate"),
    (qnnmodel_bt, "QNN_BT Performance on clean test data", "QNN_BT Attack success rate"),
    (normalmodel_pos, "CNN_pos Performance on clean test data", "CNN_pos Performance on poisoned test data"),
    (normalmodel, "CNN Performance on clean test data", "CNN Performance on poisoned test data"),
]

for model, clean_title, poisoned_title in evaluation_plan:
    runs = ((clean_title, clean_test_loader), (poisoned_title, poisoned_test_loader))
    for title, loader in runs:
        mean_loss, accuracy = evaluate_model(model, loader, loss_func)
        print(report_template.format(title, mean_loss, accuracy))


QNN Performance on clean test data:
	Loss: 1.0339
	Accuracy: 72.5%
QNN Attack success rate:
	Loss: 4.5777
	Accuracy: 22.9%
QNN_Pos Performance on clean test data:
	Loss: 1.0066
	Accuracy: 69.7%
QNN_Pos Attack success rate:
	Loss: 0.0228
	Accuracy: 99.6%
QNN Performance on clean test data:
	Loss: 1.3253
	Accuracy: 67.0%
QNN Attack success rate:
	Loss: 1.7793
	Accuracy: 41.9%
CNN_pos Performance on clean test data:
	Loss: 1.2075
	Accuracy: 71.6%
CNN_pos Performance on poisoned test data:
	Loss: 0.1431
	Accuracy: 97.0%
CNN Performance on clean test data:
	Loss: 1.7755
	Accuracy: 70.2%
CNN Performance on poisoned test data:
	Loss: 14.4136
	Accuracy: 23.3%
