# QCNNs para reconhecimento de Reciclagem - latas, garrafas e caixas.
## Grupo: L2D2
## Débora Dauma, Lucas Alvarenga e Marina Fernandes

### leitura de bibliotecas.

In [1]:
import os
import time
from tqdm import tqdm
from PIL import Image
from math import ceil

import pennylane as qml
from pennylane import numpy as np

import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as T
from torchvision.models import resnet18
from torch.utils.data import DataLoader
from torchvision.datasets import VisionDataset

import matplotlib.pyplot as plt

### Inicialização de Parâmetros 

In [2]:
#quantum parameters
n_qubits = 4
q_depth = 6
q_delta = 0.001

#classical: training parameters.
dataset_path = 'raw_dataset/recicle_data_shuffle.npz'
validation_split = 0.2
batch_size=4
epochs = 3
learning_rate = 0.001
gamma = 0.1

#random generator
torch.manual_seed(42)
np.random.seed(42)

## devices
dev = qml.device("default.qubit", wires=n_qubits)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

### Leitura dos dados.

In [3]:
class RecycleDatased(VisionDataset):
    def __init__(self, root, dataset, transform=None, target_transform=None, image=True):
        super(RecycleDatased, self).__init__(root, transform=transform, target_transform=target_transform)

        self.transform_image = image
        self.x = dataset['observations']
        self.y = dataset['labels']

    def __getitem__(self, i):
        if self.transform_image:
            x, y = Image.fromarray(self.x[i]), self.y[i]
        else:
            x, y = self.x[i], self.y[i]

        if self.transform is not None:
            x = self.transform(x)
        
        if self.target_transform is not None:
            y = self.target_transform(y)

        return (x, y)

    def __len__(self):
        return self.x.shape[0]

In [4]:
# read data
data = np.load('raw_dataset/recycle_data_shuffled.npz')
x, y = data['x_train'], data['y_train']
#y = torch.from_numpy(y[:, 0]).long()

idxs = np.concatenate([np.argwhere(y == 2)[:, 0], np.argwhere(y == 3)[:, 0]])
x = x[idxs, ...]
y = torch.from_numpy(y[idxs][:, 0]).long()

# shuffle the data
idxs = np.random.randint(0, x.shape[0], x.shape[0])
x = x[idxs, ...]
y = y[idxs]

# split data
num_train = ceil((1-validation_split)*x.shape[0])
x_train = x[:num_train]
y_train = y[:num_train]
x_test = x[num_train:]
y_test = y[num_train:]

print(x_train.shape, x_test.shape, num_train)

# transformations resnet18 imagem (224,224) -> (512)
transforms = {"train": T.Compose([T.Resize(256), T.RandomCrop(224), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
            "test": T.Compose([T.Resize(256), T.CenterCrop(224), T.ToTensor(), T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])}

target_transform = lambda x: 0 if x == 2 else 1

# datasets
train_dataset = RecycleDatased('raw_dataset', {"observations": x_train, "labels": y_train}, transform=transforms['train'], target_transform=target_transform)
test_dataset = RecycleDatased('raw_dataset', {"observations": x_test, "labels": y_test}, transform=transforms['test'], target_transform=target_transform)

# Loaders
train_loader = DataLoader(train_dataset, batch_size=batch_size*10, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size*10, shuffle=False)

(2944, 128, 128, 3) (736, 128, 128, 3) 2944


### Criação do Modelo CNN classico

In [5]:
model = resnet18(pretrained='true')
num_features = model.fc.out_features
model.fc = nn.Identity()
model = model.to(device)

### Obtendo os vetores de características

In [6]:
train, test = [], []
with torch.no_grad():
    for i, (x, y) in enumerate(train_loader):
        x = x.to(device)
        train.append(model(x).cpu())

    for i, (x, y) in enumerate(test_loader):
        x = x.to(device)
        test.append(model(x).cpu())

train = torch.cat(train)
test = torch.cat(test)

print(train.shape, test.shape)

# datasets
train_features = RecycleDatased('raw_dataset', {"observations": train, "labels": y_train}, target_transform=target_transform, image=False)
test_features = RecycleDatased('raw_dataset', {"observations": test, "labels": y_test}, target_transform=target_transform, image=False)

# Loaders
train_loader = DataLoader(train_features, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_features, batch_size=batch_size, shuffle=False)

torch.Size([2944, 512]) torch.Size([736, 512])


### Criação do Módulo Quântico -> Pennylane + Torch

In [7]:
@qml.qnode(dev, interface="torch")
def quantum_net(q_input_features, q_weights_flat):
    """
    The variational quantum circuit.
    """

    # Reshape weights
    q_weights = q_weights_flat.reshape(q_depth, n_qubits)

    # Start from state |+> , unbiased w.r.t. |0> and |1>
    for idx in range(n_qubits):
        qml.Hadamard(wires=idx)

    # Embed features in the quantum node
    for idx, element in enumerate(q_input_features):
        qml.RY(element, wires=idx)

    # Sequence of trainable variational layers
    for k in range(q_depth):
        for i in range(0, n_qubits - 1, 2):  # Loop over even indices: i=0,2,...N-2
            qml.CNOT(wires=[i, i + 1])
        
        for i in range(1, n_qubits - 1, 2):  # Loop over odd indices:  i=1,3,...N-3
            qml.CNOT(wires=[i, i + 1])

        for idx, element in enumerate(q_weights[k]):
            qml.RY(element, wires=idx)

    # Expectation values in the Z basis
    exp_vals = [qml.expval(qml.PauliZ(position)) for position in range(n_qubits)]
    return tuple(exp_vals)

In [8]:
class DressedQuantumNet(nn.Module):
    """
    Torch module implementing the *dressed* quantum net.
    """

    def __init__(self):
        """
        Definition of the *dressed* layout.
        """

        super().__init__()
        self.pre_net = nn.Linear(512, n_qubits)
        self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
        self.post_net = nn.Linear(n_qubits, 2)

    def forward(self, input_features):
        """
        Defining how tensors are supposed to move through the *dressed* quantum
        net.
        """

        # obtain the input features for the quantum circuit
        # by reducing the feature dimension from 512 to 4
        pre_out = self.pre_net(input_features)
        q_in = torch.tanh(pre_out) * np.pi / 2.0

        # Apply the quantum circuit to each element of the batch and append to q_out
        q_out = torch.Tensor(0, n_qubits)
        for elem in q_in:
            q_out_elem = quantum_net(elem, self.q_params).float().unsqueeze(0)
            q_out = torch.cat((q_out, q_out_elem))

        # return the two-dimensional prediction from the postprocessing layer
        return self.post_net(q_out)

### Fase de Treinamento

In [9]:
q_module = DressedQuantumNet()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(q_module.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=gamma)

In [11]:
accuracies, errors = [], []
for e in range(epochs):
    running_count = 0.
    running_error = 0.
    count = 0

    q_module.train()
    with tqdm(total=len(train_loader)) as pbar:
        for i, (x, y) in enumerate(train_loader):        
            outputs = q_module(x)
            _, preds = torch.max(outputs, 1)

            loss = criterion(outputs, y)
            
            count += x.shape[0]
            running_error += loss
            running_count += preds.eq(y).sum()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            pbar.update(1)

    scheduler.step()
    print(f"[Epoca {e+1}] Treino -> Acuracia: {running_count/count}; Erro {running_error/count} ")

    running_count = 0.
    running_error = 0.
    count = 0
    
    q_module.eval()
    with torch.no_grad():
        with tqdm(total=len(test_loader)) as pbar:
            for i, (x, y) in enumerate(test_loader):
                outputs = q_module(x)

                loss = criterion(outputs, y)
                
                count += x.shape[0]
                running_error += loss
                running_count += (torch.max(outputs, 1)[1]).eq(y).sum()
                pbar.update(1)


    print(f"[Epoca {e+1}] Teste -> Acuracia: {running_count/count}; Erro {running_error/count} ")
    accuracies.append(running_count/count)
    errors.append(running_error/count)

100%|█████████████████████████████████████████████████████████████████████████████████| 736/736 [01:22<00:00,  8.92it/s]


[Epoca 1] Treino -> Acuracia: 0.9228940010070801; Erro 0.05989013612270355 


100%|█████████████████████████████████████████████████████████████████████████████████| 184/184 [00:11<00:00, 16.68it/s]


[Epoca 1] Teste -> Acuracia: 0.94972825050354; Erro 0.038512151688337326 


100%|█████████████████████████████████████████████████████████████████████████████████| 736/736 [01:11<00:00, 10.29it/s]


[Epoca 2] Treino -> Acuracia: 0.942255437374115; Erro 0.04160948842763901 


100%|█████████████████████████████████████████████████████████████████████████████████| 184/184 [00:10<00:00, 17.10it/s]


[Epoca 2] Teste -> Acuracia: 0.948369562625885; Erro 0.03737277165055275 


100%|█████████████████████████████████████████████████████████████████████████████████| 736/736 [01:11<00:00, 10.34it/s]


[Epoca 3] Treino -> Acuracia: 0.94972825050354; Erro 0.03480280563235283 


100%|█████████████████████████████████████████████████████████████████████████████████| 184/184 [00:10<00:00, 17.17it/s]

[Epoca 3] Teste -> Acuracia: 0.938858687877655; Erro 0.04524167627096176 



