In [1]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import matplotlib.pyplot as plt
import os
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)
BATCH_SIZE = 8
EPOCHS = 50
noise_dim = 100
num_examples_to_generate = 16
img_height=224
img_width=224

In [4]:
dataset = tf.keras.preprocessing.image_dataset_from_directory(
        '/content/drive/My Drive/imgs_part_1', #Pad-UFES-20
        label_mode=None,  # Since it's an unconditional generator, we don't need labels
        image_size=(img_height, img_width),  # Resize images to the target size
        batch_size=BATCH_SIZE  # We'll handle batching later
    )

# Normalize images to [-1, 1]
dataset = dataset.map(lambda x: (x - 127.5) / 127.5)

Found 2298 files belonging to 1 classes.


In [3]:
def build_generator():
    model = tf.keras.Sequential()
    model.add(layers.Dense(14*14*512, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Reshape((14, 14, 512)))
    model.add(layers.Conv2DTranspose(256, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2DTranspose(32, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(1, 1), padding='same', use_bias=False, activation='tanh'))
    return model

generator = build_generator()

In [4]:
def build_discriminator():
    model = tf.keras.Sequential()
    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=[224, 224, 3]))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    model.add(layers.Conv2D(256, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    model.add(layers.Conv2D(512, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))
    model.add(layers.Flatten())
    model.add(layers.Dense(1))
    return model

discriminator = build_discriminator()

In [5]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    loss = cross_entropy(tf.ones_like(fake_output), fake_output)
    return loss

generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

In [None]:
def sample_images(generator, image_grid_rows=3, image_grid_columns=3):

    satisfied = False
    images = []
    while not satisfied:
      print(len(images))
      noise = np.random.normal(0, 1, (image_grid_rows * image_grid_columns, 100))
      generated_images = generator(noise, training=False)
      #generated_images = 0.5 * generated_images + 0.5

      input_layer = tf.keras.Input(shape=(224, 224, 3))
      output_layer = discriminator(input_layer)
      output_layer = tf.keras.activations.sigmoid(output_layer)

      # Create a new model
      sigDiscriminator = tf.keras.Model(inputs=input_layer, outputs=output_layer)
      test = sigDiscriminator(generated_images, training=False).numpy()

      for n in range(image_grid_rows * image_grid_columns):
        if test[n][0] < 0.5:
          images.append(generated_images[n])
      if len(images) >= image_grid_rows * image_grid_columns:
        satisfied = True


    fig, axs = plt.subplots(image_grid_rows, image_grid_columns, figsize=(4, 4), sharey=True, sharex=True)
    cnt = 0
    #plt.imshow(images[0])
    for i in range(image_grid_rows):
        for j in range(image_grid_columns):
            axs[i, j].imshow(images[cnt])
            axs[i, j].axis('off')
            cnt += 1
    plt.show()

sample_images(generator)


In [8]:
#@tf.function
def train_step(images, epoch):
    noise = tf.random.normal([BATCH_SIZE, noise_dim])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)
        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)


    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss

In [6]:
gen_dir = "/content/drive/My Drive/checkpointsGen/"
checkpoint_prefix = os.path.join(gen_dir, "ckpt")


genCP = tf.train.Checkpoint(optimizer=generator_optimizer, model=generator)
genCheckpointer = tf.train.CheckpointManager(genCP, gen_dir, max_to_keep=3)

In [7]:
disc_dir = "/content/drive/My Drive/checkpointDisc/"
checkpoint_prefix = os.path.join(gen_dir, "ckpt")

discCP = tf.train.Checkpoint(optimizer=discriminator_optimizer, model=discriminator)
discCheckpointer = tf.train.CheckpointManager(discCP, disc_dir, max_to_keep=3)

In [None]:
latest_checkpoint = genCheckpointer.latest_checkpoint
if latest_checkpoint:
    #genCP.restore(latest_checkpoint)
    genCP.restore("/content/drive/My Drive/checkpointsGen/ckpt-12") #.assert_consumed()
    print(f"Generator loaded from {latest_checkpoint}")
else:
    print("Generator training from scratch.")

latest_checkpoint = discCheckpointer.latest_checkpoint
if latest_checkpoint:
    #discCP.restore(latest_checkpoint)
    discCP.restore("/content/drive/My Drive/checkpointDisc/ckpt-12") #.assert_consumed()
    print(f"Discriminator loaded from {latest_checkpoint}")
else:
    print("Discriminator training from scratch.")

In [12]:
def train(dataset, epochs):
  for epoch in range(epochs):
    for image_batch in dataset:
        gen_loss, disc_loss = train_step(image_batch, epoch)
        print(f"\n\nEpoch: {epoch + 1} | Generator Loss: {gen_loss} | Discriminator Loss: {disc_loss}")

    print(f"\nEnd of epoch {epoch + 1}\n")
    if (epoch + 1) % 10 == 0:
      genCheckpointer.save()
      discCheckpointer.save()


In [None]:
train(dataset, EPOCHS)
