In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt
import tensorflow as tf

# Global matplotlib styling shared by every figure in this notebook.
params = {
    "axes.labelsize": 14,
    "axes.titlesize": 16,
}
# Apply the frame, math-font and resolution settings together with the
# label/title sizes in a single rcParams update.
plt.rcParams.update({
    "axes.linewidth": 1,
    'mathtext.bf': 'STIXGeneral:italic:bold',
    'figure.dpi': 100,
    **params,
})

def place(ax):
  """Apply the notebook's shared tick and grid styling to a single axes.

  Parameters
  ----------
  ax : matplotlib axes
      Axes to style. The layout of the figure that owns ``ax`` is tightened
      afterwards.
  """
  ax.tick_params(direction="in", which="minor", length=3)
  ax.tick_params(direction="in", which="major", length=5, labelsize=13)
  ax.grid(which="major", ls="dashed", dashes=(1, 3), lw=0.8, zorder=0)
  #ax.legend(frameon=True, loc="best", fontsize=12,edgecolor="black")
  # Tighten the figure this axes belongs to instead of relying on a global
  # `fig`, which may be undefined or refer to a different figure.
  ax.figure.tight_layout()


def combine_data(xbins, rho_O, rho_H, mu_O, mu_H,
                 muloc_O, muloc_H, c1_O, c1_H,
                 elec_O=None, elec_H=None):
    """Pack the per-bin profiles of one simulation into a structured array.

    Parameters
    ----------
    xbins : numpy.ndarray
        Bin positions; its shape determines the shape of the result.
    rho_O, rho_H, mu_O, mu_H, muloc_O, muloc_H, c1_O, c1_H : array_like
        Values stored in the fields of the same name. Each must be assignable
        (broadcastable) to an array of shape ``xbins.shape``.
    elec_O, elec_H : array_like, optional
        Values for the ``elec_O`` / ``elec_H`` fields. When omitted the
        corresponding field is left filled with zeros.

    Returns
    -------
    numpy.ndarray
        Structured array of shape ``xbins.shape`` whose fields are all
        ``'f8'``; it can be indexed by field name like a dictionary.
    """
    data = np.zeros(xbins.shape, dtype=[('xbins', 'f8'),
                                        ('rho_O', 'f8'), ('muloc_O', 'f8'), ('c1_O', 'f8'), ('mu_O', 'f8'),
                                        ('rho_H', 'f8'), ('muloc_H', 'f8'), ('c1_H', 'f8'), ('mu_H', 'f8'),
                                        ('elec_O', 'f8'), ('elec_H', 'f8')])

    data['xbins'] = xbins
    data['rho_O'] = rho_O
    data['rho_H'] = rho_H
    data['mu_O'] = mu_O
    data['mu_H'] = mu_H
    data['muloc_O'] = muloc_O
    data['muloc_H'] = muloc_H
    data['c1_O'] = c1_O
    data['c1_H'] = c1_H
    # The elec_* fields always exist in the dtype; fill them only when the
    # caller supplies values so existing nine-argument calls are unaffected.
    if elec_O is not None:
        data['elec_O'] = elec_O
    if elec_H is not None:
        data['elec_H'] = elec_H
    return data

# Load the simulation dictionaries from file.
# NOTE(review): allow_pickle=True unpickles arbitrary Python objects; only
# load .npy files that come from a trusted source.
simData_Efield = np.load("../../data/edl_SR.npy", allow_pickle=True).item()
simData_all = np.load("../../data/RPM_all_Aug20.npy", allow_pickle=True).item()
simData = {}

for key in ['training', 'validation', 'test']:
    # Combine the inner dictionaries; on a key clash the edl_SR entry wins
    simData[key] = { **simData_all.get(key, {}), **simData_Efield.get(key, {}) }

# Augment every split with a "<name>_mirror" copy of each simulation in which
# the O and H profiles are exchanged. Each split is processed on its own, so a
# mirrored entry always lands in the split of the simulation it was built from
# (a name search across splits would pick the first split containing the key).
for category in ['training', 'validation', 'test']:
    # Snapshot the keys: new entries are inserted into the dict inside the loop.
    for sim in list(simData[category].keys()):
        data = simData[category][sim]
        xbins = data['xbins']
        rho_O = data['rho_O']
        muloc_O = data['muloc_O']
        c1_O = data['c1_O']
        rho_H = data['rho_H']
        muloc_H = data['muloc_H']
        c1_H = data['c1_H']
        mu_H = data['mu_H']
        mu_O = data['mu_O']
        #elec_H = data['elec_H']
        #elec_O = data['elec_O']

        sim_name = sim + "_mirror"
        # H quantities are passed in the O slots (and vice versa) on purpose.
        combined_data_mirror = combine_data(xbins, rho_H, rho_O, mu_H, mu_O,
                                            muloc_H, muloc_O, c1_H, c1_O,)

        simData[category][sim_name] = combined_data_mirror


# Load the pair-correlation data dictionary from file (same pickle caveat).
#simData = np.load("../../data/RPM_all.npy", allow_pickle=True).item()
simPCData = np.load("../../data/RPM_PC.npy", allow_pickle=True).item()


# Combine all simulations (originals and mirrors) into one list
all_simulations = []
for category in ['training', 'validation', 'test']:
    all_simulations.extend(simData[category].keys())

PC_simulations = list(simPCData['training'].keys())

print(len(all_simulations))
print(len(PC_simulations))




## Inspect the data

In [None]:
# Select a random simulation
random_sim = random.choice(all_simulations)

# Determine which category the random simulation belongs to
category = next(cat for cat in simData if random_sim in simData[cat])


# Get the data for the random simulation
data = simData[category][random_sim]

# Extract the bin positions and the per-species profiles
xbins = data['xbins']

rho_O = data['rho_O']
muloc_O = data['muloc_O']
c1_O = data['c1_O']
rho_H = data['rho_H']
muloc_H = data['muloc_H']
c1_H = data['c1_H']
elec_H = data['elec_H']
elec_O = data['elec_O']

# Plot muloc(x), rho(x), and c1(x) in three stacked panels
fig, ax = plt.subplots(3, 1, figsize=(5,6), sharex='all')


ax[0].plot(xbins, muloc_O, label='O', color='deepskyblue')
ax[0].plot(xbins, muloc_H, label='H', color='hotpink')

# Dashed curves show the 'elec_*' fields; they get their own legend labels so
# they are not confused with the solid muloc curves of the same colour.
ax[0].plot(xbins, elec_O, label='O (elec)', color='deepskyblue', linestyle='--')
ax[0].plot(xbins, elec_H, label='H (elec)', color='hotpink', linestyle='--')
ax[0].set_ylabel(r'$\beta\mu - \beta V_{\mathrm{ext}}(x)$')
ax[0].set_title(f'{random_sim}')

ax[1].plot(xbins, rho_O, label='O', color='deepskyblue')
ax[1].plot(xbins, rho_H, label='H', color='hotpink')
ax[1].set_ylabel(r'$\rho(x)$')


ax[2].plot(xbins, c1_O, label='O', color='deepskyblue')
ax[2].plot(xbins, c1_H, label='H', color='hotpink')
ax[2].set_ylabel(r'$c^{(1)}(x)$')
ax[2].set_xlabel(r'$x$ [$\mathrm{\AA}$]')
ax[0].legend(frameon=True, loc="best", fontsize=12,edgecolor="black")
#ax[2].set_xlim(0, 20)

place(ax[1])
place(ax[0])
place(ax[2])

plt.show()

In [None]:


# Select a random pair-correlation simulation
random_sim = random.choice(PC_simulations)

# Determine which category the random simulation belongs to
category = next(cat for cat in simPCData if random_sim in simPCData[cat])


# Get the data for the random simulation
data = simPCData[category][random_sim]


xs = data['xs']
c2_OO = data['c2_OO']
c2_OH = data['c2_OH']
c2_HH = data['c2_HH']



# Plot the three c2 curves (OO, OH, HH) against xs
fig, ax = plt.subplots(1, 1, figsize=(5,5))

ax.set_title(f'{random_sim}')

ax.plot(xs, c2_OO, label='OO', lw=2, color='deepskyblue')
ax.plot(xs, c2_OH, label='OH', lw=2, color='hotpink')
ax.plot(xs, c2_HH, label='HH', lw=2, ls='--', color='turquoise')

ax.set_xlabel(r'$x$')
ax.set_ylabel(r'$c^{(2)}(x)$')
ax.set_xlim(0, 20)
# The curves carry labels, so draw the legend that identifies them.
ax.legend(frameon=True, loc="best", fontsize=12, edgecolor="black")

place(ax)
plt.show()



## Curate data for training, sliding window approach

In [None]:
import sys
sys.path.append("..")
from data_generators import DataGenerator_inhom_twotype, get_dataset_c1_O_twotype, get_dataset_c2_O_twotype
# Width of the sliding window. Defined once and reused below so that the c1
# generators and the c2 dataset cannot end up with different window sizes.
windowSigma = 10.0


# Generator options
generatorOptions = {
    "batch_size": 128,
    "windowSigma": windowSigma,
    "inputKeys1": ["rho_O"],
    "inputKeys2": ["rho_H"],
    "outputKeys": ["c1_O"],
    "binKey": "xbins",
}

# Create data generators
trainingGenerator = DataGenerator_inhom_twotype(simData["training"], **generatorOptions)
validationGenerator = DataGenerator_inhom_twotype(simData["validation"], **generatorOptions)
train_dataset_c1O = get_dataset_c1_O_twotype(trainingGenerator)
validation_dataset_c1O = get_dataset_c1_O_twotype(validationGenerator)
# Pair-correlation dataset, built with the same window width and input shapes
train_dataset_c2OO = get_dataset_c2_O_twotype(simPCData["training"], windowSigma, trainingGenerator.input1Shape, trainingGenerator.input2Shape)


## Create neural network

In [None]:
from tensorflow import keras

# Define the model inputs.
# rho_O is listed under "inputKeys1" and rho_H under "inputKeys2" in
# generatorOptions, so each input takes the matching generator shape
# (presumably input1Shape <-> inputKeys1 and input2Shape <-> inputKeys2;
# TODO confirm against data_generators).
rho_O_input = keras.Input(shape=trainingGenerator.input1Shape, name="rho_O")
rho_H_input = keras.Input(shape=trainingGenerator.input2Shape, name="rho_H")

# Flatten array
x_O = keras.layers.Flatten()(rho_O_input)
x_H = keras.layers.Flatten()(rho_H_input)


# Concatenate the two inputs and pass them through three dense softplus layers
x = keras.layers.Concatenate()([x_O, x_H])
x = keras.layers.Dense(512, activation="softplus")(x)
x = keras.layers.Dense(512, activation="softplus")(x)
x = keras.layers.Dense(512, activation="softplus")(x)



# Linear output layer, exposed under the key "c1_O"
outputs = {"c1_O": keras.layers.Dense(trainingGenerator.outputShape[0], name="c1_O")(x)}

inputs = [rho_O_input, rho_H_input]
model = keras.Model(inputs=inputs, outputs=outputs)

optimizer = keras.optimizers.Adam()
loss = keras.losses.MeanSquaredError()
metrics = [keras.metrics.MeanAbsoluteError()]
model.compile(
    optimizer=optimizer,
    loss=loss,
    metrics=metrics,
)
model.summary()

keras.utils.plot_model(model, show_shapes=True, show_layer_names=True ,show_layer_activations=True, dpi=80, to_file='model_RPM.png')



## Pair-correlation matching

In [6]:

'''
Custom training loop with optional pair-correlation matching
'''



# Define your validation step
@tf.function
def validation_step(x_val, y_val):
    """Return the loss on one validation batch (forward pass only).

    Uses the module-level ``model`` and ``loss``. The model is called with
    ``training=False`` and only its ``"c1_O"`` output is compared against
    ``y_val["c1_O"]``.
    """
    predicted_c1 = model(x_val, training=False)["c1_O"]
    target_c1 = y_val["c1_O"]
    return loss(target_c1, predicted_c1)

@tf.function
def train_step(x_c1, y_c1, x_c2x=None, y_c2x=None, alpha_c1=1, alpha_c2x=0.001, dx=0.03):
    """Run one optimisation step on the c1 loss plus an optional c2 loss.

    Uses the module-level ``model``, ``loss``, ``optimizer`` and ``metrics``.

    Parameters
    ----------
    x_c1, y_c1 :
        Input and target batch for the c1 loss; ``y_c1["c1_O"]`` is compared
        with the model's ``"c1_O"`` output.
    x_c2x, y_c2x :
        Input and target batch for the pair-correlation loss. ``x_c2x`` must
        provide ``"rho_O"`` and ``"rho_H"``; ``y_c2x`` must provide ``"c2_OO"``
        and ``"c2_OH"``. Only read when ``alpha_c2x > 0``.
    alpha_c1, alpha_c2x :
        Weights of the two loss terms in the total loss.
    dx :
        Divisor applied to the input gradients (presumably the bin width of
        the density profiles — TODO confirm).

    Returns
    -------
    (loss_c1, loss_c2x)
        The unweighted loss terms; ``loss_c2x`` is 0 when ``alpha_c2x <= 0``.
    """
    with tf.GradientTape() as tape:
        c1_model = model(x_c1, training=True)["c1_O"]
        loss_c1 = loss(y_c1["c1_O"], c1_model)
        loss_c2x = 0
        if alpha_c2x > 0:
            # One forward pass is enough: a single inner tape yields the
            # gradient of the output w.r.t. both density inputs. The outer
            # tape records this computation, so the c2 loss still
            # back-propagates into the weights.
            with tf.GradientTape(watch_accessed_variables=False) as tape2:
                tape2.watch(x_c2x)
                c1_model_pc = model(x_c2x, training=True)["c1_O"]
            grad_rho_O, grad_rho_H = tape2.gradient(
                c1_model_pc, [x_c2x["rho_O"], x_c2x["rho_H"]]
            )
            c2OO_model = grad_rho_O / dx
            c2OH_model = grad_rho_H / dx

            loss_c2OOx = loss(y_c2x["c2_OO"], c2OO_model)
            loss_c2OHx = loss(y_c2x["c2_OH"], c2OH_model)
            loss_c2x = loss_c2OOx + loss_c2OHx

        loss_total = alpha_c1 * loss_c1 + alpha_c2x * loss_c2x
    grads = tape.gradient(loss_total, model.trainable_weights)
    optimizer.apply(grads, model.trainable_weights)
    # Metrics track the c1 prediction only.
    for metric in metrics:
        metric.update_state(y_c1["c1_O"], c1_model)
    return loss_c1, loss_c2x




class EarlyStoppingCallback:
    """Signal a stop after ``patience`` consecutive checks without a new best loss."""

    def __init__(self, patience=5):
        self.patience = patience
        self.best_val_loss = np.inf  # lowest loss reported so far
        self.wait = 0                # consecutive reports without improvement
        self.stopped_epoch = 0       # epoch index at which a stop was signalled

    def on_epoch_end(self, epoch, val_loss):
        """Record ``val_loss`` and return True when training should stop."""
        improved = val_loss < self.best_val_loss
        if improved:
            # New best: remember it and restart the patience counter.
            self.best_val_loss, self.wait = val_loss, 0
            return False

        self.wait += 1
        if self.wait < self.patience:
            return False

        self.stopped_epoch = epoch
        print(f"Early stopping at epoch {epoch + 1}")
        return True


## Train neural network

In [None]:
early_stopping = EarlyStoppingCallback(patience=200)
best_val_loss = np.inf

for epoch in range(166):
    if epoch % 2 == 0:
        print(f"Epoch: {epoch}")
        print(f"\tlearning rate: {optimizer.learning_rate.numpy():.4g}")

    # zip() ends the epoch as soon as the shorter of the two datasets is exhausted.
    for step, ((x_c1, y_c1), (x_c2x, y_c2x)) in enumerate(zip(train_dataset_c1O, train_dataset_c2OO)):

        loss_c1, loss_c2x = train_step(x_c1, y_c1, x_c2x, y_c2x, alpha_c1=1, alpha_c2x=0.0001)

    stop_training = False
    if epoch % 2 == 0:
        #print(f"\tsteps: {step}")
        # These are the losses of the last training batch of the epoch.
        print(f"\tloss_c1: {loss_c1:.4g}", f"\tloss_c2x: {loss_c2x:.4g}")

        # Validation
        val_losses = []
        for (x_val, y_val) in validation_dataset_c1O:
            val_loss = validation_step(x_val, y_val)
            val_losses.append(val_loss)
        avg_val_loss = np.mean(val_losses)
        print(f"\tValidation loss: {avg_val_loss:.4g}")

        # Save the best model
        #if avg_val_loss < best_val_loss:
        #    best_val_loss = avg_val_loss
        #    model.save("../../models/RPM_PC_O.keras")
        #    print(f"\tBest model saved with validation loss: {best_val_loss:.4g}")
        model.save("../../models/RPM_O_PC_softplus.keras")
        #model.save("../../models/RPM_PC_O.keras")

        for metric in metrics:
            print(f"\t{metric.name} (c1): {metric.result():.4g}")
            metric.reset_state()

        # Early stopping is consulted only when a fresh validation loss exists.
        # Feeding it the stale value on non-validation epochs would count each
        # of them as "no improvement". Patience therefore counts validation
        # passes, not epochs.
        stop_training = early_stopping.on_epoch_end(epoch, avg_val_loss)

    # Exponential learning-rate decay, applied once per epoch
    optimizer.learning_rate *= 0.95

    if stop_training:
        break


## Quick test

In [None]:

# Optional checkpoint save / reload of the model (currently disabled):
#model.save('../../models/RPM_O_PC_Aug24.keras') 
#model = keras.models.load_model("../../models/RPM_O_PC_Aug24.keras")

# Evaluate the in-memory model on the "test" split, using the same generator
# options as the training and validation generators.
testGenerator = DataGenerator_inhom_twotype(simData["test"], **generatorOptions)
test_metrics = model.evaluate(testGenerator)

## Compare the predicted correlation function with a test-set simulation

In [None]:
def generate_windows(array, bins, mode="wrap"):
    """Build one centred window per element of a 1-D array.

    The array is padded with ``bins`` entries on each side via ``np.pad``
    (using ``mode``); row ``i`` of the result holds the ``2 * bins + 1``
    padded values centred on ``array[i]``.

    Returns a float64 array of shape ``(len(array), 2 * bins + 1)``.
    """
    padded = np.pad(array, bins, mode=mode)
    width = 2 * bins + 1
    # gather[i, j] = i + j, so row i selects padded[i : i + width].
    gather = np.arange(len(array))[:, None] + np.arange(width)[None, :]
    return padded[gather].astype(np.float64)


def c1O(model, rho_O, rho_H, input_bins=667):
    """Predict the ``"c1_O"`` output of ``model`` for every bin of a profile pair.

    Each density profile is turned into one window per bin with
    ``generate_windows`` (half-width ``(input_bins - 1) // 2``), reshaped to
    ``(n_bins, input_bins, 1)``, and both window stacks are passed to the model
    as a single batch in the order ``[rho_O, rho_H]``.

    Returns the flattened ``"c1_O"`` prediction.
    """
    half_width = (input_bins - 1) // 2
    model_inputs = [
        generate_windows(profile, half_width).reshape(profile.shape[0], input_bins, 1)
        for profile in (rho_O, rho_H)
    ]
    prediction = model.predict_on_batch(model_inputs)
    return prediction["c1_O"].flatten()




# List all test simulations
all_test_simulations = list(simData['test'].keys())


# Select a random test simulation
random_sim = random.choice(all_test_simulations)
#random_sim = 'sim_1077'
# The simulation was drawn from the test split, so read it from there directly.
# Searching all splits by name would return the training/validation entry if
# the same key also exists in one of those splits.
category = 'test'


# Get the data for the random simulation
data = simData[category][random_sim]

# Extract the bin positions and the per-species profiles
xbins = data['xbins']
rho_O = data['rho_O']
muloc_O = data['muloc_O']
c1_O = data['c1_O']
rho_H = data['rho_H']
muloc_H = data['muloc_H']
c1_H = data['c1_H']

# Plot muloc(x), rho(x), and c1(x) in three stacked panels
fig, ax = plt.subplots(3, 1, figsize=(5,6), sharex='all')


ax[0].plot(xbins, muloc_H, label='H', color='pink')
ax[0].plot(xbins, muloc_O, label='O', color='deepskyblue')

ax[0].set_ylabel(r'$\beta\mu - \beta V_{\mathrm{ext}}(x)$')
ax[0].set_title(f'{random_sim}')

ax[1].plot(xbins, rho_H, label='H', color='pink')
ax[1].plot(xbins, rho_O, label='O', color='deepskyblue')

ax[2].plot(xbins, c1_H, label='H, sim', color='pink', lw=2)
ax[2].plot(xbins, c1_O, label='O, sim', color='deepskyblue', lw=2)

# Model prediction for the O species, overlaid on the simulation result
c1_O_pred = c1O(model, rho_O, rho_H)

ax[2].plot(xbins, c1_O_pred, label='O, predicted', color='darkblue', ls='--')


ax[1].set_ylabel(r'$\rho(x)$')
ax[2].set_ylabel(r'$c^\mathrm{(1)}(x)$')
# Same x-axis label as the data-inspection figure
ax[2].set_xlabel(r'$x$ [$\mathrm{\AA}$]')

ax[2].legend()
ax[2].set_xlim(0, 20)

place(ax[1])
place(ax[0])
place(ax[2])

plt.show()