In [None]:
import numpy as np

import tensorflow as tf

from quask.core import Ansatz, Kernel, KernelFactory, KernelType
from quask.core_implementation.pennylane_kernel import PennylaneKernel
from quask.evaluator import HaarEvaluator, LieRankEvaluator

from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

import matplotlib.pyplot as plt

# Importance of Kernel Bandwidth in QML

## Fashion-MNIST Dataset

The Fashion-MNIST dataset is an image classification for distinguishing clothing items. We follow the preprocessing of the input data as done in [The power of data in Quantum Machine Learning](https://arxiv.org/abs/2011.01938). We use Principal Component Analysis (PCA) to reduce each image to an $n$-dimensional vector, the so-called **feature** vector.

In [None]:
import tensorflow as tf
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

# Rescale the images from [0,255] to the [0.0,1.0] range.
x_train, x_test = x_train/255.0, x_test/255.0

print("Number of original training examples:", len(x_train))
print("Number of original test examples:", len(x_test))

def filter_03(x, y):
    keep = (y == 0) | (y == 3)
    x, y = x[keep], y[keep]
    y = y == 0
    return x,y

x_train, y_train = filter_03(x_train, y_train)
x_test, y_test = filter_03(x_test, y_test)

print("Number of filtered training examples:", len(x_train))
print("Number of filtered test examples:", len(x_test))

print(y_train[0])

plt.imshow(x_train[0, :, :])
plt.colorbar()

In [None]:
# =============================================================================
# Import modules
# =============================================================================
# Import general purpose module(s)
import numpy as np

# Import specific purpose module(s)
import tensorflow as tf 

# =============================================================================
# Create dataset
# =============================================================================  
def create_dataset(N_TRAIN, features_DIM):
    """
    create dataset

    This function loads data from the fashion_mnist and manipulates
    data to suit our problem, e.g. normalization.

    N_TRAIN: n° of training data to teach our ML model
    features_DIM: n° of features will have each datum. It corresponds to
                  the number of qubits once the encoding is performed
    save: by default will save the dataset created. Can be disabled to 
          create dummy datasets for testing.

    """
    # print('Loading data...')
    # data loading
    (X_train, y_train), (X_test, y_test) = tf.keras.datasets.fashion_mnist.load_data()

    # convert to float in order to normalize the values of the features
    X_train = X_train.astype('float32')
    X_test = X_test.astype('float32')

    # normalization of the features from [0,255] to [0,1]
    X_train /= 255
    X_test /= 255
    
    # select only two classes to have a binary classification problem
    # =============================================================================
    # filter 0-3
    # =============================================================================
    def filter_03(x, y):
        """
        filter 0-3

        This function keeps only the dresses and t-shorts of the
        fashion mnist dataset in order to have only a binary classification problem

        x: datapoint vector
        y: target/label vector relative to the x vector

        """
        keep = (y == 0) | (y == 3)
        x, y = x[keep], y[keep]
        y = y == 0
        return x,y
    
    # apply the filter 
    X_train, y_train = filter_03(X_train, y_train)
    X_test, y_test = filter_03(X_test, y_test)

    # print("Number of filtered training examples:", len(X_train))
    # print("Number of filtered test examples:", len(X_test))
    
    # # to_categorical allows to convert each target value in a vector of len 2
    # # e.g. for the first class we have [1,0] and for the second one [0,1]
    # y_train = tf.keras.utils.to_categorical(y_train, 2)
    # y_test = tf.keras.utils.to_categorical(y_test, 2)

    # =============================================================================
    # Truncate x
    # =============================================================================
    def truncate_x(X_train, X_test, n_components=features_DIM):
        """
        truncate x

        Perform PCA on image dataset keeping the top `n_components` components.
        
        X_train: training dataset
        X_test: test dataset
        n_components: number of features we want to keep after PCA

        """
        n_points_train = tf.gather(tf.shape(X_train), 0)
        n_points_test = tf.gather(tf.shape(X_test), 0)

        # Flatten to 1D
        X_train = tf.reshape(X_train, [n_points_train, -1])
        X_test = tf.reshape(X_test, [n_points_test, -1])

        # Normalize.
        feature_mean = tf.reduce_mean(X_train, axis=0)
        X_train_normalized = X_train - feature_mean
        X_test_normalized = X_test - feature_mean

        # Truncate.
        e_values, e_vectors = tf.linalg.eigh(
          tf.einsum('ji,jk->ik', X_train_normalized, X_train_normalized))
        return tf.einsum('ij,jk->ik', X_train_normalized, e_vectors[:,-n_components:]), \
          tf.einsum('ij,jk->ik', X_test_normalized, e_vectors[:, -n_components:])

    # truncating datasets
    X_train, X_test = truncate_x(X_train, X_test, n_components=features_DIM)
    
    print(f'New datapoint dimension:', len(X_train[0]))    

    # restrict the dataset to a smaller ensemble
    N_TEST = round(N_TRAIN*0.2)
    X_train, X_test = X_train[:N_TRAIN], X_test[:N_TEST]
    y_train, y_test = y_train[:N_TRAIN], y_test[:N_TEST]

    print('Number of datapoints', len(X_train))
    
    X_train = np.reshape(X_train, (len(X_train), features_DIM))
    X_test = np.reshape(X_test, (len(X_test), features_DIM))

    print('Data loaded.')
    return X_train, X_test, y_train, y_test

In [None]:
X_train, X_test, y_train, y_test = create_dataset(N_TRAIN=800, features_DIM=5)

## The Quantum Feature Map

Actually, the number of features for one image (thus, one circuit embedding) could be bigger than the number of qubits $d$ according to the feature map we pick. In this case we again refer to a well known feature map called Instantaneous Quantum Polynomial (IQP) feature map. 

$$|\textbf{x}_i\rangle = U_{Z}(\textbf{x}_i) H^{\otimes d} U_{Z}(\textbf{x}_i) H^{\otimes d}|0^d\rangle$$

where

$$ U_{Z}(\textbf{x}_i) = \text{exp} \left( \sum_{j=1}^d \lambda x_{ij}Z_j + \sum_{j=1}^d \sum_{j'=1}^d \lambda^2 x_{ij} x_{ij'} Z_j Z_{j'} \right) \quad . $$

Then, our feature array contains both the single-qubit and two-qubit features (the product of the single-qubit features pair entangled by the map scheme).

$$ \textbf{x}_i = (x_{i1}\;, x_{i2}\;, ..., x_{id}\;, x_{i1}x_{i2}\;, x_{i1}x_{i3}\;, ..., x_{id-1}x_{id}\;) .$$

The number of additional features depends on the type of entanglement we choose. A full entanglement brings to an additional number of features equal to $\binom{d}{2}$. Finally, the total number of features for a single datapoint (as in, image) equals to $d + \binom{d}{2}$.

In [None]:
def create_pennylane_noiseless(ansatz: Ansatz, measurement: str, type: KernelType):
    return PennylaneKernel(ansatz, measurement, type, device_name="default.qubit", n_shots=None)

KernelFactory.add_implementation('pennylane_noiseless', create_pennylane_noiseless)
KernelFactory.set_current_implementation('pennylane_noiseless')
print(KernelFactory._KernelFactory__implementations)
print(KernelFactory._KernelFactory__current_implementation)

In [None]:
def UZCirquit(ansatz, bandwidth, count=0):
    d = ansatz.n_qubits
    # Z term - local
    for k in range(d):
        ansatz.change_operation(count, new_feature=k, new_wires=[k, (k+1)%d], new_generator="ZI", new_bandwidth=bandwidth)
        count += 1

    # ZZ term - full entanglement 
    for i in range(d-1):
        for j in range(i+1, d):
            ansatz.change_operation(count, new_feature=k+i, new_wires=[i, j], new_generator="ZZ", new_bandwidth=bandwidth)
            count += 1

    return ansatz, count

def HCircuit(ansatz, bandwidth=1, count=0):
    d = ansatz.n_qubits
    n_features = d + d*(d-1)/2 + 1 # we have the last feature fixed to one for non-parametrizable gates
    for i in range(d):
        ansatz.change_operation(count, new_feature=n_features, new_wires=[i, (i+1)%d], new_generator="XI", new_bandwidth=np.pi/2) # R_x(pi/2)
        ansatz.change_operation(count, new_feature=n_features, new_wires=[i, (i+1)%d], new_generator="ZI", new_bandwidth=np.pi/2) # R_z(pi/2)
        ansatz.change_operation(count, new_feature=n_features, new_wires=[i, (i+1)%d], new_generator="XI", new_bandwidth=np.pi/2) # R_x(pi/2)
        ansatz.change_operation(count, new_feature=n_features, new_wires=[i, (i+1)%d], new_generator="II", new_bandwidth=np.pi)   # e^(i pi/2)

    return ansatz, count

def IQPCirquit(ansatz, bandwidth):
    # first U_Z(x_i)
    ansatz_uz1, count_uz1 = UZCirquit(ansatz, bandwidth)
    # first layer of hadamard gates
    ansatz_uz1h1, count_uz1h1 = HCircuit(ansatz_uz1, bandwidth, count=count_uz1)
    # second U_Z(x_i)
    ansatz_uz1h1uz2, count_uz1h1uz2 = UZCirquit(ansatz_uz1h1, bandwidth, count=count_uz1h1)
    # second layer of hadamard gates
    ansatz_uz1h1uz2h2, _ = HCircuit(ansatz_uz1h1uz2, bandwidth, count=count_uz1h1uz2)
    return ansatz_uz1h1uz2h2

# we can define a new function to construct the Ansatz since we will loop for different bandwidth values
def VaryingBandwidthKernel(N_FEATURES, N_QUBITS, N_OPERATIONS, bandwidth):
    ansatz = Ansatz(n_features=N_FEATURES, n_qubits=N_QUBITS, n_operations=N_OPERATIONS)
    ansatz.initialize_to_identity()
    # we choose a IQPCircuit
    ansatz = IQPCirquit(ansatz, bandwidth)
    kernel = KernelFactory.create_kernel(ansatz, "Z" * N_QUBITS, KernelType.FIDELITY)
    return kernel

## Embedding the datapoints

The number of operations we have to perform following the `IPQCircuit` map are:

$$ \underbrace{\left(d + \frac{d(d-1)}{2} \right)}_{U_{Z}(\textbf{x}_i)} + \underbrace{d}_{H^{\otimes d}} + \underbrace{\left(d + \frac{d(d-1)}{2} \right)}_{U_{Z}(\textbf{x}_i)} + \underbrace{d}_{H^{\otimes d}} $$

which is simpy:

$$ d^2 + 3 d \quad .$$

In [None]:
# modify the datapoints for quask

def quask_features(data):
    new_data = []
    for datapoint in data:
        new_datapoint = datapoint
        for i in range(len(datapoint)-1):
            for j in range(i+1, len(datapoint)):
                new_datapoint = np.append(new_datapoint, datapoint[i]*datapoint[j])
        new_datapoint = np.append(new_datapoint, 1)
        new_data.append(new_datapoint)
    new_data = np.array(new_data)
    return new_data

In [None]:
num_qubits = [2,4]
# let us sweep for different values of the bandwidth
bandwidths = np.linspace(1e-3,1e+0, 10).tolist()

def accuracy_computation(K_train, K_test, y_train, y_test):
    # Train the SVC classifier using the precomputed kernel matrices
    svc_kern = SVC(kernel='precomputed')
    svc_kern.fit(K_train, y_train)

    # Predict and evaluate accuracy on the test set
    y_pred = svc_kern.predict(K_test)
    return accuracy_score(y_test, y_pred)

accuracy_qubit = []
for d in num_qubits:
    features_quask = d + d*(d-1)/2 + 1
    operations_quask = d**2 + 3*d 

    X_train, X_test, y_train, y_test = create_dataset(N_TRAIN=100, features_DIM=d)
    print(X_train.shape)
    X_train = quask_features(X_train)
    X_test = quask_features(X_test)
    print(X_train.shape, features_quask)
    accuracy_beta = []
    for beta in bandwidths:
        print(f"Qubits: {d}, beta: {beta}")
        kernel = VaryingBandwidthKernel(N_FEATURES=features_quask, N_QUBITS=d, N_OPERATIONS=operations_quask, bandwidth=beta)
        K_train = kernel.build_kernel(X_train, X_train)
        K_test = kernel.build_kernel(X_test, X_train)
        accuracy = accuracy_computation(K_train, K_test, y_train, y_test)
        accuracy_beta.append(accuracy)
    accuracy_qubit.append(accuracy_beta)

In [None]:
for accuracy_beta, d in zip(accuracy_qubit, num_qubits):
    plt.plot(bandwidths, accuracy_beta, label=f"{d}")
plt.xscale('log')
plt.legend()

In [None]:
num_qubits = [3,5,7,9,11]
# let us sweep for different values of the bandwidth
bandwidths = np.linspace(1e-3,1e+0, 10).tolist()
cost_he_qubit = []
cost_lre_qubit = []
for d in num_qubits:
    features_quask = int(d + d*(d-1)/2 + 1)
    operations_quask = int(d**2 + 3*d)
    cost_he_beta = []
    cost_lre_beta = []
    for beta in bandwidths:
        print(f"Qubits: {d}, beta: {beta}")
        kernel = VaryingBandwidthKernel(N_FEATURES=features_quask, N_QUBITS=d, N_OPERATIONS=operations_quask, bandwidth=beta)
        he = HaarEvaluator(n_bins=40, n_samples=10000)
        cost_he = he.evaluate(kernel=kernel, K=None, X=None, y=None)
        lre = LieRankEvaluator(T=500)
        cost_lre = lre.evaluate(kernel=kernel, K=None, X=None, y=None)
        cost_he_beta.append(cost_he)
        cost_lre_beta.append(cost_lre)
    cost_he_qubit.append(cost_he_beta)
    cost_lre_qubit.append(cost_lre_beta)

In [None]:
for cost_he, d in zip(cost_he_qubit, num_qubits):
    plt.plot(bandwidths, cost_he, label=f"{d}")
plt.xscale('log')
plt.legend()