In [1]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Flatten, Reshape, Lambda, Dense, Conv2D, Conv2DTranspose, LeakyReLU, BatchNormalization, Dropout, Activation
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import os
from glob import glob

In [3]:
# run params
section = 'vae'
run_id = '0001'
data_name = 'faces'
RUN_FOLDER = 'run/{}/'.format(section)
RUN_FOLDER += '_'.join([run_id, data_name])

if not os.path.exists(RUN_FOLDER):
    os.mkdir(RUN_FOLDER)
    os.mkdir(os.path.join(RUN_FOLDER, 'viz'))
    os.mkdir(os.path.join(RUN_FOLDER, 'images'))
    os.mkdir(os.path.join(RUN_FOLDER, 'weights'))

mode =  'build' #'load' #


DATA_FOLDER = '../data/celeb/'

In [4]:
INPUT_DIM = (128,128,3)
BATCH_SIZE = 32

filenames = np.array(glob(os.path.join(DATA_FOLDER, '*/*.jpg')))

NUM_IMAGES = len(filenames)

In [5]:
data_gen = ImageDataGenerator(rescale=1./255)

data_flow = data_gen.flow_from_directory(DATA_FOLDER
                                         , target_size = INPUT_DIM[:2]
                                         , batch_size = BATCH_SIZE
                                         , shuffle = True
                                         , class_mode = 'input'
                                         , subset = "training"
                                            )

Found 202599 images belonging to 1 classes.


In [19]:
#Variational AutoEncoder class

class VAE():
    def __init__(self, input_shape,
                encoder_filters,
                encoder_kernel_size,
                encoder_strides,
                decoder_filters,
                decoder_kernel_size,
                decoder_strides,
                use_batchnorm,
                use_dropout,
                z_dim
                ):
        self.input_shape = input_shape
        self.encoder_filters = encoder_filters
        self.encoder_kernel_size = encoder_kernel_size
        self.encoder_strides = encoder_strides
        self.decoder_filters = decoder_filters
        self.decoder_kernel_size = decoder_kernel_size
        self.decoder_strides = decoder_strides
        self.use_batchnorm = use_batchnorm
        self.use_dropout = use_dropout
        self.z_dim = z_dim
        self._build()
    
    def _build_encoder(self):
        #build the encoder model
        encoder_input = Input(shape=self.input_shape)
        x = encoder_input
        for i in range(len(self.encoder_filters)):
            x = Conv2D(filters=self.encoder_filters[i],
                      kernel_size=self.encoder_kernel_size[i],
                      strides=self.encoder_strides[i],
                      padding='same')(x)
            if self.use_batchnorm:
                x = BatchNormalization()(x)
            
            x = LeakyReLU()(x)
            
            if self.use_dropout:
                x = Dropout(rate=0.5)(x)
        
        self.shape_before_flattening = K.int_shape(x)[1:]
        x = Flatten()(x)
        encoder_mu = Dense(self.z_dim)(x)
        encoder_log_var = Dense(self.z_dim)(x)
        encoder_intermediate_model = Model(encoder_input, (encoder_mu, encoder_log_var))
        
        def sample(args):
            mu, log_var = args
            epsilon = K.random_normal(shape=K.shape(mu), mean=0.0, stddev=1.0)
            z = mu + K.exp(log_var/2)*epsilon
            return z
        
        sampling_layer = Lambda(sample)([encoder_mu, encoder_log_var])
        encoder_output = sampling_layer
        encoder_model = Model(encoder_input, encoder_output)
        
        return encoder_input, encoder_output, encoder_mu, encoder_log_var, encoder_intermediate_model, encoder_model
    
    def _build_decoder(self):
        
        decoder_input = Input(shape=(self.z_dim,))
        x = Dense(np.prod(self.shape_before_flattening))(decoder_input)
        x = Reshape(self.shape_before_flattening)(x)
        
        for i in range(len(self.decoder_filters)):
            x = Conv2DTranspose(filters=self.decoder_filters[i],
                           kernel_size=self.decoder_kernel_size[i],
                           strides=self.decoder_strides[i],
                           padding='same')(x)
            
            if i < len(self.decoder_filters)-1:
                if self.use_batchnorm:
                    x = BatchNormalization()(x)

                x = LeakyReLU()(x)

                if self.use_dropout:
                    x = Dropout(rate=0.5)(x)
            else:
                x = Activation('sigmoid')(x)
                
        decoder_output = x
        
        decoder_model = Model(decoder_input, decoder_output)
        
        return decoder_input, decoder_output, decoder_model
                
        
            
    
    def _build(self):
        #build the encoder and decoder models
        
        self.encoder_input, \
        self.encoder_output, \
        self.encoder_mu, \
        self.encoder_log_var, \
        self.encoder_intermediate_model, \
        self.encoder_model = self._build_encoder()
        
        self.decoder_input, \
        self.decoder_output, \
        self.decoder_model = self._build_decoder()
        
        #build the VAE model from the above two pieces
        model_output = self.decoder_model(self.encoder_output)
        self.model = Model(self.encoder_input, model_output)
        
    def compile(self, learning_rate, r_loss_factor=1000):
        
        def vae_r_loss(y_true, y_pred):
            return K.mean(K.square(y_true-y_pred), axis=[1,2,3])
        
        def vae_kl_loss(y_true, y_pred):
            kl_loss =  -0.5 * K.sum(1 + self.encoder_log_var - K.square(self.encoder_mu) - K.exp(self.encoder_log_var), axis = 1)
            return kl_loss
        
        def vae_loss(y_true, y_pred):
            r_loss = vae_r_loss(y_true, y_pred) * r_loss_factor
            kl_loss = vae_kl_loss(y_true, y_pred)
            return  r_loss + kl_loss
        
        self.model.compile(loss=vae_loss, optimizer=Adam(learning_rate=learning_rate), metrics=['accuracy'])
        
    def train(self,x_train, batch_size, epochs):
        self.model.fit(x_train, x_train, batch_size=batch_size, epochs=epochs)
        
    
        

In [20]:
vae = VAE(input_shape=INPUT_DIM,
                 encoder_filters=[32, 64, 64, 64], 
                 encoder_kernel_size = [3, 3, 3, 3],
                 encoder_strides=[2, 2, 2, 2],
                 decoder_filters=[64, 64, 32, 3],
                 decoder_kernel_size=[3, 3, 3, 3],
                 decoder_strides=[2, 2, 2, 2],
                use_batchnorm=True,
                use_dropout=True,
                z_dim=200)

In [21]:
vae.encoder_model.summary()

Model: "model_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, 128, 128, 3) 0                                            
__________________________________________________________________________________________________
conv2d_4 (Conv2D)               (None, 64, 64, 32)   896         input_3[0][0]                    
__________________________________________________________________________________________________
batch_normalization_7 (BatchNor (None, 64, 64, 32)   128         conv2d_4[0][0]                   
__________________________________________________________________________________________________
leaky_re_lu_7 (LeakyReLU)       (None, 64, 64, 32)   0           batch_normalization_7[0][0]      
____________________________________________________________________________________________

In [22]:
vae.encoder_intermediate_model.summary()

Model: "model_4"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_3 (InputLayer)            [(None, 128, 128, 3) 0                                            
__________________________________________________________________________________________________
conv2d_4 (Conv2D)               (None, 64, 64, 32)   896         input_3[0][0]                    
__________________________________________________________________________________________________
batch_normalization_7 (BatchNor (None, 64, 64, 32)   128         conv2d_4[0][0]                   
__________________________________________________________________________________________________
leaky_re_lu_7 (LeakyReLU)       (None, 64, 64, 32)   0           batch_normalization_7[0][0]      
____________________________________________________________________________________________

In [23]:
vae.decoder_model.summary()

Model: "model_6"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_4 (InputLayer)         [(None, 200)]             0         
_________________________________________________________________
dense_5 (Dense)              (None, 4096)              823296    
_________________________________________________________________
reshape_1 (Reshape)          (None, 8, 8, 64)          0         
_________________________________________________________________
conv2d_transpose_4 (Conv2DTr (None, 16, 16, 64)        36928     
_________________________________________________________________
batch_normalization_11 (Batc (None, 16, 16, 64)        256       
_________________________________________________________________
leaky_re_lu_11 (LeakyReLU)   (None, 16, 16, 64)        0         
_________________________________________________________________
dropout_11 (Dropout)         (None, 16, 16, 64)        0   

In [24]:
vae.compile(learning_rate=0.001, r_loss_factor=10000)

In [25]:
LEARNING_RATE = 0.0005
R_LOSS_FACTOR = 10000
EPOCHS = 200
PRINT_EVERY_N_BATCHES = 100
INITIAL_EPOCH = 0

vae.model.fit_generator(
            data_flow
            , shuffle = True
            , epochs = EPOCHS
            )

Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200

KeyboardInterrupt: 

In [26]:
#reconstruct inputs from test set
n_to_show = 10
example_idx = np.random.choice(range(len(x_test)), n_to_show)
example_images = x_test[example_idx]

z_points = vae.encoder_model.predict(example_images)

reconst_images = vae.decoder_model.predict(z_points)

fig = plt.figure(figsize=(15, 3))
fig.subplots_adjust(hspace=0.4, wspace=0.4)

for i in range(n_to_show):
    img = example_images[i].squeeze()
    ax = fig.add_subplot(2, n_to_show, i+1)
    ax.axis('off')
    ax.text(0.5, -0.35, str(np.round(z_points[i],1)), fontsize=10, ha='center', transform=ax.transAxes)   
    ax.imshow(img, cmap='gray_r')

for i in range(n_to_show):
    img = reconst_images[i].squeeze()
    ax = fig.add_subplot(2, n_to_show, i+n_to_show+1)
    ax.axis('off')
    ax.imshow(img, cmap='gray_r')


NameError: name 'x_test' is not defined