Imports
-------

In [61]:
from typing import List, Tuple, Union
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
import sklearn.model_selection as sk
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model

Data generation
---------------

In [50]:
def get_gaussian(amplitude, x, center, width):
    return amplitude * np.exp(-((x - center) ** 2) / (2 * width ** 2))

import numpy as np

def generate_gaussian_data(
    num_samples, 
    input_length, 
    num_gaussian=(1, 5), 
    amplitude_range=(1, 5), 
    center_range=(32, 96), 
    width_range=(5, 20), 
    noise_amplitude=0.0):
    """
    Generate a dataset of Gaussian curves with optional Gaussian noise.

    Parameters
    ----------
    num_samples : int
        Number of slices to generate.
    input_length : int
        Length of each slice.
    num_gaussian : int or tuple of int
        Number of Gaussian peaks per slice (fixed or random range).
    amplitude_range : tuple of float
        Range of amplitudes for Gaussian peaks.
    center_range : tuple of int
        Range of center positions for Gaussian peaks.
    width_range : tuple of float
        Range of standard deviations (widths) for Gaussian peaks.
    noise_amplitude : float
        Standard deviation of Gaussian noise added to each slice.

    Returns
    -------
    X_train : numpy.ndarray
        Array of slices with Gaussian peaks and added noise, shape (num_samples, input_length, 1).
    amplitudes_array : numpy.ndarray
        Array of amplitudes for each peak, shape (num_samples, max_peaks).
    num_peaks_array : numpy.ndarray
        Number of peaks per slice, shape (num_samples,).
    peak_positions_array : numpy.ndarray
        Positions of peaks, shape (num_samples, max_peaks).
    peak_widths_array : numpy.ndarray
        Widths of peaks, shape (num_samples, max_peaks).
    """
    if isinstance(num_gaussian, tuple):
        min_peaks, max_peaks = num_gaussian
    else:
        min_peaks = max_peaks = num_gaussian

    max_peaks = max(max_peaks, 1)  # Ensure at least one peak
    x = np.linspace(0, input_length - 1, input_length)  # Shared x-axis for all samples

    # Preallocate arrays
    X_train = np.zeros((num_samples, input_length, 1))
    amplitudes_array = np.zeros((num_samples, max_peaks))
    num_peaks_array = np.random.randint(min_peaks, max_peaks + 1, size=num_samples)
    peak_positions_array = np.zeros((num_samples, max_peaks))
    peak_widths_array = np.zeros((num_samples, max_peaks))

    # Generate Gaussian parameters
    for i in range(num_samples):
        num_peaks = num_peaks_array[i]
        
        amplitudes = np.random.uniform(amplitude_range[0], amplitude_range[1], size=num_peaks)
        centers = np.random.uniform(center_range[0], center_range[1], size=num_peaks)
        widths = np.random.uniform(width_range[0], width_range[1], size=num_peaks)

        # Compute Gaussian curves in vectorized form
        gaussians = amplitudes[:, None] * np.exp(-((x - centers[:, None])**2) / (2 * widths[:, None]**2))
        slice_curve = np.sum(gaussians, axis=0)

        # Add Gaussian noise if applicable
        if noise_amplitude > 0:
            slice_curve += np.random.normal(0, noise_amplitude, input_length)

        # Store results
        X_train[i, :, 0] = slice_curve
        amplitudes_array[i, :num_peaks] = amplitudes
        peak_positions_array[i, :num_peaks] = centers
        peak_widths_array[i, :num_peaks] = widths

    return X_train, amplitudes_array, num_peaks_array, peak_positions_array, peak_widths_array

Model generation
----------------

In [51]:
def build_model(input_length, max_number_of_peaks):
    """
    Build a model that predicts both the maximum value and the position of the maximum value
    from the input signal.

    Parameters
    ----------
    input_length : int
        Length of the input signal (e.g., 128).

    Returns
    -------
    tensorflow.keras.Model
        The constructed Keras model.
    """
    input_layer = layers.Input(shape=(input_length, 1))

    # Feature extraction
    x = layers.Conv1D(filters=32, kernel_size=3, activation="relu", padding="same")(input_layer)
    x = layers.BatchNormalization()(x)
    x = layers.Conv1D(filters=64, kernel_size=3, activation="relu", padding="same")(x)
    x = layers.BatchNormalization()(x)

    attention = layers.Attention()([x, x])

    x = layers.GlobalAveragePooling1D()(attention)
    

    # Flatten and dense layers
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation="relu")(x)
    x = layers.Dense(32, activation="relu")(x)

    # Output layer
    num_peaks_output = layers.Dense(max_number_of_peaks + 1, activation="softmax", name="num_peaks")(x)

    return models.Model(inputs=input_layer, outputs=num_peaks_output)

Utils
-----

In [145]:
from itertools import islice

def batched(iterable, n):
    # batched('ABCDEFG', 3) → ABC DEF G
    if n < 1:
        raise ValueError('n must be at least one')
    iterator = iter(iterable)
    while batch := tuple(islice(iterator, n)):
        yield batch

        
def dataset_split(test_size, random_state, max_number_of_peaks, **kwargs):

    values = list(kwargs.values())

    splitted = sk.train_test_split(*values, test_size=0.2, random_state=42)

    output = {
        'train': dict(), 'test': dict()
    }

    for (k, v), (train_data, test_data) in zip(kwargs.items(), batched(splitted, 2)):
        if k == 'num_peaks':
            train_data =  to_categorical(train_data, max_number_of_peaks + 1)
            test_data =  to_categorical(test_data, max_number_of_peaks + 1)
        
        output['train'][k] = train_data
        output['test'][k] = test_data

    return output


def plot_training_history(history):
    """
    Plot training and validation performance metrics (loss and accuracy).

    Parameters
    ----------
    history : tensorflow.keras.callbacks.History
        The training history object from model.fit().
    """
    plt.figure(figsize=(10, 5))
    ax = plt.gca()
    twin_ax = ax.twinx()

    # Plot training loss
    ax.plot(history.history['loss'], label='Training Loss', color='C0')
    ax.set_ylabel('Loss', color='C0')
    ax.tick_params(axis='y', labelcolor='C0')

    # Plot validation accuracy
    twin_ax.plot(history.history['val_accuracy'], label='Validation Accuracy', color='C1')
    twin_ax.set_ylabel('Accuracy', color='C1')
    twin_ax.tick_params(axis='y', labelcolor='C1')

    plt.xlabel('Epochs')
    ax.legend(loc='upper left')
    twin_ax.legend(loc='upper right')
    plt.title("Training History")
    plt.tight_layout()
    plt.show()


def visualize_validation_cases(model, validation_data, num_examples: int = 5, n_columns: int = 1):
    """
    Visualize validation cases by comparing true and predicted values.

    Parameters
    ----------
    model : tensorflow.keras.Model
        The trained Keras model.
    validation_data : dict
        Dictionary containing validation data and labels:
        {'raw': input signals, 'num_peaks': ground truth peak counts, 
         'positions': peak positions, 'widths': peak widths, 'amplitudes': peak amplitudes}.
    num_examples : int, optional
        Number of validation cases to visualize. Default is 5.
    """
    plt.close('all')
    num_rows = num_examples
    n_rows = int(np.ceil(num_examples / n_columns))

    fig, axes = plt.subplots(n_rows, n_columns, figsize=(3 * n_columns, num_rows * 3))
    axes = axes.ravel()

    validation_data = validation_data['test']

    indices = np.random.choice(len(validation_data['raw']), num_examples, replace=False)
    x = np.arange(validation_data['raw'].shape[1])  # Assuming fixed-length signals

    for ax, idx in zip(axes, indices):
        # Input signal
        input_signal = validation_data['raw'][idx, :, 0]

        # True labels
        true_num_peaks = validation_data['num_peaks'][idx]
        true_peak_positions = validation_data['positions'][idx]
        true_peak_widths = validation_data['widths'][idx]
        true_peak_amplitudes = validation_data['amplitudes'][idx]

        # Model prediction
        predicted_num_peaks = model.predict(validation_data['raw'][idx:idx + 1])[0]

        # Plot input signal
        ax.plot(input_signal, label=f"True: {np.argmax(true_num_peaks)}, Pred: {np.argmax(predicted_num_peaks)}")

        # Overlay true peaks as dashed lines and vertical markers
        for position, amplitude, width in zip(true_peak_positions, true_peak_amplitudes, true_peak_widths):
            if position != 0:
                ax.plot(x, get_gaussian(amplitude, x, position, width), linestyle='--', linewidth=1, color='black')
                ax.axvline(position, color='red', linestyle='dotted', alpha=0.7)

        ax.legend()
        ax.set_title(f"Validation Example {idx}")
        ax.set_xlabel("Signal Index")
        ax.set_ylabel("Amplitude")

    plt.tight_layout()
    plt.show()

def plot_model_performance(history, model, validation_data, num_examples=5):
    """
    Plot both the training history and validation cases.

    Parameters
    ----------
    history : tensorflow.keras.callbacks.History
        The training history object from model.fit().
    model : tensorflow.keras.Model
        The trained Keras model.
    validation_data : dict
        Dictionary containing validation data.
    num_examples : int, optional
        Number of validation cases to visualize. Default is 5.
    """
    plot_training_history(history)
    visualize_validation_cases(model, validation_data, num_examples=num_examples)

Script
------

In [146]:
# Generate data with up to 5 peaks per slice
INPUT_LENGTH = 200
MAX_NUMBER_OF_PEAKS = 3

gaussian_slices, amplitudes, num_peaks, positions, widths = generate_gaussian_data(
    num_samples=2000,
    input_length=INPUT_LENGTH,
    num_gaussian=(1, MAX_NUMBER_OF_PEAKS),
    amplitude_range=(1, 40),
    center_range=(50, 150),
    width_range=(5, 5),
    noise_amplitude=0.2
)

data = dataset_split(
    raw=gaussian_slices, 
    positions=positions, 
    num_peaks=num_peaks, 
    amplitudes=amplitudes, 
    widths=widths, 
    test_size=0.2, 
    random_state=None, 
    max_number_of_peaks=MAX_NUMBER_OF_PEAKS
)

# Define the callback to save the best model
checkpoint_callback = ModelCheckpoint(
    filepath="best_model.keras",          # Path to save the best model
    monitor="val_loss",                # Metric to monitor (e.g., "val_loss" or "val_accuracy")
    save_best_only=True,               # Save only the best model
    save_weights_only=False,           # Save the entire model (set True to save only weights)
    mode="min",                        # "min" for loss (smaller is better), "max" for accuracy
    verbose=1                          # Display a message when saving the model
)

model = build_model(input_length=INPUT_LENGTH, max_number_of_peaks=MAX_NUMBER_OF_PEAKS)

model.compile(
    optimizer='adam',
    loss={'num_peaks': 'categorical_crossentropy',},
    metrics={'num_peaks': ['accuracy'],}
)

history = model.fit(
    data['train']['raw'],
    data['train']['num_peaks'],
    validation_data=(data['test']['raw'], data['test']['num_peaks']),
    epochs=100,
    batch_size=32,
    callbacks=[checkpoint_callback]
)

best_model = load_model("best_model.keras")

plot_training_history(history)

Epoch 1/100
[1m49/50[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 14ms/step - accuracy: 0.4066 - loss: 1.1930  
Epoch 1: val_loss improved from inf to 1.43967, saving model to best_model.keras
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 17ms/step - accuracy: 0.4125 - loss: 1.1834 - val_accuracy: 0.4175 - val_loss: 1.4397
Epoch 2/100
[1m49/50[0m [32m━━━━━━━━━━━━━━━━━━━[0m[37m━[0m [1m0s[0m 14ms/step - accuracy: 0.7700 - loss: 0.5498
Epoch 2: val_loss improved from 1.43967 to 1.00537, saving model to best_model.keras
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 16ms/step - accuracy: 0.7700 - loss: 0.5490 - val_accuracy: 0.5400 - val_loss: 1.0054
Epoch 3/100
[1m47/50[0m [32m━━━━━━━━━━━━━━━━━━[0m[37m━━[0m [1m0s[0m 14ms/step - accuracy: 0.7957 - loss: 0.4625
Epoch 3: val_loss improved from 1.00537 to 0.93394, saving model to best_model.keras
[1m50/50[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 16ms/step - accuracy: 0

In [148]:
%matplotlib qt
# plot_model_performance(
#     history=history, 
#     model=model, 
#     validation_data=data, 
#     num_examples=5
# )
visualize_validation_cases(best_model, data, num_examples=12, n_columns=3)

# plot_training_history(history)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 12ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 11ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 10ms/step
