<a href="https://colab.research.google.com/github/JueunL/IANNWTF-Group25/blob/Workflow/hw_02_group_25.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%load_ext tensorboard
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import layers
import datetime
%matplotlib notebook

In [None]:
# Load the Fashion-MNIST training split (the test split is discarded).
(train_images, train_labels), (_, _) = tf.keras.datasets.fashion_mnist.load_data()
# Rescale the pixel values to [-1, 1] so that real images live in the same
# range as the tanh output of the generator.
train_images = (train_images/255.0)*2 - 1
# Add an explicit channel axis. The target shape is passed positionally because
# the `newshape=` keyword of np.reshape is deprecated in recent NumPy releases.
train_images = np.reshape(train_images, [-1,28,28,1])
train_images = train_images.astype(np.float32)

print (train_images.shape)
print (train_labels.shape)

(60000, 28, 28, 1)
(60000,)


In [None]:
# Define the Generator.
class Generator(tf.keras.layers.Layer):
    """Generator network: latent vectors in, 28x28x1 images out.

    A bias-free dense layer expands the latent vector to 7*7*64 features,
    which are reshaped to a 7x7x64 map. Three transposed convolutions
    (strides 1, 2, 2 with SAME padding) then bring the map to 28x28 with a
    single channel. Batch normalization followed by leaky ReLU is applied
    before each transposed convolution; the last one uses tanh, so the
    generated pixel values lie in (-1, 1).
    """

    @staticmethod
    def _transposed_conv(filters, strides, activation=None):
        """Build one bias-free 5x5 transposed convolution with SAME padding."""
        return tf.keras.layers.Conv2DTranspose(
            filters=filters,
            kernel_size=5,
            strides=strides,
            padding='SAME',
            activation=activation,
            use_bias=False,
        )

    def __init__(self):
        super().__init__()

        # Projection of the latent vector onto the flattened 7x7x64 feature map.
        self.dense = tf.keras.layers.Dense(
            units=7*7*64, activation=None, use_bias=False
        )

        # Each normalization layer is paired with the transposed convolution
        # that consumes its (activated) output.
        self.batchnorm_1 = tf.keras.layers.BatchNormalization()
        self.convT_1 = self._transposed_conv(filters=32, strides=1)
        self.batchnorm_2 = tf.keras.layers.BatchNormalization()
        self.convT_2 = self._transposed_conv(filters=16, strides=2)
        self.batchnorm_3 = tf.keras.layers.BatchNormalization()
        self.convT_3 = self._transposed_conv(
            filters=1, strides=2, activation=tf.nn.tanh
        )

    def call(self,x,is_training):
        """Generate images from latent vectors.

        Args:
            x: Batch of latent vectors.
            is_training: Forwarded to every batch-normalization layer as its
                `training` flag.

        Returns:
            The output of the final tanh-activated transposed convolution.
        """
        hidden = self.dense(x)
        hidden = self.batchnorm_1(hidden, training=is_training)
        hidden = tf.nn.leaky_relu(hidden)
        hidden = tf.reshape(hidden, shape=(-1, 7, 7, 64))

        # The first two upsampling stages share the same pattern:
        # transposed convolution -> batch norm -> leaky ReLU.
        stages = (
            (self.convT_1, self.batchnorm_2),
            (self.convT_2, self.batchnorm_3),
        )
        for upsample, normalize in stages:
            hidden = upsample(hidden)
            hidden = normalize(hidden, training=is_training)
            hidden = tf.nn.leaky_relu(hidden)

        # The last stage carries its own tanh activation.
        return self.convT_3(hidden)

In [None]:
# Define the Discriminator.
class Discriminator(tf.keras.layers.Layer):
    """Binary real/fake classifier for image batches.

    Two stride-2 5x5 convolutions with leaky ReLU downsample the input, the
    result is flattened, and a single sigmoid unit produces one probability
    per image.
    """

    def __init__(self):
        super().__init__()

        # Settings shared by both downsampling convolutions.
        conv_options = dict(
            kernel_size=5,
            strides=2,
            padding="SAME",
            activation=tf.nn.leaky_relu,
        )
        self.conv_1 = tf.keras.layers.Conv2D(filters=8, **conv_options)
        self.conv_2 = tf.keras.layers.Conv2D(filters=16, **conv_options)

        # Classification head: flatten, then one sigmoid unit.
        self.flatten = tf.keras.layers.Flatten()
        self.output_layer = tf.keras.layers.Dense(
            units=1, activation=tf.nn.sigmoid
        )

    def call(self, x, is_training):
        """Return one probability per input image.

        Args:
            x: Batch of images.
            is_training: Accepted so the call signature mirrors the
                generator's; no layer in this network reads it.

        Returns:
            Tensor of sigmoid outputs, one per image.
        """
        features = self.conv_2(self.conv_1(x))
        return self.output_layer(self.flatten(features))

In [None]:
# Input pipeline: slice the image array into single examples, shuffle them
# through a 1000-element buffer and group them into batches of 32.
dataset = (
    tf.data.Dataset.from_tensor_slices(train_images)
    .shuffle(buffer_size=1000)
    .batch(batch_size=32)
)

In [None]:
# Define the loss for the generator.
def generator_loss(probabilities, num_fake=32):
    """Return the binary cross-entropy loss of the generator.

    Args:
        probabilities: Discriminator outputs for a combined batch in which
            the generated images come first, followed by the real images.
        num_fake: Number of leading entries that belong to generated images.
            Defaults to 32, the number of fakes the training loop produces
            per step.

    Returns:
        The loss obtained when every generated image is labelled as real.
    """
    # Only the predictions for the generated images matter to the generator.
    probabilities_fake = probabilities[:num_fake]
    # The generator wants its images to be classified as real (= 1). Building
    # the targets with ones_like keeps them in sync with the slice's shape and
    # dtype instead of going through a hard-coded float64 NumPy array.
    labels_one = tf.ones_like(probabilities_fake)
    binary = tf.keras.losses.BinaryCrossentropy()
    return binary(labels_one, probabilities_fake)

# Define the loss for the discriminator.
def discriminator_loss(probabilities, num_fake=32):
    """Return the binary cross-entropy loss of the discriminator.

    Args:
        probabilities: Discriminator outputs for a combined batch in which
            the generated images come first, followed by the real images.
        num_fake: Number of leading entries that belong to generated images.
            Defaults to 32, the number of fakes the training loop produces
            per step.

    Returns:
        The loss for targets 0 (fake) on the first `num_fake` entries and
        1 (real) on all remaining entries.
    """
    # Derive the targets from the actual batch so that the label vector always
    # has the same length as `probabilities`, whatever the number of real
    # images in the batch.
    labels_zero = tf.zeros_like(probabilities[:num_fake])
    labels_one = tf.ones_like(probabilities[num_fake:])
    labels = tf.concat([labels_zero, labels_one], axis=0)
    binary = tf.keras.losses.BinaryCrossentropy()
    return binary(labels, probabilities)

In [None]:
!rm -rf ./logs/ 
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

# For me the in-notebook tensorboard sometimes doesn't work. In this case maybe just use your terminal
# if you work on your own machine. (comment out the following line)
%tensorboard --logdir logs/

tf.keras.backend.clear_session()

# Initialzing generator and discriminator.
generator = Generator()
discriminator = Discriminator()
# Define size of latent variable vector.
z_dim = 100
# Define random noise to use for generating 8 images for supervision.
seed = tf.random.normal(shape=[8,z_dim])

# Initialize two optimizers (one for generator, one for discriminator).
# During training you will have to do every step twice (computing loss, computing gradients,
# applying gradients, storing loss in summary). Namely once for generator and once for discriminator.
gen_optimizer = tf.keras.optimizers.Adam(1e-4)
dis_optimizer = tf.keras.optimizers.Adam(1e-4)

step=0

for epochs in range(25):
    for real in dataset:
        
        with tf.GradientTape() as gen_tape, tf.GradientTape() as dis_tape:
            
            # Generate random noise vector. tf.random.normal()
            noise = tf.random.normal(shape=[32, z_dim])
            # Generate fake images with generator.
            fake = generator(noise, is_training=True)
            # Merge fake and real images to a long vector. tf.concat()
            images = tf.concat([fake, real], axis=0)
            # Compute output from discriminator.
            probs = discriminator(images, is_training=True)
            
            # Compute loss, compute gradients, apply gradients, store summaries.
            gen_loss = generator_loss(probs)
            dis_loss = discriminator_loss(probs)
            gen_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
            dis_gradients = dis_tape.gradient(dis_loss, discriminator.trainable_variables)

            gen_optimizer.apply_gradients(zip(gen_gradients, generator.trainable_variables))
            dis_optimizer.apply_gradients(zip(dis_gradients, discriminator.trainable_variables))

            with train_summary_writer.as_default():
                tf.summary.scalar('generator_loss', gen_loss, step=step)
                tf.summary.scalar('discriminator_loss', dis_loss, step=step)
            
        # Every 100 steps generate images from the defined seed and 
        # store to supervise how well the generator works.
        if step % 100 == 0:
            fake = generator(seed, is_training=False)
            with test_summary_writer.as_default():
                # Before showing the fake images we also have to bring them back in 
                # range 0:1 (for training we used -1:1)
                tf.summary.image('fake_images', (fake + 1) / 2, step=step, max_outputs=8)
            
        step += 1

### cGAN

In [None]:
# Load the Fashion-MNIST training split (images and class labels); the test
# split is discarded.
(img, label), _ = tf.keras.datasets.fashion_mnist.load_data()
# Rescale the pixel values to [-1, 1], the range of the generator's tanh output.
img = (img / 255) * 2 - 1
# Add an explicit channel axis. The target shape is passed positionally because
# the `newshape=` keyword of np.reshape is deprecated in recent NumPy releases.
img = np.reshape(img, [-1,28,28,1])
img = img.astype(np.float32)

BATCH_SIZE = 128

# Note: `img` and `label` are rebound from arrays to datasets here.
img = tf.data.Dataset.from_tensor_slices(img)
label = tf.data.Dataset.from_tensor_slices(label)
# Pair every image with its label, then shuffle and batch the pairs.
data = tf.data.Dataset.zip((img,label))
data = data.shuffle(buffer_size=1000).batch(batch_size=BATCH_SIZE)

In [None]:
class cGAN_Discriminator(tf.keras.Model):
  """Conditional discriminator: scores (image, class label) pairs.

  The class label is embedded, expanded to one value per pixel and attached
  to the image as an additional channel. Two stride-2 convolutions, global
  average pooling, dropout and a single sigmoid unit then produce one
  probability per pair.

  Args:
    input_shape: (height, width, channels) of the images. Only height and
      width are used, to size the label channel.
    n_classes: Number of distinct class labels.
  """

  def __init__(self, input_shape=(28,28,1), n_classes=10):
    super(cGAN_Discriminator, self).__init__()

    self.label_layers = []
    self.image_layers = []

    # Turn the label into an embedding of size 50
    # (a separate embedding is learned for every label).
    self.label_layers.append(layers.Embedding(n_classes, 50))

    # Scale the embedding up to one value per image pixel.
    self.label_layers.append(layers.Dense(input_shape[0]*input_shape[1]))

    # Reshape the flat label map into a single extra channel. The channel
    # count is fixed to 1 because the dense layer above yields exactly
    # height*width values; reshaping to `input_shape` itself would fail for
    # images with more than one channel.
    self.label_layers.append(layers.Reshape((input_shape[0], input_shape[1], 1)))

    # Merge image and label (the label becomes another channel).
    self.merge = layers.Concatenate()

    # Classifier: two strided convolutions ...
    for _ in range(2):
      self.image_layers.append(layers.Conv2D(128, (3,3), (2,2), "same"))
      self.image_layers.append(layers.LeakyReLU(alpha=0.2))

    # ... global average pooling to collapse the spatial dimensions ...
    self.image_layers.append(layers.GlobalAveragePooling2D())

    # ... dropout as regularization ...
    self.image_layers.append(layers.Dropout(0.4))

    # ... and a single sigmoid unit for the real/fake prediction.
    self.image_layers.append(layers.Dense(1, "sigmoid"))

  @tf.function
  def __call__(self, x_img, x_lab, training=False):
    """Return the probability that each (image, label) pair is real.

    Args:
      x_img: Batch of images; assumed to be laid out as
        (batch, height, width, channels) matching `input_shape`.
      x_lab: Batch of integer class labels, one per image.
      training: Whether dropout should be active. Because this method
        replaces the Keras `__call__`, no training flag reaches the layers
        implicitly; pass `training=True` during optimization to enable
        dropout. The default keeps dropout disabled.

    Returns:
      Tensor with one sigmoid output per pair.
    """
    for layer in self.label_layers:
      x_lab = layer(x_lab)

    x = self.merge([x_img, x_lab])

    for layer in self.image_layers:
      if isinstance(layer, layers.Dropout):
        # Dropout is the only layer here whose behavior depends on the mode.
        x = layer(x, training=training)
      else:
        x = layer(x)

    return x

In [None]:
class cGAN_Generator(tf.keras.Model):
  """Conditional generator: (latent vector, class label) -> image.

  The label is embedded and reshaped to a 7x7x1 map, the latent vector is
  projected to a 7x7x128 map, and both are concatenated along the channel
  axis. Two stride-2 transposed convolutions upsample the result (7 -> 14 ->
  28) before a tanh-activated convolution reduces it to a single channel.

  Args:
    latent_dim: Size of the latent vector. Accepted for documentation
      purposes; the constructor does not read it.
    n_classes: Number of distinct class labels.
  """

  def __init__(self, latent_dim, n_classes=10):
    super().__init__()

    # Label branch: embedding of size 50 -> 49 values -> 7x7 map, one channel.
    label_branch = [
        layers.Embedding(n_classes, 50),
        layers.Dense(7*7),
        layers.Reshape((7,7,1)),
    ]

    # Latent branch: dense projection -> leaky ReLU -> 7x7 map, 128 channels.
    latent_branch = [
        layers.Dense(128*7*7),
        layers.LeakyReLU(alpha=0.2),
        layers.Reshape((7,7,128)),
    ]

    channel_concat = layers.Concatenate()

    # Upsampling head: two (transposed convolution, leaky ReLU) pairs followed
    # by the tanh output convolution.
    upsampling_head = []
    for _ in range(2):
      upsampling_head.append(layers.Conv2DTranspose(128, (4,4), (2,2), "same"))
      upsampling_head.append(layers.LeakyReLU(alpha=0.2))
    upsampling_head.append(
        layers.Conv2D(1, (7,7), padding="same", activation="tanh"))

    self.label_layers = label_branch
    self.embedding_layers = latent_branch
    self.image_layers = upsampling_head
    self.merge = channel_concat

  def __call__(self, x):
    """Generate one image per (latent vector, label) pair.

    Args:
      x: Two-element sequence; `x[0]` is the batch of latent vectors and
        `x[1]` the batch of integer class labels.

    Returns:
      The output of the final tanh-activated convolution.
    """
    latent, class_ids = x[0], x[1]

    # Turn the labels into a 7x7x1 conditioning map.
    for layer in self.label_layers:
      class_ids = layer(class_ids)

    # Turn the latent vectors into a 7x7x128 feature map.
    for layer in self.embedding_layers:
      latent = layer(latent)

    features = self.merge([latent, class_ids])

    for layer in self.image_layers:
      features = layer(features)

    return features

In [None]:
# Define the loss for the generator.
def generator_loss(probabilities):
    """Return the binary cross-entropy loss of the conditional generator.

    Args:
        probabilities: Discriminator outputs for a combined batch whose first
            `BATCH_SIZE` entries belong to generated images.

    Returns:
        The loss obtained when every generated image is labelled as real.
    """
    # Only the predictions for the generated images matter to the generator.
    probabilities_fake = probabilities[:BATCH_SIZE]
    # The generator wants its images to be classified as real (= 1).
    # ones_like mirrors the slice's shape and dtype directly, avoiding both
    # len() on a tensor and a detour through a float64 NumPy array.
    labels_one = tf.ones_like(probabilities_fake)
    binary = tf.keras.losses.BinaryCrossentropy()
    return binary(labels_one, probabilities_fake)

# Define the loss for the discriminator.
def discriminator_loss(probabilities):
    """Return the binary cross-entropy loss of the conditional discriminator.

    Args:
        probabilities: Discriminator outputs for a combined batch whose first
            `BATCH_SIZE` entries belong to generated images; all remaining
            entries belong to real images.

    Returns:
        The loss for targets 0 (fake) on the first `BATCH_SIZE` entries and
        1 (real) on the rest.
    """
    # The number of fakes is always BATCH_SIZE, while the number of real images
    # shrinks on a final partial batch. The zeros therefore have to cover the
    # first BATCH_SIZE entries and the ones whatever remains; sizing them the
    # other way round labels some fakes as real whenever the batch is short.
    labels_zero = tf.zeros_like(probabilities[:BATCH_SIZE])
    labels_one = tf.ones_like(probabilities[BATCH_SIZE:])
    labels = tf.concat([labels_zero, labels_one], axis=0)
    binary = tf.keras.losses.BinaryCrossentropy()
    return binary(labels, probabilities)

In [None]:
!rm -rf ./logs/ 
current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
train_log_dir = 'logs/gradient_tape/' + current_time + '/train'
test_log_dir = 'logs/gradient_tape/' + current_time + '/test'
train_summary_writer = tf.summary.create_file_writer(train_log_dir)
test_summary_writer = tf.summary.create_file_writer(test_log_dir)

# For me the in-notebook tensorboard sometimes doesn't work. In this case maybe just use your terminal
# if you work on your own machine. (comment out the following line)
%tensorboard --logdir logs/

tf.keras.backend.clear_session()


# Define size of latent variable vector.
z_dim = 100

# Initialzing generator and discriminator.
generator = cGAN_Generator(z_dim)
discriminator = cGAN_Discriminator()

# Define random noise to use for generating 10 images for supervision. (one for each class)
seed = tf.random.normal(shape=[10,z_dim])
sup_labels = tf.convert_to_tensor(list(range(0,10)))

# Initialize two optimizers (one for generator, one for discriminator).
# During training you will have to do every step twice (computing loss, computing gradients,
# applying gradients, storing loss in summary). Namely once for generator and once for discriminator.
gen_optimizer = tf.keras.optimizers.Adam(1e-4)
dis_optimizer = tf.keras.optimizers.Adam(1e-4)

step=0

for epochs in range(25):
    for real in data:
        
        with tf.GradientTape() as gen_tape, tf.GradientTape() as dis_tape:
            
            # Generate random noise vector. tf.random.normal()
            noise = tf.random.normal(shape=[BATCH_SIZE, z_dim])
            labels = tf.random.uniform([BATCH_SIZE], 0, 10, tf.int32)

            # Generate fake images with generator.
            fake = generator([noise, labels])
            # Merge fake and real images to a long vector. tf.concat()
            images = tf.concat([fake, real[0]], axis=0)
            # Compute output from discriminator.
            labels_real = tf.cast(real[1], tf.int32)

            probs = discriminator(images, tf.concat([labels, labels_real],0))
            
            # Compute loss, compute gradients, apply gradients, store summaries.
            gen_loss = generator_loss(probs)
            dis_loss = discriminator_loss(probs)
            gen_gradients = gen_tape.gradient(gen_loss, generator.trainable_variables)
            dis_gradients = dis_tape.gradient(dis_loss, discriminator.trainable_variables)

            gen_optimizer.apply_gradients(zip(gen_gradients, generator.trainable_variables))
            dis_optimizer.apply_gradients(zip(dis_gradients, discriminator.trainable_variables))

            with train_summary_writer.as_default():
                tf.summary.scalar('generator_loss', gen_loss, step=step)
                tf.summary.scalar('discriminator_loss', dis_loss, step=step)
            
        # Every 500 steps generate images from the defined seed and 
        # store to supervise how well the generator works.
        if step % 500 == 0:
            # To get the best fakes possible each 500 steps we generate
            # a variable number of fakes and only show the image that fooled the
            # discriminator the most (one fake per class)
            best_fakes = []
            NUM_FAKES = 10
            for i in range(10):
                seed = tf.random.normal(shape=[NUM_FAKES,z_dim])
                fake = generator([seed, np.asarray([i]*NUM_FAKES)])
                prob = discriminator(fake, np.asarray([i]*NUM_FAKES))
                best_fakes.append(fake[np.argmax(prob)])

            with test_summary_writer.as_default():
                # Before showing the fake images we also have to bring them back in 
                # range 0:1 (for training we used -1:1)
                tf.summary.image("fake images", (np.asarray(best_fakes) + 1) / 2, step=step, max_outputs=10)
            
        step += 1

In [None]:
# Keep this cell running indefinitely; the intent is to avoid a Colab timeout.
# Sleeping between wake-ups blocks just like an empty busy loop would, but
# without keeping a CPU core fully loaded. Interrupt the cell to stop it.
import time

while True:
    time.sleep(60)