#### Imports

In [26]:
import torch
import classiq
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

from typing import Dict
from classiq import Model, synthesize, QReg
from classiq.builtin_functions import HardwareEfficientAnsatz
from classiq.applications.qnn import QLayer
from classiq.execution import execute_qnn
from classiq.synthesis import SerializedQuantumProgram

from classiq.applications.qnn.types import (
    MultipleArguments,
    SavedResult,
    ResultsCollection,
)

classiq.authenticate()

Generating a new refresh token should only be done if the current refresh token is compromised.
To do so, set the overwrite parameter to true


In [9]:
_NUM_QUBITS =  4
_REPS = 1
_FULLY_CONNECTED_MESH = [[0, 1], [1, 2], [2, 3], [3, 2], [2, 1], [1, 0]]

#### Classical Layer for Image Commpression:
The input MNIST images are all 28 × 28. This Classical Layer will firstly center-crop them to 24 × 24 and
then down-sample them to 4 × 4 for MNIST.

In [18]:
class ClassicalCompressionLayer(nn.Module):
    def __init__(self):
        super(ClassicalCompressionLayer, self).__init__()
        self.center_crop = transforms.CenterCrop((24, 24))
        self.down_sample = transforms.Resize((4, 4))
        self.flatten = nn.Flatten()
        
    def forward(self, x):
        x = self.center_crop(x)
        x = self.down_sample(x)
        x = self.flatten(x)
        return x

In [10]:
def add_entanglement(md: Model, prefix: str, in_wire=None) -> Dict[str, QReg]:
    if in_wire is not None:
        kwargs = { "in_wires": { "IN": in_wire["OUT"] } }
    else: 
        kwargs = {}
    
    hwea_params = HardwareEfficientAnsatz(
        num_qubits=_NUM_QUBITS,
        connectivity_map=_FULLY_CONNECTED_MESH,
        reps=_REPS,
        one_qubit_gates=[],
        two_qubit_gates=["rzz", "rxx", "rzx", "cz"],
        parameter_prefix=prefix,
    )
    
    return md.HardwareEfficientAnsatz(hwea_params, **kwargs)

In [11]:
model = Model()
out1 = add_entanglement(model, "input_")
out2 = add_entanglement(model, "weight_", out1)

quantum_program = synthesize(model.get_model())

In [12]:
def execute(quantum_program: SerializedQuantumProgram, arguments: MultipleArguments) -> ResultsCollection:
    return execute_qnn(quantum_program, arguments)

In [13]:
# Post-process the result, returning a dict:
# Note: this function assumes that we only care about
#   differentiating a single state (|0>)
#   from all the rest of the states.
#   In case of a different differentiation, this function should change.
def post_process(result: SavedResult) -> torch.Tensor:
    """
    Take in a `SavedResult` with `ExecutionDetails` value type, and return the
    probability of measuring |0> which equals the amount of `|0>` measurements
    divided by the total amount of measurements.
    """
    counts: dict = result.value.counts
    # The probability of measuring |0>
    p_zero: float = counts.get("0", 0.0) / sum(counts.values())
    return torch.tensor(p_zero)

In [14]:
class Net(torch.nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__()
        self.qlayer = QLayer(
            quantum_program,
            execute,
            post_process,
            *args,
            **kwargs
        )
        
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.qlayer(x)
        return x

In [15]:
model = Net()

### Data Loading

In [22]:
class CTDataset(Dataset):
    def __init__(self, filepath):
        self.x, self.y = torch.load(filepath)
        self.x = self.x / 255.
        self.y = F.one_hot(self.y, num_classes=10).to(float)
    def __len__(self): 
        return self.x.shape[0]
    def __getitem__(self, ix): 
        return self.x[ix], self.y[ix]

In [24]:
train_ds = CTDataset('MNIST_DATASET/processed/training.pt')
test_ds = CTDataset('MNIST_DATASET/processed/test.pt')
train_dl = DataLoader(train_ds, batch_size=5)