In [1]:
# import necessary libraries
import numpy as np
import matplotlib.pyplot as plt
import time
from math import sqrt
from sklearn.preprocessing import StandardScaler, OneHotEncoder

import torch
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchvision.transforms import Compose, Resize, Grayscale, ToTensor
from torch.nn import Sequential, Conv2d, BatchNorm2d, ReLU, MaxPool2d, functional

from qiskit import Aer, QuantumCircuit
from qiskit.circuit import ParameterVector
from qiskit.primitives import BackendEstimator, BackendSampler
from qiskit.circuit.library import ZZFeatureMap, RealAmplitudes
from qiskit_machine_learning.neural_networks import EstimatorQNN, SamplerQNN
from qiskit_machine_learning.connectors import TorchConnector

In [2]:
# import data

# Define constants
IMG_SIZE = (160, 160)  # Image size for resizing
NUM_CLASSES = 4  # Number of classes in the dataset
BATCH_SIZE = 32  # Batch size for data loading

def scale_image(image):
    # Scale the data so that they are between 0 and pi and their quantum encoding is unique
    image = np.array(image.squeeze())
    scaler = StandardScaler()
    scaled_img= scaler.fit_transform(image)
    scaled_img = scaled_img/max(scaled_img.flatten()) *np.pi/2 + np.pi/2
    
    return torch.tensor(scaled_img).unsqueeze(dim=0)

# Define the training dataset with ImageFolder, which assumes that the images are organized in class-specific folders
train_ds = ImageFolder(
    root=r"C:\Users\giuseppe.dambruoso\OneDrive - LUTECH SPA\Desktop\Progetto\Dataset\train",  # Path to the training data directory
    transform=Compose([  # Compose a sequence of transformations to apply to each image
        Resize(IMG_SIZE),  # Resize the image to IMG_SIZE
        Grayscale(num_output_channels=1),  # Convert the image to grayscale (single channel)
        ToTensor(),  # Convert the image to a PyTorch tensor
    ]),
    target_transform=Compose([  # Compose a sequence of transformations to apply to each target
        lambda x: torch.tensor(x),  # Convert the target to a PyTorch tensor
        lambda x: torch.eye(NUM_CLASSES)[x].to(torch.float64)  # One-hot encode the target labels 
    ])
)

# Create a data loader for the training dataset, which loads data in batches
train_loader = DataLoader(
    train_ds,  # Training dataset
    batch_size=BATCH_SIZE,  # Batch size for loading data
    shuffle=True,  # Shuffle the data at every epoch
    drop_last=True  # Drop the last incomplete batch if its size is less than BATCH_SIZE
)

# Define the validation dataset with ImageFolder, which assumes that the images are organized in class-specific folders
validation_ds = ImageFolder(
    root=r"C:\Users\giuseppe.dambruoso\OneDrive - LUTECH SPA\Desktop\Progetto\Dataset\test",  # Path to the validation data directory
    transform=Compose([  # Compose a sequence of transformations to apply to each image
        Resize(IMG_SIZE),  # Resize the image to IMG_SIZE
        Grayscale(num_output_channels=1),  # Convert the image to grayscale (single channel)
        ToTensor(),  # Convert the image to a PyTorch tensor
        lambda x: scale_image(x)  # Rescale values so that they are between 0 and pi
    ]),
    target_transform=Compose([  # Compose a sequence of transformations to apply to each target
        lambda x: torch.tensor(x),  # Convert the target to a PyTorch tensor
        lambda x: torch.eye(NUM_CLASSES)[x].to(torch.float64) # One-hot encode the target labels
    ])
)

# Create a data loader for the validation dataset, which loads data in batches
validation_loader = DataLoader(
    validation_ds,  # Validation dataset
    batch_size=BATCH_SIZE,  # Batch size for loading data
    shuffle=True,  # Shuffle the data at every epoch
    drop_last=True  # Drop the last incomplete batch if its size is less than BATCH_SIZE
)

In [3]:
# define the classical blocks
def ConvBlock(in_channels, out_channels):
    """
    Helper function to create a convolutional block.

    Parameters:
        - in_channels: Number of input channels.
        - out_channels: Number of output channels.
        - kernel_size: Size of the convolutional kernel.
        - padding: Padding type for the convolution.

    Returns:
        - model_cb: Convolutional block as a Sequential module.
    """
    model_cb = torch.nn.Sequential(
        torch.nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size=5,
            stride=1,
            padding='same'
        ),
        torch.nn.ReLU(),
        torch.nn.BatchNorm2d(out_channels),
        torch.nn.MaxPool2d(kernel_size=2)
    )
    return model_cb

def DenseBlock(in_features, out_features):
    """
    Helper function to create a dense block.

    Parameters:
        - in_features: Number of input units.
        - out_fetures: Number of output units.

    Returns:
        - model_db: Dense block as a Sequential module.
    """
    model_db = torch.nn.Sequential(
        torch.nn.Linear(
            in_features,
            out_features
        ),
        torch.nn.ReLU(),
        torch.nn.BatchNorm1d(out_features),
        torch.nn.Dropout(p=0.2)
    )
    return model_db

In [4]:
# define the parametrized quantum circuit
N_QUBITS = 4
SHOTS = 1000
entanglement='linear'
backend=Aer.get_backend('qasm_simulator')

feature_map = ZZFeatureMap(N_QUBITS, parameter_prefix='x')
ansatz = RealAmplitudes(N_QUBITS, reps=N_QUBITS, entanglement=entanglement)
pqc = QuantumCircuit(N_QUBITS)
pqc.compose(feature_map, inplace=True)
pqc.compose(ansatz, inplace=True)

In [5]:
# define the quantum convolution
def apply_quantum_filter(input, backend, shots):
    quantum_filter = EstimatorQNN(
                    circuit=pqc.decompose(), # Set quantum circuit
                    estimator=BackendEstimator(backend), # Set sampler for the quantum circuit execution
                    input_params=feature_map.parameters, # Set non-trainable input parameters
                    weight_params=ansatz.parameters, # Set trainable parameters
                    input_gradients=True
                )
    
    quantum_filter.estimator.set_options(shots=shots) # Set number of shots for the quantum circuit execution
      
    quantum_filter = TorchConnector(quantum_filter)

    item = quantum_filter(input.reshape(-1)).item()
    return item
          
def quantum_convolution(input, backend, shots):
    input_unfolded = torch.nn.functional.unfold(input, kernel_size=2, stride=1, padding=1).permute(0, 2, 1)

    output_unfolded = [[] for _ in range(input_unfolded.size(0))]

    for i in range(input_unfolded.size(0)):
        for j in range(input_unfolded.size(1)):
            output_unfolded[i].append(apply_quantum_filter(input_unfolded[i, j], backend, shots))
    
    output_unfolded = torch.tensor(output_unfolded)

    output = output_unfolded.view(output_unfolded.size(0), input.size(1), int(sqrt(input_unfolded.size(1))), int(sqrt(input_unfolded.size(1))))
    
    return output

In [6]:
# define the net
class Net(torch.nn.Module):
    def __init__(self, backend, shots):
        super(Net, self).__init__()
        self.backend = backend
        self.shots = shots

        self.cnn = torch.nn.Sequential(
            ConvBlock(1, 256),
            ConvBlock(256, 128),
            ConvBlock(128, 64),
            torch.nn.Flatten(),
            DenseBlock(64*(IMG_SIZE[0]//8)**2, 128),
            DenseBlock(128, 64),
            torch.nn.Linear(64,4)
        )

        self.quant=TorchConnector(SamplerQNN(
                circuit=pqc.decompose(),
                sampler=BackendSampler(backend),
                input_params=feature_map.parameters,
                weight_params=ansatz.parameters,
                interpret=lambda x: bin(x).count("1") % 2,
                output_shape=NUM_CLASSES
            )
        )

    def forward(self, x):
        # x = quantum_convolution(x, backend=self.backend, shots=self.shots)
        x = self.cnn(x)      
        x = self.quant(x)
        return x

In [7]:
# Create the net
model = Net(backend, SHOTS)

# perform training and validation
loss_fn = torch.nn.CrossEntropyLoss()

LEARNING_RATE = 0.001
optimizer = torch.optim.Adam(model.parameters(), LEARNING_RATE)

In [8]:
# perform training and validation
def train_one_epoch():
    for batch_index, data in enumerate(train_loader):      
        inputs, labels = data # Every data instance is an inputs + labels pair
                            # where inputs are the images of a batch and labels are their labels
        
        optimizer.zero_grad() # Zero your gradients for every batch!

        outputs = model(inputs)

        # Compute the loss and its gradients
        loss = loss_fn(outputs, labels)
        loss.backward()

        # Adjust learning weights
        optimizer.step()

        # Calculate accuracy
        correct_predictions = 0
        _, predicted_labels = torch.max(outputs, 1)
        _, true_labels = torch.max(labels, 1)  # Convert one-hot labels to indices
        correct_predictions += (predicted_labels == true_labels).sum().item()
        accuracy = correct_predictions / BATCH_SIZE * 100
    return loss, accuracy

EPOCHS = 30  # Define the total number of epochs
train_losses = []  # Create an empty list to store training losses for each epoch
train_accuracies = [] # Create an empty list to store training accuracies for each epoch
validation_losses = []  # Create an empty list to store validation losses for each epoch
validation_accuracies = [] # Create an empty list to store validation accuracies for each epoch
model_state_dicts = [] # Create an empty list to store model state dicts for each epoch
best_vloss = 1000  # Initialize the best validation loss with a high value

for epoch in range(EPOCHS):  # Iterate over each epoch

    # Perform training
    print('EPOCH {}:'.format(epoch + 1))  # Print the current epoch number
    
    start_time = time.time()  # Record the start time of the epoch
    model.train(True)  # Set the model to training mode
    end_time = time.time()

    loss, accuracy = train_one_epoch()  # Perform one training epoch and calculate the training loss
    train_losses.append(loss.item())  # Append the training loss to the list
    train_accuracies.append(accuracy) # Append the training accuracy to the list
    model_state_dicts.append(model.state_dict()) # Append the model state dict to the list

    print('TRAIN: loss {}; accuracy {}%; time {:.0f}min'.format(loss, accuracy,(end_time-start_time)/60))
    
    # Perform validation
    model.eval()  # Set the model to evaluation mode

    total_vloss = 0.0  # Initialize the running validation loss

    end_time = time.time()  # Record the end time of the training epoch

    with torch.no_grad():  # Disable gradient computation and reduce memory consumption during validation
        start_vtime = time.time()  # Record the start time of validation
        
        correct_vpredictions = 0
        
        for batch_vindex, vdata in enumerate(validation_loader):
            vinputs, vlabels = vdata
            voutputs = model(vinputs)
            total_vloss += loss_fn(voutputs, vlabels).item()
            _, predicted_vlabels = torch.max(voutputs, 1)
            _, true_vlabels = torch.max(vlabels, 1)  # Convert one-hot labels to indices
            correct_vpredictions += (predicted_vlabels == true_vlabels).sum().item()
        
    end_vtime = time.time()  # Record the end time of validation
    
    vloss = total_vloss / (batch_vindex + 1)  # Calculate the average validation loss
    vaccuracy = correct_vpredictions / (BATCH_SIZE * (batch_vindex + 1)) * 100 # Calculate the accuracy
    validation_losses.append(vloss)  # Append the average validation loss to the list
    validation_accuracies.append(vaccuracy) # Append validation accuracy
    
    print('VALIDATION: loss {}; accuracy {}%; time {:.0f}min'.format(vloss, vaccuracy, (end_vtime-start_vtime)/60))
    print()

EPOCH 1:
TRAIN: loss 1.6183740459382534; accuracy 3.125%; time 0min
VALIDATION: loss 1.5953592661386118; accuracy 8.333333333333332%; time 4min

EPOCH 2:


KeyboardInterrupt: 