## Import libraries

In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import seaborn as sns
import keras
import tensorflow as tf
import zipfile
import requests

# For Image Processing
import matplotlib.pyplot as plt
import glob
from PIL import Image 
from matplotlib.image import imread

# For Model Building
from tensorflow.keras.models import Model
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Dense,Flatten,Reshape,Conv2D,MaxPooling2D,Input,UpSampling2D,Lambda,BatchNormalization,Dropout,Conv2DTranspose
from tensorflow.keras.callbacks import ReduceLROnPlateau
from tensorflow.keras.optimizers import SGD,Adam
from skimage.measure import compare_ssim as ssim
from keras import backend as K
from sklearn.model_selection import train_test_split

# For DeOldify
import io
import base64

In [None]:
# Check if GPU is active and the GPU's model
# !nvidia-smi

## Load and process images

In [None]:
# Mount Google Drive at /content/drive; the zipped image archives are read from there.
# NOTE(review): `google.colab` is presumably only importable inside a Colab runtime — confirm
# before running this notebook elsewhere.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Print the full path of every file below the dataset folder so the presence
# of all zipped archives can be checked by eye.
import os
for root_dir, _subdirs, names in os.walk('/content/drive/My Drive/ML_Project/cracks'):
    for name in names:
        print(os.path.join(root_dir, name))

In [None]:
# path_zip: Drive folder that holds the zipped archives.
# path:     local working directory (the image folders are globbed from /content/ below).
path_zip = '/content/drive/My Drive/ML_Project/cracks/'
path = '/content/'

In [None]:
# Extract every train/label archive (sets 7-10) from `path_zip` into `path`.
# `zipfile` is used instead of `!unzip` so the cell is idempotent: re-running it
# overwrites previously extracted files instead of stopping at unzip's
# interactive "replace?" prompt, and it does not print one line per extracted file.
for set_id in (7, 8, 9, 10):
    for prefix in ('train', 'label'):
        archive_path = os.path.join(path_zip, f'{prefix}{set_id}.zip')
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(path)

print("Unzipping completed")

In [None]:
# Read the first 2500 files (sorted by name) of each training folder and
# stack them, folder after folder, into a single numpy array.
train_path7 = "/content/train7/*"
train_path8 = "/content/train8/*"
train_path9 = "/content/train9/*"
train_path10 = "/content/train10/*"

train_images = np.array([
    np.asarray(Image.open(image_file))
    for folder_pattern in (train_path7, train_path8, train_path9, train_path10)
    for image_file in sorted(glob.iglob(folder_pattern))[:2500]
])

In [None]:
# Read the first 2500 files (sorted by name) of each label folder and stack
# them, folder after folder, into a single numpy array. The folder order and
# per-folder limit mirror the training images so rows stay paired by position.
label_path7 = "/content/label7/*"
label_path8 = "/content/label8/*"
label_path9 = "/content/label9/*"
label_path10 = "/content/label10/*"

train_labels = np.array([
    np.asarray(Image.open(label_file))
    for folder_pattern in (label_path7, label_path8, label_path9, label_path10)
    for label_file in sorted(glob.iglob(folder_pattern))[:2500]
])

## Train Test Split

In [None]:
# Hold out 25% of the image/label pairs; the fixed seed makes the shuffled split repeatable.
x_train, x_test, y_train, y_test = train_test_split(
    train_images,
    train_labels,
    test_size=0.25,
    shuffle=True,
    random_state=42,
)

## Custom botCallback

In [None]:
# Input Personal Access Token for botCallback to be active

# access_token = 

class botCallback(tf.keras.callbacks.Callback):
    """Keras callback that reports training progress to a Telegram chat.

    On every 10th epoch (0, 10, 20, ...) it saves the model to ``mse_<epoch>.h5``,
    uploads that file to the chat, and sends a figure comparing four noisy
    images from the global ``x_test`` with the model's predictions for them.

    Parameters
    ----------
    access_token : str
        Telegram bot token; used for every Bot API request made by this object.
    chat_id : int, optional
        Chat that receives the messages. Defaults to the id the notebook
        originally targeted.

    Batch-level hooks are intentionally not overridden: the inherited
    implementations are already no-ops.
    """

    _API_ROOT = 'https://api.telegram.org/bot'

    def __init__(self,access_token,chat_id=161225240):
        # Let the Keras base class set up its own attributes first.
        super().__init__()
        self.access_token = access_token
        self.chat_id = chat_id
        self.ping_url = self._endpoint('getUpdates')
        self.response = requests.get(self.ping_url).json()

    def _endpoint(self, method):
        """Return the Bot API URL for ``method`` built from this bot's token."""
        return self._API_ROOT+str(self.access_token)+'/'+method

    def send_message(self,message):
        """Send ``message`` to the chat as Markdown text."""
        self.ping_url = self._endpoint('sendMessage')
        # Passing the fields via ``params`` lets requests URL-encode them, so
        # text containing '&', '#', spaces or newlines arrives intact.
        self.response = requests.get(
            self.ping_url,
            params={'chat_id': self.chat_id,
                    'parse_mode': 'Markdown',
                    'text': message},
        )

    def send_model(self, filepath):
        """Upload the file at ``filepath`` to the chat as a document."""
        # Uses the token and chat id given to the constructor, and closes the
        # file once the upload has finished.
        with open(filepath, 'rb') as model_file:
            requests.post(
                self._endpoint('sendDocument'),
                params={'chat_id': self.chat_id},
                files={'document': model_file},
            )

    def send_photo(self,filepath):
        """Upload the image at ``filepath`` to the chat as a photo."""
        self.ping_url = self._endpoint('sendPhoto')
        with open(filepath,'rb') as photo_file:
            self.response = requests.post(
                self.ping_url,
                params={'chat_id': self.chat_id},
                files={'photo': photo_file},
            )

    def on_epoch_end(self, epoch, logs=None):
        """Checkpoint the model and send a visual report on every 10th epoch."""
        if epoch%10 != 0:
            return

        filepath = f'mse_{epoch}.h5'
        self.model.save(filepath)
        self.send_model(filepath)

        # Only these four test images are plotted, so only they are predicted.
        sample_ids = [0, 4, 8, 12]
        predictions = self.model.predict(x_test[sample_ids], batch_size=16)

        fig = plt.figure(figsize=(15,25))
        for row, sample_id in enumerate(sample_ids):
            # Left column: network input.
            plt.subplot(4,2,2*row+1)
            plt.xticks([])
            plt.yticks([])
            plt.imshow(x_test[sample_id][:,:,0], cmap='gray')
            plt.title('Noisy image MSE')

            # Right column: network output for the same image.
            plt.subplot(4,2,2*row+2)
            plt.xticks([])
            plt.yticks([])
            plt.imshow(predictions[row][:,:,0], cmap='gray')
            plt.title('Denoised by autoencoder MSE')

        plt.savefig('hi.png')
        # Release the figure; otherwise one figure per report stays open in memory.
        plt.close(fig)
        self.send_photo('./hi.png')

In [None]:
# Uncomment and run this only if access_token variable is input from previous line
# bot_callback = botCallback(access_token)

## Custom metrics

In [None]:
# Custom Metrics
def ssim_loss(y_true,y_pred):
    """Structural similarity (SSIM) between targets and predictions.

    Pixel values are assumed to span 0-255. Despite the name, the value
    returned is the result of ``tf.image.ssim`` itself; it is not negated or
    otherwise turned into a loss.
    """
    similarity = tf.image.ssim(y_true, y_pred, max_val=255)
    return similarity

def PSNR(y_true,y_pred):
    """Peak signal-to-noise ratio between targets and predictions.

    Pixel values are assumed to span 0-255; the result is whatever
    ``tf.image.psnr`` returns for that range.
    """
    peak_snr = tf.image.psnr(y_true, y_pred, max_val=255)
    return peak_snr

## Model architecture

In [None]:
# The network takes 250x400 images with 2 channels.
input_layer = Input(shape=(250,400,2))

# Encoding: four 5x5 convolutions widen the channels to 128, then one 5x5
# max-pool reduces 250x400 to 50x80 (the decoder's declared input size).
encoder = Sequential()
encoder.add(input_layer)
for n_filters in (16, 32, 64, 128):
    encoder.add(Conv2D(n_filters,(5,5),activation='relu',padding='same'))

encoder.add(BatchNormalization())
encoder.add(MaxPooling2D((5,5),padding='same'))

encoder.add(Dropout(0.5))

# Decoding: mirror of the encoder, then a 5x upsampling back to 250x400.
decoder = Sequential()

decoder.add(Conv2D(128,(5,5),input_shape=[50,80,128],activation='relu',padding='same'))
for n_filters in (64, 32, 16):
    decoder.add(Conv2D(n_filters,(5,5),activation='relu',padding='same'))
# NOTE(review): this BatchNormalization is appended to `encoder` (after its
# Dropout), not to `decoder`, which looks like a typo. It is left untouched on
# purpose: moving it would change the layer layout, and the weights file loaded
# later in this notebook was presumably saved from this exact layout. Fix it
# only together with a retrain.
encoder.add(BatchNormalization())
decoder.add(UpSampling2D((5,5)))

# Output layer: back to 2 channels.
decoder.add(Conv2D(2,(5,5),activation='relu',padding='same'))

# Merge both Encoding and Decoding models into 1
noise_remover = Sequential([encoder,decoder])


# ALL CALLBACKS
# Halve the learning rate whenever val_loss has not improved for 10 epochs.
learning_rate_reduction = ReduceLROnPlateau(monitor='val_loss',
                                            patience=10,
                                            verbose=1,
                                            factor=0.5,
                                            min_lr=0.000000003)

# Compiling Model
# `learning_rate` is the supported keyword; `lr` is a deprecated alias that
# newer Keras releases reject.
noise_remover.compile(loss='mean_squared_error',
                    optimizer=Adam(learning_rate=0.000003),
                    metrics=[PSNR,ssim_loss,'mae'])


# Uncomment the codes below to train and save a new Autoencoder model

# Fit Model
# noise_remover.fit(x_train,y_train,epochs=500,validation_data=(x_test,y_test),
#                 batch_size=16,verbose=1,callbacks=[learning_rate_reduction,bot_callback])

# Saving the model
# noise_remover.save('/content/drive/My Drive/final_mse_weights.h5')

In [None]:
# Uncomment and run this only if access_token variable is input from previous line

# def send_message(message):
#     chat_id = 161225240
#     bot_api = '1389473435:AAFFxnOdJe83bqfDI15EZ_rjrTQiDfKtkcI'
#     teleurl = f"https://api.telegram.org/bot{bot_api}/sendMessage?chat_id={chat_id}&text={message}"
#     print('Message Sent!')
#     requests.get(teleurl)
# send_message('Finished MSE Model Training!')

## Model's metric evaluation

In [None]:
# Tabulate the values stored on the model's History object, one column per metric.
# NOTE(review): presumably requires the (commented-out) fit cell to have been run first.
epoch_logs = noise_remover.history.history
metrics = pd.DataFrame(epoch_logs)

In [None]:
# Training vs. validation mean absolute error
metrics.loc[:, ['mae', 'val_mae']].plot()

In [None]:
# Training vs. validation loss (mean squared error)
metrics.loc[:, ['loss', 'val_loss']].plot()

In [None]:
# Training vs. validation PSNR
metrics.loc[:, ['PSNR', 'val_PSNR']].plot()

In [None]:
# Training vs. validation SSIM (the metric function is named `ssim_loss`)
metrics.loc[:, ['ssim_loss', 'val_ssim_loss']].plot()

In [None]:
# Show the metric values of the last 10 rows (epochs) of the table
metrics.tail(10)

## Model's visual evaluation

In [None]:
# Pick every 5th image among the first 150 test images (up to 30 samples)
# for a visual comparison of noisy vs. denoised images.
noisey_images = x_test[:150:5]

In [None]:
# Check how many samples were selected and their image dimensions
noisey_images.shape

In [None]:
# Run the selected samples through the autoencoder by calling the model directly
denoised = noise_remover(noisey_images)
# Display the output shape for comparison with the input shape shown above
denoised.shape

In [None]:
# For each of the 30 samples, show the noisy input and then its denoised
# counterpart (first channel only, greyscale).
for sample_no in range(30):
    for picture in (noisey_images[sample_no], denoised[sample_no]):
        plt.imshow(picture[:,:,0],cmap='gray')
        plt.show()

## Load model weights for testing


For test purposes, run the following:

1.   Import libraries
2.   Custom metrics
3.   Model architecture

**After running the three sections above, make sure the paths to the .h5 weights file and the test image point to the correct locations before proceeding. You may change the paths if needed.**

Then run the cells below in order to view the test image before and after denoising, and finally after colourization by DeOldify.

In [None]:
# Flag the model as built before loading the weights.
# NOTE(review): presumably a workaround so load_weights accepts the freshly
# constructed model — confirm it is still required.
noise_remover.built = True
# The (commented-out) training cell saves to '/content/drive/My Drive/final_mse_weights.h5',
# whereas the weights are read from '/content/' here — copy the file or adjust this path.
noise_remover.load_weights('/content/final_mse_weights.h5')

In [None]:
# Print the layer summary of the combined encoder/decoder model
noise_remover.summary()

## Load test image

In [None]:
# Insert the path of the test image in the img_path variable
img_path = '/content/test_img2.png'
# Mode 'LA' is greyscale plus alpha, i.e. two channels, matching the model's 2-channel input
test_image = Image.open(img_path).convert('LA')

In [None]:
# PIL sizes are (width, height): (400, 250) gives the 250x400 array the model expects
new_size = (400,250)
test_image = test_image.resize(new_size)

In [None]:
# Convert the PIL image into a numpy array
test_image = np.asarray(test_image)

In [None]:
# test_image.shape

In [None]:
# Display the first channel of the (still noisy) test image in greyscale
plt.imshow(test_image[:,:,0], cmap='gray')

## Use model to denoise test image

In [None]:
# Add a leading batch axis of size 1 so the single image can be fed to the model
test_image = test_image.reshape(1,250,400,2)

In [None]:
# Denoise the single-image batch
one_test = noise_remover.predict(test_image)

In [None]:
# one_test.shape

In [None]:
# Drop the batch axis again so the prediction is a plain 250x400x2 image
one_test = one_test.reshape(250,400,2)

In [None]:
# Display the first channel of the denoised test image for comparison with the noisy one
plt.imshow(one_test[:,:,0], cmap='gray')

## Calling of DeOldify via api to colourize restored photographs for final evaluation

In [None]:
# In-memory byte buffers: `b` receives the PNG written in the next cell;
# `d` is not used anywhere in the visible notebook.
b, d = io.BytesIO(), io.BytesIO()

In [None]:
# Encode the denoised image (first channel) as an in-memory 8-bit PNG.
# A fresh buffer is created here so re-running the cell never appends a second
# PNG to bytes left over from an earlier run.
b = io.BytesIO()
# Clip before casting: values above 255 would otherwise wrap around in uint8.
result = np.clip(one_test, 0, 255).astype('uint8')
result = Image.fromarray(result[:,:,0])
result.save(b, 'PNG')

# The API key is read from the environment so no credential is stored in the notebook.
deepai_api_key = os.environ.get('DEEPAI_API_KEY')
if not deepai_api_key:
    raise RuntimeError('Set the DEEPAI_API_KEY environment variable to your DeepAI API key')

# Ask the DeepAI colorizer endpoint to colourize the image.
r = requests.post(
    "https://api.deepai.org/api/colorizer",
    files={
        'image': base64.encodebytes(b.getvalue()).decode('ascii'),
    },
    headers={'api-key': deepai_api_key}
)
# Fail with the HTTP error instead of a KeyError on 'output_url' below.
r.raise_for_status()
url = r.json()['output_url']

# Download the colourized result.
response = requests.get(url)
response.raise_for_status()
img = Image.open(io.BytesIO(response.content))
# Save denoised test image
img.save('/content/denoised_test_img.png')
# Visual evaluation of denoised test image with colour on notebook
img