### Import all the necessary libraries


In [1]:
###### comment these lines if you execute code in local
try:
  import pennylane as qml
except ModuleNotFoundError:
  !pip install pennylane
  import pennylane as qml
######

import os
import numpy as np
import torch
import torch.utils.data as data
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torch.nn as nn
from torchvision import datasets, models, transforms

import glob
import random
import numpy as np
import argparse

from PIL import Image
from google.colab import drive
import zipfile

import pennylane as qml
from pennylane import numpy as np
from pennylane.templates.embeddings import AngleEmbedding
from pennylane.templates.layers import StronglyEntanglingLayers
from pennylane.optimize import GradientDescentOptimizer

np.random.seed = 7
torch.manual_seed(7)
random.seed(7)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pennylane
  Downloading PennyLane-0.29.0-py3-none-any.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m15.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting autoray>=0.3.1
  Downloading autoray-0.6.0-py3-none-any.whl (46 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.4/46.4 KB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
Collecting pennylane-lightning>=0.28
  Downloading PennyLane_Lightning-0.29.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.5/16.5 MB[0m [31m60.9 MB/s[0m eta [36m0:00:00[0m
Collecting retworkx
  Downloading retworkx-0.12.1-py3-none-any.whl (10 kB)
Collecting semantic-version>=2.7
  Downloading semantic_version-2.10.0-py2.py3-none-any.whl (15 kB)
Collecting rustworkx==0.12.1
  Downloading rustworkx-0.12.

Mounting files in the local colab workspace. In order to execute the script:
1. Go to: https://drive.google.com/file/d/16VkCSNgnPDVbpGhOywgAupZgpfr20YgA/view?usp=share_link and copy the file "archive.zip" in a path in your Google Drive.
2. Change the string "/content/gdrive/MyDrive/PhDQuantumProject/archive.zip" in the following code to "/content/gdrive/MyDrive/path_to_zip_file.

In [2]:
drive.mount('/content/gdrive')
zip_ref = zipfile.ZipFile("/content/gdrive/MyDrive/PhDQuantumProject/archive.zip", 'r')
zip_ref.extractall("/content/dataset")
zip_ref.close()

Mounted at /content/gdrive


In this part we define all the classes used in order to support the trainig/test process. In particular, we have:


1. The *Dataset* class, used in order to load the images and the related labels from the training and the test-set.
2. The *Solver* class, used in order to train/evaluate the CNN.

In [3]:
class Dataset(data.Dataset):
    # mapping table of label and index


    def __init__(self, train, **kwargs):
        super(Dataset, self).__init__()

        self.str2label = {"buildings": 0, "forest": 1, "glacier": 2, "mountain": 3, "sea": 4, "street": 5}
        self.label2str = {0: "buildings", 1: "forest", 2: "glacier", 3: "mountain", 4: "sea", 5: "street"}

        self.data = list()
        self.size = kwargs.get("size", None)
        self.data_root = kwargs.get("data_root", "./dataset")
        # self.data_root = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'data')

        # load csv file and import file paths
        main_dir = "seg_train" if train else "seg_test"
        print(os.path.join(self.data_root, main_dir))

        for current_dir in os.listdir(os.path.join(self.data_root,main_dir)):
            for current_file in os.listdir(os.path.join(self.data_root,main_dir,current_dir)):
                path = os.path.join(self.data_root,main_dir,current_dir,current_file)
                self.data.append((path,current_dir))

        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    def __getitem__(self, index):
        path, label = self.data[index]
        image = Image.open(path)

        # resize input images
        if self.size:
            image = image.resize((self.size, self.size), Image.BICUBIC)

        label = self.str2label[label]

        return self.transform(image), label

    def __len__(self):
        return len(self.data)

The first considered approach is to train a hybrid neural network composed of a Feature Extraction part (realized through the classical ResNet 50 CNN) and a **fully connected** quantistic layer. 
The following snippet allows to define the features of the quantistic part of the network (i.e., the number of qubits and the number of layers).

# Quantum Layers definition

Please, execute **one and only one code snippet**, dependending on the quantum architecture you want to test.



## Quantum Architecture n.1

In [None]:
#Creating the quantum layer
n_qubits = 4 #8Qubits
n_layers = 2 
dev = qml.device("lightning.qubit", wires=n_qubits)

@qml.qnode(dev)
def qnode(inputs, weights):
    qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='X')
    qml.BasicEntanglerLayers(weights, wires=range(n_qubits), rotation=qml.RX)
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

## Quantum Architecture n.2

In [None]:
#Creating the quantum layer
n_qubits = 4 #8Qubits
n_layers = 1
dev = qml.device("lightning.qubit", wires=n_qubits)

@qml.qnode(dev)
def qnode(inputs, weights):
    qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='Y')
    qml.BasicEntanglerLayers(weights, wires=range(n_qubits), rotation=qml.RY)
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

## Quantum Architecture n.3

In [None]:
#Creating the quantum layer
n_qubits = 4 #8Qubits
n_layers = 3
dev = qml.device("lightning.qubit", wires=n_qubits)

@qml.qnode(dev)
def qnode(inputs, weights):
    i = 0
    qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='Y')
    qml.BasicEntanglerLayers(weights, wires=range(n_qubits), rotation=qml.RY)
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

## Quantum Architecture n.4


In [None]:
#Creating the quantum layer
n_qubits = 4 #8Qubits
n_layers = 3
dev = qml.device("lightning.qubit", wires=n_qubits)

@qml.qnode(dev)
def qnode(inputs, weights):
    i = 0
    qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='Y')
    for l in range(0,n_layers):
      for q in range(0,n_qubits):
        qml.RY(weights[l,q],q)
    for l in range(0,n_layers):
      for q in range(0,n_qubits-1):
        qml.CNOT(wires=[q,q+1])
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

## Quantum architecture n.5
Full entanglement

In [4]:
#Creating the quantum layer
n_qubits = 4 #8Qubits
n_layers = 4*2
dev = qml.device("lightning.qubit", wires=n_qubits)

def entanglement(n_qubits,type='linear'):
    if type=='linear':
        for i in range(n_qubits):
            qml.CNOT(wires=[i,i+1])
    if type=='circular':
        for i in range(n_qubits-1):
            qml.CNOT(wires=[i,i+1])
        qml.CNOT(wires=[n_qubits-1,0])
    if type=='full':
        for i in range(n_qubits-1):
            for j in range(i+1,n_qubits):
                qml.CNOT(wires=[i,j])
@qml.qnode(dev)
def qnode(inputs, weights):
    #qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='X')
    #qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='Y')
    for i in range(len(inputs)):
      qml.RY(inputs[i], wires=i)
    for l in range(int(n_layers/2)):  
      for q in range(n_qubits):
        qml.RX(weights[l,q],q)
    for l in range(int(n_layers/2),n_layers):  
      for q in range(n_qubits):
        qml.RY(weights[l,q],q)
      entanglement(n_qubits,'full')
    #qml.BasicEntanglerLayers(weights, wires=range(n_qubits), rotation=qml.RY)
    return [qml.expval(qml.PauliZ(wires=i)) for i in range(n_qubits)]

The following function is used to modify the network architecture in order to implement quantum transfer learning

In [5]:
def prepare_network_for_t_learning(net,**kwargs):

    weight_shapes = {"weights": (n_layers, n_qubits)}
    print(weight_shapes)
    freeze_parameters = kwargs.get('freeze_parameters',True)
    in_j_layer = kwargs.get('in_j_layer',7)
    out_j_layer = kwargs.get('out_j_layer',2)
    if freeze_parameters == True:
      for param in net.parameters():
          param.requires_grad = False
    net_name = net.__class__.__name__
    if net_name=='DenseNet':
      #Getting the number of output classes
      output_classes = kwargs.get('output_classes',6)
      num_ftrs = net.classifier.in_features

      junc_layer_1 = nn.Linear(num_ftrs,n_qubits)
      qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)
      junc_layer_2 = nn.Linear(n_qubits,output_classes)

      #Adapating the last layer of the network to the specific classification task
      net.classifier = nn.Sequential(junc_layer_1,qlayer,junc_layer_2)
    elif net_name == 'ResNet':
      #Getting the number of output classes
      output_classes = kwargs.get('output_classes',6)
      num_ftrs = net.fc.in_features

      junc_layer_1 = nn.Linear(num_ftrs,n_qubits)
      qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)
      junc_layer_2 = nn.Linear(n_qubits,output_classes)

      #Adapating the last layer of the network to the specific classification task
      net.fc = nn.Sequential(junc_layer_1,qlayer,junc_layer_2)
    elif net_name == 'VGG':
      output_classes = kwargs.get('output_classes',6)
      num_ftrs = net.classifier[6].in_features

      junc_layer_1 = nn.Linear(num_ftrs,n_qubits)
      qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)
      junc_layer_2 = nn.Linear(n_qubits,output_classes)

      net.classifier[6] = nn.Sequential(junc_layer_1,qlayer,junc_layer_2)
    elif net_name == 'EfficientNet':
      output_classes = kwargs.get('output_classes',6)
      num_ftrs = net.classifier.fc.in_features
      #Adapating the last layer of the network to the specific classification task

      junc_layer_1 = nn.Linear(num_ftrs,n_qubits)
      qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)
      junc_layer_2 = nn.Linear(n_qubits,output_classes)

      net.classifier.fc = nn.Sequential(junc_layer_1,qlayer,junc_layer_2)
    elif net_name == "Inception3":
      output_classes = kwargs.get('output_classes',6)
      num_ftrs = net.fc.in_features
      #Adapating the last layer of the network to the specific classification task

      junc_layer_1 = nn.Linear(num_ftrs,n_qubits)
      qlayer = qml.qnn.TorchLayer(qnode, weight_shapes)
      junc_layer_2 = nn.Linear(n_qubits,output_classes)

      net.fc = nn.Sequential(junc_layer_1,qlayer,junc_layer_2)
      net.aux_logits=False
    num_params = 0
    num_params += sum(param.numel() for param in net.parameters() if param.requires_grad)
    print('You requested the train of: ',net_name)
    print('Number of parameters:',num_params)
    return net

In [6]:
class Solver():
    def __init__(self, args,**kwargs):
        # prepare a dataset
        self.train_data = Dataset(train=True,
                                  data_root=args.data_root,
                                  size=args.image_size)


        self.test_data = Dataset(train=False,
                                 data_root=args.data_root,
                                 size=args.image_size)

        lengths = [int(0.8*len(self.train_data)), len(self.train_data) - int(0.8 * len(self.train_data))]
        self.train_set, self.val_set = torch.utils.data.random_split(self.train_data,
                                                           lengths,
                                                           torch.Generator().manual_seed(42))

        self.train_loader = DataLoader(dataset=self.train_set,
                                       batch_size=args.batch_size,
                                       num_workers=4,
                                       shuffle=True, drop_last=True)

        # turn on the CUDA if available
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        print(f'Current device: {self.device}')
        ########################################################################
        #Implementing ResNet50 module, used for feature extraction purposes.
        cnn_network = models.resnet50(pretrained=True).to(self.device)

        #Defining the connection part between the classical network and the quantistic fully connected network
        num_ftrs = cnn_network.fc.in_features
        cnn_network.fc = nn.Linear(num_ftrs, n_qubits).to(self.device)
        clayer_1 = torch.nn.Linear(n_qubits, 6).to(self.device)
        #clayer_2 = torch.nn.Linear(10,6).to(self.device)

        #Defining the quantum layer
        #Loading network to train/test. By default, the script load the pre-trained ResNet50
        self.net = kwargs.get('net',models.resnet50(pretrained=True)).to(self.device)
        print(f'You requested the training of: {self.net.__class__.__name__}')
        ########################################################################


        self.loss_fn = torch.nn.CrossEntropyLoss()
        self.optim = torch.optim.Adam(self.net.parameters(),lr=args.lr)#torch.optim.Adam(self.net.parameters(), lr=args.lr)
        self.args = args

        if not os.path.exists(args.ckpt_dir):
            os.makedirs(args.ckpt_dir)



    def fit(self):
        args = self.args
        max_accuracy = 0
        for epoch in range(args.max_epochs):
            self.net.train()
            for step, inputs in enumerate(self.train_loader):

                
                images = inputs[0].to(self.device)
                labels = inputs[1].to(self.device)
                pred = self.net(images)
                loss = self.loss_fn(pred, labels)

                self.optim.zero_grad()
                loss.backward()
                self.optim.step()
                # train_acc = self.evaluate(self.train_data)
                # test_acc = self.evaluate(self.test_data)
                if (step+1) % args.print_every_minibatches==0:
                  print("Epoch [{}/{}] Batch [{}/{}] Loss: {:.3f}".
                        format(epoch + 1, args.max_epochs,step, len(self.train_loader), loss.item()))


                # print("Epoch [{}/{}] Loss: {:.3f} Train Acc: {:.3f}, Test Acc: {:.3f}".
                #      format(epoch + 1, args.max_epochs, loss.item(), train_acc, test_acc))
                #print("Epoch [{}/{}] Loss: {:.3f} ".
                      #format(epoch + 1, args.max_epochs, loss.item()))
            if (epoch + 1) % args.print_every == 0:
                train_acc = self.evaluate(self.train_data)
                test_acc = self.evaluate(self.val_set)

            if test_acc > max_accuracy:
                max_accuracy = test_acc
                self.save(args.ckpt_dir, args.ckpt_name, 'best')

            print("Epoch [{}/{}] Loss: {:.3f} Train Acc: {:.3f}, Test Acc: {:.3f}".
                    format(epoch + 1, args.max_epochs, loss.item(), train_acc, test_acc))



    def evaluate(self, data):
        args = self.args
        loader = DataLoader(data,
                            batch_size=args.batch_size,
                            num_workers=1,
                            shuffle=False)

        self.net.eval()
        num_correct, num_total = 0, 0

        with torch.no_grad():
            for inputs in loader:
                images = inputs[0].to(self.device)
                labels = inputs[1].to(self.device)

                outputs = self.net(images)
                _, preds = torch.max(outputs.detach(), 1)

                num_correct += (preds == labels).sum().item()
                num_total += labels.size(0)

        return num_correct / num_total
    
    def load_network(self,net):
      self.net = net

    def test(self):
      return self.evaluate(self.test_data)

    def save(self, ckpt_dir, ckpt_name, global_step):
        save_path = os.path.join(
            ckpt_dir, "{}_{}.pth".format(ckpt_name, global_step))
        torch.save(self.net.state_dict(), save_path)

In [None]:
def main():
    parser = argparse.ArgumentParser()
    
    model_set = {'ResNet50': models.resnet50(pretrained=True), 
                 'DenseNet169': models.densenet169(pretrained=True), 
                 'VGG19': models.vgg19(pretrained=True),
                 'EfficientNet': torch.hub.load('NVIDIA/DeepLearningExamples:torchhub', 'nvidia_efficientnet_b0',
                                                pretrained=True),
                 'Inception': torch.hub.load('pytorch/vision:v0.10.0', 'inception_v3', pretrained=True)}

    parser.add_argument("--model_to_test",type=str,default='VGG19')
    parser.add_argument("--test",type=bool,default=False)
    parser.add_argument("--lr", type=float, default=1e-3)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--max_epochs", type=int, default=30)

    parser.add_argument("--ckpt_dir", type=str, default="/content/gdrive/MyDrive/Progetto Quantum Binucci PhD/QuantumCheckpointFrank/VGGFRA3/")
    parser.add_argument("--path_to_test",type=str,default="/content/gdrive/MyDrive/Progetto Quantum Binucci PhD/QuantumCheckpointFrank/VGGFRA3/landscape_best.pth")
    parser.add_argument("--ckpt_name", type=str, default="landscape")
    parser.add_argument("--print_every", type=int, default=1)
    parser.add_argument("--print_every_minibatches", type=int, default=300)

    # if you change image size, you must change all the network channels
    parser.add_argument("--image_size", type=int, default=224)
    parser.add_argument("--data_root", type=str, default="/content/dataset")



    args, unknown = parser.parse_known_args()

    net = prepare_network_for_t_learning(model_set[args.model_to_test],freeze_parameters = True)

    print("Test mode" if args.test else "Train Mode")

    solver = Solver(args,net=net)
    
    if args.test==False:
      solver.fit()
    else:
      net.load_state_dict(torch.load(args.path_to_test))
      solver.load_network(net)
      accuracy = solver.test()
      print(f'Accuracy on the test-set for {net.__class__.__name__} = {accuracy*100}%')
      
if __name__ == "__main__":
    main()

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

Downloading: "https://download.pytorch.org/models/densenet169-b2777c0a.pth" to /root/.cache/torch/hub/checkpoints/densenet169-b2777c0a.pth


  0%|          | 0.00/54.7M [00:00<?, ?B/s]

Downloading: "https://download.pytorch.org/models/vgg19-dcbb9e9d.pth" to /root/.cache/torch/hub/checkpoints/vgg19-dcbb9e9d.pth


  0%|          | 0.00/548M [00:00<?, ?B/s]

Downloading: "https://github.com/NVIDIA/DeepLearningExamples/zipball/torchhub" to /root/.cache/torch/hub/torchhub.zip
Downloading: "https://api.ngc.nvidia.com/v2/models/nvidia/efficientnet_b0_pyt_amp/versions/20.12.0/files/nvidia_efficientnet-b0_210412.pth" to /root/.cache/torch/hub/checkpoints/nvidia_efficientnet-b0_210412.pth


  0%|          | 0.00/20.5M [00:00<?, ?B/s]

Downloading: "https://github.com/pytorch/vision/zipball/v0.10.0" to /root/.cache/torch/hub/v0.10.0.zip
Downloading: "https://download.pytorch.org/models/inception_v3_google-0cc3c7bd.pth" to /root/.cache/torch/hub/checkpoints/inception_v3_google-0cc3c7bd.pth


  0%|          | 0.00/104M [00:00<?, ?B/s]

{'weights': (8, 4)}
You requested the train of:  VGG
Number of parameters: 16450
Train Mode
/content/dataset/seg_train
/content/dataset/seg_test
Current device: cpu
You requested the training of: VGG
Epoch [1/30] Loss: 1.167 Train Acc: 0.505, Test Acc: 0.481
