In [None]:
import numpy as np
import tensorflow as tf
import tensorflow
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, Dense, Concatenate, Embedding, Add, Flatten
from tensorflow.keras.layers import Activation, LeakyReLU, Reshape
from tensorflow.keras.models import Model,load_model

import matplotlib.pyplot as plt

In [None]:
# Name of the image directory used for training and for the sample preview.
# Alternatives noted by the author: img_room_vq, img_loc_vq, img_boundary.
task = "img_mix"
# Colour mode passed to the image loaders: "rgba" or "rgb".
color = "rgba"
# Alternatives noted by the author: S_5564, L_5564, R_5564.
# NOTE(review): not referenced anywhere else in the visible cells.
dirc = "LR_5564"

In [None]:
def process(image):
    """Divide ``image`` by 255 and return the result cast to float32."""
    scaled = image / 255.
    return tf.cast(scaled, tf.float32)

def variance(image):
    """Return the variance of ``image`` taken over all of its elements."""
    return tf.experimental.numpy.var(image)

# Unlabelled image batches read from the `task` directory: 32 images per
# batch, resized to 128x128, shuffled with a fixed seed.
train_data_gen = tf.keras.utils.image_dataset_from_directory(
    str(task),
    labels=None,
    label_mode=None,
    class_names=None,
    color_mode=str(color),
    batch_size=32,
    image_size=(128, 128),
    shuffle=True,
    seed=7,
)

# Rescale every batch with `process` (divide by 255, cast to float32).
dataset_train = train_data_gen.map(process)

Dataset variance (slow: iterates over the training batches)

In [None]:
# Upper bound on the number of batches that contribute to the statistic.
MAX_VARIANCE_BATCHES = 50000

# Collect the batches in a list and concatenate once at the end; stacking
# inside the loop re-copies everything gathered so far on every iteration.
collected_batches = []
for batch_index, batch in enumerate(dataset_train):
    if batch_index >= MAX_VARIANCE_BATCHES:
        break
    collected_batches.append(np.asarray(batch))

if not collected_batches:
    raise ValueError("dataset_train yielded no batches; cannot compute the variance.")

# All collected images stacked along the batch axis.
aa = np.concatenate(collected_batches, axis=0)

# Variance over every collected pixel value (no image is left out).
data_variance = np.var(aa)
data_variance

In [None]:
class VectorQuantizer(layers.Layer):
    """Vector-quantization layer with a trainable codebook.

    The last axis of the input is treated as a vector of size
    ``embedding_dim``. Each such vector is replaced by the nearest of the
    ``num_embeddings`` codebook columns, and the codebook and commitment
    losses are registered on the layer through ``add_loss``.

    Args:
        num_embeddings: Number of codebook entries.
        embedding_dim: Size of each codebook entry; must equal the size of
            the last axis of the layer input.
        beta: Weight applied to the commitment loss.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, num_embeddings, embedding_dim, beta=0.25, **kwargs):
        super().__init__(**kwargs)
        self.embedding_dim = embedding_dim
        self.num_embeddings = num_embeddings

        # The `beta` parameter is best kept between [0.25, 2] as per the paper.
        self.beta = beta

        # Initialize the embeddings which we will quantize.
        # The codebook is stored as (embedding_dim, num_embeddings): one
        # column per code.
        w_init = tf.random_uniform_initializer()
        self.embeddings = tf.Variable(
            initial_value=w_init(
                shape=(self.embedding_dim, self.num_embeddings), dtype="float32"
            ),
            trainable=True,
            name="embeddings_vqvae",
        )

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update(
            {
                "num_embeddings": self.num_embeddings,
                "embedding_dim": self.embedding_dim,
                "beta": self.beta,
            }
        )
        return config

    def call(self, x):
        """Quantize ``x`` and return a tensor of the same shape.

        The returned value equals the nearest-code tensor, but it is built
        with a straight-through estimator so gradients flow to ``x``.
        """
        # Calculate the input shape of the inputs and
        # then flatten the inputs keeping `embedding_dim` intact.
        input_shape = tf.shape(x)
        flattened = tf.reshape(x, [-1, self.embedding_dim])

        # Quantization: a one-hot selection multiplied by the transposed
        # codebook picks the chosen code for every flattened vector.
        encoding_indices = self.get_code_indices(flattened)
        encodings = tf.one_hot(encoding_indices, self.num_embeddings)
        quantized = tf.matmul(encodings, self.embeddings, transpose_b=True)

        quantized = tf.reshape(quantized, input_shape)

        # Commitment loss moves the input towards the (frozen) codes;
        # codebook loss moves the codes towards the (frozen) input.
        commitment_loss = tf.reduce_mean((tf.stop_gradient(quantized) - x) ** 2)
        codebook_loss = tf.reduce_mean((quantized - tf.stop_gradient(x)) ** 2)
        self.add_loss(self.beta * commitment_loss + codebook_loss)

        # Straight-through estimator.
        quantized = x + tf.stop_gradient(quantized - x)
        return quantized

    def get_code_indices(self, flattened_inputs):
        """Return the index of the nearest code for each row.

        Args:
            flattened_inputs: 2-D array or tensor whose second axis has size
                ``embedding_dim``.

        Returns:
            1-D tensor with one codebook index per input row.
        """
        # Squared L2 distance between every input row and every code,
        # expanded as |a|^2 + |b|^2 - 2 a.b.
        similarity = tf.matmul(flattened_inputs, self.embeddings)
        distances = (
            tf.reduce_sum(flattened_inputs ** 2, axis=1, keepdims=True)
            + tf.reduce_sum(self.embeddings ** 2, axis=0)
            - 2 * similarity
        )

        # Derive the indices for minimum distances.
        encoding_indices = tf.argmin(distances, axis=1)
        return encoding_indices

def ResBlock(inputs,hidden):
    """Residual block: two 3x3 same-padded convolutions plus a skip connection.

    The first convolution uses a ReLU activation, the second has none. The
    result is added element-wise to ``inputs``.
    """
    first_conv = layers.Conv2D(hidden, 3, padding="same", strides=1, activation="relu")
    second_conv = layers.Conv2D(hidden, 3, padding="same", strides=1)
    residual = second_conv(first_conv(inputs))
    return layers.Add()([inputs, residual])

# Upsampling Block
def Upsampling(inputs, hidden, factor=1):
    """Apply two 3x3 same-padded convolutions and add the block input back.

    Despite the name, no spatial resizing happens here: both convolutions
    keep the spatial size and the output is ``inputs`` plus the result.

    NOTE(review): each convolution has ``hidden * factor ** 2`` filters, so
    the final addition presumably only works when ``inputs`` has that many
    channels. Every call in this notebook uses the default ``factor=1``.
    """
    n_filters = hidden * (factor ** 2)
    features = inputs
    for _ in range(2):
        features = layers.Conv2D(n_filters, 3, padding="same")(features)
    return layers.Add()([inputs, features])

In [None]:
latent_dim = 64

encoder_inputs = Input(shape=(128, 128, 4), name="input", dtype="float32")

# One entry per downsampling stage: (filters, strides, padding).
# Each stage is a strided 3x3 convolution, a residual block and a
# batch-normalization layer, in that order.
encoder_stages = (
    (64, 2, "same"),
    (128, 2, "same"),
    (256, 3, "valid"),
    (512, 2, "same"),
)

x = encoder_inputs
for stage_filters, stage_strides, stage_padding in encoder_stages:
    x = layers.Conv2D(
        stage_filters,
        3,
        activation="LeakyReLU",
        strides=stage_strides,
        padding=stage_padding,
    )(x)
    x = ResBlock(x, stage_filters)
    x = layers.BatchNormalization()(x)

# 1x1 convolution projecting the features to `latent_dim` channels.
encoder_outputs = layers.Conv2D(
    latent_dim, 1, activation="LeakyReLU", strides=1, padding="same"
)(x)

encoder = Model(encoder_inputs, encoder_outputs, name="encoder")
encoder.summary()

In [None]:
# The decoder input has the shape of the encoder output (batch axis dropped).
latent_inputs = Input(shape=encoder.output.shape[1:])

# Stride-1 transposed convolution, then the stride-3 "valid" stage.
x = layers.Conv2DTranspose(
    512, 3, activation="LeakyReLU", strides=1, padding="same"
)(latent_inputs)
x = layers.Conv2DTranspose(
    512, 3, activation="LeakyReLU", strides=3, padding="valid", output_padding=1
)(x)
x = Upsampling(x, 512)

# Three stride-2 stages, each followed by an `Upsampling` residual block.
for stage_filters in (256, 128, 64):
    x = layers.Conv2DTranspose(
        stage_filters, 3, activation="LeakyReLU", strides=2, padding="same"
    )(x)
    x = Upsampling(x, stage_filters)

# Four output channels with a sigmoid activation.
decoder_outputs = layers.Conv2DTranspose(
    4, 3, padding="same", name="output", activation='sigmoid'
)(x)
decoder = Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

In [None]:
def get_vqvae(latent_dim=64, num_embeddings=64):
    """Build the VQ-VAE: encoder, then vector quantizer, then decoder.

    The notebook-level ``encoder`` and ``decoder`` models are reused, so
    every model returned by this function refers to those same two objects.
    A new ``VectorQuantizer`` named "vector_quantizer" is created per call.
    """
    vq_layer = VectorQuantizer(num_embeddings, latent_dim, name="vector_quantizer")
    inputs = Input(shape=(128, 128, 4))
    reconstructions = decoder(vq_layer(encoder(inputs)))
    return Model(inputs, reconstructions, name="vq_vae")

get_vqvae().summary()

In [None]:
class VQVAETrainer(Model):
    """Keras model wrapping the VQ-VAE with a custom training step.

    Args:
        train_variance: Value the mean squared reconstruction error is
            divided by.
        latent_dim: Forwarded to ``get_vqvae``.
        num_embeddings: Forwarded to ``get_vqvae``.
        **kwargs: Forwarded to ``Model``.
    """

    def __init__(self, train_variance, latent_dim=64, num_embeddings=64, **kwargs):
        super(VQVAETrainer, self).__init__(**kwargs)
        self.train_variance = train_variance
        self.latent_dim = latent_dim
        self.num_embeddings = num_embeddings

        self.vqvae = get_vqvae(self.latent_dim, self.num_embeddings)

        # Running means reported in the training logs.
        self.total_loss_tracker = tensorflow.keras.metrics.Mean(name="total_loss")
        self.reconstruction_loss_tracker = tensorflow.keras.metrics.Mean(
            name="reconstruction_loss"
        )
        self.vq_loss_tracker = tensorflow.keras.metrics.Mean(name="vq_loss")

    @property
    def metrics(self):
        """Metric objects tracked by this model."""
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.vq_loss_tracker,
        ]

    def train_step(self, x):
        """Run one optimisation step on batch ``x`` and return the running losses."""
        with tf.GradientTape() as tape:
            # Outputs from the VQ-VAE. `training=True` is passed explicitly
            # so training-dependent layers (the encoder contains
            # BatchNormalization) run in training mode.
            reconstructions = self.vqvae(x, training=True)

            # Calculate the losses.
            reconstruction_loss = (
                tf.reduce_mean((x - reconstructions) ** 2) / self.train_variance
            )
            # Losses registered through `add_loss` during the forward pass.
            vq_loss = sum(self.vqvae.losses)
            total_loss = reconstruction_loss + vq_loss

        # Backpropagation.
        trainable_vars = self.vqvae.trainable_variables
        grads = tape.gradient(total_loss, trainable_vars)
        self.optimizer.apply_gradients(zip(grads, trainable_vars))

        # Loss tracking.
        self.total_loss_tracker.update_state(total_loss)
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.vq_loss_tracker.update_state(vq_loss)

        # Log results.
        return {
            "loss": self.total_loss_tracker.result(),
            "reconstruction_loss": self.reconstruction_loss_tracker.result(),
            "vqvae_loss": self.vq_loss_tracker.result(),
        }


In [None]:
class CustomLearningRateScheduler(tensorflow.keras.callbacks.Callback):
    """Callback that sets the optimizer learning rate at the start of each epoch.

    Args:
        schedule: Callable ``schedule(epoch, lr)`` that takes the epoch index
            and the current learning rate and returns the rate to use.
    """

    def __init__(self, schedule):
        super(CustomLearningRateScheduler, self).__init__()
        self.schedule = schedule

    def on_epoch_begin(self, epoch, logs=None):
        """Read the current rate, apply the schedule and write the new rate back.

        Raises:
            ValueError: If the optimizer exposes no ``learning_rate`` attribute.
        """
        optimizer = self.model.optimizer
        # The same attribute is checked, read and written, so the guard
        # matches what is actually used below.
        if not hasattr(optimizer, "learning_rate"):
            raise ValueError('Optimizer must have a "learning_rate" attribute.')

        lr = float(tf.keras.backend.get_value(optimizer.learning_rate))

        scheduled_lr = self.schedule(epoch, lr)

        tf.keras.backend.set_value(optimizer.learning_rate, scheduled_lr)
        print("\nEpoch %05d: Learning rate is %6.6f." % (epoch, scheduled_lr))

# (epoch, learning rate) pairs; a rate is applied when its epoch is reached.
LR_SCHEDULE = [
    (0, 1e-6),
    (25, 5e-7),
]

def lr_schedule(epoch, lr):
    """Return the rate listed in ``LR_SCHEDULE`` for ``epoch``, else ``lr``.

    Only an exact epoch match changes the rate. For any other epoch,
    including those outside the range covered by the table, the current
    rate ``lr`` is returned unchanged.
    """
    first_epoch = LR_SCHEDULE[0][0]
    last_epoch = LR_SCHEDULE[-1][0]
    if not (first_epoch <= epoch <= last_epoch):
        return lr
    for start_epoch, scheduled_rate in LR_SCHEDULE:
        if start_epoch == epoch:
            return scheduled_rate
    return lr

# Halve the learning rate when 'reconstruction_loss' stops improving for
# one epoch, never going below 1e-8.
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='reconstruction_loss',
    factor=0.5,
    patience=1,
    min_lr=1e-8,
)

In [None]:

# Checkpoint callback configured to keep only the best weights (lowest
# "loss"), evaluated every epoch.
# NOTE(review): `checkpoint` is not passed to `fit` below, so nothing is
# written to 'CoPLAN' by this cell. Confirm whether that is intended.
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath='CoPLAN',
    monitor="loss",
    verbose=0,
    save_best_only=True,
    save_weights_only=True,
    mode="min",
    save_freq="epoch",
    options=None,
    initial_value_threshold=None,
)

# 0.1 is the value the reconstruction error is divided by.
vqvae_trainer = VQVAETrainer(0.1)

# Alternative noted by the author: learning_rate=1e-4.
adam = tensorflow.keras.optimizers.Adam(learning_rate=1e-5)
vqvae_trainer.compile(optimizer=adam)

# Alternative callback noted by the author: CustomLearningRateScheduler(lr_schedule).
vqvae_trainer.fit(dataset_train, epochs=200, callbacks=[reduce_lr])

In [None]:
from tensorflow.keras.preprocessing import image
# Fetch the three sub-models/layers of the trained VQ-VAE by name.
trained_vqvae = vqvae_trainer.vqvae
encoder, quantizer, decoder = (
    trained_vqvae.get_layer(layer_name)
    for layer_name in ("encoder", "vector_quantizer", "decoder")
)
def show_subplot(original, reconstructed):
    """Show channel 0 of ``original`` and ``reconstructed`` side by side."""
    panels = (("Original", original), ("Reconstructed", reconstructed))
    for position, (panel_title, panel_image) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.imshow(panel_image[:, :, 0])
        plt.title(panel_title)
        plt.axis("off")

    plt.show()

# A forward slash works as a path separator on Windows and POSIX alike;
# a literal backslash only works on Windows.
img_path = '%s/%d.png' % (task, 18)

# Resize on load so the array matches the 128x128 model input and the
# reshape applied to the reconstruction below.
img = tf.keras.utils.load_img(
    img_path,
    color_mode='%s' % (color),
    target_size=(128, 128),
)
x1 = image.img_to_array(img)
# Same rescaling as the training pipeline (divide by 255).
x = x1/255.
# Add a leading batch axis of size 1.
x2 = np.expand_dims(x, axis=0)

print(x.shape)


# Encode, quantize, decode, then drop the batch axis for plotting.
encoded_outputs = encoder.predict(x2)
code = quantizer(encoded_outputs)
reconstructions_test = decoder.predict(code).reshape((128, 128, 4))

show_subplot(x, reconstructions_test)

In [None]:
# Encode the sample image and look up the nearest code for every latent
# vector; the indices are reshaped to the encoder output shape without its
# channel axis.
encoded_outputs = encoder.predict(x2)
flat_enc_outputs = encoded_outputs.reshape(-1, encoded_outputs.shape[-1])
codebook_indices = (
    quantizer.get_code_indices(flat_enc_outputs)
    .numpy()
    .reshape(encoded_outputs.shape[:-1])
)

# Left: the input image. Right: the code-index map of the first batch element.
code_panels = (("Original", x), ("Code", codebook_indices[0]))
for panel_position, (panel_title, panel_image) in enumerate(code_panels, start=1):
    plt.subplot(1, 2, panel_position)
    plt.imshow(panel_image)
    plt.title(panel_title)
    plt.axis("off")
plt.show()

In [None]:
import os

# Create the output directories; directories that already exist are left as they are.
for output_dir in (
    'VQ_Pretrained/mix_5564/vqvae_en/encoder',
    'VQ_Pretrained/mix_5564/vqvae_de/decoder',
    'VQ_Pretrained/mix_5564/vqvae_q',
):
    os.makedirs(output_dir, exist_ok=True)

In [None]:
# Save the encoder and decoder as .keras files. Only `overwrite` is passed:
# `save_format=None` is the default anyway, and `options` is an argument of
# the SavedModel path that newer Keras releases reject for the native
# .keras format.
encoder.save('VQ_Pretrained/mix_5564/vqvae_en/encoder.keras', overwrite=True)
decoder.save('VQ_Pretrained/mix_5564/vqvae_de/decoder.keras', overwrite=True)

# The quantizer is stored as the raw codebook array and rebuilt on load.
np.save('VQ_Pretrained/mix_5564/vqvae_q/quantizer.npy', quantizer.embeddings.numpy())

In [None]:
encoder = load_model('VQ_Pretrained/mix_5564/vqvae_en/encoder.keras')
decoder = load_model('VQ_Pretrained/mix_5564/vqvae_de/decoder.keras')
vq_value = np.load('VQ_Pretrained/mix_5564/vqvae_q/quantizer.npy')

# The codebook is stored as (embedding_dim, num_embeddings), so the layer
# is sized from the saved array instead of hard-coded numbers that could
# drift from what was trained.
saved_embedding_dim, saved_num_embeddings = vq_value.shape
quantizer = VectorQuantizer(saved_num_embeddings, saved_embedding_dim)

# Replace the randomly initialised codebook with the saved, frozen one.
quantizer.embeddings = tf.Variable(
            initial_value=vq_value,
            trainable=False,
            name="embeddings_vqvae",
        )

In [None]:
from tensorflow.keras.preprocessing import image
# Sizes of the output arrays: samples x slots per sample x codes per image.
NUM_SAMPLES = 80788
SLOTS_PER_SAMPLE = 10
# Number of code indices stored per image; it has to match the number of
# latent vectors the encoder produces for one image.
CODES_PER_IMAGE = 25


def _encode_image_to_codes(img_path):
    """Load one image, encode it and return its codebook indices as a 1-D array."""
    loaded = image.load_img(img_path, color_mode='rgba', target_size=(128, 128))
    # Rescale by 1/255 and add a leading batch axis of size 1.
    batch = np.expand_dims(image.img_to_array(loaded) / 255., axis=0)
    # Call the model directly: for a single small batch this avoids the
    # per-call overhead of `predict`, which matters across this many images.
    encoded = encoder(batch, training=False).numpy()
    flat_encoded = encoded.reshape(-1, encoded.shape[-1])
    return quantizer.get_code_indices(flat_encoded).numpy()


np_codebooksL = np.zeros((NUM_SAMPLES, SLOTS_PER_SAMPLE, CODES_PER_IMAGE), dtype=np.int32)
np_codebooksR = np.zeros((NUM_SAMPLES, SLOTS_PER_SAMPLE, CODES_PER_IMAGE), dtype=np.int32)

type_in = np.load('Processed_data/T_type_ada.npz')['type_in']
# Insert a 0 at column 9 of every row, so the inner loop below always
# reaches a 0 entry by slot index 9 at the latest.
type_in_new = np.insert(type_in, 9, 0, axis=1)

for i in range(NUM_SAMPLES):
    for j, k in enumerate(type_in_new[i]):
        # Forward slashes work on Windows and POSIX alike.
        np_codebooksL[i, j] = _encode_image_to_codes('img_loc_sqe/%d/%d.png' % (j, i))
        np_codebooksR[i, j] = _encode_image_to_codes('img_room_sqe/%d/%d.png' % (j, i))
        # The slot holding the first 0 is still encoded; later slots of
        # this sample keep their zero initialisation.
        if k == 0:
            break
    if i % 100 == 0:
        print(i)


In [None]:
# Write both code arrays to disk as int32.
for code_path, code_array in (
    ('Processed_data/RPLAN_L_code.npy', np_codebooksL),
    ('Processed_data/RPLAN_R_code.npy', np_codebooksR),
):
    np.save(code_path, code_array.astype(np.int32))