In [2]:
import cv2
import matplotlib.pyplot as plt
from google.colab.patches import cv2_imshow
import os
import glob
import numpy as np

In [3]:
import tensorflow as tf
from tensorflow.keras.layers import *
import tensorflow.keras.backend as K
from tensorflow.keras.models import *
from keras import optimizers
from keras import metrics
from tensorflow import keras

In [8]:
# Root folder of the dataset; later cells append 'train/<class>' to it.
# NOTE(review): `dir` shadows the Python builtin of the same name. It is kept
# because later cells read this exact name.
dir = '/content/drive/MyDrive/DL_project/dataset/'

In [9]:
# Class sub-folder names whose images are loaded from the train split.
classes = ['freshapples', 'freshbanana', 'freshoranges']

In [5]:
# os.listdir(dir + 'train/' + classes[0])

In [10]:
# Target size passed to cv2.resize for every loaded image; it matches the
# (256, 256, 3) input declared by the encoder.
shape = (256, 256)

In [11]:
# Load every training image of every class into one uint8 array.
train_data = []

for class_name in classes:
  # os.path.join avoids the doubled slash that `dir + '/train/'` produced.
  pattern = os.path.join(dir, 'train', class_name, '*.png')
  # glob returns files in arbitrary order; sorting keeps the array order (and
  # therefore the later seeded train/val split) reproducible across runs.
  for filename in sorted(glob.glob(pattern)):
    img = cv2.imread(filename)  # channels are in OpenCV's BGR order
    if img is None:
      # imread signals an unreadable/corrupt file with None instead of raising;
      # passing that to cv2.resize would crash the whole load.
      print('skipping unreadable image:', filename)
      continue
    train_data.append(cv2.resize(img, shape))
train_data = np.array(train_data)
print(train_data.shape)

(4740, 256, 256, 3)


In [12]:
from sklearn.model_selection import train_test_split

In [13]:
# Split the loaded images into a training and a held-out part; the fixed
# random_state makes the shuffle repeatable.
# NOTE(review): test_size = 0.6 leaves only 40% of the images for training -
# confirm this ratio is intended.
splits = train_test_split(train_data, test_size=0.6, random_state=42, shuffle=True)
x_train, x_val = splits
for part in splits:
  print(part.shape)

(1896, 256, 256, 3)
(2844, 256, 256, 3)


In [10]:
# x_train = np.array([np.array(val) for val in train_data])

In [4]:
class Sampling(Layer):
  """Draws a latent vector z from (z_mean, z_log_var) via the reparameterisation trick."""

  def call(self, inputs):
    """Return z_mean + exp(0.5 * z_log_var) * eps, with eps drawn from a standard normal.

    inputs: pair (z_mean, z_log_var); the noise is shaped from the first two
    dimensions of z_mean.
    """
    mean, log_var = inputs
    noise_shape = (tf.shape(mean)[0], tf.shape(mean)[1])
    noise = tf.keras.backend.random_normal(shape=noise_shape)
    std = tf.exp(0.5 * log_var)
    return mean + std * noise


In [5]:
# Encoder: six convolutional stages shrink the 256x256x3 input to 4x4x256,
# one more conv + 4x4 pool collapses it to 1x1x256, and a small dense head
# emits the latent mean and log-variance.
latent_dim = 8

encoder_inputs = Input(shape=(256, 256, 3))
x = encoder_inputs
# Each stage is Conv -> Conv -> BatchNorm -> 2x2 max-pool.
# Stage outputs: 128x128x8, 64x64x16, 32x32x32, 16x16x64, 8x8x128, 4x4x256.
for n_filters in (8, 16, 32, 64, 128, 256):
  x = Conv2D(n_filters, 3, padding='same', activation='relu')(x)
  x = Conv2D(n_filters, 3, padding='same', activation='relu')(x)
  x = BatchNormalization()(x)
  x = MaxPooling2D()(x)
x = Conv2D(256, 3, padding='same', activation='relu')(x)
x = MaxPooling2D((4, 4))(x)                               #1x1x256

x = Flatten()(x)
x = Dense(16, activation='relu')(x)
z_mean = Dense(latent_dim, name='z_mean')(x)
z_log_var = Dense(latent_dim, name='z_log_var')(x)

# Draw z from the predicted distribution.
z = Sampling()([z_mean, z_log_var])

encoder = Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
encoder.summary()

Model: "encoder"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 256, 256, 3)]        0         []                            
                                                                                                  
 conv2d (Conv2D)             (None, 256, 256, 8)          224       ['input_1[0][0]']             
                                                                                                  
 conv2d_1 (Conv2D)           (None, 256, 256, 8)          584       ['conv2d[0][0]']              
                                                                                                  
 batch_normalization (Batch  (None, 256, 256, 8)          32        ['conv2d_1[0][0]']            
 Normalization)                                                                             

In [7]:
# Decoder: expands a latent_dim-sized vector back to a 256x256x3 image with
# values in [0, 1] (sigmoid output).
latent_inputs = Input(shape=(latent_dim,))
x = Dense(256, activation='relu')(latent_inputs)
x = Reshape((1, 1, 256))(x)

x = UpSampling2D((4, 4))(x)                               #4x4x256
x = Conv2DTranspose(256, 3, padding='same', activation='relu')(x)
# Each stage doubles the spatial size and then applies three transposed convs.
# Stage outputs: 8x8x256, 16x16x128, 32x32x64, 64x64x32, 128x128x16, 256x256x8.
for n_filters in (256, 128, 64, 32, 16, 8):
  x = UpSampling2D()(x)
  for _rep in range(3):
    x = Conv2DTranspose(n_filters, 3, padding='same', activation='relu')(x)
decoder_outputs = Conv2DTranspose(3, 3, padding='same', activation='sigmoid')(x)

decoder = Model(latent_inputs, decoder_outputs, name="decoder")
decoder.summary()

Model: "decoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, 8)]               0         
                                                                 
 dense_2 (Dense)             (None, 256)               2304      
                                                                 
 reshape_1 (Reshape)         (None, 1, 1, 256)         0         
                                                                 
 up_sampling2d_7 (UpSamplin  (None, 4, 4, 256)         0         
 g2D)                                                            
                                                                 
 conv2d_transpose_20 (Conv2  (None, 4, 4, 256)         590080    
 DTranspose)                                                     
                                                                 
 up_sampling2d_8 (UpSamplin  (None, 8, 8, 256)         0   

In [14]:
# Wraps the encoder and decoder models into the complete variational
# autoencoder and defines its custom training / evaluation steps.
class VAE(Model):
    """Variational autoencoder assembled from an encoder and a decoder model.

    encoder: model returning (z_mean, z_log_var, z) for an image batch.
    decoder: model turning z back into an image batch.
    """

    def __init__(self, encoder, decoder, **kwargs):
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def _compute_losses(self, data, training):
        """Return (total_loss, reconstruction_loss, kl_loss) for one batch."""
        # Use the models stored on the instance, not the notebook globals, and
        # pass `training` explicitly: without it the BatchNormalization layers
        # of the encoder run in inference mode even while fitting.
        z_mean, z_log_var, z = self.encoder(data, training=training)
        reconstruction = self.decoder(z, training=training)

        # binary_crossentropy averages over the channel axis; scaling the
        # overall mean by height * width gives a per-image sum over pixels.
        reconstruction_loss = tf.reduce_mean(
            keras.losses.binary_crossentropy(data, reconstruction)
        )
        data_shape = tf.shape(data)
        n_pixels = tf.cast(data_shape[1] * data_shape[2], reconstruction_loss.dtype)
        reconstruction_loss *= n_pixels

        # KL divergence to the unit Gaussian: summed over the latent
        # dimensions, then averaged over the batch. Averaging over the latent
        # axis as well would shrink the term by a factor of latent_dim.
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))

        return reconstruction_loss + kl_loss, reconstruction_loss, kl_loss

    def train_step(self, data):
        """Run one optimisation step and return the three loss values."""
        # fit() may hand over (x,) or (x, y); only the images are needed.
        if isinstance(data, tuple):
            data = data[0]
        with tf.GradientTape() as tape:
            total_loss, reconstruction_loss, kl_loss = self._compute_losses(
                data, training=True
            )
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

    def test_step(self, data):
        """Compute the same losses without updating weights (used for validation)."""
        if isinstance(data, tuple):
            data = data[0]
        total_loss, reconstruction_loss, kl_loss = self._compute_losses(
            data, training=False
        )
        return {
            "loss": total_loss,
            "reconstruction_loss": reconstruction_loss,
            "kl_loss": kl_loss,
        }

In [15]:
def scheduler(epoch, lr):
  """Learning-rate schedule: keep lr for epochs below 70, afterwards multiply it by exp(-0.1) each epoch."""
  if epoch >= 70:
    return lr * tf.math.exp(-0.1)
  return lr

# Callback that asks `scheduler` for the learning rate at the start of each epoch.
lr_schedule = tf.keras.callbacks.LearningRateScheduler(schedule=scheduler)

In [16]:
# Checkpoint written at the end of every epoch.
# Only the weights are saved: the VAE in this notebook is a subclassed model
# that is trained through a custom train_step and is never called on data, so
# it has no recorded input shape / forward pass and whole-model saving
# (save_weights_only=False) fails when the callback fires.
checkpoint_path =  '/content/drive/MyDrive/DL_project/vae/cp.ckpt'
checkpoint_dir = os.path.dirname(checkpoint_path)
cp_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_path,
                                                 save_weights_only=True,
                                                 verbose=1)

In [17]:
# Scale pixel values from 0..255 to 0..1 (the decoder ends in a sigmoid).
# The dtype guard makes the cell idempotent - re-running it can no longer
# divide by 255 a second time - and casting to float32 first avoids the
# float64 array that `x_train / 255` would allocate.
if np.issubdtype(x_train.dtype, np.integer):
  x_train = x_train.astype('float32') / 255.0

In [None]:
# Assemble the VAE from the two sub-models, compile it with RMSprop and
# train it on x_train with the LR schedule and checkpoint callbacks.
vae = VAE(encoder, decoder)
vae.compile(optimizer='rmsprop')
training_callbacks = [lr_schedule, cp_callback]
history = vae.fit(
    x_train,
    epochs=100,
    batch_size=64,
    callbacks=training_callbacks,
)

In [None]:
# Plot the loss curves recorded by fit().
# 'val_loss' is only present when fit() was given validation data; indexing it
# directly raised KeyError, so it is read with .get() and drawn only if found.
loss = history.history['loss']
val_loss = history.history.get('val_loss')
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, label='Training loss')
if val_loss is not None:
  plt.plot(epochs, val_loss, label='Validation loss')
  plt.title('Training and validation loss')
else:
  plt.title('Training loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
import pandas as pd

# Persist the per-epoch training history as CSV.
# The path is handed to to_csv directly so pandas opens the file itself with
# the newline handling it expects; a handle opened without newline='' can
# produce doubled line endings on some platforms.
hist_df = pd.DataFrame(history.history)
hist_csv_file = 'history.csv'
hist_df.to_csv(hist_csv_file)