In [None]:
import numpy as np
import tensorflow as tf
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
from tensorflow.keras.layers import Dense,BatchNormalization,LeakyReLU,Reshape,Conv2DTranspose,Conv2D,Dropout,Dense,Flatten
import time
 
from IPython import display
 
# Print every device TensorFlow can see in this runtime
# (presumably to confirm that a GPU is attached before training).
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

In [None]:
# Mount Google Drive in the Colab runtime. The dataset and the checkpoints are
# read from / written to relative "drive/My Drive/..." paths further down
# (assumes the working directory is /content -- TODO confirm).
from google.colab import drive
drive.mount("/content/drive")

In [None]:
#import and preprocess data
training_images=np.load("drive/My Drive/LArTPCdata/SupernovaImages.npy")

# Add the trailing channel axis expected by Conv2D and convert to float32 once.
# The models use Keras' default float32 dtype, so keeping a float64 copy would
# only double the memory footprint and be cast back down on every batch.
training_images = training_images.reshape(len(training_images), 48, 48, 1).astype(np.float32)

# Half of the largest pixel value; reused later to undo the scaling for display.
norm=np.amax(training_images)/2
# Map [0, 2*norm] onto [-1, 1] to match the generator's tanh output range
# (assumes the raw pixel values are non-negative -- TODO confirm).
training_images = (training_images - norm) / norm

# Shuffle over the whole set and batch the data (to fit in memory)
buffer_size = len(training_images)
batch_size = 64
training_dataset = tf.data.Dataset.from_tensor_slices(training_images).shuffle(buffer_size).batch(batch_size)

In [None]:
def generator_model(noise_dim=48):
    '''
    Build the generator: maps a noise vector (seed) to a 48x48x1 image.

    A Dense layer projects the seed onto a 12x12x256 feature map, which is then
    upsampled by Conv2DTranspose layers ("deconvolution", going from output to
    input): 12x12 -> 12x12 -> 24x24 -> 48x48.
    LeakyReLU activations are used for the hidden layers (to avoid dead neurons)
    and tanh for the final layer, so every output pixel lies in [-1, 1].

    Args:
        noise_dim: length of the input noise vector. Defaults to 48, the value
            that used to be hard-coded in the first layer.

    Returns:
        An uncompiled tf.keras.Sequential model with output shape (None, 48, 48, 1).
    '''
    model = tf.keras.Sequential()
    model.add(Dense(12*12*256, use_bias=False, input_shape=(noise_dim,)))
    model.add(BatchNormalization()) #normalize activations
    model.add(LeakyReLU()) #used for sparse gradients (signal not strong enough for weights to be tuned)

    model.add(Reshape((12, 12, 256)))
    assert model.output_shape == (None, 12, 12, 256)

    # stride 1: refine features at 12x12 without upsampling
    model.add(Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False))
    assert model.output_shape == (None, 12, 12, 128)
    model.add(BatchNormalization())
    model.add(LeakyReLU())

    # stride 2: upsample 12x12 -> 24x24
    model.add(Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    assert model.output_shape == (None, 24, 24, 64)
    model.add(BatchNormalization())
    model.add(LeakyReLU())

    # stride 2: upsample 24x24 -> 48x48 and collapse to a single channel
    model.add(Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    assert model.output_shape == (None, 48, 48, 1)

    return model


# Build the (still untrained) generator and sanity-check it on one random seed.
generator = generator_model()

noise = tf.random.normal([1, 48])
# training=False: run the layers in inference mode for this check
generated_image = generator(noise, training=False)

# Show the single generated image (first batch entry, only channel)
plt.imshow(generated_image[0, :, :, 0], cmap='gray')

In [None]:
def discriminator_model():
    '''Build the discriminator: a standard CNN classifier for 48x48x1 images.

    The final Dense(1) layer has no activation, so the model returns one raw
    score per image: positive values for real images, negative values for fake
    images.
    '''
    layer_stack = [
        # first stride-2 convolution stage
        Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=[48, 48, 1]),
        LeakyReLU(),
        # Dropout sets inputs to zero at the given rate, to avoid overfitting;
        # the remaining inputs are scaled up so the total sum is unchanged
        Dropout(0.3),

        # second stride-2 convolution stage
        Conv2D(128, (5, 5), strides=(2, 2), padding='same'),
        LeakyReLU(),
        Dropout(0.3),

        # flatten the feature maps and reduce them to a single score
        Flatten(),
        Dense(1),
    ]

    return tf.keras.Sequential(layer_stack)


# Build the (still untrained) discriminator.
discriminator = discriminator_model()
# generated images have to be run through the discriminator to see if they have fooled it;
# here the single test image from the generator check is scored as a smoke test
decision = discriminator(generated_image)
print (decision)

In [None]:
# Define the binary cross-entropy loss shared by both networks.
# from_logits=True because the discriminator ends in Dense(1) with no activation,
# i.e. it outputs raw scores rather than probabilities.
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

def discriminator_loss(real_output, fake_output):
    '''Measure how well the discriminator separates real from generated images.

    real_output: discriminator scores for images from the dataset
    fake_output: discriminator scores for generated images

    Returns the sum of two cross-entropy terms: scores on real images against
    a target of ones, and scores on fake images against a target of zeros.
    '''
    # targets: 1 for every real image, 0 for every generated image
    real_targets = tf.ones_like(real_output)
    fake_targets = tf.zeros_like(fake_output)

    loss_on_real = cross_entropy(real_targets, real_output)
    loss_on_fake = cross_entropy(fake_targets, fake_output)

    total_loss = loss_on_real + loss_on_fake
    return total_loss
    

def generator_loss(fake_output):
    '''Measure how well the generator fooled the discriminator.

    fake_output: discriminator scores for generated images

    The generator does well when the discriminator labels its images as real,
    so the scores are compared against a target of ones.
    '''
    fooled_targets = tf.ones_like(fake_output)
    return cross_entropy(fooled_targets, fake_output)

In [None]:
noise_dim = 48 #length of the noise vector fed to the generator; must equal the input length the generator was built with (48)
num_examples_to_generate = 9 #represents the number of pictures displayed at each epoch (shown on a 3x3 grid)

# fixed noise reused at every epoch, so the displayed images show how the
# generator's output for the same inputs changes as training progresses
seed = tf.random.normal([num_examples_to_generate, noise_dim])

In [None]:
#create two optimizers so the generator and discriminator parameters are updated separately
generator_optimizer = tf.keras.optimizers.Adam(0.001)
discriminator_optimizer = tf.keras.optimizers.Adam(0.001)

#create a checkpoint to save the model parameters if the training is stopped
#(it tracks both models and both optimizers, so all four are saved and restored together)
checkpoint_dir = 'drive/My Drive/GAN'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)



@tf.function #to compile function
def train_step(images):
    '''
    Run one optimisation step of both networks on a single batch.

    Random noise is turned into images by the generator, the discriminator
    scores the real (dataset) and fake (generated) images, the two losses are
    computed, and their gradients are used to update the generator and
    discriminator parameters.

    Args:
        images: one batch of real training images.
    '''
    # Draw one noise vector per real image. The size is taken from the batch
    # itself rather than the global batch_size, so the last (possibly smaller)
    # batch of an epoch still pairs equal numbers of real and fake images.
    noise = tf.random.normal([tf.shape(images)[0], noise_dim])

    #Records operations for automatic differentiation (one tape per network)
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:

        #generate images
        generated_images = generator(noise, training=True)

        #discriminate real and fake images
        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        #calculate loss
        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)

    #calculate gradients of each loss w.r.t. its own network's variables only
    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    #apply gradients to the model parameters
    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

In [None]:
def train(dataset, epochs):
    '''Run multiple training steps sequentially for a number of epochs.

    Each epoch runs train_step once per batch of `dataset`, then clears the
    cell output and displays the images generated from the fixed `seed`.
    A checkpoint is written every 50 epochs, and the time taken by each epoch
    is printed.

    Args:
        dataset: iterable yielding batches of training images.
        epochs: number of passes to make over `dataset`.
    '''
    for epoch in range(epochs):
        start = time.time()

        #perform a training step on batches of data
        for image_batch in dataset:
            train_step(image_batch)

        # produce image for display (replacing the previous epoch's output)
        display.clear_output(wait=True)
        generate_and_display_images(generator, epoch + 1, seed)

        # Save the model every 50 epochs
        if (epoch + 1) % 50 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

        print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

    # Generate display after the final epoch
    display.clear_output(wait=True)
    generate_and_display_images(generator, epochs, seed)

In [None]:
def generate_and_display_images(model, epoch, test_input):
    '''Generate images from `test_input` and show them on a square grid.

    Args:
        model: the generator; called as model(test_input, training=False).
        epoch: epoch number the images belong to (accepted for the caller's
            convenience; not currently used in the figure).
        test_input: batch of noise vectors, one per image to display.

    The generator output is mapped back to the original pixel scale with the
    module-level `norm` (inverse of the training normalisation) before plotting.
    '''
    #training set to False so the layers run in inference mode and the
    #prediction does not update any model state
    predictions = model(test_input, training=False)

    # Grid is 3x3 for up to 9 images (as before) and grows to the smallest
    # square that fits when more are requested, instead of raising in subplot.
    num_images = predictions.shape[0]
    grid_side = max(3, int(np.ceil(np.sqrt(num_images))))

    plt.figure(figsize=(3,3))

    for i in range(num_images):
        plt.subplot(grid_side, grid_side, i+1)
        plt.imshow(predictions[i, :, :, 0] * norm + norm)
        plt.axis('off')

    plt.show()

In [None]:
#restore last saved checkpoint if training is stopped at some point
#(looks up the most recent checkpoint in checkpoint_dir; presumably nothing is
# restored when the directory holds no checkpoint yet -- TODO confirm)

checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))

In [None]:
# Number of epochs to train for in this run
EPOCHS = 2000
#4000 epochs already run (author's note on training done before this run)
train(training_dataset, EPOCHS)