In [1]:
import os
import cv2
import keras
import math
import random
import pathlib
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from keras import backend as K
from keras.preprocessing import image
from keras.engine import Layer
from keras.layers import Conv2D, UpSampling2D, InputLayer, Conv2DTranspose, Input, Reshape, merge, concatenate, Activation, Dense, Dropout, Flatten, LeakyReLU
from keras.layers.normalization import BatchNormalization
from keras.callbacks import TensorBoard 
from keras.callbacks import ModelCheckpoint
from keras.models import Sequential, Model
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from skimage.color import rgb2lab, lab2rgb, rgb2gray, gray2rgb
from skimage.transform import resize
from skimage.io import imsave
from skimage import exposure

Using TensorFlow backend.


In [32]:
# Folder the training images are read from
train_dir = '../Capstone/train_images/'
# Loading everything at once raised a memory error, so the model is fed
# in batches of this many images instead
batch_size = 20

In [33]:
#custom image augmentation function
def custom_preprocessing(image):
    """Apply one randomly chosen contrast enhancement to a single image.

    Used as the ``preprocessing_function`` of the ``ImageDataGenerator``.
    On every call one of three scikit-image exposure operations is picked
    uniformly at random:

    * 0 -- adaptive histogram equalization of the image scaled by 1/255
    * 1 -- global histogram equalization
    * 2 -- contrast stretching between the 2nd and 98th percentiles

    Args:
        image: numpy array holding one image. Assumed to carry 0-255
            intensities (branch 0 divides by 255) -- TODO confirm against
            the generator configuration.

    Returns:
        The enhanced image produced by the selected operation. For a flat
        image in branch 2 (nothing to stretch) the image scaled by 1/255 is
        returned instead.
    """
    state = random.randint(0,2)
    if state == 0:
        processed_img = exposure.equalize_adapthist((image*1.0/255), clip_limit=0.02)
    elif state == 1:
        processed_img = exposure.equalize_hist(image)
    elif state == 2:
        p2, p98 = np.percentile(image, (2,98))
        if p98 <= p2:
            # Degenerate range (e.g. a uniformly coloured image): stretching
            # would divide by zero and can fill the image with NaN, which
            # then poisons the training loss. Fall back to plain scaling.
            processed_img = image*1.0/255
        else:
            processed_img = exposure.rescale_intensity(image, in_range=(p2,p98))
    return processed_img

# Augmenting generator: random horizontal and vertical flips combined with
# the custom contrast enhancement defined for this notebook
image_gen = ImageDataGenerator(
        preprocessing_function=custom_preprocessing,
        vertical_flip=True,
        horizontal_flip=True)

In [34]:
# LAB range L (0,100); ab (-128,127)
# a is green - red ; b is blue -yellow
# function to split training set X train, y train and produce augmented images
def image_a_b_gen(batch_size):
    """Endlessly yield ``(X_train, y_train)`` batches for colorization training.

    Each augmented RGB batch from ``image_gen`` is converted to LAB and
    scaled; the lightness channel becomes the model input and the two
    colour channels become the target.

    Args:
        batch_size: number of images requested per batch from the
            directory iterator.

    Yields:
        tuple ``(X_train, y_train)`` where ``X_train`` holds the scaled L
        channel with a trailing axis of size 1 and ``y_train`` holds the
        scaled a and b channels (trailing axis of size 2).
    """
    # NOTE(review): shuffle=False walks the files in the same order on every
    # pass; confirm this is intended for training.
    for i in image_gen.flow_from_directory(train_dir, batch_size=batch_size, class_mode=None, shuffle=False):
        lab_batch = rgb2lab(i)
        # Scale L by 100 and a/b by 255. With the ranges noted above this
        # puts L in 0..1 and a/b in roughly -0.5..0.5 (not 0..1).
        lab_scaled = lab_batch / [100, 255, 255]
        X_train = lab_scaled[:,:,:,0]
        # Restore the channel axis dropped by the integer index
        X_train = X_train.reshape(X_train.shape+(1,))
        y_train = lab_scaled[:,:,:,1:]
        # Keras documents generator output as a tuple (inputs, targets); a
        # list can be mistaken for a list of model inputs.
        yield (X_train, y_train)

In [35]:
class DSSIMObjective:
    """Structural dissimilarity loss: mean of ``(1 - SSIM) / 2``.

    SSIM is evaluated per channel from local statistics gathered with an
    11x11 normalized Gaussian window (sigma 1.5). Instances are callable
    with ``(y_true, y_pred)`` so they can be passed as a Keras loss.
    """

    def __init__(self, k1=0.01, k2=0.03, max_value=1.0):
        """Store the SSIM stabilisation constants.

        Args:
            k1: factor used to build the luminance constant ``(k1*max_value)**2``.
            k2: factor used to build the contrast constant ``(k2*max_value)**2``.
            max_value: value multiplied with ``k1``/``k2`` before squaring.
        """
        self.__name__ = 'DSSIMObjective'
        self.k1 = k1
        self.k2 = k2
        self.max_value = max_value
        self.backend = K.backend()

    def __int_shape(self, x):
        # Static shape on the tensorflow backend, symbolic shape otherwise
        if self.backend == 'tensorflow':
            return K.int_shape(x)
        return K.shape(x)

    def __call__(self, y_true, y_pred):
        """Return the scalar DSSIM between ``y_true`` and ``y_pred``."""
        n_channels = K.shape(y_pred)[-1]

        def _fspecial_gauss(size, sigma):
            # Counterpart of MATLAB's 'fspecial' gaussian: a normalized 2-D
            # Gaussian window, repeated once per channel.
            offsets = np.arange(0, size, dtype=K.floatx())
            offsets = offsets - (size - 1) / 2.0
            exponent = (offsets ** 2) * (-0.5 / (sigma ** 2))
            # Outer sum of the 1-D exponents gives the 2-D exponent grid
            grid = exponent.reshape(1, -1) + exponent.reshape(-1, 1)
            # softmax over the flattened grid exponentiates and normalizes
            weights = K.softmax(K.constant(grid.reshape(1, -1)))
            window = K.reshape(weights, (size, size, 1, 1))
            return K.tile(window, (1, 1, n_channels, 1))

        kernel = _fspecial_gauss(11, 1.5)

        def reducer(x):
            # Gaussian-weighted local average, computed independently per channel
            return K.depthwise_conv2d(x, kernel, strides=(1, 1), padding='valid')

        c1 = (self.k1 * self.max_value) ** 2
        # trailing factor is the (unit) compensation factor
        c2 = ((self.k2 * self.max_value) ** 2) * 1.0

        # Luminance term from the local means
        mu_true = reducer(y_true)
        mu_pred = reducer(y_pred)
        mean_product = mu_true * mu_pred * 2.0
        mean_squares = K.square(mu_true) + K.square(mu_pred)
        luminance = (mean_product + c1) / (mean_squares + c1)

        # Contrast-structure term from the local second moments
        cross_moment = reducer(y_true * y_pred) * 2.0
        square_moment = reducer(K.square(y_true) + K.square(y_pred))
        cs = (cross_moment - mean_product + c2) / (square_moment - mean_squares + c2)

        # Average over the two spatial axes, then over everything left
        ssim_val = K.mean(luminance * cs, axis=(-3, -2))
        return K.mean((1.0 - ssim_val) / 2.0)

In [36]:
# Loss instance handed to model.compile; arguments spell out the class defaults
ssim_loss = DSSIMObjective(k1=0.01, k2=0.03, max_value=1.0)

In [37]:
# Colorization network: takes a (256, 256, 1) input and ends in a
# 2-channel tanh layer that is upsampled back to 256x256.
model = Sequential([
    InputLayer(input_shape=(256, 256, 1)),

    #Downsampling batch -- every strides=2 convolution halves height and width
    Conv2D(64, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,256,256,64)
    Conv2D(64, (3, 3), strides=2, padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,128,128,64)
    Conv2D(128, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,128,128,128)
    Conv2D(128, (3, 3), strides=2, padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,64,64,128)
    Conv2D(256, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,64,64,256)
    Conv2D(256, (3, 3), strides=2, padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,32,32,256)
    Conv2D(512, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),                                        #(bs,32,32,512)

    #Upsampling batch -- channels shrink first, then resolution is restored
    Conv2D(512, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.5),                                                #(bs,32,32,512)
    Conv2D(256, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.5),                                                #(bs,32,32,256)
    Conv2D(128, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),
    Dropout(0.5),                                                #(bs,32,32,128)
    Conv2D(64, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),
    UpSampling2D((2, 2)),                                        #(bs,64,64,64)
    Conv2D(32, (3, 3), padding='same', activation='relu'),
    BatchNormalization(),
    UpSampling2D((2, 2)),                                        #(bs,128,128,32)
    Conv2D(2, (3, 3), padding='same', activation='tanh'),
    UpSampling2D((2, 2)),                                        #(bs,256,256,2)
])

# Finish model: RMSprop optimizer, DSSIM loss, MSE and MAE reported as metrics
model.compile(loss=ssim_loss, optimizer='rmsprop', metrics=['mse','mean_absolute_error'])

In [None]:
# Train model
# 9306 matches the image count reported by the directory iterator; floor
# division gives the number of whole batches drawn per epoch
model.fit_generator(generator=image_a_b_gen(batch_size), epochs=10, steps_per_epoch=9306 // batch_size)

Epoch 1/10
Found 9306 images belonging to 1 classes.


  warn("This might be a color image. The histogram will be "
  .format(dtypeobj_in, dtypeobj_out))
  .format(dtypeobj_in, dtypeobj_out))


  1/465 [..............................] - ETA: 4:18:15 - loss: 0.4999 - mean_squared_error: 0.4796 - mean_absolute_error: 0.6211

In [117]:
# Persist the trained model (HDF5 file) for later reuse
model.save(filepath='C:/Users/n3rDx/Desktop/Homework Upload/Capstone/5th_expt_rmsprop_lab_relu_NORM_10epochs.hdf5')

In [118]:
# Load black and white images
# One folder for both listing and loading, so the file names always refer to
# files that exist where they are read from
test_dir = 'C:/Users/n3rDx/Desktop/Homework Upload/Capstone/test_images/test/'
test = []
# Sorted listing gives a stable order, so the numbered result files map to the
# same inputs on every run (os.listdir order is arbitrary)
for filename in sorted(os.listdir(test_dir)):
    # Resize to the 256x256 input the network was built for; images that
    # already have that size are loaded unchanged
    test.append(img_to_array(load_img(os.path.join(test_dir, filename), target_size=(256, 256))))
test = np.array(test).astype('float')*1.0/255   #Original image must come in as (0,1)
# Keep only the L channel, scaled by 1/100 as in the training generator
lab_test = (rgb2lab(test)[:,:,:,0])/100
# Add the trailing channel axis expected by the model: (n, 256, 256, 1)
lab_test = lab_test.reshape(lab_test.shape+(1,))

In [122]:
# Test model: predict the two colour channels for every lightness image
output = model.predict(x=lab_test)

In [128]:
# Output colorizations
results_dir = "C:/Users/n3rDx/Desktop/Homework Upload/Capstone/results/"
# Make sure the destination folder exists before the first write
os.makedirs(results_dir, exist_ok=True)
for i in range(len(output)):
    # Canvas sized from the prediction itself instead of a fixed 256x256
    cur = np.zeros(output[i].shape[:2] + (3,))
    cur[:,:,0] = lab_test[i][:,:,0]      # input lightness
    cur[:,:,1:] = output[i]              # predicted a/b channels
    # Undo the training-time scaling (L by 100, a/b by 255)
    cur = (cur * [100, 255, 255])
    picture = lab2rgb(cur)
    picture *= 255
    # Round to nearest and clip before the cast: a bare astype('uint8')
    # truncates, and would wrap around for any value outside 0..255
    picture = np.clip(np.rint(picture), 0, 255).astype('uint8')
    imsave(results_dir+str(i)+".jpg", picture)