In [2]:
from utils.dataloader  import load_data
import torch
from utils.confLoader import *
import matplotlib.pyplot as plt
import torch.nn as nn
import qsimcirq
import time
import cirq
from math import pi
import torch.optim as optim
import torch.nn.functional as F
import os

In [3]:
# Select the compute device: the first CUDA GPU when one is visible, the CPU otherwise.
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
    print('CUDA is available. Training on GPU...')
    device = torch.device('cuda:0')
else:
    print('CUDA is not available. Training on CPU...')
    device = torch.device('cpu')
# Uncomment the next line to force CPU execution regardless of GPU availability.
# device = torch.device('cpu')

CUDA is available. Training on GPU...


In [4]:
# Build the data loaders. The path variables and `columns` are not defined in
# this notebook; presumably they come from the `utils.confLoader` star import.
loader = load_data(
    train_labels_path,
    test_labels_path,
    train_image_path,
    test_image_path,
    columns,
    itype='.jpg',
    batch_size=4,
    shuffle=True,
    do_random_crop=False,
    device='cpu',
)
# create_loader() hands back three loaders, unpacked here as train / test / validation.
train_loader, test_loader, valid_loader = loader.create_loader()

In [5]:
def display(image_tensor, output_dir, file_name,  save = False, detach=False):
    """Plot every channel of the first image in a 4-D batch as a grayscale grid.

    The channels of ``image_tensor[0]`` are laid out three per row. The figure
    is either shown interactively or written to ``<output_dir>/<file_name>.png``.

    Args:
        image_tensor: 4-D tensor indexed as (batch, channel, height, width).
            Only the first batch element is plotted.
        output_dir: Directory for the PNG; created when missing. Only used
            when ``save`` is true.
        file_name: PNG file name without extension. Only used when ``save``
            is true.
        save: When true, write the figure to disk and close it instead of
            showing it.
        detach: Kept for backward compatibility. Both settings now take the
            same detach -> CPU -> NumPy path, so tensors that live on a GPU or
            that have a single channel are handled either way.

    Raises:
        AssertionError: If ``image_tensor`` is not 4-dimensional.
    """
    assert len(image_tensor.shape) == 4, "Invalid input shape"

    if save and output_dir:
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs(output_dir, exist_ok=True)

    # Always index the batch axis explicitly: squeeze() would also drop a
    # size-1 channel axis, and .numpy() without .cpu() fails on CUDA tensors.
    image_np = image_tensor.detach().cpu().numpy()[0]

    num_layers = image_np.shape[0]  # Number of layers (channels)
    num_rows = (num_layers + 2) // 3  # Ceiling division: three plots per row

    # squeeze=False keeps `axes` two-dimensional even when there is one row.
    fig, axes = plt.subplots(num_rows, 3, figsize=(15, 5 * num_rows), squeeze=False)

    for i, ax in enumerate(axes.flat):
        # Hide ticks and frames everywhere, including unused trailing cells.
        ax.axis('off')
        if i < num_layers:
            ax.imshow(image_np[i], cmap = "gray")
            ax.set_title(f"Layer {i + 1}")

    if save:
        img_filename = os.path.join(output_dir, f"{file_name}.png")
        fig.savefig(img_filename, bbox_inches='tight')
        plt.close(fig)  # Release the figure so repeated saves do not accumulate
        return
    plt.tight_layout()
    plt.show()

In [6]:
def loading_bar( current_value, total_value, bar_length=40):
    """Redraw a single-line text progress bar on stdout.

    The line starts with a carriage return and has no trailing newline, so
    successive calls overwrite each other in a terminal or notebook output.

    Args:
        current_value: Amount of work completed so far.
        total_value: Amount of work that counts as 100%. A value of zero or
            less is treated as "nothing to do" and drawn as a full bar instead
            of raising ZeroDivisionError.
        bar_length: Width of the bar in characters.
    """
    if total_value > 0:
        # Clamp on both sides so overshoot and negative inputs stay in 0..100%.
        progress = max(0.0, min(1.0, current_value / total_value))
    else:
        progress = 1.0
    filled = int(progress * bar_length)
    arrow = '■' * filled
    spaces = ' ' * (bar_length - filled)
    print(f'\r[{arrow}{spaces}] {int(progress * 100)}%', end='', flush=True)

In [7]:
def kernel3(P, weight):
    """Build a 5-qubit circuit that mixes a 4-value patch with 4 weights via CNOTs.

    Each data qubit ``GridQubit(i, 0)`` is rotated by ``ry(P[i] * pi)``. Then,
    for every ``i``, the weight qubit ``GridQubit(0, 1)`` receives
    ``rx(weight[i] * pi)`` followed by ``CNOT(weight qubit -> data qubit i)``.
    The last data qubit is measured under the key ``"q"``.

    Args:
        P: Indexable with at least four numeric entries (the patch values).
        weight: Indexable with at least four numeric entries.

    Returns:
        ``(circuit, keys)`` where ``keys`` is ``["q"]``.
    """
    measurement_key = "q"
    keys = [measurement_key]
    data_qubits = [cirq.GridQubit(row, 0) for row in range(4)]
    weight_qubit = cirq.GridQubit(0, 1)

    circuit = cirq.Circuit()

    # Encode the patch values as Y rotations, one per data qubit.
    for idx, qubit in enumerate(data_qubits):
        circuit.append(cirq.ry(P[idx] * pi).on(qubit))

    # Rotate the weight qubit, then let it control a NOT on each data qubit in turn.
    for idx, qubit in enumerate(data_qubits):
        circuit.append(cirq.rx(weight[idx] * pi).on(weight_qubit))
        circuit.append(cirq.CNOT(weight_qubit, qubit))

    circuit.append(cirq.measure(data_qubits[3], key=measurement_key))

    return circuit, keys

In [8]:
def kernel4(P, weight):
    """Build a 6-qubit circuit that folds a 4-value patch into one readout qubit.

    Each data qubit ``GridQubit(i, 0)`` is rotated by ``ry(P[i] * pi)``. Then,
    for every ``i``, the weight qubit ``GridQubit(0, 1)`` receives
    ``rx(weight[i] * pi)`` followed by a Toffoli gate with the weight qubit and
    data qubit ``i`` as controls and the readout qubit ``GridQubit(1, 1)`` as
    target. The readout qubit is measured under the key ``"q"``.

    Args:
        P: Indexable with at least four numeric entries (the patch values).
        weight: Indexable with at least four numeric entries.

    Returns:
        ``(circuit, keys)`` where ``keys`` is ``["q"]``.
    """
    measurement_key = "q"
    keys = [measurement_key]
    data_qubits = [cirq.GridQubit(row, 0) for row in range(4)]
    weight_qubit = cirq.GridQubit(0, 1)
    readout_qubit = cirq.GridQubit(1, 1)

    circuit = cirq.Circuit()

    # Encode the patch values as Y rotations, one per data qubit.
    for idx, qubit in enumerate(data_qubits):
        circuit.append(cirq.ry(P[idx] * pi).on(qubit))

    # Rotate the weight qubit, then flip the readout qubit whenever both the
    # weight qubit and the current data qubit are set.
    for idx, qubit in enumerate(data_qubits):
        circuit.append(cirq.rx(weight[idx] * pi).on(weight_qubit))
        circuit.append(cirq.CCNOT(weight_qubit, qubit, readout_qubit))

    circuit.append(cirq.measure(readout_qubit, key=measurement_key))

    return circuit, keys

In [10]:

# from models.circuits.ConvKernel import kernel2, kernel3


class Quanv2D(nn.Module):
    """Convolution-like layer whose per-patch response comes from a sampled quantum circuit.

    For every input channel the image is cut into ``kernel_size x kernel_size``
    patches. Each patch, together with one row of ``self.weight``, is passed to
    a circuit-building kernel; the circuit is sampled ``precision`` times on
    ``simulator`` and the fraction of shots that measured 1 becomes the output
    pixel. Every input channel feeds ``out_channels // in_channels`` consecutive
    output channels.

    NOTE(review): the output is a freshly allocated tensor filled with Python
    floats, so it has no autograd connection to ``self.weight`` or to the
    input. Marking it ``requires_grad`` lets later layers back-propagate down
    to it, but no gradient reaches this layer's weights or earlier layers.

    Args:
        simulator: Object exposing ``run(circuit, repetitions=...)`` whose
            result offers ``histogram(key=...)``.
        in_channels: Number of input channels.
        out_channels: Number of output channels; should be a multiple of
            ``in_channels``.
        kernel_size: Side length of the square patch. The built-in kernels
            read exactly four patch values, i.e. they expect ``kernel_size=2``.
        stride: Step between neighbouring patches.
        precision: Number of circuit repetitions (shots) per patch.
        kernel: Optional callable ``kernel(patch, weight) -> (circuit, keys)``.
            Defaults to ``kernel4``.

    Raises:
        ValueError: If ``precision`` is smaller than 1.
    """

    def __init__(
        self,
        simulator,
        in_channels=1,
        out_channels=1,
        kernel_size=2,
        stride=1,
        precision=10,
        kernel=None,
    ):
        super().__init__()
        if precision < 1:
            raise ValueError("precision must be a positive number of repetitions")
        if out_channels % in_channels != 0:
            # Keep constructing for backward compatibility, but make the silent
            # data loss visible: the trailing channels are never written.
            import warnings

            warnings.warn(
                "out_channels is not a multiple of in_channels; "
                "the trailing output channels will stay zero",
                stacklevel=2,
            )
        self.simulator = simulator
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.kernel = kernel4 if kernel is None else kernel
        self.precision = precision
        self.extension_factor = self.out_channels // self.in_channels
        self.weight = nn.Parameter(
            torch.rand((out_channels, self.kernel_size * self.kernel_size)),
            requires_grad=True,
        )

    def __repr__(self):
        return f"Quanv2D( {self.in_channels}, {self.out_channels}, kernel =({self.kernel_size},{self.kernel_size}) "\
            f"stride ={self.stride, self.stride}, precision={self.precision} )"

    def patches_generator(self, image):
        """Yield ``(patch, h, w)`` for every kernel position on a 2-D image.

        ``patch`` is the flattened window as a plain Python list and ``(h, w)``
        is the position of that window in the output grid.
        """
        image_height, image_width = image.size()
        output_height = (image_height - self.kernel_size) // self.stride + 1
        output_width = (image_width - self.kernel_size) // self.stride + 1

        for h in range(output_height):
            for w in range(output_width):
                patch = image[
                    h * self.stride : (h * self.stride + self.kernel_size),
                    w * self.stride : (w * self.stride + self.kernel_size),
                ]
                patch = patch.reshape(self.kernel_size * self.kernel_size).tolist()
                yield patch, h, w

    def forward(self, x):
        """Run the circuit kernel over every patch of every channel of ``x``.

        Args:
            x: 4-D tensor indexed as (batch, channel, height, width).

        Returns:
            Tensor of shape (batch, out_channels, out_h, out_w) on the same
            device as ``x`` with ``requires_grad`` set. Each value is the
            fraction of shots that measured 1.
        """
        self.inputs = x
        count, in_channels, image_height, image_width = x.size()
        # Use the configured kernel size so the output grid matches patches_generator.
        output_height = (image_height - self.kernel_size) // self.stride + 1
        output_width = (image_width - self.kernel_size) // self.stride + 1
        # Fill the result on the CPU; per-element writes to a GPU tensor are slow.
        result = torch.zeros(count, self.out_channels, output_height, output_width)

        # One host copy of the input and weights instead of one transfer per patch.
        images = x.detach().cpu()
        weights = self.weight.detach().cpu().tolist()

        for c in range(count):
            for ic in range(in_channels):
                for patch, h, w in self.patches_generator(images[c][ic]):
                    for t in range(self.extension_factor):
                        oc = ic * self.extension_factor + t
                        circuit, keys = self.kernel(patch, weights[oc])
                        res = self.simulator.run(circuit, repetitions=self.precision)
                        # Normalise by the actual shot count (a fixed 0.1 is
                        # only right for precision == 10).
                        result[c, oc, h, w] = (
                            res.histogram(key=keys[0])[1] / self.precision
                        )

        # Hand the result back on the caller's device so parameterised layers
        # that follow do not hit a CPU/GPU mismatch.
        result = result.to(x.device)
        result.requires_grad = True
        return result

    

In [11]:
class QuantamModel(nn.Module):
    """Hybrid classifier: three classical convolutions, two Quanv2D layers, two linear layers.

    The flatten step is hard-wired to ``96*12*12`` features. The shape comments
    in ``forward`` assume a 3x224x224 input, which is the size that produces
    exactly that many features.
    """

    def __init__(self):
        super().__init__()
        self.simulator = qsimcirq.QSimSimulator()
        self.conv1 = nn.Conv2d( in_channels = 3, out_channels = 12, kernel_size = 2, stride = 1 )
        self.conv2 = nn.Conv2d( in_channels = 12, out_channels = 48, kernel_size = 2, stride = 1 )
        self.conv3 = nn.Conv2d( in_channels= 48, out_channels= 48, kernel_size= 2, stride= 2)
        self.convq = Quanv2D(self.simulator, in_channels = 48, out_channels = 48, kernel_size = 2, stride = 1)
        self.convq2 = Quanv2D(self.simulator, in_channels = 48, out_channels = 96, kernel_size = 2, stride = 1)

        self.fc1 = nn.Linear(96*12*12, 50)
        self.fc2 = nn.Linear(50, 5)
        self.maxpool = nn.MaxPool2d(2, stride = 2)
        # Despite its name this is an average pool; forward() does not use it.
        self.minpool = nn.AvgPool2d(2, stride = 2)
        self.activation = nn.SiLU()
        self.activation2 = nn.ReLU()

    def forward(self, x):
        """Return a (batch, 5) tensor of ReLU-activated scores for ``x``."""
        x = self.activation(self.maxpool(self.conv1(x))) # 12x111x111
        x = torch.relu(self.maxpool(self.conv2(x))) # 48x55x55
        x = self.activation(self.conv3(x)) # 48x27x27
        x = self.activation2(self.maxpool(self.convq(x))) # 48x13x13
        x = self.activation(self.convq2(x)) # 96x12x12
        # The quantum layers assemble their output in host memory; move it to
        # wherever the linear layers live so a GPU-resident model does not fail
        # with a device mismatch. This is a no-op when both are already together.
        x = x.to(self.fc1.weight.device)
        x = x.view(-1, 96*12*12)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return x

In [12]:
class QClassifier(nn.Module):
    """Exploratory network used to visualise the feature maps of individual layers.

    Many layers are constructed, but ``forward`` currently applies only
    ``conv2d_64`` and plots its output; the rest of the pipeline is kept as
    commented-out experiments.

    NOTE(review): a later cell defines another class with this same name, which
    replaces this one in the kernel namespace once that cell has run.
    """

    def __init__(self):
        """Create the simulator and every candidate layer (most are unused by forward)."""
        super().__init__()
        self.simulator = qsimcirq.QSimSimulator()
        # Classical 2x2 convolutions; the first one takes single-channel input.
        self.conv2d_64 = nn.Conv2d(1, 64, kernel_size= (2,2), stride = (1,1))
        self.conv2d_64_128 = nn.Conv2d(64, 128, kernel_size= (2,2), stride = (1,1))
        self.conv2d_128_256 = nn.Conv2d(128, 256, kernel_size= (2,2), stride = (1,1))
        self.conv2d_256_256 = nn.Conv2d(256, 256, kernel_size= (2,2), stride = (1,1))
        self.conv2d_256_512 = nn.Conv2d(256, 512, kernel_size= (2,2), stride = (2,2))
        self.conv2d_256_128 = nn.Conv2d(256, 128, kernel_size= (2,2), stride = (1,1))
        
        # Quantum convolution layers; all of them share the one simulator instance.
        self.convq_128_128 = Quanv2D(self.simulator, in_channels = 128, out_channels = 128, kernel_size = 2, stride = 1)
        self.convq_128_256 = Quanv2D(self.simulator, in_channels = 128, out_channels = 256, kernel_size = 2, stride = 1)
        self.convq_3_3 = Quanv2D(self.simulator, in_channels = 3, out_channels = 3, kernel_size = 2, stride = 1)
        self.batch_normal_64 = nn.BatchNorm2d(64)
        self.silu = nn.SiLU(inplace = True)
        
        # Pooling variants that the commented-out experiments in forward() compare.
        self.adpt = nn.AdaptiveMaxPool2d((20,20))
        self.max_pooling = nn.MaxPool2d(kernel_size = 2, stride = 2, padding = 0, ceil_mode=False, dilation = 1,)
        self.lp_pooling = nn.LPPool2d(kernel_size = 2, stride = 2,  ceil_mode=False, norm_type = 2)
        # self.conv_classical = nn.Sequential(
        #     nn.Conv2d(3, 64, kernel_size= (2,2), stride = (1,1)),
        #     nn.BatchNorm2d(64),
        #     nn.SiLU(inplace = True),
        #     # nn.Conv2d(64, 128, kernel_size= (3,3), stride = (1,1), padding = (1,1)),
        #     # nn.LeakyReLU(inplace = True),
        #     # nn.MaxPool2d(kernel_size = 2, stride = 2, padding = 0, dilation = 1, ceil_mode=False),
            
        #     # nn.Conv2d(128, 128, kernel_size=(2,2), stride=(1, 1), padding=(1, 1)),
        #     # nn.ReLU(inplace=True),

        #     # nn.MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=True),
        # )
        
    def forward(self, x):
        """Apply ``conv2d_64`` to ``x``, plot the resulting feature maps, and return them.

        The plot is produced by the notebook's ``display`` helper with
        ``save=False`` and ``detach=True``, so it is shown rather than written
        to disk. Every later stage is commented out.
        """
        x = self.conv2d_64(x)
        # Side effect: opens a matplotlib figure on every forward pass.
        display(x, "images","conv2d_64_l1", False, True)
        # x = self.silu(x)
        # display(x,"images","silu_l3", True, True)
        # x = self.conv2d_64_128(x)
        # display(x,"images","conv2d_64_128", True, True)
        
        # x = self.lp_pooling(x)
        # display(x,"images","lppool", True, True)
        
        # x = self.conv2d_128_256(x)
        # display(x, "images","conv2d_128_256", True, True)
        
        # x = self.conv2d_256_256(x)
        # display(x, "images","conv2d_256_256", True, True)
        
        # x = self.max_pooling(x)
        # display(x,"images","maxpool", True, True)
        
        # x = self.conv2d_256_128(x)
        # display(x, "images","conv2d_256_128", True, True)
        
        # x = self.convq_128_128(x)
        # display(x, "images","quanv2d_128_128", True, True)
        
        # x = self.convq_128_256(x)
        # display(x, "images","quanv2d_128_256", True, True)
        
        # z = self.max_pooling(x)
        # display(z,"images","maxpool", True, True)
        
        # y = self.lp_pooling(x)
        # display(y,"images","lppool", True, True)
        
        # x = self.adpt(x)
        # display(x,"images","adptpool", True, True)
        # x = self.convq_3_3(x)
        # display(x,"images","convq", False, True)
        
        return x
        

In [17]:
class QClassifier(nn.Module):
    """Hybrid classifier: classical feature extractor, quantum convolution stage, dense head.

    Three stages run in sequence:

    * ``classical_expansion`` - 2x2 convolutions and pooling, 3 input channels
      reduced to 32 feature channels.
    * ``quantam_expansion`` - two ``Quanv2D`` layers with an adaptive max pool
      to 20x20 between them, followed by a 2x2 max pool.
    * ``fully_connected`` - flatten plus three linear layers ending in 5 outputs.

    Args:
        testing: Stored on the instance. It is only referenced by the
            commented-out visualisation hooks in ``forward``.
    """

    def __init__(self, testing = False):
        super().__init__()
        self.testing = testing
        self.simulator = qsimcirq.QSimSimulator()
        self.classical_expansion = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size= (2,2), stride = (1,1), padding = (1,1)),
            nn.SiLU(inplace = True),
            nn.Conv2d(64, 128, kernel_size= (2,2), stride = (1,1), padding = (1,1)),
            nn.LPPool2d(kernel_size = 2, stride = 2,  ceil_mode=False, norm_type = 2),

            nn.Conv2d(128, 32, kernel_size= (2,2), stride = (1,1), padding = (1,1)),
            nn.MaxPool2d(kernel_size = 2, stride = 2, padding = 0, ceil_mode=False, dilation = 1,)
        )

        # The adaptive pool pins the spatial size to 20x20. The second Quanv2D
        # (2x2, stride 1) turns that into 19x19 and the final pool into 9x9,
        # which is where the 32*9*9 input width of the dense head comes from.
        self.quantam_expansion = nn.Sequential(
            Quanv2D(self.simulator, in_channels = 32, out_channels = 32, kernel_size = 2, stride = 1),
            nn.AdaptiveMaxPool2d((20,20)),
            Quanv2D(self.simulator, in_channels = 32, out_channels = 32, kernel_size = 2, stride = 1),
            nn.MaxPool2d(kernel_size = 2, stride = 2, padding = 0, ceil_mode=False, dilation = 1,)
        )

        self.fully_connected = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=32*9*9, out_features=4096, bias=True),
            nn.SiLU(inplace = True),
            nn.Dropout(p=0.5, inplace=False),
            nn.Linear(in_features=4096, out_features=1000, bias=True),
            nn.ReLU(inplace=True),
            nn.Linear(in_features=1000, out_features=5, bias=True),
        )

    def forward(self, x):
        """Return a (batch, 5) tensor of raw scores for the image batch ``x``."""
        x = self.classical_expansion(x)
        # if self.testing:
            # display(x, "images","classical_expansion", True, True)

        x = self.quantam_expansion(x)
        # if self.testing:
            # display(x, "images","quantam_expansion", True, True)

        # Move the activations to the device that holds this module's own
        # dense-layer parameters, instead of relying on a notebook-global
        # `device` that may not match where the model was actually placed.
        head_device = next(self.fully_connected.parameters()).device
        x = x.to(head_device)
        x = self.fully_connected(x)
        return x

In [18]:

# simulator = qsimcirq.QSimSimulator()
# model2 = Quanv2D(simulator, in_channels = 3, out_channels = 6, kernel_size = 2, stride = 2)
# Instantiate the classifier and place its parameters on the selected device
# (nn.Module.to returns the module itself, so the two steps can be chained).
model2 = QClassifier().to(device)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

In [19]:
# Push a single batch from the test loader through model2 and time the forward pass.
outputs = None
outputs2 = None
for i, data in enumerate(test_loader):
    inputs = data['image']
    labels = data['label'].view(1, -1)
    # Flag the CPU-side batch as requiring grad before it is copied to the device.
    inputs.requires_grad = True
    inputs = inputs.to(device)
    labels = labels.to(device)
    print(inputs.size())

    start = time.time()
    outputs2 = model2(inputs)
    print(outputs2.size())
    delta = time.time() - start
    print(f"Finished training in {delta:0.6f} sec")
    # Only the first batch is needed.
    break
# display(outputs2, True)

RuntimeError: Caught RuntimeError in DataLoader worker process 0.
Original Traceback (most recent call last):
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torch/utils/data/_utils/worker.py", line 308, in _worker_loop
    data = fetcher.fetch(index)
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 51, in fetch
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torch/utils/data/_utils/fetch.py", line 51, in <listcomp>
    data = [self.dataset[idx] for idx in possibly_batched_index]
  File "/home/rupax/github/GridProjectHealth-Care/utils/preprocessing.py", line 164, in __getitem__
    image = self.transform(image)
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torchvision/transforms/transforms.py", line 95, in __call__
    img = t(img)
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1501, in _call_impl
    return forward_call(*args, **kwargs)
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torchvision/transforms/transforms.py", line 277, in forward
    return F.normalize(tensor, self.mean, self.std, self.inplace)
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torchvision/transforms/functional.py", line 363, in normalize
    return F_t.normalize(tensor, mean=mean, std=std, inplace=inplace)
  File "/home/rupax/github/GridProjectHealth-Care/venv/lib/python3.10/site-packages/torchvision/transforms/_functional_tensor.py", line 928, in normalize
    return tensor.sub_(mean).div_(std)
RuntimeError: output with shape [1, 28, 28] doesn't match the broadcast shape [3, 28, 28]


In [None]:
# total_loss = 0
# start_time = time.time()
# for epoch in range(1):
#     print(f"Running Epoch no: {epoch+1}")
#     for i, data in enumerate(test_loader):
#         torch.autograd.set_detect_anomaly(True)
#         inputs, labels = data['image'], data['label']
#         print(inputs.shape, labels)
#         optimizer.zero_grad()
#         outputs = model(inputs)
#         # print(inputs.size(), outputs.size(), labels.size())
#         print(outputs)
#         loss = criterion(outputs, labels)
#         print("loss :",loss)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()
#         loading_bar(i, 24)
#         # break
# delta = time.time() - start_time
# print(f"Finished training in {delta:0.6f} sec")
# print("Total Loss : ", total_loss)

In [None]:
# model.zero_grad()
# # loss = model.backward(outputs2)
# loss = outputs2 - outputs  # Replace with your actual loss computation
# # loss_scalar = loss.mean()
# print(loss)
# loss.backward()
# # loss.backward()

In [None]:
# # define two tensors
# A = torch.tensor(2., requires_grad=True)
# print("Tensor-A:", A)
# B = torch.tensor(5., requires_grad=True)
# print("Tensor-B:", B)
  
# # define a function using above defined
# # tensors
# optimizer = optim.SGD([A,B], lr=0.5)
# for i in range(10):
#     optimizer.zero_grad()
#     loss = B - A
#     print("loss :", loss)
#     # call the backward method
#     loss.backward()
#     optimizer.step()
#     print("A:grad", A.grad, A)
#     print("B.grad:", B.grad, B)
#     print()
# # print the gradients using .grad