In [None]:
! pip install keras-tuner --upgrade -q

In [None]:
import os, math, time, random, datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
print(__doc__)

import tensorflow as tf
print(tf.__version__)

# from tensorflow.python.keras.models import Sequential, load_model
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, GRU, Input, LSTM, Conv1D, MaxPool1D, BatchNormalization, Flatten, Dropout
from tensorflow.keras.optimizers import Adam, RMSprop, SGD
from keras_tuner.tuners import RandomSearch, Hyperband
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from tensorflow.keras import regularizers
import keras_tuner as kt

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder, MinMaxScaler

from sklearn.model_selection import cross_val_score, cross_val_predict, StratifiedKFold
from sklearn.metrics import roc_auc_score, roc_curve, auc, confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import accuracy_score, matthews_corrcoef, classification_report, f1_score, precision_recall_curve, average_precision_score

from sklearn.feature_selection import chi2
import itertools
from mlxtend.plotting import plot_decision_regions
import joblib

### Utility functions

In [None]:
def calculate_performace(test_num, y_prob, y_test):

    y_pred = (y_prob >= 0.5).astype(int)
    # Metrics
    _f1 = f1_score(y_test, y_pred)
    _acc = accuracy_score(y_test, y_pred)
    _auc = roc_auc_score(y_test, y_prob)
    _mcc = matthews_corrcoef(y_test, y_pred)
    
    _cm = confusion_matrix(y_test, y_pred)
    TN, FP, FN, TP = _cm.ravel()
    
    # Calculate Sensitivity
    sens = TP / (TP + FN)
    
    # Calculate Specificity
    spec = TN / (TN + FP)

    acc = _acc #float(tp + tn) / test_num
    precision = float(TP) / (TP + FP)
    sensitivity = sens # float(tp) / (tp + fn)
    specificity = spec # float(tn) / (tn + fp)
    MCC = _mcc # (float(tp) * tn - fp * fn) / math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    return acc, precision, sensitivity, specificity, MCC

def plot_confusion_matrix(cm, classes,
                        normalize=True,
                        title='Confusion matrix',
                        cmap=plt.cm.Blues, save_path=None):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)
    xmin, xmax = plt.xlim()  # return the current xlim
    plt.xlim((xmin, xmax))  # set the xlim to xmin, xmax
    plt.ylim(xmin, xmax)  # set the xlim to xmin, xmax

    if normalize:
        cm_norm = np.round((cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]*100),2)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, f"{cm[i, j]}\n{cm_norm[i, j]}%",
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    if save_path != None:
        plt.savefig(save_path)
    plt.show()

def plot_performance(train_loss,val_loss,train_acc,val_acc):
    # plot train and validation loss across multiple runs
    plt.plot(train_loss, color='blue', label='train')
    plt.plot(val_loss, color='orange', label='validation')
    plt.title('model train vs validation loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.show()

    plt.plot(train_acc, color='blue', label='train')
    plt.plot(val_acc, color='orange', label='validation')
    plt.title('model train vs validation loss')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper right')
    plt.show()
def transfer_label_from_prob(proba):
    label = [1 if val >= 0.5 else 0 for val in proba]
    return label
def plot_roc_curve(labels, probality, legend_text, auc_tag=True, save_path=None):
    # fpr2, tpr2, thresholds = roc_curve(labels, pred_y)
    fpr, tpr, thresholds = roc_curve(labels, probality)  # probas_[:, 1])
    roc_auc = auc(fpr, tpr)
    if auc_tag:
        rects1 = plt.plot(fpr, tpr, label=legend_text + ' (AUC=%6.3f) ' % roc_auc)
    else:
        rects1 = plt.plot(fpr, tpr, label=legend_text)
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Area under receiver operating characteristic curve')
    plt.legend(loc="lower right")
    plt.grid()
    if save_path != None:
        plt.savefig(save_path)
    plt.show()
    return roc_auc

In [None]:
str1='250904'

In [None]:
# Get splitted data for deep learning models models
def getSplitDataSet(X, y, ratio=0.2):
    
    scaler = StandardScaler()
    scaler.fit(X)
    X = scaler.transform(X)

    #split data into training and test data. 
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=ratio, random_state=245)

    return X_train, X_test, y_train, y_test, scaler

### #########################
### input feature set ###
### #########################

In [None]:
url_dataset ='features\\train_ibce\\PSTPP_train.csv' 
df_main = pd.read_csv(url_dataset, header=0)

In [None]:
y = df_main.iloc[:,:1].values
X = df_main.iloc[:,1:].values

encoder = LabelEncoder()
labels = y = encoder.fit_transform(y.ravel())


# Split the data into training and testing sets
X_train, X_test, y_train, y_test, _scaler = getSplitDataSet(X, y, ratio=0.2)
joblib.dump(_scaler, f"{str1}_standard_scalsr.pkl")

fea_dim = X_train.shape[1]
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)

# Dataset reshaping according to models requirements

In [None]:
# Reshaping for CNN models
XTrainCNN =np.array(X_train).reshape(-1, fea_dim, 1)
XTestCNN = np.array(X_test).reshape(-1, fea_dim, 1)
print("XTrainCNN Shape",XTrainCNN.shape)

# Reshaping for RNN models
XTrainRNN =np.array(X_train).reshape(-1, 1, fea_dim)
XTestRNN = np.array(X_test).reshape(-1, 1, fea_dim)
print("XTrainRNN Shape",XTrainRNN.shape)

### Class weights calculation to handle imbalicing

In [None]:
from sklearn.utils import class_weight
cw = class_weight.compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
cw_dict = {0: cw[0], 1: cw[1]}
print(cw_dict)

In [None]:
input_shape=X_train.shape[1:]
checkpoint_filepath = f'{str1}_FCNN.h5'
callbacks = [
                tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", verbose=1, mode="max", patience=100, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(monitor="val_accuracy", mode="max", factor=0.5, patience=8, min_lr=1e-6, verbose=1),
                # tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1, monitor="val_accuracy", mode="max", save_best_only=True)
            ]
def build_dnn_model(hp):
    model = Sequential()

    # Model Input
    model.add(tf.keras.layers.Input(shape=input_shape))

    # Number of layers the dense and its l2 kernal_regularizer, the activation constant to 'relu', and dropout
    for i in range(hp.Int("n_layers", 1, 2)):
        model.add(Dense(units=hp.Int(f'dense_{i}_units', min_value=32, max_value=256, step=32), 
                        activation='relu', kernel_regularizer = regularizers.l2(hp.Choice(f'{i}_l2Reg', [1e-2, 1e-3, 1e-4]))))
        model.add(Dropout(rate=hp.Choice(f'dropout_rate{i}', [0.2, 0.3, 0.4])))
    
    model.add(Dense(1, activation='sigmoid'))
    
    # Optimizer selection
    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd']) 
    learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
    if optimizer_choice == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_choice == 'rmsprop':
        optimizer = RMSprop(learning_rate=learning_rate)
    else:
        optimizer = SGD(learning_rate=learning_rate)

    # Compile model with tunable loss
    loss_choice = hp.Choice('loss', ['binary_crossentropy'])
    model.compile(optimizer=optimizer, loss=loss_choice, metrics=['accuracy', 'mae', 'AUC'])

    
    return model

In [None]:
tuner = kt.RandomSearch(
    build_dnn_model,
    objective='val_accuracy',
    max_trials=100,  # Number of hyperparameter combinations to try
    executions_per_trial=1,  # How many models to build and fit for each trial
    directory='dlm-optim-train-models/dlm',
    project_name='dnn_tuning',
    overwrite=True
)

tuner.search(X_train, y_train,
                 epochs=200,
                 validation_data=(X_test, y_test),
                 batch_size=32,
                 callbacks=callbacks,
                 verbose=0,  # <- controls output verbosity
                 class_weight=cw_dict)

tuner.reload()
best_hps = tuner.get_best_hyperparameters(1)[0]
print("Best Hyperparameters:")
for param in best_hps.values:
    print(f"{param}: {best_hps.get(param)}")

model = tuner.get_best_models(num_models=1)[0]
model.save(checkpoint_filepath)

In [None]:
# Build model function for tuner (SimpleRNN)
input_shape = XTrainRNN.shape[1:] # X_train.shape[1] #
checkpoint_filepath = f'{str1}_RNN.h5'
callbacks = [
                tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", verbose=1, mode="max", patience=20, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(monitor="val_accuracy", mode="max", factor=0.5, patience=8, min_lr=1e-6, verbose=1),
                # tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1, monitor="val_accuracy", mode="max", save_best_only=True)
            ]
def build_simple_rnn_model(hp):
    from tensorflow.keras import regularizers
    from tensorflow.keras.optimizers import Adam, RMSprop, SGD
    from tensorflow.keras import Sequential
    import tensorflow as tf
    from tensorflow.keras.layers import SimpleRNN, Dense, Dropout

    model = Sequential()
    # SimpleRNN block
    model.add(tf.keras.layers.Input(shape=input_shape))
    for i in range(hp.Int("n_layers", 1, 2)):
        model.add(SimpleRNN(
            units=hp.Int(f'rnn_{i}_units', 32, 256, step=32),
            activation='relu',

            kernel_regularizer = regularizers.l2(hp.Choice(f'{i}_l2Reg', [1e-2, 1e-3, 1e-4])),
            return_sequences=True  # last output only, so no Flatten needed
        ))
        
        model.add(Dropout(rate=hp.Choice(f'dropout_rate{i}', [0.2, 0.3, 0.4])))        
    
    model.add(Flatten())
    model.add(Dense(units=hp.Int('dense_units', 32, 256, step=32), activation='relu', kernel_regularizer = regularizers.l2(hp.Choice(f'dense_l2Reg', [1e-2, 1e-3, 1e-4]))))
    model.add(Dropout(hp.Float('dense_dropout', 0.2, 0.5, step=0.1)))
    # Output
    model.add(Dense(1, activation='sigmoid'))

    # Optimizer + LR
    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd'])
    learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
    if optimizer_choice == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_choice == 'rmsprop':
        optimizer = RMSprop(learning_rate=learning_rate)
    else:
        optimizer = SGD(learning_rate=learning_rate)

    # Loss
    loss_choice = hp.Choice('loss', ['binary_crossentropy'])

    model.compile(optimizer=optimizer, loss=loss_choice, metrics=['accuracy', 'mae', 'AUC'])
    return model


In [None]:
tuner = kt.RandomSearch(
    build_simple_rnn_model,
    objective='val_accuracy',
    max_trials=100,  # Number of hyperparameter combinations to try
    executions_per_trial=1,  # How many models to build and fit for each trial
    directory='dlm-optim-train-models/dlm',
    project_name='rnn_tuning',
    overwrite=True
)

tuner.search(XTrainRNN, y_train,
                 epochs=200,
                 validation_data=(XTestRNN, y_test),
                 batch_size=32,
                 callbacks=callbacks,
                 verbose=0,  # <- controls output verbosity
                 class_weight=cw_dict)

tuner.reload()
best_hps = tuner.get_best_hyperparameters(1)[0]
print("Best Hyperparameters:")
for param in best_hps.values:
    print(f"{param}: {best_hps.get(param)}")
    
model = tuner.get_best_models(num_models=1)[0]
model.save(checkpoint_filepath)

In [None]:
# Build model function for tuner
input_shape=XTrainRNN.shape[1:]
checkpoint_filepath = f'{str1}_GRU.h5'
callbacks = [
                tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", verbose=1, mode="max", patience=20, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(monitor="val_accuracy", mode="max", factor=0.5, patience=8, min_lr=1e-6, verbose=1),
                # tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1, monitor="val_accuracy", mode="max", save_best_only=True)
            ]
def build_gru_model(hp):
    model = Sequential()

    model.add(tf.keras.layers.Input(shape=input_shape))

    for i in range(hp.Int("n_layers", 1, 2)):
        model.add(GRU(units=hp.Int(f'gru_{i}_units', 32, 256, step=32),
                  activation='relu',
                  recurrent_activation='relu',
                  # dropout=hp.Float(f'gru_{i}_dropout', 0.1, 0.5, step=0.1),
                  kernel_regularizer = regularizers.l2(hp.Choice(f'{i}_l2Reg', [1e-2, 1e-3, 1e-4])),
                    return_sequences=True  # last output only, so no Flatten needed
                  ))
        
        model.add(Dropout(rate=hp.Choice(f'dropout_rate{i}', [0.2, 0.3, 0.4]))) # hp.Float(f'dropout_{i}_rate', min_value=0.2, max_value=0.4, step=0.1)


    model.add(Flatten())
    model.add(Dense(units=hp.Int('dense_units', 32, 256, step=32), activation='relu', kernel_regularizer = regularizers.l2(hp.Choice(f'dense_l2Reg', [1e-2, 1e-3, 1e-4]))))
    model.add(Dropout(hp.Float('dense_dropout', 0.2, 0.5, step=0.1)))
    model.add(Dense(1, activation='sigmoid'))

    # Optimizer and learning rate
    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd'])
    learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
    if optimizer_choice == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_choice == 'rmsprop':
        optimizer = RMSprop(learning_rate=learning_rate)
    else:
        optimizer = SGD(learning_rate=learning_rate)

    # Loss function
    loss_choice = hp.Choice('loss', ['binary_crossentropy'])

    model.compile(optimizer=optimizer,
                  loss=loss_choice,
                  metrics=['accuracy', 'mae', 'AUC'])
    return model

In [None]:
tuner = kt.RandomSearch(
    build_gru_model,
    objective='val_accuracy',
    max_trials=100,  # Number of hyperparameter combinations to try
    executions_per_trial=1,  # How many models to build and fit for each trial
    directory='dlm-optim-train-models/dlm',
    project_name='gru_tuning',
    overwrite=True
)

tuner.search(XTrainRNN, y_train,
                 epochs=200,
                 validation_data=(XTestRNN, y_test),
                 batch_size=32,
                 callbacks=callbacks,
                 verbose=0,  # <- controls output verbosity
                 class_weight=cw_dict)

tuner.reload()
best_hps = tuner.get_best_hyperparameters(1)[0]
print("Best Hyperparameters:")
for param in best_hps.values:
    print(f"{param}: {best_hps.get(param)}")

model = tuner.get_best_models(num_models=1)[0]
model.save(checkpoint_filepath)

In [None]:
input_shape=XTrainRNN.shape[1:]
checkpoint_filepath = f'{str1}_LSTM.h5'
callbacks = [
                tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", verbose=1, mode="max", patience=20, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(monitor="val_accuracy", mode="max", factor=0.5, patience=8, min_lr=1e-6, verbose=1),
                # tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1, monitor="val_accuracy", mode="max", save_best_only=True)
            ]
def build_lstm_model(hp):
    model = Sequential()
    kernel_regularizer = regularizers.l2(hp.Choice('l2_reg', [1e-2, 1e-3, 1e-4]))
    # First LSTM layer
    model.add(tf.keras.layers.Input(shape=input_shape))

    for i in range(hp.Int("n_layers", 1, 2)):
        model.add(LSTM(units=hp.Int(f'lstm_{i}_units', 32, 256, step=32),
                  activation='relu',
                  recurrent_activation='relu',
                  # dropout=hp.Float(f'gru_{i}_dropout', 0.1, 0.5, step=0.1),
                  kernel_regularizer = regularizers.l2(hp.Choice(f'{i}_l2Reg', [1e-2, 1e-3, 1e-4])),
                    return_sequences=True  # last output only, so no Flatten needed
                  ))
        
        model.add(Dropout(rate=hp.Choice(f'dropout_rate{i}', [0.2, 0.3, 0.4]))) 


    model.add(Flatten())
    model.add(Dense(units=hp.Int('dense_units', 32, 256, step=32), activation='relu', kernel_regularizer = regularizers.l2(hp.Choice(f'dense_l2Reg', [1e-2, 1e-3, 1e-4]))))
    model.add(Dropout(hp.Float('dense_dropout', 0.2, 0.5, step=0.1)))
    model.add(Dense(1, activation='sigmoid'))

    # Optimizer and learning rate
    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd'])
    learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
    if optimizer_choice == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_choice == 'rmsprop':
        optimizer = RMSprop(learning_rate=learning_rate)
    else:
        optimizer = SGD(learning_rate=learning_rate)

    # Loss function
    loss_choice = hp.Choice('loss', ['binary_crossentropy'])

    model.compile(optimizer=optimizer, loss=loss_choice, metrics=['accuracy', 'mae', 'AUC'])
    return model

In [None]:
tuner = kt.RandomSearch(
    build_lstm_model,
    objective='val_accuracy',
    max_trials=100,  # Number of hyperparameter combinations to try
    executions_per_trial=1,  # How many models to build and fit for each trial
    directory='dlm-optim-train-models/dlm',
    project_name='lstm_tuning',
    overwrite=True
)

tuner.search(XTrainRNN, y_train,
                 epochs=200,
                 validation_data=(XTestRNN, y_test),
                 batch_size=32,
                 callbacks=callbacks,
                 verbose=0,  # <- controls output verbosity
                 class_weight=cw_dict)
tuner.reload()
best_hps = tuner.get_best_hyperparameters(1)[0]
print("Best Hyperparameters:")
for param in best_hps.values:
    print(f"{param}: {best_hps.get(param)}")

model = tuner.get_best_models(num_models=1)[0]
model.save(checkpoint_filepath)

In [None]:
input_shape= XTrainCNN.shape[1:] #[fea_dim, 1]#
checkpoint_filepath = f'{str1}_CNN.h5'
callbacks = [
                tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", verbose=1, mode="max", patience=20, restore_best_weights=True),
                tf.keras.callbacks.ReduceLROnPlateau(monitor="val_accuracy", mode="max", factor=0.5, patience=8, min_lr=1e-6, verbose=1),
                # tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_filepath, verbose=1, monitor="val_accuracy", mode="max", save_best_only=True)
            ]
def build_cnn_model(hp):
    model = Sequential()

    model.add(tf.keras.layers.Input(shape=input_shape))
    for i in range(hp.Int("n_layers", 1, 2)):
        model.add(Conv1D(filters=hp.Int(f'filters_{i}', 8, 64, step=4), 
                     kernel_size=hp.Int(f'kernel_size_{i}', 2, 8, step=1), 
                     activation='relu', padding='same',
                     kernel_regularizer = regularizers.l2(hp.Choice(f'{i}_l2Reg', [1e-2, 1e-3, 1e-4])), 
                     ))
        model.add(MaxPool1D(pool_size=2))
        model.add(BatchNormalization())
        
        model.add(Dropout(rate=hp.Choice(f'dropout_rate{i}', [0.2, 0.3, 0.4]))) 

    model.add(Flatten())
    model.add(Dense(units=hp.Int('dense_units', 32, 256, step=32), activation='relu', kernel_regularizer = regularizers.l2(hp.Choice(f'dense_l2Reg', [1e-2, 1e-3, 1e-4]))))
    model.add(Dropout(hp.Float('dense_dropout', 0.2, 0.5, step=0.1)))
    model.add(Dense(1, activation='sigmoid'))

    # Optimizer selection
    optimizer_choice = hp.Choice('optimizer', ['adam', 'rmsprop', 'sgd'])
    learning_rate = hp.Choice('learning_rate', [1e-2, 1e-3, 1e-4])
    if optimizer_choice == 'adam':
        optimizer = Adam(learning_rate=learning_rate)
    elif optimizer_choice == 'rmsprop':
        optimizer = RMSprop(learning_rate=learning_rate)
    else:
        optimizer = SGD(learning_rate=learning_rate)

    # Compile model with tunable loss
    loss_choice = hp.Choice('loss', ['binary_crossentropy'])
    model.compile(optimizer=optimizer, loss=loss_choice, metrics=['accuracy', 'mae', 'AUC'])
    return model

In [None]:
tuner = kt.RandomSearch(
    build_cnn_model,
    objective='val_accuracy',
    max_trials=100,  # Number of hyperparameter combinations to try
    executions_per_trial=1,  # How many models to build and fit for each trial
    directory='dlm-optim-train-models/dlm',
    project_name='cnn_tuning',
    overwrite=True
)

tuner.search(XTrainCNN, y_train,
                 epochs=200,
                 validation_data=(XTestCNN, y_test),
                 batch_size=32,
                 callbacks=callbacks,
                 verbose=1,  # <- controls output verbosity
                 class_weight=cw_dict)
tuner.reload()
best_hps = tuner.get_best_hyperparameters(1)[0]
print("Best Hyperparameters:")
for param in best_hps.values:
    print(f"{param}: {best_hps.get(param)}")

model = tuner.get_best_models(num_models=1)[0]
model.save(checkpoint_filepath)

In [None]:
defined_path = 'dlm-optim-train-models/dlm/'
def predict_pipeline(
    fasta_list, models_list, combine_feature=False, output_dir="results"
):
    if not os.path.exists(output_dir):
        print(f"Creating output directory: {output_dir}")
        os.makedirs(output_dir)
    results = []  ### rename to results_train
    model_preds = []

    # Load Scalar to Scale features
    scaler_path = f"{defined_path}minmax_scaler.pkl"
    minmax_scaler = joblib.load(scaler_path)
    print("Scaler loaded")
    
    for fasta in fasta_list:

        print(fasta)
        file_name = fasta.split("/")[-2] #os.path.splitext(fasta).split("/")[-2]
        print(file_name)
        # Define file paths for training and independent feature files
        
        # url_dataset ="features/ind_ibce/PSTPP_ind.csv" #"lbtope_bcell_variable/new_lbtope_ind/ind-ibce50_1_pstpp.csv" #"features/ind_abcpred/PSTPP_abcpred.csv" # "lbtope_bcell_variable/new_lbtope_ind/abcpred_f50_pstpp.csv" # 'lbtope_bcell_variable/lbtope_all_pstpp_new.csv' #'/kaggle/input/bcell-lbtope-features/lbtope_all_PSTPP.csv' # 'Benchmark Dataset\PSCV_all_new.csv'#'hm5c_All_XGBModel.csv' # '_PSCV_Features.csv' # 'Benchmark Dataset\PSCV_desc_new.csv' #
        fv_set = pd.read_csv(fasta, header=0)
        print(f"\n=== Processing: {file_name} having shape {fv_set.shape} ===")

        for model_name in models_list:
            print(f"Training {model_name} on {file_name}...")
            # print(fv_set.head(10))
            X_test, y_test = fv_set.drop(columns=["label"]), fv_set["label"]

            if X_test.isnull().values.any():
                count_nan = X_test.isnull().sum().sum()
                print(
                    f"X_test contains {count_nan} NaN values. Filling NaN values with 0."
                )
                X_test = X_test.fillna(0)

            # Ensure labels are integers
            y_test = y_test.astype(int)

            # Label encode the labels
            y_test = LabelEncoder().fit_transform(y_test)
                
            X_test = minmax_scaler.transform(X_test)

            if model_name.upper() == "CNN":
                X_test = X_test.reshape(-1, X_test.shape[1], 1)
            elif model_name.upper() in ["RNN", "LSTM", "GRU"]:
                X_test = X_test.reshape(-1, 1, X_test.shape[1])

            # Print shapes
            print(
                f"Validation set shape: {X_test.shape}, Labels shape: {y_test.shape}"
                )

            # Load model model = tf.keras.models.load_model(f'{str1}_FCNN.h5')
            file_path = f"{defined_path}{str1}_{model_name}.h5"
            model = tf.keras.models.load_model(file_path)
            print(f"{file_path} Model Loaded")

            # Predict
            y_pred_prob = model.predict(X_test).ravel()
            y_pred = (y_pred_prob >= 0.5).astype(int)
            # Metrics
            f1 = f1_score(y_test, y_pred)
            acc = accuracy_score(
                    y_test,
                    y_pred,
                )
            _auc = roc_auc_score(y_test, y_pred_prob)
            mAP = average_precision_score(y_test, y_pred_prob)
            mcc = matthews_corrcoef(y_test, y_pred)

            cm = confusion_matrix(y_test, y_pred)
            # import matplotlib as mpl

            # cmap = mpl.colormaps["jet"]
            # cm_path = os.path.join(output_dir, f"{model_name}_CM_Internal Independent Set.svg")
            if file_name.lower().__contains__("ibce"):
                cm_title = (
                        f"{model_name} Confusion Matrix on the Internal Set"
                    )
                cm_path = os.path.join(
                        output_dir, f"{model_name}_CM_Internal Set.svg"
                    )
            if file_name.lower().__contains__("clbe"):
                cm_title = (
                        f"{model_name} Confusion Matrix on the External Set I"
                )
                cm_path = os.path.join(
                        output_dir, f"{model_name}_CM_External Set I.svg"
                    )
            if file_name.lower().__contains__("abcpred"):
                cm_title = (
                        f"{model_name} Confusion Matrix on the External Set II"
                    )
                cm_path = os.path.join(
                        output_dir, f"{model_name}_CM_External Set II.svg"
                    )
            plot_confusion_matrix(
                    cm,
                    classes=["nBCE", "pBCE"],
                    title=cm_title,
                    # cmap=cmap,
                    normalize=True,
                    save_path=cm_path,
                )
            # plot_confusion_matrix(cm, classes=[0, 1], title=f"{model_name}_{file_name}", normalize=True, output_dir=output_dir)
            TN, FP, FN, TP = cm.ravel()

            # Calculate Sensitivity
            sens = TP / (TP + FN)

            # Calculate Specificity
            spec = TN / (TN + FP)

            # Save results
            model_preds.append(
                pd.DataFrame(
                    {
                        "Model": model_name,
                        "Dataset": file_name,
                        "orig_idx": fv_set.index,  # position in original X_all
                        "y_true": y_test.astype(int).ravel(),
                        "proba": y_pred_prob.astype(float).ravel(),
                        "y_pred": y_pred.astype(int).ravel(),
                    }
                )
            )

            # Save results
            results.append(
                {
                    # "Feature_Set": feat_name,
                    "Model": model_name,
                    "Dataset": file_name,
                    "Accuracy": acc,
                    "F1_Score": f1,
                    "Sensitivity": sens,
                    "Specificity": spec,
                    "mAP": mAP,
                    "AuROC": _auc,
                    "MCC": mcc,
                }
            )


    # # Save results CSV
    results_df = pd.DataFrame(results)
    results_csv_path = os.path.join(output_dir, f"{file_name}_ind_results.csv")
    results_df.to_csv(results_csv_path, index=False)

    # Save results CSV
    model_preds_df = pd.concat(
        model_preds, ignore_index=True
    )  # pd.DataFrame(model_preds)
    results_csv_path = os.path.join(output_dir, f"{file_name}_prediction.csv")
    model_preds_df.to_csv(results_csv_path, index=False)

    print(f"\nResults saved to {results_csv_path}")
    return results_df

In [None]:
ind_fasta_list = [
        "/kaggle/input/lbtope-ibce-final-dataset-features/features/ind_ibce/PSTPP_ind.csv",
        "/kaggle/input/lbtope-ibce-final-dataset-features/features/ind_clbe/PSTPP_ind.csv",
        "/kaggle/input/lbtope-ibce-final-dataset-features/features/ind_abcpred/PSTPP_ind.csv",
    ]  
models_list = ["CNN", "FCNN", "GRU", "RNN", "LSTM"]

predict_results_df = predict_pipeline(
            ind_fasta_list,
            models_list,
            combine_feature=True,
            output_dir="results/",
        )