In [None]:
%matplotlib inline

from tensorflow.keras.models import Sequential
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import Input
from tensorflow.keras.losses import BinaryCrossentropy

#JIT for the data generation.
from numba import jit

from IPython import display

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt

# Data Generation

In [None]:
@jit
def Kernel(x, x0):
    """Evaluate the reference Gaussian kernel centred on ``x0`` at ``x``.

    The Gaussian has a fixed width of 0.8 and is scaled by ``protonFraction``
    (0.4), so it integrates to 0.4 instead of 1.

    Parameters
    ----------
    x : float or ndarray
        Position(s) at which the kernel is evaluated.
    x0 : float
        Centre of the Gaussian.

    Returns
    -------
    float or ndarray
        Kernel value(s), with the same shape as ``x``.
    """
    width = 0.8
    protonFraction = 0.4
    amplitude = protonFraction/(np.sqrt(2.*np.pi)*width)
    exponent = -(x - x0)**2./(2.*width**2.)
    return amplitude*np.exp(exponent)

@jit
def FakeKernel(x, x0, s):
    """Evaluate a Gaussian kernel of adjustable width ``s`` centred on ``x0``.

    Same functional form as ``Kernel`` (scaled by ``protonFraction`` = 0.4),
    except that the width is supplied by the caller instead of being fixed.

    Parameters
    ----------
    x : float or ndarray
        Position(s) at which the kernel is evaluated.
    x0 : float
        Centre of the Gaussian.
    s : float
        Width (standard deviation) of the Gaussian.

    Returns
    -------
    float or ndarray
        Kernel value(s), with the same shape as ``x``.
    """
    protonFraction = 0.4
    amplitude = protonFraction/(np.sqrt(2.*np.pi)*s)
    exponent = -(x - x0)**2./(2.*s**2.)
    return amplitude*np.exp(exponent)

In [None]:
@jit
def test_data_gen(fakeKernel=False, sigma=0.4):
    """Generate one synthetic event: a baryon density and its kernel-smeared image.

    A random number of baryons (``np.random.randint(0, 2*A)``, upper bound
    excluded) is placed at positions within [-yBeam, yBeam]; each one is
    smeared with a unit-area Gaussian of width ``sigmaEtas`` onto a 128-point
    grid spanning [-6.4, 6.4]. The "real" output is that density convolved
    with ``Kernel``.

    Parameters
    ----------
    fakeKernel : bool
        If True, the fake output is the *same* density convolved with
        ``FakeKernel`` of width ``sigma``. If False, the fake density and fake
        output are uniform random noise bounded by the maxima of the real ones.
    sigma : float
        Width passed to ``FakeKernel``; only used when ``fakeKernel`` is True.

    Returns
    -------
    tuple of five 1-D arrays of length 128
        ``(etasArr, dNBdetas, dNpdy, dNBdetasFake, dNpdyFake)``. When
        ``fakeKernel`` is True the fourth entry is ``dNBdetas`` itself.
    """
    A = 197
    yBeam = 5.36
    slope = 0.5
    sigmaEtas = 0.2

    # generate input data
    nBaryons = np.random.randint(0, 2*A)
    randX = np.random.uniform(0, 1, size=nBaryons)
    etasBaryon = 1./slope*np.arcsinh((2.*randX - 1)*np.sinh(slope*yBeam))
    etasArr = np.linspace(-6.4, 6.4, 128)
    nEtas = len(etasArr)
    detas = etasArr[1] - etasArr[0]

    dNBdetas = np.zeros(nEtas)
    norm = 1./(np.sqrt(2.*np.pi)*sigmaEtas)
    for iB in etasBaryon:
        dNBdetas += norm*np.exp(-(etasArr - iB)**2./(2.*sigmaEtas**2.))

    # generate test data with convolution with a kernel (Riemann sum on the grid)
    dNpdy = np.zeros(nEtas)
    for i in range(nEtas):
        dNpdy[i] = np.sum(Kernel(etasArr, etasArr[i])*dNBdetas)*detas

    if fakeKernel:
        # Fake sample = the same input density smeared with a kernel of the
        # wrong width. No random draw is needed on this path.
        dNpdyFake = np.zeros(nEtas)
        for i in range(nEtas):
            dNpdyFake[i] = np.sum(FakeKernel(etasArr, etasArr[i], sigma)*dNBdetas)*detas

        return (etasArr, dNBdetas, dNpdy, dNBdetas, dNpdyFake)

    # generate fake data with random noise
    dNBdetasFake = np.random.uniform(0.0, dNBdetas.max(), size=nEtas)
    dNpdyFake = np.random.uniform(0.0, dNpdy.max(), size=nEtas)

    return (etasArr, dNBdetas, dNpdy, dNBdetasFake, dNpdyFake)

In [None]:
# DEPTH defines the number of blocks of data that will be returned per "iev"
# (one event of the generation loop).
# NOTE(review): DEPTH is not referenced by any code visible in this notebook - confirm it is still needed.
DEPTH = 2

def generate_data(size=128):
    """Build one batch of real and one batch of fake samples.

    Each of the ``size`` events comes from ``test_data_gen(fakeKernel=False)``.
    The real sample is the third returned array (the kernel-convolved
    distribution) and the fake sample is the fifth (uniform random noise).

    Parameters
    ----------
    size : int
        Number of events, i.e. rows in each returned array.

    Returns
    -------
    tuple of two float32 ndarrays
        ``(real_data, fake_data)``, one row per event.
    """
    real_rows, fake_rows = [], []

    for _ in range(size):
        _, _, convolved, _, noise = test_data_gen(fakeKernel=False)
        real_rows.append(convolved)   # real data - Block
        fake_rows.append(noise)       # fake data: random - Block

    real_batch = np.array(real_rows, dtype=np.float32)
    fake_batch = np.array(fake_rows, dtype=np.float32)
    return real_batch, fake_batch

# Generator and Discriminator Models

In [None]:
#Global Constants

# Number of real samples (and of fake samples) generated for each training step.
BATCH_SIZE = 128

#Discriminator
# Rate of the Dropout layer placed before the final Dense layer.
DROPOUT_RATE = 0.2
# Argument passed to every LeakyReLU layer of the discriminator.
RELU_ALPHA=0.2

#Optimizer Parameters
# All four values below are forwarded to the Adam optimizer of the discriminator.
BETA_1 = 0.9
BETA_2 = 0.999
EPSILON = 1e-07
LEARNING_RATE=0.0005

In [None]:
def define_discriminator(dimShape=(128,1)):
    """Build the (uncompiled) 1-D convolutional discriminator.

    The network is three stride-2 ``Conv1D`` stages, each followed by
    ``LeakyReLU(RELU_ALPHA)`` and ``BatchNormalization``, then ``Flatten``,
    ``Dropout(DROPOUT_RATE)`` and a single sigmoid unit.

    Parameters
    ----------
    dimShape : tuple
        Shape of one input sample, passed to ``layers.Input``.

    Returns
    -------
    Sequential
        Keras model named "Discriminator" with one sigmoid output per sample.
    """
    # (filters, kernel_size) of the successive convolution stages
    conv_stages = [(1, 7), (8, 7), (16, 5)]

    stack = [layers.Input(shape=dimShape)]
    for n_filters, kernel_width in conv_stages:
        stack.append(layers.Conv1D(n_filters, kernel_width, strides=2, padding="same"))
        stack.append(layers.LeakyReLU(RELU_ALPHA))
        stack.append(layers.BatchNormalization())

    # Classification head
    stack.append(layers.Flatten())
    stack.append(layers.Dropout(DROPOUT_RATE))
    stack.append(layers.Dense(1, activation="sigmoid"))

    return Sequential(stack, name="Discriminator")

In [None]:
# Module-level discriminator instance; graph_samples and train_step use this global D.
D = define_discriminator()

# Print the layer-by-layer summary of the model.
D.summary()

In [None]:
def graph_samples(epoch=None, disc_loss=None):
    """Refresh the cell output with the loss curve and a prediction histogram.

    Clears the current cell output, prints the epoch number if one is given,
    and - when a non-empty loss history is supplied - draws two panels: the
    loss history, and a histogram of the global discriminator ``D``'s
    predictions on 1024 freshly generated real plus 1024 fake samples.

    Parameters
    ----------
    epoch : int or None
        Epoch number to print. ``0`` is a valid epoch and is printed.
    disc_loss : sequence of float or None
        Loss history, one value per epoch. Nothing is plotted when it is
        ``None`` or empty.
    """
    display.clear_output(wait=True)
    # `is not None` rather than truthiness, so that epoch 0 is reported too.
    if epoch is not None:
        print("Epoch: " + str(epoch))

    # Explicit emptiness test: also safe for array-like histories, whose
    # truth value is ambiguous.
    if disc_loss is None or len(disc_loss) == 0:
        return

    real_data, fake_data = generate_data(1024)

    # Sample order is irrelevant for a histogram, so no shuffle is needed.
    # (tf.random.shuffle returns a new tensor; calling it without using the
    # result has no effect.)
    combined_data = tf.concat([real_data, fake_data], axis=0)

    predictions = D.predict(combined_data)

    fig = plt.figure(constrained_layout=True, figsize=(25,5))
    ax = fig.subplots(1,2)
    ax[0].plot(np.arange(len(disc_loss)), disc_loss, label="Average Discriminator Loss(BCE)")
    ax[0].set_ylabel("Average Loss")
    ax[0].set_xlabel("Epoch Count")
    ax[0].set_title("Average Model Losses")
    ax[0].legend()

    # Flatten whatever came back instead of hard-coding the sample count.
    ax[1].hist(np.ravel(predictions), 100)
    ax[1].set_xlabel("Sample Prediction")
    ax[1].set_ylabel("Sample Count")
    ax[1].set_title("Predictions")
    # Set the limits on the histogram axes explicitly rather than on the
    # implicit "current" axes.
    ax[1].set_xlim([-0.05, 1.05])

    plt.show()
    # Release the figure; this function is called repeatedly during training.
    plt.close(fig)

# Training Functions

In [None]:
# Loss that scores the discriminator outputs against the real/fake labels in train_step.
d_cross_entropy = BinaryCrossentropy()

# Adam optimizer that applies the discriminator gradients; its hyper-parameters come from the global constants.
discriminator_optimizer = Adam(learning_rate = LEARNING_RATE, beta_1=BETA_1, beta_2=BETA_2, epsilon=EPSILON)

In [None]:
@tf.function
def train_step(real_data, fake_data):
    """Run one optimisation step of the global discriminator ``D``.

    Both batches are passed through ``D`` in training mode, the predictions
    are scored with ``d_cross_entropy`` against noisy real/fake labels, and
    the gradients are applied by ``discriminator_optimizer``.

    Parameters
    ----------
    real_data : array-like
        Batch of real samples (target label close to 1).
    fake_data : array-like
        Batch of fake samples (target label close to 0).

    Returns
    -------
    tf.Tensor
        Scalar binary cross-entropy loss of this step.
    """
    # Maximum distance a soft label may move away from its hard value.
    label_noise = 0.15

    with tf.GradientTape() as disc_tape:
        real = D(real_data, training=True)
        fake = D(fake_data, training=True)

        predictions = tf.concat([real, fake], axis=0)

        # Add a little noise, pushing the labels *inwards* so they remain
        # valid cross-entropy targets in [0, 1]: real labels fall in
        # (0.85, 1], fake labels in [0, 0.15). Noise of the opposite sign
        # would give targets above 1 and below 0.
        real_labels = tf.ones_like(real) - label_noise*tf.random.uniform(tf.shape(real))
        fake_labels = tf.zeros_like(fake) + label_noise*tf.random.uniform(tf.shape(fake))
        labels = tf.concat([real_labels, fake_labels], axis=0)

        d_loss = d_cross_entropy(labels, predictions)

    gradients_of_discriminator = disc_tape.gradient(d_loss, D.trainable_variables)
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, D.trainable_variables))

    return d_loss

In [None]:
def train(epochs=100000):
    """Train the discriminator on freshly generated batches.

    Each epoch generates ``BATCH_SIZE`` real and fake samples, runs one
    ``train_step``, and records the running mean of the loss. Every 50 epochs
    the loss curve and prediction histogram are redrawn with
    ``graph_samples``.

    Parameters
    ----------
    epochs : int
        Number of training steps to run.
    """
    # A running total keeps the per-epoch average O(1); re-summing the whole
    # loss history on every epoch would be quadratic in the number of epochs.
    running_disc_loss = 0.0
    average_disc_loss = []

    for epoch in range(epochs):
        display.clear_output(wait=True)
        real_data, fake_data = generate_data(size=BATCH_SIZE)
        disc_loss = train_step(real_data, fake_data)

        # Store plain Python floats rather than keeping one tensor per epoch.
        running_disc_loss += float(disc_loss)
        average_disc_loss.append(running_disc_loss/(epoch + 1))

        #Only use this for looking at data, when training for real, comment this out. This function is very hefty.
        if (epoch % 50) == 0:
            graph_samples(epoch, average_disc_loss)

# Test and Train Zone

In [None]:
# Start training with the default number of epochs defined by train().
train()