# Autoencoder denoising

Example of Autoencoder Convolutional Neural Net (CNN) with TensorFlow (TF) 2.0 for denoising. This example builds an Autoencoder from scratch, with custom layers and training.

Sources:

*   [MIT - Introduction to Deep Learning](http://introtodeeplearning.com)
*   [PyImageSearch](https://www.pyimagesearch.com/2020/02/24/denoising-autoencoders-with-keras-tensorflow-and-deep-learning/)


`Author: Rodrigo Vimieiro`

`Date: Apr, 2020`

<table class="tfo-notebook-buttons" align="left">
  <td>
    <a target="_blank" href="https://colab.research.google.com/github/LAVI-USP/Machine-Learning/blob/master/Deep%20Learning/Auto%20Encoders/CNN_AutoEncoder_Denoising_mnist.ipynb"><img src="https://www.tensorflow.org/images/colab_logo_32px.png" />Run in Google Colab</a>
  </td>
  <td>
    <a target="_blank" href="https://github.com/LAVI-USP/Machine-Learning/blob/master/Deep%20Learning/Auto%20Encoders/CNN_AutoEncoder_Denoising_mnist.ipynb"><img src="https://www.tensorflow.org/images/GitHub-Mark-32px.png" />View source on GitHub</a>
  </td>
</table>


In [0]:
# Import Tensorflow 2.0
%tensorflow_version 2.x
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import functools

from tqdm import tqdm


## Define MNIST help functions:

In [0]:
class MNIST_Helper():
    
  def download(self):

    # Loads the MNIST dataset.
    (self.x_train, self.y_train) ,(self.x_test, self.y_test) =  tf.keras.datasets.mnist.load_data()

    self.train_dataset_shape = self.x_train.shape
    self.test_dataset_shape = self.x_test.shape


  def print_information(self):

    # Print informations about the MNIST dataset.
    print("There is %d training samples, containing images with shape of: %dx%d and %d channel" 
          % (self.train_dataset_shape[0],self.train_dataset_shape[1],self.train_dataset_shape[2],self.train_dataset_shape[3]))
    print("Train variable shape:", end='')
    print(self.train_dataset_shape)

  def pre_process(self):

    # Reshape to have 1 channel (last dimension) and normalize
    self.x_train = np.expand_dims(self.x_train, axis=-1).astype(np.float32)
    self.x_test = np.expand_dims(self.x_test, axis=-1).astype(np.float32)

    # Normalize data
    ## https://stats.stackexchange.com/questions/185853/why-do-we-need-to-normalize-the-images-before-we-put-them-into-cnn
    self.x_train /= 255
    self.x_test /= 255

    self.train_dataset_shape = self.x_train.shape
    self.test_dataset = self.x_test.shape

  def noise_injection(self):

    # Gaussian noise
    self.x_train_noise = self.x_train + tf.random.normal(self.x_train.shape, mean=0.5, stddev=0.5, dtype=np.float32)
    self.x_test_noise = self.x_test + tf.random.normal(self.x_test.shape, mean=0.5, stddev=0.5, dtype=np.float32)

    # Poisson noise (not really, lambda should be equal to mean)
    #self.x_train_noise += tf.random.poisson(self.x_train_noise.shape, lam=60/255, dtype=np.float32)
    #self.x_test_noise += tf.random.poisson(self.x_test_noise.shape, lam=60/255, dtype=np.float32)

    self.x_train_noise = np.clip(self.x_train_noise, 0, 1)
    self.x_test_noise = np.clip(self.x_test_noise, 0, 1)
  
  def create_dataset_iterable(self):

    # Create database objects
    self.train_dataset = tf.data.Dataset.from_tensor_slices((self.x_train_noise, self.x_train))
    self.test_dataset = tf.data.Dataset.from_tensor_slices((self.x_test_noise, self.x_test))
      
  def shuffle_dataset(self, dataset_size):
    
    return self.train_dataset.shuffle(dataset_size)

### Defining the Autoencoder model:

In [0]:
class autoencoder_model(tf.keras.Model):

  def __init__(self):

    super(autoencoder_model, self).__init__()
    
    self.n_outputs = 30 # Number of latent variables
    self.n_filters = 32
    self.img_input_shape = (28,28,1) # nRows X nCols X nChannels

    self.encoder = self.build_encoder()
    self.decoder = self.build_decoder()

  def build_encoder(self):

    Conv2D = functools.partial(tf.keras.layers.Conv2D, padding='same', activation='relu', strides=2)
    BatchNormalization = tf.keras.layers.BatchNormalization
    Flatten = tf.keras.layers.Flatten
    Dense = functools.partial(tf.keras.layers.Dense, activation='relu')

    model = tf.keras.Sequential([

      # Downscaling convolutions                           
      Conv2D(filters=1*self.n_filters, kernel_size=3, input_shape=self.img_input_shape),
      BatchNormalization(),
      
      Conv2D(filters=2*self.n_filters, kernel_size=3),
      BatchNormalization(),

      Flatten(),
      Dense(self.n_outputs, activation=None),

    ])
    return model

  def build_decoder(self):
    '''
    https://medium.com/apache-mxnet/transposed-convolutions-explained-with-ms-excel-52d13030c7e8
    https://stackoverflow.com/questions/53654310/what-is-the-difference-between-upsampling2d-and-conv2dtranspose-functions-in-ker
    '''

    Conv2DTranspose = functools.partial(tf.keras.layers.Conv2DTranspose, padding='same', activation='relu', strides=2)
    BatchNormalization = tf.keras.layers.BatchNormalization
    Flatten = tf.keras.layers.Flatten
    Dense = functools.partial(tf.keras.layers.Dense, activation='relu')
    Reshape = tf.keras.layers.Reshape

    # Build the decoder network using the Sequential API
    model = tf.keras.Sequential([

      Dense(7*7*2*self.n_filters,input_shape=(self.n_outputs,)),  
      Reshape(target_shape=(7, 7, 2*self.n_filters)),

      # Upscaling convolutions (inverse of encoder)
      Conv2DTranspose(filters=2*self.n_filters, kernel_size=3),
      Conv2DTranspose(filters=1*self.n_filters, kernel_size=3),
      Conv2DTranspose(filters=1, kernel_size=3, strides=1),
    ])

    return model

  def call(self, x):

    latent_space = self.encoder(x)
    img_recon = self.decoder(latent_space)

    return img_recon

### Defining the loss function:

In [0]:
def loss_function(img_recon, img_without_noise):
    
    loss = tf.math.reduce_mean(tf.square(img_recon - img_without_noise),axis=(1,2))

    return loss

### Defining the optimizer:

In [0]:
def create_optimizer(learning_rate):

  return tf.keras.optimizers.Adam(learning_rate) # define our optimizer

### Defining the trainning step:

In [0]:
@tf.function
def train_step(autoencoder, img_with_noise , img_without_noise, optimizer):

  with tf.GradientTape() as tape:
        
    # Feed images into autoencoder
    img_recon = autoencoder(img_with_noise)
        
    # Calc the loss
    loss = loss_function(img_recon, img_without_noise)

    ### Backpropagation ###
    # Get the gradients
    grads = tape.gradient(loss, autoencoder.trainable_variables)

    # Update the weights
    optimizer.apply_gradients(zip(grads, autoencoder.trainable_variables))

  return loss

### Defining the trainning loop:

In [0]:
def train_model(autoencoder, dataset, num_epochs, batch_size, learning_rate):

  optimizer = create_optimizer(learning_rate)

  train_loss_history = []
  validation_loss_history = []

  metrics_names = ['train_loss','val_loss'] 
  
  # Loop on each epoch
  for epoch in range(num_epochs):

    print("\nepoch {}/{}".format(epoch+1,num_epochs))

    progBar = tf.keras.utils.Progbar(MNIST.train_dataset_shape[0], stateful_metrics=metrics_names)

    # Loop on each batch of train dataset
    for idX, (batch_x, batch_y) in enumerate(MNIST.train_dataset.batch(batch_size)): 
        
      # Train the model
      train_loss = train_step(autoencoder, batch_x, batch_y, optimizer)

      values=[('train_loss',train_loss)]

      progBar.update(idX*batch_size, values=values) 

      train_loss_history.append(tf.math.reduce_mean(train_loss))


    # Loop on each batch of test dataset for validation
    for batch_x, batch_y in MNIST.test_dataset.batch(batch_size):

      # Foward image through the network
      img_recon = autoencoder(batch_x)

      # Calc the loss
      val_loss = loss_function(img_recon, batch_y)

      validation_loss_history.append(tf.math.reduce_mean(val_loss))


    # Update progBar with val_loss
    values=[('train_loss',train_loss),('val_loss',val_loss)]

    progBar.update(MNIST.train_dataset_shape[0], values=values, finalize=True)


    # Shuffle train dataset for next epoch
    MNIST.train_dataset = MNIST.shuffle_dataset(MNIST.train_dataset_shape[0])
  
  train_loss = [train_loss_history[x].numpy() for x in range(0,len(train_loss_history),len(train_loss_history)//num_epochs)]
  val_loss = [validation_loss_history[x].numpy() for x in range(0,len(validation_loss_history),len(validation_loss_history)//num_epochs)]
  
  return train_loss, val_loss

-----------------------------------------------------------------------
## %% Main code %%

In [0]:
# Create MNIST helper class
MNIST = MNIST_Helper()

# Download the dataset
MNIST.download()

# Pre processing
MNIST.pre_process()

# Display some dataset information
MNIST.print_information()

# Noise injection
MNIST.noise_injection()

# Create TF Dataset object 
MNIST.create_dataset_iterable()


In [0]:
### Examining the MNIST training dataset ###

idx_train_img = 24010 #@param {type:"slider", min:0, max:59999, step:1}

plt.figure(figsize=(5,5))
plt.subplot(1, 2, 1)
plt.imshow(np.squeeze(MNIST.x_train[idx_train_img]),'gray')
plt.title(np.squeeze(MNIST.y_train[idx_train_img])); plt.grid(False)

plt.subplot(1, 2, 2)
plt.imshow(np.squeeze(MNIST.x_train_noise[idx_train_img]),'gray')
plt.title(np.squeeze(MNIST.y_train[idx_train_img])); plt.grid(False)

In [0]:
autoencoder = autoencoder_model()
#autoencoder.encoder.summary()
#autoencoder.decoder.summary()

In [0]:
# Training hyperparameters
num_epochs = 15
batch_size = 20
learning_rate = 5e-4

# Train the model 
train_loss, val_loss = train_model(autoencoder, MNIST, num_epochs, batch_size, learning_rate)

In [0]:
# Plot loss results
N = np.arange(0, num_epochs)
plt.figure()
plt.plot(N, train_loss, label="train_loss")
plt.plot(N, val_loss, label="val_loss")
plt.title("Results")
plt.xlabel("Epoch #")
plt.ylabel("Loss")
plt.legend(loc="upper right")

In [0]:
idx_test_img = 4717 #@param {type:"slider", min:0, max:9999, step:1}

img_test = MNIST.x_test_noise[idx_test_img:idx_test_img+5]

img_recon = autoencoder(img_test)

plt.figure(figsize=(10,10))
plt.imshow(np.hstack(np.squeeze(MNIST.x_test[idx_test_img:idx_test_img+5])),'gray')
plt.title("Original", fontsize=15)

plt.figure(figsize=(10,10))
plt.imshow(np.hstack(np.squeeze(img_test)),'gray')
plt.title("Noise images", fontsize=15)

plt.figure(figsize=(10,10))
plt.imshow(np.hstack(np.squeeze(img_recon)),'gray')
plt.title("Reconstructed", fontsize=15)


In [0]:
#autoencoder.compile(loss="mse", optimizer=tf.keras.optimizers.Adam(learning_rate))

# train the convolutional autoencoder
#H = autoencoder.fit(MNIST.train_dataset.batch(batch_size),
#                    validation_data=MNIST.test_dataset.batch(batch_size),
#										epochs=num_epochs)

#N = np.arange(0, num_epochs)
#plt.style.use("ggplot")
#plt.figure()
#plt.plot(N, H.history["loss"], label="train_loss")
#plt.plot(N, H.history["val_loss"], label="val_loss")
#plt.title("Training Loss and Accuracy")
#plt.xlabel("Epoch #")
#plt.ylabel("Loss/Accuracy")
#plt.legend(loc="lower left")