In [None]:
import keras
import tensorflow as tf
from tensorflow.keras.datasets import mnist,cifar10
from tensorflow.keras import Model,layers,backend as K
from tensorflow.keras.models import Sequential
from sklearn.metrics import mean_absolute_error
import numpy as np
import cv2
import pandas as pd
from glob import glob
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping

In [None]:
#Loading Art Dataset
#Before loading the art dataset,upload the kagal key which is the json file

# copy kaggle account file 
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# downloading data from kaggle
!kaggle datasets download -d thedownhill/art-images-drawings-painting-sculpture-engraving
# !kaggle datasets download -d sartajbhuvaji/brain-tumor-classification-mri

# unzip data
!unzip /content/art-images-drawings-painting-sculpture-engraving.zip
!unzip /content/brain-tumor-classification-mri.zip

In [None]:
def _collect_kaggle_dataset(sTraindir,sTestDir,imgHt,imgWt,aDirs):
  # preprocessing the dataset loaded from kaggle
  # create train and test lists 
  train_images = list()
  test_images = list()

  # add only directories, whose images are required to be added
  dirs = aDirs

  # upload images from directories as grey images, resize it to 128 (could be any other 8 multiple size)
  # then add it in train or test list, after adding images covert list to array for training mode
  #
  artHt= imgHt
  artWt = imgWt
  for directory in glob(sTraindir):
      if directory.split("/")[-1] in dirs:
          for img in glob(f"{directory}/*"):
              img = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
              if img is not None:
                  img = cv2.resize(img, (int(artHt), int(artWt)))
                  train_images.append(img)

  for directory in glob(sTestDir):
      if directory.split("/")[-1] in dirs:
          for img in glob(f"{directory}/*"):
              img = cv2.imread(img, cv2.IMREAD_GRAYSCALE)
              if img is not None:
                  img = cv2.resize(img, (int(artHt), int(artWt)))
                  test_images.append(img)

  train_images = np.array(train_images)
  test_images = np.array(test_images)
  return train_images,test_images

In [None]:
#Art dataset params
sArtTrainDir = "/content/dataset/dataset_updated/training_set/*"
sArtTestDir = "/content/dataset/dataset_updated/validation_set/*"
artImgHt = 256
artImgWt = 256
artDirs = ["drawings","sculpture"]
x_artTrain,x_artTest = _collect_kaggle_dataset(sArtTrainDir,sArtTestDir,artImgHt,artImgWt,artDirs)

#brain tumor mri datset params
sBtTrainDir = "/content/Training/*"
sBtTestDir = "/content/Testing/*"
btImgHt = 256
btImgWt = 256
dtDirs = ["glioma_tumor","meningioma_tumor","no_tumor","pituitary_tumor"]
x_btTrain,x_btTest = _collect_kaggle_dataset(sBtTrainDir,sBtTestDir,btImgHt,btImgWt,dtDirs)

In [None]:
_DATASET = "MNIST"  # MNIST or CIFAR10 or ART or BRAINTU
_MISSING_RATE = 0.1
_MISSING_TYPE = "MCAR" # MCAR OR MNAR

def _process_data(x_data):
    """
    Scales the dataset to the range [0, 1] and removes 40% of the pixels completely at random.
    Args:
        x_data: Dataset to be processed.
    Returns: Processed dataset with and without missing pixels, and the missing mask.
    """
    if len(x_data.shape) == 3:
        x_data = np.expand_dims(x_data, axis=3)
    x_data = x_data.astype('float32') / 255
    missing_mask = ''

    if _MISSING_TYPE == "MCAR":
      number_channels = x_data.shape[3]
      missing_mask = np.stack(
          (np.random.choice([0, 1], size=(x_data.shape[0], x_data.shape[1], x_data.shape[2]),
                            p=[1 - _MISSING_RATE, _MISSING_RATE]),) * number_channels, axis=-1)


      x_data_md = x_data * (~missing_mask.astype(bool)).astype(int) + -1.0 * missing_mask
      x_data_md[x_data_md == -1] = np.nan
    elif _MISSING_TYPE == "MNAR":
     x_data_md = x_data.copy()
     if _DATASET == "ART" or _DATASET =="BRAINTU":   
       x_data_md[:,70:76,70:76] = np.nan
     else:
       x_data_md[:,10:12,10:12] = np.nan  
     missing_mask = np.isnan(x_data_md)
    else:
      raise ValueError("Invalid missing type.")

    return x_data, x_data_md, missing_mask


if __name__ == '__main__':
    if _DATASET == "MNIST":
        source = mnist
        (x_train, _), (x_test, _) = source.load_data()
    elif _DATASET == "CIFAR10":
        source = cifar10
        (x_train, _), (x_test, _) = source.load_data()
    elif _DATASET == "ART":
      x_train = x_artTrain
      x_test = x_artTest
    elif _DATASET == "BRAINTU":
      x_train = x_btTrain
      x_test = x_btTest
    else:
        raise ValueError("Invalid dataset.")

    x_train, x_train_md, missing_mask_train = _process_data(x_train)
    x_test, x_test_md, missing_mask_test = _process_data(x_test)

In [None]:
# Function to calculate MAE
def _calculate_mae(missing_mask_test,x_test,x_test_imputed,algo):
    missing_mask_test_flat = missing_mask_test.astype(bool).flatten()
    mae = mean_absolute_error(x_test_imputed.flatten()[missing_mask_test_flat],
                            x_test.flatten()[missing_mask_test_flat])

    print(f"[{algo}] MAE for the {_DATASET} dataset and missing type {_MISSING_TYPE}: {mae:.4f}")

In [None]:
# Reshape 
img_width  = x_train.shape[1]
img_height = x_train.shape[2]
num_channels = 1 # Images are grey scale so 1 channel
input_shape = (img_height, img_width, num_channels)

In [None]:
# Creating Denoising Autoencoder model
def _DAE(input_shape):

    Input_img = K.Input(shape=input_shape)  
    
    #encoding architecture
    x = K.Conv2D(64, (3, 3), activation='relu', padding='same')(Input_img)
    x = K.MaxPool2D( (2, 2), padding='same')(x)
    x = K.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = K.MaxPool2D( (2, 2), padding='same')(x)
    x = K.Conv2D(16, (3, 3), activation='relu', padding='same')(x)
    encoded    = K.MaxPool2D( (2, 2), padding='same')(x)
    
    # decoding architecture
    x = K.Conv2D(16, (3, 3), activation='relu', padding='same')(encoded)
    x = K.UpSampling2D((2, 2))(x)
    x = K.Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = K.UpSampling2D((2, 2))(x)
    x = K.Conv2D(64, (3, 3), activation='relu')(x)
    x = K.UpSampling2D((2, 2))(x)
    decoded   = K.Conv2D(1, (3, 3),activation='sigmoid', padding='same')(x)

    autoencoder = K.Model(Input_img, decoded)
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')
    return autoencoder

In [None]:
# Linear Imputation
def _linear_imputation(x_data):
    x_test_md = x_data.copy()
    iShape = x_test_md.shape
    x_test_md = x_test_md.reshape(np.prod(iShape[:-1]), iShape[-1])
    x_test_md = pd.DataFrame(x_test_md)
    x_test_imp = (x_test_md.ffill()+x_test_md.bfill())/2
    x_test_imp = x_test_imp.ffill().bfill()
    x_test_imp = x_test_imp.values.reshape(iShape)

In [None]:
# BUILDING Variational Autoencoder model

# # ================= ############
# # Encoder
# define 4 conv2D, flatten and then dense
# # ================= ############

latent_dim = 32 # Number of latent dim parameters

input_img = layers.Input(shape=input_shape, name='encoder_input')
x = layers.Conv2D(64, (5,5), activation='relu')(input_img)
x = layers.MaxPooling2D((2,2))(x)
x = layers.Conv2D(64, 3, activation='relu')(x)
x = layers.MaxPooling2D((2,2))(x)
x = layers.Conv2D(32, 3, activation='relu')(x)
x = layers.MaxPooling2D((2,2))(x)


conv_shape = K.int_shape(x) #Shape of conv to be provided to decoder
#Flatten
x = layers.Flatten()(x)
x = layers.Dense(32, activation='relu')(x)

# Two outputs, for latent mean and log variance (std. dev.)
#Use these to sample random variables in latent space to which inputs are mapped. 
z_mu = layers.Dense(latent_dim, name='latent_mu')(x)   #Mean values of encoded input
z_sigma = layers.Dense(latent_dim, name='latent_sigma')(x)  #Std dev. (variance) of encoded input

#REPARAMETERIZATION TRICK
# Define sampling function to sample from the distribution
# Reparameterize sample based on the process defined by Gunderson and Huang
# into the shape of: mu + sigma squared x eps
#This is to allow gradient descent to allow for gradient estimation accurately. 
def sample_z(args):
  z_mu, z_sigma = args
  eps = K.random_normal(shape=(K.shape(z_mu)[0], K.int_shape(z_mu)[1]))
  return z_mu + K.exp(z_sigma / 2) * eps

# sample vector from the latent distribution
# z is the labda custom layer we are adding for gradient descent calculations
  # using mu and variance (sigma)
z = layers.Lambda(sample_z, output_shape=(latent_dim, ), name='z')([z_mu, z_sigma])

#Z (lambda layer) will be the last layer in the encoder.
# Define and summarize encoder model.
encoder = Model(input_img, [z_mu, z_sigma, z], name='encoder')
print(encoder.summary())

In [None]:
# ================= ###########
# Decoder
#
# ================= #################

# decoder takes the latent vector as input
decoder_input = layers.Input(shape=(latent_dim, ), name='decoder_input')

# Need to start with a shape that can be remapped to original image shape as
#we want our final utput to be same shape original input.
#So, add dense layer with dimensions that can be reshaped to desired output shape
x = layers.Dense(conv_shape[1]*conv_shape[2]*conv_shape[3], activation='relu')(decoder_input)
# reshape to the shape of last conv. layer in the encoder, so we can 
x = layers.Reshape((conv_shape[1], conv_shape[2], conv_shape[3]))(x)
# upscale (conv2D transpose) back to original shape
# use Conv2DTranspose to reverse the conv layers defined in the encoder
x = layers.Conv2DTranspose(64, 3,padding='same', activation='relu',strides=2)(x)
x = layers.Conv2DTranspose(64, 3, activation='relu')(x)
x = layers.UpSampling2D((2,2))(x)
x = layers.Conv2DTranspose(64, 3, activation='relu',strides=1)(x)
x = layers.UpSampling2D((2,2))(x)
#x = layers.Conv2DTranspose(64, 3, activation='relu',strides=1)(x)
#Can add more conv2DTranspose layers, if desired. 
#Using sigmoid activation
x = layers.Conv2DTranspose(num_channels, 5, activation='sigmoid', name='decoder_output')(x)

# Define and summarize decoder model
decoder = Model(decoder_input, x, name='decoder')
decoder.summary()

# apply the decoder to the latent sample 
z_decoded = decoder(z)

In [None]:
def _vae_loss(x, z_decoded):
    x = K.flatten(x)
    z_decoded = K.flatten(z_decoded)
    
    # Reconstruction loss (as we used sigmoid activation we can use binarycrossentropy)
    recon_loss = tf.keras.losses.binary_crossentropy(x, z_decoded) * img_width * img_height
    # recon_loss = tf.keras.losses.binary_crossentropy(x, z_decoded) 
    
    # KL divergence
    kl_loss = -5e-1 * K.mean(1 + z_sigma - K.square(z_mu) - K.exp(z_sigma), axis=-1)
    return K.mean(recon_loss + (0.5 * kl_loss))

In [None]:
# =================
# VAE 
# =================
vae = Model(input_img, z_decoded, name='vae')
vae.add_loss(_vae_loss(input_img, z_decoded))
# Compile VAE
vae.compile(optimizer='adam', loss=None)

In [None]:
# Testing Linear Imputation
x_test_LI_imp = _linear_imputation(x_test_md)

In [None]:
# Replace NAN values with zero for traing and testing DAE and VAE
x_train_pre = np.nan_to_num(x_train_md, nan=0.0)
x_test_pre = np.nan_to_num(x_test_md, nan=0.0)
x_test_mis_mask = np.isnan(x_test_md).astype(int)

In [None]:
# callbacks for DAE and VAE
callbacks =[]
#will Reduce learning rate when a metric has stopped improving
callbacks.append(ReduceLROnPlateau(monitor='val_loss', factor=0.2,
                                               patience=10, min_lr=0))
#Stop training when a monitored metric has stopped improving
callbacks.append(EarlyStopping(monitor='val_loss', mode='min', verbose=0,
                                           patience=10))

In [None]:
# Training and Testing DAE
DAE = _DAE(input_shape)
dae_history = DAE.fit(x_train_pre,
        x_train,
        epochs = 50,
        batch_size=128,
        validation_split = 0.15,
        shuffle=True,
        callbacks=callbacks)
x_test_DAE_imp = DAE.predict(x_test_pre)
x_test_DAE_imp = x_test_pre * (~x_test_mis_mask.astype(bool)).astype(int) + x_test_DAE_imp * x_test_mis_mask

In [None]:
# Training and Testing VAE
vae_history = vae.fit(x_train_pre, 
        x_train, 
        epochs = 50, 
        batch_size = 64, 
        validation_split = 0.15,
        shuffle=True,
        callbacks=callbacks)
x_test_VAE_imp = vae.predict(x_test_pre)
x_test_VAE_imp = x_test_pre * (~x_test_mis_mask.astype(bool)).astype(int) + x_test_VAE_imp * x_test_mis_mask    

In [None]:
#plot learning curve
plt.legend(['train','val'])
plt.plot(vae_history.history['loss'])
plt.plot(vae_history.history['val_loss'])
plt.plot(dae_history.history['loss'])
plt.plot(dae_history.history['val_loss'])

In [None]:
# Calculating Mean Absolute Error
_calculate_mae(missing_mask_test,x_test,x_test_LI_imp,"LI")
_calculate_mae(missing_mask_test,x_test,x_test_DAE_imp,"DAE")
_calculate_mae(missing_mask_test,x_test,x_test_VAE_imp,"VAE")

In [None]:
#polt imputed images
iImg = 28
plt.figure(2,figsize=(2,2))
plt.title('Original Image')
plt.imshow(x_test[iImg][:,:,0])
plt.show()

plt.figure(2,figsize=(4,4))
plt.title('Missing Data Image')
plt.imshow(x_test_md[iImg][:,:,0])
plt.show()

plt.figure(2,figsize=(2,2))
plt.title('Imputed Image')
plt.imshow(x_test_VAE_imp[iImg][:,:,0])
plt.show()