In [1]:
import logging
logging.getLogger('tensorflow').disabled = True  # Biar ga banyak warning!
logging.getLogger('absl').disabled = True        # Biar ga banyak warning!

import config
import os, cv2
import time

import numpy as np
from tensorflow import keras

#### Dataset

In [2]:
class generator(keras.utils.Sequence):
    augmentation = keras.Sequential(
        [
            keras.layers.RandomFlip(mode='horizontal'),
            keras.layers.RandomZoom(height_factor=(-0.2,0.2)),
            keras.layers.RandomTranslation(height_factor=(-0.2,0.2), width_factor=(-0.2,0.2)),
            keras.layers.RandomRotation(factor = (0.2))
        ]
    )
    
    def __init__(self, path, batch_size, input_shape, output_shape):
        self.path = path
        self.file_names = os.listdir(path)
        self.batch_size = batch_size
        self.input_shape = input_shape
        self.output_shape = output_shape
    
    def __len__(self):
        return int(np.ceil( len(self.file_names) / self.batch_size ))
    
    def __getitem__(self, index):
        def load_image(file_name):
            image = cv2.imread( os.path.join(self.path, file_name) )
            return image
        
        def bgr2gray(image):
            image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            return image
        
        def resize_image(image, width, height):
            image = cv2.resize( image, (width, height) )
            return image
        
        filenames = self.file_names[index*self.batch_size:min((index+1)*self.batch_size, len(self.file_names))]
        batch_color = np.array([load_image(filename) for filename in filenames])
        batch_color = self.augmentation(batch_color).numpy()
        batch_grayscale = [bgr2gray(image) for image in batch_color]
        
        batch_color = [resize_image(image, self.output_shape[1], self.output_shape[0]) for image in batch_color]
        batch_grayscale = [resize_image(image, self.input_shape[1], self.input_shape[0]) for image in batch_grayscale]
        
        batch_color = (np.asarray(batch_color, 'float32') / 127.5 ) - 1
        batch_grayscale = (np.asarray(batch_grayscale, 'float32') / 127.5 ) -1
        
        assert(batch_grayscale[0].shape == self.input_shape)
        assert(batch_color[0].shape == self.output_shape)
        
        return batch_grayscale, batch_color
    
    def shuffle(self):
        np.random.shuffle(self.file_names)

In [3]:
batch_size = 8
train_dataset = generator(config.dir_train, batch_size, config.input_shape, config.output_shape)

#### Model

In [4]:
def create_model_generator(input_shape):
    init_weight = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
        
    def encoder(in_layer, k, batch_normalization=True):
        net = keras.layers.Conv2D(k, kernel_size=(4,4), strides=(2,2), padding='same', kernel_initializer=init_weight)(in_layer)
        if(batch_normalization):
            net = keras.layers.BatchNormalization()(net, training=True)
        net = keras.layers.LeakyReLU(alpha=0.2)(net)
        return net
    
    def decoder(in_layer, skip_layer, k, batch_normalization=True, dropout=True):
        net = keras.layers.Conv2DTranspose(k, kernel_size=(4,4), strides=(2,2), padding='same', kernel_initializer=init_weight)(in_layer)
        if(batch_normalization):
            net = keras.layers.BatchNormalization()(net, training=True)
        if(dropout):
            net = keras.layers.Dropout(0.5)(net, training=True)
        net = keras.layers.Concatenate()([net, skip_layer])
        net = keras.layers.ReLU()(net)
        return net
        
    input_layer = keras.Input(shape=input_shape)
    input_layer = keras.layers.Reshape((input_shape[0], input_shape[1], 1))(input_layer)
    
    encoder_1 = encoder(input_layer, 64, batch_normalization=False)
    encoder_2 = encoder(encoder_1, 128)
    encoder_3 = encoder(encoder_2, 256)
    encoder_4 = encoder(encoder_3, 512)
    encoder_5 = encoder(encoder_4, 512)
    encoder_6 = encoder(encoder_5, 512)
    encoder_7 = encoder(encoder_6, 512)
    encoder_8 = encoder(encoder_7, 512)
    
    decoder_1 = decoder(encoder_8, encoder_7, 512)
    decoder_2 = decoder(decoder_1, encoder_6, 512)
    decoder_3 = decoder(decoder_2, encoder_5, 512)
    decoder_4 = decoder(decoder_3, encoder_4, 512, dropout=False)
    decoder_5 = decoder(decoder_4, encoder_3, 256, dropout=False)
    decoder_6 = decoder(decoder_5, encoder_2, 128, dropout=False)
    decoder_7 = decoder(decoder_6, encoder_1, 64, dropout=False)
    
    output_layer = keras.layers.Conv2DTranspose(3, kernel_size=(4,4), strides=(2,2), padding='same', kernel_initializer=init_weight)(decoder_7)
    output_layer = keras.layers.Activation(keras.activations.tanh)(output_layer)
    model=keras.Model(inputs=[input_layer],outputs=[output_layer])
    return model

def create_model_discriminator(input_shape, output_shape):
    init_weight = keras.initializers.RandomNormal(mean=0.0, stddev=0.02)
        
    def encoder(in_layer, k, batch_normalization=True):
        net = keras.layers.Conv2D(k, kernel_size=(4,4), strides=(2,2), padding='same', kernel_initializer=init_weight)(in_layer)
        if(batch_normalization):
            net = keras.layers.BatchNormalization()(net, training=True)
        net = keras.layers.LeakyReLU(alpha=0.2)(net)
        return net
    
    input_layer_1 = keras.Input(shape=input_shape)
    input_layer_1 = keras.layers.Reshape((input_shape[0], input_shape[1], 1))(input_layer_1)
    
    input_layer_2 = keras.Input(shape=output_shape)
    
    net = keras.layers.Concatenate()([input_layer_1, input_layer_2])
    net = encoder(net, 64, batch_normalization=False)
    net = encoder(net, 128)
    net = encoder(net, 256)
    net = encoder(net, 512)
    
    output_layer = keras.layers.Conv2D(1, kernel_size=(4,4), padding='same', kernel_initializer=init_weight)(net)
    output_layer = keras.layers.Activation(keras.activations.sigmoid)(output_layer)
    model=keras.Model(inputs=[input_layer_1, input_layer_2],outputs=[output_layer])
    model.compile(
        loss = 'binary_crossentropy',
        loss_weights = [0.5],
        optimizer = keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5, beta_2=0.999)
    )
    return model

def create_model_gan(model_generator, model_discriminator, input_shape):
    for layer in model_discriminator.layers:
        if(not isinstance(layer, keras.layers.BatchNormalization)):
            layer.trainable = False
        
    input_layer = keras.Input(shape=input_shape)
    generator_output = model_generator(input_layer)
    discriminator_output = model_discriminator([input_layer, generator_output])
    
    model=keras.Model(inputs=[input_layer],outputs=[generator_output, discriminator_output])
    model.compile(
        loss = ['mae', 'binary_crossentropy'],
        loss_weights = [100, 1],
        optimizer = keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5, beta_2=0.999)
    )
    return model

# =================================================================

def load_model(path):
    print('Loading model from %s'%(path), end='\r')
    model = keras.models.load_model(path)
    print('Successfully loaded model from %s'%(path))
    return model

def save_model(path, model):
    print('Saving model to %s'%(path), end='\r')
    model.save(path)
    print('Successfully saved model to %s'%(path))

def save_plot(path, model):
    print('Saving model plot to %s'%(path), end='\r')
    keras.utils.plot_model(model, path, show_shapes=True, expand_nested=True)
    print('Successfully saved model plot to %s'%(path))

def save_summary(path, model):
    print('Saving model summary to %s'%(path), end='\r')
    with open(path, 'w') as f:
        model.summary(print_fn=lambda x:f.write(x + '\n'), expand_nested=True)
    print('Successfully saved model summary to %s'%(path))
    
# =================================================================

def load_model_gan(path):
    model_generator = load_model(os.path.join(path, 'generator'))
    model_discriminator = load_model(os.path.join(path, 'discriminator'))
    return model_generator, model_discriminator

def save_model_gan(path, model_generator, model_discriminator):
    os.makedirs(path, exist_ok=True)
    save_model(os.path.join(path, 'generator'), model_generator)
    save_model(os.path.join(path, 'discriminator'), model_discriminator)

def save_plot_gan(path, model_generator, model_discriminator):
    os.makedirs(path, exist_ok=True)
    save_plot(os.path.join(path, 'generator.png'), model_generator)
    save_plot(os.path.join(path, 'discriminator.png'), model_discriminator)

def save_summary_gan(path, model_generator, model_discriminator):
    os.makedirs(path, exist_ok=True)
    save_summary(os.path.join(path, 'generator.txt'), model_generator)
    save_summary(os.path.join(path, 'discriminator.txt'), model_discriminator)

---

In [5]:
model_name = 'v1-pix2pix'

In [6]:
path_model_saved   = os.path.join(config.dir_model_saved  , model_name)
path_model_plot    = os.path.join(config.dir_model_plot   , model_name)
path_model_summary = os.path.join(config.dir_model_summary, model_name)

path_progress_image = os.path.join(config.dir_progress_image, model_name)
path_progress_loss  = os.path.join(config.dir_progress_loss , model_name)

---

Initialize model from new model

In [7]:
# model_generator     = create_model_generator(config.input_shape)
# model_discriminator = create_model_discriminator(config.input_shape, config.output_shape)

# save_model_gan(path_model_saved, model_generator, model_discriminator)
# save_plot_gan(path_model_plot, model_generator, model_discriminator)
# save_summary_gan(path_model_summary, model_generator, model_discriminator)

Initialize model from saved model

In [8]:
model_generator, model_discriminator = load_model_gan(path_model_saved)

Successfully loaded model from ../Resources/model_saved/v1-pix2pix\generator
Successfully loaded model from ../Resources/model_saved/v1-pix2pix\discriminator


---

In [9]:
model_gan = create_model_gan(model_generator, model_discriminator, config.input_shape)

#### Training

In [10]:
def log_progress_image(model_generator, epochs):
    path_sample = config.dir_progress_image_sample
    path = os.path.join(config.dir_progress_image, model_name)
    os.makedirs(path, exist_ok=True)
    
    file_names = os.listdir(path_sample)
    images = np.array([cv2.imread(os.path.join(path_sample, file_name), 0) for file_name in file_names])
    images = (np.array(images, 'float32') / 127.5 ) - 1
    images = model_generator(images).numpy()
    images = (images + 1) * 127.5
    images = np.array(images, 'uint8')
    for i, image in enumerate(images):
        cv2.imwrite(os.path.join(path, '%s-%s'%(str(epochs).zfill(5), file_names[i])), image)
    
def log_progress_loss(loss_real, loss_fake, loss_gan, epochs):
    path = os.path.join(config.dir_progress_loss, model_name)
    os.makedirs(path, exist_ok=True)
    
    with open(os.path.join(path, 'loss.txt'), 'a') as f:
        f.write("%d %.4f %.4f %.4f\n"%(epochs, loss_real, loss_fake, loss_gan))

In [11]:
def train(dataset, model_generator, model_discriminator, model_gan, epochs, offset=0):
    for e in range(epochs):
        sum_loss_real = 0
        sum_loss_fake = 0
        sum_loss_gan = 0
        
        t = time.time()
        
        for index in range(dataset.__len__()):
            print('Epochs: %d (iteration: %d/%d) [%.1f s]'%(e+1, index+1,dataset.__len__(), time.time()-t), end='\r')
            batch_grayscale, batch_color = dataset.__getitem__(index)
            batch_fake = model_generator(batch_grayscale).numpy()
            
            y_real = np.ones((len(batch_grayscale), config.patch_shape[0], config.patch_shape[1]))
            y_fake = np.zeros((len(batch_grayscale), config.patch_shape[0], config.patch_shape[1]))

            loss_real = model_discriminator.train_on_batch([batch_grayscale, batch_color], y_real)
            loss_fake = model_discriminator.train_on_batch([batch_grayscale, batch_fake], y_fake)
            loss_gan, _, _ = model_gan.train_on_batch(batch_grayscale, [batch_color, y_real])
            
            sum_loss_real += loss_real
            sum_loss_fake += loss_fake
            sum_loss_gan += loss_gan
            
        print()
        print('Loss Real: %.4f'%sum_loss_real)
        print('Loss Fake: %.4f'%sum_loss_fake)
        print('Loss GAN : %.4f'%sum_loss_gan)
        print()
        
        log_progress_image(model_generator, e+1+offset)
        log_progress_loss(sum_loss_real, sum_loss_fake, sum_loss_gan, e+1+offset)
            
        dataset.shuffle()
        
        save_model_gan(path_model_saved, model_generator, model_discriminator)
        print()

In [12]:
train(train_dataset, model_generator, model_discriminator, model_gan, 30, offset=0)

Epochs: 1 (iteration: 1000/1000) [969.9 s]
Loss Real: 212.1023
Loss Fake: 610.1647
Loss GAN : 8762.1548

Successfully saved model to ../Resources/model_saved/v1-pix2pix\generator
Successfully saved model to ../Resources/model_saved/v1-pix2pix\discriminator

Epochs: 2 (iteration: 1000/1000) [752.9 s]
Loss Real: 141.7094
Loss Fake: 684.8001
Loss GAN : 7490.9162

Successfully saved model to ../Resources/model_saved/v1-pix2pix\generator
Successfully saved model to ../Resources/model_saved/v1-pix2pix\discriminator

Epochs: 3 (iteration: 1000/1000) [751.4 s]
Loss Real: 131.6225
Loss Fake: 696.3476
Loss GAN : 7193.1104

Successfully saved model to ../Resources/model_saved/v1-pix2pix\generator
Successfully saved model to ../Resources/model_saved/v1-pix2pix\discriminator

Epochs: 4 (iteration: 1000/1000) [749.8 s]
Loss Real: 120.1866
Loss Fake: 701.5717
Loss GAN : 7050.9931

Successfully saved model to ../Resources/model_saved/v1-pix2pix\generator
Successfully saved model to ../Resources/model_