In [1]:
from random import random
import numpy as np
from numpy import load
from numpy import zeros
from numpy import ones
from numpy import asarray
from numpy.random import randint
import tensorflow as tf

from matplotlib import pyplot
 
# define the discriminator model
def define_discriminator(shape):
    # weight initialization
    padding="same"
    stride=(2,2)
    depth=[128,256,512,512]
    alpha=0.3
    filter_size=(4,1)
    strides=(2,2)	
    init = tf.keras.initializers.RandomNormal(stddev=0.02)

    x=tf.keras.Input(shape=shape)
    d = tf.keras.layers.Conv2D(64, filter_size, strides=strides, padding='same',kernel_initializer=init)(x)
    d = tf.keras.layers.LeakyReLU(alpha=0.2)(d)
    for i in depth:
        d = tf.keras.layers.Conv2D(i, filter_size, strides=strides, padding='same',kernel_initializer=init)(d)
        d = tf.keras.layers.BatchNormalization()(d)
        d = tf.keras.layers.LeakyReLU(alpha=0.2)(d)  
        d = tf.keras.layers.Conv2D(1,filter_size, padding='same',kernel_initializer=init)(d)
    # define model
    model = tf.keras.Model(x, d)
    # compile model
    model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(lr=0.0002, beta_1=0.5),loss_weights=[0.5])
    return model

def define_generator(shape):
    depth=[128,256,512]
    f_shape1=(7,7)
    f_shape2=(3,3)
    stride1=(2,1)
    dil1=(1,1)   
    padding="same"
    init = tf.keras.initializers.RandomNormal(stddev=0.02)
    n_resnet=3 # orginally 3
    x=tf.keras.Input(shape=shape)
    g = tf.keras.layers.Conv2D(64, f_shape1, padding=padding,kernel_initializer=init)(x)
    g = tf.keras.layers.BatchNormalization()(g)
    g = tf.keras.layers.LeakyReLU()(g)
    for i in depth:
        g = tf.keras.layers.Conv2D(i, f_shape2, padding=padding,strides=stride1,kernel_initializer=init)(g)
        g = tf.keras.layers.BatchNormalization()(g)
        g = tf.keras.layers.LeakyReLU()(g)
    # R256
    for _ in range(n_resnet):
        y=g
        g = tf.keras.layers.Conv2D(depth[-1], f_shape2, padding=padding,kernel_initializer=init)(g)
        g = tf.keras.layers.BatchNormalization()(g)
        g = tf.keras.layers.LeakyReLU()(g)
        g = tf.keras.layers.Conv2D(depth[-1], f_shape2, padding=padding,kernel_initializer=init)(g)
        g = tf.keras.layers.BatchNormalization()(g)
        g=tf.keras.layers.Concatenate()([g,y])
    # u128
    depth.reverse()
    for i in depth:        
        g = tf.keras.layers.Conv2DTranspose(i, f_shape2, padding=padding,strides=stride1,kernel_initializer=init)(g)
        g = tf.keras.layers.BatchNormalization()(g)
        g = tf.keras.layers.LeakyReLU()(g)

    g = tf.keras.layers.Conv2DTranspose(1, f_shape1, padding=padding,kernel_initializer=init)(g)
    g = tf.keras.layers.BatchNormalization()(g)
    g = tf.keras.activations.tanh(g)
    # define model
    model = tf.keras.Model(x,g)
    return model


    # define a composite model for updating generators by adversarial and cycle loss
def define_composite_model(g_model_1, d_model, g_model_2, image_shape):
    # ensure the model we're updating is trainable
    g_model_1.trainable = True
    # mark discriminator as not trainable
    d_model.trainable = False
    # mark other generator model as not trainable
    g_model_2.trainable = False
    # discriminator element
    input_gen = tf.keras.Input(shape=image_shape)
    gen1_out = g_model_1(input_gen)
    output_d = d_model(gen1_out)
    # identity element
    input_id = tf.keras.Input(shape=image_shape)
    output_id = g_model_1(input_id)
    # forward cycle
    output_f = g_model_2(gen1_out)
    # backward cycle
    gen2_out = g_model_2(input_id)
    output_b = g_model_1(gen2_out)
    # define model graph
    model = tf.keras.models.Model([input_gen, input_id], [output_d, output_id, output_f, output_b])
    # define optimization algorithm configuration
    opt = tf.keras.optimizers.Adam(lr=0.0002, beta_1=0.5)
    # compile model with weighting of least squares loss and L1 loss
    model.compile(loss=['mse', 'mae', 'mae', 'mae'], loss_weights=[1, 5, 10, 10], optimizer=opt)
    return model

 
# select a batch of random samples, returns images and target
def generate_real_samples(dataset, n_samples, patch_shape0,patch_shape1):
    # choose random instances
    ix = randint(0, dataset.shape[0], n_samples)
    # retrieve selected images
    X = dataset[ix]
    # generate 'real' class labels (1)
    y = ones((n_samples, patch_shape0, patch_shape1, 1))
    return X, y
 
    # generate a batch of images, returns images and targets
def generate_fake_samples(g_model, dataset,patch_shape0,patch_shape1):
    # generate fake instance
    X = g_model.predict(dataset)
    # create 'fake' class labels (0)
    y = zeros((len(X), patch_shape0, patch_shape1, 1))
    return X, y

    # save the generator models to file
def save_models(step, g_model_AtoB, g_model_BtoA,dA,dB):
    # save the first generator model
    filename1 = 'g_model_AtoB.h5'
    g_model_AtoB.save(filename1)
    # save the second generator model
    filename2 = 'g_model_BtoA.h5' 
    g_model_BtoA.save(filename2)
    dA.save('d_model_A.h5')
    dB.save('d_model_B.h5')

    print('>Saved: %s and %s' % (filename1, filename2))
 
    # generate samples and save as a plot and save the model
def summarize_performance(step, g_model, trainX, name, n_samples=5):
    # select a sample of input images
    X_in, _ = generate_real_samples(trainX, n_samples, 0)
    # generate translated images
    X_out, _ = generate_fake_samples(g_model, X_in, 0)
    # scale all pixels from [-1,1] to [0,1]
    X_in = (X_in + 1) / 2.0
    X_out = (X_out + 1) / 2.0
    # plot real images
    for i in range(n_samples):
        pyplot.subplot(2, n_samples, 1 + i)
        pyplot.axis('off')
        pyplot.imshow(X_in[i])
    # plot translated image
    for i in range(n_samples):
        pyplot.subplot(2, n_samples, 1 + n_samples + i)
        pyplot.axis('off')
        pyplot.imshow(X_out[i])
    # save plot to file
    filename1 = '%s_generated_plot_%06d.png' % (name, (step+1))
    pyplot.savefig(filename1)
    pyplot.close()
 
# update image pool for fake images
def update_image_pool(pool, images, max_size=50):
    selected = list()
    for image in images:
        if len(pool) < max_size:
            # stock the pool
            pool.append(image)
            selected.append(image)
        elif random() < 0.5:
            # use image, but don't add it to the pool
            selected.append(image)
        else:
            # replace an existing image and use replaced image
            ix = randint(0, len(pool))
            selected.append(pool[ix])
            pool[ix] = image
    return asarray(selected)
 
# train cyclegan models
def train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset):
    # n_epochs, n_batch, = 100, 1 # original
    n_epochs, n_batch, = 1, 1
    # determine the output square shape of the discriminator
    n_patch0 = d_model_A.output_shape[1]
    n_patch1 = d_model_A.output_shape[2]
    print(d_model_A.output_shape)
    # unpack dataset
    trainA, trainB = dataset
    # prepare image pool for fakes
    poolA, poolB = list(), list()
    # calculate the number of batches per training epoch
    bat_per_epo = int(len(trainA) / n_batch)
    # calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    # manually enumerate epochs
    for i in range(n_steps):
    # select a batch of real samples
        X_realA, y_realA = generate_real_samples(trainA, n_batch, n_patch0,n_patch1)
        X_realB, y_realB = generate_real_samples(trainB, n_batch, n_patch0,n_patch1)
        # generate a batch of fake samples
        X_fakeA, y_fakeA = generate_fake_samples(g_model_BtoA, X_realB, n_patch0,n_patch1)
        X_fakeB, y_fakeB = generate_fake_samples(g_model_AtoB, X_realA, n_patch0,n_patch1)
        # update fakes from pool
        X_fakeA = update_image_pool(poolA, X_fakeA)
        X_fakeB = update_image_pool(poolB, X_fakeB)
        # update generator B->A via adversarial and cycle loss
        g_loss2, _, _, _, _  = c_model_BtoA.train_on_batch([X_realB, X_realA], [y_realA, X_realA, X_realB, X_realA])
        # update discriminator for A -> [real/fake]
        dA_loss1 = d_model_A.train_on_batch(X_realA, y_realA)
        dA_loss2 = d_model_A.train_on_batch(X_fakeA, y_fakeA)
        # update generator A->B via adversarial and cycle loss
        g_loss1, _, _, _, _ = c_model_AtoB.train_on_batch([X_realA, X_realB], [y_realB, X_realB, X_realA, X_realB])
        # update discriminator for B -> [real/fake]
        dB_loss1 = d_model_B.train_on_batch(X_realB, y_realB)
        dB_loss2 = d_model_B.train_on_batch(X_fakeB, y_fakeB)
        # summarize performance
        print('>%d, dA[%.3f,%.3f] dB[%.3f,%.3f] g[%.3f,%.3f]' % (i+1, dA_loss1,dA_loss2, dB_loss1,dB_loss2, g_loss1,g_loss2))
        # evaluate the model performance every so often
#         if (i+1) % (bat_per_epo * 1) == 0:
          # plot A->B translation
#           summarize_performance(i, g_model_AtoB, trainA, 'AtoB')
          # plot B->A translation
#           summarize_performance(i, g_model_BtoA, trainB, 'BtoA')
        if (i+1) % 1 == 0:
            save_models(i, g_model_AtoB, g_model_BtoA,d_model_A,d_model_B)

# load image data
dataset_A = np.load('../input/gac-pia/data_gac.npz')["arr_0"][:2]
dataset_B = np.load('../input/gac-pia/data_pia.npz')["arr_0"][:2]
dataset_A=np.expand_dims(dataset_A,axis=-1)
dataset_B=np.expand_dims(dataset_B,axis=-1)
dataset=np.array([dataset_A, dataset_B])
print('Loaded', dataset[0].shape, dataset[1].shape)
# define input shape based on the loaded dataset
image_shape = dataset[0].shape[1:]
print("image_shape", image_shape)
# generator: A -> B
g_model_AtoB = define_generator(image_shape)
# generator: B -> A
g_model_BtoA = define_generator(image_shape)
# discriminator: A -> [real/fake]
d_model_A = define_discriminator(image_shape)
# discriminator: B -> [real/fake]
d_model_B = define_discriminator(image_shape)
# composite: A -> B -> [real/fake, A]
load=1
if load==1:
    g_model_AtoB.load_weights("../input/main-cs753/g_model_AtoB.h5")
    g_model_BtoA.load_weights("../input/main-cs753/g_model_BtoA.h5")
    d_model_A.load_weights("../input/main-cs753/d_model_A.h5")
    d_model_B.load_weights("../input/main-cs753/d_model_B.h5")
c_model_AtoB = define_composite_model(g_model_AtoB, d_model_B, g_model_BtoA, image_shape)
# composite: B -> A -> [real/fake, B]
c_model_BtoA = define_composite_model(g_model_BtoA, d_model_A, g_model_AtoB, image_shape)
# train models
train(d_model_A, d_model_B, g_model_AtoB, g_model_BtoA, c_model_AtoB, c_model_BtoA, dataset)

Loaded (2, 256, 128, 1) (2, 256, 128, 1)
image_shape (256, 128, 1)
(None, 8, 4, 1)
>1, dA[0.419,0.007] dB[0.364,0.002] g[6.155,6.513]
>Saved: g_model_AtoB.h5 and g_model_BtoA.h5
>2, dA[0.324,0.006] dB[0.255,0.014] g[6.733,6.777]
>Saved: g_model_AtoB.h5 and g_model_BtoA.h5


In [2]:
!ls ../input/main-cs753/

__notebook__.ipynb  __results__.html  d_model_A.h5  g_model_AtoB.h5
__output__.json     custom.css	      d_model_B.h5  g_model_BtoA.h5
