In [None]:
# This notebook is from https://github.com/Quandela/Ascella/blob/main/ClassificationTask.ipynb

# Classifying the iris dataset on Quandela's QPU Ascella

_This notebook was developed by Dario A. Fioretto and Alexia Salavrakos._

In this notebook, we train a model to classify the [iris toy dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_iris.html) using Ascella. We use a variational quantum algorithm (or quantum neural network) as explained in the paper.


As seen in the image below, we use modes 3 to 7 of Ascella for our ansatz. We input three photons in modes 3, 5 and 7 respectively. In a first block, the variational parameters $\theta$ are encoded by phase-shifters. In a second block, the features of the data $x$ are encoded. We then have a third block with more variational parameters $\theta$. Then, we have a block that implements pseudo-PNR on detectors 3 and 7 using modes 1,2 and 8,9 respectively. The lambda parameters are weights that multiply the output probabilities in order to define the observable and build the model.

![alternative text](img/ansatz_classifier.png)

## Imports and configurations

In [None]:
import perceval as pcvl
import numpy as np
import matplotlib.pyplot as plt
import skopt as sk

import scipy
from scipy.stats import poisson

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

from datetime import datetime
import os
import time
import pandas as pd
from itertools import combinations

In [None]:
# to display the circuit
from perceval.rendering.circuit import DisplayConfig, SymbSkin
DisplayConfig.select_skin(SymbSkin)

Insert token from https://cloud.quandela.com/ here:

In [None]:
qpu_token = ''

Choose whether to run a noisy local simulation or a real experiment on Ascella:

In [None]:
run_on_qpu = True

## Define quantum classifier class

We define a class that contains all the relevant functions as methods: building the ansatz, computing the outputs of the experiment, implementing pseudo-PNR and its mapping, as well as building the classifier estimator from the output probabilities.

In [None]:
class QuantumClassifierOnAscella():
    def __init__(self, dataset, n_classes, run_on_qpu):
        self.dataset = dataset
        self.n_classes = n_classes
        
        # For a test on a very small dataset, use:
        #self.small_set_indices = [0,1,2, 51, 52, 53, 101, 102, 103]
        #self.X = np.array([self.dataset.data[i] for i in self.small_set_indices])
        #self.Y = np.array([self.dataset.target[i] for i in self.small_set_indices])
        
        # Prepare data 
        self.X = self.dataset.data
        self.Y = self.dataset.target
        self.X_angle = self.transform_features_to_phases()
        self.X_train, self.X_test, self.y_train, self.y_test = self.split_dataset(self.X_angle, self.Y)
        
        # Connect to the remote processor and get Ascella circuit
        self.processor = pcvl.RemoteProcessor("qpu:ascella", qpu_token)
        self.circuit = self.processor.specs["specific_circuit"]
        print("Received circuit from Ascella.")
        self.circuit_phases = self.circuit.get_parameters()
        
        # If running a simulation, change processor to a local backend with noisy source
        if not run_on_qpu:
            self.source = pcvl.Source(losses = 0.92, emission_probability=1, multiphoton_component=0,
                                      indistinguishability=0.92)
            self.processor = pcvl.Processor("SLOS", self.circuit, self.source)
        
        # Postselect on observing 3 photon counts (discard photon losses)
        self.processor.set_parameter("mode_post_select", 3)
        
        # Define chip phases and set values of fixed phases
        self.theta_phases = ['phi2', 'phi4', 'phi3', 'phi5', 'phi13',
                             'phi14', 'phi15', 'phi16', 'phi17', 'phi24',
                             'phi25', 'phi26', 'phi27', 'phi35', 'phi36', 'phi38',  # end block 1, 16 params
                             'phi57', 'phi58', 'phi59', 'phi60', 'phi61', 
                             'phi68', 'phi69', 'phi70', 'phi71', 'phi79',
                             'phi80', 'phi81', 'phi82', 'phi83', 'phi90', 'phi92'] # end block 2, 16 params
        self.data_phases = ['phi37', 'phi39', 'phi47', 'phi49'] 
        self.set_fixed_phases()
        
        # Define expected output states and pseudo PNR mapping dictionary
        self.possible_output_states = self.create_dict_output_states_with_pseudo_PNR(n_modes = 5, n_photons = 3,
                                                                                     starting_mode = 2)
        self.correspondance_dict = self.create_correspondance_pseudo_pnr()
        
        # Choose number of samples per experiment
        self.n_samples = 5*1e4
        
        # Define empty lists for the outcomes and probabilities and for the estimator 
        self.outcomes = []
        self.probabilities = []
        self.predicted_class = []
        self.model_output = []
        self.model_output_error = []
        
        # Create folder to save data
        self.path = datetime.now().strftime("%Y%m%d_%H%M%S") + '/'
        os.mkdir(self.path)
        
        
    def split_dataset(self, X, Y):
        # Given the data X and the target Y, split in a training and test set
        X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.25, random_state=42)
        return X_train, X_test, y_train, y_test

    
    def transform_features_to_phases(self):
        # Given the features from data X, reformat them as phases to be encoded by phase shifters
        a = []
        b = []
        for i in range(len(self.X[0])):
            a.append((max(self.X[:, i]) - min(self.X[:, i])) / 2)
            b.append(((max(self.X[:, i]) + min(self.X[:, i])) / 2))

        def sigmoid_Pi(x):
            return np.pi / (1 + np.exp(-x))

        X_angle = sigmoid_Pi((self.X - b) / a)

        return X_angle

    
    def get_ascella_fixed_phases_dict(self):
        # In our approach without transpilation, we set some phases on the chip to pi, or pi/2
        # The phases set to pi prevent photons from escaping to extra modes
        # The phases set to pi/2 enable pseudo-PNR by redirecting photons to some specific modes
        phases_set_to_pi = {'phi6': np.pi, 'phi12': np.pi, 'phi28': np.pi, 'phi34': np.pi, 'phi46': np.pi,
                            'phi48': np.pi, 'phi50': np.pi, 'phi56': np.pi, 'phi72': np.pi, 'phi78': np.pi,
                            'phi94': np.pi, 'phi102': np.pi, 'phi104': np.pi,
                            'phi112': np.pi, 'phi114': np.pi, 'phi122': np.pi, 
                            'phi124': np.pi, 'phi126': np.pi}
        phases_set_to_pi_half = {'phi100': np.pi/2, 'phi110': np.pi/2, 'phi116': np.pi/2, 'phi128': np.pi/2}
        return ({**phases_set_to_pi, **phases_set_to_pi_half})

    
    def get_ascella_theta_phases_dict(self, theta_values):
        # Create a dictionary that assigns a value to the phases corresponding to theta parameters
        theta_phases_dict = dict((self.theta_phases[i], theta_values[i]) for i in range(len(self.theta_phases)))
        return (theta_phases_dict)

    
    def get_ascella_data_phases_dict(self, data_values):
        # Create a dictionary that assigns a value to the phases corresponding to the data encoding
        data_phases_dict = dict((self.data_phases[i], data_values[i]) for i in range(len(self.data_phases)))
        return (data_phases_dict)

    
    def set_fixed_phases(self):
        # Create a dictionary for the fixed phases (set to 0, pi or pi/2)
        # Then set all other phases to 0 for uniformity, with the exception of the data phases
        # Some of these 0s will be replaced by theta parameters in set_unitaries_ascella
        all_phases_ordered = [('phi' + str(i)) for i in range(132)]
        fixed_phases_dict = self.get_ascella_fixed_phases_dict()
        puppet_data_phases_dict = self.get_ascella_data_phases_dict([1 for _ in range(len(self.data_phases))])
        
        for (i, j) in enumerate(all_phases_ordered):
            if j in fixed_phases_dict:
                self.circuit_phases[i].set_value(fixed_phases_dict[j])
            elif j in puppet_data_phases_dict:
                # do nothing
                # keep the data phases non assigned, as they will be sent in batch to the processor
                pass
            else:
                self.circuit_phases[i].set_value(0)

        return 0

    
    def set_unitaries_ascella(self, theta_values):
        # Assign values to the theta parameter phases in the circuit
        theta_phases_dict = self.get_ascella_theta_phases_dict(theta_values)
        all_phases_ordered = [('phi' + str(i)) for i in range(132)]
        for (i, j) in enumerate(all_phases_ordered):
            if j in theta_phases_dict:
                self.circuit_phases[i].set_value(theta_phases_dict[j])

        return 0

    
    def create_dict_output_states(self, n_modes, n_photons, starting_mode = 0):
        # Create a list with the possible output states
        possible_states = []

        for comb in combinations(np.arange(n_modes), n_photons):
            vector_state = [1 if i in comb else 0 for i in range(n_modes)]
            possible_states.append(pcvl.BasicState([0]*starting_mode + vector_state + 
                                                   [0]*(12 - n_modes - starting_mode)))

        return possible_states


    def create_dict_output_states_with_pseudo_PNR(self, n_modes, n_photons, starting_mode = 0):
        # Create a list with the possible output states
        possible_states = []

        for comb in combinations(np.arange(n_modes), n_photons):
            vector_state = [1 if i in comb else 0 for i in range(n_modes)]
            possible_states.append(pcvl.BasicState([0]*starting_mode + vector_state + 
                                                   [0]*(12 - n_modes - starting_mode)))

        # Here we hardcode the extra states from pseudo PNR:
        # 3 photons detected in modes 3/7
        possible_states.append(pcvl.BasicState([0]*2 + [3] + [0]*9))
        possible_states.append(pcvl.BasicState([0]*6 + [3] + [0]*5))
        
        # 2 photons detected in modes 3 + 1 photon elsewhere
        for comb in combinations(np.arange(4), 1):
            vector_comb = [1 if i in comb else 0 for i in range(4)]
            possible_states.append(pcvl.BasicState([0]*2 + [2] + vector_comb + [0]*5))
        
        # 2 photons detected in modes 7 + 1 photon elsewhere
        for comb in combinations(np.arange(4), 1):
            vector_comb = [1 if i in comb else 0 for i in range(4)]
            possible_states.append(pcvl.BasicState([0]*2 + vector_comb + [2] + [0]*5))

        return possible_states
    
    
    def create_correspondance_pseudo_pnr(self):
        # Prepare a correspondance dictionary for the pseudo PNR
        # The keys are the raw state and the values are the mapped pseudo pnr state 
        # This allows us to do the coarse graining or reinterpretation of the observed states as PNR states
        correspondance_dict = {}
        
        # "Normal" 3 photon coincidences between modes 3 and 7
        states_non_PNR = self.create_dict_output_states(n_modes = 5, n_photons = 3, starting_mode = 2)
        for state in states_non_PNR:
            correspondance_dict[state] = state

        # Hardcoded mapping for pseudo PNR
        # Three photons in detectors 1,2,3 
        # Same in 7,8,9
        correspondance_dict[pcvl.BasicState([1, 1, 1] + [0]*9)] = pcvl.BasicState([0]*2 + [3] + [0]*9)
        correspondance_dict[pcvl.BasicState([0]*6 + [1, 1, 1] + [0]*3)] = pcvl.BasicState([0]*6 + [3] + [0]*5)
        
        # Two photons in detectors 1,2,3 and one photon in detectors 4 to 7 
        for comb in combinations(np.arange(4), 1):
            vector_comb = [1 if i in comb else 0 for i in range(4)]
            correspondance_dict[pcvl.BasicState([1, 1, 0] + vector_comb + [0]*5)] = pcvl.BasicState(
                [0, 0, 2] + vector_comb + [0]*5)
            correspondance_dict[pcvl.BasicState([1, 0, 1] + vector_comb + [0]*5)] = pcvl.BasicState(
                [0, 0, 2] + vector_comb + [0]*5)
            correspondance_dict[pcvl.BasicState([0, 1, 1] + vector_comb + [0]*5)] = pcvl.BasicState(
                [0, 0, 2] + vector_comb + [0]*5)
            
        # Two photons in detectors 1,2,3 and one photon in detectors 8 or 9
        correspondance_dict[pcvl.BasicState([1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([0, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0])
                                            
        # Two photons in detectors 7,8,9 and one photon in detectors 3 to 6
        for comb in combinations(np.arange(4), 1):
            vector_comb = [1 if i in comb else 0 for i in range(4)]
            correspondance_dict[pcvl.BasicState([0, 0] + vector_comb + [0, 1, 1] + [0]*3)] = pcvl.BasicState(
                [0, 0] + vector_comb + [2, 0, 0] + [0]*3)
            correspondance_dict[pcvl.BasicState([0, 0] + vector_comb + [1, 0, 1] + [0]*3)] = pcvl.BasicState(
                [0, 0] + vector_comb + [2, 0, 0] + [0]*3)
            correspondance_dict[pcvl.BasicState([0, 0] + vector_comb + [1, 1, 0] + [0]*3)] = pcvl.BasicState(
                [0, 0] + vector_comb + [2, 0, 0] + [0]*3)
            
        # Two photons in detectors 7,8,9 and one photon in detectors 1 or 2
        correspondance_dict[pcvl.BasicState([1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([1, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0])
        correspondance_dict[pcvl.BasicState([0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0])] = pcvl.BasicState(
            [0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0])

        # One photon in detector 1,2 and 2 photons in detectors 4,5,6,7
        for comb in combinations(np.arange(4), 2):
            vector_comb = [1 if i in comb else 0 for i in range(4)]
            correspondance_dict[pcvl.BasicState([1, 0, 0] + vector_comb + [0]*5)] = pcvl.BasicState(
                [0, 0, 1] + vector_comb + [0]*5)
            correspondance_dict[pcvl.BasicState([0, 1, 0] + vector_comb + [0]*5)] = pcvl.BasicState(
                [0, 0, 1] + vector_comb + [0]*5) 
            
        # One photon in detector 8,9 and 2 photons in detectors 3,4,5,6
        for comb in combinations(np.arange(4), 2):
            vector_comb = [1 if i in comb else 0 for i in range(4)]
            correspondance_dict[pcvl.BasicState([0, 0] + vector_comb + [0, 1, 0] + [0]*3)] = pcvl.BasicState(
                [0, 0] + vector_comb + [1, 0, 0] + [0]*3)
            correspondance_dict[pcvl.BasicState([0, 0] + vector_comb + [0, 0, 1] + [0]*3)] = pcvl.BasicState(
                [0, 0] + vector_comb + [1, 0, 0] + [0]*3)
            
        
        # Finally, 1 photon in 1,2 and 1 photon in 4,5,6 and 1 photon in 8,9
        for comb in combinations(np.arange(3), 1):
            vector_comb = [1 if i in comb else 0 for i in range(3)]
            correspondance_dict[pcvl.BasicState([1, 0, 0] + vector_comb + [0, 1, 0] + [0]*3)] = pcvl.BasicState(
                [0, 0, 1] + vector_comb + [1, 0, 0] + [0]*3)
            correspondance_dict[pcvl.BasicState([1, 0, 0] + vector_comb + [0, 0, 1] + [0]*3)] = pcvl.BasicState(
                [0, 0, 1] + vector_comb + [1, 0, 0] + [0]*3)
            correspondance_dict[pcvl.BasicState([0, 1, 0] + vector_comb + [0, 1, 0] + [0]*3)] = pcvl.BasicState(
                [0, 0, 1] + vector_comb + [1, 0, 0] + [0]*3)
            correspondance_dict[pcvl.BasicState([0, 1, 0] + vector_comb + [0, 0, 1] + [0]*3)] = pcvl.BasicState(
                [0, 0, 1] + vector_comb + [1, 0, 0] + [0]*3)
        
        return correspondance_dict
        
    
    def map_pseudo_PNR_states(self, raw_results):
        # Given a list of raw results from the experiment with pseudo PNR
        # Return reinterpreted results
        # For example: 111000000000 is sent to 003000000000
        results_PNR = {}
        
        for raw_state, pnr_state in self.correspondance_dict.items():
            if pnr_state in results_PNR.keys():
                previous_count = results_PNR[pnr_state]
                results_PNR[pnr_state] = previous_count + raw_results[raw_state]
            else:
                results_PNR[pnr_state] = raw_results[raw_state]

        return results_PNR
    
    
    def prepare_batch_data_phases(self, X_train):
        # Given a list of data points to be sent in batch, create a list of dictionaries 
        # Each dictionary contains the phases (e.g. phi_37) and the corresponding values (e.g. 1)
        list_data_phases = []
        for x in X_train:
            data_phases_dict = self.get_ascella_data_phases_dict(x)
            list_data_phases.append(data_phases_dict)
        
        return list_data_phases

    
    def get_results_ascella_batch(self, list_data_phases):
        # Given a batch of data points, run an experiment on Ascella

        # Define initial state: 3 photons
        initial_state = pcvl.BasicState([0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0])

        # Send computation to remote processor
        self.processor.set_circuit(self.circuit)
        self.processor.with_input(initial_state)
        self.processor.set_parameter("parameter_iterator", list_data_phases)
        sampler = pcvl.algorithm.Sampler(self.processor)
        remote_job = sampler.sample_count
        remote_job.name = "vqc_" + datetime.now().strftime("%Y%m%d_%H%M%S")
        remote_job.execute_async(self.n_samples)

        while not remote_job.is_complete:
             time.sleep(0.02)

        # Retrieve results and assign them to outcomes and probabilities
        results = remote_job.get_results()
        for j in range(len(results['results_list'])):
            result_batch_item = self.map_pseudo_PNR_states(results['results_list'][j]['results'])
            counts_list = []
            for i in self.possible_output_states:
                counts_list.append(result_batch_item[i])
            counts_array = np.array(counts_list)
            self.outcomes.append(counts_array)

            probs = counts_array / np.sum(counts_array)
            self.probabilities.append(probs)
        return self.outcomes, self.probabilities
    
    
    def get_results_ascella_all_data_batch(self, theta_values):
        # For given theta parameters, set phases values, then prepare batch of data phases
        # Then call get_results_ascella to run experiment on Ascella
        self.set_unitaries_ascella(theta_values)
        list_data_phases = self.prepare_batch_data_phases(self.X_train)
        self.get_results_ascella(list_data_phases)
        

    def get_results_ascella(self, data_phases):
        # Given a data point, run an experiment (without batch functionality)
        
        # set value to the right phase in the circuit
        data_phases_dict = self.get_ascella_data_phases_dict(data_phases)
        all_phases_ordered = [('phi' + str(i)) for i in range(132)]
        for (i, j) in enumerate(all_phases_ordered):
            if j in data_phases_dict:
                self.circuit_phases[i].set_value(data_phases_dict[j])

        # define initial state and send circuit to remote simulator
        initial_state = pcvl.BasicState([0, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0])

        self.processor.set_circuit(self.circuit)
        self.processor.with_input(initial_state)
        sampler = pcvl.algorithm.Sampler(self.processor)
        remote_job = sampler.sample_count
        remote_job.name = "vqc_" + datetime.now().strftime("%Y%m%d_%H%M%S")
        remote_job.execute_async(self.n_samples)

        while not remote_job.is_complete:
             time.sleep(0.02)

        # Retrieve results and assign them to outcomes and probabilities
        results = remote_job.get_results()
        rr = self.map_pseudo_PNR_states(results['results'])

        counts_list = []
        for i in self.possible_output_states:
            counts_list.append(rr[i])
        counts_array = np.array(counts_list)
        self.outcomes.append(counts_array)

        probs = counts_array / np.sum(counts_array)
        self.probabilities.append(probs)
        return self.outcomes, self.probabilities
    
    
    def get_results_ascella_all_data(self, theta_values):
        # For given theta parameters, set phases values
        # Then call get_results_ascella to run experiment on Ascella point by point (no batch)
        self.set_unitaries_ascella(theta_values)
        data_point_counter = 0
        for x in self.X_train:
            self.get_results_ascella(x)
            data_point_counter = data_point_counter + 1
            if (data_point_counter % 100 == 0):
                print('100 datapoint processed')
        
    
    def class_prediction(self, lambda_values):
        # Given measurement parameters lambda,
        # For the results of an experiment, obtain model prediction
        thresholds = np.linspace(-1, 1, self.n_classes, False)
        
        for i in [proba*100000 for proba in self.probabilities]: # factor added for the optimization
            model_output_data_point = np.sum(lambda_values*i)
            self.model_output.append(model_output_data_point)
            #print(i)
            if model_output_data_point <= thresholds[1]:
                self.predicted_class.append(0.)
            elif model_output_data_point > thresholds[-1]:
                self.predicted_class.append(float(self.n_classes - 1))
            else:
                for j in range(1, self.n_classes - 1):
                    if thresholds[j] < model_output_data_point <= thresholds[j + 1]:
                        self.predicted_class.append(j)
                        
    
    def class_prediction_with_error(self, lambda_values):
        # Given measurement parameters lambda,
        # For the results of an Ascella experiment, obtain model prediction
        # With an error estimated from counts
        thresholds = np.linspace(-1, 1, self.n_classes, False)
        
        for prob_index, proba in enumerate (self.probabilities):
            i = proba*100000 # factor added for the optimization
            distribution_counts = self.outcomes[prob_index]
            model_output_data_point = np.sum(lambda_values*i)
            model_output_error_data_point = [(lambda_values[j]**2)*(poisson.std(distribution_counts[j])**2)
                                             for j in range(len(lambda_values))]
            
            self.model_output.append(model_output_data_point)
            self.model_output_error.append(np.sqrt(np.sum(model_output_error_data_point)))
            
            if model_output_data_point <= thresholds[1]:
                self.predicted_class.append(0.)
            elif model_output_data_point > thresholds[-1]:
                self.predicted_class.append(float(self.n_classes - 1))
            else:
                for j in range(1, self.n_classes - 1):
                    if thresholds[j] < model_output_data_point <= thresholds[j + 1]:
                        self.predicted_class.append(j)

                        
    def cost_estimator(self, lambda_values):
        # Given measurement parameters lambda and results from Ascella experiment, 
        # Compute all predictions and compute accuracy and cost
        cost = 0
        self.class_prediction(lambda_values)
        y_predicted = np.array(self.predicted_class)
        accuracy = accuracy_score(self.y_train, y_predicted)
        cost = 1 - accuracy
        self.predicted_class = []
        return cost
    
    
    def reset_outcomes(self):
        # Reset probabilities, outcomes, predictions before running another experiment
        self.probabilities = []
        self.outcomes = []
        self.predicted_class = []
        self.model_output = []
        self.model_output_error = []
        self.predicted_class = []
        


## Train the model

We can now define the classifier using the iris dataset.

In [None]:
Classifier = QuantumClassifierOnAscella(dataset = datasets.load_iris(), n_classes = 3, run_on_qpu = run_on_qpu)

Print the Ascella circuit below.

Notice that several phases are already initialised in the definition of the QuantumClassifierOnAscella class.

In order to train the model faster, we do not use transpilation in this notebook. This means we hardcode the phases that we use to encode the data and the chip parameters. This is done in the definition of the QuantumClassifierOnAscella class (see variables theta_phases and data_phases).

In [None]:
pcvl.pdisplay(Classifier.circuit)

Now we can launch the optimisation.

As explained in the article, we optimise separately the chip parameters ($\theta$) and the measurements parameters ($\lambda$). 

We use different optimisers for each set. The theta optimisation is done via Gaussian processes, while the lambda optimisation is done via Nelder-Mead.

In [None]:
theta_optimisation_space = [sk.space.space.Real(0, 2*np.pi) for _ in range(len(Classifier.theta_phases))]

bounds=[]
for i in range(len(Classifier.possible_output_states)):
    bounds.append((-4, 4))

theta_optimizer = sk.Optimizer(theta_optimisation_space, "GP", acq_func="EI", acq_optimizer="sampling",
                               initial_point_generator="lhs")

best_accuracy = 0
best_params = []
best_iteration = 0

# Split the data - each batch will be sent as one job to the QPU 
# The split is needed in order to stay below the runtime limit
n_splits = 4
split_data = np.array_split(Classifier.X_train, n_splits)

for i in range(15):
    print("Iteration " + str(i))
    next_theta = theta_optimizer.ask()
    
    start_watch = time.time()
    
    if run_on_qpu:
        Classifier.set_unitaries_ascella(next_theta)
        # send data to qpu with split
        for data_batch in split_data:
            list_data_phases = []
            list_data_phases = Classifier.prepare_batch_data_phases(data_batch)
            Classifier.get_results_ascella_batch(list_data_phases)
    else:
        Classifier.get_results_ascella_all_data(next_theta) 
    
    end_watch = time.time()
    print('Obtained results from experiment in ' + str(round(end_watch - start_watch)) + ' seconds.')
    
    start_watch = time.time()
    opt_lambda = scipy.optimize.minimize(Classifier.cost_estimator,
                                         x0 = [0.0]*len(Classifier.possible_output_states),
                                         method = "Nelder-Mead", bounds=bounds)
    end_watch = time.time()
    print('Lambda optimization done in ' + str(round(end_watch - start_watch)) + ' seconds.')

    print('Objective function:' + str(opt_lambda.fun))
    
    # keep best accuracy
    if (1 - opt_lambda.fun) > best_accuracy:
        best_accuracy = (1 - opt_lambda.fun)
        best_params = [*next_theta, *opt_lambda["x"]]
        best_iteration = i
    
    # write to output files
    outcome_report = {}
    for j in range(len(Classifier.outcomes)):
        outcome_report["Counts" + str(j)] = Classifier.outcomes[j]
        outcome_report["Prob" + str(j)] = Classifier.probabilities[j]

    parameter_report = {"Parameters": [*next_theta, *opt_lambda["x"]]}
    iteration_report = {"Time": [datetime.now().strftime("%Y%m%d_%H%M%S")],
                        "Accuracy": [1 - opt_lambda.fun], "Cost": [opt_lambda.fun]}
    df_iteration = pd.DataFrame(data = iteration_report)
    df_iteration.to_csv(Classifier.path + "Iteration" + str(i) + ".csv", index = False)
    df_outcome = pd.DataFrame(data = outcome_report)
    df_outcome.to_csv(Classifier.path + "Outcomes" + str(i) + ".csv", index = False)
    df_parameters = pd.DataFrame(data = parameter_report)
    df_parameters.to_csv(Classifier.path + "Parameters" + str(i) + ".csv", index = False)
    
    # feed round to sk.Optimizer and reset
    res = theta_optimizer.tell(next_theta, opt_lambda.fun)
    Classifier.reset_outcomes()
    
print('Best iteration is ' + str(best_iteration) + ' with accuracy ' + str(best_accuracy) )

## Evaluate the model on test set

In [None]:
n_splits_tests = 2
split_data_test = np.array_split(Classifier.X_test, n_splits_tests)

# Set theta parameters to the best ones and run
if run_on_qpu:
    Classifier.set_unitaries_ascella(best_params[:len(Classifier.theta_phases)])
    # send data to qpu with split
    for data_batch_test in split_data_test:
        list_data_phases = []
        list_data_phases = Classifier.prepare_batch_data_phases(data_batch_test)
        Classifier.get_results_ascella_batch(list_data_phases)
else:
    Classifier.set_unitaries_ascella(best_params[:len(Classifier.theta_phases)])
    for x in Classifier.X_test:
        Classifier.get_results_ascella(x)

Classifier.class_prediction(best_params[len(Classifier.theta_phases):])
y_predicted_test = np.array(Classifier.predicted_class)
accuracy_test = accuracy_score(Classifier.y_test, y_predicted_test)
print("Test set accuracy is: " + str(accuracy_test))

# Write to output files
outcome_report = {}
for j in range(len(Classifier.outcomes)):
    outcome_report["Counts" + str(j)] = Classifier.outcomes[j]
    outcome_report["Prob" + str(j)] = Classifier.probabilities[j]

parameter_report = {"Parameters": best_params}
iteration_report = {"Time": [datetime.now().strftime("%Y%m%d_%H%M%S")],
                    "Accuracy": [accuracy_test], "Cost": [1 - accuracy_test]}
df_iteration = pd.DataFrame(data = iteration_report)
df_iteration.to_csv(Classifier.path + "Iteration" + "_test" + ".csv", index = False)
df_outcome = pd.DataFrame(data = outcome_report)
df_outcome.to_csv(Classifier.path + "Outcomes" + "_test" + ".csv", index = False)
df_parameters = pd.DataFrame(data = parameter_report)
df_parameters.to_csv(Classifier.path + "Parameters" + "_test" + ".csv", index = False)

# Reset
Classifier.reset_outcomes()

## Reproduce plots from article and get confusion matrices

This can be executed independently of the optimisation as it recovers results saved in csv file. 

Change the path as needed.

**Train set**

In [None]:
best_params_train_path = Classifier.path + "Parameters" + str(best_iteration) + ".csv"
best_outcomes_train_path = Classifier.path + "Outcomes" + str(best_iteration) + ".csv"

# If needed: change path and redefine Classifier class
# Classifier = QuantumClassifierOnAscella(dataset = datasets.load_iris(), n_classes = 3)

In [None]:
best_params_csv = pd.read_csv(best_params_train_path).squeeze("columns")
best_params = [i for i in best_params_csv.values.tolist()]

outcomes_df = pd.read_csv(best_outcomes_train_path)
outcomes_array = np.array(outcomes_df).transpose()

outcomes_from_csv = []
probs_from_csv = []
for i in range(0, outcomes_array.shape[0], 2):
    outcomes_from_csv.append(outcomes_array[i])
    probs_from_csv.append(outcomes_array[i + 1])
    
Classifier.outcomes = outcomes_from_csv
Classifier.probabilities = probs_from_csv

In [None]:
Classifier.class_prediction_with_error(best_params[len(Classifier.theta_phases):])
y_predicted_train = np.array(Classifier.predicted_class)
accuracy_train = accuracy_score(Classifier.y_train, y_predicted_train)
cm = confusion_matrix(Classifier.y_train, y_predicted_train)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()

In [None]:
f = plt.figure()
f.set_figwidth(10)
f.set_figheight(8)

# Color codes
light_purple_code = '#efdeff'
dark_purple_code = '#af5cff'
light_blue_code = '#cce2ff'
dark_blue_code = '#516d91'

point_color_dict = {0:dark_purple_code, 1:dark_blue_code, 2:'black'}
thresholds = np.linspace(-1, 1, Classifier.n_classes, False)

for i in range(len(Classifier.model_output)):
    plt.errorbar(x = i, 
                y = Classifier.model_output[i],
                yerr = (Classifier.model_output_error[i]), 
                c = point_color_dict[Classifier.y_train.tolist()[i]],
                fmt = '.')

plt.hlines(thresholds, xmin = 0, xmax = len(Classifier.model_output), color = 'grey', linestyle = 'dashed')
plt.fill_between([0, 112], [thresholds[0], thresholds[0]], [thresholds[1], thresholds[1]], facecolor = light_purple_code)
plt.fill_between([0, 112], [thresholds[1], thresholds[1]], [thresholds[2], thresholds[2]], facecolor = light_blue_code)
plt.fill_between([0, 112], [thresholds[2], thresholds[2]], [1, 1], facecolor = 'lightgrey')

plt.xlim([0,len(Classifier.model_output)])
plt.ylim([-1,1])

plt.ylabel(r'Class Estimator', fontsize = 18)
plt.xlabel('')
plt.xticks([])

plt.rcParams.update({
    'savefig.dpi': 300,  # to adjust notebook inline plot size
})

plt.show()

In [None]:
Classifier.reset_outcomes()

**Test set**

In [None]:
best_params_test_path = Classifier.path + "Parameters" + "_test" + ".csv"
best_outcomes_test_path = Classifier.path + "Outcomes" + "_test" + ".csv"

# If needed: change path and redefine Classifier class
# Classifier = QuantumClassifierOnAscella(dataset = datasets.load_iris(), n_classes = 3)

In [None]:
best_params_csv = pd.read_csv(best_params_test_path).squeeze("columns")
best_params = [i for i in best_params_csv.values.tolist()]

outcomes_df = pd.read_csv(best_outcomes_test_path)
outcomes_array = np.array(outcomes_df).transpose()

outcomes_from_csv = []
probs_from_csv = []
for i in range(0, outcomes_array.shape[0], 2):
    outcomes_from_csv.append(outcomes_array[i])
    probs_from_csv.append(outcomes_array[i + 1])
    
Classifier.outcomes = outcomes_from_csv
Classifier.probabilities = probs_from_csv

In [None]:
Classifier.class_prediction_with_error(best_params[len(Classifier.theta_phases):])
y_predicted_test = np.array(Classifier.predicted_class)
accuracy_test = accuracy_score(Classifier.y_test, y_predicted_test)
cm = confusion_matrix(Classifier.y_test, y_predicted_test)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot()
plt.show()

In [None]:
f = plt.figure()
f.set_figwidth(10)
f.set_figheight(8)

# Color codes
light_purple_code = '#efdeff'
dark_purple_code = '#af5cff'
light_blue_code = '#cce2ff'
dark_blue_code = '#516d91'

point_color_dict = {0:dark_purple_code, 1:dark_blue_code, 2:'black'}
thresholds = np.linspace(-1, 1, Classifier.n_classes, False)

for i in range(len(Classifier.model_output)):
    plt.errorbar(x = i, 
                y = Classifier.model_output[i],
                yerr = (Classifier.model_output_error[i]), 
                c = point_color_dict[Classifier.y_test.tolist()[i]],
                fmt = '.')

plt.hlines(thresholds, xmin = 0, xmax = len(Classifier.model_output), color = 'grey', linestyle = 'dashed')
plt.fill_between([0, 112], [thresholds[0], thresholds[0]], [thresholds[1], thresholds[1]], facecolor=light_purple_code)
plt.fill_between([0, 112], [thresholds[1], thresholds[1]], [thresholds[2], thresholds[2]], facecolor=light_blue_code)
plt.fill_between([0, 112], [thresholds[2], thresholds[2]], [1, 1], facecolor='lightgrey')

plt.xlim([0,len(Classifier.model_output)])
plt.ylim([-1,1])

plt.ylabel(r'Class Estimator', fontsize = 18)
plt.xlabel('')
plt.xticks([])

plt.rcParams.update({
    'savefig.dpi': 300,  # to adjust notebook inline plot size
})

plt.show()