In [None]:
import classiq
classiq.authenticate()

In [None]:
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
from typing import List
import tqdm
import pdb

from classiq import (
    synthesize,
    qfunc,
    QArray,
    QBit,
    RX,
    CArray,
    Output,
    CReal,
    allocate,
    repeat,
    create_model,
    show
)

# choosing our data
import torchvision
import torchvision.transforms as transforms
import math
from classiq.applications.qnn.types import SavedResult
from classiq.applications.qnn import QLayer
from classiq.qmod.symbolic import pi
from classiq.synthesis import SerializedQuantumProgram

In [None]:
from classiq.execution import (
    ExecutionPreferences,
    execute_qnn,
    set_quantum_program_execution_preferences,
)
from classiq.synthesis import SerializedQuantumProgram
from classiq.applications.qnn.types import (
    MultipleArguments,
    ResultsCollection,
    SavedResult,
)


from classiq.applications.qnn.types import (
    MultipleArguments,
    ResultsCollection,
    SavedResult,
)

In [None]:
N_QUBITS = 4
num_shots = 4096

In [None]:
def execute(
    quantum_program: SerializedQuantumProgram, arguments: MultipleArguments
) -> ResultsCollection:
    quantum_program = set_quantum_program_execution_preferences(
        quantum_program, preferences=ExecutionPreferences(num_shots=num_shots)
    )
    return execute_qnn(quantum_program, arguments)

In [None]:
def post_process(result: SavedResult) -> torch.Tensor:
    res = result.value
    yvec = [
        (res.counts_of_qubits(k)["1"] if "1" in res.counts_of_qubits(k) else 0)
        / num_shots
        for k in range(N_QUBITS)
    ]

    return torch.tensor(yvec)

In [None]:
def get_circuit():

    @qfunc
    def vqc(input_: CArray[CReal, N_QUBITS], weight_: CArray[CArray[CReal, N_QUBITS], N_QUBITS], res: Output[QArray[QBit]]) -> None:
        num_qubits = input_.len
        num_qlayers = weight_.len
        allocate(input_.len, res)
        repeat(
            count=num_qlayers,
            iteration=lambda i: repeat(count=num_qubits, 
                                           iteration=lambda j:RX(pi * weight_[i][j], res[j]))
        )
        
    @qfunc
    def angle_embedding(input_: CArray[CReal, N_QUBITS], qbv: Output[QArray[QBit]]) -> None:
        allocate(input_.len, qbv)

        repeat(
            count=input_.len,
            iteration=lambda index: RX(pi * input_[index], qbv[index]),
        )

    @qfunc
    def main(input_: CArray[CReal, N_QUBITS], weight_: CArray[CArray[CReal, N_QUBITS], N_QUBITS], res: Output[QArray[QBit]]) -> None:
        x = QArray("x")
        angle_embedding(input_=input_, qbv=x)
        vqc(input_=input_, weight_=weight_, res=res)
        
    qmod = create_model(main)
    quantum_program  = synthesize(qmod)
    return quantum_program

In [None]:
class Patchify(torch.nn.Module):
    """
    Patchify layer implemented using the Conv2d layer
    """
    def __init__(self, in_channels:int, patch_size:int, hidden_size:int):
        super(Patchify, self).__init__()
        self.patch_size = patch_size
        self.conv = torch.nn.Conv2d(in_channels=in_channels, out_channels=hidden_size, kernel_size=self.patch_size, stride=self.patch_size)
        self.hidden_size = hidden_size
        
    def forward(self, x:torch.Tensor):
        bs, c, h, w = x.size()
        self.num_patches = (h // self.patch_size) ** 2

        x = self.conv(x)
        x = x.view(bs, self.num_patches, self.hidden_size)
        return x
    


In [None]:
class RotaryPositionalEmbedding(torch.nn.Module):
    """
    Rotary Positional Embedding
    """
    def __init__(self, d_model, max_seq_len):
        super(RotaryPositionalEmbedding, self).__init__()

        # Create a rotation matrix.
        self.rotation_matrix = torch.zeros(d_model, d_model)
        for i in range(d_model):
            for j in range(d_model):
                self.rotation_matrix[i, j] = math.cos(i * j * 0.01)

        # Create a positional embedding matrix.
        self.positional_embedding = torch.zeros(max_seq_len, d_model)
        for i in range(max_seq_len):
            for j in range(d_model):
                self.positional_embedding[i, j] = math.cos(i * j * 0.01)

    def forward(self, x):
        """
        Args:
            x: A tensor of shape (batch_size, seq_len, d_model).

        Returns:
            A tensor of shape (batch_size, seq_len, d_model).
        """

        # Add the positional embedding to the input tensor.
        x += self.positional_embedding

        # Apply the rotation matrix to the input tensor.
        x = torch.matmul(x, self.rotation_matrix)

        return x

In [None]:
quantum_program = get_circuit()

In [None]:
class QuantumLayer(torch.nn.Module):
    """
    Quantum Layer
    """
    def __init__(self, in_dim, out_dim):
        super(QuantumLayer, self).__init__()
        self.quantum_layer = QLayer(quantum_program, execute, post_process)

    def forward(self, x:torch.Tensor):
        size = x.size()
        x = x.view(-1, size[-1])
        x = self.quantum_layer(x)
        x = x.view(size)
        return x

In [None]:
class FFN(torch.nn.Module):
    """
    Feed Forward Network
    """
    def __init__(self, in_dim, hidden_size):
        super().__init__()
        self.linear_1 = torch.nn.Linear(in_dim, hidden_size)
        self.qlinear_1 = QuantumLayer(hidden_size, hidden_size)
        self.dropout = torch.nn.Dropout(p=0.4)
        self.linear_2 = torch.nn.Linear(hidden_size, in_dim)
        return
    
    def forward(self, x:torch.Tensor):
        x = self.linear_1(x)
        x = self.qlinear_1(x)
        x = self.dropout(x)
        x = torch.nn.functional.gelu(x)
        x = self.linear_2(x)
        return x

In [None]:
class qMHA(torch.nn.Module):
    """
    Quantum Multihead Attention
    """
    def __init__(self, in_dim:int, num_heads:int) -> None:
        super().__init__()

        self.qK = QuantumLayer(in_dim, in_dim);
        self.qQ = QuantumLayer(in_dim, in_dim);
        self.qV = QuantumLayer(in_dim, in_dim);
        self.dropout = torch.nn.Dropout(p=0.1)
        
        self.final_l = QuantumLayer(in_dim, in_dim)
        self.num_heads = num_heads
        self.in_dim = in_dim
        
        return

    def forward(self, X:torch.Tensor):

        dim = torch.sqrt(torch.Tensor([X.shape[-1]]))
        attention = torch.nn.functional.softmax((1/dim)*self.qK(X))*self.qQ(X)*self.qV(X)
        x = self.dropout(attention)
        x = self.final_l(x)
        return x

In [None]:
class qTransformerEncoder(torch.nn.Module):
    """
    Quantum Transformer Encoder Layer
    """
    def __init__(self, in_dim:int, num_heads:int) -> None:
        super().__init__()
        self.layer_norm_1 = torch.nn.LayerNorm(normalized_shape=in_dim)
        self.layer_norm_2 = torch.nn.LayerNorm(normalized_shape=in_dim)
        
        self.qMHA = qMHA(in_dim, num_heads)
        self.qFFN = FFN(in_dim, hidden_size=in_dim)
        self.dropout = torch.nn.Dropout(p=0.3)
        

    def forward(self, X:torch.Tensor):
        
        x = self.qMHA(X)
        x = self.layer_norm_1(x)
        x = self.dropout(x) + X

        y = self.layer_norm_2(x)
        y = self.qFFN(y)+y
        
        return x

In [None]:
class QVT(torch.nn.Module):
    """
    Quantum Vision Transformer;
    """
    def __init__(self, in_channels, patch_size, in_dim, hidden_size,  num_heads, n_classes, n_layers) -> None:
        super().__init__()
        self.patch_formation = Patchify(in_channels=in_channels, patch_size=patch_size, hidden_size=hidden_size)
        self.d_model = (in_dim//patch_size)**2
        self.pos_encoding = RotaryPositionalEmbedding(hidden_size, self.d_model)
        self.transformer_blocks = [qTransformerEncoder(hidden_size, num_heads) for i in range(n_layers)]
        
        self.n_classes = n_classes
        self.final_normalization = torch.nn.LayerNorm(hidden_size)
        self.final_layer = torch.nn.Linear(hidden_size, self.n_classes)

    def forward(self, x: torch.Tensor) -> torch.Tensor:    
        x = self.patch_formation(x)
        x += self.pos_encoding(x)
        for trans_block in self.transformer_blocks:
            x = trans_block(x)
        x = self.final_normalization(x)
        x = x.mean(axis=1)
        x = self.final_layer(x)
        return x

In [None]:
from torchvision import datasets

In [None]:
transform=transforms.Compose([
                          transforms.ToTensor(), # first, convert image to PyTorch tensor
                          transforms.Normalize((0.1307,), (0.3081,)) # normalize inputs
                      ])

In [None]:
#### Example with the MNIST Dataset:

dataset1 = datasets.MNIST('../data', train=True, download=True,transform=transform)
dataset2 = datasets.MNIST('../data', train=False,transform=transform)

In [None]:
train_loader = torch.utils.data.DataLoader(dataset1,batch_size=1)
test_loader = torch.utils.data.DataLoader(dataset2,batch_size=1)

In [None]:

clf = QVT(in_channels=1, patch_size=7, in_dim=28, hidden_size=4, num_heads=1, n_classes=10, n_layers=1)

opt = optim.SGD(clf.parameters(), lr=0.01, momentum=0.5)

loss_history = []
acc_history = []

def train(epoch):
    clf.train() # set model in training mode (need this because of dropout)
    
    # dataset API gives us pythonic batching 
    for data, label in tqdm.tqdm(train_loader):
        # forward pass, calculate loss and backprop!
        opt.zero_grad()
        preds = clf(data)
        loss = torch.nn.functional.nll_loss(preds, label)
        loss.backward()
        loss_history.append(loss.item())
        opt.step()

def test(epoch):
    clf.eval() # set model in inference mode (need this because of dropout)
    test_loss = 0
    correct = 0
    
    for data, target in tqdm.tqdm(test_loader):
        
        output = clf(data)
        test_loss += torch.nn.functional.nll_loss(output, target).item()
        pred = output.argmax() # get the index of the max log-probability
        correct += pred.eq(target).cpu().sum()

    test_loss = test_loss
    test_loss /= len(test_loader) # loss function already averages over batch size
    accuracy = 100. * correct / len(test_loader.dataset)
    acc_history.append(accuracy)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        accuracy))

In [None]:
for epoch in range(0, 3):
    train(epoch)
    test(epoch)