In [4]:
!pip install pytorch-lightning --quiet
!pip install qiskit --quiet
!pip install qiskit-machine-learning --quiet

In [108]:
import pytorch_lightning as pl
from sklearn.datasets import make_blobs
from sklearn.preprocessing import normalize
import numpy as np
from torch.utils.data import TensorDataset, DataLoader
from torch import Tensor, LongTensor
from torch.nn import Module
import matplotlib.pyplot as plt
from torchmetrics import Accuracy
from qiskit import QuantumCircuit, Aer, transpile
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint, ModelSummary
from pytorch_lightning import Trainer
from qiskit_machine_learning.neural_networks import CircuitQNN
from qiskit_machine_learning.connectors import TorchConnector
from qiskit.circuit import ParameterVector
from qiskit.utils import QuantumInstance
import torch

In [109]:
# cluster state
def cluster_state_circuit(bits):
    qc = QuantumCircuit(bits)
    bits = list(range(bits))
    qc.h(bits)
    for this_bit, next_bit in zip(bits, bits[1:]):
        qc.cz(this_bit, next_bit)
    if(len(bits)!= 2):
        qc.cz(bits[0], bits[-1])
    return qc

In [110]:
def one_qubit_unitary(thetas):
    qc = QuantumCircuit(1)
    qc.rx(thetas[0], 0)
    qc.ry(thetas[1], 0)
    qc.rz(thetas[2], 0)
    return qc

def two_qubit_unitary(thetas):
    qc = QuantumCircuit(2)
    qc = qc.compose(one_qubit_unitary(thetas[0:3]), [0])
    qc = qc.compose(one_qubit_unitary(thetas[3:6]), [1])
    qc.rzz(thetas[6],0, 1)
    qc.ryy(thetas[7],0, 1)
    qc.rxx(thetas[8],0, 1)
    qc = qc.compose(one_qubit_unitary(thetas[9:12]), [0])
    qc = qc.compose(one_qubit_unitary(thetas[12:]), [1])
    return qc


def two_qubit_pool(thetas):
    qc = QuantumCircuit(2)
    qc = qc.compose(one_qubit_unitary(thetas[0:3]), [1])
    qc = qc.compose(one_qubit_unitary(thetas[3:6]), [0])
    qc.cnot(0,1)
    qc = qc.compose(one_qubit_unitary(thetas[0:3]).inverse(), [1])
    return qc

In [111]:
def quantum_conv_circuit(bits, thetas):
    qc = QuantumCircuit(bits)
    bits = list(range(bits))
    for first, second in zip(bits[0::2], bits[1::2]):
        qc = qc.compose(two_qubit_unitary(thetas), [first,second])
    for first, second in zip(bits[1::2], bits[2::2] + [bits[0]]):
        qc = qc.compose(two_qubit_unitary(thetas), [first, second])
    return qc

In [9]:
def quantum_pool_circuit(sources, sinks, thetas):
    qc = QuantumCircuit(len(sources) + len(sinks))
    for source, sink in zip(sources, sinks):
        qc = qc.compose(two_qubit_pool(thetas), [source, sink])
    return qc

In [10]:
# Define and create QNN
def create_qcnn(n):
    qi = QuantumInstance(Aer.get_backend("aer_simulator_statevector"))
    backend = Aer.get_backend('qasm_simulator')
    def last_qubit_prob(x):
        res = bin(x)[2:].zfill(2)
        return int(res[0])

    output_shape = 2  # parity = 0, 1

    in_thetas = ParameterVector('x', length=n)
    feature_map = QuantumCircuit(n, name="Angle Encoding")
    for i in range(n):
        feature_map.rx(in_thetas[i], i)
    feature_map = transpile(feature_map, backend = backend)
    thetas = ParameterVector('θ', length=21)
    var_form = QuantumCircuit(n, name="QCNN")
    var_form = var_form.compose(quantum_conv_circuit(n,thetas[0:15]), range(n))
    var_form = var_form.compose(quantum_pool_circuit(range(n//2), range(n//2,n), thetas[15:21]))
    ansatz = var_form
    ansatz = transpile(ansatz, backend = backend)

    qc = QuantumCircuit(n)
    qc = qc.compose(transpile(cluster_state_circuit(n), backend = backend), range(n))
    qc = qc.compose(feature_map, range(n))
    qc = qc.compose(ansatz, range(n))

    # REMEMBER TO SET input_gradients=True FOR ENABLING HYBRID GRADIENT BACKPROP
    qcnn = CircuitQNN(
        qc,
        input_params = feature_map.parameters,
        weight_params=ansatz.parameters,
        input_gradients=True,
        interpret=last_qubit_prob,
        output_shape=output_shape,
        quantum_instance=qi,
    )
    return qcnn

In [11]:
class BlobsDataModule(pl.LightningDataModule):
    def __init__(self, n_features = 2):
        super().__init__()
        self.batch_size = 8
        self.num_workers = 4
        
        self._dims = (n_features)
        self.output_dims = (1,)
        self.mapping = list(range(2))
        self.n_features = n_features
    def config(self):
        """Return important settings of the dataset, which will be passed to instantiate models."""
        return {"input_dims": self._dims, "output_dims": self.output_dims, "mapping": self.mapping}
    
    def prepare_data(self):
        self.x_train, self.y_train = make_blobs(n_samples=100,n_features=self.n_features,centers=2,random_state=42)
        self.x_val, self.y_val = make_blobs(n_samples=50,n_features=self.n_features,centers=2,random_state=42)
        self.x_test, self.y_test = make_blobs(n_samples=50,n_features=self.n_features,centers=2,random_state=42)
    
    def setup(self, stage=None):
        self.x_train = normalize(self.x_train)
        self.x_val = normalize(self.x_val)
        self.x_test = normalize(self.x_test)
        
        self.x_train = Tensor(self.x_train)
        self.y_train = Tensor(self.y_train).type(LongTensor)
        
        self.x_val = Tensor(self.x_val)
        self.y_val = Tensor(self.y_val).type(LongTensor)
        
        self.x_test = Tensor(self.x_test)
        self.y_test = Tensor(self.y_test).type(LongTensor)
        
        self.data_train = TensorDataset(self.x_train, self.y_train)
        self.data_val = TensorDataset(self.x_val, self.y_val)
        self.data_test = TensorDataset(self.x_test, self.y_test)

    def train_dataloader(self):
        return DataLoader(
            self.data_train,
            shuffle=True,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def val_dataloader(self):
        return DataLoader(
            self.data_val,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

    def test_dataloader(self):
        return DataLoader(
            self.data_test,
            shuffle=False,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
        )

In [12]:
data = BlobsDataModule()
data.prepare_data()
data.setup()

In [13]:
x_train = data.data_train.tensors[0]
plt.scatter(x_train[:,0], x_train[:,1], s = 50)

In [14]:
x_test = data.data_test.tensors[0]
plt.scatter(x_test[:,0], x_test[:,1], s = 50)

In [15]:
print(x_train.max(),x_train.min())
print(x_test.max(),x_test.min())

In [98]:
class Accuracy(Accuracy):
    def __init__(self):
        super().__init__()
        self.add_state("correct", default=torch.tensor(0), dist_reduce_fx="sum")
        self.add_state("total", default=torch.tensor(0), dist_reduce_fx="sum")

    def update(self, preds, target):
#         preds, target = self._input_format(preds, target)
#         assert preds.shape == target.shape
        
        _, self.correct = torch.max(preds, dim=1)
        self.correct = torch.sum(self.correct == target)
        self.total = torch.tensor(len(target))

    def compute(self):
        return torch.tensor(self.correct.item() / self.total.item())

In [99]:
class BaseModel(pl.LightningModule):
    def __init__(self,model):
        super().__init__()
        self.model = model
        optimizer = 'Adam'
        self.optimizer_class = getattr(torch.optim, optimizer)
        self.lr = 1e-3
        self.loss_fn = torch.nn.NLLLoss()
        self.one_cycle_total_steps = 100
        self.one_cycle_max_lr = None
        
        self.train_acc = Accuracy()
        self.val_acc = Accuracy()
        self.test_acc = Accuracy()
    
    def configure_optimizers(self):
        optimizer = self.optimizer_class(self.parameters(), lr=self.lr)
        if self.one_cycle_max_lr is None:
            return optimizer
        scheduler = torch.optim.lr_scheduler.OneCycleLR(
            optimizer=optimizer, max_lr=self.one_cycle_max_lr, total_steps=self.one_cycle_total_steps
        )
        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_acc"}
    
    def forward(self, x):
        return self.model(x)
    
    def training_step(self, batch, batch_idx): 
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        self.log("train_loss", loss)
        self.train_acc(logits, y)
        self.log("train_acc", self.train_acc)
        return loss

    def validation_step(self, batch, batch_idx): 
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y)
        self.log("val_loss", loss, prog_bar=True)
        self.val_acc(logits, y)
        self.log("val_acc", self.val_acc, on_step=False, on_epoch=True, prog_bar=True)

    def test_step(self, batch, batch_idx): 
        x, y = batch
        logits = self(x)
        self.test_acc(logits, y)
        self.log("test_acc", self.test_acc, on_step=False, on_epoch=True)

In [100]:
class QCNN(Module):
    def __init__(self, data_config):
        super().__init__()
        
        input_dims = data_config["input_dims"]
        num_classes = len(data_config["mapping"])
        
        self.qcnn = TorchConnector(create_qcnn(input_dims))
        
    def forward(self,x):
        out = self.qcnn(x)  # apply QCNN
        return out

In [101]:
class MetricTracker(pl.Callback):
    def __init__(self):
        self.collection = {
            'train_loss': [],
            'val_loss': [],
            'val_acc': [],
            'train_acc': []
        }

    def on_train_batch_end(self, *args, **kwargs):
        metrics = trainer.logged_metrics
        self.collection['train_loss'].append(metrics['train_loss'].item())
        self.collection['train_acc'].append(metrics['train_acc'].item())
    
    def on_val_batch_end(self, *args, **kwargs):
        self.collection['val_loss'].append(metrics['val_loss'].item())
        self.collection['val_acc'].append(metrics['val_acc'].item())

In [102]:
data = BlobsDataModule()
model = QCNN(data.config())
lit_model = BaseModel(model)

early_stopping_callback = EarlyStopping(monitor="val_acc", 
                                        mode="max", 
                                        patience=10, 
                                        stopping_threshold = 0.90)
model_checkpoint_callback = ModelCheckpoint(filename="{epoch:03d}-{val_loss:.3f}-{val_acc:.3f}", 
                                            monitor="val_acc", 
                                            mode="max", 
                                            dirpath='./logs/')
model_summary_callback = ModelSummary(max_depth=1)
metric_tracker_callback = MetricTracker()

callbacks = [early_stopping_callback, model_checkpoint_callback, model_summary_callback, metric_tracker_callback]

gpus = -1 if torch.cuda.is_available() else None

In [None]:
trainer = Trainer(gpus = gpus, 
                callbacks=callbacks, 
                fast_dev_run=False,
                max_epochs=10,
                )

trainer.tune(lit_model, datamodule=data)

trainer.fit(lit_model, datamodule=data)
trainer.test(lit_model, datamodule=data)

In [104]:
train_losses = metric_tracker_callback.collection['train_loss']
val_losses = metric_tracker_callback.collection['val_loss']
train_accs = metric_tracker_callback.collection['train_acc']
val_accs = metric_tracker_callback.collection['val_acc']

In [105]:
plt.plot(train_losses, color='r', label='train loss')
plt.plot(train_accs, color='g', label='train accs')
plt.legend()


In [106]:
plt.plot(val_losses, color='b', label='=val loss')
plt.plot(val_accs, color='y', label='val accs')
plt.legend()