In [2]:
import tensorflow as tf
from tensorflow.keras.layers import Input, Reshape, Dropout, Dense
from tensorflow.keras.layers import Flatten, BatchNormalization
from tensorflow.keras.layers import Activation, ZeroPadding2D
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import UpSampling2D, Conv2D
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.optimizers import Adam
import numpy as np
from PIL import Image
from tqdm import tqdm    # progress meter tool
import os
import time
import matplotlib.pyplot as plt

In [None]:
# --- Image geometry -------------------------------------------------------
# Resolution factor: generated images are (32 * GENERATE_RES) pixels per side
# (1 -> 32, 2 -> 64, 3 -> 96, 4 -> 128, ...).
GENERATE_RES = 3
GENERATE_SQUARE = GENERATE_RES * 32  # rows == cols; images are square
IMAGE_CHANNELS = 3

# --- Preview grid ---------------------------------------------------------
# Number of tiles per preview image and the margin (in pixels) around each tile.
PREVIEW_ROWS, PREVIEW_COLS = 4, 7
PREVIEW_MARGIN = 16

# --- Generator input ------------------------------------------------------
# Length of the random vector each image is generated from.
SEED_SIZE = 100

# --- Data and training ----------------------------------------------------
DATA_PATH = 'images/images/'
EPOCHS = 15
BATCH_SIZE = 20
BUFFER_SIZE = 60000  # shuffle buffer size passed to Dataset.shuffle

In [None]:
# Load and preprocess images.
#
# The resized, scaled training set is cached as a NumPy binary inside DATA_PATH
# so that later runs can skip the decode/resize pass.

def hms_string(sec_elapsed):
    """Format a duration given in seconds as 'H:MM:SS.ss'."""
    hours = int(sec_elapsed // 3600)
    minutes = int((sec_elapsed % 3600) // 60)
    seconds = sec_elapsed % 60
    return f"{hours}:{minutes:>02}:{seconds:>05.2f}"


# np.save appends '.npy' to any file name that does not already end with it,
# so the cache must be named '.npy' for the isfile() check to find it again.
training_binary_path = os.path.join(
    DATA_PATH, f'training_data_{GENERATE_SQUARE}_{GENERATE_SQUARE}.npy')
print(f"Looking for file: {training_binary_path}")

if not os.path.isfile(training_binary_path):
    start = time.time()
    print("Loading training images...")

    # PIL mode that yields exactly IMAGE_CHANNELS channels per pixel; a
    # KeyError here means IMAGE_CHANNELS is not one of 1, 3 or 4.
    image_mode = {1: 'L', 3: 'RGB', 4: 'RGBA'}[IMAGE_CHANNELS]

    image_arrays = []

    for filename in tqdm(os.listdir(DATA_PATH)):
        path = os.path.join(DATA_PATH, filename)
        # DATA_PATH also receives the preview 'output' directory and the
        # cached .npy arrays; neither is a training image.
        if not os.path.isfile(path) or filename.endswith('.npy'):
            continue
        # The context manager closes the underlying file handle promptly.
        with Image.open(path) as image:
            # Image.LANCZOS is the filter formerly aliased as Image.ANTIALIAS
            # (the alias was removed in Pillow 10).
            resized = image.convert(image_mode).resize(
                (GENERATE_SQUARE, GENERATE_SQUARE), Image.LANCZOS)
        image_arrays.append(np.asarray(resized))

    if not image_arrays:
        raise ValueError(f"No training images found in {DATA_PATH!r}")

    # Stack into one (N, H, W, C) array; the reshape also adds the channel
    # axis when IMAGE_CHANNELS == 1.
    training_data = np.reshape(
        image_arrays, (-1, GENERATE_SQUARE, GENERATE_SQUARE, IMAGE_CHANNELS))
    training_data = training_data.astype(np.float32)
    # Scale pixel values from [0, 255] to [-1, 1], the range of the
    # generator's tanh output.
    training_data = training_data / 127.5 - 1

    print("Saving training image binary...")
    np.save(training_binary_path, training_data)  # NumPy .npy binary format
    elapsed = time.time() - start
    print(f'Image preprocess time: {hms_string(elapsed)}')

else:
    print("Loading previous training pickle...")
    training_data = np.load(training_binary_path)

In [None]:
# Input pipeline: slice the training array along its first axis (one element
# per image), shuffle, then group into batches.
#
# Dataset.shuffle() draws each element at random from a buffer of BUFFER_SIZE
# elements, i.e. buffer_size is the number of elements the next one is
# sampled from.
train_dataset = (
    tf.data.Dataset.from_tensor_slices(training_data)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
)

In [4]:
def build_generator(seed_size, channels):
    """Build the generator network.

    The seed vector is projected to a 4x4x256 feature map and upsampled three
    times by a factor of two (4 -> 8 -> 16 -> 32). When the module-level
    GENERATE_RES is greater than 1, one more stage upsamples by GENERATE_RES,
    so the output edge length is 32 * GENERATE_RES.

    Args:
        seed_size: Length of the input noise vector.
        channels: Number of channels in the generated image.

    Returns:
        A Keras Sequential model whose final activation is tanh, so outputs
        lie in [-1, 1].
    """
    model = Sequential()

    def add_upsampling_stage(filters, upsample):
        # One stage = upsample -> 3x3 conv -> batch norm -> ReLU.
        # Batch normalization keeps activations near zero mean / unit
        # variance, which helps stabilize training.
        stage = (
            upsample,
            Conv2D(filters, kernel_size=3, padding='same'),
            BatchNormalization(momentum=0.8),
            Activation('relu'),
        )
        for layer in stage:
            model.add(layer)

    # Project the seed and reshape it into the initial 4x4 feature map.
    # 4*4*256 = final image size divided by the total upsampling factor,
    # times the starting filter count.
    model.add(Dense(4 * 4 * 256, activation='relu', input_dim=seed_size))
    model.add(Reshape((4, 4, 256)))

    # Three default (x2) upsampling stages. The filter counts are
    # conventional powers of two taken from an established architecture.
    for filters in (256, 256, 128):
        add_upsampling_stage(filters, UpSampling2D())

    # Optional extra stage that scales 32x32 up to the configured resolution.
    if GENERATE_RES > 1:
        add_upsampling_stage(
            128, UpSampling2D(size=(GENERATE_RES, GENERATE_RES)))

    # Final convolution maps features to image channels.
    model.add(Conv2D(channels, kernel_size=3, padding='same'))
    model.add(Activation('tanh'))

    return model

def build_discriminator(image_shape):
    """Build the discriminator network and print its summary.

    Args:
        image_shape: (rows, cols, channels) shape of the images to classify.

    Returns:
        A Keras Sequential model ending in a single sigmoid unit.
    """
    model = Sequential()

    def extend(*layers):
        # Append the given layers to the model in order.
        for layer in layers:
            model.add(layer)

    # Stage 1: strided convolution, no batch norm on the input stage.
    # LeakyReLU keeps a small slope for negative inputs instead of zeroing
    # them, which tends to make GAN training more stable.
    extend(
        Conv2D(32, kernel_size=3, strides=2, input_shape=image_shape, padding='same'),
        LeakyReLU(alpha=0.2),
    )

    # Stage 2: second strided convolution; ZeroPadding2D adds one row at the
    # bottom and one column on the right. Dropout helps prevent overfitting.
    extend(
        Dropout(0.25),
        Conv2D(64, kernel_size=3, strides=2, padding='same'),
        ZeroPadding2D(padding=((0, 1), (0, 1))),
        BatchNormalization(momentum=0.8),
        LeakyReLU(alpha=0.2),
    )

    # Stages 3-5: stride 1 (spatial size kept), no zero padding, filter count
    # doubling up to 512 as in the reference architecture.
    for filters in (128, 256, 512):
        extend(
            Dropout(0.25),
            Conv2D(filters, kernel_size=3, strides=1, padding='same'),
            BatchNormalization(momentum=0.8),
            LeakyReLU(alpha=0.2),
        )

    # Classification head: a single sigmoid unit.
    extend(
        Dropout(0.25),
        Flatten(),
        Dense(1, activation='sigmoid'),
    )

    model.summary()

    return model

In [None]:
# Instantiate the generator (build_generator returns a Keras model) and push
# a single random seed through it as a sanity check.
generator = build_generator(SEED_SIZE, IMAGE_CHANNELS)

noise = tf.random.normal(shape=[1, SEED_SIZE])  # one seed vector of length SEED_SIZE
generated_image = generator(noise, training=False)

# Display only the first channel of the one generated sample.
plt.imshow(generated_image[0, :, :, 0])

In [None]:
def save_images(cnt, noise):
    """Save a PREVIEW_ROWS x PREVIEW_COLS grid of generated images as a PNG.

    The module-level `generator` is run on `noise`; each result is pasted as
    one tile on a white canvas, with PREVIEW_MARGIN pixels around every tile.
    The file is written to <DATA_PATH>/output/train-<cnt>.png; the directory
    is created if needed.

    Args:
        cnt: Identifier embedded in the output file name.
        noise: Seed batch for the generator; must hold at least
            PREVIEW_ROWS * PREVIEW_COLS seeds.
    """
    # Distance between the origins of two neighbouring tiles.
    cell = GENERATE_SQUARE + PREVIEW_MARGIN

    # White canvas: a leading margin plus one cell per tile in each direction.
    image_array = np.full(
        (PREVIEW_MARGIN + PREVIEW_ROWS * cell,
         PREVIEW_MARGIN + PREVIEW_COLS * cell,
         IMAGE_CHANNELS),
        255, dtype=np.uint8)

    generated_images = generator.predict(noise)

    # Map the generator's tanh output from [-1, 1] to [0, 1].
    generated_images = 0.5 * generated_images + 0.5

    image_count = 0
    for row in range(PREVIEW_ROWS):
        for col in range(PREVIEW_COLS):
            # Use the same spacing the canvas was sized with (previously a
            # hard-coded 16, which only worked when PREVIEW_MARGIN == 16).
            r = row * cell + PREVIEW_MARGIN
            c = col * cell + PREVIEW_MARGIN
            image_array[r:r + GENERATE_SQUARE, c:c + GENERATE_SQUARE] = \
                generated_images[image_count] * 255
            image_count += 1

    output_path = os.path.join(DATA_PATH, 'output')
    os.makedirs(output_path, exist_ok=True)

    filename = os.path.join(output_path, f"train-{cnt}.png")
    im = Image.fromarray(image_array)
    im.save(filename)

In [None]:
# Instantiate the discriminator for the configured image size and score the
# sample image produced by the untrained generator as a sanity check.
image_shape = (GENERATE_SQUARE, GENERATE_SQUARE, IMAGE_CHANNELS)
discriminator = build_discriminator(image_shape)

decision = discriminator(generated_image)
print(decision)

In [None]:
# Shared binary cross-entropy loss used by both networks.
cross_entropy = tf.keras.losses.BinaryCrossentropy()


def discriminator_loss(real_output, fake_output):
    """Return the discriminator loss for one batch.

    Real images are scored against a target of 1 and generated images against
    a target of 0; the two cross-entropy terms are summed.
    """
    loss_on_real = cross_entropy(tf.ones_like(real_output), real_output)
    loss_on_fake = cross_entropy(tf.zeros_like(fake_output), fake_output)
    return loss_on_real + loss_on_fake


def generator_loss(fake_output):
    """Return the generator loss: cross-entropy of the fake scores against a target of 1."""
    wanted = tf.ones_like(fake_output)
    return cross_entropy(wanted, fake_output)

In [None]:
# One Adam optimizer per network, both with the same learning rate and beta_1.
generator_optimizer = tf.keras.optimizers.Adam(learning_rate=1.5e-4, beta_1=0.5)
discriminator_optimizer = tf.keras.optimizers.Adam(learning_rate=1.5e-4, beta_1=0.5)

In [None]:
@tf.function  # Traces the step into a graph, which improves performance.
def train_step(images):
    """Run one optimization step for both networks on a batch of real images.

    Uses the module-level `generator`, `discriminator` and their optimizers.

    Args:
        images: Batch of real training images.

    Returns:
        (gen_loss, disc_loss) computed during the forward pass, i.e. before
        the weight updates applied by this step.
    """
    # Generate as many fakes as there are real images in this batch; the
    # final batch of an epoch can be smaller than BATCH_SIZE.
    seed = tf.random.normal([tf.shape(images)[0], SEED_SIZE])

    # Each tape records the forward pass so that its network's loss can be
    # differentiated afterwards.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(seed, training=True)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    # Gradients are taken after leaving the tape context so the backward-pass
    # ops of one network are not recorded on the other network's tape.
    gradients_of_generator = gen_tape.gradient(
        gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(
        disc_loss, discriminator.trainable_variables)

    # apply_gradients updates the network weights; the losses returned below
    # are the values from before this update.
    generator_optimizer.apply_gradients(zip(
        gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(
        gradients_of_discriminator,
        discriminator.trainable_variables))

    return gen_loss, disc_loss

In [None]:
def _format_elapsed(seconds):
    """Format a duration given in seconds as 'H:MM:SS.ss'."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    return f"{hours}:{minutes:>02}:{seconds % 60:>05.2f}"


def train(dataset, epochs):
    """Train both networks and save a preview grid after every epoch.

    Args:
        dataset: Iterable of image batches; iterated once per epoch.
        epochs: Number of passes over `dataset`.

    Raises:
        ValueError: If `dataset` yields no batches.
    """
    # The same seeds are reused for every preview so that the images saved
    # after each epoch are directly comparable.
    fixed_seed = np.random.normal(0, 1, (PREVIEW_ROWS * PREVIEW_COLS,
                                         SEED_SIZE))
    start = time.time()

    for epoch in range(epochs):
        epoch_start = time.time()

        gen_loss_list = []
        disc_loss_list = []

        for image_batch in dataset:
            gen_loss, disc_loss = train_step(image_batch)
            gen_loss_list.append(gen_loss)
            disc_loss_list.append(disc_loss)

        if not gen_loss_list:
            # Fail with a clear message instead of a ZeroDivisionError below.
            raise ValueError("dataset produced no batches; nothing to train on")

        # Mean loss over the batches of this epoch.
        g_loss = sum(gen_loss_list) / len(gen_loss_list)
        d_loss = sum(disc_loss_list) / len(disc_loss_list)

        epoch_elapsed = time.time() - epoch_start
        print(f'Epoch {epoch+1}, gen loss={g_loss},disc loss={d_loss},'
              f' {_format_elapsed(epoch_elapsed)}')
        save_images(epoch, fixed_seed)

    elapsed = time.time() - start
    print(f'Training time: {_format_elapsed(elapsed)}')

In [None]:
# Run the full training loop on the prepared dataset.
train(dataset=train_dataset, epochs=EPOCHS)