In [None]:
%config InlineBackend.figure_format = "svg"

from sklearn.model_selection import train_test_split
from sklearn.decomposition import TruncatedSVD
from sklearn.metrics import accuracy_score
from torch.autograd import Variable
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
import torch.nn.functional as F
from tqdm.notebook import tqdm
from itertools import product
from math import ceil, log
import pennylane as qml
import pandas as pd
import numpy as np
import torch

## MNIST image dataset

In [None]:
# Load data
path = "/home/gilbertocunha/Documentos/GitHub/QuantumVariationalCircuits/data/"
X_train = pd.read_csv(path + "mnist_train.csv")
X_test = pd.read_csv(path + "mnist_test.csv")
X_train

In [None]:
# Get labels from data
y_train = X_train["label"].to_numpy()[:6000]
y_test = X_test["label"].to_numpy()[:1000]
print(y_train.shape, y_test.shape)

# Get features from data
X_train = X_train.drop("label", axis=1).to_numpy()[:6000]
X_test = X_test.drop("label", axis=1).to_numpy()[:1000]
print(X_train.shape, X_test.shape)

Our data is too big, each feature vector has 784 pixels, we need to reduce this heavily. In order to do so, we will utilize truncated Single Valued Decomposition, a dimensionality reduction technique, to reduce our feature vector to 8 values, while maintaining the classes separated.

In [None]:
# Apply TSVD
tsvd = TruncatedSVD(n_components=16)
X_train = tsvd.fit_transform(X_train)
X_test = tsvd.fit_transform(X_test)
print(X_train.shape, X_test.shape)

Finally, our data needs to be normalized in order to be passed through a quantum circuit. Therefore, we will normalize it

In [None]:
def normalize (matrix):
    """
    Normalizes vector squared amplitudes to one
    for every vector in the input matrix
    """
    norm = np.sqrt(np.sum(matrix ** 2, -1))
    r = []
    for i in range(len(matrix)):
        r.append(matrix[i,:]/norm[i])
    r = np.array(r)
    return r

In [None]:
# Normalize our data
X_train = normalize(X_train)
X_test = normalize(X_test)

Let's check if data is still easily separable after dimensionality reduction

In [None]:
# Plotting training data
dim1, dim2 = 2, 14
plt.title("Data-points for mnist classes")
for i in range(10):
    plt.scatter(X_train[y_train == i][:,dim1], X_train[y_train == i][:,dim2], label=f"{i}")
plt.xlabel(f"Dimension {dim1}")
plt.ylabel(f"Dimension {dim2}")
plt.legend()
plt.show()

## Dataset Class

In [None]:
class Dataset:
    def __init__(self, features, labels, batch_size):
        # Randomly shuffle indices of the data
        idx = np.random.permutation(len(features))
        
        # Split data into batches
        self.features = Variable(torch.tensor(self.split(features[idx], batch_size)), requires_grad=False)
        self.labels = Variable(torch.tensor(self.split(labels[idx], batch_size)), requires_grad=False)
        
        self.batch_size = batch_size
        self.counter = 0
        assert len(self.features) == len(self.labels), "features and labels have different sizes"
        
    def split(self, array, batch_size):
        r = []
        num_splits = len(array)//batch_size
        for i in range(num_splits):
            start = i*batch_size
            size = batch_size
            r.append(array[start:start+size])
        r = np.stack(r, axis=0)
        return r
    
    def input_size(self):
        return len(self.features[0][0])
        
    def num_qubits(self):
        return ceil(log(self.input_size(), 2))
        
    def sample_batch(self):
        """
        A method that samples a batch from the dataset
        """
        features = self.features[self.counter]
        labels = self.labels[self.counter]
        self.counter = (self.counter + 1) % len(self.features)
        return features, labels
    
    def __len__(self):
        return len(self.features)

## Train, Val, Test split

In [None]:
# Splitting data into train, test and validation
val_split, random_state = 0.2, 42
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=val_split, random_state=random_state)

# Creating the datasets for train, test and validation
batch_size = 32
trainset = Dataset(X_train, y_train, batch_size)
valset = Dataset(X_val, y_val, batch_size)
testset = Dataset(X_test, y_test, batch_size)

## Quantum Variational Circuit hyperparameters

In [None]:
# number of qubits in the circuit
nr_qubits = trainset.num_qubits()

# number of layers in the circuit
nr_layers = 2

# define quantum device
dev = qml.device("default.qubit", wires=nr_qubits)

## Quantum Variational Circuit Implementation

In [None]:
def combinations(values):
    """
    Calculates every possible pair combination of values
    """
    r = []
    for i in range(len(values)):
        for j in range(i+1,len(values)):
            r.append((i,j))
    return r

# a layer of the circuit ansatz
def layer(params, j):
    for i in range(params.shape[1]):
        qml.RX(params[j, i, 0], wires=i)
        qml.RY(params[j, i, 1], wires=i)
        qml.RZ(params[j, i, 2], wires=i)

    # Add CNOT for every qubit combination
    combs = combinations(range(params.shape[1]))
    for i, k in combs:
        if j % 2 == 1:
            i, k = k, i
        qml.CNOT(wires=[i, k])

@qml.qnode(dev)
def qnode(inputs, params):
    # Encoding
    qml.QubitStateVector(inputs, wires=range(params.shape[1]))
    
    # repeatedly apply each layer in the circuit
    for j in range(params.shape[0]):
        layer(params, j)

    # returns the expectation of the input matrix A on the first qubit
    measurements = [qml.expval(qml.PauliZ(i)) for i in range(params.shape[1])]
    return measurements

def qvc(params, features):
    weights, bias = params
    predictions = [circuit(weights, x) + bias for x in features]
    return predictions

# cost function
def loss_fn(predictions, labels):
    """
    A simple square loss function to evaluate predictions
    """
    loss = 0.0
    for p, l in zip(predictions, labels):
        loss += (l - p) ** 2
    return loss

## Pytorch Hybrid Classical-Quantum Integration

In [None]:
class Model(torch.nn.Module):
    def __init__(self, qnode, nr_layers, nr_qubits):
        super(Model, self).__init__()
        self.qvc = qml.qnn.TorchLayer(qnode, {"params": (nr_layers, nr_qubits, 3)})
        self.linear1 = torch.nn.Linear(nr_qubits, 12)
        self.linear2 = torch.nn.Linear(12, 10)
        
    def forward(self, x):
        x = self.qvc(x)
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        return x
    
# Create pytorch model, optimizer and loss function
model = Model(qnode, nr_layers, nr_qubits)
optimizer = torch.optim.Adam(model.parameters(), lr=0.04)
loss_fn = torch.nn.CrossEntropyLoss()

# Training loop for the hybrid model

In [None]:
# Number of epochs to train on
epochs = 10

# Best accuracy and loss
best_tr_acc, best_val_acc = 0, 0
best_tr_loss, best_val_loss = float("inf"), float("inf")

# Performing the training loop
epoch_tqdm = tqdm(range(epochs), total=epochs, desc="Fitting")
for epoch in epoch_tqdm:
    # Iterate training data
    tr_accs, tr_losses = [], []
    tr_tqdm = tqdm(range(len(trainset)), total=len(trainset), desc="Training")
    for _ in tr_tqdm:
        # Sample batch of data and get predictions
        X, y = trainset.sample_batch()
        y_hat = model(X.float())

        # Get loss and update parameters
        optimizer.zero_grad()
        loss = loss_fn(y_hat, y)
        loss.backward()
        optimizer.step()
        
        # Add training info to tqdm progress bar
        y_hat = torch.argmax(y_hat.detach(), dim=-1)
        tr_acc = accuracy_score(y, y_hat)
        info = {"loss": loss.item(), "acc": tr_acc}
        tr_tqdm.set_postfix(info)
        tr_accs.append(tr_acc)
        tr_losses.append(loss.item())
        
    # Calculate training epoch accuracy and loss
    tr_acc = sum(tr_accs) / len(tr_accs)
    tr_loss = sum(tr_losses) / len(tr_losses)
    tr_acc = 0.2
    tr_loss = 1
    
    # Get best loss and accuracy
    if tr_acc > best_tr_acc: best_tr_acc = tr_acc
    if tr_loss < best_tr_loss: best_tr_loss = tr_loss
        
    # Iterate validation data
    val_accs, val_losses = [], []
    val_tqdm = tqdm(range(len(valset)), total=len(valset), desc="Validating")
    with torch.no_grad():
        for _ in val_tqdm:
            # Sample batch of data and get predictions
            X, y = valset.sample_batch()
            y_hat = model(X.float())
            loss = loss_fn(y_hat, y)

            # Add validation info to tqdm progress bar
            y_hat = torch.argmax(y_hat.detach(), dim=-1)
            val_acc = accuracy_score(y, y_hat)
            info = {"loss": loss.item(), "acc": val_acc}
            val_tqdm.set_postfix(info)
            val_accs.append(val_acc)
            val_losses.append(loss.item())
            
    # Calculate training epoch accuracy and loss
    val_acc = sum(val_accs) / len(val_accs)
    val_loss = sum(val_losses) / len(val_losses)
    
    # Save best model
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        torch.save(model.state_dict(), "models/best_model_dict.pt")
    if val_loss < best_val_loss: best_val_loss = val_loss
    
    # Update info to tqdm bar
    info = {
        "b_tr_acc": best_tr_acc, 
        "b_tr_loss": best_tr_loss, 
        "b_val_acc": best_val_acc, 
        "b_val_loss": best_val_loss
    }
    epoch_tqdm.set_postfix(info)