In [1]:
import keras
from keras.layers import Conv2D, MaxPool2D, UpSampling2D, Dense, Input, Flatten, Conv2DTranspose, BatchNormalization, Activation
from keras import Model, Sequential
from keras.initializers import RandomNormal
import tensorflow as tf
from tensorflow.keras.optimizers import Adam, RMSprop, Nadam
from tensorflow.keras.preprocessing.image import ImageDataGenerator

import os
import math
import numpy as np
import cv2
import pickle
import matplotlib.pyplot as plt
import re
import glob
from PIL import Image
from numpy.random import randn
from numpy.random import randint
from numpy import zeros, ones

In [2]:
OUTPUT_CHANNELS = 3

# Define the downsample function for the generator model
def downsample(filters, size, apply_batchnorm=True):
    # Weight initialization
    initializer = tf.random_normal_initializer(0., 0.02)

    # Define the model
    model = tf.keras.Sequential()
    model.add(
        tf.keras.layers.Conv2D(filters, size, strides=2, padding='same',
                               kernel_initializer=initializer, use_bias=False))
  
    # Apply batch normalization if required and add leaky ReLU activation function to the model
    if apply_batchnorm:
        model.add(tf.keras.layers.BatchNormalization())

    model.add(tf.keras.layers.LeakyReLU())

    # Return the model
    return model

# Define the upsample function for the generator model
def upsample(filters, size, apply_dropout=False):
    # Weight initialization
    initializer = tf.random_normal_initializer(0., 0.02)

    # Define the model
    model = tf.keras.Sequential()
    model.add(
        tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                        padding='same',
                                        kernel_initializer=initializer,
                                        use_bias=False))
  
    # Apply batch normalization if required and add ReLU activation function to the model
    model.add(tf.keras.layers.BatchNormalization())

    # Apply dropout if required
    if apply_dropout:
        model.add(tf.keras.layers.Dropout(0.5))

    model.add(tf.keras.layers.ReLU())

    # Return the model
    return model

input = 128

# Generator network
def Generator(in_shape=(256,256,3)):
    inputs = tf.keras.layers.Input(in_shape)
    
    down_stack = [
        downsample(64, 4, apply_batchnorm=False),  # (batch_size, 128, 128, 64)
        downsample(128, 4),  # (batch_size, 64, 64, 128)
        downsample(256, 4),  # (batch_size, 32, 32, 256)
        downsample(512, 4),  # (batch_size, 16, 16, 512)
        downsample(512, 4),  # (batch_size, 8, 8, 512)
        downsample(512, 4),  # (batch_size, 4, 4, 512)
        downsample(512, 4),  # (batch_size, 2, 2, 512)
        downsample(512, 4),  # (batch_size, 1, 1, 512)
    ]
    
    up_stack = [
        upsample(512, 4, apply_dropout=True),  # (batch_size, 2, 2, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, 4, 4, 1024)
        upsample(512, 4, apply_dropout=True),  # (batch_size, 8, 8, 1024)
        upsample(512, 4),  # (batch_size, 16, 16, 1024)
        upsample(256, 4),  # (batch_size, 32, 32, 512)
        upsample(128, 4),  # (batch_size, 64, 64, 256)
        upsample(64, 4),  # (batch_size, 128, 128, 128)
    ]
    
    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                         strides=2,
                                         padding='same',
                                         kernel_initializer=initializer,
                                         activation='tanh')  # (batch_size, 256, 256, 3)
    
    x = inputs
    
    # Downsampling through the model
    skips = []
    
    for down in down_stack:
        x = down(x)
        skips.append(x)
        
    skips = reversed(skips[:-1])
    
    # Upsampling and establishing the skip connections
    for up, skip in zip(up_stack, skips):
        x = up(x)
        x = tf.keras.layers.Concatenate()([x, skip])
        
    x = last(x)
    
    return tf.keras.Model(inputs=inputs, outputs=x)

LAMBDA = 100
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)

# Define the loss function for the generator model 
def generator_loss(disc_generated_output, gen_output, target):
    # Binary cross-entropy loss function for the generator model
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

    # Mean absolute error loss function for the generator model 
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))

    # Total generator loss = gan loss + (LAMBDA * l1 loss) 
    total_gen_loss = gan_loss + (LAMBDA * l1_loss)

    # Return the total generator loss, gan loss, and l1 loss
    return total_gen_loss, gan_loss, l1_loss

In [3]:
gen = Generator()
gen.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 sequential (Sequential)        (None, 128, 128, 64  3072        ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 sequential_1 (Sequential)      (None, 64, 64, 128)  131584      ['sequential[0][0]']             
                                                                                              

In [47]:
# Define the global discriminator model
def Global_Discriminator(in_shape=(256, 256, 3)):
    # Weight initialization
    init = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    # Define the input layer
    inputs = tf.keras.layers.Input(in_shape)
    
    # Define the convolutional layers
    conv1 = tf.keras.layers.Conv2D(32, 5, strides=2, padding='same', kernel_initializer=init)(inputs)
    conv1 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv1)
    
    conv2 = tf.keras.layers.Conv2D(64, 5, strides=2, padding='same', kernel_initializer=init)(conv1)
    conv2 = tf.keras.layers.BatchNormalization()(conv2)
    conv2 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv2)
    
    conv3 = tf.keras.layers.Conv2D(128, 5, strides=2, padding='same', kernel_initializer=init)(conv2)
    conv3 = tf.keras.layers.BatchNormalization()(conv3)
    conv3 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv3)
    
    conv4 = tf.keras.layers.Conv2D(256, 5, strides=2, padding='same', kernel_initializer=init)(conv3)
    conv4 = tf.keras.layers.BatchNormalization()(conv4)
    conv4 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv4)

    conv4 = tf.keras.layers.Conv2D(256, 5, strides=2, padding='same', kernel_initializer=init)(conv4)
    conv4 = tf.keras.layers.BatchNormalization()(conv4)
    conv4 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv4)
    
    x = tf.keras.layers.Flatten()(conv4)
    x = tf.keras.layers.Dense(512, activation='relu')(x)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    # Define the discriminator model
    model = tf.keras.models.Model(inputs=inputs, outputs=outputs)
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    
    return model

def Local_Discriminator(input_shape=(256, 256, 3)):
    # Weight initialization
    init = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    # Define the input layer
    inputs = tf.keras.layers.Input(shape=input_shape)
    
    # Define the convolutional layers
    conv1 = tf.keras.layers.Conv2D(64, 3, strides=1, padding='same', kernel_initializer=init)(inputs)
    conv1 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv1)
    
    conv2 = tf.keras.layers.Conv2D(128, 3, strides=2, padding='same', kernel_initializer=init)(conv1)
    conv2 = tf.keras.layers.BatchNormalization()(conv2)
    conv2 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv2)
    
    conv3 = tf.keras.layers.Conv2D(256, 3, strides=2, padding='same', kernel_initializer=init)(conv2)
    conv3 = tf.keras.layers.BatchNormalization()(conv3)
    conv3 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv3)
    
    conv4 = tf.keras.layers.Conv2D(512, 3, strides=2, padding='same', kernel_initializer=init)(conv3)
    conv4 = tf.keras.layers.BatchNormalization()(conv4)
    conv4 = tf.keras.layers.LeakyReLU(alpha=0.2)(conv4)
    
    x = tf.keras.layers.Flatten()(conv4)
    x = tf.keras.layers.Dense(512, activation='relu')(x)
    outputs = tf.keras.layers.Dense(1, activation='sigmoid')(x)

    # Define the discriminator model
    model = tf.keras.models.Model(inputs=inputs, outputs=outputs)
    
    return model


# Define the PatchGAN discriminator model
def PGAN_Discriminator(input_shape=(256, 256, 3)):
    # Weight initialisation
    initializer = tf.keras.initializers.RandomNormal(mean=0.0, stddev=0.02)

    # Define the input layer
    inp = tf.keras.layers.Input(shape=input_shape, name='input_image')
    # Define the target layer
    tar = tf.keras.layers.Input(shape=input_shape, name='target_image')

    # Concatenate the input and target images
    x = tf.keras.layers.concatenate([inp, tar])  # (batch_size, 256, 256, channels*2)

    # Define the convolutional layers
    down1 = downsample(64, 4, False)(x)  # (batch_size, 128, 128, 64)
    down2 = downsample(128, 4)(down1)  # (batch_size, 64, 64, 128)
    down3 = downsample(256, 4)(down2)  # (batch_size, 32, 32, 256)

    zero_pad1 = tf.keras.layers.ZeroPadding2D()(down3)  # (batch_size, 34, 34, 256)
    conv = tf.keras.layers.Conv2D(512, 4, strides=1,
                                kernel_initializer=initializer,
                                use_bias=False)(zero_pad1)  # (batch_size, 31, 31, 512)
  
    # Define the batch normalisation layer
    batchnorm1 = tf.keras.layers.BatchNormalization()(conv)

    # Define the leaky ReLU layer
    leaky_relu = tf.keras.layers.LeakyReLU()(batchnorm1)

    zero_pad2 = tf.keras.layers.ZeroPadding2D()(leaky_relu)  # (batch_size, 33, 33, 512)

    # Define the last convolutional layer
    last = tf.keras.layers.Conv2D(1, 4, strides=1,
                                kernel_initializer=initializer)(zero_pad2)  # (batch_size, 30, 30, 1)
  
    # Define the discriminator model
    return tf.keras.Model(inputs=[inp, tar], outputs=last)

In [52]:
def discriminator_loss(disc_real_output, disc_generated_output):
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)

    generated_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)

    total_disc_loss = real_loss + generated_loss

    return total_disc_loss

In [55]:
disc = Global_Discriminator()
disc.summary()

disc2 = Local_Discriminator()
disc2.summary()

disc3 = PGAN_Discriminator()
disc3.summary()

Model: "model_13"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_12 (InputLayer)       [(None, 256, 256, 3)]     0         
                                                                 
 conv2d_63 (Conv2D)          (None, 128, 128, 32)      2432      
                                                                 
 leaky_re_lu_61 (LeakyReLU)  (None, 128, 128, 32)      0         
                                                                 
 conv2d_64 (Conv2D)          (None, 64, 64, 64)        51264     
                                                                 
 batch_normalization_71 (Bat  (None, 64, 64, 64)       256       
 chNormalization)                                                
                                                                 
 leaky_re_lu_62 (LeakyReLU)  (None, 64, 64, 64)        0         
                                                          

                                                                                                  
 leaky_re_lu_73 (LeakyReLU)     (None, 31, 31, 512)  0           ['batch_normalization_80[0][0]'] 
                                                                                                  
 zero_padding2d_5 (ZeroPadding2  (None, 33, 33, 512)  0          ['leaky_re_lu_73[0][0]']         
 D)                                                                                               
                                                                                                  
 conv2d_76 (Conv2D)             (None, 30, 30, 1)    8193        ['zero_padding2d_5[0][0]']       
                                                                                                  
Total params: 2,770,433
Trainable params: 2,768,641
Non-trainable params: 1,792
__________________________________________________________________________________________________


In [56]:
#g_model.compile(optimizer=Adam(learning_rate=1e-4), loss=['mse'])

g_model = Generator(in_shape=(256,256,3))
d_model = Global_Discriminator(in_shape=(256,256,3))

def define_gan(g_model, d_model, in_shape=(256,256,3)):
    # make weights in the discriminator not trainable
    d_model.trainable = False
    
    # define the source image
    in_src = Input(shape=in_shape)
    
    # connect the source image to the generator input
    gen_out = g_model(in_src)
    
    # connect the source input and generator output to the discriminator input
    dis_out = d_model(gen_out)
    
    # src image as input, generated image and classification output
    model = Model(in_src, [dis_out, gen_out])
    
    # compile model
    opt = Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'mse'], 
                  optimizer=opt, 
                  loss_weights=[1,100])
    
    return model

gan_model = define_gan(g_model, d_model, in_shape=(256,256,3))

### Generate real/fake samples

In [32]:
# select a batch of random samples, returns images and target

def generate_real_samples(dataset, n_samples):
    trainA = dataset
    
    # choose random instances
    ix = np.random.randint(0, trainA.shape[0], n_samples)
    
    # retrieve selected images
    X1 = trainA[ix]
    
    # generate 'real' class labels (1)
    y = np.ones((n_samples, 1))
    
    return X1, y, ix

# generate a batch of images, returns images and targets
def generate_fake_samples(g_model, samples):
    # generate fake instance
    X = g_model.predict(samples)
    
    # create 'fake' class labels (0)
    y = zeros((len(X), 1))
    
    return X, y


In [39]:
data_dir = 'C:/Users/chris/2023-mcm-master/src/data/masked_images_split/train'
#image_files = data_dir + '/**/*_masked.png'

orig_data_dir = 'C:/Users/chris/2023-mcm-master/src/data/dataset_split/train'
#orig_image_files = orig_data_dir + '/**/*.png'

# Define the ImageDataGenerator for training data
train_datagen = ImageDataGenerator(rescale=1./255)

# Load and preprocess training data
train_masked = train_datagen.flow_from_directory(
    data_dir,  # Pass the directory path, not the file paths
    target_size=(256, 256),  # Specify the target size of the images
    batch_size=32,
    class_mode='input',  # Use 'input' for input modeling
    shuffle=True
)

# Load and preprocess training data from orig_data_dir
train_orig = train_datagen.flow_from_directory(
    orig_data_dir,  # Pass the directory path, not the file paths
    target_size=(256, 256),
    batch_size=32,
    class_mode='input',
    shuffle=True
)

Found 7413 images belonging to 22 classes.
Found 7686 images belonging to 22 classes.


In [44]:
# Custom generator that yields both masked and original images
def combined_generator(masked_generator, orig_generator):
    while True:
        masked_images, _ = masked_generator.next()
        orig_images, _ = orig_generator.next()
        yield masked_images, orig_images

# Create the combined generator
combined_train_generator = combined_generator(train_masked, train_orig)

### Training

In [33]:
def train(d_model, g_model, gan_model, dataset_real, dataset_damaged, n_epochs=100, n_batch=32,n_batch_test=10): 
    # unpack dataset
    #trainA= image_names
    trainA= dataset_real
    # calculate the number of batches per training epoch
    bat_per_epo = int(len(trainA) / n_batch)
    # calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    # manually enumerate epochs

    dlossreal=[]
    dlossfake=[]
    glossbce=[]
    glossmae=[]
    steps=[]
    for k in range(1,n_epochs+1):
        for i in range(bat_per_epo):
            # select a batch of real samples
            #X_real, y_real ,damaged_images= generate_real_samples(image_names, n_batch)  
            X_real, y_real ,image_index= generate_real_samples(dataset_real, n_batch)
            # generate a batch of fake samples
            #X_fake, y_fake = generate_fake_samples(g_model, damaged_images)
            X_fake, y_fake = generate_fake_samples(g_model, dataset_damaged[image_index])
            if (i==124):
                plt.figure(figsize=(20, 6))
                for m in range(5):
                # Display original
                    ax = plt.subplot(3, 5, m + 1)
                    plt.imshow(X_fake[m][:,:,::-1])
                    ax.get_xaxis().set_visible(False)
                    ax.get_yaxis().set_visible(False)
            
                    ax = plt.subplot(3, 5, m + 1 + 5)
                    plt.imshow(X_real[m][:,:,::-1])
                    ax.get_xaxis().set_visible(False)
                    ax.get_yaxis().set_visible(False)
                    
                    ax = plt.subplot(3, 5, m + 1 + 10)
                    plt.imshow(dataset_damaged[image_index][m][:,:,::-1])
                    ax.get_xaxis().set_visible(False)
                    ax.get_yaxis().set_visible(False)
                    
                plt.show() 
        
                # update discriminator for real samples
               # d_model.trainable = True
            d_loss_real = d_model.train_on_batch(X_real, y_real)
            # update discriminator for generated samples
            d_loss_fake = d_model.train_on_batch(X_fake, y_fake)
            # update the generator
            #gloss_all,g_loss_BCE,g_loss_mae= gan_model.train_on_batch(damaged_images, [y_real, X_real])
            gloss_all,g_loss_BCE,g_loss_mae = gan_model.train_on_batch(dataset_damaged[image_index], [y_real, X_real])
            # summarize performance
            if(i==124):
              #print('>%d, d_real[%.5f] d_fake[%.5f] g_BCE[%.5f] g_mae[%.5f]' % (i+1, d_loss_real, d_loss_fake, g_loss_BCE,g_loss_mae))
              print('epochs[%d],batch[%d], d_real[%.5f] d_fake[%.5f] g_BCE[%.5f] g_mae[%.5f]' % (k,i+1, d_loss_real, d_loss_fake, g_loss_BCE,g_loss_mae))
            dlossreal.append(d_loss_real)
            dlossfake.append(d_loss_fake)
            glossbce.append(g_loss_BCE)
            glossmae.append(g_loss_mae)
            #steps.append(i)
            steps.append(k*i)
    plt.plot(steps,dlossreal)
    plt.plot(steps,dlossfake)
    plt.title('Discriminator loss')
    plt.ylabel('loss')
    plt.xlabel('batches')
    plt.legend(['Discriminator real loss', 'Discriminator fake loss'], loc='upper right')
    plt.show()
    
    plt.plot(steps,glossbce)
    plt.plot(steps,glossmae)
    plt.title('Generator loss')
    plt.ylabel('loss')
    plt.xlabel('batches')
    plt.legend(['GAN BCE train loss', 'Generator MAE train loss'], loc='upper right')
    plt.show()
    

In [45]:
# Calculate the number of steps per epoch
steps_per_epoch = min(len(train_masked), len(train_orig))

#train_x = pickle.load(open("C:/Users/chris/2023-mcm-master/src/data/dataset_split/train_images.pkl", "rb"))
#train_Y = pickle.load(open("C:/Users/chris/2023-mcm-master/src/data/dataset_split/orig_train_images.pkl", "rb"))

# Create the custom data generator
train_cgan = custom_data_generator(train_masked, train_orig)

# Train the model using fit
history = gan_model.fit(
    train_cgan,
    steps_per_epoch=steps_per_epoch,
    epochs=50,
    verbose=1
)

"""
print('Start of training')
def train(d_model, g_model, gan_model, dataset_real, dataset_damaged, n_epochs=100, n_batch=32, n_batch_test=10):
    # Convert dataset_real to a NumPy array
    dataset_real = np.array(dataset_real)

gan_model.save('C:/Users/chris/2023-mcm-master/src/data/model_results/faces_inpaint_gan.h5',overwrite=True)
g_model.save('C:/Users/chris/2023-mcm-master/src/data/model_results/faces_inpaint_g.h5',overwrite=True)
d_model.save('C:/Users/chris/2023-mcm-master/src/data/model_results/faces_inpaint_d.h5',overwrite=True)

for m in range(0,7):
    pyplot.figure(figsize=(20, 6))
    
    for i in range(5):
        # Display original
        ax = pyplot.subplot(3, 5, i + 1)
        pyplot.imshow(damaged_images[i+5*m][:,:,::-1])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        # Display reconstruction
        ax = plt.subplot(3, 5, i + 1 + 5)
        plt.imshow(pred[i+5*m])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        # Display ground truth
        ax = pyplot.subplot(3, 5, i + 1 + 10)
        pyplot.imshow(x_train[i+5*m][:,:,::-1])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    
    plt.show()
"""

Epoch 1/50


ValueError: in user code:

    File "C:\Users\chris\anaconda3\envs\snowflakes\lib\site-packages\keras\engine\training.py", line 1021, in train_function  *
        return step_function(self, iterator)
    File "C:\Users\chris\anaconda3\envs\snowflakes\lib\site-packages\keras\engine\training.py", line 1010, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\Users\chris\anaconda3\envs\snowflakes\lib\site-packages\keras\engine\training.py", line 1000, in run_step  **
        outputs = model.train_step(data)
    File "C:\Users\chris\anaconda3\envs\snowflakes\lib\site-packages\keras\engine\training.py", line 859, in train_step
        y_pred = self(x, training=True)
    File "C:\Users\chris\anaconda3\envs\snowflakes\lib\site-packages\keras\utils\traceback_utils.py", line 67, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\Users\chris\anaconda3\envs\snowflakes\lib\site-packages\keras\engine\input_spec.py", line 200, in assert_input_compatibility
        raise ValueError(f'Layer "{layer_name}" expects {len(input_spec)} input(s),'

    ValueError: Layer "model_4" expects 1 input(s), but it received 2 input tensors. Inputs received: [<tf.Tensor 'IteratorGetNext:0' shape=(None, None, None, None) dtype=float32>, <tf.Tensor 'IteratorGetNext:1' shape=(None, None, None, None) dtype=float32>]
