In [1]:
from qiskit import *
# Qiskit module
from qiskit import QuantumCircuit
import qiskit.circuit.library as circuit_library
import qiskit.quantum_info as qi
#from qiskit import execute
from qiskit.utils import algorithm_globals
from qiskit.circuit.library import EfficientSU2
from qiskit_machine_learning.neural_networks import SamplerQNN, EstimatorQNN
from qiskit_machine_learning.connectors import TorchConnector
import torch
from qiskit.circuit import ParameterVector, Parameter
from qiskit.circuit.parametervector import ParameterVectorElement
from torch import Tensor
import torch.nn as nn
import numpy as np
from qiskit.quantum_info import SparsePauliOp
from tqdm.notebook import tqdm

from transformers import AutoTokenizer

from torch.utils.data import DataLoader, Dataset
from tensorflow import compat
import tensorflow_datasets as tfds

2023-07-05 00:21:48.471406: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
digits = load_digits()
X, y = digits.images[0:100], digits.target[0:100]
X_train, X_test, y_train, y_test = train_test_split(X, y)

train_mask = np.isin(y_train, [1, 7])
X_train, y_train = X_train[train_mask], y_train[train_mask]

test_mask = np.isin(y_test, [1, 7])
X_test, y_test = X_test[test_mask], y_test[test_mask]

X_train = X_train.reshape(X_train.shape[0], 4, 16)
X_test = X_test.reshape(X_test.shape[0], 4, 16)

In [3]:
X_train.shape

(17, 4, 16)

In [15]:
class qrnn(nn.Module):

  def __init__(self, n_qrbs, n_qubs, vocab_size, staggered: bool=False):
    super().__init__()
    self.n_qrbs = n_qrbs
    self.n_qubits = n_qubs
    self.staggered = staggered
    #self.embed_layer = nn.Embedding(vocab_size, n_qrbs, max_norm=1.0)
    self.qc_init()
    input_params = list(filter(lambda x: not isinstance(x, ParameterVectorElement), self.qc.parameters.data))
    weight_params = list(filter(lambda x: isinstance(x, ParameterVectorElement), self.qc.parameters.data))
    self.qnn = EstimatorQNN(circuit=self.qc, input_params=input_params, weight_params=weight_params, input_gradients=True)
    self.qrnn = TorchConnector(self.qnn)
    self.softmax = nn.Softmax(dim=0)

  def qc_init(self):
    self.regD = QuantumRegister(self.n_qubits, 'regD')
    self.regH = QuantumRegister(self.n_qubits, 'regH')
    self.regY = ClassicalRegister(self.n_qubits, 'regY')
    self.qc = QuantumCircuit(self.regD, self.regH, self.regY)
    self.theta_matrix = np.array([ParameterVector(f'θ{i}', length=3) for i in range(self.n_qubits*2)])
    self.gamma_vec = ParameterVector('γ', length=self.n_qubits*2+1)
    self.theta = Parameter('θ')
    self.angencode()
    self.apply_ansatz()

  def angencode(self):
    #theta = torch.atan(xt).tolist()[0]
    for i in range(self.n_qubits):
      self.qc.ry(self.theta, self.regD[i])

  def apply_ansatz(self):
    qubits = self.qc.qubits

    for i in range(self.n_qubits*2):  # Initial circuit rotations with parameters
      self.qc.rx(self.theta_matrix[i][0], qubits[i])
      self.qc.rz(self.theta_matrix[i][1], qubits[i])
      self.qc.rx(self.theta_matrix[i][2], qubits[i])

    for i in range(1, self.n_qubits*2):
      self.qc.rzz(self.gamma_vec[i], qubits[i-1], qubits[i])

    self.qc.rzz(self.gamma_vec[-1], qubits[-1], qubits[0])

  def measurement(self):
    self.qc.measure(self.regD, self.regY)
    # self.qc.reset(self.regD)

  def regreset(self):
    self.qc.reset(self.regD)

  def forward(self, x):
    y = []
    for i in range(self.n_qrbs):
      #self.angencode(x[i])
      theta = torch.atan(x[i]).view(1)
      out = self.softmax(self.qrnn(theta))
      y.append(out)
      # self.regreset()
      # self.measurement()
      # self.qc_init()
      # y.append(self.regY[0])
      # print(y)
    return y[-1]

In [16]:
def binary_accuracy(preds, y):
    """
    Returns accuracy per batch, i.e. if you get 8/10 right, this returns 0.8, NOT 8
    """

    #round predictions to the closest integer
    rounded_preds = (torch.round(torch.sign(preds-0.5))+1)//2
    correct = (rounded_preds == y).float() #convert into float for division 
    acc = correct.sum() / len(correct)
    return acc

In [17]:
def model_trainer(model, n_epochs, X_train, y_train):
  '''
  Model trainer to train QRNN
  Parameters:
    model (PyTorch Model): QRNN model for text or image classification with correct sizes specified
    n_epochs (int): Number of epochs to train for.
    trainloader (PyTorch Dataloader): Dataloader containing the dataset.
  '''
  train_loss = []
  optimizer = torch.optim.Adam(lr=0.03, params=model.parameters())
  criterion = nn.BCELoss()
  pbar = tqdm(total=len(X_train), leave=True)
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model.train()
  model.to(device)
  for epoch in range(n_epochs):
    optimizer.zero_grad()
    X_tensor=torch.tensor(X_train)
    predictions=model(X_tensor.float()).squeeze(1)
    #predictions=torch.sign(predictions)
    #print(predictions)
    label=torch.tensor(y_train)
    for i in range(len(label)):
        if label[i]==1:
            label[i] = 0
        else:
            label[i]=1
    #print(label)
    loss = criterion(predictions, label.float())
    acc = binary_accuracy(predictions, label)
    print('')
    print('Accuracy:',acc)
    print('')
    print(loss)
    loss.backward()
    optimizer.step()
    pbar.update()
    pbar.desc = f"Epoch: {epoch} | Batch: {batch} | Loss {loss}"
    train_loss.append(loss.cpu().detach().numpy())
    pbar.refresh()
  pbar.close()
  return model, train_loss
    


def model_tester(model, testloader):
  '''
  Model test to train QRNN
  Parameters:
    model (PyTorch Model): QRNN model for text or image classification with correct sizes specified
    testloader (PyTorch Dataloader): Dataloader containing the test dataset.
  '''
  preds = []
  labels = []
  pbar = tqdm(total=len(testloader), leave=True)
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  model.eval()
  model.to(device)
  for batch, (feature, label) in enumerate(testloader):
    feature, label = feature.to(device), label.to(device)
    with torch.no_grad():
      predictions = model(feature.squeeze())
      preds.append(predictions.cpu().numpy())
      labels.append(label.cpu().numpy())
    pbar.update()
    pbar.desc = f"Batch: {batch}"
  pbar.refresh()
  pbar.close()
  preds = np.array([1 if pred>=0.5 else 0 for pred in preds])
  labels = np.array(labels)
  acc = (preds == labels).sum() / len(preds)
  return preds, acc, labels

In [18]:
# Defining and training the model
model = qrnn(n_qrbs=16, n_qubs=3, vocab_size=4, staggered=False)
trained_model, train_loss = model_trainer(model, 50, X_train, y_train)

  0%|          | 0/17 [00:00<?, ?it/s]

RuntimeError: shape '[1]' is invalid for input of size 64

In [None]:
# Calculating accuracy of model
preds, accuracy, labels = model_tester(trained_model, imdb_testloader)
print(f"Accuracy: {accuracy}")

In [None]:
# Saving model
torch.save(model.state_dict(), "QRNN_STATE.pt")