In [1]:
# Importing the used packages
## Numpy for arrays

import numpy as np
## Pyplot from Matplotlib for vizualising the test results in plots
import matplotlib.pyplot as plt
%matplotlib inline

## Drive from Google.Colab because I was storing my database in google drive
from google.colab import drive

import tensorflow as tf

import keras
from keras import backend as k
## Keras.* for building my fully connected dense neural network 
from keras.models import  save_model, load_model, Model
from keras.optimizers import Adadelta

## keras.callbacks.* for Earlystopping (modelcheckpoint is needed to load back the weights)
from keras.callbacks import Callback
from keras.layers.convolutional import Conv2D
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.activations import relu as Relu
from keras.activations import sigmoid as Sigmoid
from keras.layers import UpSampling2D, Input, concatenate, BatchNormalization
from keras.utils.generic_utils import get_custom_objects
## Mean_squared_error from sklearn.metrics for calculating mean squared error
from sklearn.metrics import mean_squared_error

import skimage.color as skcolor


from PIL import Image
## Math for Sqrt
import math
import h5py
import os
import zipfile
import requests
import datetime
import scipy
import random

from matplotlib.colors import LinearSegmentedColormap
from timeit import default_timer as timer

Using TensorFlow backend.


In [2]:
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
# Function padding the int to the given size
## length:           int, the length of
## return value:     String, %0(length)d
def formatint(length):
  return str('%0' + str('%d'% length)+'d')

In [0]:
#Function for making folder if it does not exist in the given path.
#@ Used packages:
  #@ os
## path:     String, filepath needed to be verified
def MakePath(path):
  #Creating folder if it does not exist.
  if not os.path.exists(path):
    os.makedirs(path)

In [0]:
#Function for printing text and the time
#@ Used packages:
  #@ datetime
## string:        String, the printed text
## return value:  datetime, the printings time
def PrintTime(string):
    time = datetime.datetime.now()
    print(string+ str(time));
    return time

In [0]:
# Class for visualizing
#@ Used packages:
  #@ numpy as np
  #@ os
  #@ from PIL Image
  #@ skimage.Color as skcolor
class Visualization():
    # Function for converting RGB image array to grayscale image array.
    ## imageArray:               np.array, (None,None,3) RGB
    ## return value:             np.array, Grayscaled image
    def img_grayscale(self, imageArray):
        imgArr = np.empty([1])
        for image in imageArray:
            pil_imgray = image.convert('LA')
            img        = np.array(list(pil_imgray.getdata(band = 0)), int)
            img.shape  = (pil_imgray.size[1], pil_imgray.size[0])
            imgArr     = np.append(imgArr, img)
        imgArr = np.delete(imgArr,0)
        imgArr.shape = (len(imageArray),pil_imgray.size[1], pil_imgray.size[0])
        return imgArr;
      
    # Function for making visualization for the images.
    ## transformed_dataset_path:   String, folder path of transformed dataset
    ## width:                      int, number of images on one edge
    ## size:                       int, size of 1:1 ratio image
    def image_mosaic(self, transformed_dataset_path, width, size):

        ## Making a list of filenames of the dataset directory.
        filename_list = os.listdir(transformed_dataset_path)
        ## Creating an empty list for the loaded images.
        imagearray = []
        ## Loading the first width*width number of images, and appending them to the list.
        for i in range(width*width):
            im = Image.open(transformed_dataset_path + filename_list[i])
            imagearray.append(im)

        ## Initializing canvas.
        canvas = np.ones((size * width, size * width * 2,3));
        ## Resizing images.
        for i in range(len(imagearray)):
            imagearray[i]=imagearray[i].resize((size, size),Image.ANTIALIAS)
        ## Making GrayImages.
        grayimage = self.img_grayscale(imagearray)
        ## Writing the RGB images to the canvas right side.
        for i in range(width):
            for j in range(width):
                canvas[i * size:(i + 1) * size,j * size:(j + 1) * size, 0::] = np.array(imagearray[i + width * j])
        ## Writing grayscale images to the canvas left side.
        for i in range(width):
            for j in range(width, 2 * width):
                canvas[i * size:(i + 1) * size, j * size:(j + 1) * size, 0]=np.array(grayimage[i + width * (j - width)])
                canvas[i * size:(i + 1) * size, j * size:(j + 1) * size, 1]=np.array(grayimage[i + width * (j - width)])
                canvas[i * size:(i + 1) * size, j * size:(j + 1) * size, 2]=np.array(grayimage[i + width * (j - width)])

        ## Displaying the mosaic.
        canvas = canvas.astype(np.uint8);
        mosaic = Image.fromarray(canvas)
        mosaic.show()
        
    # Function
    ## image:          np.array, (None, None,3) LAB image
    ## idim:           integer, which dimension needed to be returned
    ## return value:   np.array, (None,None,3) RGB image
    def extract_single_dim_from_LAB_convert_to_RGB(self, image, idim):
        ## Initializing z with input image shape
        z = np.zeros(image.shape)
        ## Need brightness to plot the image along 1st or 2nd axis
        if idim != 0 :
            z[:, :, 0] = 60     
        z[:, :, idim] = image[:, :, idim]
        z = skcolor.lab2rgb(z)
        return(z)
      
    # Function
    ## image:          np.array, (None, None,3) LAB image
    ## return value:   np.array, (None,None,3) RGB image
    def extract_AB_from_LAB_convert_to_RGB(self, image):
        ## Initializing z with input image shape
        z = np.zeros(image.shape)
        ## Need brightness to plot the image along 1st or 2nd axis
        z[:, :, 0] = 60
        z[:, :, 1::] = image[:, :, 1::]
        z = skcolor.lab2rgb(z)
        return(z)
      
    # Function for visualizing LAB colorspace shows Lightness (grayscale), A, B, AB, LAB
    ## lab:     np.array, (None, None, 3) LAB image
    def LABColorVisualize(self, lab):
        ## Initializing the plot
        fig = plt.figure(figsize=(13,5))
        count = 1 
        
        ax = fig.add_subplot(1,5,count)        
        ## Converting Lightness to RGB
        lab_rgb_gray = self.extract_single_dim_from_LAB_convert_to_RGB(lab,0) 
        ax.imshow(lab_rgb_gray); ax.axis("off")
        ax.set_title("L: lightness")
        count += 1
        
        ax = fig.add_subplot(1,5,count)
        ## Converting A to RGB
        lab_rgb_a = self.extract_single_dim_from_LAB_convert_to_RGB(lab,1) 
        ax.imshow(lab_rgb_a); ax.axis("off")
        ax.set_title("A: color spectrums green to red")
        count += 1

        ax = fig.add_subplot(1,5,count)
        ## Converting B to RGB
        lab_rgb_b = self.extract_single_dim_from_LAB_convert_to_RGB(lab,2) 
        ax.imshow(lab_rgb_b); ax.axis("off")
        ax.set_title("B: color spectrums blue to yellow")
        count += 1

        ax = fig.add_subplot(1,5,count)
        ## Converting AB to RGB
        lab_rgb_ab = self.extract_AB_from_LAB_convert_to_RGB(lab) 
        ax.imshow(lab_rgb_ab); ax.axis("off")
        ax.set_title("AB")
        count += 1
        
        ax = fig.add_subplot(1,5,count)
        ## Converting LAB image to RGB
        lab_rgb_lab = skcolor.lab2rgb(lab)
        ax.imshow(lab_rgb_lab); ax.axis("off")
        ax.set_title("LAB")
        count += 1
        plt.show()

In [0]:
# Function for making the U-net model
## input_size:      integer tuple, default value (128, 128, 1)
## return value:    keras.model the u-net model.
#@ Used packages:
  #@ Conv2D
  #@ Input
  #@ BatchNormalization
  #@ concatenate
  #@ Upsampling2D
  #@ Model
def unet(input_size = (128, 128, 1)):
    ## 2D convolutional layers parameters
    Conv2D_param = {
        'kernel_size'       : (3, 3),
        'activation'        : 'relu',
        'padding'           : 'same',
        'kernel_initializer': 'he_normal'
    }
    '''
    We used the default input_size values for represantating the network layers size.
    '''
  ## Encoder
    ## Defining inputsize
    inputs = Input(input_size)
    
    ## Convolutional layer with BatchNormalization (128, 128, 1) -> (64, 64, 64)
    conv1 = Conv2D(64, strides = 2, **Conv2D_param)(inputs)
    conv1 = BatchNormalization(axis = 3, momentum = 0.9)(conv1)
    
    ## Convolutional layer with BatchNormalization (64, 64, 64) -> (64, 64, 128)
    conv2 = Conv2D(128, **Conv2D_param)(conv1)
    conv2 = BatchNormalization(axis = 3, momentum = 0.9)(conv2)
    
    ## Convolutional layer with BatchNormalization (64, 64, 64) -> (32, 32, 128)
    conv3 = Conv2D(128, strides = 2, **Conv2D_param)(conv2)
    conv3 = BatchNormalization(axis = 3, momentum = 0.9)(conv3)
    
    ## Convolutional layer with BatchNormalization (32, 32, 128) -> (32, 32, 256)
    conv4 = Conv2D(256, **Conv2D_param)(conv3)
    conv4 = BatchNormalization(axis = 3, momentum = 0.9)(conv4)
    
    ## Convolutional layer with BatchNormalization (32, 32, 256) -> (16, 16, 256)
    conv5 = Conv2D(256, strides = 2, **Conv2D_param)(conv4)
    conv5 = BatchNormalization(axis = 3, momentum = 0.9)(conv5)
    
  ## Bottleneck
    ## Convolutional layer with BatchNormalization (16, 16, 256) -> (16, 16, 512)
    conv6 = Conv2D(512, **Conv2D_param)(conv5)
    conv6 = BatchNormalization(axis = 3, momentum = 0.9)(conv6)
    
  ## Decoder
    ## Concatenate two layers (conv5, conv6) (16, 16, 256 + 512)
    merge7 = concatenate([conv5,conv6], axis = 3)
    
    ## UpSampling (32, 32, 768)
    up7    = UpSampling2D(size = (2,2))(merge7)
    
    ## Convolutional layer with BatchNormalization (32, 32, 768) -> (32, 32, 256)
    conv7  = Conv2D(256, **Conv2D_param)(up7)
    conv7  = BatchNormalization(axis = 3, momentum = 0.9)(conv7)
    
    ## Concatenate two layers (conv4, conv7) (32, 32, 256 + 256)
    merge8 = concatenate([conv4,conv7], axis = 3)
    
    ## Convolutional layer with BatchNormalization (32, 32, 512) -> (32, 32, 256)
    conv8  = Conv2D(256, **Conv2D_param)(merge8)
    conv8  = BatchNormalization(axis = 3, momentum = 0.9)(conv8)
    
    ## Concatenate two layers (conv3, conv8) (32, 32, 128 + 256)
    merge9 = concatenate([conv3,conv8], axis = 3)
    
    ## UpSampling (64, 64, 384)
    up9    = UpSampling2D(size = (2,2))(merge9)   
    
    ## Convolutional layer with BatchNormalization (64, 64, 384) -> (64, 64, 128)
    conv9  = Conv2D(128, **Conv2D_param)(up9)
    conv9  = BatchNormalization(axis = 3, momentum = 0.9)(conv9)
    
    ## Concatenate two layers (conv2, conv9) (64, 64, 128 + 128)
    merge10 = concatenate([conv2,conv9], axis = 3)
    
    ## Convolutional layer with BatchNormalization (64, 64, 256) -> (64, 64, 128)
    conv10  = Conv2D(128, **Conv2D_param)(merge10)
    conv10  = BatchNormalization(axis = 3, momentum = 0.9)(conv10)
    
    ## Concatenate two layers (conv1, conv10) (64, 64, 64 + 128)
    merge11 = concatenate([conv1,conv10], axis = 3)
    
    ## UpSampling (128, 128, 192)
    up11    = UpSampling2D(size = (2,2))(merge11)
    
    ## Convolutional layer with BatchNormalization (128, 128, 192) -> (128, 128, 64)
    conv11  = Conv2D(64, **Conv2D_param)(up11)
    conv11  = BatchNormalization(axis = 3, momentum = 0.9)(conv11)
    
    ## Convolutional layer with BatchNormalization (128, 128, 64) -> (128, 128, 32)
    conv12 = Conv2D(32, **Conv2D_param)(conv11)
    conv12 = BatchNormalization(axis = 3, momentum = 0.9)(conv12)
    
    ## Convolutional layer with BatchNormalization and sigmoid acitvation (128, 128, 32) -> (128, 128, 2)
    conv13 = Conv2D(2, kernel_size = (3,3),padding = 'same', activation = 'sigmoid')(conv12)
    
    ## Build model
    model = Model(inputs = inputs, outputs = conv13)
    
    ## Return model
    return model

In [0]:
# Function for PSNR loss 
def PSNRLoss(y_true, y_pred):
    """
    PSNR is Peek Signal to Noise Ratio, which is similar to mean squared error.
    It does not have a max value so we set our loss to 1/PSNR this way our loss is 0 when PSNR is infinity
    """
    return 1/tf.image.psnr(y_true, y_pred,1)
# Saving PSNRLoss so we can load our model
get_custom_objects().update({"PSNRLoss": PSNRLoss})

# Function for SSIM loss
def ssim(y_true,y_pred):
    ''' 
    SSIM is structural similarity index. Max value is 1 when to to inputid image is the same.
    So for our loss we make 1-ssim. This way our loss is 0 when the images are the same
    '''
    return 1-tf.image.ssim(y_pred,y_true,1)
get_custom_objects().update({"ssim": ssim})

In [0]:
# Function for loading model
## path:          string, the file path where the model is saved
## model_name:    string, the name of the model
## return value:  
def loadModel(path, model_name, model_number = None ):
    ## Setting model_name uppercase
    model_name = model_name.upper() 
    ## Loading previous models
    previous_models = [x for x in os.listdir(path) if model_name in x]
    
    if model_number != None:
      if len(previous_models) >= model_number:
        ## Print feedback
        print('Loading model:', previous_models[model_number])
        ## Return with the loaded model
        return load_model(path + previous_models[model_number])
      else:
        ## Print feedback
        print('Warning: There is no previous models with this name: ' + str(model_name) + ' or with this numner: ' + str(model_number))       
    else:
      ## Does not have previous models
      if len(previous_models) == 0: 
          
          ## Print feedback
          print('Warning: There is no previous models with this name:', model_name)
          
      ## Does have previous models
      else: 
          ## Print feedback
          print('Loading model:', previous_models[-1])
          ## return with the loaded model
          return load_model(path + previous_models[-1])

In [0]:
# Function for iterating through the database
## data_h5df:           h5df, the dataset
## batch_size:          integer, the batch size
## random_seed:         integer, seed for initializing random
## return value:        (X_batch, Y_batch) Scaled input and output for the network (batch_size, 128, 128, 1) (batch_size, 128, 128, 2)
#@ Used Packages:
  #@ numpy as np
def DataGenerator(data_h5df, batch_size = 64, random_seed = 1):
  ## Array of indices
  indices = list(data_h5df.keys())
  ## Infiniti generator
  while True:
        ## Setting random seed
        np.random.seed(random_seed) 
        ## Shuffle indices before evry epoch
        np.random.shuffle(indices)
        ## Iterating through all the elements
        for i in range(0, len(indices) - batch_size + 1, batch_size):
            ## Batch indices , batch_size the size
            batch_indices = indices[i:i + batch_size]
            ## Placeholder for the LAB image data
            lab_batch = np.empty((batch_size,*np.array(val[batch_indices[0]]).shape))
            ## Saving the batch number of image data
            for iterator in range(batch_size):
              lab_batch[iterator] = np.array(data_h5df[batch_indices[iterator]]).astype('float32')
            ## Scaling inputs /100
            X_batch = ((lab_batch[:,:,:,0:1])/100.0).astype('float32')
            ## Scaling outputs +128 / 255
            Y_batch = ((lab_batch[:,:,:,1:3] + 128.0 ) / 255.0).astype('float32')
            ## Yielding batch
            yield (X_batch,Y_batch)

In [0]:
# Class for a custom Callback
#@ Used Packages:
  #@ os
  #@ numpy as np
#@ Used functions:
  #@ MakePath
  #@ fomatint
class WeightsSaver(Callback):
    ## Initializing the class
    def __init__(self, model_name,loss_name, save_path, preds_array, save_preds_per_n_batch):
        ## Epoch, batch counter
        self.epoch = 0
        self.batch = 0
        ## Model, loss name set to uppercase
        self.model_name = model_name.upper()
        self.loss_name = loss_name.upper()
        ## Storing 
        self.N = save_preds_per_n_batch
        self.path = save_path+'/'+model_name.upper() +'/'+ loss_name.upper()
        self.model_path =  self.path + '/model/'
        self.pred_path =  self.path + '/images/'
        self.preds = preds_array
        ## Making path for models and images
        MakePath(save_path + '/' + model_name.upper() + '/')
        MakePath(self.path)
        MakePath(self.model_path)
        MakePath(self.pred_path)
        
        ## Setting previous model name
        previous_models = [n for n in os.listdir(self.model_path) if self.model_name in n]
        if len(previous_models) == 0:
          self.model_number = 0
        else:
          self.model_number = int(previous_models[-1][-10:-5])


    ## Function called when epoch ends, save the model 
    def on_epoch_end(self, epoch, logs={}):
        self.batch = 0
        self.epoch += 1
        self.model_number += 1
        
        name = (self.model_path + self.model_name + '_' + formatint(5) + '.h5df') % self.model_number
        save_model(self.model, name)
    ## Function called when batch end, if N-th batch save the images
    def on_batch_end(self, batch, logs={}):
        self.batch += 1 

        if self.batch % self.N == 0:
            y_pred = self.model.predict(self.preds[:,:,:,0].astype('float32').reshape(*(self.preds[:,:,:,0].shape),1)/100.0)*255-128
            for iterator in range(y_pred.shape[0]):
              name = self.path+'/images/pred%(pred_n)03d_image%(epoch)03d_%(batch)05d' % {'pred_n':iterator,'epoch':self.model_number,'batch':self.batch}
              image=np.empty((128,128,3))
              image[:,:,0]=self.preds[iterator,:,:,0]
              image[:,:,1::]=y_pred[iterator]
              image=skcolor.lab2rgb(image)
              im=Image.fromarray((image*255).astype('uint8'))
              im.save(name+'.jpg')    
        

In [0]:
## Loading dataset
data= h5py.File('/content/gdrive/My Drive/DeepLearning/images_hdf5/dataset.h5df', 'r') 
## Datasets
train = data['train']
val = data['val']
## Setting up generators
training_generator = DataGenerator(train,64)
validation_generator = DataGenerator(val,64)

In [0]:
## Setting up model
u_model = unet(input_size = (128,128,1))

In [0]:
test = data['test']
pred_test = np.empty((17,128,128,3))

for iterator in range(17):
  pred_test[iterator]=np.array(test[list(test.keys())[iterator]],dtype='float64')

mc = WeightsSaver('customunet1','PSNRLoss','/content/gdrive/My Drive/DeepLearning', pred_test,100)

model_params = {'generator': training_generator,
                'steps_per_epoch': len(list(data['train'].keys()))//64,
                'validation_steps':  len(list(data['val'].keys()))//64,
                'validation_data': validation_generator,
                'callbacks':[mc],
                'epochs': 20
               }
u_model_load = loadModel(mc.model_path,'customunet1')
if(u_model_load != None):
  u_model = u_model_load

u_model.compile(loss=PSNRLoss , metrics=['accuracy'],optimizer=Adadelta(lr=1, rho=0.95, epsilon=None, decay=0.0))
# Train model on dataset
u_model.fit_generator(**model_params)

In [0]:
pred_test = np.empty((17,128,128,3))

for iterator in range(17):
  pred_test[iterator]=np.array(test[list(test.keys())[iterator]],dtype='float64')
mc = WeightsSaver('customunet','ssim', '/content/gdrive/My Drive/DeepLearning', pred_test,100)
test = data['test']

model_params = {'generator': training_generator,
                'steps_per_epoch': len(list(data['train'].keys()))//64,
                'validation_steps':  len(list(data['val'].keys()))//64,
                'validation_data': validation_generator,
                'callbacks':[mc],
                'epochs': 6}
u_model_load = loadModel(mc.model_path,'customunet')
if(u_model_load != None):
  u_model = u_model_load

u_model.compile(loss=ssim , metrics=['accuracy'],optimizer=Adadelta(lr=1, rho=0.95, epsilon=None, decay=0.0))
# Train model on dataset
u_model.fit_generator(**model_params)


In [0]:
vi = Visualization()
test = data['test']
test_im = np.array(test[list(test.keys())[15]],dtype = 'float64')

preds = (u_model.predict(test_im[:,:,0].reshape(1, 128, 128, 1) / 100) * 255 - 128)
predicted_images          = np.empty((128, 128, 3))
predicted_images[:, :, 0]   = test_im[:, :, 0]
predicted_images[:, :, 1::] = preds
vi.LABColorVisualize(predicted_images)

real_images = np.empty((128,128,3))
real_images = test_im
vi.LABColorVisualize(real_images)