In [None]:
# Regular
import os
import math
import glob
import numpy as np
import matplotlib.pyplot as plt

from numpy.random import randint

from sklearn.model_selection import train_test_split

In [None]:
# DNN
import tensorflow as tf

from keras.optimizers import Adam
from keras.layers import Dense
from keras.layers import Conv2D
from keras.layers import Dropout
from keras.layers import LeakyReLU
from keras.utils.vis_utils import plot_model
from keras.layers import Conv2DTranspose
from keras.layers import Reshape
from keras import backend

from keras.layers import BatchNormalization
from keras.initializers import RandomNormal
from keras.constraints import Constraint


# Generate a Distribution (Normal Distribution)

Recall the Gaussian curve:

$$g(x) = \dfrac{1}{\sigma \sqrt{2 \pi}} \exp \left(-\dfrac{1}{2} \dfrac{(x - \mu)^2}{\sigma^2} \right) $$

(**Note:** $\exp(x)$ is equivalent to $e^x$.)

such that: 

$\sigma =$ standard deviation 

$\mu =$ mean

For our application, let $\sigma = 1$ and $\mu = 5$, with the domain $x \in [0, 10]$. This simplifies to:

$$g(x) = \dfrac{1}{\sqrt{2 \pi}} \exp \left(-\dfrac{1}{2} (x - 5)^2 \right) $$

We will use this distribution as our data and see how a GAN matches these points.


In [None]:
def createGuassianDistributionPoints(
    sigma=1, 
    mu=5,
    interval=[0, 10],
    points=10
):
    """
    Sample the Gaussian density g(x) = 1 / (sigma * sqrt(2 * pi)) * exp(-(x - mu)^2 / (2 * sigma^2))
    at evenly spaced x values.

    Parameters
    --------------
    sigma: standard deviation of the curve (not validated; sigma = 0 raises ZeroDivisionError)
    mu: mean of the curve (not validated; it may lie outside the interval)
    interval: two-element sequence [start, stop] giving the sampled x range
    points: number of samples to take

    Returns
    --------------
    x_data: list of `points` x values start, start + step, ... with step = (stop - start) / points,
            so the right endpoint itself is not sampled
    y_data: list with g(x) for every value in x_data

    Both lists are empty when points <= 0.
    """
    start, stop = interval[0], interval[1]

    if points <= 0:
        return [], []

    # The width is the signed difference of the endpoints. Summing absolute values is
    # only right when the interval starts at 0 or straddles it.
    step = (stop - start) / float(points)
    coefficient = 1 / (sigma * math.sqrt(2 * math.pi))

    # Derive each x from its index so rounding error does not accumulate across samples.
    x_data = [start + index * step for index in range(points)]
    y_data = [
        coefficient * math.exp(-0.5 * ((x - mu) ** 2 / sigma ** 2))
        for x in x_data
    ]

    return x_data, y_data

# Reference curve: 100 samples of the default Gaussian (mu = 5, sigma = 1) on [0, 10].
X_curve, y_curve = createGuassianDistributionPoints(points=100)

# NumPy copies of the same reference curve.
X, y = np.array(X_curve), np.array(y_curve)


In [None]:
# Plot to see the Gaussian
def plotCurve(X, y, title="Curve", xlabel="Steps", ylabel="Value", real=False, save=False, save_path="", epoch=0):
    """
    Draw one curve with pyplot, optionally on top of the reference curve, and optionally save it.

    Parameters
    --------------
    X, y: x and y values of the curve to draw (legend entry "Plotted")
    title: figure title, used only when save is False
    xlabel, ylabel: axis labels
    real: when truthy, the notebook-level X_curve / y_curve are drawn first (legend entry "Expected")
    save: when truthy, the title becomes "Training in epoch: <epoch>" and the figure is
          written to <save_path>/<epoch>.png before it is shown
    save_path: directory that receives the saved image
    epoch: number used for the saved file name and the title
    """
    if real:
        # The reference curve comes from the notebook globals, not from the arguments.
        plt.plot(X_curve, y_curve, label="Expected")
    plt.plot(X, y, label="Plotted")

    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.legend()

    plt.title("Training in epoch: " + str(epoch) if save else title)
    if save:
        plt.savefig(os.path.join(save_path, str(epoch) + '.png'))

    plt.show()
    
# Sanity check: draw the reference Gaussian before any training happens.
plotCurve(X=X_curve, y=y_curve, title="Gaussian Curve")

# Dataset

Now we want to train a GAN to generate Gaussian distributions under the constraints listed earlier. We will make 60,000 samples of 100 points each (that's 6,000,000 points to go through!) so that it can learn this curve.

In [None]:
def createGaussianDataset(
    sigma=1, 
    mu=5,
    interval=[0, 10],
    points=10,
    times=60000
):
    """
    Build a dataset whose rows are all the same sampled Gaussian curve.

    Parameters
    --------------
    sigma, mu, interval, points: forwarded unchanged to createGuassianDistributionPoints
    times: number of rows (copies of the curve) in the dataset

    Returns
    --------------
    2D float array of shape (times, points) holding only the y values of the curve;
    the x values are discarded. With times <= 0 the result has shape (0, points).
    """
    # Every row is identical, so evaluate the curve once instead of once per row.
    _, curve_values = createGuassianDistributionPoints(
        sigma=sigma,
        mu=mu,
        interval=interval,
        points=points
    )

    return np.tile(np.asarray(curve_values, dtype=float), (max(times, 0), 1))

# Training set: one row per copy of the 100-point reference curve.
X_Row = createGaussianDataset(points=100)

print(X_Row.shape)
print(X_Row)

# Some Tensor Complications (Matrix in this case as we are not extending our matrix to Tensor even though it is one)

For simplicity's sake, we want to train on data in only two dimensions, which makes it much easier to fit and to visualize; thus we must modify our Gaussian dataset as well.

If you recall your linear algebra:
$$
\begin{bmatrix}
1 & 2 & 3\\
1 & 2 & 3
\end{bmatrix}
$$

If you read column wise, that will be a vector in 2D space.

Thus, we want to do the same such that:
$$
\begin{bmatrix}
x_1 & x_2 & \dots & x_m\\
y_1 & y_2 & \dots & y_m
\end{bmatrix}
$$

Transposing this $2 \times m$ matrix gives an $m \times 2$ matrix in which each row is one $(x_i, y_i)$ point. We will feed this matrix to fully connected layers. The code below only performs that swap, so we can see whether the layout makes a difference to learning.

In [None]:
# def reshapeColumnMajor(curves):
#     """
#     This is incase we want to swap the m x n matrix
#     """
#     data = [];
    
#     for curve in curves:
#         c = [];
        
#         for point_index in range(len(curve[0])):
#             c.append([
#                 curve[0][point_index],
#                 curve[1][point_index]
#             ]);
        
#         data.append(c);
            
#     return np.array(data);

# X_Column = reshapeColumnMajor(X_Row);
# print(X_Column.shape);
# print(X_Column[0]);

# GANS Architecture: Components

Since our data is purely numerical, we will use a dense (fully connected) network and Keras for ease. It's interesting to consider the theory of GANs, but we will do that after :^)! We will use the TensorFlow library for this demo!

To visualize, we have 60,000 curves. We will have the GAN learn the mapping of each curve so that it can later generate Gaussian distributions!

### Discriminator Data Sampling Generator

In [None]:
def sample_real_samples(dataset, n_samples):
    """
    Draw random rows (with replacement) from the real dataset and label them as real.

    Parameters
    --------------
    dataset: the real data, indexed along its first axis; a Python list is
             converted to a NumPy array first
    n_samples: number of rows to draw

    Returns
    --------------
    X: the n_samples randomly chosen rows
    y: array of ones with shape (n_samples, 1), the "real" class labels
    """
    pool = np.asarray(dataset) if isinstance(dataset, list) else dataset

    # Row indices are drawn uniformly from [0, number of rows).
    chosen_rows = randint(0, pool.shape[0], n_samples)

    return pool[chosen_rows], np.ones((n_samples, 1))

### Discriminator

In [None]:
def simpleDiscriminator(in_shape=100):
    """
    Build and compile the discriminator: one 100-unit ReLU layer followed by a
    single sigmoid unit that scores a curve as real or fake.

    Parameters
    --------------
    in_shape: number of values in each input curve

    Returns
    --------------
    Sequential model compiled with binary cross-entropy, Adam (learning rate 0.001)
    and the accuracy metric
    """
    layers = [
        Dense(100, input_dim=in_shape, activation='relu'),
        # The decision is binary (Real | Fake), hence a single sigmoid output.
        Dense(1, activation='sigmoid'),
    ]
    model = tf.keras.Sequential(layers)

    model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.001),
        metrics=['accuracy'],
    )

    return model
    

### Generator

In [None]:
def simpleGenerator(in_shape=100):
    """
    Build the generator: two 100-unit ReLU layers followed by a linear 100-unit output.

    The model is returned uncompiled.

    Parameters
    --------------
    in_shape: length of the latent vector fed to the first layer

    Returns
    --------------
    Sequential model mapping an in_shape-long vector to 100 output values
    (the output width is fixed at 100 regardless of in_shape)
    """
    model = tf.keras.Sequential()

    # Only the first layer declares the input size; every later layer receives the
    # 100-unit output of the layer before it, so it must not repeat input_dim.
    model.add(Dense(100, input_dim=in_shape, activation="relu"))
    model.add(Dense(100, activation="relu"))
    model.add(Dense(100))  # no activation: linear output

    return model

### Summary of Models

Instantiate both networks and print their layer summaries.

In [None]:
# Build the two networks with their default sizes; the discriminator is created first.
discriminator = simpleDiscriminator()
generator = simpleGenerator()

In [None]:
# Print the layer table of each network, discriminator first.
for network in (discriminator, generator):
    network.summary()

### Latent Space

In [None]:
def latentDimensionalGenerator(latent_dimensions, n_samples, randomGaussian = False):
    """
    Draw latent vectors from a standard normal distribution.

    Parameters
    --------------
    latent_dimensions: length of each latent vector
    n_samples: number of latent vectors to draw
    randomGaussian: accepted but not used by this function

    Returns
    --------------
    Array with one latent vector per row, built from n_samples separate
    np.random.randn(latent_dimensions) draws
    """
    latent_vectors = [np.random.randn(latent_dimensions) for _ in range(n_samples)]
    return np.array(latent_vectors)

In [None]:
def generate_samples(g_model, latent_dim, n_samples):
    """
    Produce fake samples by running freshly drawn latent vectors through the generator.

    Parameters
    --------------
    g_model: generator model; only its predict method is used
    latent_dim: length of each latent vector
    n_samples: number of fake samples to produce

    Returns
    --------------
    X: output of g_model.predict on the latent vectors
    y: array of zeros with shape (n_samples, 1), the "fake" class labels
    """
    latent_points = latentDimensionalGenerator(latent_dim, n_samples)
    generated = g_model.predict(latent_points)
    fake_labels = np.zeros((n_samples, 1))
    return generated, fake_labels

### Visualizing the latent dimensional space in 2D

In [None]:
# k = latentDimensionalGenerator(100, 10)
# print(k)

# Ten curves from the still-untrained generator, each from a 100-long latent vector.
fake_X, fake_y = generate_samples(g_model=generator, latent_dim=100, n_samples=10)
print(fake_X.shape)

In [None]:
# def plotFakeData(fake_data):
#     for fake_curve in fake_data:
#         plotCurve(fake_curve[0], fake_curve[1]);
        
# plotFakeData(fake_X);

def plotFakeData1D(fake_data, real=False, save=False, save_path="", epoch=0):
    """
    Plot every generated curve against the notebook-level X_curve x values.

    Parameters
    --------------
    fake_data: iterable of curves, each a sequence of y values
    real: when truthy, the reference curve is drawn underneath each generated curve
    save: when truthy, each figure is also written to disk by plotCurve
    save_path: directory for the saved images
    epoch: number used for the saved file name and the title

    The file name depends only on save_path and epoch, so when several curves are
    saved in one call each overwrites the previous file.
    """
    for fake_curve in fake_data:
        # Pass the caller's flags straight through: `save` alone decides whether a file
        # is written, independent of whether the reference curve is drawn.
        plotCurve(X_curve, fake_curve, real=real, save=save, save_path=save_path, epoch=epoch)
        
# Show what the untrained generator produces, without the reference curve.
plotFakeData1D(fake_data=fake_X, real=False)

# GAN Architecture: Putting it together

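The combined model stacks the generator and the discriminator. The discriminator is marked as not trainable inside it, so training the combined model updates only the generator.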

In [None]:
def define_gan(generator, discriminator):
    """
    Stack the generator and the discriminator into one model used to train the generator.

    Parameters
    --------------
    generator: model that turns latent vectors into samples
    discriminator: model that scores samples; its `trainable` flag is set to False
                   here, which also affects the object the caller passed in

    Returns
    --------------
    Sequential model (generator followed by discriminator) compiled with binary
    cross-entropy and Adam (learning rate 0.001)
    """
    # Freeze the discriminator so that this stacked model only updates the generator.
    discriminator.trainable = False

    stacked = tf.keras.Sequential([generator, discriminator])

    # The generator trains on this loss.
    stacked.compile(loss='binary_crossentropy', optimizer=Adam(learning_rate=0.001))
    return stacked

# Evaluation Metrics

In [None]:
def summarize_performance(epoch, g_model, d_model, dataset, latent_dim, n_samples, save_path=""):
    """
    Report discriminator accuracy on real and generated samples and plot the generated curves.

    Parameters
    --------------
    epoch: number printed in the header and forwarded to the plots when saving
    g_model: generator used to produce the fake samples
    d_model: discriminator being evaluated
    dataset: real data to sample from
    latent_dim: length of the latent vectors fed to the generator
    n_samples: number of real and of fake samples to evaluate
    save_path: when not empty, forwarded to plotFakeData1D together with save=True and epoch
    """
    # Discriminator accuracy on real samples.
    real_batch, real_labels = sample_real_samples(dataset, n_samples)
    _, acc_real = d_model.evaluate(real_batch, real_labels, verbose=0)

    # Discriminator accuracy on generated samples.
    fake_batch, fake_labels = generate_samples(g_model, latent_dim, n_samples)
    _, acc_fake = d_model.evaluate(fake_batch, fake_labels, verbose=0)

    print("============== CURVE GENERATION ON EPOCH", epoch,"==============")

    plot_options = {"real": True}
    if save_path != "":
        plot_options.update(save=True, save_path=save_path, epoch=epoch)
    plotFakeData1D(fake_batch, **plot_options)

    # Summarize discriminator performance.
    print('>Accuracy real: %.0f%%, fake: %.0f%%' % (acc_real*100, acc_fake*100))

# GAN Training

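Each iteration trains the discriminator on half a batch of real curves and half a batch of generated curves, then trains the generator through the combined model using "real" labels.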

In [None]:
def train_gan(g_model, d_model, gan_model, training_data, latent_dim, n_epochs, n_batch, save_path=""):
    """
    Alternate discriminator and generator updates for n_epochs iterations.

    Each iteration performs one discriminator update on real samples, one on
    generated samples, and one generator update through the stacked model. It
    then prints the losses and calls summarize_performance with one sample.

    Parameters
    --------------
    g_model: generator model
    d_model: discriminator model; train_on_batch is unpacked as (loss, metric)
    gan_model: stacked generator + discriminator model; train_on_batch is used as a single loss
    training_data: real samples to draw from
    latent_dim: length of the latent vectors
    n_epochs: number of iterations to run
    n_batch: batch size for the generator update; the discriminator gets half of it
             for each of its two updates
    save_path: forwarded to summarize_performance

    Returns
    --------------
    Three lists with one entry per iteration: discriminator loss on real samples,
    discriminator loss on fake samples, and generator loss
    """
    half_batch = int(n_batch / 2)
    real_losses, fake_losses, generator_losses = [], [], []

    for iteration in range(n_epochs):
        # Discriminator update on real samples.
        real_batch, real_labels = sample_real_samples(training_data, half_batch)
        real_loss, _ = d_model.train_on_batch(real_batch, real_labels)

        # Discriminator update on generated samples.
        fake_batch, fake_labels = generate_samples(g_model, latent_dim, half_batch)
        fake_loss, _ = d_model.train_on_batch(fake_batch, fake_labels)

        # Generator update: fresh latent points with inverted labels (all "real"),
        # so the generator is pushed by the discriminator's error.
        latent_points = latentDimensionalGenerator(latent_dim, n_batch)
        inverted_labels = np.ones((n_batch, 1))
        generator_loss = gan_model.train_on_batch(latent_points, inverted_labels)

        # Report this batch.
        print('>%d, d1=%.3f, d2=%.3f g=%.3f' % (iteration+1, real_loss, fake_loss, generator_loss))
        summarize_performance(iteration, g_model, d_model, training_data, latent_dim, 1, save_path)

        real_losses.append(real_loss)
        fake_losses.append(fake_loss)
        generator_losses.append(generator_loss)

    return real_losses, fake_losses, generator_losses

In [None]:
# Length of each noise vector fed to the generator.
latent_dim = 100

# Stacked generator + discriminator model used for the generator updates.
gan_model = define_gan(generator=generator, discriminator=discriminator)

In [None]:
# Directory that receives one PNG per training iteration.
image_save_path = "./images/"

# exist_ok=True keeps this cell safe to re-run and avoids the check-then-create race.
os.makedirs(image_save_path, exist_ok=True)

In [None]:
n_epochs = 2000

# Training: returns the per-iteration loss histories (discriminator on real,
# discriminator on fake, generator).
d1, d2, gloss = train_gan(
    g_model=generator,
    d_model=discriminator,
    gan_model=gan_model,
    training_data=X_Row,
    latent_dim=latent_dim,
    n_epochs=n_epochs,
    n_batch=32,
    save_path=image_save_path,
)

In [None]:
# x-axis for the loss plots: `popping` takes the leading 0 and `epochs` becomes
# the list 1, 2, ..., n_epochs.
popping, *epochs = range(n_epochs + 1)

In [None]:
# One figure per loss history, in the order: discriminator-real, discriminator-fake, generator.
for loss_history, plot_title in ((d1, "d1 loss"), (d2, "d2 loss"), (gloss, "GAN Loss")):
    plotCurve(epochs, loss_history, title=plot_title)

## Some Parameters to keep in mind

Discriminator -> 100 dense (Sigmoid output)
Generator -> 100 dense (No activation Output)

trained for about 3000 epochs for good convergence on 60,000 of the same Gaussian Curves

Discriminator -> 
100 dense (RELU)
(Sigmoid output)

Generator -> 
100 dense (RELU)
100 dense (RELU)
(No activation Output)

trained for about 2000 epochs for good convergence on 60,000 of the same Gaussian Curves

# Save as GIF

In [None]:
# Quick n' dirty way of saving to GIF
import imageio.v2 as imageio

input_folder = "./images/"
output_folder = './Movie_Data/'

# exist_ok=True keeps this cell safe to re-run.
os.makedirs(output_folder, exist_ok=True)


def _frame_order(path):
    """Sort key: purely numeric file stems in numeric order, all other names after them."""
    stem = os.path.splitext(os.path.basename(path))[0]
    if stem.isdecimal():
        return (0, int(stem), path)
    return (1, 0, path)


# Collect every frame in the input folder, whatever its image extension.
image_name_arr_out = [
    path
    for pattern in ("*.png", "*.tif", "*.jpg")
    for path in glob.glob(os.path.join(input_folder, pattern))
]

# Frames are named "<epoch>.png". A plain string sort would order them
# 0, 1, 10, 100, ... and scramble the animation, so sort by the numeric stem.
images = [
    imageio.imread(filename)
    for filename in sorted(image_name_arr_out, key=_frame_order)
]
imageio.mimsave(os.path.join(output_folder, "GAN.gif"), images)