In [1]:
import cirq
import sympy
import numpy as np
import tensorflow as tf
import tensorflow_quantum as tfq

In [2]:
from cirq.contrib.svg import SVGCircuit

In [None]:
%run ./Classifiers/Rescaling.ipynb

In [None]:
#Create custom accuracy
@tf.function
def custom_accuracy(y_true, y_pred):
    """Fraction of samples whose thresholded prediction equals the label.

    Predictions are mapped to +1.0 when >= 0 and to -1.0 otherwise, then
    compared element-wise with ``y_true``; the mean of the matches is returned.

    Args:
        y_true: label tensor; compared for exact equality against +1/-1.
        y_pred: model output tensor.

    Returns:
        Scalar tensor with the mean of the element-wise equality.
    """
    # Flatten both tensors so a (batch, 1) model output and (batch,) labels
    # are compared one-to-one instead of being broadcast against each other.
    y_pred = tf.reshape(y_pred, [-1])
    y_true = tf.cast(tf.reshape(y_true, [-1]), y_pred.dtype)
    # Vectorised thresholding: no per-element tf.map_fn / Python conditional.
    positive = tf.ones_like(y_pred)
    y_sign = tf.where(y_pred >= 0, positive, -positive)
    return tf.keras.backend.mean(tf.keras.backend.equal(y_true, y_sign))

In [7]:
class PQC_Readout_Classifier :
    """Binary classifier using a parameterized circuit read out on a single qubit.

    Each feature is embedded as an Rx rotation on its own data qubit. The
    trainable circuit couples every data qubit to one readout qubit with
    XX, YY and ZZ gates raised to symbolic powers, and the Z expectation of
    the readout qubit is the model output. Labels are trained as -1/+1 and
    ``predict`` reports classes as 0/1.

    ``rescale`` and ``custom_accuracy`` are not defined in this class; they
    must be available in the notebook namespace.
    """
    
    def __init__(self) :
        self.x = None #store data points
        self.x_tfcirc = None #store data embedding circuits
        self.y = None #store labels
        self.n_qubits = 0 #store numbers of qubits
        self.data_qubits = None #store data qubits
        self.readout_qubit = None #store readout qubit
        self.readout = None #store readout observable
        self.circuit = None #store circuit
        self.model = None #store model
        self.results = None #store results
        self.history = None #store the object returned by the latest model.fit
    
    @staticmethod
    def _embedding_angles(points) :
        """Return sign(v) * arcsin(sqrt(|v|)) for every value v of ``points``."""
        values = np.asarray(points)
        # Clamp |v| to 1 so floating-point overshoot after rescaling cannot
        # push arcsin outside its domain and yield a NaN rotation angle.
        magnitude = np.minimum(np.abs(values), 1.0)
        return np.sign(values) * np.arcsin(np.sqrt(magnitude))
    
    def _embed(self, points) :
        """Build one Rx embedding circuit per data point and convert them to a tensor."""
        circuits = []
        for row in self._embedding_angles(points) :
            embed = cirq.Circuit()
            for i, angle in enumerate(row) :
                embed.append(cirq.rx(angle)(self.data_qubits[i]))
            circuits.append(embed)
        return tfq.convert_to_tensor(circuits)
    
    def set_training_data(self,x,y) :
        """Store the training set and build its embedding circuits.

        ``x`` is passed through ``rescale(x, x_min=-1., x_max=1.)`` unless its
        extrema are already exactly -1 and 1. When the smallest label is 0,
        labels are mapped with ``y*2 - 1``. One data qubit is allocated per
        feature (``len(x[0])``).
        """
        if np.max(x) != 1. or np.min(x) != -1. :
            self.x = rescale(x, x_min = -1., x_max = 1.)
        else :
            self.x = x.copy()
            
        if np.min(y) == 0. :
            self.y = y.copy()*2 - 1
        else :
            self.y = y.copy()
        
        self.n_qubits = len(x[0])
        self.data_qubits = cirq.GridQubit.rect(1,self.n_qubits)
        
        #create embedding circuits
        self.x_tfcirc = self._embed(self.x)
    
    def create_circuit(self,n_layer) :
        """Creates PQC.

        The readout qubit sits at grid position (-1, -1) and starts with a
        Hadamard. Each of the ``n_layer`` layers applies XX, then YY, then ZZ
        between every data qubit and the readout qubit, each gate raised to
        its own symbol named ``<xx|yy|zz>_<layer>_<qubit index>``.
        """
        self.readout_qubit = cirq.GridQubit(-1,-1)
        self.readout = cirq.Z(self.readout_qubit)
        self.circuit = cirq.Circuit()
        
        self.circuit.append(cirq.H(self.readout_qubit))
        
        for i in range(n_layer):
            for gate,prefix in zip([cirq.XX,cirq.YY,cirq.ZZ],["xx","yy","zz"]):
                for j, qubit in enumerate(self.data_qubits):
                    symbol = sympy.Symbol(prefix + '_' + str(i) + '_' + str(j))
                    self.circuit.append(gate(qubit, self.readout_qubit)**symbol)
    
    def fit(self,epochs,batch_size,verbose) :
        """Build the Keras model around the circuit, compile it and train it.

        The model is a string-typed input followed by a ``tfq.layers.PQC``
        layer; it is compiled with MSE loss, Adam and ``custom_accuracy``.
        The object returned by ``model.fit`` is stored in ``self.history``.
        """
        #Create training model
        self.model = tf.keras.Sequential([tf.keras.layers.Input(shape=(), dtype = tf.string),
                                          tfq.layers.PQC(self.circuit,self.readout)])
        
        self.model.compile(loss = tf.losses.mse,
                           optimizer = tf.keras.optimizers.Adam(),
                           metrics=[custom_accuracy])
        
        #self.model.compile(loss = tf.keras.losses.Hinge(),
        #                   optimizer = tf.keras.optimizers.Adam(),
        #                   metrics=[custom_accuracy])

        self.history = self.model.fit(self.x_tfcirc, self.y,
                                      batch_size = batch_size,
                                      epochs = epochs,
                                      verbose = verbose)
        
    def more_epoch(self, epochs,batch_size,verbose = 1):
        """Add epoch to training.

        Calls ``fit`` again on the existing model with the stored training
        data; ``self.history`` is replaced by the result of this call.
        """
        self.history = self.model.fit(self.x_tfcirc, self.y,
                                      batch_size = batch_size,
                                      epochs = epochs,
                                      verbose = verbose)
        
    def fit_summary(self) :
        """Plot the loss curve recorded in ``self.history``."""
        # NOTE(review): `plt` is not imported in this notebook; presumably it is
        # brought into scope by the %run of Rescaling.ipynb -- confirm.
        plt.plot(self.history.history['loss'],label="loss")
        plt.legend()
        plt.show()
    
    def training(self,x,y,n_layer,epochs,batch_size,verbose=1) :
        """Run the full pipeline: embed the data, create the circuit, then fit."""
        print("Data embedding")
        self.set_training_data(x,y)
        print("Creating circuit")
        self.create_circuit(n_layer)
        print("Start of training")
        self.fit(epochs,batch_size,verbose)
        print("Training completed")
    
    def predict_val(self,x_cl) :
        """Return the raw model output for ``x_cl`` as a NumPy array.

        Prints a message and returns ``None`` when the number of features of
        ``x_cl`` differs from the training data.
        """
        if len(x_cl[0]) != len(self.x[0]):
            print("Test data do not have the same number of features as trainign data")
            return
        
        # NOTE(review): rescale is applied to x_cl on its own; if it normalises
        # by the extrema of its argument, test points are scaled differently
        # from the training points -- confirm against Rescaling.ipynb.
        if np.max(x_cl) != 1. or np.min(x_cl) != -1. :
            x_cl_r = rescale(x_cl, x_min = -1., x_max = 1.)
        else :
            x_cl_r = x_cl.copy()
        
        #Creates circuit embedding for test data
        x_cl_tfcirc = self._embed(x_cl_r)
        return self.model.call(x_cl_tfcirc).numpy()
    
    def predict(self, x_cl) :
        """Return class labels (1 where the model output is > 0, else 0).

        Returns ``None`` when ``predict_val`` rejected the input.
        """
        vals = self.predict_val(x_cl)
        if vals is None :
            # predict_val already reported the feature mismatch.
            return None
        # Flatten first: thresholding the whole array avoids calling int() on
        # 1-element arrays, which recent NumPy versions deprecate.
        return (np.asarray(vals).reshape(-1) > 0).astype(int)

In [37]:
class PQC_Convolutional_Classifier :
    """Binary classifier built from alternating convolution and pooling circuit layers.

    Each feature is embedded as an Rx rotation on its own qubit. Convolution
    and pooling layers are stacked until one qubit is left, and the Z
    expectation of that qubit is the model output. Labels are trained as
    -1/+1 and ``predict`` reports classes as 0/1.

    ``rescale`` and ``custom_accuracy`` are not defined in this class; they
    must be available in the notebook namespace.
    """
    
    def __init__(self) :
        self.x = None #store training data
        self.x_tfcirc = None #store embedding circuit
        self.y = None #store training label
        self.n_qubits = 0 #store number of qubits
        self.symbols = () #store symbols
        self.qubits = None #store qubits
        self.readout = None #store readout qubit
        self.readout_op = None #store readout operator
        self.sym_lay = 0 #store index of the next layer, used in symbol names
        self.model_circuit = cirq.Circuit() #store circuit
        self.model = None #store model
        self.results = None #store results
        self.history = None #store the object returned by the latest model.fit
    
    @staticmethod
    def _embedding_angles(points) :
        """Return sign(v) * arcsin(sqrt(|v|)) for every value v of ``points``."""
        values = np.asarray(points)
        # Clamp |v| to 1 so floating-point overshoot after rescaling cannot
        # push arcsin outside its domain and yield a NaN rotation angle.
        magnitude = np.minimum(np.abs(values), 1.0)
        return np.sign(values) * np.arcsin(np.sqrt(magnitude))
    
    def _embed(self, points) :
        """Build one Rx embedding circuit per data point and convert them to a tensor."""
        circuits = []
        for row in self._embedding_angles(points) :
            embed = cirq.Circuit()
            for i, angle in enumerate(row) :
                embed.append(cirq.rx(angle)(self.qubits[i]))
            circuits.append(embed)
        return tfq.convert_to_tensor(circuits)
    
    def set_training_data(self,x,y) :
        """Store the training set and build its embedding circuits.

        ``x`` is passed through ``rescale(x, x_min=-1., x_max=1.)`` unless its
        extrema are already exactly -1 and 1. When the smallest label is 0,
        labels are mapped with ``y*2 - 1``. One qubit is allocated per
        feature (``len(x[0])``).
        """
        if np.max(x) != 1. or np.min(x) != -1. :
            self.x = rescale(x, x_min = -1., x_max = 1.)
        else :
            self.x = x.copy()
            
        if np.min(y) == 0. :
            self.y = y.copy()*2 - 1
        else :
            self.y = y.copy()
        
        self.n_qubits = len(x[0])
        self.qubits = cirq.GridQubit.rect(1,self.n_qubits)
        
        #Create embedding circuits
        self.x_tfcirc = self._embed(self.x)
    
    def one_qubit_unitary(self,bit, symbols):
        """Circuit applying X, Y and Z on ``bit``, raised to symbols[0], [1] and [2]."""
        return cirq.Circuit(cirq.X(bit)**symbols[0],cirq.Y(bit)**symbols[1],cirq.Z(bit)**symbols[2])

    def two_qubit_unitary(self,bits, symbols):
        """Circuit on the two ``bits`` consuming 15 symbols.

        Layout: one-qubit unitaries on both qubits (symbols 0-5), then ZZ, YY
        and XX couplings (symbols 6-8), then one-qubit unitaries again
        (symbols 9-14).
        """
        circuit = cirq.Circuit()
        circuit += self.one_qubit_unitary(bits[0], symbols[0:3])
        circuit += self.one_qubit_unitary(bits[1], symbols[3:6])
        circuit += [cirq.ZZ(*bits)**symbols[6]]
        circuit += [cirq.YY(*bits)**symbols[7]]
        circuit += [cirq.XX(*bits)**symbols[8]]
        circuit += self.one_qubit_unitary(bits[0], symbols[9:12])
        circuit += self.one_qubit_unitary(bits[1], symbols[12:15])
        return circuit


    def two_qubit_pool(self,source_qubit, sink_qubit, symbols):
        """Pooling circuit from ``source_qubit`` into ``sink_qubit`` using 6 symbols.

        Applies a one-qubit unitary on each qubit, a CNOT controlled by the
        source, then the inverse of the sink's one-qubit unitary.
        """
        pool_circuit = cirq.Circuit()
        sink_basis_selector = self.one_qubit_unitary(sink_qubit, symbols[0:3])
        source_basis_selector = self.one_qubit_unitary(source_qubit, symbols[3:6])
        pool_circuit.append(sink_basis_selector)
        pool_circuit.append(source_basis_selector)
        pool_circuit.append(cirq.CNOT(control=source_qubit, target=sink_qubit))
        pool_circuit.append(sink_basis_selector**-1)
        return pool_circuit
    
    def quantum_conv_circuit(self,bits, symbols):
        """Convolution layer: two-qubit unitaries on neighbouring pairs of ``bits``.

        With exactly two qubits a single unitary is applied. Otherwise two
        passes of pairs are built (pairs starting at even positions, then
        pairs starting at odd positions), with ``bits[0]`` appended to close
        the ring. Each pair consumes the next 15 entries of ``symbols``.
        """
        circuit = cirq.Circuit()
        if len(bits) == 2 :
            circuit += self.two_qubit_unitary(bits,symbols)
            return circuit
        if len(bits)%2 == 0 :
            first_pass = list(zip(bits[0::2], bits[1::2]))
            second_pass = list(zip(bits[1::2], bits[2::2] + [bits[0]]))
        else :
            first_pass = list(zip(bits[0::2], bits[1::2] + [bits[0]]))
            second_pass = list(zip(bits[1::2], bits[2::2]))
        
        # The second pass continues the symbol numbering where the first stops.
        for ind,(first,second) in enumerate(first_pass + second_pass):
            circuit += self.two_qubit_unitary([first,second], symbols[15*ind:15*(ind+1)])
        return circuit
    
    def quantum_pool_circuit(self,source_bits, sink_bits, symbols):
        """Pooling layer: pool each source qubit into its sink, 6 symbols per pair."""
        circuit = cirq.Circuit()
        for ind,(source, sink) in enumerate(zip(source_bits, sink_bits)):
            circuit += self.two_qubit_pool(source, sink, symbols[ind * 6:(ind + 1) * 6])
        return circuit
    
    def create_model_circuit(self,qubits):
        """Recursively append convolution + pooling layers to ``self.model_circuit``.

        At each level the first ``n//2`` qubits are pooled into the next
        ``n//2`` qubits and the recursion continues on ``qubits[n//2:]``.
        When a single qubit is left it becomes the readout and ``cirq.Z`` on
        it is stored in ``self.readout_op``.

        Raises:
            ValueError: if ``qubits`` is empty.
        """
        n_qb = len(qubits)
        if n_qb == 0 :
            raise ValueError("Cannot build a model circuit without qubits")
        if n_qb == 1 :
            self.readout = qubits[0]
            self.readout_op = cirq.Z(self.readout)
            return
        half = n_qb//2
        symbols = sympy.symbols('qconv_'+str(self.sym_lay)+'_0:'+str(15*n_qb+6*half))
        self.model_circuit += self.quantum_conv_circuit(qubits,symbols[0:15*n_qb])
        self.model_circuit += self.quantum_pool_circuit(qubits[:half],
                                                        qubits[half:half*2],
                                                        symbols[15*n_qb:])
        self.sym_lay += 1
        self.symbols += symbols
        self.create_model_circuit(qubits[half:])

    
    def fit(self,epochs,batch_size,verbose) :
        """Build the Keras model around the circuit, compile it and train it.

        The model is a string-typed input followed by a ``tfq.layers.PQC``
        layer; it is compiled with MSE loss, Adam and ``custom_accuracy``.
        The object returned by ``model.fit`` is stored in ``self.history``.
        """
        self.model = tf.keras.Sequential([tf.keras.layers.Input(shape=(), dtype = tf.string),
                                          tfq.layers.PQC(self.model_circuit,self.readout_op)])
        
        self.model.compile(loss = tf.losses.mse,
                           optimizer = tf.keras.optimizers.Adam(),
                           metrics=[custom_accuracy])
        
        #self.model.compile(loss = tf.keras.losses.Hinge(),
        #                   optimizer = tf.keras.optimizers.Adam(),
        #                   metrics=[custom_accuracy])

        self.history = self.model.fit(self.x_tfcirc, self.y,
                                      batch_size = batch_size,
                                      epochs = epochs,
                                      verbose = verbose)
        
    def more_epoch(self, epochs,batch_size,verbose = 1):
        """Train the existing model further; ``self.history`` is replaced by this run."""
        self.history = self.model.fit(self.x_tfcirc, self.y,
                                      batch_size = batch_size,
                                      epochs = epochs,
                                      verbose = verbose)
        
    def fit_summary(self) :
        """Plot the loss curve recorded in ``self.history``."""
        # NOTE(review): `plt` is not imported in this notebook; presumably it is
        # brought into scope by the %run of Rescaling.ipynb -- confirm.
        plt.plot(self.history.history['loss'],label="loss")
        plt.legend()
        plt.show()
    
    def training(self,x,y,epochs,batch_size,verbose=1) :
        """Run the full pipeline: embed the data, rebuild the circuit, then fit."""
        print("Data embedding")
        self.set_training_data(x,y)
        print("Creating circuit")
        # Start from an empty circuit and layer counter on every training run.
        self.model_circuit = cirq.Circuit()
        self.sym_lay = 0
        self.create_model_circuit(self.qubits)
        print("Start of training")
        self.fit(epochs,batch_size,verbose)
        print("Training completed")
    
    def predict_val(self,x_cl) :
        """Return the raw model output for ``x_cl`` as a NumPy array.

        Prints a message and returns ``None`` when the number of features of
        ``x_cl`` differs from the training data.
        """
        if len(x_cl[0]) != len(self.x[0]):
            print("Test data do not have the same number of features as trainign data")
            return
        
        # NOTE(review): rescale is applied to x_cl on its own; if it normalises
        # by the extrema of its argument, test points are scaled differently
        # from the training points -- confirm against Rescaling.ipynb.
        if np.max(x_cl) != 1. or np.min(x_cl) != -1. :
            x_cl_r = rescale(x_cl, x_min = -1., x_max = 1.)
        else :
            x_cl_r = x_cl.copy()
        
        x_cl_tfcirc = self._embed(x_cl_r)
        return self.model.call(x_cl_tfcirc).numpy()
    
    def predict(self, x_cl) :
        """Return class labels (1 where the model output is > 0, else 0).

        Returns ``None`` when ``predict_val`` rejected the input.
        """
        vals = self.predict_val(x_cl)
        if vals is None :
            # predict_val already reported the feature mismatch.
            return None
        # Flatten first: thresholding the whole array avoids calling int() on
        # 1-element arrays, which recent NumPy versions deprecate.
        return (np.asarray(vals).reshape(-1) > 0).astype(int)

In [None]:
class PQC_Convolutional_Hybrid_Classifier :
    """Hybrid classifier: convolution/pooling circuit layers followed by dense layers.

    Each feature is embedded as an Rx rotation on its own qubit. A chosen
    number of convolution + pooling layers is applied, the Z expectations of
    all remaining qubits are read out, and two Keras ``Dense`` layers turn
    them into a single output. Labels are trained as -1/+1 and ``predict``
    reports classes as 0/1.

    ``rescale`` and ``custom_accuracy`` are not defined in this class; they
    must be available in the notebook namespace.
    """
    
    def __init__(self) :
        self.x = None #store training data
        self.x_tfcirc = None #store embedding circuits
        self.y = None #store training labels
        self.n_qubits = 0 #store number of qubits
        self.symbols = () #store symbols
        self.qubits = None #store qubits
        self.readout = None #store readout qubits
        self.readout_op = None #store readout operators
        self.n_input_classical = 0 #store number of readout operators
        self.sym_lay = 0 #store index of the next layer, used in symbol names
        self.model_circuit = cirq.Circuit() #store circuit
        self.model = None #store model
        self.results = None #store results
        self.history = None #store the object returned by the latest model.fit
    
    @staticmethod
    def _embedding_angles(points) :
        """Return sign(v) * arcsin(sqrt(|v|)) for every value v of ``points``."""
        values = np.asarray(points)
        # Clamp |v| to 1 so floating-point overshoot after rescaling cannot
        # push arcsin outside its domain and yield a NaN rotation angle.
        magnitude = np.minimum(np.abs(values), 1.0)
        return np.sign(values) * np.arcsin(np.sqrt(magnitude))
    
    def _embed(self, points) :
        """Build one Rx embedding circuit per data point and convert them to a tensor."""
        circuits = []
        for row in self._embedding_angles(points) :
            embed = cirq.Circuit()
            for i, angle in enumerate(row) :
                embed.append(cirq.rx(angle)(self.qubits[i]))
            circuits.append(embed)
        return tfq.convert_to_tensor(circuits)
    
    def set_training_data(self,x,y) :
        """Store the training set and build its embedding circuits.

        ``x`` is passed through ``rescale(x, x_min=-1., x_max=1.)`` unless its
        extrema are already exactly -1 and 1. When the smallest label is 0,
        labels are mapped with ``y*2 - 1``. One qubit is allocated per
        feature (``len(x[0])``).
        """
        if np.max(x) != 1. or np.min(x) != -1. :
            self.x = rescale(x, x_min = -1., x_max = 1.)
        else :
            self.x = x.copy()
            
        if np.min(y) == 0. :
            self.y = y.copy()*2 - 1
        else :
            self.y = y.copy()
        
        self.n_qubits = len(x[0])
        self.qubits = cirq.GridQubit.rect(1,self.n_qubits)
        
        self.x_tfcirc = self._embed(self.x)
    
    def one_qubit_unitary(self,bit, symbols):
        """Circuit applying X, Y and Z on ``bit``, raised to symbols[0], [1] and [2]."""
        return cirq.Circuit(cirq.X(bit)**symbols[0],cirq.Y(bit)**symbols[1],cirq.Z(bit)**symbols[2])

    def two_qubit_unitary(self,bits, symbols):
        """Circuit on the two ``bits`` consuming 15 symbols.

        Layout: one-qubit unitaries on both qubits (symbols 0-5), then ZZ, YY
        and XX couplings (symbols 6-8), then one-qubit unitaries again
        (symbols 9-14).
        """
        circuit = cirq.Circuit()
        circuit += self.one_qubit_unitary(bits[0], symbols[0:3])
        circuit += self.one_qubit_unitary(bits[1], symbols[3:6])
        circuit += [cirq.ZZ(*bits)**symbols[6]]
        circuit += [cirq.YY(*bits)**symbols[7]]
        circuit += [cirq.XX(*bits)**symbols[8]]
        circuit += self.one_qubit_unitary(bits[0], symbols[9:12])
        circuit += self.one_qubit_unitary(bits[1], symbols[12:15])
        return circuit


    def two_qubit_pool(self,source_qubit, sink_qubit, symbols):
        """Pooling circuit from ``source_qubit`` into ``sink_qubit`` using 6 symbols.

        Applies a one-qubit unitary on each qubit, a CNOT controlled by the
        source, then the inverse of the sink's one-qubit unitary.
        """
        pool_circuit = cirq.Circuit()
        sink_basis_selector = self.one_qubit_unitary(sink_qubit, symbols[0:3])
        source_basis_selector = self.one_qubit_unitary(source_qubit, symbols[3:6])
        pool_circuit.append(sink_basis_selector)
        pool_circuit.append(source_basis_selector)
        pool_circuit.append(cirq.CNOT(control=source_qubit, target=sink_qubit))
        pool_circuit.append(sink_basis_selector**-1)
        return pool_circuit
    
    def quantum_conv_circuit(self,bits, symbols):
        """Convolution layer: two-qubit unitaries on neighbouring pairs of ``bits``.

        With exactly two qubits a single unitary is applied. Otherwise two
        passes of pairs are built (pairs starting at even positions, then
        pairs starting at odd positions), with ``bits[0]`` appended to close
        the ring. Each pair consumes the next 15 entries of ``symbols``.
        """
        circuit = cirq.Circuit()
        if len(bits) == 2 :
            circuit += self.two_qubit_unitary(bits,symbols)
            return circuit
        if len(bits)%2 == 0 :
            first_pass = list(zip(bits[0::2], bits[1::2]))
            second_pass = list(zip(bits[1::2], bits[2::2] + [bits[0]]))
        else :
            first_pass = list(zip(bits[0::2], bits[1::2] + [bits[0]]))
            second_pass = list(zip(bits[1::2], bits[2::2]))
        
        # The second pass continues the symbol numbering where the first stops.
        for ind,(first,second) in enumerate(first_pass + second_pass):
            circuit += self.two_qubit_unitary([first,second], symbols[15*ind:15*(ind+1)])
        return circuit
    
    def quantum_pool_circuit(self,source_bits, sink_bits, symbols):
        """Pooling layer: pool each source qubit into its sink, 6 symbols per pair."""
        circuit = cirq.Circuit()
        for ind,(source, sink) in enumerate(zip(source_bits, sink_bits)):
            circuit += self.two_qubit_pool(source, sink, symbols[ind * 6:(ind + 1) * 6])
        return circuit
    
    def create_model_circuit(self,qubits,n_layer):
        """Recursively append ``n_layer`` convolution + pooling layers.

        At each level the first ``n//2`` qubits are pooled into the next
        ``n//2`` qubits and the recursion continues on ``qubits[n//2:]`` with
        one layer fewer. When no layer is left, ``cirq.Z`` on every remaining
        qubit is stored in ``self.readout_op``.

        Raises:
            ValueError: if layers remain but fewer than two qubits are left,
                since a convolution layer needs a pair of distinct qubits.
        """
        n_qb = len(qubits)
        if n_layer == 0 :
            self.readout = qubits
            self.readout_op = [cirq.Z(q) for q in qubits]
            self.n_input_classical = len(self.readout_op)
            return
        if n_qb < 2 :
            # Slicing with qubits[n_qb//2:] never empties the list, so the
            # stop condition has to trigger at one remaining qubit. Raising
            # (instead of printing and returning) prevents fit() from running
            # with an unset readout operator.
            raise ValueError("Error : too many layers (" + str(n_qb)
                             + " qubit(s) left for " + str(n_layer)
                             + " remaining layer(s))")
        half = n_qb//2
        symbols = sympy.symbols('qconv_'+str(self.sym_lay)+'_0:'+str(15*n_qb+6*half))
        self.model_circuit += self.quantum_conv_circuit(qubits,symbols[0:15*n_qb])
        self.model_circuit += self.quantum_pool_circuit(qubits[:half],
                                                        qubits[half:half*2],
                                                        symbols[15*n_qb:])
        self.sym_lay += 1
        self.symbols += symbols
        self.create_model_circuit(qubits[half:],n_layer-1)

    
    def fit(self,n_dense,epochs,batch_size,verbose) :
        """Build the hybrid Keras model, compile it and train it.

        The model is a string-typed input, a ``tfq.layers.PQC`` layer, a
        ``Dense(n_dense)`` layer and a ``Dense(1)`` output; it is compiled
        with MSE loss, Adam and ``custom_accuracy``. The object returned by
        ``model.fit`` is stored in ``self.history``.
        """
        self.model = tf.keras.Sequential([tf.keras.layers.Input(shape=(), dtype = tf.string),
                                          tfq.layers.PQC(self.model_circuit,self.readout_op),
                                          tf.keras.layers.Dense(n_dense),
                                          tf.keras.layers.Dense(1)])

        
        self.model.compile(loss = tf.losses.mse,
                           optimizer = tf.keras.optimizers.Adam(),
                           metrics=[custom_accuracy])
        
        #self.model.compile(loss = tf.keras.losses.Hinge(),
        #                   optimizer = tf.keras.optimizers.Adam(),
        #                   metrics=[custom_accuracy])

        self.history = self.model.fit(self.x_tfcirc, self.y,
                                      batch_size = batch_size,
                                      epochs = epochs,
                                      verbose = verbose)
        
    def more_epoch(self, epochs,batch_size,verbose = 1):
        """Train the existing model further; ``self.history`` is replaced by this run."""
        self.history = self.model.fit(self.x_tfcirc, self.y,
                                      batch_size = batch_size,
                                      epochs = epochs,
                                      verbose = verbose)
        
    def fit_summary(self) :
        """Plot the loss curve recorded in ``self.history``."""
        # NOTE(review): `plt` is not imported in this notebook; presumably it is
        # brought into scope by the %run of Rescaling.ipynb -- confirm.
        plt.plot(self.history.history['loss'],label="loss")
        plt.legend()
        plt.show()
    
    def training(self, x, y, n_layer, n_dense, epochs, batch_size, verbose=1) :
        """Run the full pipeline: embed the data, rebuild the circuit, then fit."""
        print("Data embedding")
        self.set_training_data(x,y)
        print("Creating circuit")
        # Start from an empty circuit and layer counter on every training run.
        self.model_circuit = cirq.Circuit()
        self.sym_lay = 0
        self.create_model_circuit(self.qubits,n_layer)
        print("Start of training")
        self.fit(n_dense,epochs,batch_size,verbose)
        print("Training completed")
    
    def predict_val(self,x_cl) :
        """Return the raw model output for ``x_cl`` as a NumPy array.

        Prints a message and returns ``None`` when the number of features of
        ``x_cl`` differs from the training data.
        """
        if len(x_cl[0]) != len(self.x[0]):
            print("Test data do not have the same number of features as trainign data")
            return
        
        # NOTE(review): rescale is applied to x_cl on its own; if it normalises
        # by the extrema of its argument, test points are scaled differently
        # from the training points -- confirm against Rescaling.ipynb.
        if np.max(x_cl) != 1. or np.min(x_cl) != -1. :
            x_cl_r = rescale(x_cl, x_min = -1., x_max = 1.)
        else :
            x_cl_r = x_cl.copy()
        
        x_cl_tfcirc = self._embed(x_cl_r)
        return self.model.call(x_cl_tfcirc).numpy()
    
    def predict(self, x_cl) :
        """Return class labels (1 where the model output is > 0, else 0).

        Returns ``None`` when ``predict_val`` rejected the input.
        """
        vals = self.predict_val(x_cl)
        if vals is None :
            # predict_val already reported the feature mismatch.
            return None
        # Flatten first: thresholding the whole array avoids calling int() on
        # 1-element arrays, which recent NumPy versions deprecate.
        return (np.asarray(vals).reshape(-1) > 0).astype(int)