### Models Used
I have implemented three models for denoising the Cifar-10 dataset. These are: 
1. Convolutional Auto-encoders with Symmetric Skip Connections [[link]](https://arxiv.org/pdf/1606.08921.pdf)
2. WIN5, WIN5-R and WIN5-RB based on Wide inference Networks [[link]](https://arxiv.org/pdf/1707.05414.pdf)
3. RIDNet [[link](https://arxiv.org/pdf/1904.07396v2.pdf)]

### Noise Types
I have tested the model on 2 types of noises. These are: 
1. Gaussian Noise (mean=0, [var=0.01 and var=0.05])
2. Salt and Pepper Noise (s&p)

<b>Note: Please refer to the results document sent on sabyasachis@iisc.ac.in for the model results.** <br /> <br />
Note: All the models were trained using a scheduler. Running the model as per the hypermeters in the notebook may not give the same results as in the results** document sent on sabyasachis@iisc.ac.in. Please mail at abhinavnagpal12@gmail.com for the scheduler details.</b>


## Importing Libraries

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import os
import skimage
import cv2
from skimage.metrics import mean_squared_error, peak_signal_noise_ratio, structural_similarity

import tensorflow as tf
from tensorflow.keras.datasets import cifar10
from tensorflow import keras
from tensorflow.keras import layers
from keras.utils import plot_model
from keras.layers import * #Conv2D, Input, Concatenate, Dense,Dropout, AveragePooling2D, GlobalAveragePooling3D, GlobalAveragePooling2D, GlobalAveragePooling1D, ReLU, MaxPool2D, UpSampling2D, Conv2DTranspose, BatchNormalization,add, Add, LeakyReLU
from keras.models import Model
from keras.optimizers import Adam
from keras.regularizers import l2
%matplotlib inline

## Noise Function

In [2]:
def add_noise(data, mean, var, noise):
    sigma = var**0.5
    num,row,col,ch = data.shape

    if noise == "gaussian":
        normal = np.random.normal(mean,sigma,(num,row,col,ch))
        normal = normal.reshape(num,row,col,ch)
        noisy = data + normal
    elif noise == "s&p": 
        noisy = skimage.util.random_noise(data, mode=noise)
        
#     elif noise == "rand_black":
#         print(2)

#     elif noise == "rand_color":
#         print(3)
    

#     elif noise == "poisson":
#         mask = numpy.random.poisson(data)
#         noisy = data + mask
    
#     elif noise == "speckle":
#         normal = np.random.normal(mean,sigma,(num,row,col,ch))
#         normal = normal.reshape(num,row,col,ch)        
#         noisy = data + data*normal
    
    return noisy

## Display Noisy V/S Original

In [3]:
def show_images(noisy, original, n):
    plt.figure(figsize=(15,15))

    ids = np.random.randint(low = len(noisy), size = n)

    for i in range(len(ids)):
        plt.subplot(n, 2, (2*(i+1))-1)
        plt.imshow(noisy[ids[i]])
        plt.title('Noisy')
        plt.axis("off")

        plt.subplot(n, 2, 2*(i+1))
        plt.imshow(original[ids[i]])
        plt.title('Original')
        plt.axis("off")
        
    plt.show()

## Plot Loss Curve

In [4]:
def draw_plot(history):
    f = plt.figure(figsize=(7,5))
    f.add_subplot()
    plt.plot(history.epoch, history.history['loss'], label = "loss") # train loss
    plt.plot(history.epoch, history.history['val_loss'], label = "val_loss") # validation loss
    plt.title("Loss Curves")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.grid(alpha=0.5)
    plt.legend()
    plt.show()

## Display Noisy v/s Denoised samples

In [5]:
def display_denoised(x_test, y_test, model, num_images):
    col = 6
    row = int(num_images/col)
    cell_size = 1.5
    
    ids = np.random.randint(x_test.shape[0], size = num_images) 
    
    cifar_test = x_test[ids]
    cifar_original = y_test[ids]
    cifar_denoised = model.predict(cifar_test)
    
    f = plt.figure(figsize=(cell_size*col,cell_size*row*3))
    
    for i in range(row):
        
        for j in range(col): 
            f.add_subplot(row*3,col, (3*i*col)+(j+1)) 
            plt.imshow(cifar_original[i*col + j]) 
            plt.axis("off")
            plt.title('Original')
        
        for j in range(col): 
            f.add_subplot(row*3,col,((3*i+1)*col)+(j+1))
            plt.imshow(cifar_test[i*col + j]) 
            plt.axis("off")
            plt.title('Noisy')

        for j in range(col): 
            f.add_subplot(row*3,col,((3*i+2)*col)+(j+1))
            plt.imshow(cifar_denoised[i*col + j]) 
            plt.axis("off")
            plt.title('De-noised')


    f.suptitle("Results",fontsize=20)
    plt.show()

### Get PSNR and SSIM

In [6]:
def get_metric(model, x_test, y_test):
    results = model.predict(x_test) # predict
    y_test = y_test.astype('float32')
    s = 0
    p = 0
    for i in range(x_test.shape[0]):
        p+=peak_signal_noise_ratio(y_test[i], results[i])
        s+=structural_similarity(y_test[i], results[i],multichannel=True)
    
    return p/10000, s/10000

# Model 1: Convolutional Auto-encoders with Symmetric Skip Connections [[link]](https://arxiv.org/pdf/1606.08921.pdf)

In [7]:
class Encoder(layers.Layer):        
        
    def __init__(self, name="encoder", **kwargs):
        super(Encoder, self).__init__(name=name, **kwargs)
        self.conv_1 = Conv2D(32, 3, activation='relu', padding='same')
        self.bn1 = BatchNormalization()
        self.mp = MaxPool2D()
        self.dropout = Dropout(0.5)
        self.conv_2 = Conv2D(32, 3, padding='same')
        self.bn2 = BatchNormalization()
        self.lr = LeakyReLU()
        self.conv_3 = Conv2D(64, 3, activation='relu', padding='same')
        self.bn3 = BatchNormalization()
        
    def call(self, inputs):
        x = self.conv_1(inputs)
        x = self.bn1(x)
        x = self.mp(x)
        x = self.dropout(x)
        skip = self.conv_2(x)
        x = self.lr(skip)
        x = self.bn2(x)
        x = self.mp(x)
        x = self.dropout(x)
        x = self.conv_3(x)
        x = self.bn3(x)
        encoder_output = self.mp(x)
        return encoder_output, skip

class Decoder(layers.Layer):

    def __init__(self, name="decoder", **kwargs):
        super(Decoder, self).__init__(name=name, **kwargs)
        
        self.conv_t1 = Conv2DTranspose(64, 3,activation='relu',strides=(2,2), padding='same')
        self.bn1 = BatchNormalization()
        self.dropout = Dropout(0.5)
        self.conv_t2 = Conv2DTranspose(32, 3, activation='relu',strides=(2,2), padding='same')
        self.bn2 = BatchNormalization()
        self.conv_t3 = Conv2DTranspose(32, 3, padding='same')
        self.lr = LeakyReLU()
        self.conv_t4 = Conv2DTranspose(3, 3, activation='sigmoid',strides=(2,2), padding='same')
        self.bn3 = BatchNormalization()

    def call(self, inputs, skip):
        
        x = self.conv_t1(inputs)
        x = self.bn1(x)
        x = self.dropout(x)
        x = self.conv_t2(x)
        x = self.bn2(x)
        x = self.dropout(x)
        x = self.conv_t3(x)
        x = add([x,skip]) # adding skip connection
        x = self.lr(x)
        x = self.bn3(x)
        decoder_output = self.conv_t4(x)
        return decoder_output


class AutoEncoder(keras.Model):

    def __init__(self,original_dim,name="autoencoder",**kwargs):
        super(AutoEncoder, self).__init__(name=name, **kwargs)
        self.original_dim = original_dim
        self.encoder = Encoder()
        self.decoder = Decoder()

    def call(self, inputs):
        encoder_output, skip = self.encoder(inputs)
        decoder_output = self.decoder(encoder_output, skip)
        return decoder_output
    
    def model(self):
        x = Input(shape=(32,32,3))
        return Model(inputs=[x], outputs=self.call(x))

In [8]:
def main(show_sample = True):
    # Loading the dataset
    (x_train, _), (x_test, _) = cifar10.load_data()

    # Scale the images beteen 0 and 1
    x_train = x_train/255.0
    x_test = x_test/255.0

    # Real images are labels
    y_train = x_train
    y_test = x_test

    # Adding noise to the images
    noise = "gaussian"
    mean = 0
    var = 0.01

    x_train = add_noise(y_train, mean, var, noise)
    x_test = add_noise(y_test, mean, var, noise)

#   display noisy and original images
    num_img=5
    show_images(x_train, y_train, num_img)
    
    ae = AutoEncoder((32,32,3))
    optimizer = tf.keras.optimizers.Adam(learning_rate=0.00001)
    ae.compile(optimizer=Adam(lr=0.00001), loss='mae')
    return x_train, y_train, x_test, y_test, ae

In [None]:
x_train, y_train, x_test, y_test, ae = main() 
epochs = 65
batch_size = 256
        
history = ae.fit(x_train, y_train, validation_data = (x_test, y_test), shuffle=True, epochs=epochs, batch_size=batch_size)


In [None]:
draw_plot(history)
display_denoised(x_test, y_test, ae, num_images=12)

In [None]:
psnr, ssim = get_metric(ae, x_test, y_test)
print(psnr, ssim)

# Model 2: Wide Inference Network for Image Denoising via Learning Pixel-distribution Prior [[link]](https://arxiv.org/pdf/1707.05414.pdf)

### Type 1: WIN5

In [9]:
(x_train, _), (x_test, _) = cifar10.load_data()

# Scaling the images beteen 0 and 1
x_train = x_train/255.0
x_test = x_test/255.0

# Real images are labels
y_train = x_train
y_test = x_test

# Adding noise to the images
noise = "gaussian"
mean = 0
var = 0.01

x_train = add_noise(y_train, mean, var, noise)
x_test = add_noise(y_test, mean, var, noise)

X = Input((32,32,3))

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(X)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = ReLU()(x)

x = Conv2D(3, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)

model = Model(inputs = X, outputs = x)
model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate = 0.001), loss = 'mae')
# model.summary()

Downloading data from https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz


In [None]:
epochs = 20
batch_size = 64
history = model.fit(x_train, y_train, validation_data = (x_test, y_test), epochs=epochs, batch_size=batch_size)

In [None]:
display_denoised(x_test, y_test, model, num_images=12)
draw_plot(history)

In [None]:
print(get_metric(model, x_test, y_test))

### Type 2: WIN5-R

In [None]:
(x_train, _), (x_test, _) = cifar10.load_data()

# Scaling the images beteen 0 and 1
x_train = x_train/255.0
x_test = x_test/255.0

# Real images are labels
y_train = x_train
y_test = x_test

# Adding noise to the images
noise = "gaussian"
mean = 0
var = 0.01

x_train = add_noise(y_train, mean, var, noise)
x_test = add_noise(y_test, mean, var, noise)

X = Input((32,32,3))

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(X)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = ReLU()(x)

x = Conv2D(3, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = Add()([X, x])

model = Model(inputs = X, outputs = x)
model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate = 0.001), loss = 'mae')
# model.summary()

In [None]:
epochs = 15
batch_size = 64
# model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate = 0.05), loss = 'mse')
history = model.fit(x_train, y_train, validation_data = (x_test, y_test), epochs=epochs, batch_size=batch_size)

In [None]:
display_denoised(x_test, y_test, model, num_images=12)
draw_plot(history)

In [None]:
print(get_metric(model, x_test, y_test))

### Type 3

In [None]:
(x_train, _), (x_test, _) = cifar10.load_data()

# Scaling the images beteen 0 and 1
x_train = x_train/255.0
x_test = x_test/255.0

# Real images are labels
y_train = x_train
y_test = x_test

# Adding noise to the images
noise = "gaussian"
mean = 0
var = 0.01

x_train = add_noise(y_train, mean, var, noise)
x_test = add_noise(y_test, mean, var, noise)

X = Input((32,32,3))

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(X)
x = BatchNormalization()(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = BatchNormalization()(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = BatchNormalization()(x)
x = ReLU()(x)

x = Conv2D(128, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = BatchNormalization()(x)
x = ReLU()(x)

x = Conv2D(3, 7, padding='same', kernel_regularizer=l2(1e-4), bias_regularizer=l2(1e-4))(x)
x = BatchNormalization()(x)
x = Add()([X, x])

model = Model(inputs = X, outputs = x)
model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate = 0.01), loss = 'mae')
# model.summary()

In [None]:
epochs = 15
batch_size = 64
# model.compile(optimizer = tf.keras.optimizers.SGD(learning_rate = 0.001), loss = 'mae')
history = model.fit(x_train, y_train, validation_data = (x_test, y_test), epochs=epochs, batch_size=batch_size)

In [None]:
display_denoised(x_test, y_test, model, num_images=12)
print(get_metric(model, x_test, y_test))
draw_plot(history)

# Model 3: RIDNet [[link](https://arxiv.org/pdf/1904.07396v2.pdf)]


In [None]:
def eam_block(x):
    
    # merge and run
    x1 = Conv2D(64, 3, padding='same')(x)
    x1 = ReLU()(x1)
    x1 = Conv2D(64, 3, padding='same', dilation_rate = 2)(x1)
    x1 = ReLU()(x1)

    x2 = Conv2D(64, 3, padding='same', dilation_rate = 3)(x)
    x2 = ReLU()(x2)
    x2 = Conv2D(64, 3, padding='same', dilation_rate = 4)(x2)
    x2 = ReLU()(x2)

    merge1 = Concatenate()([x1,x2])
    merge1 = Conv2D(64, 3, padding='same')(merge1)
    merge1 = ReLU()(merge1)
    merge1 = merge1+x

    # residual block
    r1  = Conv2D(64, 3, padding='same')(merge1)
    r1 = ReLU()(r1)
    r1 = Conv2D(64, 3, padding='same')(r1)
    r1 = ReLU()(r1+merge1)

    # e-residual block 
    er1  = Conv2D(64, 3, padding='same')(r1)
    er1 = ReLU()(er1)
    er1 = Conv2D(64, 3, padding='same')(er1)
    er1 = ReLU()(er1)
    er1  = Conv2D(64, 1, padding='valid')(er1)
    er1 = ReLU()(er1+r1)

    # ca-layer

    # part 1
    ca = AveragePooling2D(pool_size = (32,32), padding='valid', strides = 32//1)(er1) # Stride = (input_size//output_size)  Kernel size = input_size - (output_size-1)*stride  

    # basic block
    ca = Conv2D(4, 1, padding='valid')(ca)
    ca = ReLU()(ca)

    #basic block sig
    ca = Conv2D(64, 1, padding='valid', activation = 'sigmoid')(ca)
    z = Multiply()([er1, ca])
    
    return z

In [None]:
(x_train, _), (x_test, _) = cifar10.load_data()

# Scaling the images beteen 0 and 1
x_train = x_train/255.0
x_test = x_test/255.0

# Real images are labels
y_train = x_train
y_test = x_test

# Adding noise to the images
noise = "gaussian"
mean = 0
var = 0.01

x_train = add_noise(y_train, mean, var, noise)
x_test = add_noise(y_test, mean, var, noise)


X = Input((32,32,3))

# feature extraction
x = Conv2D(64, 3, padding='same')(X)
x = ReLU()(x)

# EAM-1
block1 = eam_block(x)

# EAM-2
block2 = eam_block(block1)

# EAM-3
block3 = eam_block(block2)

# EAM-4
block4 = eam_block(block3)

# tail 
tail = Conv2D(3, 3, padding='same')(block4)
out = tail + X

model = Model(inputs = X, outputs = out)
model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-5), loss = 'mae')
model.summary()

In [None]:
epochs = 20
batch_size = 64
#model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-5), loss = 'mae')
history = model.fit(x_train, y_train, validation_data = (x_test, y_test), epochs=epochs, batch_size=batch_size)

In [None]:
display_denoised(x_test, y_test, model, num_images=12)
print(get_metric(model, x_test, y_test))
draw_plot(history)