In [79]:
# Tensorflow
import tensorflow as tf
# Image data and layers packages
from tensorflow.keras.preprocessing.image import ImageDataGenerator, array_to_img
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten, Reshape, LeakyReLU, Dropout, UpSampling2D, MaxPool2D, Rescaling
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomZoom
# Losses and Optimizers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import BinaryCrossentropy

# Base model class to subclass for training step
from tensorflow.keras.models import Model

# Load images from directory
from tensorflow.keras.utils import image_dataset_from_directory
# For callbacks and checkpoints
from tensorflow.keras.callbacks import Callback
from tensorflow.train import Checkpoint
# Extra
import numpy as np
import os

In [80]:
# Turn on memory growth for every GPU that TensorFlow reports.
gpus = tf.config.experimental.list_physical_devices("GPU")
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

In [81]:
# Switch the working directory so the relative paths used in this notebook resolve.
# NOTE(review): hard-coded absolute Windows path — this cell only works on a machine with this layout.
os.chdir("C:\\GANs\\zelda_gan\\code")
# Directory the training images are read from.
resize_path = "..\\resized_images"

In [82]:
# Batch size and target image size (in pixels) used when loading the training set.
batch_size = 32
img_height = 40
img_width = 30

In [83]:
def load_images(in_dir, img_height, img_width, batch_size, save_dir):
    """Build an augmenting, unlabeled image iterator over a directory.

    Args:
        in_dir: Directory handed to `flow_from_directory`.
        img_height: Target image height in pixels.
        img_width: Target image width in pixels.
        batch_size: Number of images per yielded batch.
        save_dir: Forwarded as `save_to_dir`, the directory the generator
            writes the augmented images to.

    Returns:
        The iterator returned by `ImageDataGenerator.flow_from_directory`,
        configured with `class_mode=None` (image batches only, no labels).
    """
    def norm(x):
        # Scale pixel values by 1/255 as float32.
        return x.astype("float32") / 255

    # Fix: use the local `norm`. The original passed `img_norm`, a global that
    # only exists once a later notebook cell has run, and left `norm` unused.
    # NOTE(review): Keras documents `rotation_range` in degrees, so 0.05 is
    # practically no rotation — confirm this is the intended amount.
    dg = ImageDataGenerator(preprocessing_function = norm,
                            zoom_range = 0.25,
                            horizontal_flip = True,
                            rotation_range = 0.05)
    # NOTE(review): `subset` is given but no `validation_split` is configured
    # on the generator — confirm the split is intended.
    x_train = dg.flow_from_directory(in_dir,
                                     target_size = (img_height, img_width),
                                     batch_size = batch_size,
                                     shuffle = True,
                                     save_to_dir = save_dir,
                                     classes = None,
                                     class_mode = None,
                                     subset = "training")
    return x_train

In [84]:

# Layer that scales pixel values by 1/255.
img_norm = tf.keras.layers.Rescaling(1./255)
# Unlabeled grayscale batches, resized to (img_height, img_width).
train_ds = image_dataset_from_directory(resize_path,
                                        labels = None,
                                        color_mode = 'grayscale',
                                        image_size = (img_height, img_width),
                                        batch_size = batch_size,
                                        shuffle = True)
train_ds = train_ds.map(img_norm)
AUTOTUNE = tf.data.AUTOTUNE

# cache() replays the element order produced by the first pass, so the
# loader's own shuffle would be frozen after epoch one. Shuffle again AFTER
# the cache so the (already batched) elements come in a new order each epoch.
train_ds = train_ds.cache().shuffle(buffer_size = 1000).prefetch(buffer_size = AUTOTUNE)

Found 2340 files belonging to 1 classes.


In [85]:
# Generative NN
def build_gen_nn(latent_dim = 64):
    """Build the generator network.

    Maps a noise vector of length `latent_dim` to a single-channel image
    with sigmoid-activated pixels. With the Keras default 2x up-sampling and
    2x max-pooling the spatial size goes 20x15 -> 40x30 -> 80x60 -> 40x30.

    Args:
        latent_dim: Length of the input noise vector. Defaults to 64, the
            value that used to be hard-coded.

    Returns:
        An uncompiled `Sequential` model.
    """
    # Initialize Model
    model = Sequential()

    # Project the noise vector and reshape it to a 20x15x128 feature map
    model.add(Dense(20*15*128, input_dim = latent_dim))
    model.add(LeakyReLU(.2))
    model.add(Reshape((20, 15, 128)))

    # First upsampling block (doubles height and width)
    model.add(UpSampling2D())
    model.add(Conv2D(128, 5, padding = "same"))
    model.add(LeakyReLU(.2))

    # Second upsampling block
    model.add(UpSampling2D())
    model.add(Conv2D(256, 5, padding = "same"))
    model.add(LeakyReLU(.2))

    # Downsampling block (halves height and width)
    model.add(MaxPool2D())
    model.add(Conv2D(256, 5, padding = "same"))
    model.add(LeakyReLU(.2))

    # Size-preserving convolutional blocks: 256, 512, 512 filters
    model.add(Conv2D(256, 4, padding = "same"))
    model.add(LeakyReLU(.2))

    model.add(Conv2D(512, 4, padding = "same"))
    model.add(LeakyReLU(.2))

    model.add(Conv2D(512, 4, padding = "same"))
    model.add(LeakyReLU(.2))

    # Conv layer to get to one channel; sigmoid keeps pixels in [0, 1]
    model.add(Conv2D(1, 4, padding = 'same', activation = "sigmoid"))

    return model

In [86]:
def build_disc_nn():
    """Build the discriminator network.

    Takes a 40x30 single-channel image and returns one sigmoid-activated
    score per image.

    Returns:
        An uncompiled `Sequential` model.
    """
    model = Sequential()

    # Six identical stages (5x5 conv -> LeakyReLU -> Dropout); only the first
    # conv declares the input shape.
    for stage, n_filters in enumerate((32, 64, 128, 256, 512, 512)):
        if stage == 0:
            model.add(Conv2D(n_filters, 5, input_shape = (40,30,1)))
        else:
            model.add(Conv2D(n_filters, 5))
        model.add(LeakyReLU(.2))
        model.add(Dropout(.2))

    # Classification head: flatten, heavier dropout, single sigmoid unit
    model.add(Flatten())
    model.add(Dropout(.4))
    model.add(Dense(1, activation = "sigmoid"))

    return model

In [87]:
class ZelGAN(Model):
    """GAN that trains a generator and a discriminator in a custom `train_step`.

    Label convention used by `train_step`: the discriminator is trained
    towards 0 for real images and 1 for generated images, and the generator is
    trained so that its images are scored 0.
    """

    def __init__(self, generator, discriminator, *args, latent_dim = 64, **kwargs):
        """Store the two networks and build the augmentation pipeline.

        Args:
            generator: Model called on a (batch, latent_dim) noise tensor.
            discriminator: Model called on image batches; its output is fed to
                `disc_loss` / `gen_loss` as the prediction.
            *args: Forwarded to `Model.__init__`.
            latent_dim: Keyword-only length of the noise vector fed to the
                generator. Defaults to 64, the value that used to be
                hard-coded in `train_step`.
            **kwargs: Forwarded to `Model.__init__`.
        """
        # pass args and kwargs to base class
        super().__init__(*args, **kwargs)

        # Create attributes for two models
        self.generator = generator
        self.discriminator = discriminator
        self.latent_dim = latent_dim

        # Random augmentation applied to the real images on every step
        self.transformation = Sequential([
            RandomFlip("horizontal_and_vertical"),
            RandomRotation(0.2),
            RandomZoom(height_factor=(-0.1, 0.1), width_factor = (-0.1, 0.1)),
        ])

    def compile(self, gen_opt, disc_opt, gen_loss, disc_loss, *args, **kwargs):
        """Attach one optimizer and one loss per network.

        Args:
            gen_opt: Optimizer applied to the generator's variables.
            disc_opt: Optimizer applied to the discriminator's variables.
            gen_loss: Callable `(y_true, y_pred)` used for the generator.
            disc_loss: Callable `(y_true, y_pred)` used for the discriminator.
            *args, **kwargs: Forwarded to `Model.compile`.
        """
        # Compile with base class
        super().compile(*args, **kwargs)

        # Create attributes for losses and optimizers
        self.gen_opt = gen_opt
        self.disc_opt = disc_opt
        self.gen_loss = gen_loss
        self.disc_loss = disc_loss

    def train_step(self, batch):
        """Run one discriminator update followed by one generator update.

        Args:
            batch: Batch of real images.

        Returns:
            Dict with the scalar losses under "disc_loss" and "gen_loss".
        """
        # Fix: request training mode explicitly. Keras' random preprocessing
        # layers only augment in training mode, and a layer called directly
        # from a custom train_step is not told it is training unless asked.
        real_images = self.transformation(batch, training = True)

        # Fix: generate as many fakes as there are real images instead of a
        # hard-coded 32, so a short final batch stays balanced. The noise is
        # shaped (batch, latent_dim) rather than (batch, latent_dim, 1).
        n_real = tf.shape(real_images)[0]
        disc_noise = tf.random.normal(tf.stack([n_real, self.latent_dim]), 0, 2)
        gen_images = self.generator(disc_noise, training = False)

        # Train the discriminator
        with tf.GradientTape() as disc_tape:
            # Pass the real and generated images to the discriminator
            yhat_real = self.discriminator(real_images, training = True)
            yhat_gen = self.discriminator(gen_images, training = True)
            yhat_all = tf.concat([yhat_real, yhat_gen], axis = 0)

            # Create labels: 0 for real images, 1 for generated images
            y_all = tf.concat([tf.zeros_like(yhat_real), tf.ones_like(yhat_gen)], axis = 0)

            # Add some noise to the TRUE outputs
            noise_real = .1 * tf.random.normal(tf.shape(yhat_real), 0, 2)
            noise_gen = -.1 * tf.random.normal(tf.shape(yhat_gen), 0, 2)
            y_all += tf.concat([noise_real, noise_gen], axis = 0)
            # Fix: Gaussian noise pushes targets below 0 and above 1, which
            # are not valid binary cross-entropy targets; clamp them back.
            y_all = tf.clip_by_value(y_all, 0.0, 1.0)

            # Calculate loss
            total_disc_loss = self.disc_loss(y_all, yhat_all)

        # Apply backprop
        disc_grad = disc_tape.gradient(total_disc_loss, self.discriminator.trainable_variables)
        self.disc_opt.apply_gradients(zip(disc_grad, self.discriminator.trainable_variables))

        # Train the generator on a fresh batch of 64 samples (size unchanged)
        gen_batch = 64
        with tf.GradientTape() as gen_tape:
            # Generate some new images
            gen_images = self.generator(tf.random.normal((gen_batch, self.latent_dim), 0, 2), training = True)

            # Create the predicted labels
            predicted_labels = self.discriminator(gen_images, training = False)

            # The generator wants its images scored as real, i.e. 0
            total_gen_loss = self.gen_loss(tf.zeros_like(predicted_labels), predicted_labels)

        # Apply Backprop
        gen_grad = gen_tape.gradient(total_gen_loss, self.generator.trainable_variables)
        self.gen_opt.apply_gradients(zip(gen_grad, self.generator.trainable_variables))

        return {"disc_loss" : total_disc_loss, "gen_loss" : total_gen_loss}

In [88]:
# Build the two networks
generator = build_gen_nn()
discriminator = build_disc_nn()

# Binary cross-entropy for both networks
gen_loss = BinaryCrossentropy()
disc_loss = BinaryCrossentropy()

# Adam for both networks; the discriminator's learning rate is 10x smaller
gen_opt = Adam(learning_rate = 5e-6, beta_1 = 0.8, clipvalue = 1.0)
disc_opt = Adam(learning_rate = 5e-7, beta_1 = 0.8, clipvalue = 1.0)


In [89]:
# Wrap the two networks in the GAN model, then attach their optimizers and losses.
Zelda = ZelGAN(generator, discriminator)
Zelda.compile(gen_opt, disc_opt, gen_loss, disc_loss)

In [90]:
class ModelMonitor(Callback):
    """Callback that saves generator samples as PNG files after every epoch."""

    def __init__(self, num_img = 1, latent_dim = 64):
        """Configure the sample count and noise size.

        Args:
            num_img: Number of images generated and saved per epoch.
            latent_dim: Length of the noise vector fed to the generator.
        """
        # Fix: initialise the Callback base class (was skipped).
        super().__init__()
        self.num_img = num_img
        self.latent_dim = latent_dim

    def on_epoch_end(self, epoch, logs = None):
        """Generate `num_img` images and write them to ..\\progress_images.

        Args:
            epoch: Epoch index, used in the output file names.
            logs: Unused.
        """
        out_dir = "..\\progress_images"
        # Create the output directory up front so the first save cannot fail
        # just because it is missing.
        os.makedirs(out_dir, exist_ok = True)

        # Noise is shaped (num_img, latent_dim) rather than (num_img, latent_dim, 1).
        random_latent_vectors = tf.random.normal((self.num_img, self.latent_dim), 0, 2)
        generated_images = self.model.generator(random_latent_vectors, training = False)
        # Fix: keep the NumPy conversion; the original called `.numpy()` and
        # discarded the result.
        generated_images = (generated_images * 255).numpy()
        for i in range(self.num_img):
            img = array_to_img(generated_images[i])
            img.save(os.path.join(out_dir, f"generated_img_{epoch}_{i}.png"))
        


In [91]:
#Zelda.generator.load_weights('..//saves//generator_norm.h5')
#Zelda.discriminator.load_weights('..//saves//discriminator_norm.h5')

In [None]:
# Train for 1024 epochs; the ModelMonitor callback saves a generated sample at the end of each epoch.
hist = Zelda.fit(train_ds, epochs = 1024, callbacks = [ModelMonitor()])

Epoch 1/1024
10/74 [===>..........................] - ETA: 1:11 - disc_loss: 0.6928 - gen_loss: 0.6942

In [None]:
# Save the generator and the discriminator to separate .h5 files.
Zelda.generator.save('..//saves//generator_norm.h5')
Zelda.discriminator.save('..//saves//discriminator_norm.h5')

In [None]:
#random_latent_vectors = tf.random.normal((50, 64, 1))
#generated_images = Zelda.generator(random_latent_vectors)
#generated_images *= 255
#generated_images.numpy()
#for i in range(50):
#    img = array_to_img(generated_images[i])
#    img.save(os.path.join("..\\progress_images", f"generated_img_{i}.png"))