## Quantum-classical anomaly detection with QDEMDE

## Libraries

In [None]:
!pip install pennylane

Collecting pennylane
  Downloading PennyLane-0.34.0-py3-none-any.whl (1.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m5.8 MB/s[0m eta [36m0:00:00[0m
Collecting rustworkx (from pennylane)
  Downloading rustworkx-0.14.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.1/2.1 MB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m
Collecting semantic-version>=2.7 (from pennylane)
  Downloading semantic_version-2.10.0-py2.py3-none-any.whl (15 kB)
Collecting autoray>=0.6.1 (from pennylane)
  Downloading autoray-0.6.8-py3-none-any.whl (49 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.9/49.9 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
Collecting pennylane-lightning>=0.34 (from pennylane)
  Downloading PennyLane_Lightning-0.34.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (18.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import pennylane as qml
from pennylane import numpy as np
import matplotlib.pyplot as plt
from pennylane.optimize import NesterovMomentumOptimizer
from scipy.stats import norm
import numpy as onp
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors
from scipy import io
from sklearn.metrics import roc_auc_score

## Classical Prediction with Complex Adaptive RFF

In [None]:
import pylab as pl
from sklearn.kernel_approximation import RBFSampler

def rff_map(x, gamma, n_components, random_state=0):
    rbf_feature = RBFSampler(gamma=gamma, n_components=n_components, random_state=random_state)
    return rbf_feature.fit_transform(x)

def gauss_kernel(x, y, gamma):
    return np.exp(-gamma * (x - y) ** 2)

In [None]:
class QFeatureMapCompAdaptRFF(tf.keras.layers.Layer):

    def __init__(
                 self,
                 input_dim: int,
                 dim: int = 100,
                 gamma: float = 1,
                 random_state = None,
                 gamma_trainable=False,
                 weights_trainable=True,
                 **kwargs
                 ):
        self.g_trainable = gamma_trainable
        self.w_trainable = weights_trainable
        super().__init__(**kwargs)
        self.input_dim = input_dim
        self.dim = dim
        self.gamma = gamma
        self.random_state = random_state

    def build(self, input_shape):
        rbf_sampler = RBFSampler(
            gamma=0.5,
            n_components=self.dim,
            random_state=self.random_state)
        x = np.zeros(shape=(1, self.input_dim))
        rbf_sampler.fit(x)
        self.gamma_val = tf.Variable(
            initial_value=self.gamma,
            dtype=tf.float32,
            trainable=self.g_trainable,
            name="rff_gamma")
        self.rff_weights = tf.Variable(
            initial_value=rbf_sampler.random_weights_,
            dtype=tf.float32,
            trainable=self.w_trainable,
            name="rff_weights")
        self.built = True

    def call(self, inputs):
        # Complex Adaptive RFF
        vals = tf.sqrt(self.gamma_val) * tf.matmul(inputs, self.rff_weights)
        vals = tf.complex(tf.cos(vals), -tf.sin(vals))
        vals = vals * tf.cast(tf.sqrt(1. / self.dim), tf.complex64)
        norms = tf.linalg.norm(vals, axis=-1)
        psi = vals / tf.expand_dims(norms, axis=-1)
        return psi


class DMRFF(tf.keras.Model):
    def __init__(self,
                 dim_x,
                 num_rff,
                 gamma=1,
                 random_state=None):
        super().__init__()
        self.rff_layer = QFeatureMapCompAdaptRFF(input_dim=dim_x, dim=num_rff, gamma=gamma, random_state=random_state, gamma_trainable=False)

    def call(self, inputs):

        # Complex Adaptive RFF
        x1 = inputs[:, 0]
        x2 = inputs[:, 1]
        phi1 = self.rff_layer(x1)
        phi2 = self.rff_layer(x2)
        dot = tf.einsum('...i,...i->...', tf.math.conj(phi1), phi2) * tf.einsum('...i,...i->...', tf.math.conj(phi2), phi1)
        dot = tf.cast(dot, tf.float32)
        return dot

def calc_rbf(dmrff, x1, x2):
    return dmrff.predict(np.concatenate([x1[:, np.newaxis, ...],
                                         x2[:, np.newaxis, ...]],
                                        axis=1),
                         batch_size=256)

# 2 Dimensions

In [None]:
def calculate_constant_qmkde(gamma=1, dimension = 1):
  sigma = (4*gamma)**(-1/2)
  coefficient = 1 /  (2*np.pi*sigma**2)**(dimension/2)
  return coefficient

In [None]:
def calculate_constant_log_qmkde(gamma=1, dimension = 1):
  sigma = (4*gamma)**(-1/2)
  coefficient = np.log(1) - (dimension/2.) *   np.log(2*np.pi*sigma**2)
  return coefficient

In [None]:
def raw_kde(x_test, x_train, gamma=1):
  sigma = (2*gamma)**(-1/2)
  euclidean_distance = np.sum(((x_test-x_train))**2, axis=1)
  exponential  = np.exp(-euclidean_distance/(2*sigma**2))
  constant_outside = 1/(np.size(x_train) * (2*np.pi*sigma**2)**(x_train.shape[1]/2))
  return constant_outside * np.sum(exponential)

In [None]:
def raw_log_kde(x_test, x_train, gamma=1):
  sigma = (2*gamma)**(-1/2)
  euclidean_distance = np.sum(((x_test-x_train))**2, axis=1)
  exponential  = np.exp(-euclidean_distance/(2*sigma**2))
  constant_outside = np.log(1) - np.log(np.size(x_train)) - (x_train.shape[1]/2) * np.log(2*np.pi*sigma**2)
  return constant_outside + np.log(np.sum(exponential))

In [None]:
def plot(X_train, X_train_density, X_test, X_test_density, name = "dataset"):
    plt.axes(frameon = 0)
    plt.grid()
    plt.scatter(X_test[:,0],  X_test[:,1], c = X_test_density , alpha = .5, s = 3, linewidths= 0.0000001)
    plt.colorbar()
    plt.title('potential 1 dataset')
    plt.savefig(f'{name}.png',dpi = 300)
    plt.show()

## Loading Cardio Data Set

In [None]:
cardio = io.loadmat("/content/cardio.mat")

cardio["X"].shape, cardio["y"].shape

((1831, 21), (1831, 1))

In [None]:
from sklearn.preprocessing import MinMaxScaler
from scipy.stats import zscore

def preprocessing_cardio(data):
  features, labels = cardio["X"], cardio["y"]
  labels = 1 - labels
  return features, labels

cardio_X, cardio_y = preprocessing_cardio(cardio)

cardio_X.shape, cardio_y.shape

((1831, 21), (1831, 1))

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(cardio_X, cardio_y, test_size=0.2, stratify=cardio_y, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, stratify=y_train, random_state=42)

print(f"shape of X_train: {X_train.shape} X_test: {X_test.shape} X_val {X_val.shape}")
n_classes = np.bincount(y_test.ravel().astype(np.int64))

print(f"classes: 0: {n_classes[0]} 1: {n_classes[1]} %-anomalies: {n_classes[0] / (n_classes[0] + n_classes[1])}")

shape of X_train: (1098, 21) X_test: (367, 21) X_val (366, 21)
classes: 0: 35 1: 332 %-anomalies: 0.09536784741144415


# Q-DEMDE method

In [None]:
sigma_kernel_training = 1
gamma_kernel_training = 1/(2*sigma_kernel_training**2)

print(sigma_kernel_training, gamma_kernel_training)

1 0.5


In [None]:
### training with gaussian training data set
def gaussian_kernel_data(dim_x_param, size_data_param, gamma_train_param = 0.5):
  X_3_temp = np.random.multivariate_normal(mean = np.zeros(dim_x_param), cov = (1/(2*gamma_train_param*dim_x_param))*np.identity(dim_x_param), size = size_data_param)
  X_4_temp = np.zeros((size_data_param, dim_x_param))
  return X_3_temp, X_4_temp

X_3, X_4 = gaussian_kernel_data(21, 40000)

X_3.shape, X_4.shape

((40000, 21), (40000, 21))

In [None]:
n_rffs = 8
dmrff = DMRFF(dim_x=21, num_rff=n_rffs, gamma=gamma_kernel_training, random_state=onp.random.randint(1, 10000))
dmrff.predict(np.concatenate([X_3[:, np.newaxis, ...],
                                         X_4[:, np.newaxis, ...]],
                                        axis=1),
                         batch_size=256)





array([0.52968204, 0.33350894, 0.68856066, ..., 0.3486627 , 0.11897445,
       0.5545847 ], dtype=float32)

In [None]:
print(f'Weights: {dmrff.rff_layer.rff_weights}')

Weights: <tf.Variable 'dmrff/q_feature_map_comp_adapt_rff/rff_weights:0' shape=(21, 8) dtype=float32, numpy=
array([[ 0.08058767, -0.50622344,  1.3648977 ,  0.12518477,  0.07727975,
         0.4959615 ,  1.7104647 ,  0.19181785],
       [-1.4302924 , -1.8972341 ,  2.5498772 , -0.03586077, -0.882153  ,
        -1.2816674 , -0.03416796, -0.6509322 ],
       [-0.9165925 ,  0.557703  , -0.3102901 ,  0.5750356 , -0.53722644,
        -0.8341377 ,  0.3218192 ,  0.20978394],
       [ 0.5246806 ,  0.8734697 , -1.1063647 , -0.52808464,  0.6800926 ,
         1.723438  ,  2.1215048 ,  1.0764236 ],
       [-1.0507231 ,  1.0122007 ,  1.0335882 , -0.1637971 ,  0.19543597,
        -0.5450009 ,  1.3252821 , -1.7506152 ],
       [-0.22328497, -1.0036465 , -0.7581642 ,  1.065141  ,  0.34329134,
         0.06904334,  0.5258007 , -1.0267732 ],
       [-1.5053595 , -0.11510926,  0.02895894, -1.0526949 , -0.08753666,
         1.9513191 ,  1.9881548 ,  0.40705755],
       [ 0.25365332,  0.4068337 ,  1.6755865

In [None]:
weights_qrff = dmrff.rff_layer.rff_weights.numpy()

weights_qrff.mean(), weights_qrff.std()

(0.2042368, 0.9653771)

In [None]:
def gauss_kernel_arr(x, y, gamma_param):
    return onp.exp(-gamma_param * onp.linalg.norm(x - y, axis=1) ** 2)

num_samples = 60_000 # num_samples original = 10000
rnd_idx1 = onp.random.randint(X_3.shape[0],size=(num_samples, ))
rnd_idx2 = onp.random.randint(X_4.shape[0],size=(num_samples, ))
x_train = onp.concatenate([X_3[rnd_idx1][:, onp.newaxis, ...],
                          X_4[rnd_idx2][:, onp.newaxis, ...]],
                         axis=1)

y_train = gauss_kernel_arr(x_train[:, 0, ...], x_train[:, 1, ...], gamma_param=gamma_kernel_training)

x_train.shape, y_train.shape

((60000, 2, 21), (60000,))

In [None]:
epochs = 4
opt = keras.optimizers.Adam(learning_rate=0.001)
dmrff.compile(optimizer=opt, loss="mse")
dmrff.fit(x_train, y_train, batch_size = 5, epochs=epochs)

Epoch 1/4




Epoch 2/4
Epoch 3/4
Epoch 4/4


<keras.src.callbacks.History at 0x797332496e90>

In [None]:
print(f'Weights: {dmrff.rff_layer.rff_weights}')

Weights: <tf.Variable 'dmrff/q_feature_map_comp_adapt_rff/rff_weights:0' shape=(21, 8) dtype=float32, numpy=
array([[ 0.04651476,  0.5082047 ,  0.35292906,  0.13418612, -0.5533088 ,
         1.1188024 ,  1.7008282 ,  0.17793581],
       [-1.2062833 , -2.318665  ,  1.8533612 ,  0.6333983 , -1.5140424 ,
        -0.46725386, -0.28763333, -0.46720308],
       [-1.3487393 , -0.1503717 ,  0.21834804,  1.0469569 , -1.327128  ,
        -0.5724581 ,  0.1598767 ,  0.9356341 ],
       [ 0.9672119 ,  0.17741428, -0.46344522,  0.3295363 ,  0.7259252 ,
         1.6963261 ,  1.5958912 ,  0.36801267],
       [-0.6921345 ,  1.1685077 , -0.12873697,  0.34068403, -0.6757964 ,
        -0.31442   ,  1.3809888 , -1.1226826 ],
       [-0.25587842, -1.478094  , -0.81711465,  1.8210952 ,  0.7793657 ,
        -0.5880266 ,  0.9309902 , -1.244175  ],
       [-2.0969706 ,  0.14382273,  0.31730476, -1.2940037 ,  1.2054616 ,
         2.053294  ,  0.93258905,  0.50095874],
       [ 0.13365753, -0.13156784,  1.3156506

In [None]:
weights_qaff = dmrff.rff_layer.rff_weights.numpy()

weights_qaff.mean(), weights_qaff.std()

(0.20532994, 1.0123316)

## Q-DEMDE QRFF

In [None]:
## Make predictions

gamma_feat = 0.0078125

weights_rff_temp = weights_qrff

def predict_features(X, var2, gamma_param):
  X_feat = onp.ones((len(X), var2.shape[1]), dtype = onp.complex128)
  X_feat[:, :] = onp.cos(np.sqrt(gamma_param)*(X @ var2)) - 1j*onp.sin(np.sqrt(gamma_param)*(X @ var2))
  X_feat *= onp.sqrt(1/(var2.shape[1]))
  return X_feat

X_feat_train = predict_features(X_train, weights_rff_temp, gamma_feat)
X_feat_test = predict_features(X_test, weights_rff_temp, gamma_feat)
X_feat_val = predict_features(X_val, weights_rff_temp, gamma_feat)

X_feat_train.shape, X_feat_test.shape, X_feat_val.shape

((1098, 8), (367, 8), (366, 8))

In [None]:
## Training
rho_train = onp.zeros((n_rffs, n_rffs))

for i in range(len(X_feat_train)):
  rho_train = rho_train + onp.outer(X_feat_train[i], onp.conjugate(X_feat_train[i]))

rho_train = rho_train / len(X_feat_train)

(onp.abs(X_feat_train[0])**2).sum()

1.0000000000000004

### QDMKDE circuit

In [None]:
num_qubits_temp = 2*int(np.log2(n_rffs))

dev_dmkde = qml.device("lightning.qubit", wires = 2*num_qubits_temp)

@qml.qnode(dev_dmkde)
def QDMKDE(x_test, U_train, lambda_train):
    upper_wires = [i for i in range(int(num_qubits_temp/2))]
    lower_wires = [i for i in range(int(num_qubits_temp/2), int(num_qubits_temp))]
    qml.MottonenStatePreparation(state_vector=x_test, wires=upper_wires)
    qml.MottonenStatePreparation(state_vector=np.sqrt(lambda_train), wires=lower_wires)
    qml.QubitUnitary(np.conjugate(U_train.T), wires=upper_wires)
    for i in range(int(num_qubits_temp//2)):
      qml.CNOT([lower_wires[i], upper_wires[i]])

    return qml.probs(wires=upper_wires)



In [None]:
## Predicting the density of test and val

lambda_train, U_train = np.linalg.eigh(rho_train)

### predict test data

prob_expected_mixed_test = onp.zeros(len(X_feat_test))

for i in range(len(prob_expected_mixed_test)):
  prob_expected_mixed_test[i] = QDMKDE(X_feat_test[i], U_train, lambda_train)[0]

prob_expected_mixed_test = calculate_constant_qmkde(gamma_feat/2, 21)*prob_expected_mixed_test

### predict val data

prob_expected_mixed_val = onp.zeros(len(X_feat_val))

for i in range(len(prob_expected_mixed_val)):
  prob_expected_mixed_val[i] = QDMKDE(X_feat_val[i], U_train, lambda_train)[0]

prob_expected_mixed_val = calculate_constant_qmkde(gamma_feat/2, 21)*prob_expected_mixed_val


prob_expected_mixed_test.shape, prob_expected_mixed_val.shape

((367,), (366,))

### Metrics

In [None]:
from sklearn.metrics import roc_curve, f1_score
from sklearn.metrics import classification_report

def classification(preds_val, preds_test, y_test, output_dict = False):
    thredhold = np.percentile(preds_val, q = 9.54)
    y_pred = preds_test > thredhold
    if output_dict == True:
      return classification_report(y_test, y_pred, digits=4, output_dict=True)
    else:
      return classification_report(y_test, y_pred, digits=4)

In [None]:
print(classification(prob_expected_mixed_val, prob_expected_mixed_test, y_test))

print(f"AUC = {round(roc_auc_score(y_test, prob_expected_mixed_test), 4)}")

              precision    recall  f1-score   support

         0.0     0.5484    0.4857    0.5152        35
         1.0     0.9464    0.9578    0.9521       332

    accuracy                         0.9128       367
   macro avg     0.7474    0.7218    0.7336       367
weighted avg     0.9085    0.9128    0.9104       367

AUC = 0.8897


## Q-DEMDE QAFF

In [None]:
## Make predictions

gamma_feat = 0.0078125

weights_qaff_temp = weights_qaff

def predict_features(X, var2, gamma_param):
  X_feat = onp.ones((len(X), var2.shape[1]), dtype = onp.complex128)
  X_feat[:, :] = onp.cos(np.sqrt(gamma_param)*(X @ var2)) - 1j*onp.sin(np.sqrt(gamma_param)*(X @ var2))
  X_feat *= onp.sqrt(1/(var2.shape[1]))
  return X_feat


X_feat_train = predict_features(X_train, weights_qaff_temp, gamma_feat)
X_feat_test = predict_features(X_test, weights_qaff_temp, gamma_feat)
X_feat_val = predict_features(X_val, weights_qaff_temp, gamma_feat)

X_feat_train.shape, X_feat_test.shape, X_feat_val.shape

((1098, 8), (367, 8), (366, 8))

In [None]:
## Training
rho_train = onp.zeros((n_rffs, n_rffs))

for i in range(len(X_feat_train)):
  rho_train = rho_train + onp.outer(X_feat_train[i], onp.conjugate(X_feat_train[i]))

rho_train = rho_train / len(X_feat_train)

(onp.abs(X_feat_train[0])**2).sum()

1.0000000000000004

### QDMKDE circuit

In [None]:
num_qubits_temp = 2*int(np.log2(n_rffs))

dev_dmkde = qml.device("lightning.qubit", wires = 2*num_qubits_temp)

@qml.qnode(dev_dmkde)
def QDMKDE(x_test, U_train, lambda_train):
    upper_wires = [i for i in range(int(num_qubits_temp/2))]
    lower_wires = [i for i in range(int(num_qubits_temp/2), int(num_qubits_temp))]
    qml.MottonenStatePreparation(state_vector=x_test, wires=upper_wires)
    qml.MottonenStatePreparation(state_vector=np.sqrt(lambda_train), wires=lower_wires)
    qml.QubitUnitary(np.conjugate(U_train.T), wires=upper_wires)
    for i in range(int(num_qubits_temp//2)):
      qml.CNOT([lower_wires[i], upper_wires[i]])

    return qml.probs(wires=upper_wires)



In [None]:
## Predicting the density of test and val

lambda_train, U_train = np.linalg.eigh(rho_train)

### predict test data

prob_expected_mixed_test = onp.zeros(len(X_feat_test))

for i in range(len(prob_expected_mixed_test)):
  prob_expected_mixed_test[i] = QDMKDE(X_feat_test[i], U_train, lambda_train)[0]

prob_expected_mixed_test = calculate_constant_qmkde(gamma_feat/2, 21)*prob_expected_mixed_test

### predict val data

prob_expected_mixed_val = onp.zeros(len(X_feat_val))

for i in range(len(prob_expected_mixed_val)):
  prob_expected_mixed_val[i] = QDMKDE(X_feat_val[i], U_train, lambda_train)[0]

prob_expected_mixed_val = calculate_constant_qmkde(gamma_feat/2, 21)*prob_expected_mixed_val


prob_expected_mixed_test.shape, prob_expected_mixed_val.shape

((367,), (366,))

### Metrics

In [None]:
from sklearn.metrics import roc_curve, f1_score
from sklearn.metrics import classification_report

def classification(preds_val, preds_test, y_test, output_dict = False):
    thredhold = np.percentile(preds_val, q = 9.54)
    y_pred = preds_test > thredhold
    if output_dict == True:
      return classification_report(y_test, y_pred, digits=4, output_dict=True)
    else:
      return classification_report(y_test, y_pred, digits=4)

In [None]:
print(classification(prob_expected_mixed_val, prob_expected_mixed_test, y_test))

print(f"AUC = {round(roc_auc_score(y_test, prob_expected_mixed_test), 4)}")

              precision    recall  f1-score   support

         0.0     0.5676    0.6000    0.5833        35
         1.0     0.9576    0.9518    0.9547       332

    accuracy                         0.9183       367
   macro avg     0.7626    0.7759    0.7690       367
weighted avg     0.9204    0.9183    0.9193       367

AUC = 0.9532
