# ML4FG Interim Report HQNN Code
### By: Austin Stiefelmaier 11/11/23

## Imports and Settings

In [None]:
# Inspired by quantum ML code from:
# https://pennylane.ai/qml/demos/tutorial_quantum_transfer_learning.html
# And classical ML code from:
# https://pytorch.org/tutorials/beginner/transfer_learning_tutorial.html

In [1]:
# General imports
import os
import copy
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

# PyTorch imports
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.nn.functional as F

# Pennylane imports
import pennylane as qml
from pennylane import numpy as np

In [2]:
# Fix random number generation for reproducibility: seed the global PyTorch
# generator and the NumPy-style generator exposed through `np`.
torch.manual_seed(7)
# NOTE(review): `np` is PennyLane's NumPy wrapper (see the imports cell);
# presumably this seeds the underlying NumPy global RNG as well -- confirm.
np.random.seed(7)
# Optional and currently disabled: restrict OpenMP to a single thread.
# os.environ["OMP_NUM_THREADS"] = "1"

## Load Data

In [None]:
# Data download
# TODO

In [None]:
# Read data from folder
# NOTE(review): hard-coded absolute path ending in a placeholder folder name
# ("MYFOLDERNAME"); nothing in this cell actually reads from it yet.
data_folder = '/home/as6734/ml4fg_class_project/data/MYFOLDERNAME'
# Split data into train / validation / test subsets of 16000 / 2000 / 2000
# samples, using a dedicated generator seeded with 7 so the split is repeatable.
# NOTE(review): `all_data` is not defined in any of the visible cells, so this
# line raises NameError on a fresh kernel until the loading step (see the
# "Data download" TODO) is filled in. The three lengths are also expected to
# add up to len(all_data) -- confirm once the dataset is loaded.
train_set, valid_set, test_set = torch.utils.data.random_split(all_data, [16000, 2000, 2000], generator=torch.Generator().manual_seed(7))

In [None]:
# Create DataLoader
# Read the sizes from the splits themselves rather than repeating the numbers
# used in random_split, so the two can never drift apart.
dataset_sizes = {
    'train': len(train_set),
    'validation': len(valid_set),
    'test': len(test_set),
}
class_names = all_data.classes
# NOTE(review): `batch_size` is reassigned to 50 in the model/training params
# cell further down; the loaders below keep the value set here (8).
batch_size = 8
dataloaders = {
    # Reshuffle the training samples every epoch
    'train': torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True),
    # Validation order does not matter, so no shuffling
    'validation': torch.utils.data.DataLoader(valid_set, batch_size=batch_size),
    # One sample per batch: the test routine calls .item() on single predictions
    'test': torch.utils.data.DataLoader(test_set, shuffle=True, batch_size=1)
}

## Quantum Circuit Architecture

In [3]:
# Model architecture and training hyper-parameters, grouped by what they control

# -- Quantum circuit shape --
qubit_count = 4
circ_repeats = 6  # Number of stacked CNOT + RY layers in the circuit
q_delta = 0.01  # Scale applied to the random initial circuit weights

# -- Optimisation settings --
step = 0.0004  # Initial learning rate
gamma_lr_scheduler = 0.1  # Multiplicative learning-rate decay factor
num_epochs = 10
batch_size = 50

# Quantum backend: PennyLane's built-in default simulator, one wire per qubit
dev = qml.device('default.qubit', wires=qubit_count)

# Classical backend: first CUDA device when one is present, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [4]:
# Helper functions to construct quantum circuit
def ry_gates(w):
    """Queue one RY rotation per entry of `w`; entry k rotates wire k."""
    for wire, angle in enumerate(w):
        qml.RY(angle, wires=wire)

def entangling_layer(nqubits):
    """Entangle neighbouring wires with two staggered passes of CNOT gates.

    The first pass couples the pairs that start on an even wire (0-1, 2-3, ...),
    the second pass the pairs that start on an odd wire (1-2, 3-4, ...).
    """
    for first_control in (0, 1):  # 0 -> even pass, 1 -> odd pass
        for control in range(first_control, nqubits - 1, 2):
            qml.CNOT(wires=[control, control + 1])

In [5]:
# Variational quantum circuit exposed to PyTorch as a QNode
@qml.qnode(dev, interface='torch')
def quantum_circuit(inputs, q_weights_flat):
    """Encode `inputs` as RY angles, apply the trainable layers, measure every wire.

    Args:
        inputs: rotation angles for the encoding layer, one per wire.
        q_weights_flat: flat vector of trainable angles; it is reshaped to
            (circ_repeats, qubit_count), i.e. one row per trainable layer.

    Returns:
        A tuple with the Pauli-Z expectation value of each of the
        `qubit_count` wires.
    """
    layer_angles = q_weights_flat.reshape(circ_repeats, qubit_count)

    # A Hadamard on every wire, so the circuit does not start biased towards
    # the 0 or 1 computational-basis state.
    for wire in range(qubit_count):
        qml.Hadamard(wires=wire)

    # Data encoding: the inputs act as the first set of rotation angles.
    ry_gates(inputs)

    # Trainable part: each repeat entangles the wires with CNOTs and then
    # applies that layer's RY rotations.
    for layer in range(circ_repeats):
        entangling_layer(qubit_count)
        ry_gates(layer_angles[layer])

    # The layer output is the Pauli-Z expectation value of each wire.
    return tuple(qml.expval(qml.PauliZ(wire)) for wire in range(qubit_count))

In [6]:
# Example visualization code with throwaway (untrained) weights.
# Helps to see what the quantum circuit looks like; nothing defined here is
# used by the model class below.
# Random circuit weights, built the same way as HybridQuantumNet.q_params.
q_params_print = nn.Parameter(q_delta * torch.randn(circ_repeats * qubit_count))
# One fake sample with 512 features, matching the model's expected input width.
input_test = torch.randn(1,512)
# Stand-in for the classical pre-processing layer (512 -> qubit_count).
pre_net_test = nn.Linear(512, qubit_count)
pre_out_test = pre_net_test(input_test)
# tanh bounds each value to (-1, 1); scaling by pi/2 gives angles in (-pi/2, pi/2).
q_in_test = torch.tanh(pre_out_test) * np.pi / 2.0
# Draw the circuit once per row of the batch (a single row here).
for elem in q_in_test:
    print(qml.draw(quantum_circuit)(elem, q_params_print))

0: ──H──RY(-1.36)─╭●──RY(-0.01)───────────╭●──RY(-0.00)────────────╭●──RY(-0.01)────────────╭●
1: ──H──RY(0.59)──╰X─╭●──────────RY(0.00)─╰X─╭●──────────RY(0.00)──╰X─╭●──────────RY(-0.01)─╰X
2: ──H──RY(-0.56)─╭●─╰X──────────RY(0.01)─╭●─╰X──────────RY(-0.01)─╭●─╰X──────────RY(-0.00)─╭●
3: ──H──RY(-0.30)─╰X──RY(-0.01)───────────╰X──RY(-0.01)────────────╰X──RY(0.01)─────────────╰X

───RY(-0.01)────────────╭●──RY(-0.01)────────────╭●──RY(0.01)───────────┤  <Z>
──╭●──────────RY(0.00)──╰X─╭●──────────RY(-0.01)─╰X─╭●─────────RY(0.00)─┤  <Z>
──╰X──────────RY(-0.01)─╭●─╰X──────────RY(-0.02)─╭●─╰X─────────RY(0.01)─┤  <Z>
───RY(0.01)─────────────╰X──RY(-0.00)────────────╰X──RY(0.01)───────────┤  <Z>


## HQNN Architecture

In [None]:
# HQNN Model Class
class HybridQuantumNet(nn.Module):
    """Hybrid classifier: linear layer -> variational quantum circuit -> linear layer.

    A 512-feature input is projected down to `qubit_count` values, squashed
    into rotation angles, passed sample by sample through `quantum_circuit`,
    and the measured expectation values are mapped to 2 class scores.
    """

    # Initialize layers
    def __init__(self):
        super().__init__()
        # Layers before quantum circuit
        self.pre_quant = nn.Linear(512, qubit_count)
        # Quantum circuit params: small random angles, one per (layer, wire)
        self.q_params = nn.Parameter(q_delta * torch.randn(circ_repeats * qubit_count))
        # Layer(s) after quantum circuit
        self.post_quant = nn.Linear(qubit_count, 2)

    # Forward pass procedure
    def forward(self, input_features):
        """Return raw class scores of shape (batch, 2) for `input_features`.

        Args:
            input_features: tensor whose last dimension is 512 and whose first
                dimension indexes the samples of the batch.
        """
        # Map 512 output to qubit_count output
        pre_out = self.pre_quant(input_features)
        # Convert to radians for use as RY rotation gate params
        q_in = torch.tanh(pre_out) * np.pi / 2.0

        # The circuit handles one sample at a time; gather the per-sample
        # results and stack once instead of re-concatenating on every step.
        q_rows = []
        for elem in q_in:
            q_out_elem = quantum_circuit(elem, self.q_params)
            # Depending on the PennyLane version the QNode hands back either a
            # single tensor or a tuple with one scalar tensor per measurement.
            if isinstance(q_out_elem, (tuple, list)):
                q_out_elem = torch.stack(tuple(q_out_elem))
            # Match the dtype/device of the classical layers that follow.
            q_rows.append(q_out_elem.float().to(pre_out.device))

        if q_rows:
            q_out = torch.stack(q_rows)
        else:
            # Empty batch: keep the (0, qubit_count) shape the linear layer expects.
            q_out = torch.empty(
                (0, self.pre_quant.out_features),
                dtype=torch.float32,
                device=pre_out.device,
            )
        # Map quantum circuit output using linear layer to classes
        return self.post_quant(q_out)

In [None]:
# Build the hybrid network and place it on the selected device (GPU if available)
model_hybrid = HybridQuantumNet().to(device)
# Bare trailing expression: the notebook renders the model architecture summary
model_hybrid

## Train HQNN

In [None]:
# Set up loss, optimizer, and learning rate decay manager
loss_func = nn.CrossEntropyLoss()
# Optimise every trainable tensor of the hybrid model (both linear layers and
# the quantum circuit angles). HybridQuantumNet has no `fc` sub-module, so the
# parameters are taken from the model itself.
optimizer_hybrid = optim.Adam(model_hybrid.parameters(), lr=step)
# Multiply the learning rate by gamma_lr_scheduler every 10 scheduler steps
exp_lr_scheduler = lr_scheduler.StepLR(
    optimizer_hybrid, step_size=10, gamma=gamma_lr_scheduler
)

In [None]:
# Function to train model(s)
def train(model, loss_func, optimizer, scheduler, num_epochs):
    """Train `model`, validate after every epoch, and return the best model.

    Batches are read from the notebook-level `dataloaders` dict (keys 'train'
    and 'validation') and moved to the notebook-level `device`.

    Args:
        model: network to optimise; updated in place.
        loss_func: callable `loss_func(outputs, labels)` returning a scalar loss.
        optimizer: optimizer bound to the model's parameters.
        scheduler: learning-rate scheduler stepped once per epoch after the
            training phase; may be None to keep the learning rate fixed.
        num_epochs: number of passes over the training set.

    Returns:
        `model`, loaded with the weights of the epoch that reached the highest
        validation accuracy (ties go to the later epoch).
    """
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = float('inf')
    best_acc_train = 0.0
    best_loss_train = float('inf')
    print('Training started:')
    for epoch in tqdm(range(num_epochs)):
        for phase in ['train', 'validation']:
            is_train = phase == 'train'
            # train(True) enables training mode, train(False) is eval mode
            model.train(is_train)
            loader = dataloaders[phase]
            n_batches = len(loader)
            current_loss = 0.0
            current_corrects = 0
            samples_seen = 0
            for batch_idx, (X, Y) in enumerate(loader):
                batch_len = len(X)
                # Inputs and labels must live on the same device as the model
                X = X.to(device)
                Y = Y.to(device)
                optimizer.zero_grad()  # Reset gradients
                # Only track gradients (and update weights) while training
                with torch.set_grad_enabled(is_train):
                    outputs = model(X)
                    _, preds = torch.max(outputs, 1)
                    loss = loss_func(outputs, Y)
                    if is_train:
                        loss.backward()
                        optimizer.step()
                # Accumulate batch statistics (loss is a per-sample mean,
                # so weight it by the batch length)
                current_loss += loss.item() * batch_len
                current_corrects += torch.sum(preds == Y).item()
                samples_seen += batch_len
                # Print iteration progress on a single, overwritten line
                print('Phase: {} Epoch: {}/{} Iter: {}/{}'.format(phase, epoch+1, num_epochs, batch_idx+1, n_batches),
                    end="\r",
                    flush=True)

            # Get epoch stats and print; normalise by the samples actually
            # seen so the numbers stay right for any loader/batch size
            denom = max(samples_seen, 1)
            epoch_loss = current_loss / denom
            epoch_acc = current_corrects / denom
            print('Phase: {} Epoch: {}/{} Loss: {:.4f} Acc: {:.4f}        '.format(
                    'train' if is_train else 'validation  ',
                    epoch + 1,
                    num_epochs,
                    epoch_loss,
                    epoch_acc,
                )
            )
            # Update best-so-far statistics
            if is_train:
                best_acc_train = max(best_acc_train, epoch_acc)
                best_loss_train = min(best_loss_train, epoch_loss)
                # Advance the learning-rate schedule once per training epoch
                if scheduler is not None:
                    scheduler.step()
            else:
                best_loss = min(best_loss, epoch_loss)
                if epoch_acc >= best_acc:
                    best_acc = epoch_acc
                    # Snapshot the weights so later, worse epochs cannot
                    # overwrite the best validation result
                    best_model_wts = copy.deepcopy(model.state_dict())

    # Restore the best validation weights and hand the model back to the caller
    model.load_state_dict(best_model_wts)
    print('Training completed. Best validation loss: {:.4f} | Best validation accuracy: {:.4f}'.format(
        best_loss, best_acc))
    print('Best training loss: {:.4f} | Best training accuracy: {:.4f}'.format(
        best_loss_train, best_acc_train))
    return model

In [None]:
# Run the training loop with the loss, optimizer and scheduler configured above,
# and rebind `model_hybrid` to whatever train() returns.
# NOTE(review): this relies on train() returning the trained model -- confirm,
# otherwise `model_hybrid` is overwritten with None here.
model_hybrid = train(model_hybrid, loss_func, optimizer_hybrid, exp_lr_scheduler, num_epochs=num_epochs)

In [None]:
# Save model weights locally in case of GCP crash
# torch.save does not create missing folders, so make sure the target exists
os.makedirs('./weights', exist_ok=True)
torch.save(model_hybrid.state_dict(), './weights/hybrid_10epochs.pt')

## Test HQNN

In [None]:
# Load in saved weights, put in model, and then set to eval mode
# map_location remaps the stored tensors onto the current device, so weights
# saved from a GPU session can still be loaded on a CPU-only machine
best_weights = torch.load('./weights/hybrid_10epochs.pt', map_location=device)
model_hybrid.load_state_dict(best_weights)
model_hybrid.eval()

In [None]:
# Tests model (if epsilon=0, standard test set images)
# Otherwise perturbs using FGSM attack
# Also stores example images where the perturbation caused a mislabel
def test(model, device, test_loader, epsilon):
    # Accuracy counter
    correct = 0
    adv_examples = []
    # Loop over all examples in test set
    for data, target in tqdm(test_loader):
        # Send the data and label to GPU
        data, target = data.to(device), target.to(device)
        # Set requires_grad attribute of tensor. Important for Attack
        data.requires_grad = True
        # Forward pass the data through the model
        output = model(data)
        init_pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
        # Don't care to attack if already wrong
        if init_pred.item() != target.item():
            continue
        # Get loss backprop for gradient values
        loss = F.nll_loss(output, target)
        model.zero_grad()
        loss.backward()
        # Collect gradients
        data_grad = data.grad.data
        # Call FGSM function
        perturbed_data = fgsm_attack(data, epsilon, data_grad)
        # Re-classify the perturbed image
        output = model(perturbed_data)
        # Check for success
        final_pred = output.max(1, keepdim=True)[1]
        if final_pred.item() == target.item():
            correct += 1
            # Special case for saving 0 epsilon examples
            if epsilon == 0 and len(adv_examples) < 5:
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append( (init_pred.item(), final_pred.item(), adv_ex) )
        else:
            # Save some adv examples for visualization later
            if len(adv_examples) < 5:
                adv_ex = perturbed_data.squeeze().detach().cpu().numpy()
                adv_examples.append( (init_pred.item(), final_pred.item(), adv_ex) )
    # Get accuracy metric
    final_acc = correct / float(dataset_sizes['test'])
    print('Epsilon: {}\tTest Accuracy = {} / {} = {}'.format(epsilon, correct, len(test_loader), final_acc))
    # Return the accuracy and an adversarial example
    return final_acc, adv_examples