In [1]:
# Some parts of this code are based on the Python script:
# https://github.com/pytorch/tutorials/blob/master/beginner_source/transfer_learning_tutorial.py
# License: BSD

import time
import os
import copy

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F

import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, transforms
import torchvision.transforms as transforms

# Pennylane
import pennylane as qml
from pennylane import numpy as np

torch.manual_seed(42)
np.random.seed(42)

# Plotting
import matplotlib.pyplot as plt


import itertools

# OpenMP: number of parallel threads.
# os.environ["OMP_NUM_THREADS"] = "1"

In [2]:
# Drop any cached CUDA allocations, then select the compute device:
# the first CUDA GPU when one is available, otherwise the CPU.
torch.cuda.empty_cache()
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [3]:
# Hyperparameters for the classical baseline CNN
batch_size = 64
learning_rate = 0.001
num_epochs = 1

# CIFAR-10 preprocessing: convert images to tensors, then normalise each of
# the three channels with mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

# Options shared by the train and test splits (downloaded into ./data if missing).
cifar_kwargs = dict(root='./data', transform=transform, download=True)

# Training split is reshuffled by its loader; the test split keeps its order.
train_dataset = torchvision.datasets.CIFAR10(train=True, **cifar_kwargs)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = torchvision.datasets.CIFAR10(train=False, **cifar_kwargs)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



class SimpleCNN(nn.Module):
    """
    Small CNN classifier: two conv/ReLU/max-pool stages followed by three
    linear layers producing 10 outputs.

    Shape flow (N = batch size), as given by the layer definitions below:
        [N, 3, 32, 32] -> conv1 -> [N, 16, 16, 16] -> conv2 -> [N, 32, 8, 8]
        -> flatten -> [N, 2048] -> fc1 -> [N, 128] -> fc2 -> [N, 64] -> out -> [N, 10]
    """

    def __init__(self):
        super().__init__()
        # Submodules are created in the order conv1, conv2, fc1, fc2, out so that
        # parameter names and random initialisation order stay fixed.
        self.conv1 = self._conv_stage(3, 16)
        self.conv2 = self._conv_stage(16, 32)
        self.fc1 = nn.Sequential(nn.Linear(32 * 8 * 8, 128), nn.ReLU(True))
        self.fc2 = nn.Sequential(nn.Linear(128, 64), nn.ReLU(True))
        self.out = nn.Linear(64, 10)

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """5x5 same-padded convolution, in-place ReLU, then 2x2 max-pooling."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=5, stride=1, padding=2),
            nn.ReLU(True),
            nn.MaxPool2d(kernel_size=2),
        )

    def forward(self, x):
        """Return the 10 raw output scores (no softmax) for a batch of images."""
        features = self.conv2(self.conv1(x))
        flat = features.view(features.size(0), -1)  # [N, 32 * 8 * 8]
        hidden = self.fc2(self.fc1(flat))
        return self.out(hidden)




# Build the baseline CNN on the selected device, with its loss and optimizer
model = SimpleCNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


# Training loop: one optimizer step per mini-batch
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        # Put the batch on the same device as the model
        images = images.to(device)
        labels = labels.to(device)

        # Clear old gradients, run forward, compute loss, back-propagate, update
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Report the current batch loss only on every 100th step
        if (i+1) % 100 != 0:
            continue
        print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

# Testing loop: count top-1 hits over the whole test loader
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        # Index of the largest output along dim 1 is the predicted class
        predicted = torch.max(model(images), dim=1).indices
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Accuracy on the test set: {(100 * correct / total):.2f}%")

Files already downloaded and verified
Files already downloaded and verified
Epoch [1/1], Step [100/782], Loss: 1.7140
Epoch [1/1], Step [200/782], Loss: 1.5788
Epoch [1/1], Step [300/782], Loss: 1.6066
Epoch [1/1], Step [400/782], Loss: 1.4730
Epoch [1/1], Step [500/782], Loss: 1.2113
Epoch [1/1], Step [600/782], Loss: 1.2181
Epoch [1/1], Step [700/782], Loss: 1.3376
Accuracy on the test set: 56.32%


In [4]:

# NN weights

# Copy every state_dict tensor of the trained model to a NumPy array, keyed by name.
numpy_weights = {name: param.cpu().numpy() for name, param in model.state_dict().items()}
# One flat list of scalars per tensor, in state_dict order ...
nw_list = [list(array.flatten()) for array in numpy_weights.values()]
# ... and all of them concatenated into a single flat list.
nw_list_normal = list(itertools.chain.from_iterable(nw_list))
print("# of NN parameters: ", len(nw_list_normal))
# Smallest qubit count whose 2**n basis states can index every parameter.
n_qubits = int(np.ceil(np.log2(len(nw_list_normal))))
print("Required qubit number: ", n_qubits)


# of NN parameters:  285226
Required qubit number:  19


In [5]:

# Second spelling of the qubit count; later cells use both names.
n_qubit = n_qubits

# PennyLane device with one wire per qubit.
# dev = qml.device("default.qubit", wires=n_qubits)
dev = qml.device("lightning.gpu", batch_obs=True, wires=n_qubits)

def H_layer(nqubits):
    """Apply a Hadamard gate to each of the wires 0 .. nqubits-1.
    """
    for wire in range(nqubits):
        qml.Hadamard(wires=wire)

def RY_layer(w):
    """Apply one RY rotation per entry of ``w``: entry k rotates wire k.
    """
    for wire, angle in enumerate(w):
        qml.RY(angle, wires=wire)

def RZ_layer(w):
    """Layer of parametrized qubit rotations around the z axis.

    Entry k of ``w`` is used as the RZ rotation angle on wire k.
    """
    for idx, element in enumerate(w):
        qml.RZ(element, wires=idx)
        
def entangling_layer(nqubits):
    """Two staggered rows of nearest-neighbour CNOTs.

    The first row couples the pairs (0,1), (2,3), ...; the second row is
    shifted by one wire and couples (1,2), (3,4), ...:

        CNOT  CNOT  CNOT  CNOT...  CNOT
          CNOT  CNOT  CNOT...  CNOT
    """
    for first_control in (0, 1):  # even-start row, then odd-start row
        for control in range(first_control, nqubits - 1, 2):
            qml.CNOT(wires=[control, control + 1])

In [6]:
### Some tool function definition ###########

# Cache of state_dict layouts, keyed by model class, so the layout of a given
# class is computed only once (a redefined class is a new key and is re-read).
_STATE_LAYOUT_CACHE = {}


def _state_layout(model_cls):
    """Return ``[(name, shape, numel), ...]`` for ``model_cls().state_dict()``.

    The model is instantiated only on the first request for a given class;
    later calls reuse the cached layout. This avoids rebuilding (and randomly
    re-initialising) a full network each time the layout is needed.
    """
    layout = _STATE_LAYOUT_CACHE.get(model_cls)
    if layout is None:
        layout = [
            (name, tuple(tensor.shape), tensor.numel())
            for name, tensor in model_cls().state_dict().items()
        ]
        _STATE_LAYOUT_CACHE[model_cls] = layout
    return layout


def probs_to_weights(probs_):
    """Slice a tensor of values into tensors shaped like SimpleCNN's state_dict.

    Args:
        probs_: tensor of any shape; it is flattened and consumed from the
            front, in SimpleCNN state_dict order. Surplus trailing values are
            ignored.

    Returns:
        dict mapping each SimpleCNN state_dict name to a tensor of the matching
        shape, cut from ``probs_`` (slices of the input, so gradients flow
        back to it).

    Raises:
        ValueError: if ``probs_`` holds fewer values than SimpleCNN needs.
    """
    layout = _state_layout(SimpleCNN)
    # reshape (rather than view) also accepts non-contiguous inputs
    flat = probs_.reshape(-1)

    needed = sum(numel for _, _, numel in layout)
    if flat.numel() < needed:
        raise ValueError(
            f"probs_to_weights needs at least {needed} values, got {flat.numel()}"
        )

    new_state_dict = {}
    offset = 0
    for name, shape, numel in layout:
        new_state_dict[name] = flat[offset:offset + numel].reshape(shape)
        offset += numel

    return new_state_dict

def generate_qubit_states_torch(n_qubit):
    """Enumerate every n_qubit-long combination of the values -1 and +1.

    Args:
        n_qubit: number of columns (must be >= 1).

    Returns:
        Integer tensor of shape (2**n_qubit, n_qubit). Rows are in
        cartesian-product order: the last column changes fastest and -1
        comes before +1.

    Raises:
        ValueError: if ``n_qubit`` is smaller than 1.
    """
    if n_qubit < 1:
        raise ValueError(f"n_qubit must be >= 1, got {n_qubit}")

    all_states = torch.cartesian_prod(*[torch.tensor([-1, 1]) for _ in range(n_qubit)])
    # With a single input, cartesian_prod returns a 1-D tensor; reshaping keeps
    # the (2**n_qubit, n_qubit) layout for n_qubit == 1 and is a no-op otherwise.
    return all_states.reshape(-1, n_qubit)

####################


# @qml.qnode(dev, diff_method="spsa")
# def quantum_net(q_weights_flat):
#     """
#     The variational quantum circuit.
#     """

#     # Reshape weights
#     q_weights = q_weights_flat.reshape(q_depth, n_qubits)

#     # Start from state |+> , unbiased w.r.t. |0> and |1>
#     H_layer(n_qubits)
#     # Repeated layer
#     for i in range(q_depth):
        
#         # Parameterised layer
#         if i%2 == 0:
#             for y in range(n_qubits):
#                 qml.RY(q_weights[i][y], wires=y)
#         else:
#             for z in range(n_qubits):
#                 qml.RZ(q_weights[i][z], wires=z)

#         # Control Z gates
#         for y in range(n_qubits - 1):
#             qml.CZ(wires=[y, y + 1])
    
    
    
#     probs_ = qml.probs(wires=list(range(n_qubits)))
    
#     return probs_

# Alternative differentiation method kept for reference:
# @qml.qnode(dev, diff_method="parameter-shift")
@qml.qnode(dev, diff_method="spsa")
def quantum_net(q_weights_flat):
    """
    Variational circuit returning the probability of every basis state.

    ``q_weights_flat`` holds q_depth * n_qubits rotation angles and is read as
    a (q_depth, n_qubits) table, one row per layer. After a Hadamard on every
    wire, each layer applies one rotation per wire (RY on even layers, RZ on
    odd layers) followed by a chain of CZ gates between neighbouring wires.
    """
    layer_angles = q_weights_flat.reshape(q_depth, n_qubits)

    H_layer(n_qubits)

    for layer in range(q_depth):
        # Even layers rotate about y, odd layers about z
        rotation = qml.RY if layer % 2 == 0 else qml.RZ
        for wire in range(n_qubits):
            rotation(layer_angles[layer][wire], wires=wire)

        # Nearest-neighbour CZ chain: (0,1), (1,2), ...
        for wire in range(n_qubits - 1):
            qml.CZ(wires=[wire, wire + 1])

    return qml.probs(wires=list(range(n_qubits)))

In [7]:

class LewHybridNN(nn.Module):
    """
    Torch module implementing full quantum net.

    The trainable parameters are the circuit angles ``q_params`` and the
    weights of ``MappingNetwork``. On every forward pass the circuit output is
    post-processed, paired row-by-row with a +/-1 basis-state encoding, mapped
    to one scalar per row by ``MappingNetwork``, and those scalars are used as
    the weights of a functional copy of the SimpleCNN architecture that is
    applied to the input batch.

    Relies on module-level names defined in other cells: ``q_delta``,
    ``q_depth``, ``n_qubits``, ``n_qubit``, ``device``, ``nw_list_normal``,
    ``quantum_net``, ``generate_qubit_states_torch`` and ``probs_to_weights``.
    """

    class MappingModel(nn.Module):
        """Stack of linear layers (no activation in between) applied row-wise."""

        def __init__(self, input_size, hidden_sizes, output_size):
            super().__init__()
            # Initialize layers: an input layer, multiple hidden layers, and an output layer
            self.input_layer = nn.Linear(input_size, hidden_sizes[0])
            self.hidden_layers = nn.ModuleList([nn.Linear(hidden_sizes[i], hidden_sizes[i+1]) for i in range(len(hidden_sizes)-1)])
            self.output_layer = nn.Linear(hidden_sizes[-1], output_size)

        def forward(self, X):
            # Cast the input to the dtype/device of the layer weights
            X = X.type_as(self.input_layer.weight)
            X = self.input_layer(X)
            for hidden in self.hidden_layers:
                X = hidden(X)
            output = self.output_layer(X)
            return output

    def __init__(self):

        super().__init__()
        self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
        self.MappingNetwork = self.MappingModel(n_qubit+1, [40, 200, 40], 1).to(device)

        # The +/-1 basis-state table is constant, so it is built once here
        # instead of on every forward pass. It is a non-persistent buffer: it
        # follows the module across devices but is not stored in state_dict.
        # clone() lets the full 2**n_qubit table be freed after slicing.
        basis_states = generate_qubit_states_torch(n_qubit)[:len(nw_list_normal)].clone()
        self.register_buffer("basis_states", basis_states, persistent=False)

    @staticmethod
    def _functional_cnn(x, weights):
        """Run the SimpleCNN architecture on ``x`` using the tensors in ``weights``.

        ``weights`` maps SimpleCNN state_dict names to tensors of matching shape.
        """
        # Convolution 1
        x = F.conv2d(x, weights['conv1.0.weight'], weights['conv1.0.bias'], stride=1, padding=2)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2)

        # Convolution 2
        x = F.conv2d(x, weights['conv2.0.weight'], weights['conv2.0.bias'], stride=1, padding=2)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2)

        # Flatten
        x = x.view(x.size(0), -1)

        # Fully connected 1
        x = F.relu(F.linear(x, weights['fc1.0.weight'], weights['fc1.0.bias']))

        # Fully connected 2
        x = F.relu(F.linear(x, weights['fc2.0.weight'], weights['fc2.0.bias']))

        # Output layer
        return F.linear(x, weights['out.weight'], weights['out.bias'])

    def forward(self, x):
        """
        Generate the CNN weights from the quantum circuit and apply the CNN to ``x``.

        Returns the raw output of the generated CNN's last linear layer.
        """
        device = x.device

        # Constants of the post-processing applied to the circuit output
        easy_scale_coeff = 2**(n_qubit-1)
        gamma = 0.1
        beta  = 0.8
        alpha = 0.3

        # q_params is an nn.Parameter and therefore trainable by default; it is
        # no longer forced to requires_grad=True here, so freezing it works.
        probs_ = quantum_net(self.q_params)
        # One value per cached basis-state row
        probs_ = probs_[:self.basis_states.shape[0]]
        x_ = torch.abs(probs_) ** 2
        x_ = (beta*torch.tanh(gamma*easy_scale_coeff*x_))**(alpha)
        x_ = x_ - torch.mean(x_)
        # .to() returns a new tensor; the result must be kept for the move to take effect
        probs_ = x_.to(device)

        # Each row: basis-state encoding followed by its post-processed value
        qubit_states_torch = self.basis_states.to(device)
        combined_data_torch = torch.cat((qubit_states_torch, probs_.unsqueeze(1)), dim=1)

        prob_val_post_processed = self.MappingNetwork(combined_data_torch)

        state_dict = probs_to_weights(prob_val_post_processed)

        # Ensure all generated weights are float32 and on the input's device
        weights = {
            name: tensor.to(device=device, dtype=torch.float32)
            for name, tensor in state_dict.items()
        }

        return self._functional_cnn(x, weights)

In [8]:

# # Define the neural network using nn.Module
# class MappingModel(nn.Module):
#     def __init__(self, input_size, hidden_sizes, output_size):
#         super(MappingModel, self).__init__()
#         # Initialize layers: an input layer, multiple hidden layers, and an output layer
#         self.input_layer = nn.Linear(input_size, hidden_sizes[0])
#         self.batch_norms_input = nn.BatchNorm1d(hidden_sizes[0])
#         self.hidden_layers = nn.ModuleList()
#         self.batch_norms = nn.ModuleList()

#         for i in range(len(hidden_sizes) - 1):
#             self.hidden_layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i+1]))
#             self.batch_norms.append(nn.BatchNorm1d(hidden_sizes[i+1]))

#         self.output_layer = nn.Linear(hidden_sizes[-1], output_size)
        
#     def forward(self, X):
#         # Ensure the input tensor is the same type as the weights
#         X = X.type_as(self.input_layer.weight)

#         # Input layer with batch normalization and ReLU activation
#         X = self.input_layer(X)
#         X = self.batch_norms_input(X)
#         # X = F.relu(X)

#         # Hidden layers with batch normalization and ReLU activation
#         for hidden, batch_norm in zip(self.hidden_layers, self.batch_norms):
#             X = hidden(X)
#             X = batch_norm(X)
#             # X = F.relu(X)

#         # Output layer with linear activation
#         output = self.output_layer(X)
        
#         return output




# class LewHybridNN(nn.Module):
#     """
#     Torch module implementing full quantum net.
#     """

#     def __init__(self):

#         super().__init__()
#         self.q_params = nn.Parameter(q_delta * torch.randn(q_depth * n_qubits))
#         # self.simple_cnn = SimpleCNN()
#         self.MappingNetwork = MappingModel(n_qubit+1, [150, 50, 30, 20], 1)

    
#     def forward(self, x):
#         """
#         Defining how tensors are supposed to move through the *dressed* quantum
#         net.
#         """
#         device = x.device
#         self.q_params.requires_grad = True

            
#         probs_ = quantum_net(self.q_params)
#         probs_ = probs_[:len(nw_list_normal)]
#         # print(probs_)
#         # Generate qubit states using PyTorch
#         qubit_states_torch = generate_qubit_states_torch(n_qubit)[:len(nw_list_normal)]
#         qubit_states_torch = qubit_states_torch.to(device)

#         # Combine qubit states with probability values using PyTorch
#         combined_data_torch = torch.cat((qubit_states_torch, probs_.unsqueeze(1)), dim=1)
#         # input_size = combined_data_torch.size(1)

#         self.MappingNetwork.to(device)
#         prob_val_post_processed = self.MappingNetwork(combined_data_torch)

#         state_dict = probs_to_weights(prob_val_post_processed)

#         ######## 
            
        
#         dtype = torch.float32  # Ensure all tensors are of this type

#         # Convolution 1
#         weight = state_dict['conv1.0.weight'].to(device).type(dtype)
#         bias = state_dict['conv1.0.bias'].to(device).type(dtype)

        
#         x = F.conv2d(x, weight, bias, stride=1, padding=2)
#         x = F.relu(x)
#         x = F.max_pool2d(x, kernel_size=2)

#         # Convolution 2
#         weight = state_dict['conv2.0.weight'].to(device).type(dtype)
#         bias = state_dict['conv2.0.bias'].to(device).type(dtype)
#         x = F.conv2d(x, weight, bias, stride=1, padding=2)
#         x = F.relu(x)
#         x = F.max_pool2d(x, kernel_size=2)

#         # Flatten
#         x = x.view(x.size(0), -1)

#         # Fully connected 1
#         weight = state_dict['fc1.0.weight'].to(device).type(dtype)
#         bias = state_dict['fc1.0.bias'].to(device).type(dtype)
#         x = F.linear(x, weight, bias)
#         x = F.relu(x)

#         # Fully connected 2
#         weight = state_dict['fc2.0.weight'].to(device).type(dtype)
#         bias = state_dict['fc2.0.bias'].to(device).type(dtype)
#         x = F.linear(x, weight, bias)
#         x = F.relu(x)

#         # Output layer
#         weight = state_dict['out.weight'].to(device).type(dtype)
#         bias = state_dict['out.bias'].to(device).type(dtype)
#         x = F.linear(x, weight, bias)

    
#         return x #self.simple_cnn(x)

In [9]:
step = 0.0004               # Learning rate
batch_size = 64             # Number of samples for each training step (train_loader was already built in an earlier cell with its own batch_size; this assignment does not change it)
num_epochs = 10             # Number of training epochs
q_depth = 50                 # Depth of the quantum circuit (number of variational layers)
gamma_lr_scheduler = 0.1    # Learning rate reduction applied every 10 epochs (only used by the commented-out scheduler below).
q_delta = 0.01              # Initial spread of random quantum weights



# Instantiate the model, move it to the selected device, and set up loss function and optimizer
model = LewHybridNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=step)

# exp_lr_scheduler = lr_scheduler.StepLR(
#     optimizer, step_size=10, gamma=gamma_lr_scheduler
# )


# Count the mapping-network parameters on the model that is actually trained,
# rather than on a second throwaway MappingModel whose layer sizes would have
# to be kept in sync by hand (and whose construction draws from the global RNG).
# NOTE: despite its name, this variable holds the mapping-network count.
num_trainable_params_QNN = sum(p.numel() for p in model.MappingNetwork.parameters() if p.requires_grad)

num_trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("# of trainable parameter in Mapping model: ", num_trainable_params_QNN)
print("# of trainable parameter in QNN model: ", num_trainable_params - num_trainable_params_QNN)
print("# of trainable parameter in full model: ", num_trainable_params)

# of trainable parameter in Mapping model:  17121
# of trainable parameter in QNN model:  950
# of trainable parameter in full model:  18071


In [10]:


# Training loop: one optimizer step per mini-batch, timing each batch
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        since_batch = time.time()

        # Put the batch on the same device as the model
        images = images.to(device)
        labels = labels.to(device)

        # Clear old gradients, run forward, compute loss, back-propagate, update
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Reported for every batch (the every-100-steps filter is disabled)
        # if (i+1) % 100 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}, batch time: {time.time() - since_batch}")

Epoch [1/10], Step [1/782], Loss: 78.0615, batch time: 7.080328941345215
Epoch [1/10], Step [2/782], Loss: 2.3047, batch time: 6.704021692276001
Epoch [1/10], Step [3/782], Loss: 2.3002, batch time: 6.758169412612915
Epoch [1/10], Step [4/782], Loss: 2.2994, batch time: 6.705949544906616
Epoch [1/10], Step [5/782], Loss: 2.3043, batch time: 6.787644624710083
Epoch [1/10], Step [6/782], Loss: 2.3020, batch time: 6.7213218212127686
Epoch [1/10], Step [7/782], Loss: 2.2947, batch time: 6.813401222229004
Epoch [1/10], Step [8/782], Loss: 2.3051, batch time: 6.7331461906433105
Epoch [1/10], Step [9/782], Loss: 2.3060, batch time: 6.783542633056641
Epoch [1/10], Step [10/782], Loss: 2.3049, batch time: 6.798852920532227
Epoch [1/10], Step [11/782], Loss: 2.2987, batch time: 6.725883960723877
Epoch [1/10], Step [12/782], Loss: 2.3036, batch time: 6.743467569351196
Epoch [1/10], Step [13/782], Loss: 2.3085, batch time: 6.7090160846710205
Epoch [1/10], Step [14/782], Loss: 2.3030, batch time: 6

KeyboardInterrupt: 

In [None]:
# Testing loop: count top-1 hits over the whole test loader
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        # Index of the largest output along dim 1 is the predicted class
        predicted = torch.max(model(images), dim=1).indices
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Accuracy on the test set: {(100 * correct / total):.2f}%")