# arXiv:1810.03787 [Quantum Convolutional Neural Networks](https://arxiv.org/abs/1810.03787)

In [None]:
from __future__ import annotations

import math
import pickle
import sys
from collections.abc import Callable, Sequence

import cupy as cp
import networkx as nx
import numpy as np
import torch
from qiskit import QuantumCircuit
from scipy.optimize import OptimizeResult
from torch.utils.data import Dataset, DataLoader

import qmlant.optim as optim
from qmlant.datasets import HorVerBars
from qmlant.models import SimpleQCNN
from qmlant.neural_networks import (
    EstimatorTN,
    circuit_to_einsum_expectation,
)
from qmlant.visualization import draw_quimb_tn

## Data preparation for HorVerBars

In [None]:
# Size passed to the dataset factory (presumably the number of generated
# samples -- TODO confirm against HorVerBars).
data_size = 50

# Build the train/test datasets used by the rest of the notebook.
# test_size=0.3 is presumably the fraction held out for testing -- TODO confirm.
trainset, testset = HorVerBars.create_train_and_test(
    data_size=data_size,
    test_size=0.3,
)

## Quantum circuit

In [None]:
# Notebook-wide qubit count; the trainer classes defined further down read
# this global directly.
n_qubits = 8

# Build the placeholder circuit with barriers inserted and render it.
# This variable is rebound (without barriers) in the training cells.
placeholder_circuit = SimpleQCNN.make_placeholder_circuit(n_qubits, insert_barrier=True)
display(placeholder_circuit.draw())

## Define the Hamiltonian

In [None]:
# Observable whose expectation value is used as the model output.
hamiltonian = SimpleQCNN.get_hamiltonian(n_qubits)
# Bare expression so the notebook echoes the value as the cell output.
hamiltonian

## Check locations of parameters in the TensorNetwork

In [None]:
%%time

# Only the third return value is inspected here; the first two (the einsum
# expression and its operands, as unpacked elsewhere in this notebook) are
# discarded.
_, _, name2locs = circuit_to_einsum_expectation(placeholder_circuit, hamiltonian)

# Presumably maps parameter names to their positions among the operands
# (per the section heading) -- TODO confirm the exact structure.
print(name2locs)

## Show TensorNetwork structure

In [None]:
# Draw the tensor network built from the circuit and the Hamiltonian.
# NOTE(review): the meaning of the positional `True` is not visible in this
# notebook -- consider passing it by keyword.
draw_quimb_tn(placeholder_circuit, hamiltonian, True)

## Train the circuit

In [None]:
class PQCTrainerTN:
    """Mini-batch trainer for a placeholder circuit evaluated through ``EstimatorTN``.

    The trainer minimises the mean squared error between the expectation
    values returned by ``EstimatorTN.forward`` and the dataset labels, using
    the gradients returned by ``EstimatorTN.backward`` and the supplied
    optimizer.

    NOTE: ``fit`` reads the notebook-level global ``n_qubits``; it must be
    defined (and match the circuit) before ``fit`` is called.
    """

    def __init__(self,
        qc: QuantumCircuit,
        initial_point: Sequence[float],
        optimizer: optim.Optimizer
    ):
        """Store the circuit, the starting parameters and the optimizer.

        Args:
            qc: Placeholder circuit to train.
            initial_point: Starting parameter values; converted to an ndarray.
            optimizer: Object whose ``update(params, grads)`` is called once
                per mini-batch.
        """
        self.qc_pl = qc  # placeholder circuit
        self.initial_point = np.array(initial_point)
        self.optimizer = optimizer

    def fit(self,
        dataset: Dataset,
        batch_size: int,
        operator: str,
        callbacks: list | None = None,
        epochs: int = 1
    ) -> None:
        """Run ``epochs`` passes of mini-batch training over ``dataset``.

        Args:
            dataset: Dataset yielding ``(batch, label)`` pairs.
            batch_size: Mini-batch size. Batches are shuffled and an
                incomplete final batch is dropped.
            operator: Observable handed to ``circuit_to_einsum_expectation``.
            callbacks: Optional callables invoked as ``callback(loss, params)``
                for every mini-batch, *before* the optimizer update, so the
                parameters they see are the ones that produced ``loss``.
            epochs: Number of passes over the data loader.
        """
        expr, operands, pname2locs = circuit_to_einsum_expectation(self.qc_pl, operator)

        dataloader = DataLoader(dataset, batch_size, shuffle=True, drop_last=True)
        callbacks = callbacks if callbacks is not None else []

        # Train on a copy. The return value of optimizer.update() is ignored
        # below, so the optimizer is presumably updating this array in place;
        # the copy keeps self.initial_point untouched in that case.
        params = self.initial_point.copy()

        # NOTE: `n_qubits` is the notebook-level global, not an attribute.
        make_pname2theta = SimpleQCNN.get_make_pname2theta(n_qubits)
        batch_filter = SimpleQCNN.get_batch_filter()
        qnn = EstimatorTN(pname2locs, expr, operands, make_pname2theta=make_pname2theta, batch_filter=batch_filter)

        for _ in range(epochs):
            for batch, label in dataloader:
                batch, label = self._preprocess_batch(batch, label)
                # One row per sample so labels line up with the per-sample outputs.
                label = label.reshape(label.shape[0], -1)

                # "forward"
                expvals = qnn.forward(params, batch)
                residual = expvals - label
                total_loss = np.mean(residual**2)

                # "backward"
                # The parameter-shift rule
                # [[∂f1/∂θ1, ∂f1/∂θ2, ..., ∂f1/∂θn],
                #  [∂f2/∂θ1, ∂f2/∂θ2, ..., ∂f2/∂θn],
                #  ...]
                grads = qnn.backward()
                # Chain rule for the MSE loss, averaged over the batch. The
                # constant factor 2 of d(r^2)/dr is left out, as in the
                # original code; it only rescales the step size.
                total_grads = np.mean(residual.reshape(batch.shape[0], -1) * grads, axis=0)

                for callback in callbacks:
                    callback(total_loss, params)

                # "update params"
                self.optimizer.update(params, total_grads)

    def _preprocess_batch(self,
        batch: torch.Tensor,
        label: torch.Tensor
    ) -> tuple[np.ndarray, np.ndarray]:
        """Detach both tensors from the autograd graph and convert them to NumPy arrays."""
        batch = batch.detach().numpy()
        label = label.detach().numpy()
        return batch, label

In [None]:
def RunPQCTrain(
    dataset: Dataset,
    batch_size: int,
    qc: QuantumCircuit,
    operator: str,
    init: Sequence[float] | None = None,
    epochs: int = 1,
    interval: int = 100
):
    """Train ``qc`` with Adam (alpha=0.01) and return the best parameters seen.

    Args:
        dataset: Training dataset forwarded to ``PQCTrainerTN.fit``.
        batch_size: Mini-batch size forwarded to ``PQCTrainerTN.fit``.
        qc: Placeholder circuit to train.
        operator: Observable forwarded to ``PQCTrainerTN.fit``.
        init: Initial parameter values. Required despite the ``None`` default,
            which is kept only for signature compatibility.
        epochs: Number of passes over the data.
        interval: The loss is printed on every ``interval``-th training step,
            starting with the first.

    Returns:
        ``(opt_params, losses)``: a copy of the parameters at the step with the
        lowest loss (``None`` if no step ran) and the list of per-step losses.

    Raises:
        ValueError: If ``init`` is ``None``.
    """
    if init is None:
        # Without this guard np.array(None) would yield a 0-d object array
        # that only fails later, deep inside the training loop.
        raise ValueError("init must be a sequence of initial parameter values, not None")

    opt_params = None
    opt_loss = None

    def save_opt_params(loss, params):
        nonlocal opt_params, opt_loss

        if opt_loss is None or loss < opt_loss:
            # Copy: the trainer keeps reusing the same parameter array.
            opt_params = params.copy()
            opt_loss = loss

    # Store intermediate results
    losses = []

    def store_intermediate_result(loss, params):
        step = len(losses)
        losses.append(loss)
        if step % interval == 0:
            print(f'{loss=}')

    optimizer = optim.Adam(alpha=0.01)
    trainer = PQCTrainerTN(qc=qc, initial_point=init, optimizer=optimizer)
    trainer.fit(dataset, batch_size, operator,
                callbacks=[save_opt_params, store_intermediate_result],
                epochs=epochs)

    return opt_params, losses

In [None]:
%%time

# With dry_run=True the second return value is used below as the number of
# trainable parameters (it sizes `init`).
_, length = SimpleQCNN.make_placeholder_circuit(n_qubits, dry_run=True)
placeholder_circuit = SimpleQCNN.make_placeholder_circuit(n_qubits)

# Fixed seed for reproducible initial parameters, drawn uniformly from [0, 2*pi).
np.random.seed(10)
init = np.random.random(length) * 2*math.pi

# batch_size == len(trainset): every training step uses the whole training set.
opt_params, loss_list = RunPQCTrain(trainset, len(trainset),
                                    placeholder_circuit, hamiltonian, init=init,
                                    epochs=50, interval=10)

# The last recorded loss is not necessarily the lowest one; opt_params holds
# the parameters of the lowest-loss step.
print(f'final loss={loss_list[-1]}')
print(f'{opt_params=}')

## Validate results

### Measure test accuracy

In [None]:
# Single batch containing the whole test set.
testloader = DataLoader(testset, len(testset))

# Rebuild the estimator for evaluation with the trained parameters.
qc_pl = SimpleQCNN.make_placeholder_circuit(n_qubits)
expr, operands, pname2locs = circuit_to_einsum_expectation(qc_pl, hamiltonian)

make_pname2theta = SimpleQCNN.get_make_pname2theta(n_qubits)
# NOTE(review): pname2theta is not used anywhere else in this cell.
pname2theta = make_pname2theta(opt_params)
batch_filter = SimpleQCNN.get_batch_filter()
qnn = EstimatorTN(pname2locs, expr, operands, make_pname2theta=make_pname2theta, batch_filter=batch_filter)

total = 0
total_correct = 0

for i, (batch, label) in enumerate(testloader):
    batch, label = batch.detach().numpy(), label.detach().numpy()
    # NOTE(review): batch_filter is applied here and is also passed to
    # EstimatorTN above, while the training loop does not pre-filter --
    # confirm the filter is not applied twice.
    batch = batch_filter(batch)
    label = label.reshape(label.shape[0], -1)

    # "forward"
    expvals = qnn.forward(opt_params, batch)
    # One row per sample so predictions line up with `label`.
    expvals = expvals.reshape(expvals.shape[0], -1)

    # The predicted class is the sign of the expectation value. np.sign
    # returns 0 for an exact 0, which would match neither label if labels
    # are +/-1 (presumably -- TODO confirm the label encoding).
    predict_labels = np.sign(expvals)

    total_correct += np.sum(predict_labels == label)
    total += batch.shape[0]

print(f'test acc={np.round(total_correct/total, 2)}')

### Visualize loss_list

In [None]:
import matplotlib.pyplot as plt

# Loss per training step (x axis: step index).
plt.plot(range(len(loss_list)), loss_list)
plt.show()

# Appendix

## Using SciPy optimizers

In [None]:
from scipy.optimize import minimize

In [None]:
class PQCTrainerTN_COBYLA:
    """Full-batch trainer that minimises the MSE loss with SciPy's COBYLA.

    Only ``EstimatorTN.forward`` is used; no gradients are computed.

    NOTE: ``fit`` reads the notebook-level global ``n_qubits``; it must be
    defined (and match the circuit) before ``fit`` is called.
    """

    def __init__(self,
        qc: QuantumCircuit,
        initial_point: Sequence[float]
    ):
        """Store the circuit and the starting parameters.

        Args:
            qc: Placeholder circuit to train.
            initial_point: Starting parameter values; converted to an ndarray.
        """
        self.qc_pl = qc  # placeholder circuit
        self.initial_point = np.array(initial_point)

    def fit(self,
        dataset: Dataset,
        operator: str,
        callbacks: list[Callable] | None = None,
        epochs: int = 1
    ) -> tuple[OptimizeResult, list[float]]:
        """Minimise the mean squared error over the whole dataset.

        Args:
            dataset: Dataset yielding ``(batch, label)`` pairs; it is loaded
                once as a single batch.
            operator: Observable handed to ``circuit_to_einsum_expectation``.
            callbacks: Optional callables invoked as ``callback(loss, params)``
                on every evaluation of the cost function.
            epochs: Forwarded to ``minimize`` as the COBYLA ``maxiter`` option.

        Returns:
            ``(result, loss_list)``: the object returned by
            ``scipy.optimize.minimize`` and the loss of every cost-function
            evaluation, in call order.
        """
        expr, operands, pname2locs = circuit_to_einsum_expectation(self.qc_pl, operator)

        # full batch
        dataloader = DataLoader(dataset, len(dataset), shuffle=True, drop_last=True)
        callbacks = callbacks if callbacks is not None else []

        # Hand the optimizer a copy so self.initial_point stays untouched.
        params = self.initial_point.copy()

        # NOTE: `n_qubits` is the notebook-level global, not an attribute.
        make_pname2theta = SimpleQCNN.get_make_pname2theta(n_qubits)
        batch_filter = SimpleQCNN.get_batch_filter()
        qnn = EstimatorTN(pname2locs, expr, operands, make_pname2theta=make_pname2theta, batch_filter=batch_filter)

        # The loader yields exactly one batch (the whole dataset); fetch it once.
        batch, label = next(iter(dataloader))
        batch, label = self._preprocess_batch(batch, label)
        # One row per sample so labels line up with the per-sample outputs.
        label = label.reshape(label.shape[0], -1)

        loss_list = []

        def cost(x, *args) -> float:
            """MSE between the model output at parameters ``x`` and the labels."""
            qnn, batch, label = args
            expvals = qnn.forward(x, batch)
            loss = np.mean((expvals - label)**2)

            for callback in callbacks:
                callback(loss, x)

            loss_list.append(loss)
            return loss

        result = minimize(
            cost,
            params,
            args=(qnn, batch, label),
            method="COBYLA",
            options={
                "maxiter": epochs
            },
        )
        return result, loss_list

    def _preprocess_batch(self,
        batch: torch.Tensor,
        label: torch.Tensor
    ) -> tuple[np.ndarray, np.ndarray]:
        """Detach both tensors from the autograd graph and convert them to NumPy arrays."""
        batch = batch.detach().numpy()
        label = label.detach().numpy()
        return batch, label

In [None]:
def RunPQCTrain_COBYLA(
    dataset: Dataset,
    qc: QuantumCircuit,
    operator: str,
    init: Sequence[float] | None = None,
    epochs: int = 1,
    interval: int = 100
):
    """Train ``qc`` with COBYLA via ``PQCTrainerTN_COBYLA``.

    Args:
        dataset: Training dataset forwarded to ``PQCTrainerTN_COBYLA.fit``.
        qc: Placeholder circuit to train.
        operator: Observable forwarded to ``PQCTrainerTN_COBYLA.fit``.
        init: Initial parameter values. Required despite the ``None`` default,
            which is kept only for signature compatibility.
        epochs: Forwarded to the trainer (used there as COBYLA ``maxiter``).
        interval: The loss is printed on every ``interval``-th cost
            evaluation, starting with the first.

    Returns:
        ``(params, loss_list)``: the ``"x"`` entry of the optimizer result and
        the list of losses recorded by the trainer.

    Raises:
        ValueError: If ``init`` is ``None``.
    """
    if init is None:
        # Without this guard np.array(None) would be handed to the optimizer
        # as the starting point and fail with an unrelated error.
        raise ValueError("init must be a sequence of initial parameter values, not None")

    # The original also tracked the lowest-loss parameters and a loss history
    # here, but neither was returned; only progress printing is kept.
    n_calls = 0

    def store_intermediate_result(loss, params):
        nonlocal n_calls

        if n_calls % interval == 0:
            print(f'{loss=}')
        n_calls += 1

    trainer = PQCTrainerTN_COBYLA(qc=qc, initial_point=init)
    result, loss_list = trainer.fit(
        dataset,
        operator,
        callbacks=[store_intermediate_result],
        epochs=epochs
    )

    return result["x"], loss_list

In [None]:
%%time

# With dry_run=True the second return value is used below as the number of
# trainable parameters (it sizes `init`).
_, length = SimpleQCNN.make_placeholder_circuit(n_qubits, dry_run=True)
placeholder_circuit = SimpleQCNN.make_placeholder_circuit(n_qubits)

# Same seed as the Adam run above, so both optimizers start from the same point.
np.random.seed(10)
init = np.random.random(length) * 2*math.pi

# This rebinds opt_params and loss_list from the Adam run.
opt_params, loss_list = RunPQCTrain_COBYLA(
    trainset, placeholder_circuit, hamiltonian, init=init, epochs=50, interval=10
)

print(f'final loss={loss_list[-1]}')
print(f'{opt_params=}')

In [None]:
# Single batch containing the whole test set.
testloader = DataLoader(testset, len(testset))

# Rebuild the estimator for evaluation with the COBYLA-trained parameters.
qc_pl = SimpleQCNN.make_placeholder_circuit(n_qubits)
expr, operands, pname2locs = circuit_to_einsum_expectation(qc_pl, hamiltonian)

make_pname2theta = SimpleQCNN.get_make_pname2theta(n_qubits)
# NOTE(review): pname2theta is not used anywhere else in this cell.
pname2theta = make_pname2theta(opt_params)
batch_filter = SimpleQCNN.get_batch_filter()
qnn = EstimatorTN(pname2locs, expr, operands, make_pname2theta=make_pname2theta, batch_filter=batch_filter)

total = 0
total_correct = 0

for i, (batch, label) in enumerate(testloader):
    batch, label = batch.detach().numpy(), label.detach().numpy()
    # NOTE(review): batch_filter is applied here and is also passed to
    # EstimatorTN above, while the training code does not pre-filter --
    # confirm the filter is not applied twice.
    batch = batch_filter(batch)
    label = label.reshape(label.shape[0], -1)

    # "forward"
    expvals = qnn.forward(opt_params, batch)
    # One row per sample so predictions line up with `label`.
    expvals = expvals.reshape(expvals.shape[0], -1)

    # Threshold at zero: negative expectation values map to -1, everything
    # else (including an exact 0) to +1. This differs from the np.sign-based
    # cell above only at exactly 0.
    predict_labels = np.ones_like(expvals)
    predict_labels[np.where(expvals < 0)] = -1
    predict_labels = predict_labels.astype(int)

    total_correct += np.sum(predict_labels == label)
    total += batch.shape[0]

print(f'test acc={np.round(total_correct/total, 2)}')

In [None]:
import matplotlib.pyplot as plt

# Loss per cost-function evaluation of the COBYLA run (x axis: evaluation index).
plt.plot(range(len(loss_list)), loss_list)
plt.show()