In [1]:
#!pip install ipympl
#!pip install -U scikit-learn
#!pip install keras-vis

In [2]:
%reload_ext autoreload
%autoreload 2
import os
from tensorflow import keras
from matplotlib import pyplot as plt
from pennylane import numpy as np
from pennylane import numpy as np
import pennylane as qml
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
import os
from datetime import datetime
from sklearn import preprocessing

from IPython.display import display
%matplotlib inline
 
mnist_dataset = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist_dataset.load_data()

# Normalize pixel values within 0 and 1
train_images = train_images / (255)
test_images = test_images / (255)

# Add extra dimension for convolution channels
train_images = np.array(train_images[..., tf.newaxis], requires_grad=False)
test_images = np.array(test_images[..., tf.newaxis], requires_grad=False)


#name of model or experiment
model_name = "Q_Model"

n_epochs = 20   # Number of optimization epochs
n_layers = 1    # Number of random layers
n_batches = 64     # Size of the batches

np.random.seed(0)           # Seed for NumPy random number generator
tf.random.set_seed(0)       # Seed for TensorFlow random number generator


tf.config.get_visible_devices()



2024-05-16 18:13:47.950103: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [3]:


n_qubits = 4

rand_params = np.random.uniform(high=2 * np.pi, size=(n_layers, 4))

dev = qml.device("default.qubit.tf", wires=n_qubits)

keras.utils.get_custom_objects().clear()

@keras.utils.register_keras_serializable(package=(model_name+"_Layer"))
class ConvQLayer(keras.layers.Layer):
    

    #replace the contents of qnode with experiment circuit
    @qml.qnode(dev, interface='tf')
    def q_node(inputs):
        inputs *= np.pi
        # Encoding of 4 classical input values
        #Further testing of the AngleEmbedding function is needed
        qml.AngleEmbedding(inputs, wires=range(n_qubits), rotation='Y')
        # Filter from arxiv.org/abs/2308.14930

        qml.CNOT(wires=[1, 2])
        qml.CNOT(wires=[0, 3])

        # Measurement producing 4 classical output values
        return [qml.expval(qml.PauliZ(j)) for j in range(n_qubits)]

    def call(self, inputs):

        #14x14 flattened 2x2 squares
        get_subsections_14x14 = lambda im : tf.reshape(tf.unstack(tf.reshape(im,[14,2,14,2]), axis = 2),[14,14,4])

        #unpack 14x14 row by row
        list_squares_2x2 = lambda image_subsections: tf.reshape(tf.unstack(image_subsections, axis = 1), [196,4])


        #send 4 values to quantum function
        process_square_2x2 = lambda square_2x2 : self.q_node(square_2x2)

        #send all squares to the quantum function wrapper
        process_subsections = lambda squares: tf.vectorized_map(process_square_2x2,squares)

        #recompile the larger square
        separate_channels = lambda channel_stack: tf.reshape(channel_stack, [14,14,4])
        #each smaller square (channel) can be extracted as [:, :, channel]
        
        #apply function across batch
        preprocessing = lambda input: tf.vectorized_map(
            lambda image:(separate_channels(tf.transpose(process_subsections(list_squares_2x2(get_subsections_14x14(image)))))),
            input
        )

        return preprocessing(inputs)

qlayer = ConvQLayer()


#wrap preprocessing in model

@keras.utils.register_keras_serializable(package=(model_name+"_Pre_Model"))
def Pre_Model():
    """Initializes and returns a custom Keras model
    which is ready to be trained."""
    model = keras.models.Sequential([
        qlayer
    ])
    model.compile(
        optimizer='adam',
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model
pre_model = Pre_Model()

#core model

@keras.utils.register_keras_serializable(package=(model_name+"_Core_Model"))
def Q_Model():
    """Initializes and returns a custom Keras model
    which is ready to be trained."""
    model = keras.models.Sequential([
        keras.layers.Flatten(),
        keras.layers.Dense(10, activation="softmax")
    ])
    model.compile(
        optimizer='adam',
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

q_model = Q_Model()


#preprocessing
pre_train_images = pre_model.predict(train_images,batch_size=n_batches)
pre_test_images = pre_model.predict(test_images,batch_size=n_batches)

#training
q_history = q_model.fit(
    pre_train_images,
    train_labels,
    validation_data=(pre_test_images, test_labels),
    batch_size = n_batches,
    epochs=n_epochs,
    verbose=2
)


2024-05-16 18:13:50.733981: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1928] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 6795 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 2080, pci bus id: 0000:b3:00.0, compute capability: 7.5
I0000 00:00:1715883232.999161    5128 service.cc:145] XLA service 0x7903d4002340 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
I0000 00:00:1715883232.999226    5128 service.cc:153]   StreamExecutor device (0): NVIDIA GeForce RTX 2080, Compute Capability 7.5
2024-05-16 18:13:53.044212: I tensorflow/compiler/mlir/tensorflow/utils/dump_mlir_util.cc:268] disabling MLIR crash reproducer, set env var `MLIR_CRASH_REPRODUCER_DIRECTORY` to enable.
2024-05-16 18:13:53.118411: I external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:465] Loaded cuDNN version 8907


[1m 17/938[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m9s[0m 10ms/step 

I0000 00:00:1715883234.191099    5128 device_compiler.h:188] Compiled cluster using XLA!  This line is logged at most once for the lifetime of the process.


[1m938/938[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 11ms/step
[1m157/157[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 17ms/step
Epoch 1/20
938/938 - 3s - 3ms/step - accuracy: 0.8645 - loss: 0.4754 - val_accuracy: 0.9273 - val_loss: 0.2673
Epoch 2/20
938/938 - 2s - 2ms/step - accuracy: 0.9273 - loss: 0.2505 - val_accuracy: 0.9377 - val_loss: 0.2212
Epoch 3/20
938/938 - 2s - 2ms/step - accuracy: 0.9376 - loss: 0.2150 - val_accuracy: 0.9415 - val_loss: 0.2012
Epoch 4/20
938/938 - 2s - 2ms/step - accuracy: 0.9426 - loss: 0.1963 - val_accuracy: 0.9444 - val_loss: 0.1903
Epoch 5/20
938/938 - 2s - 2ms/step - accuracy: 0.9456 - loss: 0.1844 - val_accuracy: 0.9461 - val_loss: 0.1837
Epoch 6/20
938/938 - 2s - 2ms/step - accuracy: 0.9476 - loss: 0.1760 - val_accuracy: 0.9467 - val_loss: 0.1795
Epoch 7/20
938/938 - 2s - 2ms/step - accuracy: 0.9493 - loss: 0.1697 - val_accuracy: 0.9479 - val_loss: 0.1765
Epoch 8/20
938/938 - 2s - 2ms/step - accuracy: 0.9510 - loss: 0.1647 -

In [11]:
input_model = Pre_Model()

tr4 = input_model.predict(train_images[:4],batch_size=4)

q_model_scaled = keras.models.Sequential([
    keras.layers.Rescaling(scale=1./127.5, offset=-1),
    q_model.layers[0],
    q_model.layers[1]
    ])

print(np.argmax(q_model_scaled.predict((tr4[:4]+1)*127.5,batch_size=4), axis=1))

print(train_labels[:4])

full_model = keras.models.Sequential([
    keras.layers.Rescaling(scale=1./255),
    input_model.layers[0],
    q_model.layers[0],
    q_model.layers[1]
    ])

print(np.argmax(full_model.predict(train_images[:4]*255,batch_size=4), axis=1))

print(train_labels[:4])

display(full_model.summary())



[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 90ms/step
[5 0 4 1]
[5 0 4 1]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
[5 0 4 1]
[5 0 4 1]


None

AttributeError: 'Dense' object has no attribute 'fit'

### Visualize

In [25]:

from vis.losses import ActivationMaximization
from vis.regularizers import TotalVariation, LPNorm
from vis.input_modifiers import Jitter
from vis.optimizer import Optimizer
from vis.callbacks import GifGenerator

# The name of the layer we want to visualize
# (see model definition in vggnet.py)

output_class = [5]

losses = [
    (ActivationMaximization(full_model.layers[3], output_class), 2),
    (LPNorm(full_model.layers[0].input), 10),
    (TotalVariation(full_model.layers[0].input), 10)
]
opt = Optimizer(full_model.layers[0].input, losses)
opt.minimize(max_iter=500, verbose=True, input_modifiers=[Jitter()], callbacks=[GifGenerator('opt_progress')])


AttributeError: module 'keras.api.backend' has no attribute 'ndim'

In [27]:
dir(keras.backend.ndim)

['__annotations__',
 '__builtins__',
 '__call__',
 '__class__',
 '__closure__',
 '__code__',
 '__defaults__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__get__',
 '__getattribute__',
 '__getstate__',
 '__globals__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__kwdefaults__',
 '__le__',
 '__lt__',
 '__module__',
 '__name__',
 '__ne__',
 '__new__',
 '__qualname__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__type_params__',
 '_api_export_path',
 '_api_export_symbol_id']