# Training and Testing a Generative Adversarial Network

## Imports and dataset loading

In [244]:
%load_ext autoreload
%autoreload 2

from numpy import zeros, ones, expand_dims, asarray
from numpy.random import randn, randint
import tensorflow as tf
from keras.datasets import mnist
from keras.optimizers import Adam
from keras.models import Model, load_model
from keras.layers import Input, Dense, Reshape, Flatten
from keras.layers import Conv2D, Conv2DTranspose, Concatenate
from keras.layers import LeakyReLU, Dropout, Embedding
from keras.layers import BatchNormalization, Activation
from keras import initializers, Sequential
from keras.initializers import RandomNormal
from keras.optimizers import Adam, RMSprop, SGD
from matplotlib import pyplot
from matplotlib import pyplot as plt
import numpy as np
from math import sqrt
import os
from keras.callbacks import Callback
from data_reader import DataReader
from sklearn.preprocessing import MinMaxScaler

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


### Load MNIST

In [245]:
# Only the MNIST training split is kept; the test split is discarded.
# NOTE(review): the "Load Orderbook Dataset" cell below rebinds X_train, so this
# array is only used if that cell is skipped.
(X_train, y_train), (_, _) = mnist.load_data()
# Cast to float32, map [0, 255] to [-1, 1] and append a trailing channel axis.
X_train = np.expand_dims(X_train.astype(np.float32) / 127.5 - 1, axis=3)
print(X_train.shape)


(60000, 28, 28, 1)


### Load Orderbook Dataset

In [246]:
# Read the order-book snapshots through the project-local DataReader,
# configured with two rows per order book.
data_reader_instance = DataReader("orderbook_snapshots.csv", rows_per_orderbook=2)
data_reader_instance.read_csv()
# NOTE: this rebinds X_train, replacing whatever an earlier cell stored in it.
X_train = data_reader_instance.get_data()
# float32 copy of the reader output; this is what the preprocessing cell scales.
X_train_raw = X_train.astype(np.float32)
print(X_train_raw.shape)

(5876, 2, 2, 1)


## Preprocessing

In [247]:
def get_dataset_min_price(dataset):
    first_row_prices = dataset[:, -1, 0, 0]
    min_val = np.min(first_row_prices)
    return min_val

def get_dataset_max_price(dataset):
    last_row_prices = dataset[:, 0, 0, 0]
    max_val = np.max(last_row_prices)
    return max_val

def get_prices_range(dataset):
    """Return ``np.array([min_price, max_price])`` for the dataset.

    The bounds come from get_dataset_min_price and get_dataset_max_price.
    """
    lower = get_dataset_min_price(dataset)
    upper = get_dataset_max_price(dataset)
    return np.array([lower, upper])

def get_dataset_min_max_quantity(dataset):
    all_quantities = dataset[:, :, 1, 0]
    max_val = np.max(all_quantities)
    min_val = np.min(all_quantities)
    return min_val, max_val

def get_quantity_range(dataset):
    """Return ``np.array([min_quantity, max_quantity])`` for the dataset."""
    lower, upper = get_dataset_min_max_quantity(dataset)
    bounds = [lower, upper]
    return np.array(bounds)

def scale_prices(dataset):
    """Min-max scale the price and quantity columns of an order-book array.

    Despite the name, both columns are scaled: column 0 (prices) with the range
    from get_prices_range and column 1 (quantities) with the range from
    get_quantity_range. Each scaler is a default MinMaxScaler fitted on the
    two-element ``[min, max]`` array, so the fitted minimum maps to 0 and the
    fitted maximum to 1.

    Args:
        dataset: 4-D array indexed as ``[sample, row, column, channel]``.

    Returns:
        A new array with the same shape and dtype as ``dataset`` holding the
        scaled values. The input is left untouched, so re-running the calling
        cell always starts from the raw data.
    """
    range_vals_price = get_prices_range(dataset)
    range_vals_quantity = get_quantity_range(dataset)
    price_scaler = MinMaxScaler().fit(range_vals_price[:, np.newaxis])
    quantity_scaler = MinMaxScaler().fit(range_vals_quantity[:, np.newaxis])

    # Work on a copy: the original wrote into its argument, which silently
    # replaced the caller's raw array with scaled values.
    scaled = np.array(dataset, copy=True)

    # The scalers expect a single feature column, so flatten each column across
    # all samples, transform once, and restore the original layout.
    prices = scaled[:, :, 0]
    quantities = scaled[:, :, 1]
    scaled[:, :, 0] = price_scaler.transform(prices.reshape(-1, 1)).reshape(prices.shape)
    scaled[:, :, 1] = quantity_scaler.transform(quantities.reshape(-1, 1)).reshape(quantities.shape)
    return scaled
        

# Min-max scale the price and quantity columns; X_train is the array handed to
# gan_model.fit in the training cell.
X_train = scale_prices(X_train_raw)
print(X_train.shape)

(5876, 2, 2, 1)


## Defining Discriminator and Generator

In [248]:
# def define_discriminator(input_dim=(28,28,1)):
#     model = Sequential([
#         Input(shape=input_dim),
#         Flatten(),
#         Dense(units=512),
#         LeakyReLU(alpha=0.2),
#         Dense(units=512),
#         LeakyReLU(alpha=0.2),
#         Dense(units=512),
#         LeakyReLU(alpha=0.2),
#         Dense(units=512),
#         LeakyReLU(alpha=0.2),
#         Dense(units=512),
#         LeakyReLU(alpha=0.2),
#         Dense(units=1, activation='linear')
#     ])
#     return model

# def define_generator(latent_dim = 64, input_dim=(28,28,1)):
#     model = Sequential([
#         Input(shape=(latent_dim,)),
#         Dense(units=256),
#         LeakyReLU(alpha=0.2),
#         Dense(units=256),
#         LeakyReLU(alpha=0.2),
#         # Dense(units=256),
#         # LeakyReLU(alpha=0.2),
#         # Dense(units=256),
#         # LeakyReLU(alpha=0.2),
#         # Dense(units=256),
#         # LeakyReLU(alpha=0.2),
#         Dense(units=input_dim[0] * input_dim[1] * input_dim[2], activation='linear'),
#         Reshape(target_shape=input_dim)
#     ])
#     return model

def define_discriminator(input_dim=(28,28,1)):
    """Build the critic network.

    The input of shape ``input_dim`` is flattened, passed through three
    4-unit ReLU layers and reduced to a single linear output (an unbounded
    score, not a probability).
    """
    hidden_layers = [Dense(units=4, activation='relu') for _ in range(3)]
    layers = [
        Input(shape=input_dim),
        Flatten(),
        *hidden_layers,
        Dense(units=1, activation='linear'),
    ]
    return Sequential(layers)

def define_generator(latent_dim = 64, input_dim=(28,28,1)):
    """Build the generator network.

    Maps a ``latent_dim``-sized noise vector through two ReLU layers to a
    tanh output that is reshaped to ``input_dim``; outputs therefore lie in
    (-1, 1). A linear output activation is the alternative that was tried.
    """
    rows, cols, channels = input_dim[0], input_dim[1], input_dim[2]
    layers = [
        Input(shape=(latent_dim,)),
        Dense(units=latent_dim, activation='relu'),
        Dense(units=rows * cols, activation='relu'),
        Dense(units=rows * cols * channels, activation='tanh'),
        Reshape(target_shape=input_dim),
    ]
    return Sequential(layers)

## Defining GAN Training Architecture

In [256]:
class GANModel(Model):
    """Keras model that trains a generator against a critic with WGAN-GP.

    The wrapper owns no layers of its own; it holds the two sub-models and
    overrides ``train_step`` so that ``fit`` runs ``n_critic`` critic updates
    followed by one generator update per batch.
    """

    def __init__(self, generator, discriminator, generator_latent_dim, *args, **kwargs):
        """Store the sub-models and the size of the generator's noise input.

        Extra positional and keyword arguments are forwarded to the Keras base class.
        """
        super().__init__(*args, **kwargs)

        self.generator = generator
        self.discriminator = discriminator
        self.generator_latent_dim = generator_latent_dim

    def compile(self, g_opt, d_opt, n_critic, LAMBDA, *args, **kwargs):
        """Attach optimizers and WGAN-GP hyper-parameters.

        Args:
            g_opt: optimizer applied to the generator's variables.
            d_opt: optimizer applied to the critic's variables.
            n_critic: number of critic updates per generator update (>= 1).
            LAMBDA: weight of the gradient penalty in the critic loss.

        Raises:
            ValueError: if ``n_critic`` is smaller than 1; ``train_step`` would
                otherwise have no critic loss to report.
        """
        if n_critic < 1:
            raise ValueError("n_critic must be at least 1")
        super().compile(*args, **kwargs)

        self.g_opt = g_opt
        self.d_opt = d_opt
        self.n_critic = n_critic
        self.LAMBDA = LAMBDA

    def get_generator(self):
        """Return the generator passed to the constructor."""
        return self.generator

    def get_discriminator(self):
        """Return the critic passed to the constructor."""
        return self.discriminator

    def generate_latent_points(self, n_samples):
        """Return a NumPy array of shape ``(n_samples, latent_dim)`` of N(0, 1) noise.

        This uses NumPy and is meant for eager use outside of training.
        ``train_step`` does not call it: inside a compiled training function a
        NumPy draw happens once at trace time and is then reused unchanged.
        """
        x_input = randn(self.generator_latent_dim * n_samples)
        z_input = x_input.reshape(n_samples, self.generator_latent_dim)
        return z_input

    def _sample_latent(self, n_samples):
        """Draw fresh N(0, 1) noise as a tensor; safe inside graph-compiled code."""
        return tf.random.normal(shape=(n_samples, self.generator_latent_dim))

    def gradient_penalty(self, real_images, fake_images):
        """Return the WGAN-GP penalty ``mean((||grad critic(x_hat)||_2 - 1)^2)``.

        ``x_hat`` is a per-sample random interpolation between the real and the
        fake batch. Both batches must be rank-4 and have the same shape.
        """
        real_images = tf.cast(real_images, tf.float32)
        # Dynamic batch size so a partial final batch does not break the shapes.
        batch_size = tf.shape(real_images)[0]
        epsilon = tf.random.uniform(shape=[batch_size, 1, 1, 1], minval=0.0, maxval=1.0)
        interpolated_images = epsilon * real_images + (1.0 - epsilon) * fake_images

        with tf.GradientTape() as penalty_tape:
            penalty_tape.watch(interpolated_images)
            yhat_interpolated = self.discriminator(interpolated_images, training=True)

        p_grad = penalty_tape.gradient(yhat_interpolated, interpolated_images)
        # The small constant keeps the derivative of sqrt finite when a sample's
        # input gradient is exactly zero (e.g. every ReLU unit inactive).
        grad_norms = tf.sqrt(tf.reduce_sum(tf.square(p_grad), axis=[1, 2, 3]) + 1e-12)
        gradient_penalty = tf.reduce_mean(tf.square(grad_norms - 1.0))
        return gradient_penalty

    def rule_penalty(self, fake_images):
        """Mean positive part of ``fake[:, 1, 0] - fake[:, 0, 0]``.

        Currently not used by ``train_step``.
        """
        rule_penalty = tf.reduce_mean(tf.nn.relu(fake_images[:, 1, 0] - fake_images[:,0,0]))
        return rule_penalty

    def wasserstein_loss_discriminator(self, fake_pred, real_pred):
        """Critic loss: mean score on fakes minus mean score on reals."""
        return -(tf.reduce_mean(real_pred) - tf.reduce_mean(fake_pred))

    def wasserstein_loss_generator(self, fake_pred):
        """Generator loss: negated mean critic score on generated samples."""
        return -tf.reduce_mean(fake_pred)

    def train_step(self, batch):
        """Run ``n_critic`` critic updates and one generator update on ``batch``.

        Returns:
            dict with ``"d_loss"`` (critic loss of the last critic iteration,
            including the weighted gradient penalty) and ``"g_loss"``.
        """
        real_images = batch
        batch_size = tf.shape(real_images)[0]

        for _ in range(self.n_critic):
            # New noise for every critic iteration, drawn with a TF op so that
            # each executed step really sees different latent points.
            fake_images = self.generator(self._sample_latent(batch_size), training=False)

            # Train the discriminator
            with tf.GradientTape() as d_tape:
                yhat_real = self.discriminator(real_images, training=True)
                yhat_fake = self.discriminator(fake_images, training=True)
                gradient_penalty = self.gradient_penalty(real_images, fake_images)
                # rule_penalty = self.rule_penalty(fake_images)

                # Calculate loss - Wasserstein distance estimate plus gradient penalty
                total_d_loss = self.wasserstein_loss_discriminator(yhat_fake, yhat_real) + self.LAMBDA * gradient_penalty

            # Apply backpropagation to weights
            d_grad = d_tape.gradient(total_d_loss, self.discriminator.trainable_variables)
            self.d_opt.apply_gradients(zip(d_grad, self.discriminator.trainable_variables))

        # Train the generator
        with tf.GradientTape() as g_tape:
            gen_images = self.generator(self._sample_latent(batch_size), training=True)
            predicted_labels = self.discriminator(gen_images, training=False)

            # Calculate loss - the generator is rewarded for high critic scores
            total_g_loss = self.wasserstein_loss_generator(predicted_labels)

        # Apply backpropagation to weights
        g_grad = g_tape.gradient(total_g_loss, self.generator.trainable_variables)
        self.g_opt.apply_gradients(zip(g_grad, self.generator.trainable_variables))

        return {"d_loss":total_d_loss, "g_loss":total_g_loss}

## Functions

In [253]:

def generate_latent_points(latent_dim, n_samples):
    """Return standard-normal noise of shape ``(n_samples, latent_dim)``.

    ``latent_dim * n_samples`` values are drawn as one flat vector and then
    reshaped so that each row is the latent vector for one sample.
    """
    flat_noise = randn(latent_dim * n_samples)
    return flat_noise.reshape(n_samples, latent_dim)

def generate_real_samples(X_train, n_samples):
    """Draw ``n_samples`` random rows (with replacement) from ``X_train``.

    Returns:
        ``(X, y)`` where ``X`` has shape ``(n_samples, *X_train.shape[1:])`` and
        ``y`` is an all-ones label array of shape ``(n_samples, 1)``.
    """
    # Indices are sampled uniformly from [0, len(X_train)); repeats are possible.
    chosen = randint(0, X_train.shape[0], n_samples)
    return X_train[chosen], ones((n_samples, 1))

def generate_fake_samples(generator, latent_dim, n_samples):
    """Generate ``n_samples`` outputs from ``generator``.

    Latent points come from generate_latent_points and are fed to
    ``generator.predict``.

    Returns:
        ``(outputs, y)`` where ``y`` is an all-zeros label array of shape
        ``(n_samples, 1)``.
    """
    noise = generate_latent_points(latent_dim, n_samples)
    generated = generator.predict(noise)
    return generated, zeros((n_samples, 1))

def get_GAN_training_network(
    generator,
    discriminator,
    latent_dim,
    generator_learning_rate = 0.002,
    discriminator_learning_rate = 0.002,
    n_critic=1,
    adam_beta_1 = 0,
    adam_beta_2 = 0.9,
    LAMBDA=1,
):
    """Wrap a generator and critic in a compiled GANModel.

    Both sub-models get their own Adam optimizer; the two optimizers share the
    beta values and differ only in learning rate. ``n_critic`` and ``LAMBDA``
    are passed through to ``GANModel.compile``.
    """
    gan_model = GANModel(generator=generator, discriminator=discriminator, generator_latent_dim=latent_dim)
    shared_betas = {"beta_1": adam_beta_1, "beta_2": adam_beta_2}
    g_opt = Adam(learning_rate=generator_learning_rate, **shared_betas)
    d_opt = Adam(learning_rate=discriminator_learning_rate, **shared_betas)
    gan_model.compile(g_opt, d_opt, n_critic=n_critic, LAMBDA=LAMBDA)
    return gan_model

def get_generator_and_discriminator(latent_dim, input_dim):
    """Build both networks for samples of shape ``input_dim``.

    Returns:
        ``(discriminator, generator)``. Note the order: despite the function
        name, the discriminator comes first.
    """
    critic = define_discriminator(input_dim=input_dim)
    gen = define_generator(latent_dim=latent_dim, input_dim=input_dim)
    return critic, gen

## Training the GAN

### Get Discriminator and Generator

In [257]:
# Shared by the network builder and the GAN wrapper, so named once here.
LATENT_DIM = 100
INPUT_DIM = (2, 2, 1)  # per-sample shape of the order-book array (N, 2, 2, 1)

# The builder returns the discriminator first.
discriminator, generator = get_generator_and_discriminator(latent_dim=LATENT_DIM, input_dim=INPUT_DIM)
gan_model = get_GAN_training_network(
    generator=generator,
    discriminator=discriminator,
    latent_dim=LATENT_DIM,
    generator_learning_rate=0.0001,
    discriminator_learning_rate=0.0001,
    n_critic=5,
    adam_beta_1=0,
    adam_beta_2=0.9,
    LAMBDA=10,
)

### Train GAN

In [258]:
# 5876 samples / 52 = 113 full batches, so no smaller trailing batch occurs.
# NOTE(review): presumably batch_size was picked for that reason - confirm.
hist = gan_model.fit(X_train, epochs=500, batch_size=52)

Epoch 1/500
Discriminator Gradients:
dense_341/kernel:0: Tensor("Mean_16:0", shape=(), dtype=float32)
dense_341/bias:0: Tensor("Mean_17:0", shape=(), dtype=float32)
dense_342/kernel:0: Tensor("Mean_18:0", shape=(), dtype=float32)
dense_342/bias:0: Tensor("Mean_19:0", shape=(), dtype=float32)
dense_343/kernel:0: Tensor("Mean_20:0", shape=(), dtype=float32)
dense_343/bias:0: Tensor("Mean_21:0", shape=(), dtype=float32)
dense_344/kernel:0: Tensor("Mean_22:0", shape=(), dtype=float32)
dense_344/bias:0: Tensor("Mean_23:0", shape=(), dtype=float32)
Generator Gradients:
dense_345/kernel:0: Tensor("Mean_24:0", shape=(), dtype=float32)
dense_345/bias:0: Tensor("Mean_25:0", shape=(), dtype=float32)
dense_346/kernel:0: Tensor("Mean_26:0", shape=(), dtype=float32)
dense_346/bias:0: Tensor("Mean_27:0", shape=(), dtype=float32)
dense_347/kernel:0: Tensor("Mean_28:0", shape=(), dtype=float32)
dense_347/bias:0: Tensor("Mean_29:0", shape=(), dtype=float32)
Discriminator Gradients:
dense_341/kernel:0: T

2024-01-30 21:55:49.620017: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500

KeyboardInterrupt: 

### Review Training History

In [None]:
# One curve per loss key returned by GANModel.train_step.
pyplot.suptitle('Loss')
for loss_key in ('d_loss', 'g_loss'):
    pyplot.plot(hist.history[loss_key], label=loss_key)
pyplot.legend()
pyplot.show()

### Get the generator from the GAN

In [None]:
# get_generator returns the generator object that was handed to GANModel.
generator = gan_model.get_generator()

## Print resulting images

In [None]:
X_real, y_real = generate_real_samples(X_train=X_train, n_samples=64)
X_fake, y_fake = generate_fake_samples(generator=generator, latent_dim=100, n_samples=64)
# Show each real sample followed by the generated sample at the same index.
for i in range(64):
    for caption, images in (("real image from dataset", X_real),
                            ("generated image of number", X_fake)):
        print(caption)
        pyplot.imshow(images[i])
        pyplot.show()

In [None]:
# Draw 1000 real and 1000 generated order books for the comparison below.
# The label arrays (y_real, y_fake) are not used in this cell.
X_real, y_real = generate_real_samples(X_train=X_train, n_samples=1000)
X_fake, y_fake = generate_fake_samples(generator=generator, latent_dim=100, n_samples=1000)

def printOB(OB):
    """Print one order book: row 0 as the ask line, row 1 as the bid line.

    Each line shows the row's first and second entries (channel 0), followed
    by a blank line after the book.
    """
    for side_label, row_index in (("ask: ", 0), ("bid: ", 1)):
        row = OB[row_index]
        print(side_label, row[0][0], " ", row[1][0])
    print()
    
# Row 0 of each order book is the ask side and row 1 the bid side (as in printOB);
# column 0 is the price and column 1 the quantity.
asks_fake, bids_fake = X_fake[:, 0, 0, 0], X_fake[:, 1, 0, 0]
asks_real, bids_real = X_real[:, 0, 0, 0], X_real[:, 1, 0, 0]
asks_quantity_real, bids_quantity_real = X_real[:, 0, 1, 0], X_real[:, 1, 1, 0]

# Generated vs. real prices, side by side.
positions = [1, 2, 3, 4]  # X-coordinates for the boxplots
plt.boxplot(
    [asks_fake, asks_real, bids_fake, bids_real],
    positions=positions,
    labels=['fake asks', 'real asks', 'fake bids', 'real bids'],
)
plt.show()

# Real data only: prices, then quantities.
positions = [1, 2]
plt.boxplot([asks_real, bids_real], positions=positions, labels=['real asks', 'real bids'])
plt.show()
plt.boxplot(
    [asks_quantity_real, bids_quantity_real],
    positions=positions,
    labels=['real asks quantity', 'real bids quantity'],
)
plt.show()

# Histograms of the real ask and bid prices, one figure each.
for real_prices in (asks_real, bids_real):
    plt.hist(real_prices)
    plt.show()

# Dump every generated order book next to the real one at the same index.
for i, fake_book in enumerate(X_fake):
    print("fake")
    printOB(fake_book)
    print("real")
    printOB(X_real[i])

In [None]:
# Scratch cell: experiments with MinMaxScaler round-trips.
# NOTE(review): both imports already appear in the notebook's first cell.
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Placeholder input: uniform random values with the same per-sample shape as
# the order-book data (2, 2, 1). The result of the loop below is not used later.
X = np.random.rand(1000, 2, 2, 1)
scaler = MinMaxScaler()

# Re-fit the scaler on every sample: its 4 values are treated as one column,
# scaled to [0, 1], and written back in the (2, 2, 1) layout.
for i in range(len(X)):
    ascolumns = X[i].reshape(4,1)
    t = scaler.fit_transform(ascolumns)
    transformed = t.reshape((2,2,1))
    X[i] = transformed

# Round-trip check on the row-0 prices of X_train: scale, then invert.
# This re-fits `scaler` and rebinds `transformed`.
transformed = scaler.fit_transform(X_train[:, 0, 0, 0].reshape(-1,1))
original_restored = scaler.inverse_transform(transformed)

# Input, scaled and restored values, for comparison by eye.
print(X_train[:, 0, 0, 0])
print(transformed.flatten())
print(original_restored.flatten())

