<a href="https://colab.research.google.com/github/ShauryaJ1/qml_tutorial/blob/main/QML_Tutorials.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [1]:
!pip install pennylane qiskit qiskit_machine_learning

Collecting pennylane
  Downloading PennyLane-0.40.0-py3-none-any.whl.metadata (10 kB)
Collecting qiskit
  Downloading qiskit-1.3.2-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting qiskit_machine_learning
  Downloading qiskit_machine_learning-0.8.2-py3-none-any.whl.metadata (13 kB)
Collecting rustworkx>=0.14.0 (from pennylane)
  Downloading rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting tomlkit (from pennylane)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Collecting appdirs (from pennylane)
  Downloading appdirs-1.4.4-py2.py3-none-any.whl.metadata (9.0 kB)
Collecting autoray>=0.6.11 (from pennylane)
  Downloading autoray-0.7.0-py3-none-any.whl.metadata (5.8 kB)
Collecting pennylane-lightning>=0.40 (from pennylane)
  Downloading PennyLane_Lightning-0.40.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (27 kB)
Collecting diastatic-malt (from pennylane)
  Downloading diastatic_malt

In [None]:
import pennylane as qml
import torch
import torch.nn as nn
from torch import cat, no_grad, manual_seed
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torch.optim import lr_scheduler
import torch.optim as optim
from torch.nn import (
    Module,
    Conv2d,
    Linear,
    Dropout2d,
    NLLLoss,
    MaxPool2d,
    Flatten,
    Sequential,
    ReLU,
)
import torch.nn.functional as F
import matplotlib.pyplot as plt

from qiskit import QuantumCircuit
from qiskit.circuit import Parameter
from qiskit.circuit.library import RealAmplitudes, ZZFeatureMap
from qiskit_machine_learning.utils import algorithm_globals
from qiskit_machine_learning.neural_networks import SamplerQNN, EstimatorQNN
from qiskit_machine_learning.connectors import TorchConnector
from qiskit.primitives import StatevectorEstimator as Estimator
import time
import os
import copy
import pennylane as qml
from pennylane import numpy as np
import torchvision
from tqdm.auto import tqdm

# MNIST with Qiskit

In [None]:
# Train Dataset
# -------------

# Set train shuffle seed (for reproducibility)
manual_seed(42)

batch_size = 1
n_samples = 100  # We will concentrate on the first 100 samples

# Use pre-defined torchvision function to load MNIST train data
X_train = datasets.MNIST(
    root="./data", train=True, download=True, transform=transforms.Compose([transforms.ToTensor()])
)

# Filter out labels (originally 0-9), leaving only labels 0 and 1
idx = np.append(
    np.where(X_train.targets == 0)[0][:n_samples], np.where(X_train.targets == 1)[0][:n_samples]
)
X_train.data = X_train.data[idx]
X_train.targets = X_train.targets[idx]

# Define torch dataloader with filtered data
train_loader = DataLoader(X_train, batch_size=batch_size, shuffle=True)

In [None]:
n_samples_show = 6

data_iter = iter(train_loader)
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

while n_samples_show > 0:
    images, targets = data_iter.__next__()

    axes[n_samples_show - 1].imshow(images[0, 0].numpy().squeeze(), cmap="gray")
    axes[n_samples_show - 1].set_xticks([])
    axes[n_samples_show - 1].set_yticks([])
    axes[n_samples_show - 1].set_title("Labeled: {}".format(targets[0].item()))

    n_samples_show -= 1

In [None]:
n_samples = 50

# Use pre-defined torchvision function to load MNIST test data
X_test = datasets.MNIST(
    root="./data", train=False, download=True, transform=transforms.Compose([transforms.ToTensor()])
)

# Filter out labels (originally 0-9), leaving only labels 0 and 1
idx = np.append(
    np.where(X_test.targets == 0)[0][:n_samples], np.where(X_test.targets == 1)[0][:n_samples]
)
X_test.data = X_test.data[idx]
X_test.targets = X_test.targets[idx]

# Define torch dataloader with filtered data
test_loader = DataLoader(X_test, batch_size=batch_size, shuffle=True)

In [None]:
estimator = Estimator()
def create_qnn():
    feature_map = ZZFeatureMap(2)
    ansatz = RealAmplitudes(2, reps=1)
    qc = QuantumCircuit(2)
    qc.compose(feature_map, inplace=True)
    qc.compose(ansatz, inplace=True)

    # REMEMBER TO SET input_gradients=True FOR ENABLING HYBRID GRADIENT BACKPROP
    qnn = EstimatorQNN(
        circuit=qc,
        input_params=feature_map.parameters,
        weight_params=ansatz.parameters,
        input_gradients=True,
        estimator=estimator,
    )
    return qnn


qnn4 = create_qnn()



In [None]:
class Net(Module):
    def __init__(self, qnn):
        super().__init__()
        self.conv1 = Conv2d(1, 2, kernel_size=5)
        self.conv2 = Conv2d(2, 16, kernel_size=5)
        self.dropout = Dropout2d()
        self.fc1 = Linear(256, 64)
        self.fc2 = Linear(64, 2)  # 2-dimensional input to QNN
        self.qnn = TorchConnector(qnn)  # Apply torch connector, weights chosen
        # uniformly at random from interval [-1,1].
        self.fc3 = Linear(1, 1)  # 1-dimensional output from QNN

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = x.view(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.qnn(x)  # apply QNN
        x = self.fc3(x)
        return cat((x, 1 - x), -1)


model4 = Net(qnn4)

In [None]:
optimizer = optim.Adam(model4.parameters(), lr=0.001)
loss_func = NLLLoss()

# Start training
epochs = 10  # Set number of epochs
loss_list = []  # Store loss history
model4.train()  # Set model to training mode

for epoch in range(epochs):
    total_loss = []
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad(set_to_none=True)  # Initialize gradient
        output = model4(data)  # Forward pass
        loss = loss_func(output, target)  # Calculate loss
        loss.backward()  # Backward pass
        optimizer.step()  # Optimize weights
        total_loss.append(loss.item())  # Store loss
    loss_list.append(sum(total_loss) / len(total_loss))
    print("Training [{:.0f}%]\tLoss: {:.4f}".format(100.0 * (epoch + 1) / epochs, loss_list[-1]))

In [None]:
torch.save(model4.state_dict(), "model4.pt")


In [None]:
qnn5 = create_qnn()
model5 = Net(qnn5)
model5.load_state_dict(torch.load("model4.pt"))

  model5.load_state_dict(torch.load("model4.pt"))


<All keys matched successfully>

In [None]:
model5.eval()  # set model to evaluation mode
with no_grad():

    correct = 0
    for batch_idx, (data, target) in enumerate(test_loader):
        output = model5(data)
        if len(output.shape) == 1:
            output = output.reshape(1, *output.shape)

        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()

        loss = loss_func(output, target)
        total_loss.append(loss.item())

    print(
        "Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%".format(
            sum(total_loss) / len(total_loss), correct / len(test_loader) / batch_size * 100
        )
    )

In [None]:
n_samples_show = 6
count = 0
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

model5.eval()
with no_grad():
    for batch_idx, (data, target) in enumerate(test_loader):
        if count == n_samples_show:
            break
        output = model5(data[0:1])
        if len(output.shape) == 1:
            output = output.reshape(1, *output.shape)

        pred = output.argmax(dim=1, keepdim=True)

        axes[count].imshow(data[0].numpy().squeeze(), cmap="gray")

        axes[count].set_xticks([])
        axes[count].set_yticks([])
        axes[count].set_title("Predicted {}".format(pred.item()))

        count += 1

# PennyLane Quantum Transfer Learning with ResNet18


In [None]:
#https://download.pytorch.org/tutorial/hymenoptera_data.zip

In [None]:
!unzip -q /content/hymenoptera_data.zip -d /content/

In [None]:
n_qubits = 4                # Number of qubits
step = 0.0004               # Learning rate
batch_size = 4              # Number of samples for each training step
num_epochs = 10           # Number of training epochs
q_depth = 6                 # Depth of the quantum circuit (number of variational layers)
gamma_lr_scheduler = 0.1    # Learning rate reduction applied every 10 epochs.
q_delta = 0.01              # Initial spread of random quantum weights
start_time = time.time()    # Start of the computation timer

In [None]:
dev = qml.device("default.qubit", wires=n_qubits)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
data_transforms = {
    "train": transforms.Compose(
        [
            # transforms.RandomResizedCrop(224),     # uncomment for data augmentation
            # transforms.RandomHorizontalFlip(),     # uncomment for data augmentation
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            # Normalize input channels using mean values and standard deviations of ImageNet.
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    ),
    "val": transforms.Compose(
        [
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        ]
    ),
}

data_dir = "/content/hymenoptera_data"
image_datasets = {
    x if x == "train" else "validation": datasets.ImageFolder(
        os.path.join(data_dir, x), data_transforms[x]
    )
    for x in ["train", "val"]
}
dataset_sizes = {x: len(image_datasets[x]) for x in ["train", "validation"]}
class_names = image_datasets["train"].classes

# Initialize dataloader
dataloaders = {
    x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)
    for x in ["train", "validation"]
}

# function to plot images
def imshow(inp, title=None):
    """Display image from tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    # Inverse of the initial normalization operation.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)

In [None]:
# Get a batch of training data
inputs, classes = next(iter(dataloaders["validation"]))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])

dataloaders = {
    x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)
    for x in ["train", "validation"]
}

In [None]:
# Get a batch of training data
inputs, classes = next(iter(dataloaders["validation"]))

# Make a grid from batch
out = torchvision.utils.make_grid(inputs)

imshow(out, title=[class_names[x] for x in classes])

dataloaders = {
    x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True)
    for x in ["train", "validation"]
}

In [None]:
def H_layer(nqubits):
    """Layer of single-qubit Hadamard gates.
    """
    for idx in range(nqubits):
        qml.Hadamard(wires=idx)


def RY_layer(w):
    """Layer of parametrized qubit rotations around the y axis.
    """
    for idx, element in enumerate(w):
        qml.RY(element, wires=idx)


def entangling_layer(nqubits):
    """Layer of CNOTs followed by another shifted layer of CNOT.
    """
    # In other words it should apply something like :
    # CNOT  CNOT  CNOT  CNOT...  CNOT
    #   CNOT  CNOT  CNOT...  CNOT
    for i in range(0, nqubits - 1, 2):  # Loop over even indices: i=0,2,...N-2
        qml.CNOT(wires=[i, i + 1])
    for i in range(1, nqubits - 1, 2):  # Loop over odd indices:  i=1,3,...N-3
        qml.CNOT(wires=[i, i + 1])

In [None]:
@qml.qnode(dev)
def quantum_net(q_input_features, q_weights_flat):
    """
    The variational quantum circuit.
    """

    # Reshape weights
    q_weights = q_weights_flat.reshape(q_depth, n_qubits)

    # Start from state |+> , unbiased w.r.t. |0> and |1>
    H_layer(n_qubits)

    # Embed features in the quantum node
    RY_layer(q_input_features)

    # Sequence of trainable variational layers
    for k in range(q_depth):
        entangling_layer(n_qubits)
        RY_layer(q_weights[k])

    # Expectation values in the Z basis
    exp_vals = [qml.expval(qml.PauliZ(position)) for position in range(n_qubits)]
    return tuple(exp_vals)

In [None]:
drawer = qml.draw(quantum_net)
print(drawer([0,0,0,0],np.array([0,0,0,0]*6)))


In [None]:
class DressedQuantumNet(nn.Module):
    """
    Torch module implementing the *dressed* quantum net.
    """

    def __init__(self):
        """
        Definition of the *dressed* layout.
        """

        super().__init__()
        self.pre_net = nn.Linear(512, n_qubits)
        self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
        self.post_net = nn.Linear(n_qubits, 2)

    def forward(self, input_features):
        """
        Defining how tensors are supposed to move through the *dressed* quantum
        net.
        """

        # obtain the input features for the quantum circuit
        # by reducing the feature dimension from 512 to 4
        pre_out = self.pre_net(input_features)
        q_in = torch.tanh(pre_out) * np.pi / 2.0

        # Apply the quantum circuit to each element of the batch and append to q_out
        q_out = torch.Tensor(0, n_qubits)
        q_out = q_out.to(device)
        for elem in q_in:
            q_out_elem = torch.hstack(quantum_net(elem, self.q_params)).float().unsqueeze(0)
            q_out = torch.cat((q_out, q_out_elem))

        # return the two-dimensional prediction from the postprocessing layer
        return self.post_net(q_out)

In [None]:
weights = torchvision.models.ResNet18_Weights.IMAGENET1K_V1
model_hybrid = torchvision.models.resnet18(weights=weights)

for param in model_hybrid.parameters():
    param.requires_grad = False


# Notice that model_hybrid.fc is the last layer of ResNet18
model_hybrid.fc = DressedQuantumNet()

# Use CUDA or CPU according to the "device" object.
model_hybrid = model_hybrid.to(device)

In [None]:
criterion = nn.CrossEntropyLoss()

In [None]:
optimizer_hybrid = optim.Adam(model_hybrid.fc.parameters(), lr=step)

In [None]:
exp_lr_scheduler = lr_scheduler.StepLR(
    optimizer_hybrid, step_size=10, gamma=gamma_lr_scheduler
)

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = 10000.0  # Large arbitrary number
    best_acc_train = 0.0
    best_loss_train = 10000.0  # Large arbitrary number
    print("Training started:")

    for epoch in range(num_epochs):

        # Each epoch has a training and validation phase
        for phase in ["train", "validation"]:
            if phase == "train":
                # Set model to training mode
                model.train()
            else:
                # Set model to evaluate mode
                model.eval()
            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            n_batches = dataset_sizes[phase] // batch_size
            it = 0
            for inputs, labels in dataloaders[phase]:
                since_batch = time.time()
                batch_size_ = len(inputs)
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()

                # Track/compute gradient and make an optimization step only when training
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # Print iteration results
                running_loss += loss.item() * batch_size_
                batch_corrects = torch.sum(preds == labels.data).item()
                running_corrects += batch_corrects
                print(
                    "Phase: {} Epoch: {}/{} Iter: {}/{} Batch time: {:.4f}".format(
                        phase,
                        epoch + 1,
                        num_epochs,
                        it + 1,
                        n_batches + 1,
                        time.time() - since_batch,
                    ),
                    end="\r",
                    flush=True,
                )
                it += 1

            # Print epoch results
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print(
                "Phase: {} Epoch: {}/{} Loss: {:.4f} Acc: {:.4f}        ".format(
                    "train" if phase == "train" else "validation  ",
                    epoch + 1,
                    num_epochs,
                    epoch_loss,
                    epoch_acc,
                )
            )

            # Check if this is the best model wrt previous epochs
            if phase == "validation" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == "validation" and epoch_loss < best_loss:
                best_loss = epoch_loss
            if phase == "train" and epoch_acc > best_acc_train:
                best_acc_train = epoch_acc
            if phase == "train" and epoch_loss < best_loss_train:
                best_loss_train = epoch_loss

            # Update learning rate
            if phase == "train":
                scheduler.step()

    # Print final results
    model.load_state_dict(best_model_wts)
    time_elapsed = time.time() - since
    print(
        "Training completed in {:.0f}m {:.0f}s".format(time_elapsed // 60, time_elapsed % 60)
    )
    print("Best test loss: {:.4f} | Best test accuracy: {:.4f}".format(best_loss, best_acc))
    return model

In [None]:
model_hybrid = train_model(
    model_hybrid, criterion, optimizer_hybrid, exp_lr_scheduler, num_epochs=num_epochs
)

In [None]:
def visualize_model(model, num_images=6, fig_name="Predictions"):
    images_so_far = 0
    _fig = plt.figure(fig_name)
    model.eval()
    with torch.no_grad():
        for _i, (inputs, labels) in enumerate(dataloaders["validation"]):
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images // 2, 2, images_so_far)
                ax.axis("off")
                ax.set_title("[{}]".format(class_names[preds[j]]))
                imshow(inputs.cpu().data[j])
                if images_so_far == num_images:
                    return

In [None]:
visualize_model(model_hybrid, num_images=batch_size)
plt.show()

In [None]:
from torchvision.models import ResNet18_Weights
classic_model = torchvision.models.resnet18(ResNet18_Weights.IMAGENET1K_V1)
classic_model.fc = nn.Linear(512, 2)

In [None]:
import time
import torch
from tqdm import tqdm

def train_model(model_name,model, loaders, optimizer, criterion, device, scheduler, num_epochs, dataset_sizes, num_classes):
    model.to(device)
    since = time.time()
    best_test_acc = 0
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Initialize arrays to track true positives and total counts for each class
            true_positives = torch.zeros(num_classes).to(device)
            total_counts = torch.zeros(num_classes).to(device)

            # Iterate over data.
            for batch in tqdm(loaders[phase], desc=f"{phase.capitalize()} Phase Progress"):
                inputs, labels = batch
                inputs = inputs.to(device)
                labels = labels.to(device)
                # print(inputs.device,labels.device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    # print(outputs.logits.shape)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

                # Update true positives and total counts for each class
                for class_idx in range(num_classes):
                    true_positives[class_idx] += torch.sum((preds == class_idx) & (labels == class_idx))
                    total_counts[class_idx] += torch.sum(labels == class_idx)

            if phase == 'train':
                scheduler.step()

            # Calculate epoch loss and accuracy
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]
            if phase=="test":
              if epoch_acc > best_test_acc:
                best_test_acc = epoch_acc
                print("Saving model...")
                print(f"Best test acc: {best_test_acc}")
                torch.save(model.state_dict(), f"{epoch}_{model_name}")
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f} Time elapsed: {time.time() - since}')

            # Calculate and print class-wise accuracies
            class_accuracies = (true_positives / total_counts) * 100  # Convert to percentage
            for class_idx, class_acc in enumerate(class_accuracies):
                print(f"Class {class_idx} Accuracy: {class_acc:.2f}%")

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')


In [None]:
dataset_sizes["test"] = dataset_sizes["validation"]

In [None]:
dataset_sizes

{'train': 244, 'validation': 153, 'test': 153}

In [None]:
dataloaders["test"] = dataloaders["validation"]

In [None]:
classic_optimizer =optim.Adam(classic_model.parameters(),lr=step)

In [None]:
train_model("classic_model",classic_model,dataloaders,classic_optimizer,criterion,torch.device("cpu"),lr_scheduler.StepLR(
    classic_optimizer, step_size=10, gamma=gamma_lr_scheduler
),10,dataset_sizes,2)

# Pre-Training with a Quantum Autoencoder

In [None]:
class U3Rotation(nn.Module):
    def __init__(self,device):
        super().__init__()
        # Trainable parameters
        self.theta_0 = nn.Parameter(2*torch.pi* torch.rand(1))  # Initialize randomly
        self.phi_0= nn.Parameter(torch.pi*torch.rand(1))    # Initialize randomly
        self.theta_1 = nn.Parameter(2*torch.pi* torch.rand(1))  # Initialize randomly
        self.phi_1 = nn.Parameter(torch.pi*torch.rand(1))    # Initialize randomly
        self.theta_2 = nn.Parameter(2*torch.pi* torch.rand(1))  # Initialize randomly
        self.phi_2 = nn.Parameter(torch.pi*torch.rand(1))    # Initialize randomly
        self.theta_3 = nn.Parameter(2*torch.pi* torch.rand(1))  # Initialize randomly
        self.phi_3 = nn.Parameter(torch.pi*torch.rand(1))    # Initialize randomly
        self.thetas = [self.theta_0,self.theta_1,self.theta_2,self.theta_3]
        self.phis = [self.phi_0,self.phi_1,self.phi_2,self.phi_3]
        self.device = device
    def forward(self, input_vector):
        """
        input_vector: A 2D vector of shape (batch_size, 16) with complex values.
        """
        batch_dim = input_vector.shape[0]
        input_vector = torch.reshape(input_vector,(input_vector.shape[0],8,2))
        input_vector = input_vector[:,:4] + input_vector[:,4:]*1j
        norm = torch.norm(input_vector, dim=1, keepdim=True) + 1e-8
        input_vector = input_vector / norm

        cos_theta_2_0 = torch.cos(self.theta_0 / 2)
        sin_theta_2_0 = torch.sin(self.theta_0 / 2)
        cos_theta_2_1 = torch.cos(self.theta_1 / 2)
        sin_theta_2_1 = torch.sin(self.theta_1 / 2)
        cos_theta_2_2 = torch.cos(self.theta_2 / 2)
        sin_theta_2_2 = torch.sin(self.theta_2 / 2)
        cos_theta_2_3 = torch.cos(self.theta_3 / 2)
        sin_theta_2_3 = torch.sin(self.theta_3 / 2)

        exp_i_phi_0 = torch.exp(1j * self.phi_0)
        exp_i_phi_1 = torch.exp(1j * self.phi_1)
        exp_i_phi_2 = torch.exp(1j * self.phi_2)
        exp_i_phi_3 = torch.exp(1j * self.phi_3)

        rotation_matrix_0 = torch.tensor([
            [cos_theta_2_0,-1*sin_theta_2_0],
            [exp_i_phi_0*sin_theta_2_0,exp_i_phi_0*cos_theta_2_0]
        ],device = self.device)
        rotation_matrix_1 = torch.tensor([
            [cos_theta_2_1,-1*sin_theta_2_1],
            [exp_i_phi_1*sin_theta_2_1,exp_i_phi_1*cos_theta_2_1]
        ],device = self.device)
        rotation_matrix_2 = torch.tensor([
            [cos_theta_2_2,-1*sin_theta_2_2],
            [exp_i_phi_2*sin_theta_2_2,exp_i_phi_2*cos_theta_2_2]
        ],device = self.device)
        rotation_matrix_3 = torch.tensor([
            [cos_theta_2_3,-1*sin_theta_2_3],
            [exp_i_phi_3*sin_theta_2_3,exp_i_phi_3*cos_theta_2_3]
        ],device = self.device)

        if input_vector.dtype != torch.complex64 and input_vector.dtype != torch.complex128:
            input_vector = input_vector.to(torch.complex64)

        tupled = tuple([torch.einsum('bij,bj->bi', torch.stack((rotation_matrix_0,rotation_matrix_1,rotation_matrix_2,rotation_matrix_3)), sv) for sv in input_vector])
        output_vector = torch.stack(tupled)
        output_vector *=norm
        # print(output_vector.shape)
        real_output = torch.reshape(output_vector.real,(batch_dim,8))
        imag_output = torch.reshape(output_vector.imag,(batch_dim,8))
        output_vector = torch.cat((real_output,imag_output),dim=1)
        return output_vector
    def return_angles(self):
        return self.thetas,self.phis

In [None]:
tuple([i for i in range(9)])

(0, 1, 2, 3, 4, 5, 6, 7, 8)

In [None]:
rot = U3Rotation(torch.device("cpu"))

In [None]:
input = torch.randn(2,16)

In [None]:
x = rot(input)

In [None]:
input

tensor([[-0.1149,  0.1742, -1.2597,  0.5258, -1.4999, -0.8797, -0.9982,  1.0927,
         -1.2896,  0.0526, -0.5390,  0.0826,  2.1487,  1.4182, -0.2370, -0.6672],
        [-0.9309, -0.8238, -0.3336, -1.0988,  0.8605, -0.0605, -1.9280,  2.3108,
         -1.8872,  0.8579, -0.9389, -0.0957,  1.8503,  0.6923,  0.7441,  1.6514]])

In [None]:
x

tensor([[-0.2500, -0.0926, -0.3777,  0.0420,  1.9809, -0.3681, -0.5997, -0.6491,
         -1.1010,  0.4364,  0.0485, -0.9987, -3.0392,  0.2210,  0.9312, -1.0704],
        [-0.2322,  1.1596,  1.2659,  0.7505, -0.4377,  0.3533, -0.6290, -0.1927,
         -2.0926,  0.2569,  0.3980, -0.1828, -1.6915,  1.1202, -1.8656, -2.8893]])

In [None]:
x1 = torch.rand(2,2)
x2 = torch.rand(2,2)
x3 = torch.rand(2,2)
x4 = torch.rand(2,2)
x = torch.stack((x1,x2,x3,x4))

In [None]:
x.shape

torch.Size([4, 2, 2])

In [None]:
x

tensor([[[0.4099, 0.0721],
         [0.1560, 0.4884]],

        [[0.7130, 0.9966],
         [0.4297, 0.2063]],

        [[0.7581, 0.0249],
         [0.4092, 0.9722]],

        [[0.8924, 0.8152],
         [0.9548, 0.2036]]])

In [None]:
class QuantumAutoencoder(nn.Module):
    def __init__(self, device,input_dim):
        super().__init__()
        self.device = device
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 16),
            U3Rotation(self.device),
        )
        self.decoder = nn.Sequential(
            nn.Linear(16, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )
    def forward(self,x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
class ClassicalAutoencoder(nn.Module):
  def __init__(self, device,input_dim):
        super().__init__()
        self.device = device
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 16),
        )
        self.decoder = nn.Sequential(
            nn.Linear(16, 128),
            nn.ReLU(),
            nn.Linear(128, input_dim)
        )
  def forward(self,x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [None]:
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor
train_dataset = FashionMNIST(root='./data', train=True, download=True, transform=ToTensor())
test_dataset = FashionMNIST(root='./data', train=False, download=True, transform=ToTensor())

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 26.4M/26.4M [00:01<00:00, 15.5MB/s]


Extracting ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 29.5k/29.5k [00:00<00:00, 303kB/s]


Extracting ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 4.42M/4.42M [00:00<00:00, 5.57MB/s]


Extracting ./data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz to ./data/FashionMNIST/raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 5.15k/5.15k [00:00<00:00, 21.7MB/s]

Extracting ./data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw






In [None]:
train_dataset[0][0].shape

torch.Size([1, 28, 28])

In [None]:
batch_size = 16

In [None]:
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
def train_autoencoder(model, train_dataloader,criterion, optimizer, num_epochs):
    model.to(model.device)
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        for batch in tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}"):
            inputs, _ = batch
            inputs = inputs.to(model.device)
            inputs = torch.reshape(inputs,(inputs.shape[0],-1))
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, inputs)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        epoch_loss = running_loss / len(train_dataloader.dataset)
        print(f"Epoch {epoch + 1}/{num_epochs} - Loss: {epoch_loss:.4f}")

In [None]:
img = torch.randn(2,28,28)

In [None]:
img.shape

torch.Size([2, 28, 28])

In [None]:
torch.reshape(img,(img.shape[0],-1)).shape

torch.Size([2, 784])

In [None]:
q_autoencoder = QuantumAutoencoder(torch.device("cuda:0"),28*28)

In [None]:
autoencoder_criterion = nn.MSELoss()
autoencoder_optimizer = optim.Adam(q_autoencoder.parameters(), lr=0.001)

In [None]:
train_autoencoder(q_autoencoder,train_dataloader,autoencoder_criterion,autoencoder_optimizer,10)

In [None]:
x = train_dataset[1][0].flatten().unsqueeze(0).to(torch.device("cuda:0"))
with torch.no_grad():
    outputs = q_autoencoder(x)

In [None]:
plt.imshow(x[0].cpu().reshape(28,28))

In [None]:
plt.imshow(outputs[0].cpu().reshape(28,28))

In [None]:
c_autoencoder = ClassicalAutoencoder(torch.device("cuda:0"),28*28)

In [None]:
c_criterion = nn.MSELoss()
c_optimizer = optim.Adam(c_autoencoder.parameters(), lr=0.001)

In [None]:
train_autoencoder(c_autoencoder,train_dataloader,c_criterion,c_optimizer,10)

In [None]:
q_autoencoder.encoder[-1].thetas

In [None]:
q_autoencoder.encoder[-1].phis

In [None]:
class Classifier(nn.Module):
    def __init__(self, device,input_dim, hidden_dim,num_classes):
        super().__init__()
        self.device = device
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, hidden_dim),
        )
        self.classifier = nn.Sequential(
            nn.Linear(hidden_dim, num_classes),
        )
    def forward(self,x):
        x = self.encoder(x)
        x = self.classifier(x)
        return x

In [None]:
def train_classifier_model(model, criterion, optimizer, scheduler, num_epochs):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = 10000.0  # Large arbitrary number
    best_acc_train = 0.0
    best_loss_train = 10000.0  # Large arbitrary number
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    print("Training started:")

    for epoch in range(num_epochs):

        # Each epoch has a training and validation phase
        for phase in ["train", "test"]:
            if phase == "train":
                # Set model to training mode
                model.train()
            else:
                # Set model to evaluate mode
                model.eval()
            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            n_batches = dataset_sizes[phase] // batch_size
            it = 0
            for inputs, labels in dataloaders[phase]:
                since_batch = time.time()
                batch_size_ = len(inputs)
                inputs = inputs.to(device)
                inputs = torch.reshape(inputs,(inputs.shape[0],-1))
                labels = labels.to(device)
                optimizer.zero_grad()

                # Track/compute gradient and make an optimization step only when training
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # Print iteration results
                running_loss += loss.item() * batch_size_
                batch_corrects = torch.sum(preds == labels.data).item()
                running_corrects += batch_corrects
                print(
                    "Phase: {} Epoch: {}/{} Iter: {}/{} Batch time: {:.4f}".format(
                        phase,
                        epoch + 1,
                        num_epochs,
                        it + 1,
                        n_batches + 1,
                        time.time() - since_batch,
                    ),
                    end="\r",
                    flush=True,
                )
                it += 1

            # Print epoch results
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print(
                "Phase: {} Epoch: {}/{} Loss: {:.4f} Acc: {:.4f}        ".format(
                    "train" if phase == "train" else "validation  ",
                    epoch + 1,
                    num_epochs,
                    epoch_loss,
                    epoch_acc,
                )
            )

            # Check if this is the best model wrt previous epochs
            if phase == "test" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == "test" and epoch_loss < best_loss:
                best_loss = epoch_loss
            if phase == "train" and epoch_acc > best_acc_train:
                best_acc_train = epoch_acc
            if phase == "train" and epoch_loss < best_loss_train:
                best_loss_train = epoch_loss

            # Update learning rate
            if phase == "train":
                scheduler.step()

    # Print final results
    model.load_state_dict(best_model_wts)
    time_elapsed = time.time() - since
    print(
        "Training completed in {:.0f}m {:.0f}s".format(time_elapsed // 60, time_elapsed % 60)
    )
    print("Best test loss: {:.4f} | Best test accuracy: {:.4f}".format(best_loss, best_acc))
    return model

In [None]:
dataloaders = {'train': train_dataloader, 'test': test_dataloader}
dataset_sizes = {'train': len(train_dataset), 'test': len(test_dataset)}

In [None]:
hybrid_classifier = Classifier(torch.device("cuda:0"),784,16,10)

In [None]:
hybrid_classifier.encoder = q_autoencoder.encoder

In [None]:
classifier_optimizer = optim.Adam(hybrid_classifier.parameters(), lr=0.001)
classifier_criterion = nn.CrossEntropyLoss()
classifier_scheduler = lr_scheduler.StepLR(classifier_optimizer, step_size=10, gamma=0.1)

In [None]:
train_classifier_model(hybrid_classifier,classifier_criterion,classifier_optimizer,classifier_scheduler,10)

In [None]:
classic_classifier = Classifier(torch.device("cuda:0"),784,16,10)
classic_optimizer = optim.Adam(classic_classifier.parameters(), lr=0.001)
classic_criterion = nn.CrossEntropyLoss()
classic_lr_scheduler = lr_scheduler.StepLR(classic_optimizer, step_size=10, gamma=0.1)

In [None]:
train_classifier_model(classic_classifier,classic_criterion,classic_optimizer,classic_lr_scheduler,10)