In [1]:
# DL modules
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import *
from tensorflow.keras import Sequential, Model
import keras.backend as K
from tensorflow.keras.initializers import RandomNormal
! pip install tensorflow_addons
from tensorflow_addons.layers import SpectralNormalization

# relevent libraries 
import numpy as np 
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os

## matplotlib stylings
plt.rcParams['figure.figsize'] = 12, 8



In [2]:
# import dataset
import tensorflow as tf
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

classes = {
    0: 'Airplane',  1: 'Automobile',
    2: 'Bird',      3: 'Cat',
    4: 'Deer',      5: 'Dog',
    6: 'Frog',      7: 'Horse',
    8: 'Ship',      9: 'Truck' 
}

y_train = y_train.reshape(-1)
y_test  = y_test.reshape(-1)

print('Dataset Loaded')

Dataset Loaded


In [3]:
print('x_train shape:', x_train.shape)
print('y_train shape:', y_train.shape)
print('x_test shape:', x_test.shape)
print('y_test shape:', y_test.shape)
print(x_train.max())
print(x_train.min())

x_train shape: (50000, 32, 32, 3)
y_train shape: (50000,)
x_test shape: (10000, 32, 32, 3)
y_test shape: (10000,)
255
0


In [4]:
print(np.unique(y_train, return_counts=True))

(array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8), array([5000, 5000, 5000, 5000, 5000, 5000, 5000, 5000, 5000, 5000],
      dtype=int64))


# Feature Engineering
## Pixel normalization
> Dividing each of the pixels by 255 will normalize the pixels between -1 to 1
We normalize the pixels so that it can increase the speed of the learning process
Neural Network processes inputs uses small weights values. Large inputs can disrupt or slow down learning process.
It is good that we normalize the pixels. 


In [5]:
x_train = x_train / 127.5 - 1
x_test = x_test / 127.5 - 1


# PIC OF GAN

In [6]:
input_dim = (32,32,3) # dimensions of training images
latent_dim = 128        # dimensions of latent vectors aka Gaussian noise

In [16]:
class DCGAN():
    def __init__(self, rows=32, cols=32, channels=3, z = 10):
        # Input shape
        self.img_rows = rows
        self.img_cols = cols
        self.channels = channels
        self.img_shape = (self.img_rows, self.img_cols, self.channels)
        self.latent_dim = z
        optimizer = keras.optimizers.Adam(0.0002, 0.5)
        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy',optimizer = optimizer, metrics=['accuracy'])

        # Build the generator
        self.generator = self.build_generator()

        #Generator takes in noise to generate images

        z = Input(shape=(self.latent_dim,))
        img = self.generator(z)

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated images as input and determines validity
        valid = self.discriminator(img)

        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model(z, valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

    def build_generator(self):

        model = Sequential()

        model.add(Dense(128 * 7 * 7, activation="relu", input_dim=self.latent_dim))
        model.add(Reshape((32, 32, 3)))
        model.add(UpSampling2D())
        model.add(Conv2D(128, kernel_size=3, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(UpSampling2D())
        model.add(Conv2D(64, kernel_size=3, padding="same"))
        model.add(BatchNormalization(momentum=0.8))
        model.add(Activation("relu"))
        model.add(Conv2D(self.channels, kernel_size=3, padding="same"))
        model.add(Activation("tanh"))

        model.summary()

        noise = Input(shape=(self.latent_dim,))
        img = model(noise)

        return Model(noise, img)

    # function to create discriminator model
    def build_discriminator(image_size):
        weights_init = RandomNormal(mean=0, stddev=0.02)
        discriminator = Sequential([
            Input(shape=input_dim),

            ## 2 by 2 Strides for downsampling
            ## Leaky ReLU for Activation
            SpectralNormalization(
                Conv2D(64, kernel_size=4, strides=2, padding='same', kernel_initializer=weights_init)
            ),
            BatchNormalization(momentum=0.8),
            LeakyReLU(0.2),
           
           # Conv 2
           SpectralNormalization(
               Conv2D(128, kernel_size=4, strides=2, padding='same', kernel_initializer=weights_init)
           ),
           BatchNormalization(momentum=0.8),
           LeakyReLU(0.2),
   
           # Conv 3
           SpectralNormalization(
               Conv2D(256, kernel_size=4, strides=2, padding='same', kernel_initializer=weights_init)
           ),
           BatchNormalization(momentum=0.8),
           LeakyReLU(0.2),
           
           # # Conv 4
           SpectralNormalization(
               Conv2D(512, kernel_size=4, strides=2, padding='same', kernel_initializer=weights_init)
           ),
           BatchNormalization(momentum=0.8),
           LeakyReLU(0.2),
   
           GlobalMaxPooling2D(),
           Dense(1, activation='sigmoid'),
        ], name='discriminator_GAN')
        return discriminator

    def train(self, epochs, batch_size=256, save_interval=50):


    
            # Adversarial ground truths
            valid = np.ones((batch_size, 1))
            fake = np.zeros((batch_size, 1))
    
            for epoch in range(epochs):
    
                # ---------------------
                #  Train Discriminator
                # ---------------------
    
                # Select a random half of images
                idx = np.random.randint(0, x_train.shape[0], batch_size)
                imgs = x_train[idx]
    
                # Sample noise and generate a batch of new images
                noise = np.random.normal(0, 1, (batch_size, self.latent_dim))
                gen_imgs = self.generator.predict(noise)
    
                # Train the discriminator (real classified as ones and generated as zeros)
                d_loss_real = self.discriminator.train_on_batch(imgs, valid)
                d_loss_fake = self.discriminator.train_on_batch(gen_imgs, fake)
                d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)
    
                # ---------------------
                #  Train Generator
                # ---------------------
    
                # Train the generator (wants discriminator to mistake images as real)
                g_loss = self.combined.train_on_batch(noise, valid)
    
                # Plot the progress
                print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))
    
                # If at save interval => save generated image samples
                if epoch % save_interval == 0:
                    self.save_imgs(epoch)
                
                
    def save_imgs(self, epoch):
            r, c = 5, 5
            noise = np.random.normal(0, 1, (r * c, self.latent_dim))
            gen_imgs = self.generator.predict(noise)
    
            # Rescale images 0 - 1
            gen_imgs = 0.5 * gen_imgs + 0.5
    
            fig, axs = plt.subplots(r, c)
            cnt = 0
            for i in range(r):
                for j in range(c):
                    axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
                    axs[i,j].axis('off')
                    cnt += 1
            fig.savefig("dcgan_mnist_{:d}.png".format(epoch))
            plt.close()

        

In [17]:
dcgan = DCGAN(32,32,3)
dcgan.train(epochs=200, batch_size=256, save_interval=50)

ValueError: Exception encountered when calling layer "reshape_3" (type Reshape).

total size of new array must be unchanged, input_shape = [6272], output_shape = [32, 32, 3]

Call arguments received by layer "reshape_3" (type Reshape):
  • inputs=tf.Tensor(shape=(None, 6272), dtype=float32)

## Everything below not mine

In [None]:
# Code Source: https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit#wrapping_up_an_end-to-end_gan_example
class GAN(Model):
    def __init__(self, discriminator, generator, latent_dim):
        super(GAN, self).__init__()
        self.discriminator = discriminator
        self.generator = generator
        self.latent_dim = latent_dim
        self.gen_loss_tracker = keras.metrics.Mean(name='generator_loss')
        self.disc_loss_tracker = keras.metrics.Mean(name='discriminator_loss')
        self.kl = tf.keras.metrics.KLDivergence()

    def compile(self, d_optimizer, g_optimizer, loss_fn):
        super(GAN, self).compile()
        self.d_optimizer = d_optimizer
        self.g_optimizer = g_optimizer
        self.loss_fn = loss_fn

    def train_step(self, real_images):
        ### TRAINING DISCRIMINATOR
        if isinstance(real_images, tuple):
            real_images = real_images[0]
        # Sample random points in the latent space
        batch_size = tf.shape(real_images)[0]
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Decode them to fake images
        generated_images = self.generator(random_latent_vectors)

        # Combine them with real images
        combined_images = tf.concat([generated_images, real_images], axis=0)

        # Assemble labels discriminating real from fake images
        labels = tf.concat(
            [tf.zeros((batch_size, 1)), tf.ones((batch_size, 1))], axis=0
        )
        # Add random noise to the labels - important trick!
        labels += 0.05 * tf.random.uniform(tf.shape(labels))

        # Train the discriminator
        with tf.GradientTape() as tape:
            predictions = self.discriminator(combined_images)
            d_loss = self.loss_fn(labels, predictions)
        grads = tape.gradient(d_loss, self.discriminator.trainable_weights)
        self.d_optimizer.apply_gradients(
            zip(grads, self.discriminator.trainable_weights)
        )

        ### TRAINING GENERATOR
        # Sample random points in the latent space
        random_latent_vectors = tf.random.normal(shape=(batch_size, self.latent_dim))

        # Assemble labels that say "all real images"
        misleading_labels = tf.ones((batch_size, 1))

        # Train the generator (note that we should *not* update the weights
        # of the discriminator)!
        with tf.GradientTape() as tape:
            generated_images = self.generator(random_latent_vectors)
            predictions = self.discriminator(generated_images)
            g_loss = self.loss_fn(misleading_labels, predictions)
        grads = tape.gradient(g_loss, self.generator.trainable_weights)
        self.g_optimizer.apply_gradients(zip(grads, self.generator.trainable_weights))

        # Monitor Loss
        self.disc_loss_tracker.update_state(d_loss)
        self.gen_loss_tracker.update_state(g_loss)
        self.kl.update_state(y_true=real_images, y_pred=generated_images)
        
        return {
            "d_loss": self.disc_loss_tracker.result(), 
            "g_loss": self.gen_loss_tracker.result(), 
            "KL Divergence": self.kl.result(),
        }

In [None]:
# Custom callback to monitor the Generator at every n epoch
class GANMonitor(keras.callbacks.Callback):
    # class GANMontor
    # - num_img: number of images generated using matplotlib figure
    # - latent_dim: dimensions of noise latent vector passed into the generator
    # - patience: number of epochs to generate images
    # - vmin: minumum scaling factor, a
    # - vmax: maximum scaling factor, b
    # NOTE: only compatible with the GAN class I've set up
    def __init__(self, num_img=10, latent_dim=128, patience=10, vmin=0, vmax=1):
        self.num_img = num_img
        self.latent_dim = latent_dim
        self.patience = patience
        self.vmin = vmin
        self.vmax = vmax
        self.constant_latent_vector = tf.random.normal(shape=(self.num_img, self.latent_dim))

    # method to generate images and display them as matplotlib figure
    def generate_plot(self):
        # Generate Images
        generated_images = self.model.generator(self.constant_latent_vector)

        # Normalise Image from [vmin, vmax] to [0, 1]
        generated_images -= self.vmin
        generated_images /= (self.vmax - self.vmin)

        # Generate Matplotlib Figure
        row_size = np.ceil(self.num_img/5)
        fig = plt.figure(figsize=(10, 2*row_size), tight_layout=True)
        for i in range(self.num_img):
            ax = fig.add_subplot(row_size, 5, i+1)
            ax.imshow(generated_images[i])
            ax.axis('off')
        plt.show()

    # method to save generator's weights
    def save_weights(self, epoch=None):
        try:
            if epoch != None:
                name='generator-epoch-{}.h5'.format(epoch)
                print('Generator Checkpoint - {}'.format(name))
                self.model.generator.save_weights(
                    filepath=name,
                    save_format='h5'
                )
        except Exception as e:
            print(e)

    # generate a plot every n epochs
    def on_epoch_begin(self, epoch, logs=None):
        if epoch % self.patience == 0:
            self.generate_plot()
            self.save_weights(epoch)

    # generate a plot after training
    def on_train_end(self, epoch, logs=None):
        self.generate_plot()
        self.save_weights('Full Train')

In [None]:
# List of Callback Functions
callbacks = [
    GANMonitor(num_img=15, latent_dim=128, patience=5, vmin=-1, vmax=1),
]

In [None]:
# Hyperparameters and Constants
BATCH_SIZE  = 64
LATENT_DIM  = 128
AUTO        = tf.data.AUTOTUNE
EPOCHS      = 100
BUFFER_SIZE = 1024
INPUT_DIM   = (32, 32, 3)

In [None]:
%%time
# Reseting Graph variables
tf.keras.backend.clear_session()
K.clear_session()

# Creating a GAN class
gan = GAN(
    discriminator=create_discriminator(image_size=INPUT_DIM), 
    generator=create_generator(latent_dim=LATENT_DIM), 
    latent_dim=latent_dim
)
# Compiling with Optimizer and Loss Function
gan.compile(
    d_optimizer=keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5),
    g_optimizer=keras.optimizers.Adam(learning_rate=2e-4, beta_1=0.5),
    loss_fn=keras.losses.BinaryCrossentropy(),
)

# Preparing the Dataset with `tf.data`
dataset = tf.data.Dataset.from_tensor_slices(x_train)
dataset = dataset.shuffle(buffer_size=BUFFER_SIZE).batch(batch_size=BATCH_SIZE, num_parallel_calls=AUTO).prefetch(AUTO)

# Starting the Train Process
hist = gan.fit(dataset, epochs=EPOCHS, use_multiprocessing=True, workers=16, callbacks=callbacks)


In [None]:
# story history object into dataframe
hist_df = pd.DataFrame(hist.history)

# plot the loss
plt.figure(figsize=(10, 5))
plt.plot(hist_df['d_loss'], label='Discriminator Loss')
plt.plot(hist_df['g_loss'], label='Generator Loss')
plt.plot(hist_df['KL Divergence'], label='KL Divergence')
plt.legend()
plt.show()

In [None]:
# Generating 1000 Synthetic Images
random_noise = tf.random.normal(shape=(1000, latent_dim))
synthetic_images = gan.generator.predict(random_noise)
print("Latent Vector Dim: {}\tGenerated Images Dim: {}".format(random_noise.shape, synthetic_images.shape))

# Scaling back to [0, 1]
synthetic_images -= -1
synthetic_images /= (1 - (-1))

# Display 15 randomly sampled images
fig = plt.figure(figsize=(10, 10), tight_layout=True)
for i in range(25):
    rand_idx = np.random.randint(0, len(synthetic_images))
    ax = fig.add_subplot(5, 5, i+1)
    ax.imshow(synthetic_images[rand_idx])
    ax.axis('off')
plt.show()

# Metrics to see accuracy 
## FID

In [None]:
# import relevant libraries to calculate FID
import numpy as np
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.image import resize
from scipy.linalg import sqrtm
import math
from tqdm.notebook import tqdm
import tensorflow as tf

In [None]:
class GAN_FID:
    def __init__(self, batch_size, latent_dim, sample_size, buffer_size):
        # setting Hyperparameters
        self.BATCH_SIZE = batch_size
        self.LATENT_DIM = latent_dim
        self.SAMPLE_SIZE = sample_size
        self.BUFFER_SIZE = buffer_size

        # setting Constants
        self.INCEPTION_SHAPE = (299, 299, 3)
        self.INCEPTION = InceptionV3(include_top=False, pooling='avg', input_shape=self.INCEPTION_SHAPE)
        self.AUTO = tf.data.AUTOTUNE

    # method to set generator and training data
    def fit(self, generator, train_data):
        # setting generative model and original data used for training 
        self.GENERATOR = generator
        self.train_data = train_data

        # Preparing Real Images
        trainloader = tf.data.Dataset.from_tensor_slices((self.train_data))
        trainloader = (
            trainloader
            .shuffle(self.BUFFER_SIZE)
            .map(self.__resize_and_preprocess, num_parallel_calls=self.AUTO)
            .batch(self.BATCH_SIZE, num_parallel_calls=self.AUTO)
            .prefetch(self.AUTO)
        )
        self.trainloader = trainloader

        # Generate and prepare Synthetic Images (Fake)
        noise = tf.random.normal([self.SAMPLE_SIZE, self.LATENT_DIM])
        generated_images = self.GENERATOR(noise)
        genloader = tf.data.Dataset.from_tensor_slices(generated_images)
        genloader = (
            genloader
            .map(self.__resize_and_preprocess, num_parallel_calls=self.AUTO)
            .batch(self.BATCH_SIZE, num_parallel_calls=self.AUTO)
            .prefetch(self.AUTO)
        )
        self.genloader = genloader

        # prepare embeddings
        count = math.ceil(self.SAMPLE_SIZE/self.BATCH_SIZE)

        ## compute embeddings for real images
        print("Computing Real Image Embeddings")
        self.real_image_embeddings = self.__compute_embeddings(self.trainloader, count)

        ## compute embeddings for generated images
        print("Computing Generated Image Embeddings")
        self.generated_image_embeddings = self.__compute_embeddings(self.genloader, count)
        assert self.real_image_embeddings.shape == self.generated_image_embeddings.shape, "Embeddings are not of the same size"
        print("Computed Embeddings\tReal Images Embedding Shape: {}\tGenerated Images Embedding Shape: {}".format(
            self.real_image_embeddings.shape, 
            self.generated_image_embeddings.shape
        ))
    
    # method to produce evaluation results
    @tf.autograph.experimental.do_not_convert
    def evaluate(self):
        # calculate Frechet Inception Distance
        fid = self.__calculate_fid(self.real_image_embeddings, self.generated_image_embeddings)
        print('The computed FID score is:', fid)

        return fid

    # method to generate embeddings from inception model 
    def __compute_embeddings(self, dataloader, count):
        image_embeddings = []
        for _ in tqdm(range(count)):
            images = next(iter(dataloader))
            embeddings = self.INCEPTION.predict(images)
            image_embeddings.extend(embeddings)
        return np.array(image_embeddings)

    ## STATIC METHODS: these methods knows nothing about the class
    # static method to prepare the data before computing Inception Embeddings
    @staticmethod
    def __resize_and_preprocess(image):
        # image *= 255.0 # original image are scaled to [0, 1], scaling back to [0, 255]
        image -= -1
        image /= (1 - (-1))
        image *= 255.

        # .preprocess_input() expects an image of scale [0, 255]
        image = preprocess_input(image)
        # inception model expects an image of shape (None, 299, 299, 3)
        image = tf.image.resize(image, (299, 299), method='nearest')
        return image

    # static method to calculate frechet inception distance based on embeddings
    @staticmethod 
    def __calculate_fid(real_embeddings, generated_embeddings):
        # calculate mean and covariance statistics
        mu1, sigma1 = real_embeddings.mean(axis=0), np.cov(real_embeddings, rowvar=False)
        mu2, sigma2 = generated_embeddings.mean(axis=0), np.cov(generated_embeddings, rowvar=False)
        # calculate sum squared difference between means
        ssdiff = np.sum((mu1 - mu2)**2.0)
        # calculate sqrt of product between cov
        covmean = sqrtm(sigma1.dot(sigma2))
        # check and correct imaginary numbers from sqrt
        if np.iscomplexobj(covmean):
            covmean = covmean.real
        # calculate score
        fid = ssdiff + np.trace(sigma1 + sigma2 - 2.0 * covmean)
        return fid

In [None]:
%%time
fid_class = GAN_FID(batch_size=512, latent_dim=128, sample_size=10000, buffer_size=1024)
fid_class.fit(generator=gan.generator, train_data=x_train)
fid_score = fid_class.evaluate()

In [None]:
# summary information
gan.generator.summary()

# saving as .hdf format
gan.generator.save("GAN_Generator.h5")
assert os.path.exists('GAN_Generator.h5'), "GAN Generator does not exists"