## Model Schema II

In [141]:
# Importing libraries
# Math and file management
import numpy as np
import re
from tqdm.auto import tqdm
import sys
import os
import urllib.request
import tarfile
import pickle
import fnmatch
import random
# For Model Building
from keras.layers import Conv2D, MaxPooling2D, Activation,BatchNormalization, UpSampling2D, Dropout, Flatten, Dense, Input, LeakyReLU, Conv2DTranspose,AveragePooling2D, Concatenate
from keras.preprocessing import image
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Model
from keras.models import load_model
from keras.optimizers import Adam
import keras.backend as K
from keras_contrib.layers.normalization.instancenormalization import InstanceNormalization
# For image processing
import skimage.color as imcolor
import PIL as Image
import matplotlib.pyplot as plt
# For processing time measurement
import time

In [142]:
# NOTE: this step is dataset-specific and must be adapted to wherever the dataset is stored.
# In this case the working directory is changed to the folder where the dataset is located.
def extract(pattern, compression_format, path):
    '''
    Extracts every tar archive found under `path` whose file name matches `pattern`.

    Parameters:
    pattern: shell-style wildcard (fnmatch syntax) matched against file names, e.g. '*.tar.gz'
    compression_format: mode string handed to tarfile.open, e.g. 'r:gz'
    path: directory searched recursively for archives; it also becomes the current working
          directory and the destination of the extracted files

    Returns None.
    '''
    # Resolve before changing directory so a relative path is not walked relative to itself
    target_dir = os.path.abspath(path)
    # Side effect kept from the original implementation: the process ends up inside the dataset folder
    os.chdir(target_dir)

    # Collect the archives first so files created by the extraction never influence the walk
    archives = [
        os.path.join(root, name)
        for root, _dirs, files in os.walk(target_dir)
        for name in files
        if fnmatch.fnmatch(name, pattern)
    ]

    for archive in archives:
        # NOTE: extractall trusts the member paths stored in the archive; only use on trusted archives
        with tarfile.open(archive, compression_format) as tar:
            tar.extractall(target_dir)
    return

In [None]:
# Unpack every gzip-compressed tar archive found under the dataset folder.
# NOTE(review): the path is machine-specific and has no drive letter — presumably it is resolved
# against the current drive on Windows; confirm before running on another machine.
extract('*.tar.gz',
        'r:gz',
        r'\Académico\Posgrados\2019 - Maestría en Ciencia de Datos e Innovación Empresarial\Tesis\Datasets')

In [144]:
def image_read(file, size=(256,256)):
    '''
    Reads one image file, resized to `size`, and returns it converted to an array.

    Parameters:
    file: path of the image to load
    size: target size forwarded to the loader as `target_size`; defaults to 256x256
    '''
    return image.img_to_array(image.load_img(file, target_size=size))

In [145]:
def image_convert(image_paths,size=256,channels=3):
    '''
    Loads a list of images into a single integer Numpy array of shape
    (number of images, height, width, channels). Default values produce 256x256x3 arrays for
    coloured images.

    Parameters:
    image_paths: a sequence of paths to the image files
    size: an int (square images) or a (height, width) pair defining the size of every image
    channels: number of channels of the output array; it has to match the number of channels of
              the arrays returned by image_read

    Returns the int64 array holding all the images, in the order of `image_paths`.
    '''
    # Normalise size so both the buffer and the reader use the same (height, width)
    height, width = (size, size) if isinstance(size, int) else size

    # build a zeros matrix able to hold every image
    all_images_to_array = np.zeros((len(image_paths), height, width, channels), dtype='int64')
    for ind, path in enumerate(image_paths):
        # reads the image already resized to the requested size
        img = image_read(path, size=(height, width))
        all_images_to_array[ind] = img.astype('int64')
    print('All Images shape: {} size: {:,}'.format(all_images_to_array.shape, all_images_to_array.size))
    return all_images_to_array

In [146]:
def find(pattern, path):
    '''
    Recursively collects the files below `path` whose name matches `pattern`.

    Parameters:
    pattern: shell-style wildcard (fnmatch syntax) matched against the file name only
    path: directory where the search starts

    Returns a list with the joined directory/file path of every match, in os.walk order.
    '''
    return [
        os.path.join(folder, filename)
        for folder, _subfolders, filenames in os.walk(path)
        for filename in filenames
        if fnmatch.fnmatch(filename, pattern)
    ]

In [147]:
# Collect the path of every .jpg file found under the images2 dataset folder.
# NOTE(review): machine-specific path; confirm it before running on another machine.
image_paths=find('*.jpg', r'\Académico\Posgrados\2019 - Maestría en Ciencia de Datos e Innovación Empresarial\Tesis\Datasets\images2')

In [148]:
# Load all the collected images into one array using the default 256x256x3 layout.
X_train=image_convert(image_paths)

All Images shape: (10, 256, 256, 3) size: 1,966,080


In [149]:
def rgb_to_lab(img, l=False, ab=False):
    """
    Takes in RGB channels in range 0-255 and outputs L or AB channels in range -1 to 1

    Parameters:
    img: RGB image array with the channels on the last axis
    l: when True the luminance channel is returned, with a trailing axis of size 1
    ab: accepted for readability at the call site only; the two colour channels are returned
        whenever `l` is False, regardless of this flag

    Returns the rescaled L channel if `l` is True, otherwise the rescaled AB channels.
    """
    # Convert once; only the requested channels are rescaled afterwards
    lab = imcolor.rgb2lab(img / 255)
    if l:
        # L divided by 50 and shifted by -1, keeping an explicit channel axis
        return ((lab[:,:,0] / 50) - 1)[...,np.newaxis]
    # A and B shifted by 128 and rescaled by 255 into the -1 to 1 range
    return (lab[:,:,1:] + 128) / 255 * 2 - 1

def lab_to_rgb(img):
    """
    Takes in LAB channels in range -1 to 1 and outputs RGB channels in range 0-255

    Parameters:
    img: array of shape (height, width, 3) holding the scaled L, A and B channels

    Returns a uint8 RGB array with the same height and width as `img`.
    """
    scaled = np.asarray(img, dtype='float64')
    # Output follows the input size instead of assuming 256x256
    lab = np.empty(scaled.shape[:2] + (3,), dtype='float64')
    # Undo the scaling applied by rgb_to_lab: L back to 0-100, A/B back to -128..127
    lab[..., 0] = (scaled[..., 0] + 1) * 50
    lab[..., 1:] = (scaled[..., 1:3] + 1) / 2 * 255 - 128
    new_img = imcolor.lab2rgb(lab) * 255
    return new_img.astype('uint8')

In [150]:
# Convert every training image from RGB to the scaled LAB representation:
# L stacks the luminance channel of each image, AB stacks the two colour channels.
L = np.array([rgb_to_lab(image, l=True) for image in X_train])
AB = np.array([rgb_to_lab(image, ab=True) for image in X_train])

In [151]:
# Bundle both channel arrays into one tuple so they can be stored together.
L_AB_channels = (L,AB)

In [152]:
# Persist the (L, AB) tuple to disk, relative to the current working directory,
# so the conversion does not have to be repeated.
with open('l_ab_channels.p', 'wb') as channels_file:
    pickle.dump(L_AB_channels, channels_file)

In [153]:
def load_images(filepath):
    '''
    Reads back the object stored in a pickle file; used here for the (L, AB) channel tuple.

    Parameters:
    filepath: path of the pickle file to read

    Returns whatever object was pickled into the file.
    '''
    # NOTE: unpickling can execute arbitrary code; only open files produced by a trusted source
    with open(filepath, 'rb') as pickled_file:
        stored = pickle.load(pickled_file)
    return stored

In [154]:
# Reload the stored tuple and unpack it into the luminance (L) and colour (AB) training arrays.
X_train_L, X_train_AB = load_images('l_ab_channels.p')

In [155]:
def resnet_block(x ,num_conv=2, num_filters=512,kernel_size=(3,3),padding='same',strides=2):
    '''
    This function defines a ResNet Block: a stack of convolution layers (each followed by instance
    normalization and Leaky ReLU) whose output is summed with the block input.

    When the convolutions change the shape of the tensor (strides other than 1, or a number of
    filters different from the channels of `x`) the input is first projected with a 1x1 convolution
    so that both branches of the sum have the same shape.

    Parameters
    x: is the tensor which will be used as input to the first convolution layer
    num_conv: is the number of convolutions inside the block
    num_filters: is an int that describes the number of output filters in each convolution
    kernel_size: is an int or tuple that describes the size of the convolution window
    padding: padding with zeros the image so that the kernel fits the input image or not.
             Options: 'valid' or 'same'. With 'valid' the convolutions shrink the tensor in a way
             the shortcut does not reproduce, so the final sum is expected to fail.
    strides: is the number of pixel shifts over the input matrix, applied by every convolution

    Returns the tensor resulting from adding the convolution branch and the shortcut branch.
    '''
    # Local import: the notebook's import cell does not bring in the Add merge layer
    from keras.layers import Add

    out = x
    for _ in range(num_conv):
        out = Conv2D(num_filters,kernel_size=kernel_size,padding=padding,strides=strides)(out)
        out = InstanceNormalization()(out)
        out = LeakyReLU(0.2)(out)

    # Every convolution downsamples by `strides`, so the whole branch downsamples by strides**num_conv
    stride_h, stride_w = (strides, strides) if isinstance(strides, int) else tuple(strides)
    total_stride = (stride_h ** num_conv, stride_w ** num_conv)

    shortcut = x
    shape_changes = total_stride != (1, 1) or K.int_shape(x)[-1] != num_filters
    if num_conv > 0 and shape_changes:
        # Projection shortcut: match the spatial size and channel count of the convolution branch
        shortcut = Conv2D(num_filters,(1,1),padding='same',strides=total_stride)(shortcut)

    # A merge layer (rather than the + operator) keeps the result usable when building a Model
    return Add()([out, shortcut])

In [156]:
def generator(filters=64,num_enc_layers=4,num_resblock=4,name="Generator"):
    '''
    The generator per se is an autoencoder: a series of strided convolution layers extracts
    features of the input image (encoder) and a mirrored series of transposed convolutions brings
    them back to the input resolution (decoder).

    Parameters
    filters: number of filters of the first encoder layer; it doubles at every following layer
    num_enc_layers: number of encoder layers; the decoder uses the same number of layers so the
                    output keeps the 256x256 size of the input
    num_resblock: number of ResNet blocks planned for the latent space. Currently unused because
                  the latent space blocks are disabled below.
    name: name given to the Keras model

    Returns a Keras Model mapping a (256,256,1) luminance input to a (256,256,2) tanh output.
    '''

    # defining input: 256x256 image with 1 channel for the luminosity
    x_0=Input(shape=(256,256,1))

    # Encoder: 3x3 kernels with stride 2, so every layer halves the size while the filters double
    # (64, 128, 256, 512 with the defaults). InstanceNormalization is used through the model, with
    # Leaky ReLU (alpha 0.2) as activation for the encoder and ReLU for the decoder.
    x=x_0
    for lay in range(num_enc_layers):
        # each layer consumes the previous layer output, not the model input
        x=Conv2D(filters*2**lay,(3,3),padding='same',strides=2)(x)
        x=InstanceNormalization()(x)
        x=LeakyReLU(0.2)(x)

    # ----------------------------------LATENT SPACE---------------------------------------------
    # TODO: ResNet blocks are meant to be inserted here between encoder and decoder
    #for r in range(num_resblock):
    #    x=resnet_block(x)
    # ----------------------------------LATENT SPACE---------------------------------------------

    # Decoder: mirrors the encoder with half its filters per layer (256, 128, 64, 32 with the
    # defaults); the last layer uses a 5x5 kernel.
    for lay in reversed(range(num_enc_layers)):
        dec_filters=max(filters*2**lay//2,1)
        dec_kernel=(5,5) if lay == 0 else (3,3)
        x=Conv2DTranspose(dec_filters,dec_kernel,padding='same',strides=2)(x)
        x=InstanceNormalization()(x)
        x=Activation('relu')(x)

    # Two output channels (A and B) squashed into the -1 to 1 range
    x=Conv2D(2,(3,3),padding='same')(x)
    output=Activation('tanh')(x)

    model=Model(x_0,output,name=name)

    return model

In [157]:
def discriminator(name="Discriminator"):
    '''
    Builds the discriminator: four stride-2 convolution blocks followed by a single sigmoid unit.

    Parameters
    name: name given to the Keras model

    Returns a Keras Model mapping a (256,256,2) input to one sigmoid output.
    '''
    # defining input
    ab_input=Input(shape=(256,256,2))

    # First block: convolution without normalization
    features=Conv2D(32,(3,3), padding='same',strides=2,input_shape=(256,256,2))(ab_input)
    features=LeakyReLU(0.2)(features)
    features=Dropout(0.25)(features)

    # Remaining blocks: convolution, batch normalization, Leaky ReLU and dropout
    for block_filters in (64, 128, 256):
        features=Conv2D(block_filters,(3,3),padding='same',strides=2)(features)
        features=BatchNormalization()(features)
        features=LeakyReLU(0.2)(features)
        features=Dropout(0.25)(features)

    # Classification head
    flat=Flatten()(features)
    score=Dense(1)(flat)
    output=Activation('sigmoid')(score)

    return Model(ab_input,output,name=name)

In [158]:
# Input shapes of the discriminator (AB channels) and of the generator (L channel)
d_image_shape = (256,256,2)
g_image_shape = (256,256,1)

# Build and compile the Discriminator
# NOTE(review): this rebinds the name `discriminator` from the builder function to the model, so
# the cell cannot be run twice without re-running the cell that defines the function.
discriminator = discriminator()
discriminator.compile(loss='binary_crossentropy', 
                      optimizer=Adam(lr=0.00008,beta_1=0.5,beta_2=0.999), 
                    metrics=['accuracy']) 
  
#Making the Discriminator untrainable so that the generator can learn from fixed gradient 
discriminator.trainable = False

# Build the Generator 
# NOTE(review): same rebinding as above for the name `generator`.
generator = generator()
  
#Defining the combined model of the Generator and the Discriminator 
l_channel = Input(shape=g_image_shape)
# Named generated_ab (not `image`) so the imported keras `image` module used by image_read
# is not overwritten by a tensor
generated_ab = generator(l_channel) 
valid = discriminator(generated_ab)
  
combined_network = Model(l_channel, valid) 
combined_network.compile(loss='binary_crossentropy', 
                         optimizer=Adam(lr=0.0001,beta_1=0.5,beta_2=0.999))

In [161]:
#creates lists to log the losses and accuracy
# Per-epoch histories of the losses and of the discriminator accuracy
gen_losses, disc_real_losses, disc_fake_losses, disc_acc = [], [], [], []

# Each epoch the generator is trained on a full set of n=320 samples and the discriminator on a
# half set of 160. The discriminator gets real (ones) and fake (zeros) labels, while the generator
# is always given real labels.
n = 320
y_train_fake = np.zeros((n // 2, 1))
y_train_real = np.ones((n // 2, 1))
y_gen = np.ones((n, 1))

#Optional label smoothing
#y_train_real -= .1

# Batch size and number of epochs; the number of epochs depends on the number of photos per epoch
# set above
num_epochs, batch_size = 10, 32

In [162]:
# Number of samples the discriminator sees per epoch, taken from the label array defined above
disc_n = len(y_train_real)

for epoch in tqdm(range(1,num_epochs+1)):
    #shuffle L and AB channels then take a subset corresponding to each networks training size
    np.random.shuffle(X_train_L)
    l_subset = X_train_L[:n]
    np.random.shuffle(X_train_AB)
    ab_subset = X_train_AB[:disc_n]
    
    fake_images = generator.predict(l_subset[:disc_n], verbose=1)
    
    #Train on Real AB channels
    # labels are trimmed so they always match the number of samples actually available
    d_loss_real = discriminator.fit(x=ab_subset, y=y_train_real[:len(ab_subset)],
                                    batch_size=batch_size,epochs=1,verbose=1) 
    disc_real_losses.append(d_loss_real.history['loss'][-1])
    
    #Train on fake AB channels
    d_loss_fake = discriminator.fit(x=fake_images,y=y_train_fake[:len(fake_images)],
                                    batch_size=batch_size,epochs=1,verbose=1)
    disc_fake_losses.append(d_loss_fake.history['loss'][-1])
    
    #append the accuracy; the metric is logged as 'acc' or 'accuracy' depending on the Keras version
    fake_history = d_loss_fake.history
    disc_acc.append(fake_history['acc' if 'acc' in fake_history else 'accuracy'][-1])
    

    #Train the gan by producing AB channels from L
    g_loss = combined_network.fit(x=l_subset, y=y_gen[:len(l_subset)],
                                  batch_size=batch_size,epochs=1,verbose=1)
    #append generator loss
    gen_losses.append(g_loss.history['loss'][-1])
   
    #every 50 epochs it prints a generated photo and every 100 it saves the model under that epoch
    if epoch % 50 == 0:
        print('Reached epoch:',epoch)
        # X_test_L is not defined anywhere in this notebook: fall back to the training luminance
        # so the preview does not stop the training with a NameError
        preview_pool = X_test_L if 'X_test_L' in globals() else X_train_L
        preview_l = preview_pool[2]
        pred = generator.predict(preview_l.reshape(1,256,256,1))
        img = lab_to_rgb(np.dstack((preview_l,pred.reshape(256,256,2))))
        plt.imshow(img)
        plt.show()
        if epoch % 100 == 0:
              generator.save('generator_' + str(epoch)+ '_v3.h5')

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=10.0), HTML(value='')))




KeyboardInterrupt: 