In [None]:
from qiskit import *
from qiskit.visualization import *
import scipy
from scipy.optimize import minimize
from qiskit.circuit.library import RealAmplitudes
from qiskit.algorithms.optimizers import COBYLA,SLSQP,SPSA, ADAM
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import random
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import os
import warnings
warnings.filterwarnings("ignore")

qasmsim = BasicAer.get_backend("qasm_simulator")
statevecsim = BasicAer.get_backend("statevector_simulator")
TOTAL_COUNTS= 1024

## Quantum Discriminator

In [None]:
class Discriminator:

    def __init__(self, feature_vector, d_params, num_ansatz_reps):

        self.feature_vector = feature_vector
        self.d_params = d_params
        self.num_ansatz_reps = num_ansatz_reps


    def d_distribution(self):

        num_qubits = len(self.feature_vector)
        qc = QuantumCircuit(num_qubits,1)

        #---------- QUANTUM FEATURE MAP ------------ #
        feature_vector = self.feature_vector
        for qubit, variable in enumerate(feature_vector):
            qc.ry(variable, qubit)
        
        # -------------- ANSATZ --------------- #
        qc.barrier()
        ansatz = RealAmplitudes(num_qubits=num_qubits, 
                                reps=self.num_ansatz_reps,
                                ).assign_parameters(self.d_params)

        # --------------- MEASUREMENT ------------------ #
        qc.append(ansatz, list(range(num_qubits)))
        qc.measure(len(feature_vector)-1 , 0)
        counts = execute(qc, backend=qasmsim, counts=TOTAL_COUNTS).result().get_counts(qc)
        return counts

    def parity_function(self):

        P_d = self.d_distribution()

        # ------------ FIXING QISKIT BUGS ------------ #
        if '0' not in P_d:
            P_d['0']=0
        if '1' not in P_d:
            P_d['1']=0

        # ----------- PARITY FUNCTION <Z> ------------ #
        exp_value =  (P_d['0'] - P_d['1'])/TOTAL_COUNTS                   
        norm_exp_value = (exp_value + 1)/2

        return norm_exp_value

## Quantum Generator

In [None]:
class Generator:

    def __init__(self, latent_vector, g_params, num_ansatz_reps):

        self.latent_vector = latent_vector
        self.g_params = g_params
        self.num_ansatz_reps = num_ansatz_reps

    def g_distribution(self):

        num_qubits = len(self.latent_vector)
        aux_qubits = 1
        total_qubits = num_qubits + aux_qubits
        qc = QuantumCircuit(total_qubits, num_qubits)

        # -------------------- QUANTUM FEATURE MAP -------------------- #
        latent_vector = self.latent_vector
        for qubit, variable in enumerate(latent_vector):
            qc.ry(variable, qubit)
        
        # --------------------------- ANSATZ -------------------------- #
        qc.barrier()
        ansatz = RealAmplitudes(num_qubits=num_qubits, 
                                reps=self.num_ansatz_reps,
                                ).assign_parameters(self.g_params)

        # ----------------- GENERATOR DISTRIBUTION ------------------- #
        qc.append(ansatz, list(range(total_qubits))[1:])
        qc.measure(range(total_qubits)[1:], range(num_qubits))
        counts = execute(qc, backend=qasmsim, shots=TOTAL_COUNTS).result().get_counts(qc)

        # ------------------ SORTED DISTRIBUTION --------------------- #
        strings=['00','01','10','11']

        sorted_counts = {}
        for string in strings:
            if string not in counts:
                counts[string]=0
            sorted_counts[string] = counts[string]

        probs = []
        for key, value in sorted_counts.items():
            probs.append( (2*np.pi* value) / TOTAL_COUNTS)
        
        return probs

## Preparing normalized real data

In [None]:
path = os.path.abspath(os.path.join(os.getcwd(), os.pardir, 'data'))
filename = "irissetosa.data"
data = pd.read_csv(os.path.join(path,filename))
label = data.iloc[:, -1]
features = data.iloc[:, :-1]

x = features.values
min_max_scaler = preprocessing.MinMaxScaler(feature_range=(0, 2*np.pi))
x_scaled = min_max_scaler.fit_transform(x)
features = pd.DataFrame(x_scaled)
data = features.assign(labels = label)

data.head()

## Cross-entropy from Discriminator

In [None]:
def D_crossentropy(disc_params):

    # -------------------- DISCRIMINATOR CROSS-ENTROPY --------------------- #
    d_loss = 0

    for fake_data, real_data in zip(fake_minibatch, real_minibatch):
        # ---------------- PASS FAKE DATA ON DISCRIMINATOR ----------------- #
        fake_y_estimator = Discriminator(feature_vector = fake_data, 
                                        d_params = disc_params, 
                                        num_ansatz_reps = d_num_reps
                                        ).parity_function()

        # ------------------ DISCRIMINATOR LOSS FOR FAKE ------------------- #            
        d_loss_fake = -(np.log(1 - fake_y_estimator))

        # ---------------- PASS REAL DATA ON DISCRIMINATOR ----------------- #
        real_y_estimator = Discriminator(feature_vector = real_data, 
                                         d_params = disc_params, 
                                         num_ansatz_reps = d_num_reps
                                        ).parity_function()

        # ------------------ DISCRIMINATOR LOSS FOR REAL ------------------- #            
        d_loss_real = -(np.log(real_y_estimator))

        # -----------------DISCRIMINATOR CROSS ENTROPY --------------------- #
        d_loss += d_loss_fake + d_loss_real

    d_loss = d_loss/NUM_SAMPLES
    return d_loss


## Cross-entropy from Generator

In [None]:
def G_crossentropy(g_params):
    
    # -------------------- GENERATOR CROSS-ENTROPY --------------------- #
    g_loss = 0    
    for fake_data in fake_minibatch:
        # ---------------- PASS FAKE DATA ON DISCRIMINATOR ----------------- #
        fake_y_estimator = Discriminator(feature_vector = fake_data, 
                                         d_params = optimal_params_discriminator, 
                                         num_ansatz_reps = d_num_reps
                                         ).parity_function()

        # ------------------------ GENERATOR LOSS -------------------------- #
        g_loss = -np.log(fake_y_estimator)

    g_loss = g_loss/NUM_SAMPLES
    return g_loss

In [None]:
def relative_entropy(G, X):

    """Kullback-Leibler divergence"""

    d_kl = 0
    for x in X:
        for g in G:
            #d_kl += sum([(g[i]*np.log(g[i]/x[i]))
            d_kl += sum([(g[i]-x[i])
                            for i in range(len(g))])

    return d_kl

## Quantum GAN

In [None]:
# --------------- PARAMETERS OF THE PROBLEMS -------------- # 
GEN_IT = 1; EPOCHS=100; DISC_IT = 5; NUM_SAMPLES = data.shape[0]
g_num_qubits = 2; d_num_qubits = 4; g_num_reps = 4; d_num_reps = 4
g_initial_guess = np.random.uniform(0, np.pi, g_num_qubits + g_num_qubits*g_num_reps)
d_initial_guess = np.random.uniform(0, np.pi, d_num_qubits + d_num_qubits*d_num_reps)
loss_values_discriminator, loss_values_generator, KL_divergence = [], [], []

# ---------------- PREPARE REAL MINIBATCH  --------------- #    
real_minibatch = []

for m in range(NUM_SAMPLES):        
    # --------------------- SAMPLE FROM REAL DATA ---------------------- #
    real_vector = data.iloc[m, :-1] 
    real_minibatch.append(real_vector)

# ---------------- SETTING THE OPTIMIZERS  --------------- #
disc_optimizer = ADAM(maxiter=DISC_IT, tol=1e-06, lr=0.01, beta_1=0.9, beta_2=0.99, noise_factor=1e-08, eps=1e-10)
gen_optimizer = ADAM(maxiter=GEN_IT, tol=1e-06, lr=0.01, beta_1=0.9, beta_2=0.99, noise_factor=1e-08, eps=1e-10)

for epoch in range(EPOCHS):

    print(f"---------------------------------------------- epoch = {epoch} ----------------------------------------------")
    print( "-------------------------------------------------------------------------------------------------------------")

    # --------------- PREPARE LATENT MINIBATCH  -----------------#
    latent_minibatch = []
    for m in range(NUM_SAMPLES):        
        # -------------------- GENERATE LATENT VECTOR ---------------------- #
        latent_vec = np.random.uniform(0, 2*np.pi, g_num_qubits)
        #latent_vec = [np.pi*np.random.random(), 2*np.pi*np.random.random()]
        latent_minibatch.append(latent_vec)    
    # --------------- PREPARE FAKE MINIBATCH  -----------------#
    fake_minibatch = []
    for m in latent_minibatch:        
        # ---------------- PASS LATENT VECTOR ON GENERATOR ----------------- #
        generator = Generator(latent_vector = m, 
                                g_params = g_initial_guess, 
                                num_ansatz_reps = g_num_reps)
        # -------------------- GENERATE FAKE DISTRIBUTION ------------------ #
        fake_distr = generator.g_distribution()            
        fake_minibatch.append(fake_distr)

    
    # ------------------ TRAIN DISCRIMINATOR --------------------#
    result_d = disc_optimizer.minimize(fun = D_crossentropy,
                                       x0 = d_initial_guess)
    
    optimal_params_discriminator = result_d.x
    d_initial_guess = optimal_params_discriminator
    loss_values_discriminator.append(result_d.fun)


    # ------------------ TRAIN GENERATOR --------------------#
    result_g = gen_optimizer.minimize(fun = G_crossentropy,
                                       x0 = g_initial_guess)
    
    optimal_params_generator = result_g.x
    g_initial_guess = optimal_params_generator
    loss_values_generator.append(result_g.fun)

    # ------------------ RELATIVE ENTROPY -------------------#
    kl_divergence = relative_entropy(fake_minibatch, real_minibatch)
    KL_divergence.append(kl_divergence)


    print(f"Discriminator Loss: {result_d.fun} \t Generator Loss: {result_g.fun} \t KL Divergence: {kl_divergence}")

In [None]:
loss_values_discriminator = loss_values_discriminator/np.linalg.norm(loss_values_discriminator)
loss_values_generator = loss_values_generator/np.linalg.norm(loss_values_generator)

In [None]:
plt.plot(loss_values_discriminator)
plt.plot(loss_values_generator)
plt.ylabel('Loss function')
plt.xlabel("Iteration")
plt.grid()


In [None]:
plt.plot(KL_divergence)
plt.ylabel('Divergence')
plt.xlabel("Iteration")
plt.grid()

In [None]:
NUM_NEW_SAMPLES = 50

new_samples = []

for sample in range(NUM_NEW_SAMPLES):
    # ----------------------- NEW LATENT VECTOR ------------------------ #
    latent_vec = np.random.uniform(0, 2*np.pi, g_num_qubits)

    # ---------------- PASS LATENT VECTOR ON GENERATOR ----------------- #
    generator = Generator(latent_vector = latent_vec, 
                            g_params = optimal_params_generator, 
                            num_ansatz_reps = g_num_reps)

    # -------------------- GENERATE FAKE DISTRIBUTION ------------------ #
    fake_distr = generator.g_distribution()            
    new_samples.append(fake_distr)

print("NEW SAMPLES DERIVED FROM QUANTUM GENERATOR: \n ", np.matrix(new_samples)[:5])


new_data = np.array(new_samples)

sepal_len = [new_data[i][0] for i in range(len(new_data))]
sepal_width = [new_data[i][1] for i in range(len(new_data))]
petal_len = [new_data[i][2] for i in range(len(new_data))]
petal_width = [new_data[i][3] for i in range(len(new_data))]

plt.scatter(sepal_len, petal_len)
#plt.scatter(sepal_width, petal_width)

plt.title("Correlation between variables")
plt.legend(["SL vs PL", "SW vs PW"])

In [None]:
plt.scatter(data.iloc[:, 0].to_numpy(), data.iloc[:, 2].to_numpy())