# Efficient Mask Inpainting via Convolutional and Generative Adversarial Network Approaches
##### By Ryan Beckwith and Adam Peters

# Loading Data

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.preprocessing.image import img_to_array
import numpy as np
import cv2
import os
import glob
import matplotlib.pyplot as plt
import time
! pip install livelossplot
import livelossplot



In [None]:
# Downloading data from Kaggle
# Note: you must provide a valid kaggle.json api key

! pip install kaggle
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
! kaggle datasets download prasoonkottarathil/face-mask-lite-dataset
! unzip face-mask-lite-dataset.zip

In [None]:
# Side lengths, in pixels, of the two square image resolutions used in this notebook.
SIZELARGE, SIZESMALL = 256, 128

In [None]:
def loadData(imageSize, numImages=1000):
    """Load masked / unmasked face images as normalized RGB arrays.

    Files are read from the ``with_mask/`` and ``without_mask/`` directories
    (relative to the working directory) in sorted filename order, so index
    ``i`` of each returned array is the i-th file of its directory.

    Args:
        imageSize: Side length in pixels; every image is resized to
            ``imageSize x imageSize``.
        numImages: Maximum number of images to load from each directory.
            Defaults to 1000, the count that used to be hard-coded. If a
            directory holds fewer files, all of them are loaded.

    Returns:
        A tuple ``(masks, noMasks)`` of image arrays whose pixel values have
        been divided by 255.

    Raises:
        FileNotFoundError: If either directory contains no files.
        ValueError: If a file cannot be decoded as an image.
    """
    def loadImage(file):
        image = cv2.imread(file)
        # cv2.imread reports failure by returning None instead of raising.
        if image is None:
            raise ValueError(f"Could not read image file: {file}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, (imageSize, imageSize))
        image = image.astype('float32') / 255.0
        return img_to_array(image)

    def loadDirectory(pattern):
        files = sorted(glob.glob(pattern))
        if not files:
            raise FileNotFoundError(
                f"No files match '{pattern}'; download and unzip the dataset first")
        # Slicing (rather than indexing 0..999) tolerates smaller directories.
        return np.array([loadImage(file) for file in files[:numImages]])

    masks = loadDirectory('with_mask/*')
    noMasks = loadDirectory('without_mask/*')

    return masks, noMasks

In [None]:
# Load the image pairs at both resolutions (large first, then small).
(masksLarge, noMasksLarge), (masksSmall, noMasksSmall) = [
    loadData(size) for size in (SIZELARGE, SIZESMALL)
]

# Callbacks for Training

In [None]:
class PerformancePlotCallback(keras.callbacks.Callback):
    """Keras callback that plots one target / prediction pair after every epoch.

    Args:
        x_test: Model input for the sample, passed unchanged to ``model.predict``.
        y_test: Target image for the same sample; must hold ``size * size * 3`` values.
        size: Side length in pixels used to reshape both images for display.
    """

    def __init__(self, x_test, y_test, size):
        # Let the base Callback initialise its own state before adding ours.
        super().__init__()
        self.x_test = x_test
        self.y_test = y_test
        self.size = size

    def plot_image_pair(self, noMask, noMaskPred, epoch):
        """Draw the target image beside the prediction made after `epoch`."""
        fig, (truthAx, predAx) = plt.subplots(1, 2, figsize=(7, 7))
        truthAx.set_title("Ground Truth", fontsize=15)
        truthAx.imshow(noMask.reshape(self.size, self.size, 3))
        predAx.set_title(f"Prediction for epoch {epoch}", fontsize=15)
        # Predictions are not guaranteed to lie in [0, 1]; clip explicitly so
        # imshow receives a valid float RGB image.
        predAx.imshow(np.clip(noMaskPred, 0.0, 1.0).reshape(self.size, self.size, 3))
        # Render now and release the figure; otherwise one figure per epoch stays open.
        plt.show()
        plt.close(fig)

    def on_epoch_end(self, epoch, logs=None):
        """Predict the stored sample and plot it next to its target."""
        # verbose=0 keeps predict's progress bar out of the training log.
        y_pred = self.model.predict(self.x_test, verbose=0)
        self.plot_image_pair(self.y_test, y_pred, epoch)

# Pooling-Free CNN

In [None]:
# Pooling-free CNN: every layer uses "same" padding with stride 1, so the
# 256x256 resolution is kept from input to output.
poolingFree = keras.Sequential([
    # The Input layer alone fixes the input shape; the first Conv2D no longer
    # repeats it through a redundant `input_shape` argument.
    keras.Input(shape=(SIZELARGE, SIZELARGE, 3)),  # 256x256 RGB images
    layers.Conv2D(32, 3, strides=1, activation="relu", padding="same"),
    layers.Conv2D(64, 5, strides=1, activation="relu", padding="same"),
    layers.Conv2D(32, 3, strides=1, activation="relu", padding="same"),
    layers.Dropout(.25),
    # Dense acts on the channel axis only: a per-pixel 32 -> 3 projection.
    layers.Dense(3, activation="relu"),
    layers.Conv2D(32, 3, activation="relu", padding="same"),
    layers.Dropout(.2),
    # Output layer: 3 channels, i.e. one RGB image per input image.
    layers.Conv2D(3, 3, activation="relu", padding="same"),
])

In [None]:
# Print each layer's output shape and parameter count.
poolingFree.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 256, 256, 32)      896       
                                                                 
 conv2d_1 (Conv2D)           (None, 256, 256, 64)      51264     
                                                                 
 conv2d_2 (Conv2D)           (None, 256, 256, 32)      18464     
                                                                 
 dropout (Dropout)           (None, 256, 256, 32)      0         
                                                                 
 dense (Dense)               (None, 256, 256, 3)       99        
                                                                 
 conv2d_3 (Conv2D)           (None, 256, 256, 32)      896       
                                                                 
 dropout_1 (Dropout)         (None, 256, 256, 32)      0

In [None]:
# Callbacks: a live loss plot, plus a per-epoch prediction for the first
# 256x256 image pair (reshaped to carry a leading batch axis of 1).
plot_losses = livelossplot.PlotLossesKeras()
performance = PerformancePlotCallback(
    x_test=masksLarge[0].reshape((1, SIZELARGE, SIZELARGE, 3)),
    y_test=noMasksLarge[0].reshape((1, SIZELARGE, SIZELARGE, 3)),
    size=SIZELARGE,
)

In [None]:
# Compile and train the pooling-free model: the first 90 image pairs are used
# for training and pairs 90-99 for validation.
# NOTE(review): 'accuracy' is tracked alongside an MSE loss on pixel values -
# confirm this metric is meaningful for image reconstruction.
poolingFree.compile(loss='mse', optimizer='Adam', metrics=['accuracy'])

poolingFree.fit(
    x=masksLarge[:90],
    y=noMasksLarge[:90],
    validation_data=(masksLarge[90:100], noMasksLarge[90:100]),
    epochs=50,
    verbose=1,
    callbacks=[plot_losses, performance],
)

# Single-Pooling CNN

In [None]:
# Single-pooling CNN: one 2x max-pool on the way in, one 2x upsampling on the
# way out, so the output has the same resolution as the input.
singlePool = keras.Sequential([
    # Input is SIZESMALL because this section's callback and training cells use
    # the 128x128 arrays (masksSmall / noMasksSmall). The Input layer alone
    # fixes the shape; the redundant `input_shape` on the first Conv2D is gone.
    keras.Input(shape=(SIZESMALL, SIZESMALL, 3)),
    layers.Conv2D(32, 3, strides=1, activation="relu", padding="same"),
    layers.MaxPooling2D(2, padding="same"),
    layers.Conv2D(64, 5, strides=1, activation="relu", padding="same"),
    layers.Dropout(.2),
    layers.BatchNormalization(),
    layers.Conv2D(64, 3, strides=1, activation="relu", padding="same"),
    layers.Dropout(.25),
    layers.Conv2D(128, 3, strides=1, activation="relu", padding="same"),
    layers.Dropout(.25),
    # Dense acts on the channel axis only: a per-pixel 128 -> 3 projection.
    layers.Dense(3, activation="relu"),
    layers.Conv2D(128, 3, activation="relu", padding="same"),
    layers.UpSampling2D(2),
    layers.Conv2D(64, 3, activation="relu", padding="same"),
    layers.Dropout(.2),
    layers.BatchNormalization(),
    layers.Conv2D(32, 3, activation="relu", padding="same"),
    layers.Dropout(.2),
    layers.Conv2D(64, 3, activation="relu", padding="same"),
    layers.Dropout(.2),
    # Output layer: 3 channels, i.e. one RGB image per input image.
    layers.Conv2D(3, 3, activation="relu", padding="same"),
])

In [None]:
# Print each layer's output shape and parameter count.
singlePool.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_5 (Conv2D)           (None, 256, 256, 32)      896       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 128, 128, 32)     0         
 )                                                               
                                                                 
 conv2d_6 (Conv2D)           (None, 128, 128, 64)      51264     
                                                                 
 dropout_2 (Dropout)         (None, 128, 128, 64)      0         
                                                                 
 batch_normalization (BatchN  (None, 128, 128, 64)     256       
 ormalization)                                                   
                                                                 
 conv2d_7 (Conv2D)           (None, 128, 128, 64)     

In [None]:
# Callbacks: a live loss plot, plus a per-epoch prediction for the first
# 128x128 image pair (reshaped to carry a leading batch axis of 1).
plot_losses = livelossplot.PlotLossesKeras()
performance = PerformancePlotCallback(
    x_test=masksSmall[0].reshape((1, SIZESMALL, SIZESMALL, 3)),
    y_test=noMasksSmall[0].reshape((1, SIZESMALL, SIZESMALL, 3)),
    size=SIZESMALL,
)

In [None]:
# Compile and train the single-pooling model on the 128x128 image pairs:
# the first 350 pairs for training, pairs 350-399 for validation.
# (This cell previously compiled and fitted `poolingFree`, a copy-paste slip
# that left `singlePool` untrained.)
singlePool.compile(optimizer='Adam',
                   loss='mse',
                   metrics=['accuracy'])

singlePool.fit(masksSmall[:350], noMasksSmall[:350],
               epochs=50,
               callbacks=[plot_losses, performance],
               verbose=1,
               validation_data=(masksSmall[350:400], noMasksSmall[350:400]))

# Encoder-Decoder CNN

In [None]:
encoder_decoder = keras.Sequential()
# Encoder unit: four 2x max-pools shrink each spatial dimension by 16.
# The spatial dimensions are left unspecified because this network is used at
# two resolutions: this section's cells use the 256x256 arrays, while the GAN
# section calls it on 128x128 batches. Side lengths that are multiples of 16
# are restored exactly by the decoder. The Input layer alone defines the
# shape; the redundant `input_shape` on the first Conv2D has been dropped.
encoder_decoder.add(keras.Input(shape=(None, None, 3)))
encoder_decoder.add(layers.Conv2D(16, 3, strides=1, activation="relu", padding="same"))
encoder_decoder.add(layers.MaxPooling2D(2, padding="same"))
encoder_decoder.add(layers.Conv2D(32, 5, strides=1, activation="relu", padding="same"))
encoder_decoder.add(layers.MaxPooling2D(2, padding="same"))
encoder_decoder.add(layers.Conv2D(64, 3, strides=1, padding="same"))
encoder_decoder.add(layers.Dropout(.1))
encoder_decoder.add(layers.LeakyReLU(0.2))
encoder_decoder.add(layers.MaxPooling2D(2, padding="same"))
encoder_decoder.add(layers.Conv2D(64, 3, strides=1, activation="relu", padding="same"))
encoder_decoder.add(layers.MaxPooling2D(2, padding="same"))
encoder_decoder.add(layers.Conv2D(128, 3, strides=1, padding="same"))
encoder_decoder.add(layers.Dropout(.1))
encoder_decoder.add(layers.LeakyReLU(0.2))

# Decoder unit: four 2x upsamplings undo the four poolings above.
encoder_decoder.add(layers.Conv2D(128, 3, strides=1, activation="relu", padding="same"))
encoder_decoder.add(layers.UpSampling2D(2))
encoder_decoder.add(layers.Conv2D(64, 5, strides=1, activation="relu", padding="same"))
encoder_decoder.add(layers.UpSampling2D(2))
encoder_decoder.add(layers.Conv2D(64, 3, strides=1, padding="same"))
encoder_decoder.add(layers.Dropout(.1))
encoder_decoder.add(layers.LeakyReLU(0.2))
encoder_decoder.add(layers.UpSampling2D(2))
encoder_decoder.add(layers.Conv2D(32, 3, strides=1, activation="relu", padding="same"))
encoder_decoder.add(layers.UpSampling2D(2))
encoder_decoder.add(layers.Conv2D(16, 3, strides=1, padding="same"))
encoder_decoder.add(layers.Dropout(.1))
encoder_decoder.add(layers.LeakyReLU(0.2))

# Output layer: 3 channels, i.e. one RGB image per input image.
encoder_decoder.add(layers.Conv2D(3, 3, activation="relu", padding="same"))

In [None]:
# Print each layer's output shape and parameter count.
encoder_decoder.summary()

Model: "sequential_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_14 (Conv2D)          (None, 256, 256, 16)      448       
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 128, 128, 16)     0         
 2D)                                                             
                                                                 
 conv2d_15 (Conv2D)          (None, 128, 128, 32)      12832     
                                                                 
 max_pooling2d_2 (MaxPooling  (None, 64, 64, 32)       0         
 2D)                                                             
                                                                 
 conv2d_16 (Conv2D)          (None, 64, 64, 64)        18496     
                                                                 
 dropout_8 (Dropout)         (None, 64, 64, 64)       

In [None]:
# Callbacks: a live loss plot, plus a per-epoch prediction for the first
# 256x256 image pair (reshaped to carry a leading batch axis of 1).
plot_losses = livelossplot.PlotLossesKeras()
performance = PerformancePlotCallback(
    x_test=masksLarge[0].reshape((1, SIZELARGE, SIZELARGE, 3)),
    y_test=noMasksLarge[0].reshape((1, SIZELARGE, SIZELARGE, 3)),
    size=SIZELARGE,
)

In [None]:
# Compile and train the encoder-decoder model on the 256x256 image pairs:
# the first 450 pairs for training, pairs 450-499 for validation.
# (This cell previously compiled and fitted `poolingFree`, a copy-paste slip
# that meant `encoder_decoder` was never trained here.)
encoder_decoder.compile(optimizer='Adam',
                        loss='mse',
                        metrics=['accuracy'])

encoder_decoder.fit(masksLarge[:450], noMasksLarge[:450],
                    epochs=100,
                    callbacks=[plot_losses, performance],
                    verbose=1,
                    validation_data=(masksLarge[450:500], noMasksLarge[450:500]))

# General GAN Implementation

In [None]:
# Binary cross-entropy shared by both GAN losses below. from_logits=True means
# the scores passed in are treated as raw logits, not probabilities.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    """Return the discriminator loss for one batch.

    Scores for real images are compared against all-ones labels and scores for
    generated images against all-zeros labels; the two terms are summed.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake

def generator_loss(fake_output):
    """Return the generator loss: generated images scored against all-ones labels."""
    wanted_labels = tf.ones_like(fake_output)
    return cross_entropy(wanted_labels, fake_output)

# One Adam optimizer per network, both with a learning rate of 1e-4.
generator_optimizer, discriminator_optimizer = (
    tf.keras.optimizers.Adam(learning_rate=1e-4),
    tf.keras.optimizers.Adam(learning_rate=1e-4),
)

In [None]:
@tf.function
def train_step(generator, discriminator, masks, noMasks):
    """Apply one adversarial update to both networks using a single batch.

    The generator maps `masks` to generated images; the discriminator scores
    both `noMasks` (real) and the generated images. Each network's loss is
    differentiated with respect to its own trainable variables and applied
    through the module-level optimizers.
    """
    with tf.GradientTape() as g_tape:
        with tf.GradientTape() as d_tape:
            fakes = generator(masks, training=True)
            score_real = discriminator(noMasks, training=True)
            score_fake = discriminator(fakes, training=True)
            g_loss = generator_loss(score_fake)
            d_loss = discriminator_loss(score_real, score_fake)

    # Both gradients are computed before either optimizer changes any weights.
    g_grads = g_tape.gradient(g_loss, generator.trainable_variables)
    d_grads = d_tape.gradient(d_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(
        zip(g_grads, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(
        zip(d_grads, discriminator.trainable_variables))

In [None]:
def batch_generator(masks, noMasks, batch_size = 32):
    """Shuffle paired samples and split them into aligned mini-batches.

    The same random permutation is applied to both arrays, so batch ``i`` of
    the first returned list and batch ``i`` of the second contain matching
    samples.

    Args:
        masks: Array of masked images, indexed along its first axis.
        noMasks: Array of unmasked images with the same length as `masks`.
        batch_size: Maximum number of samples per batch.

    Returns:
        ``(batchesMasks, batchesNoMasks)``: two lists of arrays. The last
        batch holds the leftover samples when the length is not a multiple of
        `batch_size` (these were previously dropped, which produced no batches
        at all for inputs shorter than one batch).

    Raises:
        ValueError: If the arrays differ in length or `batch_size` < 1.
    """
    if len(masks) != len(noMasks):
        raise ValueError("masks and noMasks must contain the same number of samples")
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    indices = np.arange(len(masks))
    np.random.shuffle(indices)
    # Slicing past the end simply yields a shorter final batch.
    starts = range(0, len(indices), batch_size)
    batchesMasks = [masks[indices[start:start + batch_size]] for start in starts]
    batchesNoMasks = [noMasks[indices[start:start + batch_size]] for start in starts]
    return batchesMasks, batchesNoMasks

def plot_image_pair(model, noMask, epoch, mask=None):
    """Plot a reference image beside the model's prediction.

    Args:
        model: Callable that maps a batch of images to a batch of images.
        noMask: Image of shape (height, width, 3) shown in the left panel.
            When `mask` is omitted it is also the image fed to the model
            (the original behaviour).
        epoch: Number shown in the prediction panel's title.
        mask: Optional model input. When given, the prediction is computed
            from it, so the left panel really is the ground truth for the
            prediction on the right.
    """
    modelInput = noMask if mask is None else mask
    prediction = np.asarray(model(np.array([modelInput])))[0]

    fig, (truthAx, predAx) = plt.subplots(1, 2, figsize=(7, 7))
    truthAx.set_title("Ground Truth", fontsize=15)
    truthAx.imshow(noMask)
    predAx.set_title(f"Prediction for epoch {epoch}", fontsize=15)
    # Predictions are not guaranteed to lie in [0, 1]; clip for display.
    predAx.imshow(np.clip(prediction, 0.0, 1.0))
    # Render immediately and release the figure so figures do not accumulate
    # over a long training run.
    plt.show()
    plt.close(fig)

def train(generator, discriminator, masks, noMasks, epochs):
    """Train a generator / discriminator pair adversarially.

    Each epoch reshuffles the data into mini-batches, runs `train_step` on
    every batch, plots the generator's prediction for the first sample and
    prints the epoch duration. A checkpoint is written every 15 epochs under
    ./training_checkpoints. After the last epoch the cell output is cleared
    and the final prediction is plotted.

    Args:
        generator: Model mapping masked images to generated images.
        discriminator: Model scoring images as real or generated.
        masks: Array of masked input images.
        noMasks: Array of matching unmasked target images.
        epochs: Number of passes over the data.
    """
    # Imported here because the notebook's import cell does not provide `display`.
    from IPython import display

    # NOTE(review): generator_optimizer / discriminator_optimizer are
    # module-level objects, so their state is shared by every train() call -
    # confirm that reusing them across different generators is intended.
    checkpoint_dir = './training_checkpoints'
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                     discriminator_optimizer=discriminator_optimizer,
                                     generator=generator,
                                     discriminator=discriminator)

    for epoch in range(epochs):
        start = time.time()

        masksBatches, noMasksBatches = batch_generator(masks, noMasks)
        for mB, nM in zip(masksBatches, noMasksBatches):
            # train_step takes the two models first, then the batch pair.
            train_step(generator, discriminator, mB, nM)

        # Plot predicted images as you go (1-based, matching the timing message)
        plot_image_pair(generator, noMasks[0], epoch + 1, mask=masks[0])

        # Save the model every 15 epochs
        if (epoch + 1) % 15 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

        print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

    # Generate after the final epoch: replace the per-epoch output with the
    # final prediction instead of leaving the cell empty.
    display.clear_output(wait=True)
    plot_image_pair(generator, noMasks[0], epochs, mask=masks[0])

# Encoder-Decoder Generator With Pooling-Free Discriminator

In [None]:
# Pooling-free discriminator (used in both GAN models).
# Two stride-1 convolutions keep the 128x128 resolution; the flattened
# features feed a single unit with no activation, i.e. one raw score per image.
discriminator = tf.keras.Sequential([
    layers.Conv2D(64, (3, 3), strides=1, padding='same',
                  input_shape=(SIZESMALL, SIZESMALL, 3)),
    layers.LeakyReLU(),
    layers.Conv2D(128, (3, 3), strides=1, padding='same'),
    layers.LeakyReLU(),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(1),
])

In [None]:
# Print each layer's output shape and parameter count.
discriminator.summary()

Model: "sequential_3"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_25 (Conv2D)          (None, 128, 128, 64)      1792      
                                                                 
 leaky_re_lu_4 (LeakyReLU)   (None, 128, 128, 64)      0         
                                                                 
 conv2d_26 (Conv2D)          (None, 128, 128, 128)     73856     
                                                                 
 leaky_re_lu_5 (LeakyReLU)   (None, 128, 128, 128)     0         
                                                                 
 dropout_12 (Dropout)        (None, 128, 128, 128)     0         
                                                                 
 flatten (Flatten)           (None, 2097152)           0         
                                                                 
 dense_2 (Dense)             (None, 1)                

In [None]:
# Train the GAN: encoder-decoder generator against the pooling-free
# discriminator, on the first 320 (32 * 10) small image pairs for 150 epochs.
train(
    generator=encoder_decoder,
    discriminator=discriminator,
    masks=masksSmall[:32 * 10],
    noMasks=noMasksSmall[:32 * 10],
    epochs=150,
)

# Transposed Convolutional Generator With Pooling-Free Discriminator

In [None]:
# Transposed-convolution generator: two 2x max-pools on the way in, two
# stride-2 transposed convolutions on the way out.
generator = keras.Sequential([
    # The Input layer alone fixes the input shape; the first Conv2D no longer
    # repeats it through a redundant `input_shape` argument.
    keras.Input(shape=(SIZESMALL, SIZESMALL, 3)),
    layers.Conv2D(32, 3, strides=1, activation="relu", padding="same"),
    layers.MaxPooling2D(2, padding="same"),
    layers.Conv2D(64, 5, strides=1, activation="relu", padding="same"),
    layers.MaxPooling2D(2, padding="same"),
    layers.Conv2D(128, 3, strides=1, padding="same"),
    layers.Dropout(.25),
    # The slope is passed positionally, as in the encoder-decoder model, so the
    # call does not depend on the keyword's name.
    layers.LeakyReLU(0.2),
    layers.Conv2DTranspose(64, (4, 4), strides=(2, 2), padding="same"),
    layers.LeakyReLU(0.2),
    layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), activation="relu", padding="same"),
    layers.Dropout(.25),
    # NOTE(review): the transposed convolution above already applies ReLU, so
    # this LeakyReLU only ever sees non-negative values - confirm whether the
    # ReLU activation was intended.
    layers.LeakyReLU(),
    # Output layer: 3 channels, i.e. one RGB image per input image.
    layers.Conv2D(3, 3, activation="relu", padding="same"),
])

In [None]:
# Print each layer's output shape and parameter count.
generator.summary()

Model: "sequential_4"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_27 (Conv2D)          (None, 128, 128, 32)      896       
                                                                 
 max_pooling2d_5 (MaxPooling  (None, 64, 64, 32)       0         
 2D)                                                             
                                                                 
 conv2d_28 (Conv2D)          (None, 64, 64, 64)        51264     
                                                                 
 max_pooling2d_6 (MaxPooling  (None, 32, 32, 64)       0         
 2D)                                                             
                                                                 
 conv2d_29 (Conv2D)          (None, 32, 32, 128)       73856     
                                                                 
 dropout_13 (Dropout)        (None, 32, 32, 128)      

In [None]:
# Train the GAN: transposed-convolution generator against the same
# discriminator object, on the first 320 (32 * 10) small image pairs for 200 epochs.
train(
    generator=generator,
    discriminator=discriminator,
    masks=masksSmall[:32 * 10],
    noMasks=noMasksSmall[:32 * 10],
    epochs=200,
)

In [None]:
# Export this notebook to HTML from Google Colab.
# Mount Google Drive so the saved notebook file can be reached from the runtime.
from google.colab import drive
drive.mount('/content/drive')

# NOTE(review): the TeX packages are presumably only needed for PDF export -
# confirm whether they are required for the HTML conversion below.
!sudo apt-get install texlive-xetex texlive-fonts-recommended texlive-generic-recommended
# Copy the notebook out of Drive into the working directory for nbconvert.
!cp "drive/MyDrive/MLFinal.ipynb" ./

# Convert the copied notebook to HTML.
!jupyter nbconvert --to html "MLFinal.ipynb"

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Reading package lists... Done
Building dependency tree       
Reading state information... Done
texlive-fonts-recommended is already the newest version (2017.20180305-1).
texlive-generic-recommended is already the newest version (2017.20180305-1).
texlive-xetex is already the newest version (2017.20180305-1).
The following packages were automatically installed and are no longer required:
  libnvidia-common-460 nsight-compute-2020.2.0
Use 'sudo apt autoremove' to remove them.
0 upgraded, 0 newly installed, 0 to remove and 42 not upgraded.
[NbConvertApp] Converting notebook MLFinal.ipynb to html
[NbConvertApp] Writing 388320 bytes to MLFinal.html
