# Comments

The initial Conv2D layer is replaced by a custom-built Qonv2D layer,
which implements a trainable quantum circuit. Qonv2D is designed
for kernel_size = (2, 2), but the strides can be varied. Since it has no
padding implementation, it only computes 'valid' convolutions; nevertheless, it would be
straightforward to add the padding that enables 'same' convolutions.



Too computationally expensive for now. The model takes a long time to train,
and its performance increases only slowly with each epoch.
Presumably, given more time, the model could achieve good performance;
however, I have not yet been able to let the model run for a very long time.

# Imports

In [61]:
import tensorflow as tf
from tensorflow import keras

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization, Conv2D, MaxPooling2D, Activation, Flatten, Dropout, Dense
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.datasets import mnist

from tensorflow.keras import layers

import numpy as np
import time
import sys
import matplotlib.pyplot as plt
import datetime

%load_ext tensorboard

import os

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

from qiskit import ClassicalRegister, QuantumRegister, QuantumCircuit, Aer, execute, IBMQ
from qiskit.tools.visualization import circuit_drawer
from qiskit.tools.visualization import plot_histogram
from qiskit.extensions.unitary import unitary
from qiskit.tools.monitor import job_monitor
from qiskit.compiler import transpile, assemble
from qiskit.providers.aer import QasmSimulator
# Statevector simulator backend, looked up by name among the Aer backends.
S_simulator = Aer.backends(name = 'statevector_simulator')[0]
# QASM simulator backend, looked up by name. NOTE: this binding is replaced
# two statements below, so this particular backend object ends up unused.
M_simulator = Aer.backends(name = 'qasm_simulator')[0]

# QasmSimulator configured with the 'density_matrix' method.
backend = QasmSimulator(configuration = {'method' : 'density_matrix'})
# Final binding of M_simulator: convolution() executes its circuits on this backend.
M_simulator = backend


import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from scipy import misc
import cv2

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


# Quantum Functions definition

In [62]:
def convolution(FOCUS_TENSOR, FILTER, shots = 8192):
    '''
    Run the 2x2 quantum "convolution" circuit once and return a scalar.

    FOCUS_TENSOR : tensor-like exposing ``.numpy()``; its array is indexed
                   as a 2x2 patch
                       [[F00, F01],
                        [F10, F11]]
    FILTER       : 2x2 array of weights
                       [[FI00, FI01],
                        [FI10, FI11]]
                   FI00, FI01 and FI10 scale the three angle arguments of
                   every u3 gate; FI11 scales the measured frequency.
    shots        : number of circuit executions used for the estimate.

    Returns the fraction of shots in which the ancilla was read as '1',
    multiplied by FILTER[1, 1].
    '''
    data_reg = QuantumRegister(4, name = 'q_r')
    ancilla_reg = QuantumRegister(1, name = 'a_r')
    readout_reg = ClassicalRegister(1, name = 'c_r')
    circuit = QuantumCircuit(data_reg, ancilla_reg, readout_reg, name = 'q_circ')

    FOCUS = FOCUS_TENSOR.numpy()

    # One data qubit per patch entry, visited in row-major order.
    patch_positions = ((0, 0), (0, 1), (1, 0), (1, 1))

    circuit.h(data_reg)
    for qubit_index, (r, c) in enumerate(patch_positions):
        pixel = FOCUS[r, c]
        circuit.u3(
            pixel * FILTER[0, 0],
            pixel * FILTER[0, 1],
            pixel * FILTER[1, 0],
            data_reg[qubit_index],
        )
    circuit.h(data_reg)

    # Multi-controlled Toffoli from the four data qubits onto the ancilla,
    # followed by a measurement of the ancilla only.
    circuit.mct(data_reg, ancilla_reg, None, mode = 'noancilla')
    circuit.measure(ancilla_reg[0], readout_reg)

    job = execute(circuit, M_simulator, shots = shots, optimization_level = 1)
    counts = job.result().get_counts()

    ones_fraction = counts.get('1', 0) / shots
    return ones_fraction * FILTER[1, 1]

In [67]:
def qonv2D(FILTERS = None, kernel_size = (2, 2), stride = (1, 1), image = None):
    '''
    Slide every filter over `image` and collect the quantum convolution outputs.

    FILTERS     : array whose first axis enumerates the filters; each entry is
                  handed to convolution() as its FILTER argument.
    kernel_size : (rows, cols) of the sliding window.
    stride      : (row_step, col_step) between consecutive windows.
    image       : array/tensor exposing ``.shape`` and 2-D slicing. Each window
                  is passed to convolution(), which calls ``.numpy()`` on it.

    Only windows that fit entirely inside the image are evaluated (no padding).

    Returns a NumPy array of shape (out_rows, out_cols, n_filters) where
        out_rows = (image.shape[0] - kernel_size[0]) // stride[0] + 1
        out_cols = (image.shape[1] - kernel_size[1]) // stride[1] + 1
    '''
    KERNEL = kernel_size
    STRIDE = stride
    N_FILTERS = FILTERS.shape[0]

    # Each output axis is derived from its own image/kernel/stride component.
    # Using the row components for both axes is only correct for square
    # images with square kernels.
    out_rows = (image.shape[0] - KERNEL[0]) // STRIDE[0] + 1
    out_cols = (image.shape[1] - KERNEL[1]) // STRIDE[1] + 1
    CONV_IMAGE = np.zeros(shape = (out_rows, out_cols, N_FILTERS))

    for row in range(0, image.shape[0] - KERNEL[0] + 1, STRIDE[0]):
        for col in range(0, image.shape[1] - KERNEL[1] + 1, STRIDE[1]):
            # The window does not depend on the filter, so slice it once.
            focus = image[row : row + KERNEL[0], col : col + KERNEL[1]]
            for index, FILTER in enumerate(FILTERS):
                convol = convolution(focus, FILTER, shots = 100)
                CONV_IMAGE[row // STRIDE[0], col // STRIDE[1], index] = convol
    return CONV_IMAGE

# Custom Keras Layer implementation

In [4]:
class Qonv2D(layers.Layer):
    """Keras layer whose eager forward pass is computed by the quantum qonv2D routine.

    Parameters
    ----------
    filters : number of trainable filters.
    kernel_size : (rows, cols) of each filter; also the shape of each weight slice.
    strides : (row_step, col_step) of the sliding window.
    input_shape : accepted for call-site compatibility but not used by the layer.
    **kwargs : standard Layer arguments (e.g. name, dtype, trainable), forwarded
        to the base class so the layer can be rebuilt from its config.
    """

    def __init__(self, filters = 1, kernel_size = (2, 2), strides = (1, 1), input_shape = (28, 28), **kwargs):
        super(Qonv2D, self).__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides

    def build(self, input_shape):
        """Create the trainable filter bank of shape (filters, kernel_rows, kernel_cols)."""
        self.f = self.add_weight(
            name = "filters_customized",
            shape = (self.filters, self.kernel_size[0], self.kernel_size[1]),
            initializer = "random_normal",
            trainable=True,
        )

    def get_config(self):
        """Return the base config extended with this layer's constructor arguments."""
        config = super(Qonv2D, self).get_config()
        config.update({
            "filters": self.filters,
            "kernel_size": self.kernel_size,
            "strides": self.strides,
        })
        return config

    def call(self, inputs):
        """Apply the layer to a batch.

        In eager mode every sample of the batch is processed by qonv2D with the
        current filter values and the results are returned as a float32 tensor.
        Outside eager mode a freshly constructed classical Conv2D with the same
        filters/kernel_size/strides is applied instead; that Conv2D owns its own
        weights and does not use self.f.
        """
        if tf.executing_eagerly():
            final_output = []
            for i in range(inputs.shape[0]): # For each sample in the batch
                # NOTE(review): the filters are exported with .numpy() and the result
                # is computed outside TensorFlow ops, so gradients presumably do not
                # flow back to self.f through this path - confirm.
                pred = qonv2D(FILTERS = self.f.numpy(), kernel_size = self.kernel_size, stride = self.strides, image = inputs[i])
                final_output.append(list(pred))

            ans = tf.convert_to_tensor(final_output, dtype = "float32")
            return ans
        return Conv2D(self.filters, kernel_size = self.kernel_size, strides = self.strides)(inputs)
    
class MyReLu(layers.Layer):
    """Stateless layer that clamps negative entries of its input to zero."""

    def __init__(self):
        super().__init__()

    def call(self, x):
        """Return the element-wise maximum of `x` and 0."""
        lower_bound = 0
        return tf.maximum(x, lower_bound)

# Model definition

In [5]:
def MyModel(width, height, depth, classes):
    """Build the hybrid classifier: one Qonv2D stage, two Conv2D stages, softmax head.

    Parameters
    ----------
    width, height, depth : image dimensions; combined into the
        (height, width, depth) tuple handed to the first layer.
    classes : number of units of the final Dense layer.

    Returns
    -------
    An un-compiled keras Sequential model.
    """
    # Single source of truth for the input shape; the first layer previously
    # received the axes in (width, height, depth) order while this tuple
    # was left unused.
    input_shape = (height, width, depth)
    chanDim = -1

    model = Sequential()

    def _add_post_conv_stack():
        # ReLU -> batch normalisation over the channel axis -> 2x2 max pooling.
        model.add(Activation(tf.nn.relu))
        model.add(BatchNormalization(axis = chanDim))
        model.add(MaxPooling2D(pool_size = (2, 2)))

    # Stage 1: quantum convolution (2x2 kernel, stride 2).
    model.add(Qonv2D(filters = 4, kernel_size = (2, 2), strides = (2, 2), input_shape = input_shape))
    _add_post_conv_stack()

    # Stage 2: classical 3x3 convolution with the default padding.
    model.add(Conv2D(filters = 32, kernel_size = (3, 3)))
    _add_post_conv_stack()

    # Stage 3: classical 3x3 convolution with 'same' padding.
    model.add(Conv2D(filters = 64, kernel_size = (3, 3), padding = "same"))
    _add_post_conv_stack()

    # Classification head. An intermediate Dense(10) + ReLU + BatchNormalization
    # block was tried here and is currently disabled.
    model.add(Flatten())
    model.add(Dropout(0.5))

    model.add(Dense(units = classes))
    model.add(Activation(tf.nn.softmax))

    return model

In [6]:
# Training hyper-parameters: number of epochs, batch size, initial learning rate
EPOCHS, BS, INIT_LR = 1, 1, 0.001

# Fetch the MNIST train/test splits
(X_train, Y_train), (X_test, Y_test) = mnist.load_data()

# Append a trailing channel axis, cast to float32 and divide by 255
X_train = np.expand_dims(X_train, axis = -1).astype("float32") / 255.0
X_test = np.expand_dims(X_test, axis = -1).astype("float32") / 255.0

# One-hot encode the labels over 10 classes
Y_train, Y_test = (to_categorical(labels, 10) for labels in (Y_train, Y_test))

In [7]:
# Loss used by train_step / test_step.
loss_object = keras.losses.CategoricalCrossentropy()
# Adam with default settings; not referenced by the training loop in this notebook.
optimizer = keras.optimizers.Adam()
# Adam with the configured initial learning rate; this is the one passed to train_step.
opt = Adam(learning_rate = INIT_LR)

In [8]:
# Running metrics, accumulated per batch and reset at the end of every epoch
train_loss = tf.keras.metrics.Mean(name = 'train_loss', dtype = tf.float32)
train_accuracy = tf.keras.metrics.CategoricalAccuracy(name = 'train_accuracy')
test_loss = tf.keras.metrics.Mean(name = 'test_loss', dtype = tf.float32)
test_accuracy = tf.keras.metrics.CategoricalAccuracy(name = 'test_accuracy')

In [53]:
#@tf.function # Needed to make sure gradients are recorded
def train_step(model, opt, x_train, y_train):
    """Run one optimisation step on a single batch.

    Performs a forward pass in training mode under a gradient tape, prints the
    trainable variables, applies the gradients with `opt`, and feeds the batch
    loss and predictions into the module-level `train_loss` / `train_accuracy`
    metrics. Returns nothing.
    """
    with tf.GradientTape() as tape:
        outputs = model(x_train, training = True)
        batch_loss = loss_object(y_train, outputs)

    # Read the variables only after the forward pass has run.
    variables = model.trainable_variables
    gradients = tape.gradient(batch_loss, variables)

    # Debug output: variable values as they are before this step's update.
    tf.print([(v.name, v) for v in variables])
    opt.apply_gradients(zip(gradients, variables))

    train_loss(batch_loss)
    train_accuracy(y_train, outputs)

def test_step(model, x_test, y_test):
    """Evaluate one batch and update the module-level test metrics.

    Runs the model on `x_test`, computes the loss against `y_test`, and feeds
    both into `test_loss` / `test_accuracy`. Returns nothing.
    """
    outputs = model(x_test)
    batch_loss = loss_object(y_test, outputs)

    test_loss(batch_loss)
    test_accuracy(y_test, outputs)

In [54]:
# Build the model for 28x28 single-channel images and 10 output classes
model = MyModel(width = 28, height = 28, depth = 1, classes = 10)

In [59]:
# Print the layer-by-layer summary (layer type, output shape, parameter count).
model.summary()

Model: "sequential_4"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
qonv2d_4 (Qonv2D)            (1, 14, 14, 4)            16        
_________________________________________________________________
activation_16 (Activation)   (1, 14, 14, 4)            0         
_________________________________________________________________
batch_normalization_12 (Batc (1, 14, 14, 4)            16        
_________________________________________________________________
max_pooling2d_12 (MaxPooling (1, 7, 7, 4)              0         
_________________________________________________________________
conv2d_8 (Conv2D)            (1, 5, 5, 32)             1184      
_________________________________________________________________
activation_17 (Activation)   (1, 5, 5, 32)             0         
_________________________________________________________________
batch_normalization_13 (Batc (1, 5, 5, 32)            

# Training

In [56]:
# Small subsets to keep the run short: the first 50 training samples
# and the first 20 test samples, with their labels.
x_train, y_train = X_train[:50], Y_train[:50]
x_test, y_test = X_test[:20], Y_test[:20]

In [57]:
# One timestamped log directory per run, with separate train/ and test/ writers
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = f'logs/gradient_tape/{current_time}/train'
test_log_dir = f'logs/gradient_tape/{current_time}/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

In [None]:
# Number of full batches per epoch (the floor division drops any trailing partial batch)
numUpdatesTrain = x_train.shape[0] // BS
numUpdatesTest = x_test.shape[0] // BS

template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'

for epoch in range(EPOCHS):
    print(f"[INFO] starting epoch {epoch + 1}/{EPOCHS}...", end = "")
    sys.stdout.flush()
    epochStart = time.time()

    # Training pass: one optimisation step per batch
    for i in range(numUpdatesTrain):
        start, end = i * BS, (i + 1) * BS
        train_step(model, opt, x_train[start: end], y_train[start: end])

    # Log the epoch's training metrics
    with train_summary_writer.as_default():
        tf.summary.scalar('loss', train_loss.result(), step = epoch)
        tf.summary.scalar('accuracy', train_accuracy.result(), step = epoch)

    # Evaluation pass: one metric update per batch
    for i in range(numUpdatesTest):
        start, end = i * BS, (i + 1) * BS
        test_step(model, x_test[start: end], y_test[start: end])

    # Log the epoch's test metrics
    with test_summary_writer.as_default():
        tf.summary.scalar('loss', test_loss.result(), step = epoch)
        tf.summary.scalar('accuracy', test_accuracy.result(), step = epoch)

    # Console report; accuracies are shown as percentages
    report = (
        epoch + 1,
        train_loss.result(),
        train_accuracy.result() * 100,
        test_loss.result(),
        test_accuracy.result()*100,
    )
    print(template.format(*report))

    # Start the next epoch with empty metric accumulators
    for metric in (train_loss, test_loss, train_accuracy, test_accuracy):
        metric.reset_states()

    # Wall-clock duration of the epoch, in minutes
    epochEnd = time.time()
    elapsed = (epochEnd - epochStart) / 60.0
    print(f"took {elapsed:.4} minutes")

# Evaluations

In [186]:
%tensorboard --logdir logs\gradient_tape

Reusing TensorBoard on port 6006 (pid 17356), started 0:03:13 ago. (Use '!kill 17356' to kill it.)