In [None]:
import keras
from keras.applications.inception_resnet_v2 import InceptionResNetV2
from keras.preprocessing import image
from keras.engine import Layer
from keras.applications.inception_resnet_v2 import preprocess_input
from keras.layers import Conv2D, UpSampling2D, InputLayer, Conv2DTranspose, Input, Reshape, merge, concatenate, Activation, Dense, Dropout, Flatten, Softmax
from keras.layers.normalization import BatchNormalization
from keras.callbacks import TensorBoard 
from keras.models import Sequential, Model
from keras.layers.core import RepeatVector, Permute
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img
from keras.callbacks import *
from skimage.color import rgb2lab, lab2rgb, rgb2gray, gray2rgb
from skimage.transform import resize
from skimage.io import imsave
import numpy as np
import os
import random
import tensorflow as tf
import cv2
import shutil
import io
from tensorflow.image import psnr
from google.colab import drive 
from google.colab import files


In [None]:
drive.mount('/content/gdrive/')

Mounted at /content/gdrive/


# Initializando os parametros, funçãos e modelos

---




In [None]:
#Training/Testing Variables
epo = 500   #Number of epochs for Training
noImages = 70
noTestImages = 30
batch = 35   #Size of Gradient Descent Batch
customLoss = False 
imageDir =  'gdrive/My Drive/100Pool/'
testDir = 'gdrive/My Drive/100Pool_Test/'

if customLoss:
  resultDir = 'gdrive/My Drive/MYCNN_CUSTOM/Pictures_' + str(batch) + '_' + str(noImages) + '_' + str(epo) 
  testResultDir = 'gdrive/My Drive/MYCNN_CUSTOM/Testpictures_' + str(batch) + '_' + str(noImages) + '_' + str(epo) 
else:
  resultDir = 'gdrive/My Drive/MYCNN_PSNR/Pictures_' + str(batch) + '_' + str(noImages) + '_' + str(epo) 
  testResultDir = 'gdrive/My Drive/MYCNN_PSNR/Testpictures_' + str(batch) + '_' + str(noImages) + '_' + str(epo) 




In [None]:
# Prediction is assumed between -1 and 1, this Loss Function penalts color values around 0 (brownish scale) 

def customLossFunction(yTrue, yPred):
  weighted_mse =   (((yTrue - yPred) ** 2)*(1+0.3/(abs(yPred)+0.15)))
  return tf.reduce_mean(weighted_mse, axis = [1,2,3])

def custom_PSNR(yTrue,yPred):
  return -psnr(yTrue, yPred,2)

In [None]:
# Criando e initializando o modelo
embed_input = Input(shape=(1000,))

#Encoder
encoder_input = Input(shape=(256, 256, 1,))
encoder_output = Conv2D(64, (3,3), activation='relu', padding='same', strides=2)(encoder_input)
encoder_output = Conv2D(128, (3,3), activation='relu', padding='same')(encoder_output)
encoder_output = Conv2D(128, (3,3), activation='relu', padding='same', strides=2)(encoder_output)
encoder_output = Conv2D(256, (3,3), activation='relu', padding='same')(encoder_output)
encoder_output = Conv2D(256, (3,3), activation='relu', padding='same', strides=2)(encoder_output)
encoder_output = Conv2D(512, (3,3), activation='relu', padding='same')(encoder_output)
encoder_output = Conv2D(512, (3,3), activation='relu', padding='same')(encoder_output)
encoder_output = Conv2D(256, (3,3), activation='relu', padding='same')(encoder_output)

#Fusion
lsd_output = RepeatVector(32 * 32)(embed_input) 
lsd_output = Reshape(([32, 32, 1000]))(lsd_output)
lsd_output = concatenate([encoder_output, lsd_output], axis=3) 
lsd_output = Conv2D(256, (1, 1), activation='relu', padding='same')(lsd_output) 

#Decoder
decoder_output = Conv2D(128, (3,3), activation='relu', padding='same')(lsd_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)
decoder_output = Conv2D(64, (3,3), activation='relu', padding='same')(decoder_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)
decoder_output = Conv2D(32, (3,3), activation='relu', padding='same')(decoder_output)
decoder_output = Conv2D(16, (3,3), activation='relu', padding='same')(decoder_output)
decoder_output = Conv2D(2, (3, 3), activation='tanh', padding='same')(decoder_output)
decoder_output = UpSampling2D((2, 2))(decoder_output)


model = Model(inputs=[encoder_input, embed_input], outputs=decoder_output)

if customLoss:
  model.compile(optimizer = 'adamax', loss = customLossFunction)
else:
  model.compile(optimizer = 'adamax', loss = custom_PSNR)

In [None]:
# Carraga pre-trained Classifier
inception = InceptionResNetV2(weights=None, include_top=True)
inception.load_weights('gdrive/My Drive/Inception_Weights/inception_resnet_v2_weights_tf_dim_ordering_tf_kernels.h5')

In [None]:
# Funcao para rescalar imagens para classifier
def create_inception_embedding(rawimage):
    grayscaled_rgb = gray2rgb(rgb2gray(rawimage))  
    grayscaled_rgb_resized = resize(grayscaled_rgb,(299,299,3), mode = 'constant')
    grayscaled_rgb_resized = preprocess_input(grayscaled_rgb_resized) 

    grayscaled_rgb_resized = grayscaled_rgb_resized[np.newaxis,...]
    return grayscaled_rgb_resized

In [None]:
# Framework para salvar checkpoints

if customLoss:
  cppath="/content/gdrive/My Drive/MYCNN_CUSTOM/Model" + "_" + str(batch) + "_"  + str(noImages) + "_" + str(epo) + ".hdf5"
else:
  cppath="/content/gdrive/My Drive/MYCNN_PSNR/Model" + "_" + str(batch) + "_"  + str(noImages) + "_" + str(epo) + ".hdf5"
cp = ModelCheckpoint(cppath,save_freq='epoch',  verbose=1, mode='min')
hist = History()
callbacks_list = [cp,hist]

# Preparando os dados

In [None]:
# Collectando imagens

nameDict = list()
dataArray = np.zeros((noImages,256,256,3))
embed = np.zeros((noImages,1000))
i = 0
 

for subdir, dirs, filesit in os.walk(imageDir):
  for file in filesit:
    rawimage = img_to_array(load_img(subdir +'/' +  file))
    rawimage = np.array(rawimage, dtype=float)
    
    embedim = inception.predict(create_inception_embedding(rawimage))
    
    image = cv2.copyMakeBorder(rawimage,3,3,3,3, cv2.BORDER_REPLICATE)
    image = rgb2lab(1.0/255*image)
    nameDict.append(file[:-4])
    

    embed[i,:] = embedim
    dataArray[i,:,:,:] = image
    i+=1
    
    if i == noImages:
      break



In [None]:
# Organiza estruturas de dados
XData = dataArray[:,:,:,0]
XData = XData.reshape((noImages,256,256,1))
YData = dataArray[:,:,:,1:]/128

In [None]:
# Se precisa carregar um modelo de checkpoint
#model = keras.models.load_model(cppath, custom_objects = {'customLossFunction' : customLossFunction, 'custom_PSNR' : custom_PSNR})

# Treinando o modelo

In [None]:
model.fit(x = [XData,embed], y = YData, batch_size = batch, epochs = 25, callbacks = callbacks_list)

Epoch 1/25
Epoch 00001: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 2/25
Epoch 00002: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 3/25
Epoch 00003: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 4/25
Epoch 00004: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 5/25
Epoch 00005: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 6/25
Epoch 00006: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 7/25
Epoch 00007: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 8/25
Epoch 00008: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 9/25
Epoch 00009: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 10/25
Epoch 00010: saving model to /content/gdrive/My Drive/MYCNN_CUSTOM/Model_10_70_50.hdf5
Epoch 11/25
Epoch 0

<tensorflow.python.keras.callbacks.History at 0x7f0c4ddeb2e8>

In [None]:
histFile = io.open( file = cppath[:-5] + "History" + "_" + str(batch) + "_"  + str(noImages) + "_" + str(epo) + ".txt", mode = 'w+' )
histFile.write(str(hist.history['loss']))
histFile.close()

In [None]:
print(model.evaluate([XData,embed], YData, batch_size=batch))
output = model.predict([XData,embed])
output *= 128

0.009371713735163212


# Reconstruindo os resultados

In [None]:
os.mkdir(resultDir)

In [None]:
# Output colorizations
cur = np.zeros((noImages,256, 256, 3))
cur[:,:,:,0] = XData[:,:,:,0]
cur[:,:,:,1:] = output[:]
for i in range(0,noImages):

  imsave(resultDir +'/' + nameDict[i] + "_result.png", lab2rgb(cur[i,:,:,:]))
  imsave(resultDir +'/'+ nameDict[i] + "_gray.png", rgb2gray(lab2rgb(cur[i,:,:,:])))
  imsave(resultDir +'/'+ nameDict[i] + "_original.png", lab2rgb(dataArray[i,:,:,:]))
  
  

  return xyz2rgb(lab2xyz(lab, illuminant, observer))
  return xyz2rgb(lab2xyz(lab, illuminant, observer))
  return xyz2rgb(lab2xyz(lab, illuminant, observer))


In [None]:
shutil.make_archive(resultDir, 'zip', resultDir)
files.download(resultDir +'.zip') 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

# Predict the Test Image


In [None]:
# Carregando o modelo
model = keras.models.load_model(cppath, custom_objects = {'customLossFunction' : customLossFunction, 'custom_PSNR' : custom_PSNR})

In [None]:
# Collectando imagens

test_nameDict = list()
test_Array = np.zeros((noTestImages,256,256,3))
test_embed = np.zeros((noTestImages,1000))
i = 0
 

for subdir, dirs, filesit in os.walk(testDir):
  for file in filesit:
    rawimage = img_to_array(load_img(subdir +'/' +  file))
    rawimage = np.array(rawimage, dtype=float)
    
    embedim = inception.predict(create_inception_embedding(rawimage))
    
    image = cv2.copyMakeBorder(rawimage,3,3,3,3, cv2.BORDER_REPLICATE)
    image = rgb2lab(1.0/255*image)
    test_nameDict.append(file[:-4])


    test_embed[i,:] = embedim
    test_Array[i,:,:,:] = image
    i+=1
    
    if i == noTestImages:
      break



In [None]:
# Organiza estruturas de dados
X_test = test_Array[:,:,:,0]
X_test = X_test.reshape((noTestImages,256,256,1))
Y_test = test_Array[:,:,:,1:]/128

In [None]:
print(model.evaluate([X_test,test_embed], Y_test, batch_size=noTestImages))
output = model.predict([X_test,test_embed])
output *= 128

-22.550500869750977


In [None]:
os.mkdir(testResultDir)

In [None]:
# Output colorizations
cur = np.zeros((noTestImages,256, 256, 3))
cur[:,:,:,0] = X_test[:,:,:,0]
cur[:,:,:,1:] = output[:]
for i in range(0,noTestImages):

  imsave(testResultDir +'/' + test_nameDict[i] + "_result.png", lab2rgb(cur[i,:,:,:]))
  imsave(testResultDir +'/'+ test_nameDict[i] + "_gray.png", rgb2gray(lab2rgb(cur[i,:,:,:])))
  imsave(testResultDir +'/'+ test_nameDict[i] + "_original.png", lab2rgb(test_Array[i,:,:,:]))
  
  

  return xyz2rgb(lab2xyz(lab, illuminant, observer))


In [None]:
shutil.make_archive(testResultDir, 'zip', testResultDir)
files.download(testResultDir +'.zip') 

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>