In [None]:
import numpy as np
import os
import sys
import matplotlib.pyplot as plt
%matplotlib inline
from skimage.transform import rescale, resize
from tqdm import tqdm
import cv2

In [None]:
# Expose only the GPU with index 1 to CUDA-based libraries. This cell runs before
# the TensorFlow import further down, so the restriction is already set when it loads.
os.environ['CUDA_VISIBLE_DEVICES'] = '1'

In [None]:
# LOADING THE DATA INTO A NUMPY ARRAY
# Every readable image in the training directory is converted from BGR to RGB,
# resized to 256x256 and scaled to [0, 1]. x_data_train_path[i] is the file that
# produced x_data_train[i].

train_image_dir = 'Dataset/Compaq_orignal/train/images/1'

x_data_train = []
x_data_train_path = []
# os.listdir returns entries in arbitrary order; sorting keeps indices stable across runs.
for png in tqdm(sorted(os.listdir(train_image_dir))):
    path = os.path.join(train_image_dir, png)
    image = cv2.imread(path)
    if image is None:
        # cv2.imread returns None (instead of raising) for unreadable / non-image files.
        tqdm.write('Skipping unreadable file: {}'.format(path))
        continue
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (256, 256))
    # Record the path only once the image has loaded, so both lists stay aligned.
    x_data_train_path.append(path)
    x_data_train.append(image / 255.)

x_data_train = np.array(x_data_train)

In [None]:
# Load the test images with the same preprocessing as the training set:
# BGR -> RGB, resize to 256x256, scale to [0, 1]. x_data_test_path[i] is the file
# that produced x_data_test[i].

test_image_dir = 'Dataset/Compaq_orignal/test/images/1'

x_data_test = []
x_data_test_path = []
# os.listdir returns entries in arbitrary order; sorting keeps indices stable across runs.
for png in tqdm(sorted(os.listdir(test_image_dir))):
    path = os.path.join(test_image_dir, png)
    image = cv2.imread(path)
    if image is None:
        # cv2.imread returns None (instead of raising) for unreadable / non-image files.
        tqdm.write('Skipping unreadable file: {}'.format(path))
        continue
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (256, 256))
    # Record the path only once the image has loaded, so both lists stay aligned.
    x_data_test_path.append(path)
    x_data_test.append(image / 255.)

x_data_test = np.array(x_data_test)

In [None]:
# x_data_test = x_data_train[-100:]

In [None]:
# Sanity check: display one preprocessed training image.
# Index 89 is hard-coded, so this needs at least 90 training images.
plt.imshow(x_data_train[89])

In [None]:
# KERAS IMPORTS
import tensorflow as tf
from tensorflow import keras

from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Input

from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import MaxPool2D, AvgPool2D
from tensorflow.keras.layers import UpSampling2D
# from tensorflow.keras.layers.advanced_activations import LeakyReLU
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Lambda
from tensorflow.keras.callbacks import EarlyStopping,ReduceLROnPlateau,ModelCheckpoint
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Reshape

from tensorflow.keras.layers import Add, Multiply

from tensorflow.keras.losses import mse, binary_crossentropy

import tensorflow.keras.backend as K

In [None]:
# SET SEEDS FOR REPRODUCIBILITY
# Seeding NumPy alone does not seed TensorFlow's own random generator, which is
# what K.random_normal (used for epsilon in the encoder) draws from.
SEED = 20
np.random.seed(SEED)
if hasattr(tf.random, 'set_seed'):
    tf.random.set_seed(SEED)      # TensorFlow 2.x API
else:
    tf.set_random_seed(SEED)      # TensorFlow 1.x API

#NUMBER OF DIMENSIONS IN THE ENCODED LAYER
latent_dims = 256

In [None]:
#ENCODER
#BUILT WITH FUNCTIONAL MODEL DUE TO THE MULTIPLE INPUTS AND OUTPUTS
#
# Five Conv2D -> BatchNormalization -> LeakyReLU(0.2) stages (the last four with
# stride 2) reduce the 256x256x3 image to 16x16x512. A Dense(1024) layer then feeds
# two parallel Dense heads, mu and log_var, each of size latent_dims. The model
# output is the sampled latent vector z = mu + exp(0.5 * log_var) * epsilon.

encoder_in = Input(shape=(256,256,3))   ##INPUT FOR THE IMAGE

# 256x256x3 -> 256x256x32 (stride 1 with 'same' padding keeps the resolution).
# No input_shape argument is needed: the shape is taken from encoder_in.
encoder_l1 = Conv2D(filters=32, kernel_size=5, strides=1, padding='same')(encoder_in)
encoder_l1 = BatchNormalization()(encoder_l1)
encoder_l1 = Activation(LeakyReLU(0.2))(encoder_l1)

# 256x256x32 -> 128x128x64
encoder_l1 = Conv2D(filters=64, kernel_size=5, strides=2, padding='same')(encoder_l1)
encoder_l1 = BatchNormalization()(encoder_l1)
encoder_l1 = Activation(LeakyReLU(0.2))(encoder_l1)


# 128x128x64 -> 64x64x128
encoder_l2 = Conv2D(filters=128, kernel_size=5, strides=2, padding='same')(encoder_l1)
encoder_l2 = BatchNormalization()(encoder_l2)
encoder_l2 = Activation(LeakyReLU(0.2))(encoder_l2)

# 64x64x128 -> 32x32x256
encoder_l3 = Conv2D(filters=256, kernel_size=5, strides=2, padding='same')(encoder_l2)
encoder_l3 = BatchNormalization()(encoder_l3)
encoder_l3 = Activation(LeakyReLU(0.2))(encoder_l3)


# 32x32x256 -> 16x16x512
encoder_l4 = Conv2D(filters=512, kernel_size=5, strides=2, padding='same')(encoder_l3)
encoder_l4 = BatchNormalization()(encoder_l4)
encoder_l4 = Activation(LeakyReLU(0.2))(encoder_l4)

flatten = Flatten()(encoder_l4)

encoder_dense = Dense(1024)(flatten)
encoder_dense = BatchNormalization()(encoder_dense)
encoder_out = Activation(LeakyReLU(0.2))(encoder_dense)


# Two linear heads on the shared 1024-d features: mean and log-variance of the latent code.
mu = Dense(latent_dims)(encoder_out)
log_var = Dense(latent_dims)(encoder_out)


# epsilon is declared as a second model input, but it is backed by a random-normal
# tensor whose batch size follows mu rather than by user-supplied data.
# NOTE(review): Input(tensor=...) is handled differently across Keras versions --
# confirm that fit()/predict() accept a single image array with the version in use.
epsilon = Input(tensor=K.random_normal(shape=(K.shape(mu)[0], latent_dims)))  ##INPUT EPSILON FOR RANDOM SAMPLING

sigma = Lambda(lambda x: K.exp(0.5 * x))(log_var) # CHANGE log_var INTO STANDARD DEVIATION(sigma)
z_eps = Multiply()([sigma, epsilon])

# Reparameterisation: z = mu + sigma * epsilon
z = Add()([mu, z_eps])

encoder=Model([encoder_in,epsilon], z)
# summary() prints the table itself and returns None, so it is not wrapped in print().
encoder.summary()

In [None]:
# DECODER
# BUILT WITH SEQUENTIAL MODEL AS NO BRANCHING IS REQUIRED
#
# latent vector -> Dense(1024) -> Dense(8192) -> reshape to 4x4x512 -> six stride-2
# transposed convolutions (4 -> 256 pixels per side) -> 3-channel sigmoid image.

decoder = Sequential()
decoder.add(Dense(1024, input_shape=(latent_dims,)))
decoder.add(BatchNormalization())
decoder.add(Activation(LeakyReLU(0.2)))

decoder.add(Dense(8192))
decoder.add(BatchNormalization())
decoder.add(Activation(LeakyReLU(0.2)))

# 8192 = 4 * 4 * 512
decoder.add(Reshape(target_shape=(4,4,512)))

# Each stage doubles the spatial size ('same' padding, stride 2):
# 4 -> 8 -> 16 -> 32 -> 64 -> 128 -> 256.
for n_filters in (256, 128, 64, 32, 16, 8):
    decoder.add(Conv2DTranspose(filters=n_filters, kernel_size=5, strides=2, padding='same'))
    decoder.add(BatchNormalization())
    decoder.add(Activation(LeakyReLU(0.2)))

# Final stride-1 layer maps to 3 channels; sigmoid keeps outputs in [0, 1],
# the same range as the input images after division by 255.
decoder.add(Conv2DTranspose(filters=3, kernel_size=5, strides=1, padding='same'))
decoder.add(BatchNormalization())
decoder.add(Activation('sigmoid'))

# summary() prints the table itself and returns None, so it is not wrapped in print().
decoder.summary()

In [None]:
# COMBINE ENCODER AND DECODER TO COMPLETE THE VARIATIONAL AUTOENCODER
# The decoder is applied to the sampled latent tensor z; the end-to-end model
# takes the same two inputs as the encoder (image and epsilon).

vae_preds = decoder(z)
vae = Model(inputs=[encoder_in, epsilon], outputs=vae_preds)
vae.summary()

In [None]:
# MY LOSS FUNCTIONS

def reconstruction_loss(y_true, y_pred):
    """Mean squared error between target and reconstruction, averaged over all elements."""
    squared_error = K.square(y_true - y_pred)
    return K.mean(squared_error)

def kl_loss(y_true, y_pred):
    """KL-divergence term computed from the encoder's module-level mu / log_var tensors.

    y_true and y_pred are not used in the body; the two-argument signature lets
    the function be passed to compile() as a metric. The result is
    -0.5 * mean(1 + log_var - mu^2 - exp(log_var)) taken over the last axis.
    """
    kl_terms = 1 + log_var - K.square(mu) - K.exp(log_var)
    return - 0.5 * K.mean(kl_terms, axis=-1)

def vae_loss(y_true, y_pred):
    """Total VAE objective: reconstruction loss plus 0.5 times the KL loss."""
    # Earlier experiment, kept for reference: scaling kl_loss by 0.03 (instead of 0.5)
    # seemed to help.
    recon_term = reconstruction_loss(y_true, y_pred)
    kl_term = kl_loss(y_true, y_pred)
    return recon_term + 0.5 * kl_term


In [None]:
# kl_annealtime = 20

# class AnnealingCallback(Callback):
#     def __init__(self, weight):
#         self.weight = weight
#     def on_epoch_end (self, epoch, logs={}):
#         if epoch > klstart :
#             new_weight = min(K.get_value(self.weight) + (1./ annealtime), 1.)
#             K.set_value(self.weight, new_weight)
#         print ("Current KL Weight is " + str(K.get_value(self.weight)))


# # the starting value of weight is 0
# # define it as a keras backend variable
# weight = K.variable(0.)
# # wrap the loss as a function of weight
# def vae_loss(weight):
#     def loss (y_true, y_pred):
#         # mse loss
#         reconstruction_loss = K.sum(K.square(y_true - y_pred), axis=-1)
#         # kl loss
#         kl_loss = 1 + log_var - K.square(mu) - K.exp(log_var)
#         kl_loss = K.sum(kl_loss, axis=-1)
#         kl_loss *= -0.5
#         return reconstruction_loss + (weight * kl_loss)
#     return loss


In [None]:
# Compile with the combined VAE loss; the two components are also tracked as
# metrics so they show up separately in the training history.
vae.compile(optimizer='adam', loss=vae_loss, metrics=[reconstruction_loss, kl_loss])

checkpoint_path = './models/vae_skin_keras/vae_b16/vae_oCompq_norm_kl0d5_l256.h5'
# Create the checkpoint directory up front so the first save cannot fail on a
# missing folder.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

# All three callbacks watch the validation loss.
callbacks = [
    # Stop when val_loss has not improved for 10 epochs.
    EarlyStopping(patience=10, verbose=1, monitor='val_loss'),
    # Divide the learning rate by 10 after 3 stagnant epochs, down to 1e-5.
    ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    # Keep only the weights of the best epoch seen so far.
    ModelCheckpoint(checkpoint_path, monitor='val_loss', verbose=1,
                    save_best_only=True, save_weights_only=True)
]

In [None]:
# Train the autoencoder: the images serve as both input and target, and the test
# images are used as validation data for the callbacks.
history = vae.fit(
    x=x_data_train,
    y=x_data_train,
    epochs=150,
    batch_size=16,
    validation_data=(x_data_test, x_data_test),
    callbacks=callbacks,
)

In [None]:
# List the metric series recorded during training (these are the keys that
# showloss reads from history.history).
print(history.history.keys())

In [None]:
def showloss(history):
    """Plot the training and validation loss curves stored in a Keras History object.

    Args:
        history: object whose ``history`` attribute maps metric names to
            per-epoch value lists (as returned by ``vae.fit``).

    Series that are absent from ``history.history`` (for example the ``val_*``
    curves when training ran without validation data) are skipped instead of
    raising KeyError, and the legend only lists the curves actually drawn.
    """
    curve_names = ['loss', 'reconstruction_loss', 'kl_loss',
                   'val_loss', 'val_reconstruction_loss', 'val_kl_loss']

    _, ax = plt.subplots(figsize=(15, 10))
    for name in curve_names:
        if name in history.history:
            # The label ties each legend entry to the line it describes.
            ax.plot(history.history[name], label=name)
    ax.set_title('Model loss')
    ax.set_xlabel('epochs')
    ax.set_ylabel('loss')
    ax.legend(loc='upper left')



showloss(history)

In [None]:
# Restore the weights from the file written by the ModelCheckpoint callback
# (same path; that callback was configured with save_best_only=True).
vae.load_weights('./models/vae_skin_keras/vae_b16/vae_oCompq_norm_kl0d5_l256.h5')

In [None]:
# ORIGINAL IMAGES
# Take the 10-image preview as a separate array instead of overwriting x_data_test.
# x_data_test is also the validation set passed to vae.fit, so truncating it in place
# would silently shrink validation to 10 images if the training cell were re-run.
# The prediction and plotting cells only read the first 10 entries of x_data_test.
x_data_test_preview = x_data_test[:10]

In [None]:
# Reconstructed images: run the first 10 test images through the VAE.
predictions  = vae.predict(x_data_test[:10])

In [None]:
# Side-by-side comparison: originals on the top row, reconstructions underneath.
# Cap n at what is actually available so a small test set cannot raise IndexError.
n = min(10, len(x_data_test), len(predictions))  # number of image pairs to display
plt.figure(figsize=(20, 4))
for i in range(n):
    # row 0 -> original, row 1 -> reconstruction (subplot indices are 1-based)
    for row, image in enumerate((x_data_test[i], predictions[i])):
        ax = plt.subplot(2, n, i + 1 + row * n)
        # No colormap is set: imshow ignores colormaps for RGB input.
        ax.imshow(image.reshape(256, 256, 3))
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
plt.show()


In [None]:
# def write_images(path,predictions):
#     print(len(predictions))
#     for index,image in enumerate(predictions):
#         cv2.write("./Reconstructed/Test_reconstructions/Reconstructed_images_{}.png".format(index))