# Clone the GitHub repository

In [1]:
# Fetch the repository that holds the dataset; it is cloned into ./DataLabLearningSeries.
! git clone https://github.com/elmerfer/DataLabLearningSeries.git

Cloning into 'DataLabLearningSeries'...
remote: Enumerating objects: 144, done.[K
remote: Counting objects: 100% (144/144), done.[K
remote: Compressing objects: 100% (140/140), done.[K
remote: Total 144 (delta 58), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (144/144), 1.19 MiB | 8.83 MiB/s, done.
Resolving deltas: 100% (58/58), done.


# Import necessary libraries

In [2]:
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras.utils import plot_model
import pickle
import pandas as pd
import numpy as np
from tabulate import tabulate
import seaborn as sns
from scipy.stats import linregress
from statsmodels.nonparametric.smoothers_lowess import lowess
from sklearn.preprocessing import StandardScaler
from keras.models import Sequential
from keras.layers import Dense, BatchNormalization, Activation, Dropout
from sklearn.model_selection import train_test_split
import os

# Import raw data

The following file path contains the CSV file with the dataset used in this example.

In [3]:
# Absolute path to Urea.csv inside the cloned repository. The /content prefix assumes the
# clone was made from /content (presumably Google Colab's working directory) — adjust if
# the notebook is run elsewhere.
data_path = '/content/DataLabLearningSeries/BatchNormalization layer as Input Layer/Urea.csv'

In [4]:
# Load the dataset and report its dimensions and column names.
data = pd.read_csv(data_path)

n_samples, n_features = data.shape
print(f'The CSV file has {n_samples} samples and {n_features} features for each sample')
print(f'The CSV file contains the following information for each sample:\n{data.columns}')

The CSV file has 129 samples and 11 features for each sample
The CSV file contains the following information for each sample:
Index(['Upre', 'U120', 'Upos', 'PP', 'UF', 'Ueq', 'Uann', 'centro', 'Req',
       'Uf/pp', 'Ktveq'],
      dtype='object')


We should eliminate rows where Ueq is lower than Upos, as this is biologically incorrect.

In [5]:
# Select the samples whose Ueq is strictly below Upos; the notebook treats these
# as biologically incorrect and removes them in the next step.
ueq_below_upos = data['Ueq'] < data['Upos']
rows_to_eliminate = data[ueq_below_upos]

if rows_to_eliminate.empty:
    print("Every Ueq value is equal or higher to Upos.")
else:
    print("In the following samples Ueq is lower than Upos:")
    display(rows_to_eliminate)

print(f"\nThere are {len(rows_to_eliminate)} samples where Ueq is lower than Upos.")

In the following samples Ueq is lower than Upos:


Unnamed: 0,Upre,U120,Upos,PP,UF,Ueq,Uann,centro,Req,Uf/pp,Ktveq
8,123,68,51,76.9,3.2,49.0,50.4,1,0.398374,0.041612,1.120819
20,157,92,96,67.3,4.3,77.0,89.1,1,0.490446,0.063893,0.941477
23,122,99,93,88.5,5.5,86.0,87.9,1,0.704918,0.062147,0.513294
29,207,76,55,61.7,2.0,51.0,63.3,1,0.246377,0.032415,1.645721
31,137,84,72,119.0,4.1,63.0,56.0,1,0.459854,0.034454,0.939257
32,142,117,80,85.3,2.2,78.0,83.3,1,0.549296,0.025791,0.719805
43,112,49,38,86.0,3.0,35.0,49.4,1,0.3125,0.034884,1.378013
46,165,90,71,80.0,1.0,66.0,63.9,1,0.4,0.0125,1.034672
50,133,68,49,65.0,2.0,46.0,50.4,1,0.345865,0.030769,1.249944
53,69,28,30,64.5,2.5,20.0,49.3,1,0.289855,0.03876,1.476692



There are 20 samples where Ueq is lower than Upos.


In [6]:
# Build the cleaned frame by dropping the flagged rows; `data` itself is left untouched.
clean_data = data.drop(index=rows_to_eliminate.index)

if len(clean_data) == len(data):
    print('No rows were eliminated')
else:
    print(f'{len(rows_to_eliminate)} rows were eliminated')
    print(f'The new dataframe has {len(clean_data)} rows')

20 rows were eliminated
The new dataframe has 109 rows


In [7]:
# Show the cleaned dataframe; the original index labels are kept, so gaps appear
# where rows were dropped.
clean_data

Unnamed: 0,Upre,U120,Upos,PP,UF,Ueq,Uann,centro,Req,Uf/pp,Ktveq
0,121,63,47,94.5,2.9,51.0,49.5,1,0.421488,0.030688,1.026871
1,166,87,68,59.4,1.4,71.0,70.5,1,0.427711,0.023569,0.991105
2,196,68,40,61.6,1.9,42.0,51.4,1,0.214286,0.030844,1.805728
3,167,73,43,45.7,2.6,43.0,54.2,1,0.257485,0.056893,1.673126
4,128,64,46,54.8,1.1,46.0,49.9,1,0.359375,0.020073,1.175300
...,...,...,...,...,...,...,...,...,...,...,...
124,196,98,79,79.0,4.0,89.0,75.4,0,0.454082,0.050633,0.996114
125,160,63,35,60.0,2.0,40.0,42.9,0,0.250000,0.033333,1.631594
126,142,77,50,73.0,2.0,58.0,57.5,0,0.408451,0.027397,1.052986
127,181,98,69,86.0,2.0,78.0,69.3,0,0.430939,0.023256,0.981904


Please note that, after deleting rows, the index label of each sample no longer matches the position of that sample in the dataframe.

The first step is to divide the data into explanatory (X) and response variables (y_ueq).

In [8]:
# Explanatory variables fed to the network.
feature_columns = ['Upre', 'U120', 'Upos', 'PP', 'UF']
X = clean_data[feature_columns]

# Response variable to be predicted.
y_ueq = clean_data['Ueq']

# Model Training

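After selecting the optimal end-to-end architecture, this model will be trained and evaluated 10 times, each time on a fresh random train/validation/test split. Although the checkpoints are labelled "10fold", this is a repeated hold-out evaluation rather than a partitioned 10-fold cross-validation: the test sets of different repetitions may overlap.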

The preferred architecture is as follows:

*   1 hidden layer
*   6 nodes in the hidden layer
*   Sigmoid activation function in the hidden layer
*   Linear activation function for the output layer
*   Batch Normalization layer as input layer
*   Batch Normalization layer after hidden layer

This model is denoted as DLNNe2e.


In [9]:
def build_model_BN(num_capas_ocultas, num_nodos_por_capa, funcion_activacion, activacion_salida, input_dim=None):
    """Build a feed-forward Keras model whose first layer is a BatchNormalization layer.

    Layer order: BatchNormalization (input) -> [Dense -> BatchNormalization ->
    Activation] repeated ``num_capas_ocultas`` times -> Dense(1) output.

    Parameters
    ----------
    num_capas_ocultas : int
        Number of hidden blocks (Dense + BatchNormalization + Activation).
    num_nodos_por_capa : int
        Number of units in each hidden Dense layer.
    funcion_activacion : str
        Activation applied after the BatchNormalization of each hidden block.
    activacion_salida : str
        Activation of the single-unit output layer.
    input_dim : int, optional
        Number of input features. When omitted, falls back to the number of
        columns of the notebook-level ``X`` (the original behaviour), so
        existing four-argument calls keep working.

    Returns
    -------
    keras.models.Sequential
        The assembled, uncompiled model.
    """
    if input_dim is None:
        # Backward-compatible default: infer the width from the global feature matrix.
        input_dim = X.shape[1]

    model = Sequential()

    # Input BatchNormalization layer: operates directly on the input features.
    model.add(BatchNormalization(input_shape=(input_dim,)))

    for _ in range(num_capas_ocultas):
        # Hidden block: the activation is applied after batch normalization,
        # so the Dense layer itself is created without an activation.
        model.add(Dense(num_nodos_por_capa))
        model.add(BatchNormalization())
        model.add(Activation(funcion_activacion))

    # Single-unit output layer with the requested output activation.
    model.add(Dense(1, activation=activacion_salida))
    return model

## Train the model with raw data

In [None]:
# Repeated hold-out evaluation of DLNNe2e trained on the RAW (unscaled) features.

# Path to store model checkpoints
checkpoint_path = "/content/DataLabLearningSeries/BatchNormalization layer as Input Layer/DLNNe2e_10fold_model_checkpoints"

# Path to store results
results_path = "/content/DataLabLearningSeries/BatchNormalization layer as Input Layer/DLNNe2e_10fold_model_checkpoints/results.pickle"

# Make sure the output directory exists before anything is written to it;
# without this, saving the model and dumping the pickle fail if it is missing.
os.makedirs(checkpoint_path, exist_ok=True)

# Define optimizer
optimizer = 'SGD'

# Define the loss used for compile (Mean Squared Error)
metric = 'mean_squared_error'

# Define number of random splits (repetitions)
num_splits = 10

# Define empty list to store results
results = []

for i in range(num_splits):

    print(i)

    # 80/20 split into (train + validation) / test, then 80/20 into train / validation.
    # The repetition index seeds the splits so the stored indices can be regenerated.
    # NOTE: weight initialisation and SGD batch shuffling are still unseeded, so the
    # trained weights themselves are not bit-for-bit reproducible.
    X_t, X_test, y_t, y_test = train_test_split(X, y_ueq, test_size=0.2, random_state=i)
    X_train, X_val, y_train, y_val = train_test_split(X_t, y_t, test_size=0.2, random_state=i)

    # Fit a StandardScaler on the training portion only. In this raw-data run the
    # scaled arrays are NOT fed to the network; the scaler statistics are recorded in
    # the results alongside the parameters of the input BatchNormalization layer.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    X_val_scaled = scaler.transform(X_val)

    DLNNe2e = build_model_BN(1, 6, 'sigmoid', 'linear')
    DLNNe2e.compile(optimizer=optimizer, loss=metric)

    input_shape = (None, X_train.shape[1])
    DLNNe2e.build(input_shape)

    # Train on the raw features for a fixed 200 epochs. No early-stopping callback is
    # passed, so the validation data is only monitored and never stops training.
    history = DLNNe2e.fit(X_train, y_train, epochs=200, batch_size=32, verbose=0, validation_data=(X_val, y_val))

    # Save model
    DLNNe2e.save(os.path.join(checkpoint_path, f"DLNNe2e_{i}.h5"))

    # Make predictions on the held-out test set
    predictions_DLNNe2e = DLNNe2e.predict(X_test).flatten()

    # Signed errors (prediction - truth) and the same errors relative to the true value
    errors_end_lineal = predictions_DLNNe2e - y_test
    percentage_errors = (errors_end_lineal / y_test) * 100

    # layers[0] is the input BatchNormalization layer and layers[1] the hidden Dense
    # layer (see build_model_BN); get_weights()[0] is that Dense layer's weight matrix.
    weights_first_layer = DLNNe2e.layers[1].get_weights()[0]

    # Statistics and affine parameters held by the input BatchNormalization layer.
    # sd is the square root of the moving variance (the layer's epsilon is not added).
    bn_layer = DLNNe2e.layers[0]
    mu = bn_layer.moving_mean.numpy()
    sd = np.sqrt(bn_layer.moving_variance.numpy())
    beta = bn_layer.beta.numpy()
    gamma = bn_layer.gamma.numpy()
    parametres = [mu, sd, beta, gamma]

    # Store relevant information in a dictionary for DLNNe2e model
    result = {}
    result['split'] = {'train_indices': X_train.index.tolist(), 'test_indices': X_test.index.tolist(), 'val_indices': X_val.index.tolist()}
    result['hidden_layer_weights'] = weights_first_layer
    result['scaler_mean'] = scaler.mean_
    result['scaler_std'] = scaler.scale_
    result['bn_layer_params'] = parametres
    result['mean_prediction_error'] = np.mean(errors_end_lineal)
    result['std_prediction_error'] = np.std(errors_end_lineal)
    result['mean_percentage_error'] = np.mean(percentage_errors)
    result['std_percentage_error'] = np.std(percentage_errors)
    result['predictions'] = predictions_DLNNe2e
    result['true_value'] = y_test

    # Append this repetition's dictionary to the list of results
    results.append({'DLNNe2e': result})

# Save the results to a pickle file
with open(results_path, 'wb') as f:
    pickle.dump(results, f)

## Train the model with normalized data

In [None]:
# Repeated hold-out evaluation of DLNNe2e trained on STANDARDIZED features.

# Path to store model checkpoints
checkpoint_path = "/content/DataLabLearningSeries/BatchNormalization layer as Input Layer/DLNNe2e_norm_10fold_model_checkpoints"

# Path to store results
results_path = "/content/DataLabLearningSeries/BatchNormalization layer as Input Layer/DLNNe2e_norm_10fold_model_checkpoints/results.pickle"

# Make sure the output directory exists before anything is written to it;
# without this, saving the model and dumping the pickle fail if it is missing.
os.makedirs(checkpoint_path, exist_ok=True)

# Define optimizer
optimizer = 'SGD'

# Define the loss used for compile (Mean Squared Error)
metric = 'mean_squared_error'

# Define number of random splits (repetitions)
num_splits = 10

# Define empty list to store results
results = []

for i in range(num_splits):

    print(i)

    # 80/20 split into (train + validation) / test, then 80/20 into train / validation.
    # The repetition index seeds the splits so the stored indices can be regenerated.
    # NOTE: weight initialisation and SGD batch shuffling are still unseeded, so the
    # trained weights themselves are not bit-for-bit reproducible.
    X_t, X_test, y_t, y_test = train_test_split(X, y_ueq, test_size=0.2, random_state=i)
    X_train, X_val, y_train, y_val = train_test_split(X_t, y_t, test_size=0.2, random_state=i)

    # Standardize the features. The scaler is fitted on the training portion only and
    # then applied to the validation and test portions, so no held-out statistics leak.
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    X_val_scaled = scaler.transform(X_val)

    DLNNe2e = build_model_BN(1, 6, 'sigmoid', 'linear')
    DLNNe2e.compile(optimizer=optimizer, loss=metric)

    input_shape = (None, X_train_scaled.shape[1])
    DLNNe2e.build(input_shape)

    # Train on the standardized features for a fixed 200 epochs. No early-stopping
    # callback is passed, so the validation data is only monitored.
    history = DLNNe2e.fit(X_train_scaled, y_train, epochs=200, batch_size=32, verbose=0, validation_data=(X_val_scaled, y_val))

    # Save model
    DLNNe2e.save(os.path.join(checkpoint_path, f"DLNNe2e_{i}.h5"))

    # Make predictions on the standardized held-out test set
    predictions_DLNNe2e = DLNNe2e.predict(X_test_scaled).flatten()

    # Signed errors (prediction - truth) and the same errors relative to the true value
    errors_end_lineal = predictions_DLNNe2e - y_test
    percentage_errors = (errors_end_lineal / y_test) * 100

    # layers[0] is the input BatchNormalization layer and layers[1] the hidden Dense
    # layer (see build_model_BN); get_weights()[0] is that Dense layer's weight matrix.
    weights_first_layer = DLNNe2e.layers[1].get_weights()[0]

    # Statistics and affine parameters held by the input BatchNormalization layer.
    # sd is the square root of the moving variance (the layer's epsilon is not added).
    bn_layer = DLNNe2e.layers[0]
    mu = bn_layer.moving_mean.numpy()
    sd = np.sqrt(bn_layer.moving_variance.numpy())
    beta = bn_layer.beta.numpy()
    gamma = bn_layer.gamma.numpy()
    parametres = [mu, sd, beta, gamma]

    # Store relevant information in a dictionary for DLNNe2e model
    result = {}
    result['split'] = {'train_indices': X_train.index.tolist(), 'test_indices': X_test.index.tolist(), 'val_indices': X_val.index.tolist()}
    result['hidden_layer_weights'] = weights_first_layer
    result['scaler_mean'] = scaler.mean_
    result['scaler_std'] = scaler.scale_
    result['bn_layer_params'] = parametres
    result['mean_prediction_error'] = np.mean(errors_end_lineal)
    result['std_prediction_error'] = np.std(errors_end_lineal)
    result['mean_percentage_error'] = np.mean(percentage_errors)
    result['std_percentage_error'] = np.std(percentage_errors)
    result['predictions'] = predictions_DLNNe2e
    result['true_value'] = y_test

    # Append this repetition's dictionary to the list of results
    results.append({'DLNNe2e': result})

# Save the results to a pickle file
with open(results_path, 'wb') as f:
    pickle.dump(results, f)