In [None]:
# REMOVAL OF CROSS-PHASE-MODULATION ARTIFACTS IN ULTRAFAST PUMP-PROBE DYNAMICS VIA DEEP LEARNING

# GENERATION OF THE XPMnet TRAINING/VALIDATION DATASET
import numpy as np
import math
import matplotlib.pyplot as plt

# XPMnet processes single wavelength time dynamics.
# XPM starting time t0(lambda) is varied in a typical experimental range:
t0 = np.random.uniform(150, 700) #[fs]
# Number of time points in the simulated dynamics:
t_points = 200 

def f_cos(t0):
    """ Input = delay t0
        Output = cross-phase modulation (XPM) pattern
        time points = t_points
        The function simulates randomly reversed peaks (a[0], a[1]), 
        a random tau, a random phase phi and a random b parameter. """
    t = np.arange(0, t_points)*5
    t0xpm = t0 + round(np.random.uniform(-15, +15))
    # Load the 'Experimental_XPM_data provided to perform data augmentation on
    # experimentally measured XPM artifacts on soda-lime glass
    exp_xpm_data = np.loadtxt("/content/gdrive/MyDrive/Colab Notebooks/Experimental_XPM_data.txt")
    dim = exp_xpm_data.shape
    select = round(np.random.uniform(0,dim[0]-1))
    parameters = exp_xpm_data[select,:]
    parameters[0] = parameters[0] + np.random.uniform(-.05, .05)*parameters[0]    
    parameters[1] = parameters[1] + np.random.uniform(-.05, .05)*parameters[1]
    parameters[2] = parameters[2] + np.random.uniform(-.05, .05)*parameters[2]
    parameters[3] = parameters[3] + np.random.uniform(-.05, .05)*parameters[3]
    parameters[4] = parameters[4] + np.random.uniform(-.05, .05)*parameters[4]
    out = np.cos(parameters[3]*(t-t0xpm)**2+parameters[4])*(parameters[0]* \
          np.exp(-(4*np.log(2)*(t-t0xpm)**2)/parameters[2]**2)-parameters[1]* \
          8*np.log(2)/(parameters[2])**2*(t-t0xpm)*np.exp(-(4*np.log(2)* \
          (t-t0xpm)**2)/parameters[2]**2))
    return out

def dyn(t0):
    """ Input = delay t0
        Output = exponential dynamic pattern
        time points = t_points
        The function simulates an exponential electronic relaxation dynamic.
        Parameters are chose in typical experimental ranges. The set of 
        parameters comprises: amplitude a, time constant tau and c to account
        for the remaining signal after relaxation. 
        The Heaviside(t0) function is employed. """
    t = np.arange(0,t_points)
    a = np.random.uniform(.0017, .013)
    tau = np.random.uniform(150, 300)
    c = np.random.uniform(0, 1E-5)
    h=np.zeros(t_points)
    h[round(t0/5):t_points] = 1
    out = (-a*np.exp(-(t*5-t0)/tau)+c)*h
    return out
 
def f_sech():
    """ Output = pulse pattern as an hyperbolic secant (sech)
        The function simulates a pulse pattern with a FWHM in typical
        experimental ranges """
    t2 = int(t_points/2)
    fwhm = np.random.uniform(16, 26)
    sigma0=fwhm/1.76
    X = sigma0/(2*np.log(2+math.sqrt(3)))
    hyper_secant = np.zeros(t_points)
    for i in range(-t2, t2):
        hyper_secant[i] = ((2/(np.exp(i/X) + np.exp(-i/X))))
    out = np.zeros(t_points)
    out[0:t2] = hyper_secant[t2:t_points]
    out[t2:t_points] = hyper_secant[0:t2]
    out=out**2
    out=out/np.trapz(out)
    
    return out

def generate_dynamics():
    """ Output: INPUT X of the XPMnet , OUTPUT Y of the XPMnet, 
        exponential dynamics, pulse dynamics, XPM dynamics, 
        convolution(exponential dynamics, pulse dynamics).
        The function normalizes the X and Y in the range [0, 1]. """
    t0 = np.random.uniform(150, 700)
    # Creation of X and Y  
    d = dyn(t0)
    s = f_sech()
    f = f_cos(t0)
    c = np.convolve(d, s, mode='same')
    noise = np.random.randn(t_points)*np.random.uniform(.00003, .0001)
    # Input X
    X = c + f + noise
    # Output Y
    Y = c
    # Normalization of X and Y
    X_norm =(X +0.02)/(0.0063 + 0.02)
    Y_norm =(Y +0.02)/(0.0063 + 0.02)
    return X_norm, Y_norm, d, s, f, c

def data_augmentation(X, y, n_augmented):
    """ Input: Training/validation dataset to augment 
        (input X and ground truth y) and the desired number of augmented 
        instances to generate.
        Output: augmented X and augmented ground truth y
        This function aims at augmenting the dataset to account for a varying 
        baseline in the input data points, due to a case-specific
        normalization. """
    augmented = 0
    batch_dim = X.shape[0]
    print(batch_dim)
    augmented_instance_X = np.empty((batch_dim + n_augmented, t_points, 1))
    augmented_instance_y = np.empty((batch_dim + n_augmented, t_points))
    for i in range(batch_dim):
        augmented_instance_X[i, :, 0] = X[i, :, 0].flatten()
        augmented_instance_y[i, :] = y[i, :].flatten()
    while augmented < n_augmented:
        Xnew, Ynew, d, s, f, c = generate_dynamics()
        if np.min(Xnew) > .3:
            baseline_shift = np.random.uniform(0, .3)
            Xnew = Xnew - baseline_shift
            Ynew = Ynew - baseline_shift
            augmented_instance_X[batch_dim + augmented,:, 0] = Xnew 
            augmented_instance_y[batch_dim + augmented, :] = Ynew
            augmented += 1
    print(f"Dataset successfully augmented by {augmented} instances.")
    return augmented_instance_X, augmented_instance_y
   
X_norm, Y_norm, d, s, f, c = generate_dynamics()

# Plotting:
print('Example of generated train/test instance:')
t = np.arange(0, t_points)*5
fig, axs = plt.subplots(3, 2, sharex=True)
plt.subplots_adjust(hspace = .5, wspace = .5)
axs[0, 0].plot(t, f)
axs[0, 0].set_title("XPM artifact")
axs[1, 0].plot(t, d)
axs[1, 0].set_title("Exponential dynamic")
axs[0, 1].plot(t, s)
axs[0, 1].set_title("Pump pulse")
axs[1, 1].plot(t, c)
axs[1, 1].set_title("Electron cooling dynamics")
axs[2, 0].plot(t, X_norm)
axs[2, 0].set_ylim([0, 1])
axs[2, 0].set_title("XPMnet input")
axs[2, 1].plot(t, Y_norm)
axs[2, 1].set_title("XPMnet ground truth")
axs[2, 1].set_ylim([0, 1])
np.savetxt("plotxpm", f)

In [None]:
# BUILD, COMPILE AND FIT XPMnet
#from xpmnet_short_1 import generate_dynamics, data_augmentation
import numpy as np
import tensorflow as tf
import time

tf.keras.backend.clear_session()
#Number of time points in the simulated dynamics:
t_points = 200 

def generate_batch(n=1):
    """ Input = number of desired simulated dynamics to train and
        validate XPMnet
        Output = XPMnet input X, XPMnet output y """
    X = np.empty((n, t_points, 1))
    y = np.empty((n, t_points))
    for i in range(n):
        X[i,:, 0], y[i, :], d, s, f, c = generate_dynamics()
    return X, y

def R_squared(y_true, y_pred):
    """ Computes the Rsquared coefficient, index of the accuracy
        of prediction in regression-like problems. This coefficient will be
        used as a metric during the XPMnet training/validation """
    SS_res =  tf.keras.backend.sum(tf.keras.backend.square(y_true-y_pred)) 
    SS_tot = tf.keras.backend.sum(tf.keras.backend.square(y_true-tf.keras.backend.mean(y_true))) 
    return (1-SS_res/(SS_tot+tf.keras.backend.epsilon()))

def plot_metric(history, metric, yscale):
    """ Plots the metrics per training/validation epoch.
        Input: history of the model, metric to plot and y-axis scale ('log' for
        logarithmic scale, 'linear' for linear scale)"""
    train_metrics = history.history[metric]
    val_metrics = history.history['val_'+metric]
    start_epoch = 1
    epochs = range(start_epoch, len(train_metrics) + 1)
    plt.figure()
    plt.subplot(121)
    plt.grid(True)
    plt.yscale(yscale)
    plt.plot(epochs, train_metrics, epochs, val_metrics)
    plt.ylim(0, 1)
    plt.xticks(np.arange(start_epoch, len(train_metrics) + 1, 1.0))
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.title('Training and validation '+ metric)
    plt.legend(["train_"+metric, 'val_'+ metric])    

# Generate the XPMnet training/validation batch
start = time.time()
print('Generating the XPMnet train/test dataset...')
X, y = generate_batch(40000)
# Perform data augmentation to compensate for baseline shifting
start1 = time.time()
X_augmented, y_augmented = data_augmentation(X, y, 40000)
end = time.time()
print('...XPMnet train/test dataset successfully generated!')
print("Dataset generation time: {:.2f} min".format((end - start)/60))

# Build XPMnet
model = tf.keras.Sequential([

    tf.keras.layers.BatchNormalization(input_shape=(t_points, 1)),
    
    tf.keras.layers.Conv1D(128, 32, activation='relu'),
    
    tf.keras.layers.Conv1D(96, 24, activation='relu'),

    tf.keras.layers.Conv1D(64, 8, activation='relu'),
    tf.keras.layers.Conv1D(8, 3, activation='relu'),
    tf.keras.layers.Conv1D(8, 3, activation='relu'),
    tf.keras.layers.Conv1D(8, 3, activation='relu'),
    
    tf.keras.layers.Dense(64, activation='relu', kernel_regularizer=tf.keras.regularizers.l1_l2(l1=0,l2=0.1)),
    tf.keras.layers.Dense(36, activation='relu', kernel_regularizer=tf.keras.regularizers.l1_l2(l1=0,l2=0.1)),
    tf.keras.layers.Dense(36, activation='relu', kernel_regularizer=tf.keras.regularizers.l1_l2(l1=0,l2=0.1)),
   
    
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dropout(.35),
    tf.keras.layers.Dense(t_points, activation='relu')
    
    ])

model.compile(loss='mean_absolute_percentage_error', optimizer='Adam', metrics=['mean_squared_error', 'mean_absolute_percentage_error', R_squared])
print(model.summary())

history = model.fit(X_augmented, y_augmented, epochs=100, verbose=1, validation_split=0.25, batch_size=128)

In [None]:
# PLOT THE TRAINING & VALIDATION HISTORY
import matplotlib.pyplot as plt

metric = 'loss'
yscale = 'linear'
train_metrics = history.history[metric]
val_metrics = history.history['val_'+metric]
start_epoch = 1
epochs = range(start_epoch, len(train_metrics) + 1)
plt.figure(figsize=(15, 5))
plt.subplot(121)
plt.grid(True)
plt.yscale(yscale)
plt.plot(epochs, train_metrics, epochs, val_metrics)
plt.xticks(np.arange(start_epoch, len(train_metrics) + 1, 1.0))
plt.xlabel("Epochs")
plt.ylabel(metric)
plt.title('Training and validation '+ metric)
plt.legend(["train_"+metric, 'val_'+ metric]) 

In [None]:
# TEST THE CNN PREDICTION ON INDIVIDUAL RANDOMLY GENERATED ISTANCES
import time
import matplotlib.pyplot as plt
import tensorflow as tf
X, y_true = generate_batch(1)
start = time.time()
y_pred = model.predict([X])
end = time.time()
print("XPM artifact removal time: {:.2f} s".format(end - start))
t = np.arange(0, t_points)*5
plt.plot(t, X.flatten(), t, y_true.T, t, y_pred.flatten())
plt.legend(["Simulated XPM-affected dynamic", "Ground truth", "XPMnet retrieved dynamics"])
plt.xlabel("Time [fs]")
plt.ylabel("Normalized intensity")
plt.ylim(0, 1)