In [1]:
from utils import *
import numpy as np
import matplotlib.pyplot as plt
import pyqpanda as pq
import pyvqnet as pv
import random

%matplotlib inline

In [2]:
# Training set: the five CIFAR-10 training batch files, passed to `Dataset`
# as separate positional arguments.
train_batch_paths = [
    f"data/cifar-10-batches-py/data_batch_{batch_no}" for batch_no in range(1, 6)
]
train_data = Dataset(*train_batch_paths)
# NOTE(review): `enhance()` comes from `utils`; presumably data augmentation,
# applied to the training set only - confirm against the helper module.
train_data.enhance()

# Test set: the single CIFAR-10 test batch file, left un-enhanced.
test_data = Dataset("data/cifar-10-batches-py/test_batch")

In [3]:
# One single-qubit Pauli-Z observable (coefficient 1) per wire 0..9; the ten
# resulting expectation values are what `pqc` hands to MeasurePauliSum.
obs_list = []
for wire in range(10):
    obs_list.append(
        {"wires": [wire], "observables": ["Z"], "coefficient": [1]}
    )


def pqc(input, param, qubits, cbits, machine):
    """Build the variational circuit and return its measured observables.

    Circuit layout (qubit 10 acts as a control, qubits 0-9 carry the data):
      1. Hadamard on qubit 10, then a CNOT from qubit 10 onto each of 0-9.
      2. For every index ``i`` of ``input``: a U2 gate on qubit ``i % 10``
         with arguments ``(param[i], input[i])``.
      3. The same fan of CNOTs from qubit 10 onto 0-9 again.

    Args:
        input: Sequence of classical feature values; its length sets the
            number of U2 gates.
        param: Sequence of trainable values, indexed in step with ``input``
            (must be at least as long).
        qubits: Allocated qubits; indices 0..10 are used.
        cbits: Unused here; accepted because the caller supplies it.
        machine: Quantum machine passed through to the measurement routine.

    Returns:
        Whatever ``pv.qnn.measure.MeasurePauliSum`` yields for the
        module-level ``obs_list``.
    """
    control = qubits[10]
    data_qubits = [qubits[idx] for idx in range(10)]

    circuit = pq.QProg()

    # Entangle: H on the control, then fan out onto every data qubit.
    circuit.insert(pq.H(control))
    for target in data_qubits:
        circuit.insert(pq.CNOT(control, target))

    # Encoding + trainable layer: features wrap around the ten data qubits.
    for idx in range(len(input)):
        circuit.insert(pq.U2(data_qubits[idx % 10], param[idx], input[idx]))

    # Second CNOT fan, identical to the first.
    for target in data_qubits:
        circuit.insert(pq.CNOT(control, target))

    return pv.qnn.measure.MeasurePauliSum(machine, circuit, obs_list, qubits)


class Model(pv.nn.module.Module):
    """Hybrid classifier: six 3x3 convolutions feeding a quantum layer.

    Classical part: conv(3->30), conv(30->30), 2x2 max-pool, conv(30->60),
    conv(60->60), 2x2 max-pool, conv(60->120), conv(120->120). No activation
    is applied between the layers.

    Quantum part: the flattened features are squashed with ``2 * atan(x)``
    (range (-pi, pi)) and passed to a ``QuantumLayer`` wrapping ``pqc``,
    constructed with 120 / "CPU" / 11 and finite-difference gradients.

    NOTE(review): the quantum layer consumes 120 inputs, so the conv stack is
    presumably expected to reduce each image to 120x1x1 - confirm against the
    Conv2D default stride/padding and the input image size.
    """

    def __init__(self):
        super().__init__()
        kernel = (3, 3)
        pool_window = (2, 2)
        pool_stride = (2, 2)

        # Attribute names and creation order are kept stable because they
        # determine the keys/order of the saved state dict.
        self.conv1 = pv.nn.Conv2D(3, 30, kernel)
        self.conv2 = pv.nn.Conv2D(30, 30, kernel)
        self.pool2 = pv.nn.MaxPool2D(pool_window, pool_stride)
        self.conv3 = pv.nn.Conv2D(30, 60, kernel)
        self.conv4 = pv.nn.Conv2D(60, 60, kernel)
        self.pool4 = pv.nn.MaxPool2D(pool_window, pool_stride)
        self.conv5 = pv.nn.Conv2D(60, 120, kernel)
        self.conv6 = pv.nn.Conv2D(120, 120, kernel)
        # Quantum head; a classical baseline would be pv.nn.Linear(120, 10).
        self.qc = pv.qnn.quantumlayer.QuantumLayer(
            pqc, 120, "CPU", 11, diff_method="finite_diff"
        )

    def forward(self, x):
        """Run the conv stack, angle-encode the features, apply the quantum layer."""
        feature_stack = (
            self.conv1,
            self.conv2,
            self.pool2,
            self.conv3,
            self.conv4,
            self.pool4,
            self.conv5,
            self.conv6,
        )
        for layer in feature_stack:
            x = layer(x)

        flat = pv.tensor.flatten(x, 1)
        # Map unbounded activations into (-pi, pi) so they can serve as gate angles.
        angles = 2 * pv.tensor.atan(flat)
        return self.qc(angles)

In [4]:
# Hyper-parameters shared by the training and evaluation cells.
epoch, batch, holdout = 100, 16, 10000  # epochs, mini-batch size, held-out sample count

model = Model()

# Report the total number of scalar parameters in the model.
param_count = sum(p.numel() for p in model.parameters())
print(param_count)

47790


In [5]:
# ---- Setup -----------------------------------------------------------------
X, Y = train_data.getdatas()
los = pv.nn.loss.CrossEntropyLoss()
opt = pv.optim.SGD(model.parameters())

# Per-epoch accuracy histories, stored as fractions.
acc_train, acc_test = [], []

for e in range(1, epoch + 1):
    # ---- Optimisation pass over everything except the last `holdout` samples
    model.train()
    correct_tot = 0

    train_batches = pv.data.data_generator(
        X[:-holdout], Y[:-holdout], batch_size=batch, shuffle=True
    )
    for i, (x, y) in enumerate(train_batches, start=1):
        opt.zero_grad()
        y_pred = model(x)
        loss = los(y, y_pred)  # called as (labels, predictions)

        # Predicted class = argmax along axis 1; count matches with the labels.
        correct = sum(y_pred.argmax(1, False).to_numpy() == y)
        acc = correct / y.shape[0]
        correct_tot += correct

        loss.backward()
        opt._step()

        # Progress line every 10th mini-batch.
        if i % 10 == 0:
            print(
                f"epoch {e}/{epoch} \t batch {batch*i}/{Y.shape[0]-holdout} \t loss {loss.item():.2f} \t accuracy {acc:.2f}"
            )

    # ---- Epoch summary on the training portion
    model.eval()
    acc = correct_tot / (Y.shape[0] - holdout)
    acc_train.append(acc)
    print(f"********** train: epoch {e}/{epoch} \t accuracy {100*acc:.2f}% **********")

    # ---- Accuracy on the held-out tail of the training data.
    # The log line says "test", but these samples are the last `holdout`
    # entries of `train_data`, not `test_data`.
    correct_tot = 0
    holdout_batches = pv.data.data_generator(
        X[-holdout:], Y[-holdout:], batch_size=batch, shuffle=False
    )
    for x, y in holdout_batches:
        y_pred = model(x)
        correct_tot += sum(y_pred.argmax(1, False).to_numpy() == y)

    acc = correct_tot / holdout
    acc_test.append(acc)
    print(f"********** test: epoch {e}/{epoch} \t accuracy {100*acc:.2f}% **********")

    # Fresh optimiser for the next epoch, scaled by the held-out error rate.
    # NOTE(review): the second positional argument is presumably the learning
    # rate - confirm against the pyVQNet SGD signature.
    opt = pv.optim.SGD(model.parameters(), 0.01 * (1 - acc))

KeyboardInterrupt: 

In [None]:
# Persist the current weights of `model` to "train13.model".
trained_state = model.state_dict()
pv.utils.storage.save_parameters(trained_state, "train13.model")

In [None]:
# Build a fresh network and restore the weights saved in "train13.model".
m = Model()
restored_state = pv.utils.storage.load_parameters("train13.model")
m.load_state_dict(restored_state)

In [None]:
# Evaluate the reloaded checkpoint `m` on the CIFAR-10 test batch.
m.eval()
X, Y = test_data.getdatas()
correct_tot = 0
for x, y in pv.data.data_generator(X, Y, batch_size=batch, shuffle=False):
    # Score the reloaded network `m` (the one switched to eval mode above),
    # not the in-memory training object `model`; otherwise the save/load
    # round-trip is never exercised and the reported number is not the
    # checkpoint's accuracy.
    y_pred = m(x)
    # Predicted class = argmax along axis 1; count matches with the labels.
    correct_tot += sum(y_pred.argmax(1, False).to_numpy() == y)
acc = correct_tot / Y.shape[0]
print(f"********** eval: accuracy {100*acc:.2f}% **********")

In [None]:
# Scratch lookup of the atan signature used in Model.forward. The original
# argument-less call raised on "Run All"; show the documentation instead.
# NOTE(review): this cell can be deleted once the notebook is finalised.
help(pv.tensor.atan)