# Mount drive

In [None]:
# Mount Google Drive; later cells copy the Kaggle credentials from it and
# write the trained model, plots and sample images back to it.
from google.colab import drive

drive.mount("/content/gdrive/")

# Imports and Constants

## Constants

In [None]:
#@markdown Dataset
#@markdown ---
# Side length, in pixels, that every image is resized to (images are made square).
IMG_DIMS =	64#@param {type:"integer"}
# Number of channels; used as the last dimension of the model input shape.
NUM_CHANS = 3 #@param {type:"slider", min:1, max:4, step:1}
# Maximum number of images loaded from the dataset (keeps RAM usage manageable).
MAX_DATA_SIZE = 50000 #@param {type:"slider", min:4000, max:50000, step:1000}
# When True, the loaded images are split into a training part and a test part.
SPLIT_TRAIN_TEST = True #@param {type:"boolean"}
# Percentage of the images assigned to the training part.
TRAIN_RATIO = 90 #@param {type:"slider", min:60, max:90, step:5}
#@markdown ---

#@markdown Training parameters
#@markdown ---
# Size of the latent vector (units of the encoder's mean / log-variance layers).
latent_dim = 1024 #@param {type:"slider", min:64, max:2048, step:64}
# Number of training epochs (default: 50).
NUM_EPOCHS = 200 #@param {type:"slider", min:10, max:500, step:10}
# Mini-batch size used during training (default: 128).
BATCH_SIZE = 128 #@param {type:"slider", min:32, max:512, step:32}
# Learning rate handed to the Adam optimizer; `05e-4` is simply 5e-4 written with a leading zero.
INIT_LR = 05e-4 #@param {type:"number"}
#@markdown ---

#@markdown Benchmarks
#@markdown ---
# Image quality for the benchmark images.
# NOTE(review): not referenced by any cell visible here - confirm it is still needed.
BENCH_IMG_QUA =	64 #@param {type:"integer"}
# Directory that is walked recursively to collect the source images.
IMG_DIR = "/content/lsun_bedroom/data0/" #@param {type:"string"}
# Path prefix for the saved model, the loss plot and the sample image.
OUTPUT_PATH = "/content/gdrive/MyDrive/Colab Files/Autoencoder/vae/" #@param {type:"string"}
# Number of test images reconstructed for the side-by-side sample image.
SAMPLES_NUM = 8 #@param {type:"slider", min:8, max:30, step:1}
#@markdown ---


#@markdown Resizing
#@markdown ---
# When True, the resized and normalized images are cached to RESIZED_PATH as a .npy file.
SAVE_RESIZED = True #@param {type:"boolean"}
# Directory holding the cached .npy file; must end with "/" because the path is built by concatenation.
RESIZED_DIR = "/content/lsun_bedroom/" #@param {type:"string"}
# The cache file name encodes image size and image count, so changing either produces a new cache file.
RESIZED_PATH=RESIZED_DIR + "trainImages_{}_{}.npy".format(IMG_DIMS, MAX_DATA_SIZE)




## Imports

In [None]:
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Lambda
from tensorflow.keras.models import Model
from tensorflow.keras import backend as K
K.clear_session()
import numpy as np

import matplotlib
matplotlib.use("Agg")

from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import cv2

import os

## Prepare Kaggle-cli

In [None]:
%%shell

#!/bin/bash
# Install the Kaggle command-line tools and copy the API credentials from
# Google Drive, unless the credentials file is already in place.
#
# "$HOME" is used instead of "~" because a tilde inside double quotes is not
# expanded, and the closing "]" must be preceded by a space to be recognised
# by the test command.
if [ -e "$HOME/.kaggle/kaggle.json" ]
then
	echo "kaggle.json already exists"
	echo "Skipping to next cell..."
else
	echo "Preparing Kaggle-cli..."
	pip install -q kaggle
	pip install -q kaggle-cli

	mkdir -p ~/.kaggle
	cp "/content/gdrive/MyDrive/Colab Files/Kaggle/kaggle.json" ~/.kaggle/
	# The credentials file is deliberately not printed: the cell output is
	# stored inside the notebook and would leak the API key when shared.
	chmod 600 ~/.kaggle/kaggle.json
fi

# Data Preprocessing

## Download and Unzip The DataSet



In [None]:
%%shell

#!/bin/bash
# Download the LSUN bedroom dataset from Kaggle once and extract it.
# The archive is written to /content explicitly so that the existence check
# below looks in the same place regardless of the current working directory.
if [ -e "/content/lsun_bedroom.zip" ]
then
		echo "Dataset already downloaded"
		echo "Skipping to next cell..."
else
		echo "Downloading Dataset..."
		kaggle datasets download -d jhoward/lsun_bedroom -p /content
		echo "Unzipping Dataset..."
		# -q suppresses the per-file listing, which would otherwise flood the cell output.
		unzip -q "/content/lsun_bedroom.zip" -d "/content/lsun_bedroom"
fi

## Prepare and Save The DataSet

In [None]:
def _iter_files(root_dir):
	"""Yield the path of every file found below ``root_dir`` (recursive walk)."""
	for dir_path, _dir_names, file_names in os.walk(root_dir):
		for file_name in file_names:
			yield os.path.join(dir_path, file_name)


def _split_train_test(images, split_train_test, train_ratio):
	"""Return ``(train, test)``; ``test`` is an empty slice when no split is requested."""
	if not split_train_test:
		# An empty slice keeps the dtype and the trailing dimensions of the data.
		return images, images[:0]
	# The split point is based on the number of images actually available,
	# which may be smaller than the requested maximum.
	split_index = int(len(images) * (train_ratio / 100))
	return images[:split_index], images[split_index:]


def prepare_dataset(ds_exists= False, img_dim = IMG_DIMS, save_resized = SAVE_RESIZED, resized_path = RESIZED_PATH,
										max_data_size = MAX_DATA_SIZE, split_train_test = SPLIT_TRAIN_TEST, train_ratio = TRAIN_RATIO,
										img_dir = IMG_DIR):
	"""Load the image dataset and return it as ``(train_images, test_images)``.

	Parameters
	----------
	ds_exists : bool
		When True, the images are read from the ``.npy`` file at ``resized_path``
		instead of being rebuilt from the image directory.
	img_dim : int
		Side length every image is resized to (only used when rebuilding).
	save_resized : bool
		When True, the rebuilt array is saved to ``resized_path``.
	resized_path : str
		Location of the cached ``.npy`` file.
	max_data_size : int
		Maximum number of images read from ``img_dir``.
	split_train_test : bool
		When True, the first ``train_ratio`` percent of the images form the
		training set and the remainder the test set. When False, all images are
		returned as the training set together with an empty test array.
	train_ratio : int or float
		Percentage (0-100) of images assigned to the training set.
	img_dir : str
		Directory that is walked recursively for image files.

	Returns
	-------
	tuple of numpy.ndarray
		``(train_images, test_images)``. Freshly built images are float arrays
		scaled to the range [0, 1].

	Raises
	------
	ValueError
		If no readable image is found below ``img_dir``.
	"""
	if ds_exists:
		totalImages = np.load(resized_path)
		return _split_train_test(totalImages, split_train_test, train_ratio)

	images = []
	for file_name in _iter_files(img_dir):
		img = cv2.imread(file_name)
		if img is None:
			# cv2.imread returns None for files it cannot decode; skip them
			# instead of crashing inside cv2.resize.
			continue
		images.append(cv2.resize(img, (img_dim, img_dim)))
		if len(images) >= max_data_size:
			break

	if not images:
		raise ValueError("No readable images found in: {}".format(img_dir))

	totalImages = np.array(images)
	del images
	# Report the shape only; printing the whole array is slow and unreadable.
	print(totalImages.shape)
	# Scale the 8-bit pixel values to [0, 1].
	totalImages = totalImages.astype("float") / 255
	if save_resized:
		np.save(resized_path, totalImages)
		print("Resized images saved in path: ", resized_path)

	train_images, test_images = _split_train_test(totalImages, split_train_test, train_ratio)
	if split_train_test:
		# Number of images that ended up in the training set.
		print(len(train_images))
	return train_images, test_images

In [None]:
# Build the dataset from the raw images on the first run; afterwards reuse the
# cached .npy file, which is much faster than resizing every image again.
if not os.path.isfile(RESIZED_PATH):
	trainImages, testImages = prepare_dataset()
	print("Datasaet pre")
else:
	print(".npy file already exists")
	print("Reading the existing file...")
	trainImages, testImages = prepare_dataset(ds_exists=True)

# Sanity check: number of training images and their dimensions.
print(trainImages.shape)

# VAE and Training

## Models

### Functions

In [None]:
def sampling(mu_log_variance):
	"""Draw a latent sample from the encoder's mean and log-variance.

	``mu_log_variance`` is a pair ``(mu, log_variance)``. The result is
	``mu + exp(log_variance / 2) * epsilon`` where ``epsilon`` is standard
	normal noise with the same shape as ``mu``.
	"""
	mean, log_var = mu_log_variance
	noise = K.random_normal(K.shape(mean), mean=0.0, stddev=1.0)
	# exp(log_variance / 2) is the standard deviation.
	std_dev = K.exp(log_var/2)
	return mean + std_dev * noise

def loss_func(encoder_mu, encoder_log_variance):
	"""Build the VAE loss: scaled reconstruction error plus KL divergence.

	Parameters
	----------
	encoder_mu, encoder_log_variance :
		The encoder's mean and log-variance tensors, indexed as
		(batch, latent_dim). The KL term is computed from these tensors, which
		the returned function captures as a closure.

	Returns
	-------
	callable
		``vae_loss(y_true, y_predict)`` returning one loss value per sample,
		suitable for ``Model.compile(loss=...)``.

	Notes
	-----
	The KL term must be computed from the latent statistics. It is therefore
	taken from the captured encoder tensors and never from the image tensors
	``y_true`` / ``y_predict`` handed to the loss by Keras.

	NOTE(review): because the loss captures symbolic tensors, some TensorFlow 2
	versions may refuse it under eager execution - confirm with the installed
	version. Running in graph mode or attaching the KL term with
	``Model.add_loss`` are the usual alternatives.
	"""
	# Weight of the reconstruction term relative to the KL term.
	reconstruction_loss_factor = 1000

	def vae_reconstruction_loss(y_true, y_predict):
		# Mean squared error over height, width and channels -> one value per sample.
		reconstruction_loss = K.mean(K.square(y_true - y_predict), axis=[1, 2, 3])
		return reconstruction_loss_factor * reconstruction_loss

	def vae_kl_loss():
		# KL divergence between N(mu, exp(log_variance)) and N(0, 1),
		# summed over the latent dimension -> one value per sample.
		return -0.5 * K.sum(1.0 + encoder_log_variance - K.square(encoder_mu) - K.exp(encoder_log_variance), axis=1)

	def vae_loss(y_true, y_predict):
		return vae_reconstruction_loss(y_true, y_predict) + vae_kl_loss()

	return vae_loss

### Encoder

In [None]:
# Encoder: a stack of Conv2D -> BatchNormalization -> LeakyReLU blocks followed
# by two dense heads (mean and log-variance) and a sampling layer.
inputShape = (IMG_DIMS, IMG_DIMS, NUM_CHANS)

# NOTE: `input` shadows the Python builtin, but later cells refer to this
# tensor by that name, so the name is kept.
input = Input(inputShape)

# (filters, strides) of each convolutional block, in order. Each of the four
# stride-2 blocks halves the spatial resolution.
encoder_conv_specs = [
	(NUM_CHANS, 1),
	(32, 1),
	(64, 2),
	(64, 2),
	(128, 2),
	(128, 2),
	(128, 1),
]

enc = input
for num_filters, stride in encoder_conv_specs:
	enc = Conv2D(filters=num_filters, kernel_size=(3, 3), padding="same", strides=stride)(enc)
	enc = BatchNormalization()(enc)
	enc = LeakyReLU()(enc)

# Remember the feature-map shape so the decoder can reshape its dense output back to it.
shape_before_flatten = K.int_shape(enc)
enc_flatten = Flatten()(enc)

# Two parallel heads: mean and log-variance of the latent distribution.
enc_mu = Dense(units=latent_dim)(enc_flatten)
enc_log_variance = Dense(units=latent_dim)(enc_flatten)

# Draw the latent vector from the two heads.
enc_output = Lambda(sampling)([enc_mu, enc_log_variance])

# build the encoder model
encoder = Model(input, enc_output)
# summary() prints the table itself and returns None, so it is not wrapped in print().
encoder.summary()



### Decoder

In [None]:
# Decoder: expands a latent vector back to the encoder's last feature-map
# shape and then upsamples it with transposed convolutions.
dec_input = Input(shape=(latent_dim,))
# np.prod returns a NumPy integer; hand Dense a plain Python int.
dec = Dense(int(np.prod(shape_before_flatten[1:])))(dec_input)
dec = Reshape(target_shape=shape_before_flatten[1:])(dec)

# (filters, strides) of each Conv2DTranspose -> BatchNormalization -> LeakyReLU
# block, in order. Each of the four stride-2 blocks doubles the spatial resolution.
decoder_deconv_specs = [
	(128, 1),
	(128, 2),
	(128, 2),
	(64, 2),
	(64, 2),
	(32, 1),
]

for num_filters, stride in decoder_deconv_specs:
	dec = Conv2DTranspose(filters=num_filters, kernel_size=(3, 3), padding="same", strides=stride)(dec)
	dec = BatchNormalization()(dec)
	dec = LeakyReLU()(dec)

# Output layer: one filter per image channel, no batch normalization.
# NOTE(review): the output goes through LeakyReLU, so reconstructed values are
# not bounded - confirm this is intended for images scaled to [0, 1].
dec = Conv2DTranspose(filters=NUM_CHANS, kernel_size=(3, 3), padding="same", strides=1)(dec)
dec = LeakyReLU()(dec)

# build the decoder model
decoder = Model(dec_input, dec)

# summary() prints the table itself and returns None, so it is not wrapped in print().
decoder.summary()

### VAE

In [None]:
# Chain encoder and decoder into the end-to-end model: image in, reconstruction out.
vae = Model(input, decoder(encoder(input)))
vae.summary()

# `learning_rate` is the supported keyword argument; the older `lr` alias is
# deprecated and rejected by newer Keras releases.
vae.compile(optimizer=Adam(learning_rate=INIT_LR), loss=loss_func(enc_mu, enc_log_variance))

## Training

In [None]:
# Create the output directory up front: discovering that it is missing only
# when saving would throw away the whole training run.
os.makedirs(OUTPUT_PATH, exist_ok=True)

# The model is trained to reproduce its input, so the images serve as both
# input and target; the test images are used for validation.
H = vae.fit(trainImages, trainImages, epochs=NUM_EPOCHS, batch_size=BATCH_SIZE, shuffle=True, validation_data=(testImages, testImages))
vae.save(OUTPUT_PATH + "model.h5")

In [None]:
# construct a plot that plots and saves the training history
# The x axis is built from the number of epochs actually recorded, so it always
# matches the length of the loss curves.
N = np.arange(0, len(H.history["loss"]))
plt.style.use("ggplot")
plt.figure()
plt.plot(N, H.history["loss"], label="train_loss")
plt.plot(N, H.history["val_loss"], label="val_loss")
# Only loss curves are drawn, so title and axis label mention loss only.
plt.title("Training and Validation Loss")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="lower left")
plt.savefig(OUTPUT_PATH + "graph-4.png")

# use the autoencoder to make predictions on the first test images
print("[INFO] making predictions...")
decoded = vae.predict(testImages[:SAMPLES_NUM])

# build one row per sample: original on the left, reconstruction on the right
sample_rows = []
# zip stops at the shorter sequence, so this is safe even when fewer than
# SAMPLES_NUM test images exist.
for original_img, recon_img in zip(testImages, decoded):
	original = (original_img * 255).astype("uint8")
	# The decoder output is not bounded; clip to [0, 1] first so that
	# out-of-range values do not wrap around when cast to uint8.
	recon = (np.clip(recon_img, 0.0, 1.0) * 255).astype("uint8")
	sample_rows.append(np.hstack([original, recon]))

# vertically stack all rows into a single image
outputs = np.vstack(sample_rows)

# save the outputs image
cv2.imwrite(OUTPUT_PATH + "output_samples-4.png", outputs)
print("Done.")