In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_probability as tfp

import larq as lq

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

2022-11-18 08:30:56.422129: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-11-18 08:30:56.567948: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2022-11-18 08:30:57.173545: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2022-11-18 08:30:57.173605: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or 

In [2]:
class VectorQuantizer(layers.Layer):
    def __init__(self, num_embeddings, embedding_dim, beta=0.25, **kwargs):
        super().__init__(**kwargs)
        self.embedding_dim = embedding_dim
        self.num_embeddings = num_embeddings

        # The `beta` parameter is best kept between [0.25, 2] as per the paper.
        self.beta = beta

        # Initialize the embeddings which we will quantize.
        w_init = tf.random_uniform_initializer()
        self.embeddings = tf.Variable(
            initial_value=w_init(
                shape=(self.embedding_dim, self.num_embeddings), dtype="float32"
            ),
            trainable=True,
            name="embeddings_vqvae",
        )

    def call(self, x):
        # Calculate the input shape of the inputs and
        # then flatten the inputs keeping `embedding_dim` intact.
        input_shape = tf.shape(x)
        flattened = tf.reshape(x, [-1, self.embedding_dim])

        # Quantization.
        encoding_indices = self.get_code_indices(flattened)
        encodings = tf.one_hot(encoding_indices, self.num_embeddings)
        quantized = tf.matmul(encodings, self.embeddings, transpose_b=True)

        # Reshape the quantized values back to the original input shape
        quantized = tf.reshape(quantized, input_shape)

        # Calculate vector quantization loss and add that to the layer. You can learn more
        # about adding losses to different layers here:
        # https://keras.io/guides/making_new_layers_and_models_via_subclassing/. Check
        # the original paper to get a handle on the formulation of the loss function.
        commitment_loss = tf.reduce_mean((tf.stop_gradient(quantized) - x) ** 2)
        codebook_loss = tf.reduce_mean((quantized - tf.stop_gradient(x)) ** 2)
        self.add_loss(self.beta * commitment_loss + codebook_loss)

        # Straight-through estimator.
        quantized = x + tf.stop_gradient(quantized - x)
        return quantized

    def get_code_indices(self, flattened_inputs):
        # Calculate L2-normalized distance between the inputs and the codes.
        similarity = tf.matmul(flattened_inputs, self.embeddings)
        distances = (
            tf.reduce_sum(flattened_inputs ** 2, axis=1, keepdims=True)
            + tf.reduce_sum(self.embeddings ** 2, axis=0)
            - 2 * similarity
        )

        # Derive the indices for minimum distances.
        encoding_indices = tf.argmin(distances, axis=1)
        return encoding_indices

In [3]:
def get_encoder(latent_dim=4):
    encoder_inputs = keras.Input(shape=(28, 28, 1))
    x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(
        encoder_inputs
    )
    x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
    encoder_outputs = layers.Conv2D(latent_dim, 1, padding="same")(x)
    return keras.Model(encoder_inputs, encoder_outputs, name="encoder")


def get_decoder(latent_dim=4):
    latent_inputs = keras.Input(shape=get_encoder(latent_dim).output.shape[1:])
    x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(
        latent_inputs
    )
    x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
    decoder_outputs = layers.Conv2DTranspose(1, 3, padding="same")(x)
    return keras.Model(latent_inputs, decoder_outputs, name="decoder")

In [4]:
def get_vqvae(latent_dim=4, num_embeddings=16):
    vq_layer = VectorQuantizer(num_embeddings, latent_dim, name="vector_quantizer")
    encoder = get_encoder(latent_dim)
    decoder = get_decoder(latent_dim)
    inputs = keras.Input(shape=(28, 28, 1))
    encoder_outputs = encoder(inputs)
    quantized_latents = vq_layer(encoder_outputs)
    reconstructions = decoder(quantized_latents)
    return keras.Model(inputs, reconstructions, name="vq_vae")


get_vqvae().summary()

2022-11-18 08:30:58.610925: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:980] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-11-18 08:30:58.647537: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:980] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-11-18 08:30:58.647773: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:980] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2022-11-18 08:30:58.648270: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

Model: "vq_vae"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_4 (InputLayer)        [(None, 28, 28, 1)]       0         
                                                                 
 encoder (Functional)        (None, 7, 7, 4)           19076     
                                                                 
 vector_quantizer (VectorQua  (None, 7, 7, 4)          64        
 ntizer)                                                         
                                                                 
 decoder (Functional)        (None, 28, 28, 1)         21121     
                                                                 
Total params: 40,261
Trainable params: 40,261
Non-trainable params: 0
_________________________________________________________________


In [5]:
class VQVAETrainer(keras.models.Model):
    def __init__(self, train_variance, latent_dim=4, num_embeddings=16, **kwargs):
        super(VQVAETrainer, self).__init__(**kwargs)
        self.train_variance = train_variance
        self.latent_dim = latent_dim
        self.num_embeddings = num_embeddings

        self.vqvae = get_vqvae(self.latent_dim, self.num_embeddings)

        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.vq_loss_tracker = keras.metrics.Mean(name="vq_loss")

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.vq_loss_tracker,
        ]

    def train_step(self, x):
        with tf.GradientTape() as tape:
            # Outputs from the VQ-VAE.
            reconstructions = self.vqvae(x)

            # Calculate the losses.
            reconstruction_loss = (
                tf.reduce_mean((x - reconstructions) ** 2) / self.train_variance
            )
            total_loss = reconstruction_loss + sum(self.vqvae.losses)

        # Backpropagation.
        grads = tape.gradient(total_loss, self.vqvae.trainable_variables)
        self.optimizer.apply_gradients(zip(grads, self.vqvae.trainable_variables))

        # Loss tracking.
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.vq_loss_tracker.update_state(sum(self.vqvae.losses))

        # Log results.
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "vqvae_loss": self.vq_loss_tracker.result(),
        }

In [6]:
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

x_train = np.expand_dims(x_train, -1)
x_test = np.expand_dims(x_test, -1)
x_train_scaled = (x_train / 255.0) - 0.5
x_test_scaled = (x_test / 255.0) - 0.5
data_variance = np.var(x_train / 255.0)

In [76]:
vqvae_trainer = VQVAETrainer(data_variance, latent_dim=4, num_embeddings=16)
vqvae_trainer.compile(optimizer=keras.optimizers.Adam())
vqvae_trainer.fit(x_train_scaled, epochs=10, batch_size=128)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7f7cf86da050>

In [110]:
encoder = vqvae_trainer.vqvae.get_layer("encoder")
decoder = vqvae_trainer.vqvae.get_layer("decoder")
quantizer = vqvae_trainer.vqvae.get_layer("vector_quantizer")

# encoded_outputs = encoder.predict(test_images)
# flat_enc_outputs = encoded_outputs.reshape(-1, encoded_outputs.shape[-1])
# codebook_indices = quantizer.get_code_indices(flat_enc_outputs)
# codebook_indices = codebook_indices.numpy().reshape(encoded_outputs.shape[:-1])

encoded_outputs = encoder.predict(x_train)
flat_enc_outputs = encoded_outputs.reshape(-1, encoded_outputs.shape[-1])
codebook_indices = quantizer.get_code_indices(flat_enc_outputs)
e_train = codebook_indices.numpy().reshape(encoded_outputs.shape[:-1])
# e_train = keras.utils.to_categorical(e_train)

encoded_outputs = encoder.predict(x_test)
flat_enc_outputs = encoded_outputs.reshape(-1, encoded_outputs.shape[-1])
codebook_indices = quantizer.get_code_indices(flat_enc_outputs)
e_test = codebook_indices.numpy().reshape(encoded_outputs.shape[:-1])
# e_test = keras.utils.to_categorical(e_test)

# quantizer.
# (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# x_train = np.expand_dims(x_train, -1)
# x_test = np.expand_dims(x_test, -1)
# x_train_scaled = (x_train / 255.0) - 0.5
# x_test_scaled = (x_test / 255.0) - 0.5
# data_variance = np.var(x_train / 255.0)

# (train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()

# train_images = train_images.reshape((60000, 28, 28, 1))
# test_images = test_images.reshape((10000, 28, 28, 1))

# Normalize pixel values to be between -1 and 1
# train_images, test_images = train_images / 127.5 - 1, test_images / 127.5 - 1



In [111]:
# N = np.shape(e_train)[0];
# b_train = np.zeros((N,28,28))
# for i_sample in range(N):
#     for x in range(7):
#         for y in range(7):
#             b_train[i_sample][4*x:4*x+4,4*y:4*y+4] = 2*np.reshape(e_train[i_sample][x,y],(4,4))-1
# N = np.shape(e_test)[0];
# b_test = np.zeros((N,28,28))
# for i_sample in range(N):
#     for x in range(7):
#         for y in range(7):
#             b_test[i_sample][4*x:4*x+4,4*y:4*y+4] = 2*np.reshape(e_test[i_sample][x,y],(4,4))-1

In [112]:
N = np.shape(e_train)[0];
b_train = np.zeros((N,7,7,4))
for i_sample in range(N):
    for x_block in range(7):
        for y_block in range(7):
            word = e_train[i_sample][x_block,y_block]
            for i_bit in range(4):
                b_train[i_sample][x_block,y_block][i_bit] = (word >> i_bit) & 1
N = np.shape(e_test)[0];
b_test = np.zeros((N,7,7,4))
for i_sample in range(N):
    for x_block in range(7):
        for y_block in range(7):
            word = e_test[i_sample][x_block,y_block]
            for i_bit in range(4):
                b_test[i_sample][x_block,y_block][i_bit] = (word >> i_bit) & 1
b_train, b_test = 2*b_train - 1, 2*b_test - 1

In [153]:
# All quantized layers except the first will use the same options
kwargs = dict(input_quantizer="ste_sign",
              kernel_quantizer="ste_sign",
              kernel_constraint="weight_clip")

model = tf.keras.models.Sequential()

# In the first layer we only quantize the weights and not the input
model.add(lq.layers.QuantConv2D(64, (7, 7),
                                # kernel_quantizer="ste_sign",
                                # kernel_constraint="weight_clip",
                                input_quantizer="ste_sign",
                                use_bias=False,
                                input_shape=(7, 7, 4)))
# model.add(tf.keras.layers.MaxPooling2D((2, 2)))
# model.add(tf.keras.layers.BatchNormalization(scale=False))

# model.add(lq.layers.QuantConv2D(64, (3, 3), use_bias=False, **kwargs))
# model.add(tf.keras.layers.MaxPooling2D((2, 2)))
# model.add(tf.keras.layers.BatchNormalization(scale=False))

# model.add(lq.layers.QuantConv2D(64, (3, 3), use_bias=False, **kwargs))
model.add(tf.keras.layers.BatchNormalization(scale=False))
model.add(tf.keras.layers.Flatten())

model.add(lq.layers.QuantDense(64, use_bias=False, **kwargs))
model.add(tf.keras.layers.BatchNormalization(scale=False))
model.add(lq.layers.QuantDense(10, use_bias=False, **kwargs))
model.add(tf.keras.layers.BatchNormalization(scale=False))
model.add(tf.keras.layers.Activation("softmax"))

In [154]:
lq.models.summary(model)

+sequential_30 stats--------------------------------------------------------------------------------------+
| Layer                   Input prec.         Outputs  # 1-bit  # 32-bit  Memory  1-bit MACs  32-bit MACs |
|                               (bit)                      x 1       x 1    (kB)                          |
+---------------------------------------------------------------------------------------------------------+
| quant_conv2d_54                   1  (-1, 1, 1, 64)        0     12544   49.00           0        12544 |
| batch_normalization_94            -  (-1, 1, 1, 64)        0       128    0.50           0            0 |
| flatten_21                        -        (-1, 64)        0         0       0           0            0 |
| quant_dense_44                    1        (-1, 64)     4096         0    0.50        4096            0 |
| batch_normalization_95            -        (-1, 64)        0       128    0.50           0            0 |
| quant_dense_45            

In [155]:
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

model.fit(b_train, y_train, batch_size=64, epochs=20)

test_loss, test_acc = model.evaluate(b_test, y_test)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [156]:
print(f"Test accuracy {test_acc * 100:.2f} %")

Test accuracy 87.57 %
