## Quantum Generative Classification with Mixed States on $28 \times 28$ Binary (3, 6) MNIST. Training is performed first discriminative and then generative.

Diego Useche Reyes

## GPU

In [None]:
!nvidia-smi

/bin/bash: line 1: nvidia-smi: command not found


## Libraries

In [None]:
!pip install tensorcircuit

Collecting tensorcircuit
  Downloading tensorcircuit-0.12.0-py3-none-any.whl.metadata (29 kB)
Collecting tensornetwork-ng (from tensorcircuit)
  Downloading tensornetwork_ng-0.5.0-py3-none-any.whl.metadata (7.0 kB)
Downloading tensorcircuit-0.12.0-py3-none-any.whl (342 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m342.0/342.0 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading tensornetwork_ng-0.5.0-py3-none-any.whl (243 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m243.3/243.3 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensornetwork-ng, tensorcircuit
Successfully installed tensorcircuit-0.12.0 tensornetwork-ng-0.5.0


In [None]:
from functools import partial
import numpy as np
import tensorflow as tf
import tensorcircuit as tc
from tensorcircuit import keras



In [None]:
tc.set_backend("tensorflow")
tc.set_dtype("complex128")

('complex128', 'float64')

In [None]:
from tensorflow.keras import layers, Model
from tensorflow.keras.models import Sequential
from keras.optimizers import SGD
from sklearn.metrics import accuracy_score

print(tf.__version__)
print(tf.config.list_logical_devices())

2.17.0
[LogicalDevice(name='/device:CPU:0', device_type='CPU')]


## Utils functions

In [None]:
# this function takes the number of classes and of qubits of the qmc pure, and extract the indices
# of the bit strings that correpond to the classes prediction
## Example, qmc prediction bit string ["00", "01", "10", "11"]
## the classes prediction is encoded in ["00", "10"], then it returns [0, 2]

def _indices_qubits_classes(num_qubits_param, num_classes_param):
  num_qubits_classes_temp = int(np.ceil(np.log2(num_classes_param)))
  a = [np.binary_repr(i, num_qubits_param) for i in range(2**num_qubits_param)]
  b = [(np.binary_repr(i, num_qubits_classes_temp) + "0"*(num_qubits_param - num_qubits_classes_temp)) for i in range(num_classes_param)]
  indices_temp = []
  for i in range(len(a)):
    if a[i] in b:
      indices_temp.append(i)

  return indices_temp

_indices_qubits_classes(4, 4)

[0, 4, 8, 12]

## MNIST Dataset

In [None]:
import collections

"""
Module for MNIST Digits Dataset preprocessing.
https://www.tensorflow.org/quantum/tutorials/mnist

Python 3.10.11
"""

def filter_by_classes(x, y, classes=[3,6]):
    """
    Function that filters the MNIST Digits Dataset and returns samples on 'classes'.
    Parameters:
        x: Sample images.
        y: Sample labels.
        classes: List of classes to filter.
    Returns:
        x: x filtered by 'classes'.
        y: x filtered by 'classes'.
    """
    if not all(np.isin(classes, range(0, 10))):
        return ValueError("Classes must be a list of digits (0-9).")
    x, y = x[np.isin(y, classes)], y[np.isin(y, classes)]
    if len(classes)==2:
        return x, y==classes[-1]
    else:
        return x, y

def remove_contradicting(xs, ys):

    mapping = collections.defaultdict(set)
    orig_x = {}
    # Determine the set of labels for each unique image:
    for x,y in zip(xs,ys):
       orig_x[tuple(x.flatten())] = x
       mapping[tuple(x.flatten())].add(y)

    new_x = []
    new_y = []
    for flatten_x in mapping:
      x = orig_x[flatten_x]
      labels = mapping[flatten_x]
      if len(labels) == 1:
          new_x.append(x)
          new_y.append(next(iter(labels)))
      else:
          # Throw out images that match more than one label.
          pass

    num_uniq_3 = sum(1 for value in mapping.values() if len(value) == 1 and True in value)
    num_uniq_6 = sum(1 for value in mapping.values() if len(value) == 1 and False in value)
    num_uniq_both = sum(1 for value in mapping.values() if len(value) == 2)

    print("Number of unique images:", len(mapping.values()))
    print("Number of unique 3s: ", num_uniq_3)
    print("Number of unique 6s: ", num_uniq_6)
    print("Number of unique contradicting labels (both 3 and 6): ", num_uniq_both)
    print()
    print("Initial number of images: ", len(xs))
    print("Remaining non-contradicting unique images: ", len(new_x))

    return np.array(new_x), np.array(new_y)

def preprocess_mnist_digits(classes=[3,6]):
    """"
    Function that downloads the MNIST Digits dataset with TensorFlow and performs the following tasks:
        1. Normalizes pixel values from (0, 255) to (0, 1).
        2. By default, returns only 2 classes of digits for classification (this can be deactivated or modified by the 'classes' parameter).
        3. Resizes samples to 4x4 images.
        4. Removes samples that belong to multiple classes simultaneously.
        5. Converts images to binary."
    Parameters:
    Returns:
    """

    # Download dataset
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

    # Rescale the images from [0,255] to the [0.0,1.0] range.
    x_train, x_test = x_train[..., np.newaxis]/255.0, x_test[..., np.newaxis]/255.0

    # Filter to get only '3's and '6's
    x_train, y_train = filter_by_classes(x_train, y_train, classes=classes)
    x_test, y_test = filter_by_classes(x_test, y_test, classes=classes)

    print("Number of filtered training examples:", len(x_train))
    print("Number of filtered test examples:", len(x_test))

    x_train_nocon, y_train_nocon = remove_contradicting(x_train, y_train)

    THRESHOLD = 0.5

    # Converts non contradicting samples to binary via threshold and converting bool to float.
    x_train_bin = np.array(x_train_nocon > THRESHOLD, dtype=np.float32)
    x_test_bin = np.array(x_test > THRESHOLD, dtype=np.float32)

    return x_train_bin, y_train_nocon, x_test_bin, y_test

X_train, y_train, X_test, y_test = preprocess_mnist_digits()
X_train *= np.pi
X_test *= np.pi
y_train = np.where(y_train==False, 0, 1)
y_test = np.where(y_test==False, 0, 1)
y_train_oh = tf.reshape(tf.keras.backend.one_hot(y_train, 2), (-1,2))
y_test_oh = tf.reshape(tf.keras.backend.one_hot(y_train, 2), (-1,2))

print(X_train.shape, y_train.shape, X_test.shape, y_test.shape, y_train_oh.shape, y_test_oh.shape)

Number of filtered training examples: 12049
Number of filtered test examples: 1968
Number of unique images: 12049
Number of unique 3s:  5918
Number of unique 6s:  6131
Number of unique contradicting labels (both 3 and 6):  0

Initial number of images:  12049
Remaining non-contradicting unique images:  12049
(12049, 28, 28, 1) (12049,) (1968, 28, 28, 1) (1968,) (12049, 2) (12049, 2)


## Model 1: Mixed QMC variational, QEFF, APA, generative with Le-Net Conv layer. Tranining: first discriminative then generative.

In [None]:
### Quantum variational KDC with QEFF

import tensorcircuit as tc
from tensorcircuit import keras
import tensorflow as tf

from functools import partial
import numpy as np
import math as m
from scipy.stats import entropy, spearmanr

tc.set_backend("tensorflow")
tc.set_dtype("complex128")

pi = tf.constant(m.pi)
class VQKDC_MIXED_QEFF_LENET:
    r"""
    Defines the ready-to-use Quantum measurement classification (QMC) model implemented
    in TensorCircuit using the TensorFlow/Keras API. Any additional argument in the methods has to be Keras-compliant.

    Args:
        auto_compile: A boolean to autocompile the model using default settings. (Default True).
        var_pure_state_size:
        gamma:

    Returns:
        An instantiated model ready to train with ad-hoc data.

    """
    def __init__(self, n_qeff_qubits, n_ancilla_qubits, num_classes_qubits, num_classes_param, dim_lenet_out_param, input_dim_param, gamma, n_training_data,  reduction = "none", training_type = "generative", batch_size = 16, learning_rate = 0.0005, random_state = 15, auto_compile=True):

        self.circuit = None
        self.gamma = gamma
        self.num_classes = num_classes_param
        self.num_classes_qubits = num_classes_qubits
        self.n_qeff_qubits = n_qeff_qubits
        self.n_ancilla_qubits = n_ancilla_qubits
        self.n_total_qubits_temp = self.num_classes_qubits + self.n_qeff_qubits + self.n_ancilla_qubits
        self.num_ffs = 2**self.n_qeff_qubits
        self.n_training_data = n_training_data
        self.dim_lenet_out = dim_lenet_out_param
        self.input_dim = input_dim_param
        self.var_pure_state_parameters_size = 2*(2**self.n_total_qubits_temp - 1)
        self.reduction  = reduction
        self.training_type = training_type
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.qeff_weights = tf.random.normal((self.dim_lenet_out, int(self.num_ffs*1-1)), mean = 0.0, stddev = 2.0/np.sqrt(self.num_ffs - 1), dtype=tf.dtypes.float64, seed = random_state)

        layer = keras.QuantumLayer(
            partial(self.layer),
            [(self.var_pure_state_parameters_size,)]
            )

        ## build the Let-net model
        self.model = Sequential()
        self.model.add(tf.keras.layers.Conv2D(filters=20, kernel_size=(5, 5), padding="same", activation='relu', input_shape=(self.input_dim, self.input_dim, 1)))
        self.model.add(tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2))
        self.model.add(tf.keras.layers.Conv2D(filters=50, kernel_size=(5, 5), padding="same", activation='relu'))
        self.model.add(tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2))
        self.model.add(tf.keras.layers.Flatten())
        self.model.add(tf.keras.layers.Dense(units=84, activation='relu'))
        self.model.add(tf.keras.layers.Dense(units=self.dim_lenet_out, activation = None)) # original "softmax"

        # add the quantum layer

        self.model.add(layer)
        print(self.model.summary())

        if auto_compile:
            self.compile()

    def layer(
            self,
            x_sample_param,
            var_pure_state_param,
        ):
        r"""
        Defines a Density Matrix Kernel Density Estimation quantum layer for learning with fixed qaff (Meaning of qaff?). (This function was originally named dmkde_mixed_variational_density_estimation_fixed_qaff)

        Args:
            U_dagger:
            var_pure_state_param:

        Returns:
            The probabilities of :math:`|k\rangle`, `|1\rangle`, ..., `|k\rangle` state for kernel density classification of the classes.
        """

        ### indices pure state
        index_it = iter(np.arange(len(var_pure_state_param)))

        ### indices qeff
        index_iter_qeff = iter(np.arange(self.qeff_weights.shape[1]))

        ### indices classes, of ms
        n_qubits_classes_qeff_temp = self.num_classes_qubits + self.n_qeff_qubits
        index_qubit_states = _indices_qubits_classes(n_qubits_classes_qeff_temp, self.num_classes) # extract indices of the bit string of classes


        # Instantiate a circuit with the calculated number of qubits.
        self.circuit = tc.Circuit(self.n_total_qubits_temp)

        def circuit_base_ry_n(qc_param, num_qubits_param, target_qubit_param):
            if num_qubits_param == 1:
                qc_param.ry(0, theta = var_pure_state_param[next(index_it)])
            elif num_qubits_param == 2:
                qc_param.ry(target_qubit_param, theta=var_pure_state_param[next(index_it)])
                qc_param.cnot(0, target_qubit_param)
                qc_param.ry(target_qubit_param, theta=var_pure_state_param[next(index_it)])
                return
            else:
                circuit_base_ry_n(qc_param, num_qubits_param-1, target_qubit_param)
                qc_param.cnot(num_qubits_param-2, target_qubit_param)
                circuit_base_ry_n(qc_param, num_qubits_param-1, target_qubit_param)
                target_qubit_param -= 1

        def circuit_base_rz_n(qc_param, num_qubits_param, target_qubit_param):
            if num_qubits_param == 1:
                qc_param.rz(0, theta = var_pure_state_param[next(index_it)])
            elif num_qubits_param == 2:
                qc_param.rz(target_qubit_param, theta=var_pure_state_param[next(index_it)])
                qc_param.cnot(0, target_qubit_param)
                qc_param.rz(target_qubit_param, theta=var_pure_state_param[next(index_it)])
                return
            else:
                circuit_base_rz_n(qc_param, num_qubits_param-1, target_qubit_param)
                qc_param.cnot(num_qubits_param-2, target_qubit_param)
                circuit_base_rz_n(qc_param, num_qubits_param-1, target_qubit_param)
                target_qubit_param -= 1

        # Learning pure state
        for i in range(1, self.n_total_qubits_temp+1):
            circuit_base_ry_n(self.circuit, i, i-1)

        # Learning pure state complex phase
        for j in range(1, self.n_total_qubits_temp+1):
            circuit_base_rz_n(self.circuit, j, j-1)

        # Value to predict

        x_sample_temp = tf.expand_dims(x_sample_param, axis=0)
        phases_temp = (tf.cast(tf.sqrt(self.gamma), tf.float64)*tf.linalg.matmul(tf.cast(x_sample_temp, tf.float64), self.qeff_weights))[0]
        init_qubit_qeff_temp = self.num_classes_qubits # qubit at which the qaff mapping starts it starts after the qubits of the classes

        def circuit_base_rz_qeff_n(qc_param, num_qubits_param, target_qubit_param, init_qubit_param):
          if num_qubits_param == 1:
            qc_param.rz(init_qubit_param, theta = phases_temp[next(index_iter_qeff)] )
          elif num_qubits_param == 2:
            qc_param.rz(target_qubit_param + init_qubit_param, theta = phases_temp[next(index_iter_qeff)])
            qc_param.cnot(init_qubit_param, target_qubit_param + init_qubit_param)
            qc_param.rz(target_qubit_param + init_qubit_param, theta = phases_temp[next(index_iter_qeff)])
            return
          else:
            circuit_base_rz_qeff_n(qc_param, num_qubits_param-1, target_qubit_param, init_qubit_param)
            qc_param.cnot(num_qubits_param-2 + init_qubit_param, target_qubit_param + init_qubit_param)
            circuit_base_rz_qeff_n(qc_param, num_qubits_param-1, target_qubit_param, init_qubit_param)
            target_qubit_param -= 1

        # Applying the QEFF feature map

        for i in reversed(range(1, self.n_qeff_qubits + 1)):
          circuit_base_rz_qeff_n(self.circuit, i, i - 1, init_qubit_qeff_temp)

        for i in range(init_qubit_qeff_temp, init_qubit_qeff_temp + self.n_qeff_qubits):
          self.circuit.H(i)

        # Trace out ancilla qubits, find probability of [000] state for density estimation
        measurement_state = tc.quantum.reduced_density_matrix(
                        self.circuit.state(),
                        cut=[m for m in range(n_qubits_classes_qeff_temp, self.n_total_qubits_temp)])
        measurements_results = tc.backend.real(tf.stack([measurement_state[index_qubit_states[i], index_qubit_states[i]] for i in range(self.num_classes)]))
        if self.training_type == "discriminative":
          measurements_results = measurements_results / tf.reduce_sum(measurements_results, axis = -1)
        print(self.training_type)
        return measurements_results

    def custom_categorical_crossentropy(self, y_true, y_pred):
      ## code generated with the aid of chat gpt
      """
      Compute the categorical cross-entropy loss with mean reduction.

      Args:
      y_true: Tensor of true labels, shape (batch_size, num_classes).
      y_pred: Tensor of predicted probabilities, shape (batch_size, num_classes).

      Returns:
      Scalar tensor representing the mean loss over the batch.
      """
      # Ensure predictions are clipped to avoid log(0)
      epsilon_two = 1e-7  # small constant to avoid division by zero
      y_pred = tf.clip_by_value(y_pred, epsilon_two, np.inf)  # clip values to avoid log(0) originaly 1.0 - epsilon

      # Compute the categorical cross-entropy loss for each sample
      loss = -tf.reduce_sum(y_true * tf.math.log(y_pred), axis=-1)

      if self.reduction == "none":
        return loss
      elif self.reduction == "mean":
        # Compute the mean loss over the batch
        mean_loss = tf.reduce_mean(loss)
        return mean_loss
      elif self.reduction == "sum":
        # Compute the sum loss over the batch
        sum_loss = tf.reduce_sum(loss)
        return sum_loss
      else:
        return loss

    def compile(
            self,
            optimizer=tf.keras.optimizers.Adam,
            **kwargs):
        r"""
        Method to compile the model.

        Args:
            optimizer:
            **kwargs: Any additional argument.

        Returns:
            None.
        """
        self.model.compile(
            loss = self.custom_categorical_crossentropy,
            optimizer=optimizer(self.learning_rate),
            metrics=["accuracy"],
            **kwargs
        )

    def fit(self, x_train, y_train, batch_size=16, epochs = 30, **kwargs):
        r"""
        Method to fit (train) the model using the ad-hoc dataset.

        Args:
            x_train:
            y_train:
            batch_size:
            epochs:
            **kwargs: Any additional argument.

        Returns:
            None.
        """

        self.model.fit(x_train, y_train, batch_size = self.batch_size, epochs = epochs, **kwargs)

    def predict(self, x_test):
      r"""
      Method to make predictions with the trained model.

      Args:
          x_test:

      Returns:
          The predictions of the conditional density estimation of the input data.
      """
      return (tf.experimental.numpy.power((self.gamma/(pi)), self.dim_lenet_out/2.)*\
          self.model.predict(x_test)).numpy()

    def extract_lenet_features(self, x_test):
      r"""
      Method to extract the features of the Lenet.

      Args:
          x_test:

      Returns:
          Features of the Lenet.
      """
      extract = Model(self.model.inputs, self.model.layers[-2].output)
      new_model = Sequential()
      new_model.add(extract)
      return new_model.predict(x_test)

    def freeze_lenet(self):
      r"""
      Method to frezze the layers of the Lenet.

      Args:
          self:

      Returns:
          None.
      """
      for layer in self.model.layers[:-1]:
          layer.trainable = False

    def set_training_type(self, new_training_type):
      r"""
      Method to change the training type of the method during training.

      Args:
          new_training_strategy:

      Returns:
          None.
      """
      self.training_type = new_training_type

In [None]:
# Define a LeNet CNN feature extraction model
input_shape = X_train.shape[1:]
INPUT_DIM = input_shape[0]
NUM_QUBITS_FFS = 4 ## originally N_QEFFS = 16
NUM_CLASSES = 2
NUM_CLASSES_QUBITS = 1
DIM_LENET_OUT = 30 # originally 16
GAMMA = 2**(-2) ## originally 2**(0)
EPOCHS = 5
N_TRAINING_DATA = X_train.shape[0]
BATCH_SIZE = 32
LEARNING_RATE = 0.0005 ## originally 0.0005
RANDOM_STATE_QEFF = 123
NUM_ANCILLA_QUBITS = 1
REDUCTION = "none"
TRAINING_TYPE = "generative"

EPOCHS, NUM_QUBITS_FFS, NUM_ANCILLA_QUBITS, GAMMA, RANDOM_STATE_QEFF, RANDOM_STATE_QEFF, LEARNING_RATE, N_TRAINING_DATA, REDUCTION, TRAINING_TYPE

(5, 4, 1, 0.25, 123, 123, 0.0005, 12049, 'none', 'generative')

In [None]:
## this code creates a discriminative model
vqkdc = VQKDC_MIXED_QEFF_LENET(n_qeff_qubits = NUM_QUBITS_FFS, n_ancilla_qubits =  NUM_ANCILLA_QUBITS, num_classes_qubits = NUM_CLASSES_QUBITS, num_classes_param = NUM_CLASSES, dim_lenet_out_param = DIM_LENET_OUT, input_dim_param = INPUT_DIM, gamma=GAMMA, n_training_data = N_TRAINING_DATA,  reduction = REDUCTION, training_type = TRAINING_TYPE, batch_size = BATCH_SIZE, learning_rate = LEARNING_RATE, random_state = RANDOM_STATE_QEFF, auto_compile = False)
optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
vqkdc.model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
vqkdc.fit(X_train, y_train_oh, epochs = EPOCHS)

generative


None
Epoch 1/5
generative
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m124s[0m 98ms/step - accuracy: 0.7050 - loss: 1.3841
Epoch 2/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m36s[0m 96ms/step - accuracy: 0.9982 - loss: 0.0308
Epoch 3/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 79ms/step - accuracy: 0.9992 - loss: 0.0138
Epoch 4/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m29s[0m 78ms/step - accuracy: 0.9997 - loss: 0.0099
Epoch 5/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 79ms/step - accuracy: 0.9995 - loss: 0.0045


In [None]:
# this code frezzes the weights of the Le net layer, and then sets the model to be trained in a generative way
for layer in vqkdc.model.layers[:-1]:
    layer.trainable = False
vqkdc.compile()
vqkdc.fit(X_train, y_train_oh, epochs = EPOCHS)

Epoch 1/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 37ms/step - accuracy: 0.5797 - loss: 4.3470
Epoch 2/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 36ms/step - accuracy: 0.5027 - loss: 3.0319
Epoch 3/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 35ms/step - accuracy: 0.5107 - loss: 2.3592
Epoch 4/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 37ms/step - accuracy: 0.5087 - loss: 1.9316
Epoch 5/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 40ms/step - accuracy: 0.5048 - loss: 1.6590


In [None]:
vqkdc.fit(X_train, y_train_oh, epochs = EPOCHS)

Epoch 1/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 36ms/step - accuracy: 0.5034 - loss: 1.3823
Epoch 2/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 36ms/step - accuracy: 0.5096 - loss: 1.1684
Epoch 3/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 37ms/step - accuracy: 0.5024 - loss: 1.0728
Epoch 4/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 37ms/step - accuracy: 0.4983 - loss: 0.9834
Epoch 5/5
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 37ms/step - accuracy: 0.9865 - loss: 0.8967


In [None]:
vqkdc.fit(X_train, y_train_oh, epochs = 2)

Epoch 1/2
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 45ms/step - accuracy: 0.9996 - loss: 0.8509
Epoch 2/2
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m17s[0m 37ms/step - accuracy: 0.9979 - loss: 0.8230


In [None]:
y_pred = vqkdc.predict(X_test)

accuracy_score(y_test, np.argmax(y_pred, axis=1))

generative
[1m61/62[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 23ms/stepgenerative
[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m107s[0m 918ms/step


0.9979674796747967

## Model 2: Mixed QMC variational, QEFF, HEA, generative with Le-Net Conv layer. Tranining: first discriminative then generative.

The number of parameters of the Hardware efficient ansatz is given by,


```
HEA_size = n_qubits * (num_layers_hea + 1) * 2
```



In [None]:
### Quantum variational KDC with QEFF

import tensorcircuit as tc
from tensorcircuit import keras
import tensorflow as tf

from functools import partial
import numpy as np
import math as m
from scipy.stats import entropy, spearmanr

tc.set_backend("tensorflow")
tc.set_dtype("complex128")

pi = tf.constant(m.pi)

class VQKDC_MIXED_QEFF_HEA_LENET:
    r"""
    Defines the ready-to-use Quantum measurement classification (QMC) model implemented
    in TensorCircuit using the TensorFlow/Keras API. Any additional argument in the methods has to be Keras-compliant.

    Args:
        auto_compile: A boolean to autocompile the model using default settings. (Default True).
        var_pure_state_size:
        gamma:

    Returns:
        An instantiated model ready to train with ad-hoc data.

    """
    def __init__(self, n_qeff_qubits, n_ancilla_qubits, num_classes_qubits, num_classes_param, dim_lenet_out_param,  input_dim_param, gamma, n_training_data, reduction = "none", training_type = "generative", num_layers_hea = 3, batch_size = 16, learning_rate = 0.0005, random_state = 15, auto_compile=True):

        self.circuit = None
        self.gamma = gamma
        self.num_layers_hea = num_layers_hea
        self.num_classes = num_classes_param
        self.num_classes_qubits = num_classes_qubits
        self.n_qeff_qubits = n_qeff_qubits
        self.n_ancilla_qubits = n_ancilla_qubits
        self.n_total_qubits_temp = self.num_classes_qubits + self.n_qeff_qubits + self.n_ancilla_qubits
        self.num_ffs = 2**self.n_qeff_qubits
        self.n_training_data = n_training_data
        self.dim_lenet_out = dim_lenet_out_param
        self.input_dim = input_dim_param
        self.var_hea_ansatz_size = int(self.n_total_qubits_temp*(self.num_layers_hea+1)*2)
        self.reduction  = reduction
        self.training_type = training_type
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.qeff_weights = tf.random.normal((self.dim_lenet_out, int(self.num_ffs*1-1)), mean = 0.0, stddev = 2.0/np.sqrt(self.num_ffs - 1), dtype=tf.dtypes.float64, seed = random_state)

        layer = keras.QuantumLayer(
            partial(self.layer),
            [(self.var_hea_ansatz_size,)]
            )

        ## build the Let-net model
        self.model = Sequential()
        self.model.add(tf.keras.layers.Conv2D(filters=20, kernel_size=(5, 5), padding="same", activation='relu', input_shape=(self.input_dim, self.input_dim, 1)))
        self.model.add(tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2))
        self.model.add(tf.keras.layers.Conv2D(filters=50, kernel_size=(5, 5), padding="same", activation='relu'))
        self.model.add(tf.keras.layers.AveragePooling2D(pool_size=(2, 2), strides=2))
        self.model.add(tf.keras.layers.Flatten())
        self.model.add(tf.keras.layers.Dense(units=84, activation='relu'))
        self.model.add(tf.keras.layers.Dense(units=self.dim_lenet_out, activation = None)) # original "softmax"

        # add the quantum layer

        self.model.add(layer)
        print(self.model.summary())

        if auto_compile:
            self.compile()

    def layer(
            self,
            x_sample_param,
            var_hea_ansatz_param,
        ):
        r"""
        Defines a Density Matrix Kernel Density Estimation quantum layer for learning with fixed qaff (Meaning of qaff?). (This function was originally named dmkde_mixed_variational_density_estimation_fixed_qaff)

        Args:
            U_dagger:
            var_pure_state_param:

        Returns:
            The probabilities of :math:`|k\rangle`, `|1\rangle`, ..., `|k\rangle` state for kernel density classification of the classes.
        """

        ### indices pure state hea
        index_iter_hea  = iter(np.arange(len(var_hea_ansatz_param)))

        ### indices qeff
        index_iter_qeff = iter(np.arange(self.qeff_weights.shape[1]))

        ### indices classes, of ms
        n_qubits_classes_qeff_temp = self.num_classes_qubits + self.n_qeff_qubits
        index_qubit_states = _indices_qubits_classes(n_qubits_classes_qeff_temp, self.num_classes) # extract indices of the bit string of classes


        # Instantiate a circuit with the calculated number of qubits.
        self.circuit = tc.Circuit(self.n_total_qubits_temp)

        def hea_ansatz(qc_param, num_qubits_param, num_layers_param):
          # encoding
          for i in range (0, num_qubits_param):
            qc_param.ry(i, theta = var_hea_ansatz_param[next(index_iter_hea)])
            qc_param.rz(i, theta = var_hea_ansatz_param[next(index_iter_hea)])
          # layers
          for j in range(num_layers_param):
            for i in range (0, num_qubits_param-1):
              qc_param.CNOT(i, i+1)

            for i in range (0, num_qubits_param):
              qc_param.ry(i, theta= var_hea_ansatz_param[next(index_iter_hea)])
              qc_param.rz(i, theta= var_hea_ansatz_param[next(index_iter_hea)])

        ## learning pure state with HEA
        hea_ansatz(self.circuit, self.n_total_qubits_temp, self.num_layers_hea)

        # Value to predict

        x_sample_temp = tf.expand_dims(x_sample_param, axis=0)
        phases_temp = (tf.cast(tf.sqrt(self.gamma), tf.float64)*tf.linalg.matmul(tf.cast(x_sample_temp, tf.float64), self.qeff_weights))[0]
        init_qubit_qeff_temp = self.num_classes_qubits # qubit at which the qaff mapping starts it starts after the qubits of the classes

        def circuit_base_rz_qeff_n(qc_param, num_qubits_param, target_qubit_param, init_qubit_param):
          if num_qubits_param == 1:
            qc_param.rz(init_qubit_param, theta = phases_temp[next(index_iter_qeff)] )
          elif num_qubits_param == 2:
            qc_param.rz(target_qubit_param + init_qubit_param, theta = phases_temp[next(index_iter_qeff)])
            qc_param.cnot(init_qubit_param, target_qubit_param + init_qubit_param)
            qc_param.rz(target_qubit_param + init_qubit_param, theta = phases_temp[next(index_iter_qeff)])
            return
          else:
            circuit_base_rz_qeff_n(qc_param, num_qubits_param-1, target_qubit_param, init_qubit_param)
            qc_param.cnot(num_qubits_param-2 + init_qubit_param, target_qubit_param + init_qubit_param)
            circuit_base_rz_qeff_n(qc_param, num_qubits_param-1, target_qubit_param, init_qubit_param)
            target_qubit_param -= 1

        # Applying the QEFF feature map

        for i in reversed(range(1, self.n_qeff_qubits + 1)):
          circuit_base_rz_qeff_n(self.circuit, i, i - 1, init_qubit_qeff_temp)

        for i in range(init_qubit_qeff_temp, init_qubit_qeff_temp + self.n_qeff_qubits):
          self.circuit.H(i)

        # Trace out ancilla qubits, find probability of [000] state for density estimation
        measurement_state = tc.quantum.reduced_density_matrix(
                        self.circuit.state(),
                        cut=[m for m in range(n_qubits_classes_qeff_temp, self.n_total_qubits_temp)])
        measurements_results = tc.backend.real(tf.stack([measurement_state[index_qubit_states[i], index_qubit_states[i]] for i in range(self.num_classes)]))
        if self.training_type == "discriminative":
          measurements_results = measurements_results / tf.reduce_sum(measurements_results, axis = -1)
        return measurements_results

    def custom_categorical_crossentropy(self, y_true, y_pred):
      ## code generated with the aid of chat gpt
      """
      Compute the categorical cross-entropy loss with mean reduction.

      Args:
      y_true: Tensor of true labels, shape (batch_size, num_classes).
      y_pred: Tensor of predicted probabilities, shape (batch_size, num_classes).

      Returns:
      Scalar tensor representing the mean loss over the batch.
      """
      # Ensure predictions are clipped to avoid log(0)
      epsilon_two = 1e-7  # small constant to avoid division by zero
      y_pred = tf.clip_by_value(y_pred, epsilon_two, np.inf)  # clip values to avoid log(0) originaly 1.0 - epsilon

      # Compute the categorical cross-entropy loss for each sample
      loss = -tf.reduce_sum(y_true * tf.math.log(y_pred), axis=-1)

      if self.reduction == "none":
        return loss
      elif self.reduction == "mean":
        # Compute the mean loss over the batch
        mean_loss = tf.reduce_mean(loss)
        return mean_loss
      elif self.reduction == "sum":
        # Compute the sum loss over the batch
        sum_loss = tf.reduce_sum(loss)
        return sum_loss
      else:
        return loss

    def compile(
            self,
            optimizer=tf.keras.optimizers.Adam,
            **kwargs):
        r"""
        Method to compile the model.

        Args:
            optimizer:
            **kwargs: Any additional argument.

        Returns:
            None.
        """
        self.model.compile(
            loss = self.custom_categorical_crossentropy,
            optimizer=optimizer(self.learning_rate),
            metrics=["accuracy"],
            **kwargs
        )

    def fit(self, x_train, y_train, batch_size=16, epochs = 30, **kwargs):
        r"""
        Method to fit (train) the model using the ad-hoc dataset.

        Args:
            x_train:
            y_train:
            batch_size:
            epochs:
            **kwargs: Any additional argument.

        Returns:
            None.
        """

        self.model.fit(x_train, y_train, batch_size = self.batch_size, epochs = epochs, **kwargs)

    def predict(self, x_test):
      r"""
      Method to make predictions with the trained model.

      Args:
          x_test:

      Returns:
          The predictions of the conditional density estimation of the input data.
      """
      return (tf.experimental.numpy.power((self.gamma/(pi)), self.dim_lenet_out/2.)*\
          self.model.predict(x_test)).numpy()

    def extract_lenet_features(self, x_test):
      r"""
      Method to extract the features of the Lenet.

      Args:
          x_test:

      Returns:
          Features of the Lenet.
      """
      extract = Model(self.model.inputs, self.model.layers[-2].output)
      new_model = Sequential()
      new_model.add(extract)
      return new_model.predict(x_test)

    def freeze_lenet(self):
      r"""
      Method to frezze the layers of the Lenet.

      Args:
          self:

      Returns:
          None.
      """
      for layer in self.model.layers[:-1]:
          layer.trainable = False

    def set_training_type(self, new_training_type):
      r"""
      Method to change the training type of the method during training.

      Args:
          new_training_strategy:

      Returns:
          None.
      """
      self.training_type = new_training_type

In [None]:
# Define a LeNet CNN feature extraction model
input_shape = X_train.shape[1:]
INPUT_DIM = input_shape[0]
NUM_QUBITS_FFS = 4 ## originally N_QEFFS = 16
NUM_CLASSES = 2
NUM_CLASSES_QUBITS = 1
NUM_TOTAL_QUBITS = NUM_QUBITS_FFS + NUM_ANCILLA_QUBITS + NUM_CLASSES_QUBITS
NUM_LAYERS_HEA = int(np.round(((2**NUM_TOTAL_QUBITS-1)/NUM_TOTAL_QUBITS)-1))
DIM_LENET_OUT = 30 # originally 16
GAMMA = 2**(-2) ## originally 2**(0)
EPOCHS = 4 ## originally 8
N_TRAINING_DATA = X_train.shape[0]
BATCH_SIZE = 32
LEARNING_RATE = 0.0005 ## originally 0.0005
RANDOM_STATE_QEFF = 123
NUM_ANCILLA_QUBITS = 1
REDUCTION = "none"
TRAINING_TYPE = "generative"

EPOCHS, NUM_QUBITS_FFS, NUM_ANCILLA_QUBITS, GAMMA, RANDOM_STATE_QEFF, RANDOM_STATE_QEFF, LEARNING_RATE, N_TRAINING_DATA, REDUCTION, TRAINING_TYPE

(4, 4, 1, 0.25, 123, 123, 0.0005, 12049, 'none', 'generative')

In [None]:
## this code creates a discriminative model
vqkdc_hea = VQKDC_MIXED_QEFF_HEA_LENET(n_qeff_qubits = NUM_QUBITS_FFS, n_ancilla_qubits =  NUM_ANCILLA_QUBITS, num_classes_qubits = NUM_CLASSES_QUBITS, num_classes_param = NUM_CLASSES, dim_lenet_out_param = DIM_LENET_OUT, input_dim_param = INPUT_DIM, gamma=GAMMA, n_training_data = N_TRAINING_DATA,  reduction = REDUCTION, training_type = TRAINING_TYPE, num_layers_hea = NUM_LAYERS_HEA, batch_size = BATCH_SIZE, learning_rate = LEARNING_RATE, random_state = RANDOM_STATE_QEFF, auto_compile = False)
optimizer = tf.keras.optimizers.Adam(LEARNING_RATE)
vqkdc_hea.model.compile(optimizer=optimizer, loss='categorical_crossentropy', metrics=['accuracy'])
vqkdc_hea.fit(X_train, y_train_oh, epochs = EPOCHS)

None
Epoch 1/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m87s[0m 79ms/step - accuracy: 0.8607 - loss: 0.2689
Epoch 2/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 79ms/step - accuracy: 0.9989 - loss: 0.0167
Epoch 3/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m41s[0m 80ms/step - accuracy: 1.0000 - loss: 0.0016
Epoch 4/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m34s[0m 90ms/step - accuracy: 1.0000 - loss: 0.0011


In [None]:
# this code frezzes the weights of the Le net layer, and then sets the model to be trained in a generative way
for layer in vqkdc_hea.model.layers[:-1]:
    layer.trainable = False
vqkdc_hea.compile()
vqkdc_hea.fit(X_train, y_train_oh, epochs = EPOCHS)

Epoch 1/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m25s[0m 38ms/step - accuracy: 0.9992 - loss: 2.5462
Epoch 2/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 38ms/step - accuracy: 0.9970 - loss: 1.2786
Epoch 3/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 39ms/step - accuracy: 0.9964 - loss: 1.0120
Epoch 4/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m20s[0m 38ms/step - accuracy: 0.9975 - loss: 0.9136


In [None]:
vqkdc_hea.fit(X_train, y_train_oh, epochs = EPOCHS)

Epoch 1/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 55ms/step - accuracy: 0.9985 - loss: 0.8621
Epoch 2/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m33s[0m 35ms/step - accuracy: 0.9982 - loss: 0.8350
Epoch 3/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 36ms/step - accuracy: 0.9981 - loss: 0.8190
Epoch 4/4
[1m377/377[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 36ms/step - accuracy: 0.9971 - loss: 0.8141


In [None]:
y_pred = vqkdc_hea.predict(X_test)

accuracy_score(y_test, np.argmax(y_pred, axis=1))

[1m62/62[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m73s[0m 616ms/step


0.9984756097560976