Disentanglement metric from https://openreview.net/pdf?id=Sy2fzU9gl. I will use another metric, from https://arxiv.org/pdf/1802.05983.pdf, once I have implemented FactorVAE.

In [1]:
import tensorflow as tf 
# Eager mode is required by the cells below, which read tensor values directly via `.numpy()`.
tf.enable_eager_execution()

In [2]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import os

In [3]:
## Code from Deepmind's Github
# `metadata` is a pickled Python dict stored inside the .npz archive, so it can only
# be read with allow_pickle=True; recent NumPy releases default to False and raise
# ValueError when the object array is accessed.
# NOTE(review): unpickling can execute arbitrary code -- only load a trusted copy of the dataset.
# encoding='bytes' is kept on purpose: later cells index metadata with bytes keys.
load_data = np.load("dsprites-dataset/dsprites_ndarray_co1sh3sc6or40x32y32_64x64.npz",
                    encoding='bytes', allow_pickle=True)
imgs = load_data['imgs']
latents_values = load_data['latents_values']
latents_classes = load_data['latents_classes']
# `[()]` unwraps the 0-d object array into the dict it contains.
metadata = load_data['metadata'][()]

print('Metadata: \n', metadata)

Metadata: 
 {b'date': b'April 2017', b'description': b'Disentanglement test Sprites dataset.Procedurally generated 2D shapes, from 6 disentangled latent factors.This dataset uses 6 latents, controlling the color, shape, scale, rotation and position of a sprite. All possible variations of the latents are present. Ordering along dimension 1 is fixed and can be mapped back to the exact latent values that generated that image.We made sure that the pixel outputs are different. No noise added.', b'version': 1, b'latents_names': (b'color', b'shape', b'scale', b'orientation', b'posX', b'posY'), b'latents_possible_values': {b'orientation': array([0.        , 0.16110732, 0.32221463, 0.48332195, 0.64442926,
       0.80553658, 0.96664389, 1.12775121, 1.28885852, 1.44996584,
       1.61107316, 1.77218047, 1.93328779, 2.0943951 , 2.25550242,
       2.41660973, 2.57771705, 2.73882436, 2.89993168, 3.061039  ,
       3.22214631, 3.38325363, 3.54436094, 3.70546826, 3.86657557,
       4.02768289, 4.18879

In [4]:
# Directory that is searched for the latest model checkpoint when the weights are restored.
save_path = 'save/VAESprite'

In [5]:
class VAESprite(tf.keras.Model):
    """Convolutional variational autoencoder for 64x64 single-channel images.

    The encoder maps an image to ``2 * latent_dim`` numbers that are split into
    the mean and log-variance of a diagonal Gaussian over the latent code. The
    decoder maps a latent code back to a 64x64x1 map of logits (the sigmoid is
    applied separately, see ``sample`` and ``call``).

    The layer definitions must stay in this exact order: weights are restored
    from a checkpoint that was written for this structure.

    Args:
        latent_dim: Number of latent dimensions.
    """
    def __init__(self, latent_dim):
        super().__init__()
        self.latent_dim = latent_dim

        # Four stride-2 convolutions followed by a linear layer that emits the
        # mean and the log-variance side by side.
        self.encoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(64, 64, 1)),
            tf.keras.layers.Conv2D(filters=32, kernel_size=4, strides=(2, 2), activation=tf.nn.elu),
            tf.keras.layers.Conv2D(filters=32, kernel_size=4, strides=(2, 2), activation=tf.nn.elu),
            tf.keras.layers.Conv2D(filters=64, kernel_size=4, strides=(2, 2), activation=tf.nn.elu),
            tf.keras.layers.Conv2D(filters=64, kernel_size=4, strides=(2, 2), activation=tf.nn.elu),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(latent_dim + latent_dim),
        ])

        # Spatial size grows 2 -> 4 -> 8 -> 32 -> 64 through the transposed
        # convolutions; the last layer produces one channel of raw logits.
        self.decoder = tf.keras.Sequential([
            tf.keras.layers.InputLayer(input_shape=(latent_dim,)),
            tf.keras.layers.Dense(units=2*2*64, activation=tf.nn.relu),
            tf.keras.layers.Reshape(target_shape=(2, 2, 64)),
            tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=4, strides=(2, 2), padding="SAME", activation=tf.nn.elu),
            tf.keras.layers.Conv2DTranspose(filters=64, kernel_size=4, strides=(2, 2), padding="SAME", activation=tf.nn.elu),
            tf.keras.layers.Conv2DTranspose(filters=32, kernel_size=4, strides=(4, 4), padding="SAME", activation=tf.nn.elu),
            tf.keras.layers.Conv2DTranspose(filters=32, kernel_size=2, strides=(2, 2), padding="SAME", activation=tf.nn.elu),
            tf.keras.layers.Conv2DTranspose(filters=1, kernel_size=1, strides=(1, 1), padding="SAME"),
        ])

    def sample(self, num_samples=1):
        """Decode latent codes drawn from a standard normal distribution.

        Args:
            num_samples: How many latent codes to draw. Defaults to 1, which is
                what this method always did before the parameter existed.

        Returns:
            A tuple ``(latent, image)``: the drawn codes with shape
            ``(num_samples, latent_dim)`` and the sigmoid of the decoder output.
        """
        latent = tf.random_normal(shape=(num_samples, self.latent_dim))
        return latent, tf.nn.sigmoid(self.decoder(latent))

    def call(self, img, is_sigmoid=False):
        """Encode ``img``, draw a latent code and decode it.

        The sampling step follows the Google example the original author
        reused: ``latent = mean + noise * exp(log_var / 2)``.

        Args:
            img: Batch of images accepted by the encoder (64x64x1 each).
            is_sigmoid: When True the decoder output is passed through a
                sigmoid; otherwise raw logits are returned.

        Returns:
            A tuple ``(latent, out, mean, log_var)``.
        """
        mean, log_var = tf.split(self.encoder(img), num_or_size_splits=2, axis=1)

        # Use the dynamic shape so the noise can still be built when the batch
        # dimension is not statically known (static `mean.shape` fails there).
        noise = tf.random_normal(shape=tf.shape(mean))
        latent = noise * tf.exp(log_var * .5) + mean

        out = self.decoder(latent)
        if is_sigmoid:
            out = tf.nn.sigmoid(out)
        return latent, out, mean, log_var

In [6]:
# Model with a 10-dimensional latent code; its weights are restored from a checkpoint below.
vae = VAESprite(10)

In [7]:
# The optimizer and global step are part of the checkpoint object so that the
# saved training state can be matched and restored together with the model.
optimizer = tf.train.AdamOptimizer(learning_rate=5e-5)
saver = tf.train.Checkpoint(optimizer=optimizer,
                            model=vae,
                            optimizer_step=tf.train.get_or_create_global_step())

# latest_checkpoint returns None when nothing is found; restoring None would
# leave the model with randomly initialised weights without any error, and every
# number computed below would be meaningless. Fail loudly instead.
checkpoint_path = tf.train.latest_checkpoint(save_path)
if checkpoint_path is None:
    raise FileNotFoundError(f"No checkpoint found in '{save_path}'")
saver.restore(checkpoint_path)

<tensorflow.python.training.checkpointable.util.CheckpointLoadStatus at 0x11e86f208>

In [8]:
# One entry per latent factor: how many distinct values that factor can take.
latents_sizes = metadata[b'latents_sizes']
# Place value of each factor, so that the dot product of a vector of factor
# indices with `latents_bases` is the flat index of the image in `imgs`.
latents_bases = np.concatenate((
    np.cumprod(latents_sizes[::-1])[::-1][1:],
    np.ones(1, dtype=int),
))

def latent_to_index(latents):
    """Convert factor-index vectors into flat integer indices.

    Each vector is weighted by the module-level `latents_bases` and summed;
    the result is cast to `int` so it can be used for array indexing.
    """
    flat_index = np.dot(latents, latents_bases)
    return flat_index.astype(int)

def sample_latent(size=1):
    """Draw `size` random factor-index vectors.

    Returns a float array of shape `(size, number of factors)` whose column k
    holds integers drawn uniformly from `[0, latents_sizes[k])`.
    """
    num_factors = latents_sizes.size
    drawn = np.zeros((size, num_factors))
    for column in range(num_factors):
        drawn[:, column] = np.random.randint(latents_sizes[column], size=size)

    return drawn

def show_images_grid(imgs_, num_images=25):
    """Show the first `num_images` entries of `imgs_` side by side in one row.

    Args:
        imgs_: Indexable collection of 2-D images.
        num_images: Number of panels to create. When `imgs_` holds fewer
            images, the surplus panels are left blank.
    """
    # squeeze=False keeps `axes` a 2-D array even for a single panel; without
    # it a lone Axes object is returned and `.flatten()` does not exist.
    _, axes = plt.subplots(1, num_images, figsize=(num_images * 2, 2), squeeze=False)
    num_available = min(num_images, len(imgs_))

    for ax_i, ax in enumerate(axes.flatten()):
        if ax_i < num_available:
            ax.imshow(imgs_[ax_i], cmap='Greys_r',  interpolation='nearest')
            ax.set_xticks([])
            ax.set_yticks([])
        else:
            # No image for this panel: hide the empty axes.
            ax.axis('off')

In [45]:
# Number of difference vectors generated per fixed factor for the training set.
number_training_data = 10000
# Number of difference vectors generated for the test set.
number_testing_data = 100

In [46]:
# Filled by the test-set generation cell below (one vector per iteration).
testing_data = []
# Filled by the training-set generation cell below (one stacked array per fixed factor).
all_training_data = []

In [57]:
# Build the classifier inputs for the disentanglement metric.
# For every factor index in 1..4 (shape, scale, orientation, posX according to
# the `latents_names` order in the metadata) generate `number_training_data`
# vectors; each vector is the mean absolute difference between the latent codes
# of two groups of 35 images that all share the same value of that factor.
# NOTE(review): factor 5 (posY) is not covered by range(1, 5) -- confirm this is intended.
# NOTE(review): the fixed factor is always set to value index 1 rather than a
# randomly drawn value, and the sampled latent (not the posterior mean) is
# compared -- confirm against the metric definition in the referenced paper.

# Start from an empty list so that re-running this cell (e.g. after an
# interrupt) does not append duplicate entries to earlier results.
all_training_data = []
for attr in range(1, 5):
    factor_samples = []
    for i in range(number_training_data):
        if i % 500 == 0:
            print(f"At {i}")
        latents_sampled = sample_latent(size=70)

        # Pin the current factor to the same value for all 70 images.
        latents_sampled[:, attr] = 1
        indices_sampled = latent_to_index(latents_sampled)
        imgs_sampled = imgs[indices_sampled]
        imgs_sampled_tensor = tf.expand_dims(tf.convert_to_tensor(imgs_sampled, tf.float32), axis=-1)

        latent, _, _, _ = vae(imgs_sampled_tensor)
        # Two halves of 35 codes each; row k of `left` is paired with row k of `right`.
        left, right = tf.split(latent, 2)

        final_latent = tf.reduce_mean(tf.abs(left - right), axis=0)
        factor_samples.append(final_latent.numpy())

    # `training_data` keeps pointing at the most recent factor's array because
    # later cells read it under that name.
    training_data = np.stack(factor_samples)
    all_training_data.append(training_data)
    print("---------------")

At 0


KeyboardInterrupt: 

In [None]:
# Build the test inputs: `number_testing_data` vectors, each the mean absolute
# difference between the latent codes of two groups of 35 images that all share
# shape index 1.
# The vectors are collected in a separate list and `testing_data` is assigned
# once at the end, so the cell can be re-run; appending to `testing_data` itself
# failed on a second run because it had been rebound to an ndarray.
test_samples = []
for i in range(number_testing_data):
    if i % 10 == 0:
        print(f"At {i}")
    latents_sampled = sample_latent(size=70)

    # Getting Oval Only
    latents_sampled[:, 1] = 1
    indices_sampled = latent_to_index(latents_sampled)
    imgs_sampled = imgs[indices_sampled]
    imgs_sampled_tensor = tf.expand_dims(tf.convert_to_tensor(imgs_sampled, tf.float32), axis=-1)

    latent, _, _, _ = vae(imgs_sampled_tensor)
    # Two halves of 35 codes each; row k of `left` is paired with row k of `right`.
    left, right = tf.split(latent, 2)

    final_latent = tf.reduce_mean(tf.abs(left - right), axis=0)
    test_samples.append(final_latent.numpy())

testing_data = np.stack(test_samples)
testing_data.shape

In [50]:
# np.save does not create missing directories, so make sure the target exists.
os.makedirs('linear_classifier_train', exist_ok=True)
# NOTE(review): `training_data` holds only the array of the last factor processed
# by the training loop; the per-factor arrays are in `all_training_data` --
# confirm which of the two is meant to be saved.
np.save('linear_classifier_train/VAE_Training_Linear.npy', training_data)
np.save('linear_classifier_train/VAE_Testing_Linear.npy', testing_data)

In [52]:
# Shuffle buffer size passed to `Dataset.shuffle` when the training dataset is built.
TRAIN_BUF = 60000
# Number of examples per batch passed to `Dataset.batch`.
BATCH_SIZE = 100

In [54]:
# Input pipeline over the rows of `training_data`: shuffle first, then batch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(training_data)
    .shuffle(TRAIN_BUF)
    .batch(BATCH_SIZE)
)