In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt
import tensorflow as tf

# Global matplotlib styling shared by every figure in this notebook.
params = {
    "axes.labelsize": 14,
    "axes.titlesize": 16,
}
plt.rcParams.update({
    "axes.linewidth": 1,
    "mathtext.bf": "STIXGeneral:italic:bold",
    "figure.dpi": 100,
})
plt.rcParams.update(params)

def place(ax):
    """Apply the notebook's common tick and grid styling to a single Axes.

    Parameters
    ----------
    ax : matplotlib.axes.Axes
        The axes to style. Its owning figure gets ``tight_layout()`` applied.

    Returns
    -------
    None
    """
    # Inward-pointing ticks; major ticks are longer and carry the tick labels.
    ax.tick_params(direction="in", which="minor", length=3)
    ax.tick_params(direction="in", which="major", length=5, labelsize=13)
    # Sparse dotted major grid drawn with a low zorder.
    ax.grid(which="major", ls="dashed", dashes=(1, 3), lw=0.8, zorder=0)
    # Lay out the figure that actually owns `ax` instead of relying on a
    # notebook-global `fig`, which may be undefined or refer to another figure.
    ax.figure.tight_layout()


# Location of the pre-generated simulation data, relative to this notebook.
training_data_path = "../../../bui2026-dielectrocapillarity/training-data/"

# Each .npy file stores a single pickled dict; `.item()` unwraps it from the
# 0-d object array returned by np.load.
# NOTE(review): allow_pickle=True unpickles arbitrary objects -- only load
# files from a trusted source.
_dataset_files = (
    "dipole_T250_T500_coswave.npy",
    "dipole_T500_coswave.npy",
    "dipole_T300_coswave.npy",
    "dipole_T500_walls.npy",
    "dipole_T250_T500_walls.npy",
)
simData1, simData2, simData3, simData4, simData5 = (
    np.load(training_data_path + file_name, allow_pickle=True).item()
    for file_name in _dataset_files
)

# Merge the five loaded datasets into a single {split: {sim_name: sim}} dict.
# Simulation names are suffixed with the 1-based dataset index so that equal
# names coming from different files cannot overwrite each other.
simData = {}
datasets = [simData1, simData2, simData3, simData4, simData5]

for key in ['training', 'validation', 'test']:
    combined_data = {}

    # The loop variable must NOT be called `simData`: doing so rebinds the
    # accumulator above to the last dataset, so the merged splits end up being
    # written into (and mutating) that dataset instead of a fresh dict.
    for i, dataset in enumerate(datasets, start=1):
        # A dataset without this split simply contributes nothing.
        for inner_key, value in dataset.get(key, {}).items():
            combined_data[f"{inner_key}_{i}"] = value

    simData[key] = combined_data

def combine_data(xbins, elec, elec_grad, rho_A, mu_A, muloc_A, c1_A, n):
    """Pack per-bin profiles into one NumPy structured array.

    Parameters
    ----------
    xbins : array_like
        Bin positions; its shape defines the shape of the returned array.
    elec, elec_grad, rho_A, mu_A, muloc_A, c1_A, n : array_like
        Profiles stored under the fields ``'elec'``, ``'elec_grad'``,
        ``'rho_A'``, ``'mu_A'``, ``'muloc_A'``, ``'c1_A_scaledT'`` and ``'n'``
        respectively. Each must be assignable to an array of ``xbins``' shape.

    Returns
    -------
    numpy.ndarray
        Structured array of float64 fields with the same shape as ``xbins``.

    Notes
    -----
    When every entry of ``elec_grad`` is zero, the ``'n'`` field is forced to
    zero and a message is printed.
    """
    # Accept plain sequences as well as ndarrays.
    xbins = np.asarray(xbins)

    data = np.zeros(xbins.shape, dtype=[('xbins', 'f8'), ('elec', 'f8'), ('elec_grad', 'f8'), ('n', 'f8'),
                                        ('rho_A', 'f8'), ('muloc_A', 'f8'), ('c1_A_scaledT', 'f8'), ('mu_A', 'f8')])

    data['xbins'] = xbins
    data['elec'] = elec
    data['elec_grad'] = elec_grad
    data['rho_A'] = rho_A
    data['mu_A'] = mu_A
    data['muloc_A'] = muloc_A
    data['c1_A_scaledT'] = c1_A
    data['n'] = n

    # Set n to zero if the gradient of elec vanishes everywhere.
    # NOTE(review): the original comment referred to `elec` itself being zero,
    # but the code has always tested `elec_grad`; that behaviour is kept.
    if np.all(np.asarray(elec_grad) == 0):
        data['n'] = 0.0
        print("Setting n to zero")

    return data

# Flat list of every simulation name, in training -> validation -> test order.
all_simulations = []
for category in ('training', 'validation', 'test'):
    # Extending a list with a dict appends the dict's keys.
    all_simulations += simData[category]
    

# Add mirror simulations: the sign of the electrostatic quantities (`elec`,
# `elec_grad`, `n`) is inverted while `xbins` and the remaining profiles are
# left untouched. Each new entry is stored as "<sim>_mirror" in the same split
# as its source and shares the source's `params` object.
for sim in all_simulations:
    category = next(cat for cat in simData if sim in simData[cat])
    data = simData[category][sim]
    profiles = data['profiles']

    combined_data_mirror = combine_data(
        profiles['xbins'],
        -profiles['elec'],
        # Only negate here. The previous `-np.flip(...)` also reversed the
        # gradient along the bin axis although `xbins` and `elec` are not
        # reversed, leaving the stored gradient inconsistent with `elec`.
        -profiles['elec_grad'],
        profiles['rho_A'],
        profiles['mu_A'],
        profiles['muloc_A'],
        profiles['c1_A_scaledT'],
        -profiles['n'],
    )

    simData[category][sim + "_mirror"] = {
        'profiles': combined_data_mirror,
        'params': data['params'],
    }
    
# Add flip simulations: every profile (including `xbins`) is reversed along the
# bin axis; `elec_grad` is additionally negated. Each new entry is stored as
# "<sim>_flip" in the same split as its source and shares the source's `params`.
for sim in all_simulations:
    category = next(cat for cat in simData if sim in simData[cat])
    data = simData[category][sim]
    source_profiles = data['profiles']

    reversed_profiles = {
        field: np.flip(source_profiles[field])
        for field in ('xbins', 'rho_A', 'muloc_A', 'c1_A_scaledT', 'mu_A', 'elec', 'n')
    }

    combined_data_flip = combine_data(
        xbins=reversed_profiles['xbins'],
        elec=reversed_profiles['elec'],
        elec_grad=-np.flip(source_profiles['elec_grad']),
        rho_A=reversed_profiles['rho_A'],
        mu_A=reversed_profiles['mu_A'],
        muloc_A=reversed_profiles['muloc_A'],
        c1_A=reversed_profiles['c1_A_scaledT'],
        n=reversed_profiles['n'],
    )

    simData[category][sim + "_flip"] = {
        'profiles': combined_data_flip,
        'params': data['params'],
    }

# Rebuild the flat name list so it also contains the augmented entries.
all_simulations = []
for category in ('training', 'validation', 'test'):
    # Extending a list with a dict appends the dict's keys.
    all_simulations += simData[category]

# Total number of simulations across all splits.
print(len(all_simulations))

## Inspect the data

In [None]:
# Pick one simulation at random from all splits and plot its profiles.
# To inspect a specific case instead, assign its name, e.g. 'sim_0954'.
random_sim = random.choice(all_simulations)

# Find the split that holds the chosen simulation.
category = next(cat for cat in simData if random_sim in simData[cat])
data = simData[category][random_sim]

profiles = data['profiles']
z = profiles['xbins']
rho = profiles['rho_A']
muloc = profiles['muloc_A']
elec = profiles['elec']
elec_grad = profiles['elec_grad']
c1 = profiles['c1_A_scaledT']
n = profiles['n']
temp = data['params']['T']

# Four stacked panels sharing the x axis: elec, rho, n, c1.
fig, ax = plt.subplots(4, 1, figsize=(5,6), sharex='all')

panels = (
    (elec, 'elec', 'black', r'$e\beta\phi(z)$'),
    (rho, 'rho', 'hotpink', r'$\rho$ [$\mathrm{\AA}^{-3}$]'),
    (n, 'n', 'hotpink', r'$10^3 n$ [$ e\mathrm{\AA}^{-3}$]'),
    (c1, 'c1', 'hotpink', r'$c^{(1)}$'),
)
for axis, (values, curve_label, curve_color, y_label) in zip(ax, panels):
    axis.plot(z, values, label=curve_label, color=curve_color)
    axis.set_ylabel(y_label)

ax[0].set_title(f'{random_sim} {temp}')
ax[3].set_xlabel(r'$x$ [$\mathrm{\AA}$]')

# The x axis is shared, so this limit applies to every panel.
ax[2].set_xlim(0, 20)

for axis in ax:
    place(axis)

plt.show()

## Curate data for training, sliding window approach

In [None]:
import sys
sys.path.append("..")
from data_generators import DataGeneratorThreeInput

def filt(sim):
    """Return True when the simulation's ``params["T"]`` is strictly positive."""
    return sim["params"]["T"] > 0.0

# Keyword arguments shared by every DataGeneratorThreeInput in this notebook.
# The profile/parameter keys select which fields of each simulation are fed
# to the network (rho_A and elec windows plus T) and which one is the target (n).
# NOTE(review): the meaning and units of window1Sigma/window2Sigma are defined
# in data_generators -- confirm there.
generatorOptions = dict(
    batch_size=256,
    window1Sigma=10.00,
    window2Sigma=10.00,
    inputKeys1=["rho_A"],
    inputKeys2=["elec"],
    paramsKeys=["T"],
    outputKeys=["n"],
    binKey="xbins",
    filt=filt,
)

# One generator per split used during fitting.
trainingGenerator = DataGeneratorThreeInput(simData["training"], **generatorOptions)
validationGenerator = DataGeneratorThreeInput(simData["validation"], **generatorOptions)


## Create neural network for model

In [None]:
import tensorflow as tf
from tensorflow import keras

# Custom layer returning a finite-difference derivative of its input along axis 1
@keras.utils.register_keras_serializable()
class GradientLayer(keras.layers.Layer):
    """Central difference along axis 1 with zero-padded end points.

    Interior entries are ``0.5 * (x[i + 1] - x[i - 1])``; the first and last
    entries are set to zero by the padding so the output keeps the input's
    shape. No bin width enters the formula, i.e. the result is per bin index.
    The two-row padding spec implies a rank-2 input (batch, bins).
    """

    def call(self, inputs):
        ahead = inputs[:, 2:]
        behind = inputs[:, :-2]
        interior = 0.5 * (ahead - behind)
        # One zero on each side of axis 1 restores the original length.
        return tf.pad(interior, [[0, 0], [1, 1]])

# Define the model
# Three named inputs, keyed like the generator's inputKeys1 / inputKeys2 /
# paramsKeys ("rho_A", "elec", "T"). Note that the Keras layer names differ
# from the dict keys for the first two ("rho", "phi").
profile1Inputs = {"rho_A": keras.Input(shape=trainingGenerator.input1Shape, name="rho")}
profile2Inputs = {"elec": keras.Input(shape=trainingGenerator.input2Shape, name="phi")}
paramsInputs = {"T": keras.Input(shape=(1,), name="T")}  # Temperature input

# L2 regularization (one regularizer object shared by all hidden Dense kernels;
# the output layer below is not regularized)
regularizer = keras.regularizers.l2(0.0003)

# Process `rho_A`
x1 = keras.layers.Dense(256, activation="softplus", kernel_regularizer=regularizer)(profile1Inputs["rho_A"])

# Compute gradient of `phi` using custom layer (for 1D input)
grad_phi = GradientLayer()(profile2Inputs["elec"])

# Process both `phi` and its gradient
# (separate branches: 32 units for phi itself, 512 units for its gradient)
x2_phi = keras.layers.Dense(32, activation="softplus", kernel_regularizer=regularizer)(profile2Inputs["elec"])
x2_grad = keras.layers.Dense(512, activation="softplus", kernel_regularizer=regularizer)(grad_phi)

# Concatenate `phi` and its gradient
x2 = keras.layers.Concatenate()([x2_phi, x2_grad])

# Further process `phi` and gradient combined
x2 = keras.layers.Dense(256, activation="softplus", kernel_regularizer=regularizer)(x2)

# Process `T`
x3 = keras.layers.Dense(64, activation="softplus", kernel_regularizer=regularizer)(paramsInputs["T"])

# Concatenate processed inputs
x = keras.layers.Concatenate()([x1, x2, x3])

# Additional Dense layers
x = keras.layers.Dense(512, activation="softplus", kernel_regularizer=regularizer)(x)
x = keras.layers.Dense(512, activation="softplus", kernel_regularizer=regularizer)(x)

# Output layer
# Linear (no activation); width taken from the generator's output shape.
outputs = keras.layers.Dense(trainingGenerator.outputShape[0], name="n")(x)

# Define Model
# The `|` dict union (Python 3.9+) merges the three input dicts into one.
model = keras.Model(inputs=(profile1Inputs | profile2Inputs | paramsInputs), outputs=outputs)

# Compile the model: Adam with default settings, mean absolute error as both
# the loss and the reported metric. The metric's default name is what the
# training callbacks monitor as "val_mean_absolute_error".
optimizer = keras.optimizers.Adam()
mae_loss = keras.losses.MeanAbsoluteError()
mae_metric = keras.metrics.MeanAbsoluteError()
model.compile(optimizer=optimizer, loss=mae_loss, metrics=[mae_metric])

# Text summary of layers and parameter counts
model.summary()

# Diagram of the model graph (last expression, so the notebook displays it)
keras.utils.plot_model(
    model,
    show_shapes=True,
    show_layer_names=True,
    show_layer_activations=True,
    dpi=80,
)


## Train neural network

In [None]:

import callbacks as cb

# Callbacks, applied in list order:
#   1. learning-rate schedule taken from the local `callbacks` module,
#   2. checkpoint that keeps only the best model by validation MAE,
#   3. early stopping after 20 epochs without validation-MAE improvement,
#   4. loss history recorder from the local `callbacks` module.
lr_scheduler = keras.callbacks.LearningRateScheduler(cb.lrschedule)
best_checkpoint = keras.callbacks.ModelCheckpoint(
    filepath="../../models/n1_dipole.keras",
    monitor="val_mean_absolute_error",
    save_best_only=True,
)
early_stopping = keras.callbacks.EarlyStopping(
    monitor="val_mean_absolute_error",
    patience=20,
)
callbacks = [lr_scheduler, best_checkpoint, early_stopping, cb.LossHistory()]

# Train for at most 200 epochs; early stopping may end the run sooner.
model.fit(
    trainingGenerator,
    validation_data=validationGenerator,
    epochs=200,
    callbacks=callbacks,
)



In [None]:

# Evaluate a previously saved model on the held-out test split.
# NOTE(review): this rebinds `model` to a stored file that is different from
# the checkpoint written by the training cell ("../../models/n1_dipole.keras");
# point the path there to evaluate the freshly trained network instead.
pretrained_model_path = "../../../bui2026-dielectrocapillarity/models/n1_dipole_Apr30.keras"
model = keras.models.load_model(pretrained_model_path)

# Same generator options as for training/validation.
testGenerator = DataGeneratorThreeInput(simData["test"], **generatorOptions)
test_metrics = model.evaluate(testGenerator)


## Compare the predicted charge density $n^{(1)}(z)$ with simulation for a test-set profile

In [None]:

def generate_windows(array, bins, mode="wrap"):
    """Return one sliding window of width ``2 * bins + 1`` per element of ``array``.

    The 1-D ``array`` is first padded by ``bins`` entries on both sides with
    ``np.pad(..., mode=mode)`` (periodic wrap-around by default), so window
    ``i`` is centred on element ``i``.

    Returns
    -------
    numpy.ndarray
        float64 array of shape ``(len(array), 2 * bins + 1)``.
    """
    width = 2 * bins + 1
    padded_array = np.pad(array, bins, mode=mode)
    # Strided view of all consecutive windows; astype makes an independent
    # float64 copy.
    window_view = np.lib.stride_tricks.sliding_window_view(padded_array, width)
    return window_view.astype(np.float64)


def get_charge_density(model, density_profile, elec, params):
    """Evaluate ``model`` at every bin of a profile and return the flat prediction.

    Parameters
    ----------
    model : keras model
        Called with a dict holding "rho_A" and "elec" windows plus one entry
        per key of ``params``.
    density_profile, elec : numpy.ndarray
        1-D profiles; a window is built around each bin (default wrap padding
        of ``generate_windows``).
    params : dict
        Scalar parameters; each value is repeated once per bin of
        ``density_profile``.

    Returns
    -------
    numpy.ndarray
        Flattened output of ``model.predict_on_batch``.
    """
    # Window width as reported by the model.
    # NOTE(review): which input `input_shape[1]` refers to depends on how Keras
    # orders this model's inputs -- confirm.
    input_bins = model.input_shape[1][1]
    window_bins = (input_bins - 1) // 2

    rho_windows = generate_windows(density_profile, window_bins)
    rho_windows = rho_windows.reshape(density_profile.shape[0], input_bins, 1)

    elec_windows = generate_windows(elec, window_bins)
    elec_windows = elec_windows.reshape(elec.shape[0], input_bins, 1)

    paramsInput = {}
    for key, value in params.items():
        paramsInput[key] = tf.convert_to_tensor(np.full(density_profile.shape[0], value))

    model_inputs = {"rho_A": rho_windows, "elec": elec_windows, **paramsInput}
    return model.predict_on_batch(model_inputs).flatten()


# Names of all simulations in the test split
all_test_simulations = list(simData['test'].keys())

# Pick one of them at random and locate the split it is stored under
random_sim = random.choice(all_test_simulations)
category = next(cat for cat in simData if random_sim in simData[cat])

# Pull out the profiles and temperature of the chosen simulation
data = simData[category][random_sim]
profiles = data['profiles']
xbins = profiles['xbins']
rho = profiles['rho_A']
muloc = profiles['muloc_A']
elec = profiles['elec']
elec_grad = profiles['elec_grad']
c1_sim = profiles['c1_A_scaledT']
n_sim = profiles['n']
temp = data['params']['T']

# Three stacked panels sharing the x axis: potential, density, charge density
fig, ax = plt.subplots(3, 1, figsize=(5,6), sharex='all')
ax_phi, ax_rho, ax_n = ax

ax_phi.plot(xbins, elec, label='phi', color='gray')
ax_phi.set_ylabel(r'$e\beta\phi(z)$')
ax_phi.set_title(f'{random_sim}, {temp}')

ax_rho.plot(xbins, rho, color='deepskyblue')
ax_rho.set_ylabel(r'$\rho(z)$')

# Simulation result versus the network prediction for the same rho / elec / T
ax_n.plot(xbins, n_sim, label='sim', color='deepskyblue', lw=2)
n_pred = get_charge_density(model, rho, elec, {"T": temp})
ax_n.plot(xbins, n_pred, label='predicted', color='blue', ls='--')
ax_n.set_ylabel(r'$10^3 n^\mathrm{(1)}(z)$ [$ e\mathrm{\AA}^{-3}$]')
ax_n.set_xlabel(r'$z$ [$\mathrm{\AA}$]')
ax_n.legend()
# The x axis is shared, so this limit applies to every panel.
ax_n.set_xlim(0, 20)
ax_n.set_ylim(-10, 10)

for axis in ax:
    place(axis)

plt.show()