# Spectralgen Code

Code related to the work "J.J. García-Esteban, J.C. Cuevas, J. Bravo-Abad, "Generative adversarial networks for data-scarce spectral applications in the physical sciences”, submitted for publication (2023)."

Contains:

- code used to create and train the networks described. 

- code used to create some of the graphs in the paper.

## General imports

Run this code to import the relevant libraries, load and prepare the datasets and define the evaluation metrics used in the paper.

In [None]:
# Import relevant libraries

import tensorflow as tf

from keras import *
from keras.models import Sequential
from keras.layers import Dense

import numpy as np

import matplotlib.pyplot as plt # for plotting
import matplotlib
matplotlib.rcParams['figure.dpi']=300 # highres display
plt.rcParams["figure.figsize"] = (20,15)
matplotlib.rcParams.update({'font.size': 30})
%matplotlib inline

In [None]:
# Data loading

# Total amount of data: 6561 examples

label_path = '/Users/usuario/Desktop/GAN_multilayered/labels8metal.csv'
data_path = '/Users/usuario/Desktop/GAN_multilayered/data8metal.csv'

dataindex_raw = np.genfromtxt(label_path,dtype="float32") # RAW indices
dataindex = dataindex_raw[:,[1,2,3,4,5,6,7,8]]
datafile = np.genfromtxt(data_path,dtype="float32") # RAW spectra

# Random_index

np.random.seed(42)

random_index = np.linspace(0,dataindex.shape[0]-1,dataindex.shape[0])
np.random.shuffle(random_index)
      
# Normalization

input_mean = np.mean(np.log10(dataindex),axis=0)
input_std = np.std(np.log10(dataindex),axis=0)

data_mean = np.mean(np.log10(datafile),axis=0)
data_std = np.std(np.log10(datafile),axis=0)

# Preallocate the sets

# Target data

y_target = np.zeros([datafile.shape[0],datafile.shape[1]]) 
y_target_rand = np.zeros([datafile.shape[0],datafile.shape[1]])

# Input data

y_in = np.zeros([dataindex.shape[0],dataindex.shape[1]])
y_in_rand = np.zeros([dataindex.shape[0],dataindex.shape[1]])

# Populate the sets

for i in range(dataindex.shape[0]):

    index = int(random_index[i])
    
    y_in[i,:] = (np.log10(dataindex[i,:])-input_mean)/input_std
    y_target[i,:] = (np.log10(datafile[i,:])-data_mean)/data_std
    
    y_in_rand[i,:] = (np.log10(dataindex[index,:])-input_mean)/input_std
    y_target_rand[i,:] = (np.log10(datafile[index,:])-data_mean)/data_std

# Create the train and val sets

val = 0.2 # Validation fraction [0,1]

train_index = np.linspace(0,np.int((1-val)*datafile.shape[0]-1),np.int((1-val)*datafile.shape[0])).astype(int)
val_index = np.linspace(np.int((1-val)*datafile.shape[0]),datafile.shape[0]-1,np.int(val*datafile.shape[0])+1).astype(int)

y_in_train = y_in_rand[train_index,:]
y_in_val = y_in_rand[val_index,:]

y_target_train = y_target_rand[train_index,:]
y_target_val = y_target_rand[val_index,:]

In [None]:
# Evaluation metrics

# FFNN

def mean_relative_abs_error_pointwise(net, inputs, target):
    
    Npoints = target.shape[1]
    Nexamples = target.shape[0]
    
    error = np.zeros([1,1])
    
    for i in range(Nexamples):
        
        fake = np.reshape(net.predict_on_batch(np.reshape(inputs[i,:],(1,inputs.shape[1]))),(Npoints,))
        
        faked = 10**(fake*data_std+data_mean)
        realed = 10**(target[i,:]*data_std+data_mean)
        
        for j in range(Npoints):
            
            error = error + np.abs((faked[j]-realed[j])/realed[j])
    
    return error/(Npoints*Nexamples)*100

def integral_relative_error(net, inputs, target):
    
    Npoints = target.shape[1]
    Nexamples = target.shape[0]
    
    error = np.zeros([1,1])
    
    for i in range(Nexamples):
        
        fake = np.reshape(net.predict_on_batch(np.reshape(inputs[i,:],(1,inputs.shape[1]))),(Npoints,))
        
        faked = 10**(fake*data_std+data_mean)
        realed = 10**(target[i,:]*data_std+data_mean)
        
        error = error + np.abs((np.trapz(faked)-np.trapz(realed))/np.trapz(realed))
        
    return error/Nexamples*100

# GAN, CGAN, CWGAN

def mean_relative_abs_error_pointwise_gan(net, inputs, target):
    
    Npoints = target.shape[1]
    Nexamples = target.shape[0]
    
    error = np.zeros([1,1])

    fake = net([inputs]).numpy()
    
    for i in range(Nexamples):
        
        faked = 10**(fake[i,:]*data_std+data_mean)
        realed = 10**(target[i,:]*data_std+data_mean)
        
        for j in range(Npoints):
            
            error = error + np.abs((faked[j]-realed[j])/realed[j])
    
    return error/(Npoints*Nexamples)*100

def integral_relative_error_gan(net, inputs, target):
    
    Npoints = target.shape[1]
    Nexamples = target.shape[0]
    
    error = np.zeros([1,1])
    
    fake = net([inputs]).numpy()
    
    for i in range(Nexamples):
        
        faked = 10**(fake[i,:]*data_std+data_mean)
        realed = 10**(target[i,:]*data_std+data_mean)
                
        error = error + np.abs((np.trapz(faked)-np.trapz(realed))/np.trapz(realed))
        
    return error/Nexamples*100

## Train a Network

Run this code to create and train a network like the ones used in the paper.

In [None]:
# Create a FFNN

# Defining the network

net_o=Sequential()
net_o.add(Dense(200, input_shape=(8,), activation = 'selu'))
net_o.add(Dense(200, activation = 'selu'))
net_o.add(Dense(200, activation = 'selu'))
net_o.add(Dense(200, activation = 'selu'))
net_o.add(Dense(200, activation = 'selu'))
net_o.add(Dense(datafile.shape[1]))

# Compiling the network

opt = tf.keras.optimizers.Adam(learning_rate = 0.0003)
net_o.compile(loss='mean_squared_error', optimizer=opt, metrics=['accuracy'])

In [None]:
# Train a FFNN

batchsize = dataindex.shape[0]
batches = 20000

net2_o = net_o.fit(y_in_rand,y_target_rand,batch_size=batchsize, validation_split = val, epochs=batches, verbose = 0)
loss = net2_o.history['loss'] #recording of loss
val_loss = net2_o.history['val_loss'] #recording of val_loss

# Evaluate the FFNN

print(mean_relative_abs_error_pointwise(net_o, y_in_train, y_target_train))
print(mean_relative_abs_error_pointwise(net_o, y_in_val, y_target_val))
print(integral_relative_error(net_o, y_in_train, y_target_train))
print(integral_relative_error(net_o, y_in_val, y_target_val))

# Plot the evolution of the loss

plt.plot(loss,color='blue')
plt.plot(val_loss,color='black')
plt.title('Train and val loss')
plt.ylabel('Loss')
plt.yscale('log')
plt.xlabel('Epoch')
plt.legend(['Training', 'Validation'], loc='upper right')
plt.show()

In [None]:
# Create a CWGAN

# Step 1: generator

input_label = Input(shape = (8,))

gen = layers.Dense(50, activation = 'selu')(input_label)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(100, activation = 'selu')(gen)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(150, activation = 'selu')(gen)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(200, activation = 'selu')(gen)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(y_target_rand.shape[1], activation = 'linear')(gen)

genr = Model(input_label,gen)

# Step 2: discriminator

input_discr_spect = Input(shape = (y_target_rand.shape[1]))
input_discr_label = Input(shape = (8,))

discr_spect = layers.Dense(150, activation = 'selu')(input_discr_spect)
discr_label = layers.Dense(50, activation = 'selu')(input_discr_label)

intermediate = layers.concatenate([discr_spect,discr_label])

discr = layers.Dense(100, activation = 'selu')(intermediate)
discr = layers.Dense(50, activation = 'selu')(discr)
discr = layers.Dense(1, activation = 'linear')(discr)

discrr = Model([input_discr_spect,input_discr_label],discr)

# Step 3: Optimizers

discr_opt = tf.keras.optimizers.Adam(learning_rate=0.0003)
gen_opt = tf.keras.optimizers.Adam(learning_rate=0.0003)

# Step 4: loss functions (and gradient penalty)

def discriminator_loss(real, fake):
    real_loss = tf.reduce_mean(real)
    fake_loss = tf.reduce_mean(fake)
    return fake_loss - real_loss

def generator_loss(fake, pred):
    return -tf.reduce_mean(fake) + tf.reduce_mean(tf.math.abs(pred-tf.convert_to_tensor(y_target_train,dtype=tf.float32)))  

def gradient_penalty(real, fake, label):
    """ Calculates the gradient penalty. This loss is calculated on an interpolated image 
    and added to the discriminator loss.
    """
    # Get the interpolated example
    alpha = tf.random.normal([y_target_train.shape[0], 1], 0, 1)
    diff = fake - real
    interp = real + alpha * diff

    with tf.GradientTape() as gp_tape:
        gp_tape.watch(interpolated)
        pred = discrr([interp, label], training=True)

    grad = gp_tape.gradient(pred, [interp])[0]
    norm = tf.sqrt(tf.reduce_sum(tf.square(grad), axis=1))
    gp = tf.reduce_mean((norm - 1.0) ** 2)
    
    return gp

In [None]:
# Train a CWGAN

# Preallocate and define constants

n_epochs = 50000 # Training epochs
d_steps = 5      # Training advantage for the critic
gp_weight = 10   # Weight of the Gradient Penalty

y_in_latent = tf.convert_to_tensor(y_in_train,dtype=tf.float32)

# Train the network 

for i in range(n_epochs):
    
    # Step 1: train the discriminator, d_steps times per gen step
    
    for j in range(d_steps):
                
        with tf.GradientTape() as tape:
            
            # Generate fake results 
            fake = genr(y_in_latent, training=True)
            
            # Get the logits for the fake results
            fake_logits = discrr([fake,y_in_latent], training=True)
            
            # Get the logits for the real data
            real_logits = discrr([tf.convert_to_tensor(y_target_train,dtype=tf.float32),y_in_latent], training=True)
            
            # Calculate the discriminator loss using the fake and real logits
            d_cost = discriminator_loss(real_logits, fake_logits)
            
            # Calculate the gradient penalty
            gp = gradient_penalty(tf.convert_to_tensor(y_target_train,dtype=tf.float32), fake, y_in_latent)
            
            # Add the gp to the original loss
            d_loss = d_cost + gp * gp_weight
        
        # Get the gradients 
        d_gradient = tape.gradient(d_loss, discrr.trainable_variables)
        
        # Update the weights 
        discr_opt.apply_gradients(zip(d_gradient, discrr.trainable_variables))
        
    # Step 2: train the generator
        
    with tf.GradientTape() as tape:
        
        # Generate fake results 
        generated_curves = genr(y_in_latent, training=True)
        
        # Get the discriminator logits for fake results
        gen_logits = discrr([generated_curves,y_in_latent], training=True)
        
        # Calculate the generator loss
        g_loss = generator_loss(gen_logits, generated_curves)
        
    # Record the values of the losses
    record1[i] = record11(gen_logits, generated_curves)
    record2[i] = record22(gen_logits, generated_curves)
        
    # Get the gradients 
    gen_gradient = tape.gradient(g_loss, genr.trainable_variables)
    
    # Update the weights 
    gen_opt.apply_gradients(zip(gen_gradient, genr.trainable_variables))
    
# Evaluate the CWGAN

print(mean_relative_abs_error_pointwise_gan(genr, y_in_train, y_target_train))
print(mean_relative_abs_error_pointwise_gan(genr, y_in_val, y_target_val))
print(integral_relative_error_gan(genr, y_in_train, y_target_train))
print(integral_relative_error_gan(genr, y_in_val, y_target_val))

## Evaluate a network

Run this code to evaluate a pre-trained network (also used in next section).

In [None]:
# Load networks

gen_CGAN = models.load_model('GAN_8metal_020_CGAN.h5')
gen_CWGAN = models.load_model('GAN_8metal_020_v3g.h5')

In [None]:
# All evaluation metrics

gen_trial = gen_CWGAN

print(mean_relative_abs_error_pointwise_gan(gen_trial, y_in_train, y_target_train))
print(mean_relative_abs_error_pointwise_gan(gen_trial, y_in_val, y_target_val))
print(integral_relative_error_gan(gen_trial, y_in_train, y_target_train))
print(integral_relative_error_gan(gen_trial, y_in_val, y_target_val))

## First epochs

Run this code to generate a figure similar to figure 5, varies slightly each training run

In [None]:
# Create and train the first N epochs of a CWGAN

N = 20000 # Train for the N first steps 

# Preallocate and define constants

record1 = np.zeros(N,) # Store results
record2 = np.zeros(N,)

n_epochs = N # Training epochs
d_steps = 5     # Training advantage for the critic
gp_weight = 10  # Weight of the Gradient Penalty

y_in_latent = tf.convert_to_tensor(y_in_train,dtype=tf.float32)

# Step 1: generator

input_label = Input(shape = (8,))

gen = layers.Dense(50, activation = 'selu')(input_label)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(100, activation = 'selu')(gen)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(150, activation = 'selu')(gen)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(200, activation = 'selu')(gen)
gen = layers.Dropout(0.2)(gen)
gen = layers.Dense(y_target_rand.shape[1], activation = 'linear')(gen)

genr = Model(input_label,gen)

# Step 2: discriminator

input_discr_spect = Input(shape = (y_target_rand.shape[1]))
input_discr_label = Input(shape = (8,))

discr_spect = layers.Dense(150, activation = 'selu')(input_discr_spect)
discr_label = layers.Dense(50, activation = 'selu')(input_discr_label)

intermediate = layers.concatenate([discr_spect,discr_label])

discr = layers.Dense(100, activation = 'selu')(intermediate)
discr = layers.Dense(50, activation = 'selu')(discr)
discr = layers.Dense(1, activation = 'linear')(discr)

discrr = Model([input_discr_spect,input_discr_label],discr)

# Step 3: Optimizers

discr_opt = tf.keras.optimizers.Adam(learning_rate=0.0003)
gen_opt = tf.keras.optimizers.Adam(learning_rate=0.0003)

# Step 4: loss functions (and gradient penalty)

def discriminator_loss(real, fake):
    real_loss = tf.reduce_mean(real)
    fake_loss = tf.reduce_mean(fake)
    return fake_loss - real_loss

def generator_loss(fake, pred):
    return -tf.reduce_mean(fake) + tf.reduce_mean(tf.math.abs(pred-tf.convert_to_tensor(y_target_train,dtype=tf.float32)))

def record11(fake, pred):
    return -tf.reduce_mean(fake) 

def record22(fake, pred):
    return tf.reduce_mean(tf.math.abs(pred-tf.convert_to_tensor(y_target_train,dtype=tf.float32)))  

def gradient_penalty(real, fake, label):
    """ Calculates the gradient penalty. This loss is calculated on an interpolated image 
    and added to the discriminator loss.
    """
    # Get the interpolated example
    alpha = tf.random.normal([y_target_train.shape[0], 1], 0, 1)
    diff = fake - real
    interp = real + alpha * diff

    with tf.GradientTape() as gp_tape:
        gp_tape.watch(interpolated)
        pred = discrr([interp, label], training=True)

    grad = gp_tape.gradient(pred, [interp])[0]
    norm = tf.sqrt(tf.reduce_sum(tf.square(grad), axis=1))
    gp = tf.reduce_mean((norm - 1.0) ** 2)
    
    return gp

# Train the network 

for i in range(n_epochs):
    
    # Step 1: train the discriminator, d_steps times per gen step
    
    for j in range(d_steps):
                
        with tf.GradientTape() as tape:
            
            # Generate fake results 
            fake = genr(y_in_latent, training=True)
            
            # Get the logits for the fake results
            fake_logits = discrr([fake,y_in_latent], training=True)
            
            # Get the logits for the real data
            real_logits = discrr([tf.convert_to_tensor(y_target_train,dtype=tf.float32),y_in_latent], training=True)
            
            # Calculate the discriminator loss using the fake and real logits
            d_cost = discriminator_loss(real_logits, fake_logits)
            
            # Calculate the gradient penalty
            gp = gradient_penalty(tf.convert_to_tensor(y_target_train,dtype=tf.float32), fake, y_in_latent)
            
            # Add the gp to the original loss
            d_loss = d_cost + gp * gp_weight
        
        # Get the gradients 
        d_gradient = tape.gradient(d_loss, discrr.trainable_variables)
        
        # Update the weights 
        discr_opt.apply_gradients(zip(d_gradient, discrr.trainable_variables))
        
    # Step 2: train the generator
        
    with tf.GradientTape() as tape:
        
        # Generate fake results 
        generated_curves = genr(y_in_latent, training=True)
        
        # Get the discriminator logits for fake results
        gen_logits = discrr([generated_curves,y_in_latent], training=True)
        
        # Calculate the generator loss
        g_loss = generator_loss(gen_logits, generated_curves)
        
    # Record the values of the losses
    record1[i] = record11(gen_logits, generated_curves)
    record2[i] = record22(gen_logits, generated_curves)
        
    # Get the gradients 
    gen_gradient = tape.gradient(g_loss, genr.trainable_variables)
    
    # Update the weights 
    gen_opt.apply_gradients(zip(gen_gradient, genr.trainable_variables))

In [None]:
# Create figure 3

plt.plot(np.abs(record1),'k')
plt.plot(np.abs(record2),'b')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.yscale('log')
plt.legend(['Adversarial error','MAE'])
plt.show()

## CGAN vs CWGAN

Run this code to generate figure 3 of the paper using pre-trained CGAN and CWGAN.

In [None]:
# Load parameters

gen_CGAN = models.load_model('GAN_8metal_020_CGAN.h5')
gen_CWGAN = models.load_model('GAN_8metal_020_v3g.h5')

y_target_train_new = y_target_rand[train_index,:]

# PCA

cov_mat = np.matmul(np.transpose(y_target_train_new),y_target_train_new)/y_target_train_new.shape[0]

[w,v] = np.linalg.eig(cov_mat)

z = np.matmul(y_target_train_new,v[:,:7])
z_CGAN = np.matmul(gen_CGAN(y_in_train),v[:,:7])
z_CWGAN = np.matmul(gen_CWGAN(y_in_train),v[:,:7])

# Check information PCA

s = np.zeros(w.shape[0]) # Pre-allocate

s0 = np.sum(w)

for i in range(w.shape[0]):

  si = np.sum(w[:(i+1)])
  s[i] = si/s0

print(s[1]) # First two elements, what we use for the figure
print(s[7]) # First 8 elements

In [None]:
# Figure 4a

plt.scatter(z[:,0],z[:,1],color='darkgray')
plt.scatter(z_CGAN[:,0],z_CGAN[:,1],color='forestgreen')
plt.xlabel('$z_1$')
plt.ylabel('$z_2$')
plt.legend(['training set','generated'])
plt.show()

In [None]:
# Figure 4b

plt.scatter(z[:,0],z[:,1],color='darkgray')
plt.scatter(z_CWGAN[:,0],z_CWGAN[:,1],color='forestgreen')
plt.xlabel('$z_1$')
plt.ylabel('$z_2$')
plt.legend(['training set','generated'])
plt.show()