In [3]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import sklearn
assert sklearn.__version__ >= "0.20"

try:
    # %tensorflow_version only exists in Colab.
    %tensorflow_version 2.x
except Exception:
    pass

# TensorFlow ≥2.0 is required
import tensorflow as tf
from tensorflow import keras
assert tf.__version__ >= "2.0"

# Common imports
import numpy as np
import os

# to make this notebook's output stable across runs
np.random.seed(42)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt
mpl.rc('axes', labelsize=14)
mpl.rc('xtick', labelsize=12)
mpl.rc('ytick', labelsize=12)


In [1]:
#Some Data processing
from PIL import Image
for file_name in os.listdir('./img/skins'):
    file_name = os.path.join('./img/skins', file_name)
    try:
        img  = Image.open(file_name)
    except:
        os.remove(file_name)
    if img.size == (64,32):
        os.remove(file_name)

IsADirectoryError: [Errno 21] Is a directory: './img/skins/skins'

In [30]:
class Encoder(keras.models.Model):
    def sampling(latent_dim = 64,**args):
        z_mean, z_log_var = args
        epsilon_mean = 0
        epsilon_std = 1.0
        epsilon = keras.random_normal(shape=(keras.shape(z_mean)[0], latent_dim),
                                  mean=epsilon_mean, stddev=epsilon_std)
        sampled_z = z_mean + keras.exp(z_log_var * 0.5 ) * epsilon
        return sampled_z

    def __init__(self,input_shape= (64,64,3),batch_size=128, latent_dim = 64):
        super(Encoder,self).__init__()
        self.conv1 = keras.layers.Conv2D(32, 4, strides=(2, 2))
        self.bn2 = keras.layers.BatchNormalization()
        self.activ1 = keras.layers.LeakyReLU()
        self.conv2 = keras.layers.Conv2D(64, 4, strides=(2, 2))
        self.bn3 = keras.layers.BatchNormalization()
        self.activ2 =keras.layers.LeakyReLU()
        self.conv3 =  keras.layers.Conv2D(128, 4, strides=(2, 2))
        self.bn4 = keras.layers.BatchNormalization()
        self.activ3 = keras.layers.LeakyReLU(),
        self.pool1 = keras.layers.GlobalAveragePooling2D()
        self.z_mean = keras.layers.Dense(latent_dim)
        self.z_log_var = keras.layers.Dense(latent_dim)
        
    
    def call(self,input):
        conv1 = self.conv1(input)
        bn2 = self.bn2(conv1)
        activ1 = self.activ1(bn2)
        conv2 = self.conv2(activ1)
        bn3 = self.bn3(conv2)
        activ2 = self.activ2(bn3)
        conv3 = self.conv3(activ2)
        bn4 = self.bn4(conv3)
        activ3 = self.activ3(bn4)
        pool1 = self.pool1(activ3)
        z_mean = self.z_mean(pool1)
        z_log_var = self.z_log_var(pool1)
        return z_mean,z_log_var
   


In [26]:
class Decoder(keras.models.Model):
    def __init__(self,input_shape= (64,64,3),batch_size=128, latent_dim = 64):
        super(Decoder,self).__init__()    
        self.decoder_input = keras.layers.Input(shape=(latent_dim))
        self.hidden1 = keras.layers.Dense(1024)
        self.reshape1 = keras.layers.Reshape((4, 4, 128))
        self.ups1 = keras.layers.UpSampling2D((2, 2), interpolation='nearest')
        self.conv1 = keras.layers.Conv2D(64, 3, strides=1, padding='same')
        self.bn1 = keras.layers.BatchNormalization()
        self.activ1 = keras.layers.LeakyReLU()
        self.ups2 = keras.layers.UpSampling2D((2, 2), interpolation='nearest')
        self.conv2 = keras.layers.Conv2D(32, 3, strides=1, padding='same')
        self.bn2 = keras.layers.BatchNormalization()
        self.activ2 = keras.layers.LeakyReLU()
        self.ups3 = keras.layers.UpSampling2D((2, 2), interpolation='nearest')
        self.conv3 = keras.layers.Conv2D(16, 3, strides=1, padding='same')
        self.bn3 = keras.layers.BatchNormalization()
        self.activ3 = keras.layers.LeakyReLU()
        self.ups4 = keras.layers.UpSampling2D((2, 2), interpolation='nearest')(x)
        self.conv4 = keras.layers.Conv2D(3, 3, strides=1, padding='same', activation='sigmoid')(x)
    
    def call(self, input):
        decoder_input = self.decoder_input(input)
        hidden1 = self.hidden1(decoder_input)
        reshape1 = self.reshape1(hidden1)
        ups1 = self.ups1(reshape1)
        conv1 = self.conv1(ups1)
        bn1 = self.bn1(conv1)
        activ1 = self.activ1(bn1)
        ups2 = self.ups2(activ1)
        conv2 = self.conv2(ups2)
        bn2 = self.bn2(conv2)
        activ2 = self.activ2(bn2)
        ups3 = self.ups3(activ2)
        conv3 = self.conv3(ups3)
        bn3 = self.bn3(conv3)
        activ3 = self.activ3(bn3)
        ups4 = self.activ3(activ3)
        conv4 = self.conv4(ups4)
        return conv4

In [None]:
class CVAE(keras.models.Model):
  def __init__(self, latent_dim):
    super(CVAE, self).__init__()
    self.encoder = Encoder(latent_dim)
    self.decoder = Decoder(latent_dim)
  
  @tf.function
  def encode(self,x):
    z_mean, z_log_var = self.encoder(x)
    return z_mean, z_log_var
  
  def sample(self, eps=None):
    if eps is None:
      eps = tf.random.normal(shape=(100, self.latent_dim))
    return self.decode(eps, apply_sigmoid=True)
  
  def decode(self, z, apply_sigmoid=False):
    logits = self.decoder(z)
    if apply_sigmoid:
      probs = tf.sigmoid(logits)
      return probs
    return logits
  
  def reparameterize(self, mean, logvar):
    eps = tf.random.normal(shape=mean.shape)
    return eps * tf.exp(logvar * .5) + mean



In [29]:
def log_normal_pdf(sample, mean, logvar, raxis=1):
  log2pi = tf.math.log(2. * np.pi)
  return tf.reduce_sum(
      -.5 * ((sample - mean) ** 2. * tf.exp(-logvar) + logvar + log2pi),
      axis=raxis)


def compute_loss(model, x):
  z_mean, z_log_var = model.encode(x)
  z = model.reparameterize(z_mean, z_log_var)
  x_logit = model.decode(z)
  cross_ent = tf.nn.sigmoid_cross_entropy_with_logits(logits=x_logit, labels=x)
  logpx_z = -tf.reduce_sum(cross_ent, axis=[1, 2, 3])
  logpz = log_normal_pdf(z, 0., 0.)
  logqz_x = log_normal_pdf(z, z_mean, z_log_var)
  return -tf.reduce_mean(logpx_z + logpz - logqz_x)

@tf.function
def train_step(model, x, optimizer):
  """Executes one training step and returns the loss.

  This function computes the loss and gradients, and uses the latter to
  update the model's parameters.
  """
  with tf.GradientTape() as tape:
    loss = compute_loss(model, x)
  gradients = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(gradients, model.trainable_variables))

In [None]:
epochs = 10
latent_dim = 2
num_examples_to_generate = 16

# keeping the random vector constant for generation (prediction) so
# it will be easier to see the improvement.
random_vector_for_generation = tf.random.normal(
    shape=[num_examples_to_generate, latent_dim])
model = CVAE(latent_dim)

In [None]:
def generate_and_save_images(model, epoch, test_sample):
  mean, logvar = model.encode(test_sample)
  z = model.reparameterize(mean, logvar)
  predictions = model.sample(z)
  fig = plt.figure(figsize=(4, 4))

  for i in range(predictions.shape[0]):
    plt.subplot(4, 4, i + 1)
    plt.imshow(predictions[i, :, :, 0], cmap='gray')
    plt.axis('off')

  # tight_layout minimizes the overlap between 2 sub-plots
  plt.savefig('image_at_epoch_{:04d}.png'.format(epoch))
  plt.show()

In [None]:
for test_batch in test_dataset.take(1):
  test_sample = test_batch[0:num_examples_to_generate, :, :, :]

In [None]:
generate_and_save_images(model, 0, test_sample)

for epoch in range(1, epochs + 1):
  start_time = time.time()
  for train_x in train_dataset:
    train_step(model, train_x, optimizer)
  end_time = time.time()

  loss = tf.keras.metrics.Mean()
  for test_x in test_dataset:
    loss(compute_loss(model, test_x))
  elbo = -loss.result()
  display.clear_output(wait=False)
  print('Epoch: {}, Test set ELBO: {}, time elapse for current epoch: {}'
        .format(epoch, elbo, end_time - start_time))
  generate_and_save_images(model, epoch, test_sample)