In [1]:
import pickle
from random import gauss

import numpy as np
import pandas as pd
from scipy.stats import entropy

import matplotlib.pylab as plt
import matplotlib.image as img

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.models import Sequential, load_model

from tensorflow.keras.layers import Input, Dense, LSTM, Dropout, concatenate
from tensorflow.keras.layers import concatenate, Dense, Softmax, Multiply, Add

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.utils import Sequence




import os
os.makedirs('trained_model', exist_ok=True)

import warnings
warnings.filterwarnings("ignore")

import tensorflow as tf
tf.get_logger().setLevel('ERROR')

import absl.logging
absl.logging.set_verbosity(absl.logging.ERROR)

pd.options.mode.chained_assignment = None

In [2]:
def load_datasets(path='simulation_data/', string='M1_M20_train_val_test_set'):
    if not path.endswith('/'):
        path += '/'
    filenames = ['x1_train', 'x2_train', 'y_train', 'x1_val', 'x2_val', 'y_val', 'x1_test', 'x2_test', 'y_test']
    data = []
    for file_name in filenames:
        file_path = path + file_name + '_' + string + '.pkl'
        with open(file_path, 'rb') as f:
            data.append(pickle.load(f))
    return data

x1_train, x2_train, y_train, x1_val, x2_val, y_val, x1_test, x2_test, y_test = \
    load_datasets(path='simulation_data/', string='M1_M20_train_val_test_set')

print("x1_train shape:", x1_train.shape)
print("x2_train shape:", x2_train.shape)
print("y_train shape:", y_train.shape)
print("x1_val shape:", x1_val.shape)
print("x2_val shape:", x2_val.shape)
print("y_val shape:", y_val.shape)
print("x1_test shape:", x1_test.shape)
print("x2_test shape:", x2_test.shape)
print("y_test shape:", y_test.shape)

x1_train shape: (4950000, 4)
x2_train shape: (4950000, 21, 12)
y_train shape: (4950000, 20)
x1_val shape: (50000, 4)
x2_val shape: (50000, 21, 12)
y_val shape: (50000, 20)
x1_test shape: (100000, 4)
x2_test shape: (100000, 7, 12)
y_test shape: (100000, 20)


In [3]:
def check_one_hot(arr):
    if arr.ndim != 2 or arr.shape[1] != 20:
        return False
    row_sums = np.sum(arr, axis=1)
    return np.all((row_sums == 1) & (np.count_nonzero(arr, axis=1) == 1))

def print_label_info(arr, name, n_show=5, seed=42):
    print(f"{name} shape: {arr.shape}")
    if check_one_hot(arr):
        print("Format: one-hot encoding\n")
        unique_indices = np.unique(np.argmax(arr, axis=1))
        print(f"Unique mechanism labels in {name}: {[f'M{idx+1:02d}' for idx in unique_indices]}")
        np.random.seed(seed)
        indices = np.random.choice(arr.shape[0], n_show, replace=False)
        for i, idx in enumerate(indices):
            label = np.argmax(arr[idx])
            one_hot_str = ', '.join(str(int(x)) for x in arr[idx])
            print(f"Sample {i+1} (index {idx}): One-hot: [{one_hot_str}], Mechanism Label: M{label+1:02d}")
    else:
        print("Format: NOT one-hot encoding")
    print()
    print('-'*80)

print_label_info(y_train, "y_train")
print_label_info(y_val, "y_val")
print_label_info(y_test, "y_test")

print("The label for each sample is the corresponding mechanism used to generate its kinetic data, encoded as a one-hot vector as follows:\n")
for i in range(20):
    vec = ['0']*20
    vec[i] = '1'
    print(f"M{i+1:>2}: ({', '.join(vec)})")

y_train shape: (4950000, 20)
Format: one-hot encoding

Unique mechanism labels in y_train: ['M01', 'M02', 'M03', 'M04', 'M05', 'M06', 'M07', 'M08', 'M09', 'M10', 'M11', 'M12', 'M13', 'M14', 'M15', 'M16', 'M17', 'M18', 'M19', 'M20']
Sample 1 (index 2647414): One-hot: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0], Mechanism Label: M11
Sample 2 (index 1027977): One-hot: [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], Mechanism Label: M05
Sample 3 (index 3993827): One-hot: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0], Mechanism Label: M17
Sample 4 (index 2955156): One-hot: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0], Mechanism Label: M12
Sample 5 (index 319154): One-hot: [0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], Mechanism Label: M02

--------------------------------------------------------------------------------
y_val shape: (50000, 20)
Format: one-hot encoding

Unique mechanism labels in y_val: ['M01', 'M02

In [None]:
def build_lstm_classifier(input1_shape, input2_shape, output_shape, dropout_rate=0.3):
    initial_concentrations = Input(shape=input1_shape)
    kinetics = Input(shape=(None, input2_shape[-1]))

    c = Dense(64, activation='relu')(initial_concentrations)
    c = Dropout(dropout_rate)(c)

    k = LSTM(64, return_sequences=True, dropout=dropout_rate)(kinetics)
    k = LSTM(64, dropout=dropout_rate)(k)
    k = Dropout(dropout_rate)(k)

    combined = concatenate([c, k])
    pred = Dense(64, activation='relu', kernel_initializer='he_uniform')(combined)
    pred = Dropout(dropout_rate)(pred)
    pred = Dense(output_shape[1], activation='softmax', name='Dense_5')(pred)

    model = Model(inputs=[initial_concentrations, kinetics], outputs=pred)
    return model

class KineticsBatchGenerator(Sequence):
    def __init__(self, X, y, tps, error_range, seed_value=1, batch_size=1024, shuffle=True):
        self.x1, self.x2 = X
        self.y = y
        self.tps = tps
        self.error_range = error_range
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.total_batches = self.__len__()
        self.n_runs = 4
        self.species = [1, 2]
        self.error_species = len(self.species)
        self.error_dict = {key: [] for key in error_range}
        np.random.seed(seed_value)
        self.randomstate = np.random.default_rng(seed_value)
        self.on_epoch_end()
        examples, timepoints, curves = self.x2.shape
        columns = curves // self.n_runs
        self.index_species = []
        for i in range(self.n_runs):
            t_species = [a + (i * columns) for a in self.species]
            self.index_species = self.index_species + t_species

    def __len__(self):
        return int(np.floor(len(self.y) / self.batch_size))

    def __getitem__(self, index):
        return self.__data_generation(index)

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.y))
        if self.shuffle:
            np.random.shuffle(self.indexes)
        for error in self.error_range:
            if error != 0:
                preu = np.array([gauss(0, error / 100) for _ in range(self.batch_size * np.max(self.tps) * self.error_species * self.n_runs)])
                self.error_dict[error] = np.reshape(preu, (self.batch_size, np.max(self.tps), self.error_species * self.n_runs))

    def __data_generation(self, index):
        x1 = self.x1[self.batch_size * index: self.batch_size * (index + 1)]
        x2 = self.introduce_error(self.trim_x2(self.x2[self.batch_size * index: self.batch_size * (index + 1)], self.tps, index))
        y = self.y[self.batch_size * index: self.batch_size * (index + 1)]
        x = [x1, x2]
        return x, y

    def trim_x2(self, original_x2, tps, index):
        original_tps = original_x2.shape[1]
        n_tps = self.randomstate.choice(tps)
        idx = np.sort(np.append(self.randomstate.choice(original_tps - 1, n_tps, replace=False) + 1, [0]))
        return original_x2[:, idx].copy()

    def introduce_error(self, x2):
        examples, timepoints, curves = x2.shape
        error = self.randomstate.choice(self.error_range)
        if error != 0:
            x2[:, 1:, self.index_species] = x2[:, 1:, self.index_species] + self.error_dict[error][:, 0:timepoints - 1]
        return x2

def get_top_mechanism_indices(predictions):
    threshold = 0.99
    index = np.argsort(predictions)
    prob = 0
    grouping = []
    for j in index[::-1]:
        prob += predictions[j]
        grouping.append(j)
        if prob >= threshold:
            break
    return grouping

def mechanism_indices_to_names(indices):
    mechanism_list = ['M' + str(i) for i in range(1, 21)]
    grouping_names = [mechanism_list[i] for i in indices]
    return grouping_names

def plot_kinetic_profiles_and_mechanisms(x_list, mechanism_names, columns=3, s=30, A0=1, show=True, overlay=False, labels=None):
    x1, x2 = x_list
    rows = len(x1)
    curves = x2.shape[2] // columns
    if not overlay:
        plt.figure(figsize=(4 * columns, 7 * rows))
    i = 1
    for row in range(rows):
        for column in range(columns):
            plt.subplot(2, columns, i)
            if not overlay:
                if column == 3:
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5))
                else:
                    percentage = ' (' + str(round(x1[row, column] / A0 * 100, 3)) + ' mol%)'
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5)) + percentage
                plt.title(title)
            if type(labels) == list:
                label = labels
            else:
                label = None
            for j in range(1, curves):
                if type(labels) == list:
                    label = labels[j - 1]
                else:
                    label = None
                plt.scatter(x2[row, :, column * curves], x2[row, :, (j) + column * curves], s, label=label)
            if not overlay:
                plt.xlim(left=0, right=max(x2[row, :, column * curves]) * 1.1)
                plt.ylim(bottom=0)
            i += 1
    plt.tight_layout()
    mechs = []
    for i, name in enumerate(mechanism_names):
        mechs.append(img.imread('Images/' + name + '.jpg'))
    length = len(mechs)
    for i, mech in enumerate(mechs):
        plt.subplot(2, length, length + i + 1)
        plt.axis('off')
        plt.imshow(mech)

MODEL_COLUMN_MAP = {
    'M1_20_model_bayes': ['Time', 'S', 'P', 'catT']
}

In [None]:
def train_mechanism_model(
    tps = [3,4,5,6,7,8,9,10,15,20],
    error_range = [0],
    epochs = 3000,
    start = 0,
    batch_size_training = 4092
):
    x1_train, x2_train, y_train, x1_val, x2_val, y_val, x1_test, x2_test, y_test = load_datasets()

    model = build_lstm_classifier(
        input1_shape = x1_train.shape[1:],
        input2_shape = x2_train.shape[1:],
        output_shape = y_train.shape  
    )
    opt = Adam(learning_rate=0.001)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['categorical_accuracy'])

    early_stopping = EarlyStopping(
        monitor='val_categorical_accuracy',
        verbose=1,
        patience=10,
        mode='max',
        restore_best_weights=True
    )
    checkpoint = ModelCheckpoint(
        "trained_model/best_model_weights",
        monitor='val_categorical_accuracy',
        verbose=1,
        save_best_only=True,
        save_weights_only=True
    )

    model.fit(
        KineticsBatchGenerator([x1_train, x2_train], y_train, tps, error_range, batch_size=batch_size_training),
        validation_data=KineticsBatchGenerator([x1_val, x2_val], y_val, tps, error_range, batch_size=25),
        callbacks=[checkpoint, early_stopping],
        epochs=start+epochs,
        initial_epoch=start
    )

    model.save('trained_model/M1_20_model_bayes')
    print('Training completed. Optimal weights saved to: trained_model/best_model_weights')
    print('Trained model saved to: trained_model/M1_20_model_bayes')
    return model

model = train_mechanism_model()

In [None]:
def mc_dropout_predict(model, x_inputs, n_mc=100):
    preds = [model(x_inputs, training=True).numpy() for _ in range(n_mc)]
    all_probs = np.stack(preds, axis=0)
    mean_probs = all_probs.mean(axis=0)
    std_probs = all_probs.std(axis=0)
    return mean_probs, std_probs, all_probs

def credible_set(probs, threshold=0.95):
    idx = np.argsort(probs)[::-1]
    cumsum = np.cumsum(probs[idx])
    k = np.searchsorted(cumsum, threshold) + 1
    return idx[:k]

def mechanism_indices_to_names(indices):
    mechanism_list = ['M' + str(i) for i in range(1, 21)]
    return [mechanism_list[i] for i in indices]

model = load_model('trained_model/M1_20_model_bayes', compile=False)
model.load_weights('trained_model/best_model_weights')

mean_probs, std_probs, all_probs = mc_dropout_predict(model, [x1_test, x2_test], n_mc=100)
entropies = entropy(mean_probs.T).T
credible_sets = [credible_set(p, 0.95) for p in mean_probs]

np.savez('bayesian_results_mc_dropout.npz',
         mean_probs=mean_probs,
         std_probs=std_probs,
         entropies=entropies,
         credible_sets=credible_sets)

In [None]:
def build_lstm_classifier(input1_shape, input2_shape, output_shape, dropout_rate=0.3):
    initial_concentrations = Input(shape=input1_shape)
    kinetics = Input(shape=(None, input2_shape[-1]))

    c = Dense(64, activation='relu')(initial_concentrations)
    c = Dropout(dropout_rate)(c)

    k = LSTM(64, return_sequences=True, dropout=dropout_rate)(kinetics)
    k = LSTM(64, dropout=dropout_rate)(k)
    k = Dropout(dropout_rate)(k)

    #combined = concatenate([c, k])
    
    combined_raw = concatenate([c, k])
    attention_weights = Dense(combined_raw.shape[-1], activation='softmax')(combined_raw)
    combined = Multiply()([combined_raw, attention_weights])

    pred = Dense(64, activation='relu', kernel_initializer='he_uniform')(combined)
    pred = Dropout(dropout_rate)(pred)
    pred = Dense(output_shape[1], activation='softmax', name='Dense_5')(pred)

    model = Model(inputs=[initial_concentrations, kinetics], outputs=pred)
    return model

class KineticsBatchGenerator(Sequence):
    def __init__(self, X, y, tps, error_range, seed_value=1, batch_size=1024, shuffle=True):
        self.x1, self.x2 = X
        self.y = y
        self.tps = tps
        self.error_range = error_range
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.total_batches = self.__len__()
        self.n_runs = 4
        self.species = [1, 2]
        self.error_species = len(self.species)
        self.error_dict = {key: [] for key in error_range}
        np.random.seed(seed_value)
        self.randomstate = np.random.default_rng(seed_value)
        self.on_epoch_end()
        examples, timepoints, curves = self.x2.shape
        columns = curves // self.n_runs
        self.index_species = []
        for i in range(self.n_runs):
            t_species = [a + (i * columns) for a in self.species]
            self.index_species = self.index_species + t_species

    def __len__(self):
        return int(np.floor(len(self.y) / self.batch_size))

    def __getitem__(self, index):
        return self.__data_generation(index)

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.y))
        if self.shuffle:
            np.random.shuffle(self.indexes)
        for error in self.error_range:
            if error != 0:
                preu = np.array([gauss(0, error / 100) for _ in range(self.batch_size * np.max(self.tps) * self.error_species * self.n_runs)])
                self.error_dict[error] = np.reshape(preu, (self.batch_size, np.max(self.tps), self.error_species * self.n_runs))

    def __data_generation(self, index):
        x1 = self.x1[self.batch_size * index: self.batch_size * (index + 1)]
        x2 = self.introduce_error(self.trim_x2(self.x2[self.batch_size * index: self.batch_size * (index + 1)], self.tps, index))
        y = self.y[self.batch_size * index: self.batch_size * (index + 1)]
        x = [x1, x2]
        return x, y

    def trim_x2(self, original_x2, tps, index):
        original_tps = original_x2.shape[1]
        n_tps = self.randomstate.choice(tps)
        idx = np.sort(np.append(self.randomstate.choice(original_tps - 1, n_tps, replace=False) + 1, [0]))
        return original_x2[:, idx].copy()

    def introduce_error(self, x2):
        examples, timepoints, curves = x2.shape
        error = self.randomstate.choice(self.error_range)
        if error != 0:
            x2[:, 1:, self.index_species] = x2[:, 1:, self.index_species] + self.error_dict[error][:, 0:timepoints - 1]
        return x2

def get_top_mechanism_indices(predictions):
    threshold = 0.99
    index = np.argsort(predictions)
    prob = 0
    grouping = []
    for j in index[::-1]:
        prob += predictions[j]
        grouping.append(j)
        if prob >= threshold:
            break
    return grouping

def mechanism_indices_to_names(indices):
    mechanism_list = ['M' + str(i) for i in range(1, 21)]
    grouping_names = [mechanism_list[i] for i in indices]
    return grouping_names

def plot_kinetic_profiles_and_mechanisms(x_list, mechanism_names, columns=3, s=30, A0=1, show=True, overlay=False, labels=None):
    x1, x2 = x_list
    rows = len(x1)
    curves = x2.shape[2] // columns
    if not overlay:
        plt.figure(figsize=(4 * columns, 7 * rows))
    i = 1
    for row in range(rows):
        for column in range(columns):
            plt.subplot(2, columns, i)
            if not overlay:
                if column == 3:
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5))
                else:
                    percentage = ' (' + str(round(x1[row, column] / A0 * 100, 3)) + ' mol%)'
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5)) + percentage
                plt.title(title)
            if type(labels) == list:
                label = labels
            else:
                label = None
            for j in range(1, curves):
                if type(labels) == list:
                    label = labels[j - 1]
                else:
                    label = None
                plt.scatter(x2[row, :, column * curves], x2[row, :, (j) + column * curves], s, label=label)
            if not overlay:
                plt.xlim(left=0, right=max(x2[row, :, column * curves]) * 1.1)
                plt.ylim(bottom=0)
            i += 1
    plt.tight_layout()
    mechs = []
    for i, name in enumerate(mechanism_names):
        mechs.append(img.imread('Images/' + name + '.jpg'))
    length = len(mechs)
    for i, mech in enumerate(mechs):
        plt.subplot(2, length, length + i + 1)
        plt.axis('off')
        plt.imshow(mech)

MODEL_COLUMN_MAP = {
    'M1_20_model_bayes': ['Time', 'S', 'P', 'catT']
}

In [None]:
def train_mechanism_model(
    tps = [3,4,5,6,7,8,9,10,15,20],
    error_range = [0],
    epochs = 3000,
    start = 0,
    batch_size_training = 4092
):
    x1_train, x2_train, y_train, x1_val, x2_val, y_val, x1_test, x2_test, y_test = load_datasets()

    model = build_lstm_classifier(
        input1_shape = x1_train.shape[1:],
        input2_shape = x2_train.shape[1:],
        output_shape = y_train.shape  
    )
    opt = Adam(learning_rate=0.001)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['categorical_accuracy'])

    early_stopping = EarlyStopping(
        monitor='val_categorical_accuracy',
        verbose=1,
        patience=10,
        mode='max',
        restore_best_weights=True
    )
    checkpoint = ModelCheckpoint(
        "trained_model/best_model_weights",
        monitor='val_categorical_accuracy',
        verbose=1,
        save_best_only=True,
        save_weights_only=True
    )

    model.fit(
        KineticsBatchGenerator([x1_train, x2_train], y_train, tps, error_range, batch_size=batch_size_training),
        validation_data=KineticsBatchGenerator([x1_val, x2_val], y_val, tps, error_range, batch_size=25),
        callbacks=[checkpoint, early_stopping],
        epochs=start+epochs,
        initial_epoch=start
    )

    model.save('trained_model/M1_20_model_bayes')
    print('Training completed. Optimal weights saved to: trained_model/best_model_weights')
    print('Trained model saved to: trained_model/M1_20_model_bayes')
    return model

model = train_mechanism_model()

In [7]:
def build_lstm_classifier(input1_shape, input2_shape, output_shape, dropout_rate=0.3):
    initial_concentrations = Input(shape=input1_shape)
    kinetics = Input(shape=(None, input2_shape[-1]))

    c = Dense(64, activation='relu')(initial_concentrations)
    c = Dropout(dropout_rate)(c)

    k = LSTM(64, return_sequences=True, dropout=dropout_rate)(kinetics)
    k = LSTM(64, dropout=dropout_rate)(k)
    k = Dropout(dropout_rate)(k)

    #combined = concatenate([c, k])
    
    #ccombined_raw = concatenate([c, k])
    #cattention_weights = Dense(combined_raw.shape[-1], activation='softmax')(combined_raw)
    #ccombined = Multiply()([combined_raw, attention_weights])

    c_proj = Dense(64)(c)
    k_proj = Dense(64)(k)
    alpha = Dense(64, activation='sigmoid')(c_proj) 
    beta = Dense(64, activation='sigmoid')(k_proj)   
    c_weighted = Multiply()([alpha, c_proj])
    k_weighted = Multiply()([beta, k_proj])
    combined = Add()([c_weighted, k_weighted])

    pred = Dense(64, activation='relu', kernel_initializer='he_uniform')(combined)
    pred = Dropout(dropout_rate)(pred)
    pred = Dense(output_shape[1], activation='softmax', name='Dense_5')(pred)

    model = Model(inputs=[initial_concentrations, kinetics], outputs=pred)
    return model

class KineticsBatchGenerator(Sequence):
    def __init__(self, X, y, tps, error_range, seed_value=1, batch_size=1024, shuffle=True):
        self.x1, self.x2 = X
        self.y = y
        self.tps = tps
        self.error_range = error_range
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.total_batches = self.__len__()
        self.n_runs = 4
        self.species = [1, 2]
        self.error_species = len(self.species)
        self.error_dict = {key: [] for key in error_range}
        np.random.seed(seed_value)
        self.randomstate = np.random.default_rng(seed_value)
        self.on_epoch_end()
        examples, timepoints, curves = self.x2.shape
        columns = curves // self.n_runs
        self.index_species = []
        for i in range(self.n_runs):
            t_species = [a + (i * columns) for a in self.species]
            self.index_species = self.index_species + t_species

    def __len__(self):
        return int(np.floor(len(self.y) / self.batch_size))

    def __getitem__(self, index):
        return self.__data_generation(index)

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.y))
        if self.shuffle:
            np.random.shuffle(self.indexes)
        for error in self.error_range:
            if error != 0:
                preu = np.array([gauss(0, error / 100) for _ in range(self.batch_size * np.max(self.tps) * self.error_species * self.n_runs)])
                self.error_dict[error] = np.reshape(preu, (self.batch_size, np.max(self.tps), self.error_species * self.n_runs))

    def __data_generation(self, index):
        x1 = self.x1[self.batch_size * index: self.batch_size * (index + 1)]
        x2 = self.introduce_error(self.trim_x2(self.x2[self.batch_size * index: self.batch_size * (index + 1)], self.tps, index))
        y = self.y[self.batch_size * index: self.batch_size * (index + 1)]
        x = [x1, x2]
        return x, y

    def trim_x2(self, original_x2, tps, index):
        original_tps = original_x2.shape[1]
        n_tps = self.randomstate.choice(tps)
        idx = np.sort(np.append(self.randomstate.choice(original_tps - 1, n_tps, replace=False) + 1, [0]))
        return original_x2[:, idx].copy()

    def introduce_error(self, x2):
        examples, timepoints, curves = x2.shape
        error = self.randomstate.choice(self.error_range)
        if error != 0:
            x2[:, 1:, self.index_species] = x2[:, 1:, self.index_species] + self.error_dict[error][:, 0:timepoints - 1]
        return x2

def get_top_mechanism_indices(predictions):
    threshold = 0.99
    index = np.argsort(predictions)
    prob = 0
    grouping = []
    for j in index[::-1]:
        prob += predictions[j]
        grouping.append(j)
        if prob >= threshold:
            break
    return grouping

def mechanism_indices_to_names(indices):
    mechanism_list = ['M' + str(i) for i in range(1, 21)]
    grouping_names = [mechanism_list[i] for i in indices]
    return grouping_names

def plot_kinetic_profiles_and_mechanisms(x_list, mechanism_names, columns=3, s=30, A0=1, show=True, overlay=False, labels=None):
    x1, x2 = x_list
    rows = len(x1)
    curves = x2.shape[2] // columns
    if not overlay:
        plt.figure(figsize=(4 * columns, 7 * rows))
    i = 1
    for row in range(rows):
        for column in range(columns):
            plt.subplot(2, columns, i)
            if not overlay:
                if column == 3:
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5))
                else:
                    percentage = ' (' + str(round(x1[row, column] / A0 * 100, 3)) + ' mol%)'
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5)) + percentage
                plt.title(title)
            if type(labels) == list:
                label = labels
            else:
                label = None
            for j in range(1, curves):
                if type(labels) == list:
                    label = labels[j - 1]
                else:
                    label = None
                plt.scatter(x2[row, :, column * curves], x2[row, :, (j) + column * curves], s, label=label)
            if not overlay:
                plt.xlim(left=0, right=max(x2[row, :, column * curves]) * 1.1)
                plt.ylim(bottom=0)
            i += 1
    plt.tight_layout()
    mechs = []
    for i, name in enumerate(mechanism_names):
        mechs.append(img.imread('Images/' + name + '.jpg'))
    length = len(mechs)
    for i, mech in enumerate(mechs):
        plt.subplot(2, length, length + i + 1)
        plt.axis('off')
        plt.imshow(mech)

MODEL_COLUMN_MAP = {
    'M1_20_model_bayes': ['Time', 'S', 'P', 'catT']
}

In [9]:
def train_mechanism_model(
    tps = [3,4,5,6,7,8,9,10,15,20],
    error_range = [0],
    epochs = 3000,
    start = 0,
    batch_size_training = 4092
):
    x1_train, x2_train, y_train, x1_val, x2_val, y_val, x1_test, x2_test, y_test = load_datasets()

    model = build_lstm_classifier(
        input1_shape = x1_train.shape[1:],
        input2_shape = x2_train.shape[1:],
        output_shape = y_train.shape  
    )
    opt = Adam(learning_rate=0.001)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['categorical_accuracy'])

    early_stopping = EarlyStopping(
        monitor='val_categorical_accuracy',
        verbose=1,
        patience=10,
        mode='max',
        restore_best_weights=True
    )
    checkpoint = ModelCheckpoint(
        "trained_model/best_model_weights",
        monitor='val_categorical_accuracy',
        verbose=1,
        save_best_only=True,
        save_weights_only=True
    )

    model.fit(
        KineticsBatchGenerator([x1_train, x2_train], y_train, tps, error_range, batch_size=batch_size_training),
        validation_data=KineticsBatchGenerator([x1_val, x2_val], y_val, tps, error_range, batch_size=25),
        callbacks=[checkpoint, early_stopping],
        epochs=start+epochs,
        initial_epoch=start
    )

    model.save('trained_model/M1_20_model_bayes')
    print('Training completed. Optimal weights saved to: trained_model/best_model_weights')
    print('Trained model saved to: trained_model/M1_20_model_bayes')
    return model

model = train_mechanism_model()

Epoch 1/3000
  72/1209 [>.............................] - ETA: 1:47 - loss: 3.0047 - categorical_accuracy: 0.0360

KeyboardInterrupt: 

In [11]:
def build_lstm_classifier(input1_shape, input2_shape, output_shape, dropout_rate=0.3):
    initial_concentrations = Input(shape=input1_shape)
    kinetics = Input(shape=(None, input2_shape[-1]))

    c = Dense(64, activation='relu')(initial_concentrations)
    c = Dropout(dropout_rate)(c)

    k = LSTM(64, return_sequences=True, dropout=dropout_rate)(kinetics)
    k = LSTM(64, dropout=dropout_rate)(k)
    k = Dropout(dropout_rate)(k)

    #combined = concatenate([c, k])
    
    #ccombined_raw = concatenate([c, k])
    #cattention_weights = Dense(combined_raw.shape[-1], activation='softmax')(combined_raw)
    #ccombined = Multiply()([combined_raw, attention_weights])

    #c_proj = Dense(64)(c)
    # k_proj = Dense(64)(k)
    # alpha = Dense(64, activation='sigmoid')(c_proj) 
    # beta = Dense(64, activation='sigmoid')(k_proj)   
    # c_weighted = Multiply()([alpha, c_proj])
    # k_weighted = Multiply()([beta, k_proj])
    # combined = Add()([c_weighted, k_weighted])

    dim = 64
    c_proj = Dense(dim)(c)
    k_proj = Dense(dim)(k)
    combined = tf.keras.layers.Multiply()([c_proj, k_proj])
    combined = Dense(64, activation='relu')(combined)  
 
    pred = Dense(64, activation='relu', kernel_initializer='he_uniform')(combined)
    pred = Dropout(dropout_rate)(pred)
    pred = Dense(output_shape[1], activation='softmax', name='Dense_5')(pred)

    model = Model(inputs=[initial_concentrations, kinetics], outputs=pred)
    return model

class KineticsBatchGenerator(Sequence):
    def __init__(self, X, y, tps, error_range, seed_value=1, batch_size=1024, shuffle=True):
        self.x1, self.x2 = X
        self.y = y
        self.tps = tps
        self.error_range = error_range
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.total_batches = self.__len__()
        self.n_runs = 4
        self.species = [1, 2]
        self.error_species = len(self.species)
        self.error_dict = {key: [] for key in error_range}
        np.random.seed(seed_value)
        self.randomstate = np.random.default_rng(seed_value)
        self.on_epoch_end()
        examples, timepoints, curves = self.x2.shape
        columns = curves // self.n_runs
        self.index_species = []
        for i in range(self.n_runs):
            t_species = [a + (i * columns) for a in self.species]
            self.index_species = self.index_species + t_species

    def __len__(self):
        return int(np.floor(len(self.y) / self.batch_size))

    def __getitem__(self, index):
        return self.__data_generation(index)

    def on_epoch_end(self):
        self.indexes = np.arange(len(self.y))
        if self.shuffle:
            np.random.shuffle(self.indexes)
        for error in self.error_range:
            if error != 0:
                preu = np.array([gauss(0, error / 100) for _ in range(self.batch_size * np.max(self.tps) * self.error_species * self.n_runs)])
                self.error_dict[error] = np.reshape(preu, (self.batch_size, np.max(self.tps), self.error_species * self.n_runs))

    def __data_generation(self, index):
        x1 = self.x1[self.batch_size * index: self.batch_size * (index + 1)]
        x2 = self.introduce_error(self.trim_x2(self.x2[self.batch_size * index: self.batch_size * (index + 1)], self.tps, index))
        y = self.y[self.batch_size * index: self.batch_size * (index + 1)]
        x = [x1, x2]
        return x, y

    def trim_x2(self, original_x2, tps, index):
        original_tps = original_x2.shape[1]
        n_tps = self.randomstate.choice(tps)
        idx = np.sort(np.append(self.randomstate.choice(original_tps - 1, n_tps, replace=False) + 1, [0]))
        return original_x2[:, idx].copy()

    def introduce_error(self, x2):
        examples, timepoints, curves = x2.shape
        error = self.randomstate.choice(self.error_range)
        if error != 0:
            x2[:, 1:, self.index_species] = x2[:, 1:, self.index_species] + self.error_dict[error][:, 0:timepoints - 1]
        return x2

def get_top_mechanism_indices(predictions):
    threshold = 0.99
    index = np.argsort(predictions)
    prob = 0
    grouping = []
    for j in index[::-1]:
        prob += predictions[j]
        grouping.append(j)
        if prob >= threshold:
            break
    return grouping

def mechanism_indices_to_names(indices):
    mechanism_list = ['M' + str(i) for i in range(1, 21)]
    grouping_names = [mechanism_list[i] for i in indices]
    return grouping_names

def plot_kinetic_profiles_and_mechanisms(x_list, mechanism_names, columns=3, s=30, A0=1, show=True, overlay=False, labels=None):
    x1, x2 = x_list
    rows = len(x1)
    curves = x2.shape[2] // columns
    if not overlay:
        plt.figure(figsize=(4 * columns, 7 * rows))
    i = 1
    for row in range(rows):
        for column in range(columns):
            plt.subplot(2, columns, i)
            if not overlay:
                if column == 3:
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5))
                else:
                    percentage = ' (' + str(round(x1[row, column] / A0 * 100, 3)) + ' mol%)'
                    title = r'[Cat]$_{0}$ = ' + str(round(x1[row, column], 5)) + percentage
                plt.title(title)
            if type(labels) == list:
                label = labels
            else:
                label = None
            for j in range(1, curves):
                if type(labels) == list:
                    label = labels[j - 1]
                else:
                    label = None
                plt.scatter(x2[row, :, column * curves], x2[row, :, (j) + column * curves], s, label=label)
            if not overlay:
                plt.xlim(left=0, right=max(x2[row, :, column * curves]) * 1.1)
                plt.ylim(bottom=0)
            i += 1
    plt.tight_layout()
    mechs = []
    for i, name in enumerate(mechanism_names):
        mechs.append(img.imread('Images/' + name + '.jpg'))
    length = len(mechs)
    for i, mech in enumerate(mechs):
        plt.subplot(2, length, length + i + 1)
        plt.axis('off')
        plt.imshow(mech)

MODEL_COLUMN_MAP = {
    'M1_20_model_bayes': ['Time', 'S', 'P', 'catT']
}

In [13]:
def train_mechanism_model(
    tps = [3,4,5,6,7,8,9,10,15,20],
    error_range = [0],
    epochs = 3000,
    start = 0,
    batch_size_training = 4092
):
    x1_train, x2_train, y_train, x1_val, x2_val, y_val, x1_test, x2_test, y_test = load_datasets()

    model = build_lstm_classifier(
        input1_shape = x1_train.shape[1:],
        input2_shape = x2_train.shape[1:],
        output_shape = y_train.shape  
    )
    opt = Adam(learning_rate=0.001)
    model.compile(optimizer=opt, loss='categorical_crossentropy', metrics=['categorical_accuracy'])

    early_stopping = EarlyStopping(
        monitor='val_categorical_accuracy',
        verbose=1,
        patience=10,
        mode='max',
        restore_best_weights=True
    )
    checkpoint = ModelCheckpoint(
        "trained_model/best_model_weights",
        monitor='val_categorical_accuracy',
        verbose=1,
        save_best_only=True,
        save_weights_only=True
    )

    model.fit(
        KineticsBatchGenerator([x1_train, x2_train], y_train, tps, error_range, batch_size=batch_size_training),
        validation_data=KineticsBatchGenerator([x1_val, x2_val], y_val, tps, error_range, batch_size=25),
        callbacks=[checkpoint, early_stopping],
        epochs=start+epochs,
        initial_epoch=start
    )

    model.save('trained_model/M1_20_model_bayes')
    print('Training completed. Optimal weights saved to: trained_model/best_model_weights')
    print('Trained model saved to: trained_model/M1_20_model_bayes')
    return model

model = train_mechanism_model()

Epoch 1/3000

KeyboardInterrupt: 