In [1]:
!pip install qiskit==0.35.0

/home/cwetzel3/.bashrc: line 1: /home/umrigar/cs220/bin/student.rc: No such file or directory
Collecting qiskit==0.35.0
  Downloading qiskit-0.35.0.tar.gz (13 kB)
Collecting qiskit-aer==0.10.3
  Downloading qiskit_aer-0.10.3-cp39-cp39-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (18.0 MB)
[K     |████████████████████████████████| 18.0 MB 260 kB/s  eta 0:00:01
[?25hCollecting qiskit-ibmq-provider==0.18.3
  Downloading qiskit_ibmq_provider-0.18.3-py3-none-any.whl (238 kB)
[K     |████████████████████████████████| 238 kB 77.9 MB/s eta 0:00:01
[?25hCollecting qiskit-ignis==0.7.0
  Downloading qiskit_ignis-0.7.0-py3-none-any.whl (200 kB)
[K     |████████████████████████████████| 200 kB 78.8 MB/s eta 0:00:01
[?25hCollecting qiskit-terra==0.20.0
  Downloading qiskit_terra-0.20.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.5 MB)
[K     |████████████████████████████████| 6.5 MB 72.9 MB/s eta 0:00:01
Collecting requests-ntlm>=1.1.0
  Downloading requests_ntlm-1.2.0-py3-n

In [2]:
import numpy as np
import matplotlib.pyplot as plt

import torch
from torch.autograd import Function
from torchvision import datasets, transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F

import qiskit
from qiskit import transpile, assemble
from qiskit.visualization import *

Matplotlib is building the font cache; this may take a moment.


ModuleNotFoundError: No module named 'torch'

In [None]:
QUBITS_PER_CIRCUIT = 2

In [None]:
class QuantumCircuit:
    """ 
    This class provides a simple interface for interaction 
    with the quantum circuit 
    """
    
    def __init__(self, n_qubits, backend, shots):
        # --- Circuit definition ---
        self._circuit = qiskit.QuantumCircuit(n_qubits)
        
        all_qubits = [i for i in range(n_qubits)]
        self.theta = qiskit.circuit.Parameter('theta')
        
        self._circuit.h(all_qubits)
        self._circuit.barrier()
        self._circuit.ry(self.theta, all_qubits)
        
        self._circuit.measure_all()
        # ---------------------------

        self.backend = backend
        self.shots = shots
    
    def run(self, thetas):
        t_qc = transpile(self._circuit,
                         self.backend)
        qobj = assemble(t_qc,
                        shots=self.shots,
                        parameter_binds = [{self.theta: theta} for theta in thetas])
        job = self.backend.run(qobj)
        result = job.result().get_counts()
        
        counts = np.array(list(result.values()))
        states = np.array(list(result.keys())).astype(float)
        
        # Compute probabilities for each state
        probabilities = counts / self.shots
        # Get state expectation
        expectation = np.sum(states * probabilities)
        #print(expectation)
        
        return np.array([expectation])

In [None]:
class QuantumCircuit2: #electric boogaloo
    """ 
    This class provides a simple interface for interaction 
    with the quantum circuit 
    """
    
    def __init__(self, n_qubits, backend, shots):
        # --- Circuit definition ---
        self._circuit = qiskit.QuantumCircuit(n_qubits)
        
        all_qubits = [i for i in range(n_qubits)]
        self.theta = qiskit.circuit.Parameter('theta')
        
        self._circuit.h(all_qubits)
        self._circuit.barrier()
        self._circuit.ry(self.theta, all_qubits)
        
        self._circuit.measure_all()
        # ---------------------------

        self.backend = backend
        self.shots = shots
    
    def run(self, thetas):
        t_qc = transpile(self._circuit,
                         self.backend)
        qobj = assemble(t_qc,
                        shots=self.shots,
                        parameter_binds = [{self.theta: theta} for theta in thetas])
        job = self.backend.run(qobj)
        result = job.result().get_counts()
        
        counts = np.array(list(result.values()))
        states = np.array(list(result.keys())).astype(float)
        
        # Compute probabilities for each state
        probabilities = counts / self.shots
        # Get state expectation
        expectation = np.sum(states * probabilities)
        #print(expectation)
        
        return np.array([expectation])

In [None]:
simulator = qiskit.Aer.get_backend('aer_simulator')

circuit = QuantumCircuit(QUBITS_PER_CIRCUIT, simulator, 100)
print('Expected value for rotation pi {}'.format(circuit.run([np.pi])[0]))
circuit._circuit.draw()

In [None]:
class HybridFunction(Function):
    """ Hybrid quantum - classical function definition """
    
    @staticmethod
    def forward(ctx, input, quantum_circuit, quantum_circuit2, shift):
        """ Forward pass computation """
        ctx.shift = shift
        ctx.quantum_circuit = quantum_circuit
        ctx.quantum_circuit2 = quantum_circuit2

        expectation_z = ctx.quantum_circuit.run([ input[0][0].tolist() ])
        expectation_z2 = ctx.quantum_circuit2.run([ input[0][1].tolist() ])
        result = torch.tensor([expectation_z, expectation_z2])
        ctx.save_for_backward(input, result)

        return result
        
    @staticmethod
    def backward(ctx, grad_output):
        """ Backward pass computation """
        input, expectation_z = ctx.saved_tensors
        input_list = np.array(input.tolist())

        #print(input_list)
        
        shift_right = input_list + np.ones(input_list.shape) * ctx.shift
        shift_left = input_list - np.ones(input_list.shape) * ctx.shift

        length = len(input_list[0])
        split = int(length / 2)
        
        #ive come to the conclusion that thetas only ever has one theta in it
        #based on how we are using this code. This code will more than likely
        #break if you want to use two thetas. I think that they wrote this
        #to in theory let you rin the quantum circuit on more than one input
        #but didn't actually test it because it won't work with multiple thetas

        gradients = []
        for i in range(0, split):
            expectation_right = ctx.quantum_circuit.run([ shift_right[0][i].tolist() ])
            expectation_left  = ctx.quantum_circuit.run([ shift_left[0][i].tolist() ])
            
            #gradient = torch.tensor([expectation_right]) - torch.tensor([expectation_left])
            gradient = (expectation_right) - (expectation_left)
            gradients.append(gradient)
          
        for i in range(split, length):
            expectation_right = ctx.quantum_circuit2.run([ shift_right[0][i].tolist() ])
            expectation_left  = ctx.quantum_circuit2.run([ shift_left[0][i].tolist() ])
            
            #gradient = torch.tensor([expectation_right]) - torch.tensor([expectation_left])
            gradient = (expectation_right) - (expectation_left)
            gradients.append(gradient)
        
        gradients = np.array([gradients]).T

        ret = torch.tensor(gradients).float() * grad_output.float()

        return ret.view(1,2), None, None, None

class Hybrid(nn.Module):
    """ Hybrid quantum - classical layer definition """
    
    def __init__(self, backend, shots, shift):
        super(Hybrid, self).__init__()
        self.quantum_circuit = QuantumCircuit(QUBITS_PER_CIRCUIT, backend, shots)
        self.quantum_circuit2 = QuantumCircuit2(QUBITS_PER_CIRCUIT, backend, shots)
        self.shift = shift
        
    def forward(self, input):
        return HybridFunction.apply(input, self.quantum_circuit, self.quantum_circuit2, self.shift)

In [None]:
# Concentrating on the first 100 samples
n_samples = 100

X_train = datasets.MNIST(root='./data', train=True, download=True,
                         transform=transforms.Compose([transforms.ToTensor()]))

# Leaving only labels 0 and 1 
idx = np.append(np.where(X_train.targets == 0)[0][:n_samples], 
                np.where(X_train.targets == 1)[0][:n_samples])
#adding 100 samples from rest
idx = np.append(idx, np.where(X_train.targets == 2)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 3)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 4)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 5)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 6)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 7)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 8)[0][:n_samples])
idx = np.append(idx, np.where(X_train.targets == 9)[0][:n_samples])

X_train.data = X_train.data[idx]
X_train.targets = X_train.targets[idx]

print(X_train)

train_loader = torch.utils.data.DataLoader(X_train, batch_size=1, shuffle=True)

In [None]:
n_samples_show = 6

data_iter = iter(train_loader)
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

while n_samples_show > 0:
    images, targets = data_iter.__next__()

    axes[n_samples_show - 1].imshow(images[0].numpy().squeeze(), cmap='gray')
    axes[n_samples_show - 1].set_xticks([])
    axes[n_samples_show - 1].set_yticks([])
    axes[n_samples_show - 1].set_title("Labeled: {}".format(targets.item()))
    
    n_samples_show -= 1

In [None]:
n_samples = 15

X_test = datasets.MNIST(root='./data', train=False, download=True,
                        transform=transforms.Compose([transforms.ToTensor()]))

idx = np.append(np.where(X_test.targets == 0)[0][:n_samples], 
                np.where(X_test.targets == 1)[0][:n_samples])

idx = np.append(idx, np.where(X_test.targets == 2)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 3)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 4)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 5)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 6)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 7)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 8)[0][:n_samples])
idx = np.append(idx, np.where(X_test.targets == 9)[0][:n_samples])

X_test.data = X_test.data[idx]
X_test.targets = X_test.targets[idx]

test_loader = torch.utils.data.DataLoader(X_test, batch_size=1, shuffle=True)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.dropout = nn.Dropout2d()
        self.fc1 = nn.Linear(256, 64)
        self.fc2 = nn.Linear(64, 2)
        self.hybrid = Hybrid(qiskit.Aer.get_backend('aer_simulator'), 100, np.pi / 2)
        #added
        self.fc3 = nn.Linear(2, 10)
        self.lsm = nn.LogSoftmax(dim = 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout(x)
        x = x.view(1, -1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        x = self.hybrid(x)
        #hybrid returns a 64-bit float, so we must cast to 32 bit for the last linear layer
        x = x.type(torch.float32)
        x = torch.reshape(x, (1, 2))
        x = self.fc3(x)
        #need a logsoftmax layer
        x = self.lsm(x)
        return x

In [None]:
model = Net()
#optimizer = optim.Adam(model.parameters(), lr=0.001)
optimizer = optim.Adam(model.parameters(), lr=0.003)
loss_func = nn.NLLLoss()

#20
epochs = 20
loss_list = []

model.train()
for epoch in range(epochs):
    total_loss = []
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        # Forward pass
        output = model(data)
        # Calculating loss
        loss = loss_func(output, target)
        # Backward pass
        loss.backward()
        # Optimize the weights
        optimizer.step()
        
        total_loss.append(loss.item())
    loss_list.append(sum(total_loss)/len(total_loss))
    print('Training [{:.0f}%]\tLoss: {:.4f}'.format(
        100. * (epoch + 1) / epochs, loss_list[-1]))

In [None]:
plt.plot(loss_list)
plt.title('Hybrid NN Training Convergence')
plt.xlabel('Training Iterations')
plt.ylabel('Neg Log Likelihood Loss')

In [None]:
model.eval()
with torch.no_grad():
    
    correct = 0
    for batch_idx, (data, target) in enumerate(test_loader):
        output = model(data)
        
        pred = output.argmax(dim=1, keepdim=True) 
        correct += pred.eq(target.view_as(pred)).sum().item()
        
        loss = loss_func(output, target)
        total_loss.append(loss.item())
        
    print('Performance on test data:\n\tLoss: {:.4f}\n\tAccuracy: {:.1f}%'.format(
        sum(total_loss) / len(total_loss),
        correct / len(test_loader) * 100)
        )

In [None]:
n_samples_show = 6
count = 0
fig, axes = plt.subplots(nrows=1, ncols=n_samples_show, figsize=(10, 3))

model.eval()
with torch.no_grad():
    for batch_idx, (data, target) in enumerate(test_loader):
        if count == n_samples_show:
            break
        output = model(data)
        
        pred = output.argmax(dim=1, keepdim=True) 

        axes[count].imshow(data[0].numpy().squeeze(), cmap='gray')

        axes[count].set_xticks([])
        axes[count].set_yticks([])
        axes[count].set_title('Predicted {}'.format(pred.item()))
        
        count += 1