# Training Hybrid Classical-Quantum Classifiers via Stochastic Variational Optimization

To run without installing additional packages:


*   Upload to your Google Drive account;
*   Open with Colab;
*   From the Runtime menu in the upper left corner, execute Run all.

Import necessary dependencies and set random seed:

In [None]:
import numpy as np
import math
from scipy.stats import bernoulli, binom
import matplotlib.pyplot as plt
import pandas as pd
from collections import namedtuple

import itertools
import random

from sklearn.metrics import roc_curve
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import accuracy_score

In [None]:
seed = 2021
rng = np.random.default_rng(seed)
rng.random()

0.7569478279346672

Generate Bars-and-Stripes (BAS) samples [1]: 
* Data samples containing bars/stripes are labeled as positive, and the remainder of the patterns are labeled as negative.

In [None]:
def NumLinesStripes (a, b):
  return pow(2,a)+pow(2,b)-2

# generate lines and stripes
def LinesStripes (a, b, c): 
  arr = np.zeros(a*b)
  c = c+1
  if (c<pow(2,a)): #lines
    lines = np.zeros (a, dtype = bool)
    for i in range (a):
      if (c%2 == 1):
        lines[i] = True
      c = int(c/2)
    index = 0
    for element in lines:
      if element:
        for i in range(index, index+b):
          arr[i] = 1
      index = index+b
  else: #stripes
    c = c-pow(2,a)
    stripes = np.zeros (b, dtype = bool)
    for i in range (b):
      if (c%2 == 1):
        stripes[i] = True
      c = int(c/2)
    index = 0
    for element in stripes:
      if element:
        for i in range(index, a*b, b):
          arr[i] = 1
      index = index+1
  arr[arr == 0] = -1
  return arr

# generate random patterns
def NonLinesStripes(a,b): 
  arr_non = np.random.randint(2, size=a*b)
  arr_non[arr_non == 0] = -1
  return arr_non

# visualise patterns
def PrintLinesStripes (a, b, arr): 
  print(arr)
  print(CheckLinesStripes(np.array(arr),height,width))
  index = 0
  for i in range(a):
    word = ''
    for j in range (b):
      if(arr[index] == 1):
        word = word + 'X'
      else:
        word = word + '0'
      index = index + 1
    print(word)   

# generate lines and stripes (main)
def RandomLinesStripes (a, b): 
  maximum = pow(2,a)+pow(2,b)-2
  return LinesStripes (a, b, random.randint(0,maximum))

# check if a data sample contains a line or it is random for labeling
def CheckLines(arr): 
  for row in arr:
    ctt = row[0]
    for element in row:
      if(element != ctt):
        return -1
  return 1

# check if a data sample contains a line/stripe or it is random for labeling
def CheckLinesStripes(arr, *sizes): 
  if(sizes):
    arr = arr.reshape(sizes)
  return CheckLines(arr) or CheckLines(np.transpose(arr))  

In [None]:
# define data sample size d X d
height = 4
width = 4
CDk = 1
trainsize = NumLinesStripes(height,width)

In [None]:
data = []
labels = []

# number of data samples
samples_positive = 20
samples_negative = 20

# generate and label positive data samples
for i in range(samples_positive):
  arr = RandomLinesStripes (height, width)
  label = 1
  data.append([arr])
  labels.append([label])

# generate and label negative data samples
for i in range(samples_negative):
  arr = NonLinesStripes (height, width)
  label = CheckLinesStripes(np.array(arr),height,width)
  data.append([arr])
  labels.append([label])

In [None]:
# reshape data set
X_temp = np.reshape(data, newshape = (samples_positive + samples_negative, height*width))
y_temp = np.reshape(labels, newshape = (samples_positive + samples_negative,))

# test
ind = np.random.randint(low=0, high=X_temp.shape[0], size=6)

In [None]:
# test
def get_idx(rng):
    ind = np.random.randint(low=0, high=X_temp.shape[0], size=6)
    print(ind)

get_idx(rng)

[23 17  6  1 10 33]


In [None]:
alpha = 0.5 # constant for moving averages

Create the hybrid classifier:

In [None]:
class my_NN(object):
    def __init__(self):
        self.input = height*width # data sample dimension (flattened)
        self.output = 1 # output dimension
        self.hidden_units1 = 32 # single layer

        self.normalising_constant = self.hidden_units1 # normalising constant

        # initialize matrix of parameters;
        # variational parameters: input -> hidden layer 
        self.p1 = np.zeros([self.input, self.hidden_units1]) # input X hidden_units matrix
        self.synaptic_params1 = self._sigmoid(self.p1, 1)
        
        # variational parameters: hidden layer -> output
        self.p2 = np.zeros([self.hidden_units1, self.output])  # hidden_units X output matrix
        self.synaptic_params2 = self._sigmoid(self.p2, 1)
        
        
        # weights1: input -> hidden layer 
        self.w1 = self.get_weights(self.synaptic_params1) # input X hidden_units matrix
        # weights2: hidden layer -> hidden layer
        self.w2 = self.get_weights(self.synaptic_params2)  # hidden_units X output matrix

        self.baseline1 = 0 #initialise baseline for variational parameters
        self.baseline2 = 0 #initialise baseline for variational parameters

        self.e1_prev = np.zeros_like(self.w1) #initialise previous gradient for variational parameters (for moving average)
        self.e2_prev = np.zeros_like(self.w2) #initialise previous gradient for variational parameters (for moving average)

        

        self.loss_prev = 0

        self.loss_list = []

        self.flag = 0

    
    # sample weights/outputs - training
    def get_weights(self, synaptic_params):
        synaptic_params[synaptic_params > 0.9999] = 1 # depending on numerical precision, increse/decrese threshold to avoid probability greater than one
        synaptic_params[synaptic_params < 0] = 0 # depending on numerical precision, increse/decrese threshold to avoid probability lesser than zero
        self.weights = np.random.binomial(n = 1, p = synaptic_params, size = synaptic_params.shape) # sample weights
        self.weights[self.weights == 0] = -1 # np.random.binomial returns zeroes and ones; replace zeros with minus ones

        return self.weights

    # sample weights/outputs - inference
    def get_weights_eval(self, synaptic_params, threshold):
        synaptic_params[synaptic_params > 0.9999] = 1 # depending on numerical precision, increse/decrese threshold to avoid probability greater than one
        synaptic_params[synaptic_params < 0] = 0 # depending on numerical precision, increse/decrese threshold to avoid probability lesser than zero
        self.weights = np.where(synaptic_params > threshold, 1, -1) # set weights using a fixed threshold

        return self.weights

    # QGLM response function
    def get_response(self, dot):
        self.response = (1/2) + (1/2)*((1/self.normalising_constant)*(dot))**2  # Biased Quadratic; 
                                                                                # see Biased_Quadratic_Qiskit.ipynb for an implementation routine on quantum hardware

        return self.response

    # CGLM response function
    def get_response_classical(self, dot):
        self.response = self._sigmoid_classical(dot) 

        return self.response
    
    # forward propagation - training
    def _forward_propagation(self, X): 
        self.z2 = np.dot(self.w1.T, X.T) # multiply weights and input
        self.a2_temp = self.get_response(self.z2) # get QGLM response 
        self.a2 = self.get_weights(self.a2_temp) # sample QGLM output - same function used for sampling weights with the response as input 
        self.a2[self.a2 == 0] = -1 # np.random.binomial returns zeroes and ones; replace zeros with minus ones

        self.z3 = np.dot(self.w2.T, self.a2) # multiply weights and output of hidden layer
        self.a3_temp = self.get_response_classical(self.z3) # get CGLM response 

        return self.a3_temp

    # forward propagation - inference
    def _forward_propagation_eval(self, X, threshold): 
        self.z2 = np.dot(self.w1.T, X.T) # multiply weights and input
        self.a2_temp = self.get_response(self.z2) # get QGLM response 
        self.a2 = self.get_weights_eval(self.a2_temp, threshold) # get QGLM output
        self.a2[self.a2 == 0] = -1 # replace zeros with minus ones 

        self.z3 = np.dot(self.w2.T, self.a2) # multiply weights and output of hidden layer
        self.a3_temp = self.get_response_classical(self.z3) # get CGLM response 

        return self.a3_temp

    # tempered sigmoid with temperature annealing schedule 
    def _sigmoid(self, z, epoch):
        initial_temp = 1 
        temperature = initial_temp
        min = 0.01 
        epochs_decrease = 200 
        if epoch > 0 and epoch%epochs_decrease  == 0:
            if 1 - initial_temp * (min*(epoch/epochs_decrease)) > min:
                temperature = 1 - initial_temp * (min*(epoch/epochs_decrease))
            else:
                temperature = min

        return 1/(1+np.exp(-z/temperature))

    # classical sigmoid
    def _sigmoid_classical(self, z):

        return 1/(1+np.exp(-z))

    # loss function
    def _loss(self, predict, y):
        const = 0.001 # depending on numerical precision, increse/decrese const to avoid log of zero
        m = y.shape[0]
        predict[predict > 0.9999] = 1
        logprobs = np.multiply(np.log(predict + const), ((1 + y)/2)) + np.multiply(((1 - y)/2), np.log(1 - predict + const))
        loss = - np.sum(logprobs) / m

        return loss

    # backward propagation
    def _backward_propagation(self, X, y, loss, i):
        self.dp1_temp = self._loss_prime(self.w1, self.p1, i)
        self.dp2_temp = self._loss_prime(self.w2, self.p2, i)
        
    # loss derivative
    def _loss_prime(self, w, p, i):

        return ((1+w)/2 - self._sigmoid(p, i))

    # update parameters
    def _update(self, loss, num_samples, i):
        default_learning_rate = 0.2
        learning_rate = self.step_decay(i)
        self.p1 = self.p1 - default_learning_rate*((learning_rate*(self.loss - self.baseline1))*self.e1/num_samples)
        self.p2 = self.p2 - default_learning_rate*((learning_rate*(self.loss - self.baseline2))*self.e2/num_samples)

    # update baseline with moving average 
    def _update_baseline(self, loss, num_samples, i):
        const = 0.0001
        self.baseline1 = (alpha*self.loss_prev*(self.e1_prev**2) + (1-alpha)*self.loss*((self.e1/num_samples)**2)) / ((alpha*(self.e1_prev**2) + (1-alpha)*((self.e1/num_samples)**2))+ const)
        self.baseline2 = (alpha*self.loss_prev*(self.e2_prev**2) + (1-alpha)*self.loss*((self.e2/num_samples)**2)) / (alpha*(self.e2_prev**2) + (1-alpha)*((self.e2/num_samples)**2)+ const)
        

    # cyclical learning rate schedule
    def step_decay(self, epoch):
        base_lr = 0.1
        max_lr = 1000
        step_size = 1000
        cycle = np.floor(1+epoch/(2*step_size))
        x = np.abs(epoch/step_size - 2*cycle + 1)
        lrate = base_lr + (max_lr-base_lr)*np.maximum(0, (1-x))*0.5*(1+np.sin(cycle*np.pi/2.))

        return lrate

    # training function
    def train(self, Xtrain, ytrain, iteration=5000, num_samples = 10, batch_size = 16): 

        ###################### Training ########################

        if self.flag == 0:

            for i in range(iteration):

                ind = np.random.randint(low=0, high=X_temp.shape[0], size=batch_size) # create random batch of samples
                X = Xtrain[ind]
                y = ytrain[ind]

                loss_avg = 0

                self.e1 = np.zeros_like(self.w1) 
                self.e2 = np.zeros_like(self.w2) 

                # use multiple samples of the variational distribution q(.|.) to estimate loss
                for j in range(num_samples):
                    self.synaptic_params1 = self._sigmoid(self.p1, i) # weight probabilities
                    self.synaptic_params2 = self._sigmoid(self.p2, i) # weight probabilities

                    self.w1 = self.get_weights(self.synaptic_params1) # sample weights
                    self.w2 = self.get_weights(self.synaptic_params2) # sample weights

                    y_hat = self._forward_propagation(X) # forward propagation
                    loss = self._loss(y_hat, y) # calculate loss

                    loss_avg = loss_avg + loss # sum of loss over samples of the variational distribution q(.|.)

                    self._backward_propagation(X, y, loss_avg/num_samples, i) # backward propagation

                    self.e1 = self.e1 + self.dp1_temp # sum of gradient over samples of the variational distribution q(.|.)
                    self.e2 = self.e2 + self.dp2_temp # sum of gradient over samples of the variational distribution q(.|.)

                self.loss = loss_avg/num_samples

                self._update_baseline(self.loss, num_samples, i) # update the baseline
                self._update(self.loss, num_samples, i) # update the variational parameters

                self.e1_prev = self.e1/num_samples
                self.e2_prev = self.e2/num_samples

                self.loss_prev = self.loss

                ###################### Inference ########################

                if i == 0 or i%1000==0:

                    if i == 0:
                        Wthresholds = [0.5] # set initial threshold 
                    else:
                        Wthresholds = [0.05,0.1,0.15,0.2,0.25,0.3,0.35,0.4,0.45,0.5,0.55,0.6,0.65,0.7,0.75,0.8,0.85,0.9] # optimize output threshold during training

                    score_best = 0

                    # find the best output threshold
                    for Wthreshold in Wthresholds:

                        self.synaptic_params1 = self._sigmoid(self.p1, i)
                        self.synaptic_params2 = self._sigmoid(self.p2, i)

                        self.w1 = self.get_weights_eval(self.synaptic_params1, Wthreshold) 
                        self.w2 = self.get_weights_eval(self.synaptic_params2, Wthreshold) 

                        pred_avg = np.zeros_like(y_temp) 
                        loss_eval = 0
                        for j in range(num_samples):

                            y_hat = self._forward_propagation(Xtrain)
                            l = self._loss(y_hat, ytrain)

                            pred_avg = pred_avg + y_hat

                            loss_eval = loss_eval + l

                        self.loss_list.append(loss_eval/num_samples)
                    

                        y_hat_test = pred_avg/num_samples

                        # use the geometric mean
                        fpr, tpr, thresholds = roc_curve(y_temp, y_hat_test.reshape(samples_positive + samples_negative,))
                        gmeans = (tpr*samples_positive + (1-fpr)*samples_negative)/(samples_positive + samples_negative)
                        ix = np.argmax(gmeans)

                        y_hat_test_roc = [1 if i[0] > thresholds[ix] else -1 for i in y_hat_test.T] 
                        
                        score = self.score(y_hat_test_roc, y_temp) # get the accuracy score

                        
                        if score > score_best:
                            score_best = score
                    
                    print("Iteration and score: ", i, score_best)

                    


    def predict_test(self, X):
        y_hat_response = self._forward_propagation(X)
        y_hat = y_hat_response 
        return np.array(y_hat)
    
    # get the accuracy score 
    def score(self, predict, y):
        cnt = np.sum(predict==y)
        return (cnt/len(y))*100
        

Main function; initialize and train the model.

In [None]:
if __name__=='__main__':
    clr = my_NN() #initialize the model
    clr.train(X_temp, y_temp) # train model 

Iteration and score:  0 57.49999999999999
Iteration and score:  1000 67.5
Iteration and score:  2000 67.5
Iteration and score:  3000 70.0
Iteration and score:  4000 70.0


# References

[1]  F. Tacchino, C. Macchiavello, D. Gerace, and D. Bajoni, *An artificial neuron  implemented  on  an  actual  quantum  processor,* npj  QuantumInformation, vol. 5, no. 1, pp. 1–8, 2019.