In [1]:
import os

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt

In [2]:
def prepare_data(input_file_path, sol):
    """Load the feature matrix and FRET target of one solution from a CSV file.

    Parameters
    ----------
    input_file_path : str or path-like
        CSV file that contains the ``{sol}_seq_<base><position>`` columns
        (bases A, C, G, T; positions 2 through 6) and the ``{sol}_FRET`` column.
    sol : str
        Column-name prefix identifying the solution (e.g. ``'N500'``).

    Returns
    -------
    X : pandas.DataFrame
        The 20 feature columns, renamed to ``A1..A5, C1..C5, G1..G5, T1..T5``.
    y : pandas.Series
        The ``{sol}_FRET`` values for exactly the same rows as ``X``
        (identical index, identical order).

    Raises
    ------
    KeyError
        If any of the expected columns is missing from the file.
    """
    df = pd.read_csv(input_file_path, low_memory=False)

    bases = ('A', 'C', 'G', 'T')
    columns_for_X = [f'{sol}_seq_{base}{position}' for base in bases for position in range(2, 7)]
    target_column = f'{sol}_FRET'

    # Drop incomplete rows from the features and the target TOGETHER. Dropping
    # them from X only leaves y with extra rows, so the two no longer line up
    # and a later train_test_split sees inconsistent sample counts.
    complete_rows = df.dropna(axis=0, how='any', subset=columns_for_X + [target_column])

    # .copy() so that renaming the columns never touches the frame read from disk.
    X = complete_rows[columns_for_X].copy()
    y = complete_rows[target_column]

    # Source positions 2..6 are re-labelled 1..5, in the same base order as columns_for_X.
    X.columns = [f'{base}{position}' for base in bases for position in range(1, 6)]

    return X, y


def prepare_data_3dots(input_file_path, sol):
    """Load integer features and the FRET target of one solution from a CSV file.

    Every column except the ``*_seq`` / ``*_FRET`` columns of the five
    solutions is treated as a feature and cast to ``int32``.

    Parameters
    ----------
    input_file_path : str or path-like
        CSV file containing the ten ``*_seq`` / ``*_FRET`` columns listed in
        ``columns_to_drop`` plus the feature columns.
    sol : str
        Column-name prefix selecting which ``{sol}_FRET`` column is the target.

    Returns
    -------
    X : pandas.DataFrame
        Feature columns as ``int32``, restricted to rows that are complete in
        both the features and the target.
    y : pandas.Series
        ``{sol}_FRET`` values for exactly the same rows as ``X``.

    Raises
    ------
    KeyError
        If one of the columns in ``columns_to_drop`` is missing from the file.
    """
    df = pd.read_csv(input_file_path, low_memory=False)
    columns_to_drop = ['N5_seq','N5_FRET','N50_seq','N50_FRET','N500_seq','N500_FRET','N5M10_seq','N5M10_FRET','N5M100_seq','N5M100_FRET']
    target_column = f'{sol}_FRET'

    features = df.drop(columns=columns_to_drop)

    # Build ONE completeness mask for features and target. Calling dropna()
    # on X and y independently can remove different rows from each, leaving
    # the samples misaligned.
    complete_rows = features.notna().all(axis=1) & df[target_column].notna()

    X = features.loc[complete_rows].astype(np.int32)
    y = df.loc[complete_rows, target_column]

    return X, y


def compile_model_MLP(activation='sigmoid', optimizer='adam', loss='mae'):
    """Build and compile a one-hidden-layer MLP for scalar regression.

    The network is a 32-unit Dense layer using ``activation`` followed by a
    single linear output unit. No input shape is declared here, so the
    weights are created on the first call with data.

    Parameters
    ----------
    activation : str or callable
        Activation of the hidden layer.
    optimizer : str or optimizer instance
        Optimizer passed to ``compile``.
    loss : str or callable
        Loss passed to ``compile``.

    Returns
    -------
    The compiled ``Sequential`` model.
    """
    hidden_layer = Dense(32, activation=activation)
    output_layer = Dense(1)

    network = Sequential([hidden_layer, output_layer])
    network.compile(loss=loss, optimizer=optimizer)

    return network


def train_model(model, X_train, y_train, val_size, batch_size=64, epochs=30):
    """Fit ``model`` on a train/validation split and plot both loss curves.

    Parameters
    ----------
    model : compiled Keras model
        Trained in place by ``fit``.
    X_train, y_train : array-like
        Samples and targets; a fraction ``val_size`` of them is held out for
        validation with a fixed ``random_state`` of 42.
    val_size : float or int
        Forwarded to ``train_test_split`` as ``test_size``.
    batch_size : int
        Mini-batch size used by ``fit``.
    epochs : int
        Number of training epochs.

    Returns
    -------
    (model, fig) : the fitted model and the matplotlib figure holding the
    training and validation loss curves.
    """
    X_fit, X_val, y_fit, y_val = train_test_split(
        X_train, y_train, test_size=val_size, random_state=42
    )

    history = model.fit(
        X_fit,
        y_fit,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_val, y_val),
        verbose=1,
    )

    fig = plt.figure()

    # One curve per recorded metric, drawn in the same order as before.
    curves = (('loss', 'Training Loss'), ('val_loss', 'Validation Loss'))
    for history_key, curve_label in curves:
        plt.plot(history.history[history_key], label=curve_label)

    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(f'batch_size:{batch_size} & epochs:{epochs}')
    plt.legend()
    plt.show()

    return model, fig


def test_model(model, X_test, y_test):
    """Evaluate ``model`` on the test set, print its MAE and plot per-sample errors.

    Parameters
    ----------
    model : fitted model exposing ``predict``
    X_test : array-like
        Test features passed to ``model.predict``.
    y_test : array-like (e.g. pandas Series or numpy array)
        True target values, one per test sample.

    Returns
    -------
    fig : matplotlib figure showing ``|y_test - y_pred|`` for each test
    sample, in the order the samples appear in ``X_test``.
    """
    y_pred = model.predict(X_test).flatten()

    # Work on plain positional arrays. When y_test is a pandas Series coming
    # out of a shuffled train_test_split, subtracting keeps its shuffled index
    # and plt.plot would then draw the errors against the original row labels
    # in shuffled order, producing a tangled line instead of one point per sample.
    y_true = np.asarray(y_test).ravel()
    y_abs = np.abs(y_true - y_pred)

    fig = plt.figure()

    plt.plot(y_abs, label='|test-pred|')
    plt.xlabel('Test sample')
    plt.ylabel('Absolute error')
    plt.legend()

    test_loss = mean_absolute_error(y_test, y_pred)

    print(f"test_loss:{test_loss:.4f}")
    return fig


def baseline_error(y_train, y_test):
    """Score the trivial predictor that always outputs the training-set mean.

    Parameters
    ----------
    y_train : array-like
        Training targets; their mean is the constant prediction.
    y_test : array-like
        Test targets the constant prediction is scored against.

    Returns
    -------
    (baseline_mae, baseline_rmse, baseline_mse) : tuple of floats
    """
    baseline_value = np.mean(y_train)

    # Build the constant prediction as float explicitly. np.full_like would
    # inherit the dtype of y_test, so integer targets would silently truncate
    # the mean (e.g. 2.5 -> 2) and distort every baseline metric.
    baseline_predictions = np.full(np.shape(y_test), baseline_value, dtype=float)

    baseline_mae = mean_absolute_error(y_test, baseline_predictions)
    baseline_mse = mean_squared_error(y_test, baseline_predictions)
    # RMSE is derived from the MSE already computed instead of recomputing it.
    baseline_rmse = np.sqrt(baseline_mse)
    return baseline_mae, baseline_rmse, baseline_mse

In [None]:
# Train, evaluate and save one MLP per solution.
solution = ['N5', 'N50', 'N500', 'N5M10', 'N5M100']

# NOTE(review): the output root is spelled "2024_BNEM" while the input path
# inside the loop is spelled "2024BNEM" — confirm both directories exist as written.
output_directory_path = 'C:\\Users\\chw10\\2024_BNEM\\aug_08\\result\\mlp'

epochs = 300
batch_size = 128

# Create the output sub-directories up front: model.save / savefig fail on a
# missing directory, and that failure would only surface after a full
# training run, losing the trained model.
for sub_directory in ('model', 'fig_train', 'fig_test'):
    os.makedirs(f'{output_directory_path}\\{sub_directory}', exist_ok=True)

for sol in solution:

    # N5 and N50 are skipped in this run.
    if sol in ['N5', 'N50']:
        continue

    input_file_path = f'C:\\Users\\chw10\\2024BNEM\\data\\{sol}_v2.csv'
    output_file_path_model = f'{output_directory_path}\\model\\{sol}_ep_{epochs}_bs_{batch_size}.keras'
    output_file_path_fig_train=f'{output_directory_path}\\fig_train\\{sol}_ep_{epochs}_bs_{batch_size}_train.png'
    output_file_path_fig_test=f'{output_directory_path}\\fig_test\\{sol}_ep_{epochs}_bs_{batch_size}_test.png'

    # prepare X, y
    X, y = prepare_data(input_file_path, sol)

    # Split into train and test dataset
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Reference scores of the constant "predict the training mean" model.
    baseline_mae, baseline_rmse, baseline_mse = baseline_error(y_train, y_test)

    print(f'{sol}\nbaseline_mae:{baseline_mae:.4f}\nbaseline_rmse:{baseline_rmse:.4f}\nbaseline_mse:{baseline_mse:.4f}\n')

    # compile model (a fresh, untrained network for every solution)
    model_MLP = compile_model_MLP()

    # train
    print(f'{sol}')
    model_MLP, fig_train = train_model(model_MLP, X_train, y_train, val_size=0.15, batch_size=batch_size, epochs=epochs)

    # test
    fig_test = test_model(model_MLP, X_test, y_test)

    # save model and fig
    model_MLP.save(output_file_path_model)
    fig_train.savefig(output_file_path_fig_train)
    fig_test.savefig(output_file_path_fig_test)

In [20]:
# NOTE(review): this cell relies on `model_MLP` left in the kernel by the
# training loop, i.e. the model of the last solution that was trained.
# The recorded output below reports 5152 parameters for the first Dense layer,
# which is 32 * (160 + 1) and therefore implies 160 input features, whereas
# prepare_data() selects 20 columns — the output appears to come from a
# different run; re-run this cell after training to refresh it.
model_MLP.summary()

Model: "sequential_13"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_26 (Dense)            (None, 32)                5152      
                                                                 
 dense_27 (Dense)            (None, 1)                 33        
                                                                 
Total params: 5185 (20.25 KB)
Trainable params: 5185 (20.25 KB)
Non-trainable params: 0 (0.00 Byte)
_________________________________________________________________
