In [None]:
#**************************************************************************************
#
#    Title: <YoanoGAN-Experiment>
#    Author: <Ege Demir>
#    Date: <6/26/1011>
#    Code version: <1.0>
#
#**************************************************************************************


In [None]:
import glob
import pandas as pd
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
from tensorflow.keras import layers 
import time
import tensorflow as tf
from IPython import display
from IPython.display import clear_output
from keras.initializers import RandomNormal
from PIL import Image
from datetime import datetime

# Load and prepare dataset

In [25]:
BUFFER_SIZE = 60000
BATCH_SIZE = 128

In [None]:
def generate_dataset(singleClassSize):

  classAPoints = np.random.multivariate_normal([1,1], np.eye(2), size=singleClassSize)
  classBPoints = np.random.multivariate_normal([-1,-1], np.eye(2), size=singleClassSize)

  x = np.append(classAPoints,classBPoints,axis=0)
  y = np.append(np.zeros(singleClassSize),np.ones(singleClassSize))

  return x,y


In [None]:
x,y = generate_dataset(1000)

In [None]:
y = tf.keras.utils.to_categorical(y, 2)

In [26]:
x.shape

(2000, 2)

In [27]:
# Batch and shuffle the data
dataset = tf.data.Dataset.from_tensor_slices((x,y)).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# Custom Layers

# Custom Models

## Generator

In [28]:
def make_generator_model(latent_dim,class_Amt):
    model = tf.keras.Sequential()
    model.add(layers.Dense(16,input_shape=(latent_dim+class_Amt,)))
    
    for i in range(4):
      model.add(layers.Dense(16))
      model.add(layers.LeakyReLU(0.1))

    
    model.add(layers.Dense(2))
    assert model.output_shape == (None,2)

    return model

## Discriminator

In [41]:
def make_discriminator_model():
    model = tf.keras.Sequential()

    model.add(layers.Dense(32))
    
    for i in range(4):
      model.add(layers.Dense(32))
      model.add(layers.LeakyReLU(0.1))


    model.add(layers.Dense(2))

    return model

# Define Loss and Optimizers

In [30]:
# This method returns a helper function to compute cross entropy loss
cross_entropy_categorical = tf.keras.losses.CategoricalCrossentropy(from_logits=True)
cross_entropy_binary = tf.keras.losses.BinaryCrossentropy(from_logits=True)


In [31]:
def discriminator_loss(y_pred,y_real):
    return cross_entropy_categorical(y_real,y_pred)

In [32]:
def generator_loss(y_pred,y_real):


    # indices = tf.math.argmax(y_real,axis=1)

    # arr = []
    # for i in range(BATCH_SIZE):
    #   arr.append(y_pred[i,indices[i]])

    # final_cross_entropy = cross_entropy_binary(tf.zeros_like(indices),arr)

    final_cross_entropy = cross_entropy_categorical((tf.ones_like(y_real)-(y_real)),y_pred)
    
    return final_cross_entropy






# Define Callbacks

In [33]:
logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
file_writer = tf.summary.create_file_writer(logdir + "/metrics")
file_writer.set_as_default()
tensorboard_callback = tf.keras.callbacks.TensorBoard(
    log_dir='logs',
    histogram_freq=0,
    write_graph=False,
    write_images=True,
    write_steps_per_second=False,
    update_freq=100,
    profile_batch=0,
    embeddings_freq=0,
    embeddings_metadata=None,
)


# Define training loop

In [34]:
EPOCHS = 500
noise_dim = 128
num_examples_to_generate = 9

# You will reuse this seed overtime (so it's easier)
# to visualize progress in the animated GIF)

rand_y = [0,1,2,3,4,5,6,7,8]
rand_y = tf.keras.utils.to_categorical(rand_y, 10)
#print(rand_y)
#rand_y = np.append(np.ones((num_examples_to_generate,1)),np.zeros((num_examples_to_generate, 9)),axis=1)

seed = np.append(rand_y,tf.random.normal([num_examples_to_generate, noise_dim]),axis=1)


In [36]:
class YoanoGAN(tf.keras.Model):
    def __init__(self, discriminator, generator, latent_dim,lmbd=0.7):
        super(YoanoGAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        
        self.currentBatch = 0
        
        self.latent_dim = latent_dim
        self.lmbd = lmbd
        
        self.gen_loss_tracker = tf.keras.metrics.Mean(name="generator_loss")
        self.disc_loss_tracker = tf.keras.metrics.Mean(name="discriminator_loss")
        self.norm_tracker = tf.keras.metrics.Mean(name="gen_output_norm")
        self.w_noise_accuracy_tracker = tf.keras.metrics.Mean(name="w_noise_accuracy")
        self.wo_noise_accuracy_tracker = tf.keras.metrics.Mean(name="wo_noise_accuracy")
        
        
    @property
    def metrics(self):
        return [self.gen_loss_tracker, self.disc_loss_tracker,self.norm_tracker,self.w_noise_accuracy_tracker,self.wo_noise_accuracy_tracker]
    
    def get_accuracy(self,x,y_real):
        x = tf.cast(x,tf.int64)
        y_real = tf.cast(y_real,tf.int64)
        
        predictions = tf.math.argmax(self.discriminator(x),output_type=tf.dtypes.int64,axis=1)
        return (tf.math.count_nonzero(tf.math.argmax(y_real,axis=1)-predictions))/(tf.cast(tf.shape(x)[0],tf.int64))
    
    def get_norm(self,noise):
        norm_calculated = tf.norm(noise,ord=2,axis=[1])
        norm_mean = tf.math.reduce_mean(norm_calculated)
        return norm_mean
    
    def compile(self, d_optimizer, g_optimizer, dis_loss_fn, gen_loss_fn):
        super(YoanoGAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.dis_loss_fn = dis_loss_fn
        self.gen_loss_fn = gen_loss_fn

    def train_step(self, data):
        
        self.currentBatch = self.currentBatch + 1
        
        #Import data and infer batch size
        x, y = data
        BATCH_SIZE = tf.shape(x)[0]
        
        #Cast x to float32 to match with generator output type (Otherwise adding the two gives an error) 
        x = tf.cast(x, tf.float32)
        
        #Append normal noise and one hot labeled y data together for generator input.
        gen_in = tf.experimental.numpy.append(y, tf.random.normal([BATCH_SIZE, self.latent_dim]), axis=1)
        
        with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
            
            #Run the generator and generate noises
            generated_noise = self.generator(gen_in, training=True)
            generated_noise = tf.squeeze(generated_noise * self.lmbd) #Squeezing to remove axis-1 with 1 dimension.
            
            #Check discriminator output by adding noise and x of dataset.
            disc_output = self.discriminator(generated_noise+x, training=True)
            
            #Calculate loss values for both models.
            gen_loss = self.gen_loss_fn(disc_output,y)
            disc_loss = self.dis_loss_fn(disc_output,y)
            
        
        #Calculate and apply gradients for both models.
        if(True):
            gradients_of_generator = gen_tape.gradient(gen_loss, self.generator.trainable_variables)
            self.g_optimizer.apply_gradients(zip(gradients_of_generator, self.generator.trainable_variables))
    
        if(True):
            gradients_of_discriminator = disc_tape.gradient(disc_loss, self.discriminator.trainable_variables)
            self.d_optimizer.apply_gradients(zip(gradients_of_discriminator, self.discriminator.trainable_variables))
        
        
        
        wo_noise_acc = self.get_accuracy(x,y)
        w_noise_acc = self.get_accuracy(generated_noise+x,y)
        norm = self.get_norm(generated_noise)
        
        tf.summary.scalar('Generator Loss', data=gen_loss, step=self.currentBatch)
        tf.summary.scalar('Discriminator Loss', data=disc_loss, step=self.currentBatch)
        tf.summary.scalar('Generator Norm', data=norm, step=self.currentBatch)
        tf.summary.scalar('With Noise Accuracy', data=w_noise_acc, step=self.currentBatch)
        tf.summary.scalar('Without Noise Accuracy', data=wo_noise_acc, step=self.currentBatch)
        
        
        
        #Update and return metrics
        self.gen_loss_tracker.update_state(gen_loss)
        self.disc_loss_tracker.update_state(disc_loss)
        self.norm_tracker.update_state(norm)
        self.w_noise_accuracy_tracker.update_state(w_noise_acc)
        self.wo_noise_accuracy_tracker.update_state(wo_noise_acc)
        return {
            "g_loss": self.gen_loss_tracker.result(),
            "d_loss": self.disc_loss_tracker.result(),
            "norm": self.norm_tracker.result(),
            "With Noise Accuracy": self.w_noise_accuracy_tracker.result(),
            "Without Noise Accuracy": self.wo_noise_accuracy_tracker.result()
        }


In [42]:
generator = make_generator_model(latent_dim=20,class_Amt=2)
discriminator = make_discriminator_model()

yoano_gan = YoanoGAN(
    discriminator=discriminator, generator=generator, latent_dim=20,lmbd=0.7
)

yoano_gan.compile(
    d_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0003),
    g_optimizer = tf.keras.optimizers.Adam(learning_rate=0.0003),
    dis_loss_fn = discriminator_loss,
    gen_loss_fn = generator_loss
)




In [43]:
tf.config.run_functions_eagerly(True) #Figure out how to do graph execution

In [None]:
yoano_gan.fit(dataset,batch_size=128, epochs=20)

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20

KeyboardInterrupt: 

In [46]:
# Load the TensorBoard notebook extension
%reload_ext tensorboard

In [47]:
%tensorboard --logdir logs/scalar

<IPython.core.display.Javascript object>

In [None]:
EPOCHS = 500
latent_dim = 20
num_examples_to_generate = 9

# You will reuse this seed overtime (so it's easier)
# to visualize progress in the animated GIF)

rand_y = [0,1,2,3,4,5,6,7,8]
rand_y = tf.keras.utils.to_categorical(rand_y, 10)

seed = np.append(rand_y,tf.random.normal([num_examples_to_generate, latent_dim]),axis=1)


NameError: name 'tf' is not defined

In [None]:
generated_images = yoano_gan.generator(seed) * 0.7
fig = plt.figure(figsize=(7, 10))
fig.suptitle('Generated Noises', fontsize=15)
for i in range(generated_images.shape[0]):
    plt.subplot(3, 3, i+1)
    plt.title(str(i))
    plt.imshow(generated_images[i, :, :, 0], cmap='gray',vmin=0, vmax=1)
    plt.axis('off')

plt.show()
np.mean(generated_images)

NameError: name 'yoano_gan' is not defined

In [None]:
def test_model_w_attacker():
  #Prepare y values and random values for input to generator
  rand_y_in = tf.experimental.numpy.append(test_labels, tf.random.normal([test_labels.shape[0], latent_dim]), axis=1)
  
  #Generate noises for rand_y_in
  generated_noise = yoano_gan.generator(rand_y_in)
  generated_noise = generated_noise * 0.7
  generated_noise = tf.squeeze(generated_noise)

  #Add noises to x images
  noisy_images = generated_noise.numpy()+test_images

  #Create a new attacker model that has the same architecture as GAN discriminator, and try to fit to noisy x inputs
  attacker_model = make_discriminator_model()
  attacker_model.compile(optimizer=tf.keras.optimizers.Adam(),loss=cross_entropy_categorical,metrics=["accuracy"])
  return attacker_model.fit(noisy_images,test_labels,validation_split=0.3,epochs=2)

In [None]:
test_model_w_attacker()

Epoch 1/2
  1/219 [..............................] - ETA: 21s - loss: 2.3006 - accuracy: 0.0938



Epoch 2/2


<keras.callbacks.History at 0x1d9eddec280>