<a href="https://colab.research.google.com/github/Piras2024/QNL-Net-for-Image-Classification-Review-by-Matteo-Piras/blob/main/MNIST_QNL_Net.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install qiskit==1.4.2
import qiskit
print(qiskit.__version__)

# PQC (PARAMETERIZED QUANTUM CIRCUIT) DEFINITION

In [None]:
from qiskit import QuantumCircuit
from qiskit.circuit import Parameter


class QNLNetCircuit:
    """
    Parameterized quantum circuit (PQC) used as the QNL-Net ansatz.

    Each repetition applies one single-qubit rotation per qubit
    (RZ on qubit 0, RY on qubits 1 and 2, RX on qubit 3), then a chain of
    three CNOT gates selected by ``ansatz``, and finally a second RZ on
    qubit 0. Every repetition gets its own five trainable parameters.
    """

    # (control, target) pairs of the CNOT chain for each supported ansatz id.
    _CNOT_CHAINS = {
        0: ((1, 2), (2, 3), (3, 0)),
        1: ((3, 2), (2, 1), (1, 0)),
        2: ((1, 3), (3, 2), (2, 0)),
    }

    def __init__(self, num_qubits=4, ansatz=0, ansatz_reps=1):
        """
        Build the QNL-Net circuit.

        Args:
            num_qubits: Width of the underlying ``QuantumCircuit``. The gates
                below are applied to qubits 0-3, so the layout is written
                for 4 qubits.
            ansatz: Id of the CNOT entangling pattern (0, 1 or 2).
            ansatz_reps: Number of times the ansatz layer is repeated.

        Raises:
            ValueError: If ``ansatz`` is not one of the supported ids.
        """
        self.ansatz = ansatz
        self.ansatz_reps = ansatz_reps
        self.num_qubits = num_qubits

        # Fail fast: previously an unknown ansatz only printed a message and
        # produced a circuit without any entangling gates.
        self._cnot_chain()

        self.circuit = QuantumCircuit(num_qubits)

        # Parameters to be optimized
        self.parameters = self._setup_parameters()

        # Create the circuit
        self.build_circuit()

    def _cnot_chain(self):
        """
        Returns the (control, target) pairs for the configured ansatz.

        Raises:
            ValueError: If ``self.ansatz`` is not a supported ansatz id.
        """
        try:
            return self._CNOT_CHAINS[self.ansatz]
        except (KeyError, TypeError):
            supported = sorted(self._CNOT_CHAINS)
            raise ValueError(
                f"Invalid ansatz {self.ansatz!r}; expected one of {supported}"
            ) from None

    def _setup_parameters(self):
        """
        Sets the parameters to be optimized for the circuit.

        Returns:
            Dict mapping parameter name to ``Parameter``. For repetition ``i``
            the entries are added in the order ``x_{2i}``, ``x_{2i+1}``,
            ``theta_i``, ``phi_i``, ``g_i``.
        """
        params = {}
        for i in range(self.ansatz_reps):
            names = (f'x_{2*i}', f'x_{2*i+1}', f'theta_{i}', f'phi_{i}', f'g_{i}')
            for name in names:
                params[name] = Parameter(name)
        return params

    def _apply_ansatz_layer(self, rep):
        """
        Applies a single ansatz layer to the circuit.

        Args:
            rep: Zero-based index of the repetition; selects which
                parameters are used for this layer.
        """
        x_0 = self.parameters[f'x_{2*rep}']
        x_1 = self.parameters[f'x_{2*rep+1}']
        theta_0 = self.parameters[f'theta_{rep}']
        phi_0 = self.parameters[f'phi_{rep}']
        g_0 = self.parameters[f'g_{rep}']

        # One parameterized rotation per qubit
        self.circuit.rz(x_0, 0)
        self.circuit.ry(theta_0, 1)
        self.circuit.ry(phi_0, 2)
        self.circuit.rx(g_0, 3)

        # Entangling CNOT chain for the selected ansatz
        for control, target in self._cnot_chain():
            self.circuit.cx(control, target)

        # Closing rotation on qubit 0
        self.circuit.rz(x_1, 0)

    def build_circuit(self):
        """
        Builds the QNL-Net circuit with the desired ansatz
        and number of repetitions.
        """
        for rep in range(self.ansatz_reps):
            self._apply_ansatz_layer(rep)

    def circuit_parameters(self):
        """
        Returns the set of parameters.

        The result is a ``set``, so its iteration order is unspecified; use
        ``self.parameters`` when a stable order is needed.
        """
        return set(self.parameters.values())

    def get_circuit(self):
        """
        Returns the circuit.
        """
        return self.circuit

In [None]:
from qiskit.visualization import circuit_drawer

# Sanity check: build a single-repetition QNL-Net circuit with ansatz 0
# (num_qubits is left at its default of 4).
circuit = QNLNetCircuit(ansatz=0, ansatz_reps=1)

# Get the underlying QuantumCircuit object from the wrapper
qc = circuit.get_circuit()

# Print the circuit so the gate layout can be inspected
print(qc)

In [None]:
!pip install qiskit-machine-learning

# MULTICLASS CLASSIFICATION MODEL DEFINITION

In [None]:
import torch
from torch.nn import (
    Module,
    Conv2d,
    Linear,
    Dropout2d,
    Flatten,
)
from torch import cat
import torch.nn.functional as F
from qiskit_machine_learning.connectors import TorchConnector
from qiskit import QuantumCircuit
from qiskit.circuit.library import ZFeatureMap
from qiskit_machine_learning.neural_networks import EstimatorQNN
from qiskit.quantum_info import SparsePauliOp, Pauli

# Width of the feature map and of the QNL-Net circuit; also the number of
# features the classical network feeds into the quantum layer.
num_qubits = 4
# Size of the final linear layer's output.
output_shape = 4  # Number of classes


# Compose QNL-Net with Feature Map
def create_qnlnet(feature_map_reps, ansatz, ansatz_reps):
    """
    Compose QNL-Net with Feature Map utilizing EstimatorQNN.

    Args:
        feature_map_reps: Number of repetitions of the ``ZFeatureMap``.
        ansatz: Ansatz id forwarded to ``QNLNetCircuit``.
        ansatz_reps: Number of ansatz repetitions forwarded to
            ``QNLNetCircuit``.

    Returns:
        Quantum non-local neural network (an ``EstimatorQNN`` with two
        observables, i.e. two outputs per sample).
    """
    # Feature Map for Encoding
    feature_map = ZFeatureMap(num_qubits, reps=feature_map_reps)

    # QNL-Net circuit (trainable ansatz)
    qnlnet_instance = QNLNetCircuit(num_qubits=num_qubits, ansatz=ansatz, ansatz_reps=ansatz_reps)
    qnlnet_circuit = qnlnet_instance.get_circuit()

    # Full circuit: feature map followed by the QNL-Net ansatz
    qc = QuantumCircuit(num_qubits)
    qc.compose(feature_map, inplace=True)
    qc.compose(qnlnet_circuit, inplace=True)

    # EstimatorQNN observables: a single Z on the first / second character
    # of the Pauli label.
    # NOTE(review): Qiskit Pauli labels are little-endian (the right-most
    # character is qubit 0), so these labels act on the two highest-index
    # qubits, not on qubits 0 and 1 -- confirm this is the intended readout.
    observable_first = SparsePauliOp([Pauli('Z' + 'I' * (num_qubits - 1))])
    observable_second = SparsePauliOp([Pauli('I' + 'Z' + 'I' * (num_qubits - 2))])
    observables = [observable_first, observable_second]

    # Take the weights from the insertion-ordered parameter dict rather than
    # from the unordered set returned by circuit_parameters(), so that the
    # position of each weight in the QNN weight vector is the same on
    # every run.
    weight_params = list(qnlnet_instance.parameters.values())

    # REMEMBER TO SET input_gradients=True FOR ENABLING HYBRID GRADIENT BACKPROP
    qnlnet = EstimatorQNN(
        circuit=qc,
        observables=observables,
        input_params=feature_map.parameters,
        weight_params=weight_params,
        input_gradients=True,
    )

    return qnlnet


# Define torch Module for Hybrid CNN-QNL-Net
class HybridCNNQNLNet(Module):
    """
    HybridCNNQNLNet is a hybrid quantum-classical convolutional neural network
    with QNL-Net.

    Pipeline: two conv + max-pool stages, dropout, two fully connected layers
    reducing to ``num_qubits`` features, the QNL-Net quantum layer, and a
    final linear layer producing ``output_shape`` class scores.

    Args:
        qnlnet: Quantum non-local neural network.
    """

    def __init__(self, qnlnet):
        super().__init__()
        self.conv1 = Conv2d(1, 2, kernel_size=5)
        self.conv2 = Conv2d(2, 16, kernel_size=5)
        self.dropout = Dropout2d()
        self.flatten = Flatten()
        # 256 = 16 channels * 4 * 4, the flattened size for 28x28 inputs
        # after two (5x5 conv, 2x2 max-pool) stages.
        self.fc1 = Linear(256, 128)
        self.fc2 = Linear(128, num_qubits)  # 4 inputs to QNL-Net

        # Apply torch connector, weights chosen
        # uniformly at random from interval [-1,1].
        self.qnlnet = TorchConnector(qnlnet)

        # Output from QNL-Net: one value per observable. Read the width from
        # the network instead of hard-coding it so the layer stays consistent
        # if the list of observables changes.
        self.output_layer = Linear(qnlnet.output_shape[0], output_shape)

    def forward(self, x):
        """
        Forward pass of the HybridCNNQNLNet.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            x (torch.Tensor): Output tensor of unnormalized class scores.
        """
        # CNN
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)

        # QNL-Net (call the module rather than .forward so hooks are honored)
        x = self.qnlnet(x)

        # Post-QNL-Net classical linear layer
        x = self.output_layer(x)

        return x

# MODEL INITIALIZATION

In [None]:
import numpy as np
import torch.optim as optim
from torch import manual_seed, no_grad, device
from torch.nn import NLLLoss
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchsummary import summary
import csv

# Select the compute device. torch.device is referenced explicitly so the
# constructor is not called through the very name (`device`) that this
# statement rebinds to the resulting device object.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
ansatz = 0            # CNOT pattern id of the QNL-Net ansatz
feature_map_reps = 1  # repetitions of the feature map
ansatz_reps = 1       # repetitions of the ansatz layer
num_epochs = 10
lr = 3e-4

# Build the quantum layer and wrap it in the hybrid model
qnlnn = create_qnlnet(feature_map_reps, ansatz, ansatz_reps)
model = HybridCNNQNLNet(qnlnn)

# Move the model to the selected device
model.to(device)


# DATASET SETUP

In [None]:
# Set train shuffle seed (for reproducibility)
manual_seed(239)

batch_size = 10
# Upper bound on the number of samples kept PER CLASS in each split
n_train_samples = 60000
n_test_samples = 20000

# Digits kept for the 4-class task. They are already the labels 0..3, so no
# re-encoding of the targets is needed after filtering.
selected_classes = (0, 1, 2, 3)

# Same preprocessing for both splits
mnist_transform = transforms.Compose([transforms.ToTensor(),
                                      transforms.Normalize((0.1307,), (0.3081,))])


def _indices_for_classes(targets, classes, max_per_class):
    """
    Return the sample indices whose label is one of ``classes``.

    Indices are grouped class by class, in the order given by ``classes``,
    keeping at most ``max_per_class`` indices for each class.
    """
    labels = np.array(targets)
    return np.concatenate(
        [np.where(labels == cls)[0][:max_per_class] for cls in classes]
    )


# Use pre-defined torchvision function to load MNIST data
train_dataset = datasets.MNIST(
    root="./data",
    train=True,
    download=True,
    transform=mnist_transform,
)

test_dataset = datasets.MNIST(
    root="./data",
    train=False,
    download=True,
    transform=mnist_transform,
)

# Filter out labels
train_idx = _indices_for_classes(train_dataset.targets, selected_classes, n_train_samples)
test_idx = _indices_for_classes(test_dataset.targets, selected_classes, n_test_samples)

train_dataset.data = train_dataset.data[train_idx]
train_dataset.targets = np.array(train_dataset.targets)[train_idx]

test_dataset.data = test_dataset.data[test_idx]
test_dataset.targets = np.array(test_dataset.targets)[test_idx]

# Define torch dataloaders
# NOTE(review): shuffling the test loader is not required for evaluation;
# it is left enabled to keep the existing behavior.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

# Print training and testing dataset info
print(train_dataset)
print(test_dataset)
print("================================================================")

# WANDB SETUP

In [None]:
!pip install wandb -qU

In [None]:
import wandb
import os

# When no API key is provided through the environment, tell the user where
# to find one and fall back to wandb's own login call.
if "WANDB_API_KEY" not in os.environ:
    print("Please set the WANDB_API_KEY environment variable.")
    print("You can find your API key at https://wandb.ai/settings")
    wandb.login()

# Start the run that the training cell logs to.
wandb.init(project="HybridCNNQNLNet_MNIST")

In [None]:
# Record the hyperparameters of this run in the wandb config so they are
# stored alongside the logged metrics.
wandb.config.ansatz = ansatz
wandb.config.feature_map_reps = feature_map_reps
wandb.config.ansatz_reps = ansatz_reps
wandb.config.num_epochs = num_epochs
wandb.config.lr = lr
wandb.config.batch_size = batch_size

# TRAINING AND TESTING

In [None]:
from tqdm.notebook import tqdm

loss_func = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
scheduler = ExponentialLR(optimizer, gamma=0.9)  # lr is multiplied by 0.9 after every epoch

model.train()  # Set model to training mode

# One (epoch, train_loss, train_accuracy, test_accuracy) tuple per epoch
epoch_data = []

# Dataset sizes do not change during training, so compute them once
total_train = len(train_dataset)
total_test = len(test_dataset)

try:
    for epoch in tqdm(range(num_epochs), desc="Training Progress"):
        total_loss = 0
        correct_train = 0

        for data, target in tqdm(train_loader, desc=f"Epoch {epoch + 1}", leave=False):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()  # Initialize gradient
            output = model(data)  # Forward pass
            loss = loss_func(output, target)  # Calculate loss
            loss.backward()  # Backward pass
            optimizer.step()  # Optimize weights

            # The loss is a batch mean; weight it by the batch size so the
            # epoch average is correct even if the last batch is smaller.
            total_loss += loss.item() * data.size(0)
            correct_train += (output.argmax(dim=1) == target).sum().item()

        epoch_loss = total_loss / total_train
        epoch_accuracy_train = correct_train / total_train

        # Testing
        model.eval()  # Set model to evaluation mode
        correct_test = 0

        with no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                correct_test += (output.argmax(dim=1) == target).sum().item()

        test_accuracy = correct_test / total_test

        epoch_data.append((epoch + 1, epoch_loss, epoch_accuracy_train, test_accuracy))

        print(
            f"Epoch {epoch + 1}: Train Loss: {epoch_loss:.4f}; "
            f"Train Accuracy: {epoch_accuracy_train:.4f}; "
            f"Test Accuracy: {test_accuracy:.4f}"
        )

        wandb.log({
            "epoch": epoch + 1,
            "train_loss": epoch_loss,
            "train_accuracy": epoch_accuracy_train,
            "test_accuracy": test_accuracy
        })

        model.train()  # Set model back to training mode
        scheduler.step()  # Adjust learning rate for next epoch
except BaseException:
    # Close the wandb run and mark it as failed when training raises or is
    # interrupted, instead of leaving the run open; then re-raise.
    wandb.finish(exit_code=1)
    raise
else:
    print("================================================================")
    wandb.finish()
