# Beta neighbours model

In this tutorial we explain how our beta model works. We follow the [beta pipeline](https://github.com/NEASQC/WP6_QNLP/blob/dev/neasqc_wp61/data/data_processing/use_beta_neighbors.py) and rewrite the classes found in the [beta folder](https://github.com/NEASQC/WP6_QNLP/tree/dev/neasqc_wp61/models/quantum/beta), so that each step of the model is easy to follow.

What we call the beta model is a quantum implementation of the [k-nearest neighbours classifier](https://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm). Each sentence is assigned a vector containing the probabilities of the post-selected qubits of a trained pre-alpha-lambeq circuit. For example, a circuit with two post-selected qubits yields a 4-dimensional vector in the beta model, holding the probabilities of the states 00, 01, 10 and 11 of those two qubits.
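To make the shape of these vectors concrete, here is a minimal sketch that turns measurement counts into a probability vector ordered as 00, 01, 10, 11. The helper `counts_to_vector` and the counts themselves are made up for illustration; the vectors used in this tutorial are read from the pre-alpha-lambeq results file rather than built this way.

```python
from itertools import product


def counts_to_vector(counts: dict, n_qubits: int) -> list:
    """Convert measurement counts into a probability vector.

    The entries are ordered by bitstring: 00, 01, 10, 11 for two qubits.
    States that were never observed get probability 0.
    """
    total = sum(counts.values())
    states = ["".join(bits) for bits in product("01", repeat=n_qubits)]
    return [counts.get(state, 0) / total for state in states]


# Made-up counts for two qubits (illustration only)
example_counts = {"00": 50, "01": 25, "11": 25}
example_vector = counts_to_vector(example_counts, n_qubits=2)
print(example_vector)  # [0.5, 0.25, 0.0, 0.25]
```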

In [25]:
import sys
import numpy as np 
import json
import qiskit 
from qiskit_aer import AerSimulator
import cmath
sys.path.append("./../../models/quantum/beta/")
sys.path.append("./../../data/data_processing/")
from QuantumKNearestNeighbours import QuantumKNearestNeighbours as qkn
from save_json_output import save_json_output
from collections import Counter

The first step is therefore to load the results of a previous pre-alpha-lambeq experiment. We load the random seed used for that experiment, the number of runs performed, and the labels of the training and test datasets.

In [26]:
input_path = "./../../benchmarking/results/raw/pre_alpha_lambeq_20230727-190241.json"
# Use a context manager so the file is closed after reading
with open(input_path) as f:
    results_pre_alpha = json.load(f)
seed = results_pre_alpha['input_args']['seed']
runs = results_pre_alpha['input_args']['runs']
train_labels_dir = './../../data/datasets/toy_dataset_train.tsv'
test_labels_dir = './../../data/datasets/toy_dataset_test.tsv'
train_labels = qkn.load_labels(train_labels_dir)
test_labels = qkn.load_labels(test_labels_dir)


As an example, we will use the vectors of a single run of our input (the run stored at index 1). We also set the number of neighbours k that are evaluated to assign a prediction. An odd value is recommended to avoid ties when selecting the most common label among the neighbours (with two classes, an odd k rules them out).

In [27]:
train_vectors = results_pre_alpha['vectors_train'][1]
test_vectors = results_pre_alpha['vectors_test'][1]
ntrain = len(train_vectors)
ntest = len(test_vectors)
k = 5



The quantum part of our algorithm is the measurement of distances between vectors. To compute this quantum distance we have implemented the ***QuantumDistance*** class, which performs a [SWAP test](https://en.wikipedia.org/wiki/Swap_test) on the two encoded vectors and converts the measured probability into a distance. More details can be found in the comments of the class below.
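As a rough illustration of the relation the SWAP test relies on, the sketch below works with ideal probabilities instead of running a circuit. For normalised states the control qubit returns 0 with probability 1/2 + |⟨x1|x2⟩|²/2, and for real normalised vectors with non-negative overlap the Euclidean distance equals sqrt(2 - 2⟨x1|x2⟩). The function names are hypothetical, and clipping negative values to zero is a choice made for this sketch only.

```python
import numpy as np


def swap_test_p0(a: np.ndarray, b: np.ndarray) -> float:
    """Ideal (shot-noise-free) probability of measuring 0 on the control qubit."""
    overlap = float(np.dot(a, b))
    return 0.5 + 0.5 * overlap**2


def distance_from_p0(p0: float) -> float:
    """Recover the Euclidean distance from p0, assuming a non-negative overlap."""
    # Clip at zero in case an estimated p0 falls slightly below 1/2
    overlap = np.sqrt(max(2.0 * p0 - 1.0, 0.0))
    return float(np.sqrt(2.0 - 2.0 * overlap))


# Two normalised example vectors (illustration only)
a = np.array([0.6, 0.8])
b = np.array([1.0, 0.0])

p0 = swap_test_p0(a, b)
recovered = distance_from_p0(p0)
direct = float(np.linalg.norm(a - b))
print(p0, recovered, direct)
```

With real measurements, p0 is estimated from a finite number of shots, so the recovered distance only approximates the direct one.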

In [28]:
class QuantumDistance:
    """
    Class for implementing a simplistic version of quantum distance
    """
    def __init__(self, x1 : np.array, x2 : np.array) -> None:
        """
        Initialiser of the class

        Parameters
        ----------
        x1 : np.array
            First vector
        x2 : np.array
            Second vector
        """
        self.x1norm = self.normalise_vectors(x1)
        self.x2norm = self.normalise_vectors(x2)
        self.circuit = self.build_circuit(
            self.x1norm, self.x2norm)
        self.counts = self.get_results_qc_shots(self.circuit)
        self.dist = self.euclidean_probability_relation(self.counts)
        self.real_dist = self.euclidean_distance(
            self.x1norm, self.x2norm)

    def normalise_vectors(
        self, x : np.array) -> np.array:
        """
        Normalises a vector [x1, x2] so that (x1**2 + x2**2) =1 

        Parameters 
        ----------
        x : np.array
            Vector we want to normalise
        
        Returns
        -------
        x_norm : np.array
            Normalised vector
        """
        x = np.asarray(x, dtype=float)
        # Normalisation constant: the sum of the squared components
        Z = np.sum(x**2)
        x_norm = x / np.sqrt(Z)
        return x_norm
    
    def euclidean_distance(
        self, x1 : np.array, x2 : np.array
    ) -> float:
        """
        Computes the real euclidean distance between two vectors

        Parameters
        ----------
        x1 : np.array
            First vector
        x2 : np.array
            Second vector
        
        Returns
        -------
        euclidean_distance : float
            The real euclidean distance
        """
        # Vectorised sum; passing a generator to np.sum is deprecated
        euclidean_distance = np.sqrt(
            np.sum((np.asarray(x1) - np.asarray(x2))**2))
        return euclidean_distance
    
    def build_circuit(
        self, x1 : np.array, x2 : np.array
    ) -> qiskit.QuantumCircuit:
        """
        Builds the circuit with the encoding of the two vectors and the SWAP
        test

        Parameters
        ----------
        x1 : np.array
            First vector
        x2 : np.array
            Second vector

        Returns
        -------
        qc : qiskit.QuantumCircuit
            The circuit implementing the SWAP test    
        """
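        # Single-qubit encoding: RY(theta)|0> = cos(theta/2)|0> + sin(theta/2)|1>.
        # Only the second component is used, so this assumes a normalised
        # 2-dimensional vector whose first component is non-negative
        # (true for probability vectors).
        theta1 = 2*np.arcsin(x1[1])
        theta2 = 2*np.arcsin(x2[1])

        # Qubit 0 is the control; qubits 1 and 2 hold the two vectors
        qc = qiskit.QuantumCircuit(3,1)
        qc.ry(theta1, 1)
        qc.ry(theta2, 2)
        qc.barrier()
        # SWAP test: Hadamard, controlled-SWAP, Hadamard, measure the control
        qc.h(0)
        qc.cswap(0,1,2)
        qc.h(0)
        qc.measure(0,0)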
        return qc
    
    def get_results_qc_shots(
        self, qc : qiskit.QuantumCircuit, shots = 2**10,
        backend = AerSimulator()
    ) -> dict:
        """
        Gets the results of running the circuit in dictionary format

        Parameters
        ----------
        qc : qiskit.QuantumCircuit
            The circuit we want to analyse
        shots : int, default : 2**10
            The number of shots to perform
        backend : callable, default : AerSimulator
            The quantum backend where circuits are run
        
        Returns
        -------
        counts : dict
            Dictionary containing the number of times each
            state appears
        """
        qc_compiled = qiskit.transpile(qc, backend)
        job = backend.run(qc_compiled, shots = shots)
        results = job.result()
        counts = results.get_counts(qc_compiled)
        return counts
    
    def euclidean_probability_relation(
        self, counts
    ) -> float:
        """
        For normalised vectors in the SWAP test, 
        computes the relation of the probability between 
        obtaining a 0 in the control qubit and the euclidean
        distance.

        Parameters
        ----------
        counts : dict
            Dictionary with the quantum states as keys 
            and the number of times they were obtained as values
        
        Returns
        -------
        dist : float 
            Euclidean distance computed from SWAP test
        """
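        # Estimate P(0) on the control qubit; a missing key means zero counts
        shots = sum(counts.values())
        p0 = counts.get('0', 0) / shots

        # SWAP test: p0 = 1/2 + |<x1|x2>|^2 / 2, so |<x1|x2>| = sqrt(2*p0 - 1).
        # cmath.sqrt keeps this defined if shot noise makes 2*p0 - 1 negative.
        dist = np.sqrt(2 - 2 * abs(cmath.sqrt(2 * p0 -1)))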
        return dist
    
    




With a method for computing quantum distances in place, we can implement the model. First, for each test vector we compute the quantum distance to every training vector.

In [29]:
distances_list = [] 
for test_sample in test_vectors:
    distances = []
    for train_sample in train_vectors:
        distances.append(QuantumDistance(test_sample, train_sample).dist)
    distances_list.append(distances)




After that, we compute the indices of the k training vectors closest to each test vector.

In [30]:
closest_indexes_list = []
for i in range(ntest):
    closest_indexes = sorted(
        range(len(distances_list[i])), key = lambda j : distances_list[i][j])[:k]
    closest_indexes_list.append(closest_indexes)
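The same selection can also be written with NumPy. The sketch below is an alternative under the assumption that the distances fit in a plain array; the helper `k_closest_indexes` and the toy distances are hypothetical. A stable sort is requested so that equal distances keep their original order, matching the behaviour of `sorted` above.

```python
import numpy as np


def k_closest_indexes(distances, k):
    """Return the indices of the k smallest distances, ties in original order."""
    order = np.argsort(np.asarray(distances), kind="stable")
    return order[:k].tolist()


# Toy distances from one test vector to five training vectors (illustration only)
toy_distances = [0.9, 0.1, 0.5, 0.1, 0.7]
toy_closest = k_closest_indexes(toy_distances, 3)
print(toy_closest)  # [1, 3, 2]
```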

Then, we take a majority vote among the k closest vectors to decide which label is assigned to each test vector.

In [31]:
predictions = []
for i in range(ntest):
    closest_labels = [] 
    for j in closest_indexes_list[i]:
        closest_labels.append(train_labels[j])
    c = Counter(closest_labels)
    label, count = c.most_common()[0]
    predictions.append(label)
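The vote itself can be isolated in a small helper, which also shows what happens on a tie. This is a sketch with a hypothetical function name and made-up labels: when two labels have the same count, `Counter.most_common` returns the one encountered first, so with an even k the result depends on the order of the neighbours.

```python
from collections import Counter


def majority_vote(labels):
    """Return the most common label; on a tie, the label seen first wins."""
    return Counter(labels).most_common(1)[0][0]


# Odd number of neighbours with two classes: the winner is unambiguous
print(majority_vote([0, 1, 1, 0, 1]))  # 1

# Even number of neighbours: 0 and 1 tie, and the first label seen is returned
print(majority_vote([1, 0, 0, 1]))  # 1
```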

Finally, we can compute the accuracy obtained in this case. 

In [32]:
correct_pred = 0
for i,pred in enumerate(predictions):
    if pred == test_labels[i]:
        correct_pred += 1 
print('The accuracy is equal to :', correct_pred/ntest)

The accuracy is equal to : 0.45
