# NN training for crystallite size prediction
A. Boulle and A. Souesme, 2025

In [None]:
# Required libraries
import matplotlib.pyplot as plt
import numpy as np
import pickle
import os
import tensorflow as tf
from tensorflow import keras
from utilities import NormalizeL, Log10Layer
%matplotlib ipympl

### Load and format data

In [None]:
# ======== Begin user input ========
# Path to input data
data_path = "/PATH/TO/DATA/FOLDER"
# Path to output data
save_path = "./checkpoints/"
# Define phases and and selected labels
list_phases = ["m_ZrO2"]
list_labels = ["size"]
# Index corresponding to "size" in train_label
indices_labels = [20]
# Angular range used when creating the dataset. For display purpose only
tth = np.arange(15, 120.01, 0.01)
# Train/validation split
validation_split = 0.1
# Batch size for training
batch_size = 32
# ======== End user input ========

### 01. Create output folder if it does not exist
try:
    os.mkdir(save_path)
    print("Created output folder:", save_path)
except:
    print("Folder already exists:", save_path)
    pass

### 02. Load training data
print("Loading and formating data, please wait...")
raw_train = np.load(
    data_path + "/train.npz", mmap_mode="r"
)  # memory map to avoid huge load
sampling = 1  # can be used to reduce the dataset size for testing purpose
train_data = raw_train["train_data"][::sampling, :]
train_label = raw_train["train_label"][::sampling, indices_labels]
copy_for_plot_label = np.copy(train_label)  # For display purpose only

### 03. Prepare data and labels for training
# Add empty dimension and channel
train_data = train_data[:, :, np.newaxis, np.newaxis]

# Normalize labels
normL = NormalizeL(train_label, norm="MinMax")
train_label = normL.forward(train_label)

# Save normalization class. Important to be able to convert back to real values after prediction.
with open(save_path + "/" + "normL.class", "wb") as f:
    pickle.dump(normL, f)

# Shuffle indices
nb_data = train_data.shape[0]
indices = np.arange(nb_data)
np.random.shuffle(indices)
train_data = train_data[indices]
train_label = train_label[indices]
copy_for_plot_label = copy_for_plot_label[indices]  # For display purpose only

# Get shapes
input_shape = train_data[0].shape
label_shape = train_label[0].shape


### 04. Instead of loading the full dataset in memory, we use the TF Dataset format and a generator
# This prevents memory issues when using large datasets.
# Generator function
def data_generator():
    for x, y in zip(train_data, train_label):
        yield x.astype(np.float32), y.astype(np.float32)


# Dataset definition
output_signature = (
    tf.TensorSpec(shape=input_shape, dtype=tf.float32),
    tf.TensorSpec(shape=label_shape, dtype=tf.float32),
)
full_dataset = tf.data.Dataset.from_generator(
    data_generator, output_signature=output_signature
)

nb_val = int(nb_data * validation_split)

train_dataset = full_dataset.skip(nb_val).batch(batch_size).prefetch(tf.data.AUTOTUNE)
val_dataset = full_dataset.take(nb_val).batch(batch_size).prefetch(tf.data.AUTOTUNE)

print("Done. Dataset ready for training.")

### Plot data

In [None]:
# run multiple times to see different patterns
random_data = np.random.randint(nb_data)
print(f"size = {copy_for_plot_label[random_data][0]:.3f} Âµm")
plt.cla()
plt.semilogy(tth, train_data[random_data, :, 0, 0])
plt.show()

### Neural network architecture

In [None]:
# Custom layer with two parallel convolutions
def conv_block(x, filters, strideC1=(2, 1), strideC2=(1, 1)):
    shortcut = x
    x = keras.layers.Conv2D(
        filters, (filter_length, 1), strides=strideC1, padding="same"
    )(x)
    x = keras.layers.Activation("relu")(x)
    x = keras.layers.Conv2D(
        filters, (filter_length, 1), strides=strideC2, padding="same"
    )(x)
    if strideC2 != (1, 1) or shortcut.shape[-1] != filters:
        shortcut = keras.layers.Conv2D(
            filters, (filter_length, 1), strides=strideC1, padding="same"
        )(shortcut)
    x = keras.layers.Add()([x, shortcut])
    x = keras.layers.Activation("relu")(x)
    return x

In [None]:
# NN architecture
filter_length = 10
pool_length = 2
activation = "relu"
use_bias = True
kernels_per_layer = [32, 64, 128, 256, 512]
# Number of output neurons
nbr_output_neurons = len(list_phases) * len(list_labels)

inputs = keras.Input(shape=input_shape)

x = Log10Layer()(inputs)  # Log10 normalization
# x = MinMaxScalingLayer(axis=1)(x)  # Min-Max scaling
x = conv_block(x, 16)
x = keras.layers.MaxPool2D((pool_length, 1))(x)

for kernels in kernels_per_layer:
    x = conv_block(x, kernels)
    x = keras.layers.MaxPool2D((pool_length, 1))(x)

x = keras.layers.Flatten()(x)
x = keras.layers.Dense(1000, activation=activation)(x)

outputs = keras.layers.Dense(nbr_output_neurons)(x)

cnn = keras.Model(inputs=inputs, outputs=outputs, name="CNN")
cnn.compile(optimizer=keras.optimizers.Adam(learning_rate=1e-4), loss="mse")

cnn.summary()

### Training

In [None]:
# Callbacks used for training: reduce LR on plateau and save best model
callbacks_list = [
    keras.callbacks.ReduceLROnPlateau(
        monitor="val_loss",
        factor=0.1,
        patience=10,
    ),
    keras.callbacks.ModelCheckpoint(
        filepath=save_path + "/CNN.hdf5",
        monitor="val_loss",
        verbose=0,
        save_best_only=True,
        save_weights_only=False,
        mode="auto",
    ),
]

# Training. Adjust number of epochs as needed.
epochs = 5

cnn_hist = cnn.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=callbacks_list,
)

In [None]:
# Plot learning curve
fig, ax1 = plt.subplots()
ax1.semilogy(cnn_hist.history["loss"], label="Training Loss")
ax1.semilogy(cnn_hist.history["val_loss"], label="Validation Loss")
ax1.set_xlabel("Epochs")
ax1.set_ylabel("MSE")
ax1.legend()

ax2 = ax1.twinx()
ax2.semilogy(cnn_hist.history["lr"], ":r")
ax2.set_ylabel("Learning Rate")