In [None]:
import pandas as pd
from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score, roc_auc_score, roc_curve, confusion_matrix, classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
import matplotlib.pyplot as plt
import pickle
import os
import numpy as np
from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
from itertools import cycle
import seaborn as sns
import time
import tensorflow as tf
import signal
from contextlib import contextmanager
import time
import warnings
import tensorflow as tf
from tensorflow.keras import layers, models
# Notebook-wide switch: set to False to see library warnings again.
ignore_warnings = True
if ignore_warnings:
    # Install a catch-all "ignore" filter for every warning category.
    warnings.filterwarnings(action="ignore")
    
# Attention Layer
class AttentionLayer(layers.Layer):
    """Attention pooling that collapses axis 1 of its input into a weighted sum.

    For an input ``x`` the layer computes ``e = tanh(x . W + b)``, normalises
    ``e`` with a softmax along axis 1 and returns ``sum(a * x)`` along axis 1.

    NOTE(review): the bias is created with ``input_shape[1]`` rows, so the
    layer presumably expects inputs shaped (batch, time_steps, features) with
    a fixed number of time steps -- confirm against the callers.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create the trainable projection ``W`` and bias ``b``."""
        # One scoring weight per entry of the last input axis.
        self.W = self.add_weight(
            name='attention_weight',
            shape=(input_shape[-1], 1),
            initializer='random_normal',
            trainable=True,
        )
        # One bias value per position along axis 1.
        self.b = self.add_weight(
            name='attention_bias',
            shape=(input_shape[1], 1),
            initializer='zeros',
            trainable=True,
        )
        super().build(input_shape)

    def call(self, x):
        """Return the attention-weighted sum of ``x`` over axis 1."""
        K = tf.keras.backend
        # Unnormalised scores: tanh(xW + b).
        scores = tf.tanh(K.dot(x, self.W) + self.b)
        # Attention weights: softmax of the scores along axis 1.
        attention = K.softmax(scores, axis=1)
        # Weighted sum of the inputs along axis 1.
        return K.sum(attention * x, axis=1)

# Model Architecture
def build_lstm_attention_model(input_shape, output_classes):
    """Build and compile a BiLSTM -> attention -> dense softmax classifier.

    Args:
        input_shape: Forwarded as ``input_shape`` to the first (Bidirectional)
            layer.
        output_classes: Number of units in the final softmax layer.

    Returns:
        The compiled ``Sequential`` model (Adam optimiser, accuracy metric).

    NOTE(review): the loss is ``categorical_crossentropy`` while
    ``create_tf_neural_network`` in this file uses
    ``sparse_categorical_crossentropy``; this model therefore presumably needs
    one-hot targets rather than integer labels -- confirm before training it
    with the same labels as the dense network.
    """
    network = models.Sequential([
        # Bidirectional LSTM that keeps the per-step outputs for the attention layer.
        layers.Bidirectional(layers.LSTM(128, return_sequences=True), input_shape=input_shape),
        # Pools the per-step outputs into a single vector.
        AttentionLayer(),
        # Classification head.
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.5),
        layers.Dense(output_classes, activation='softmax'),
    ])

    network.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy'],
    )
    return network


class TFNeuralNetwork:
    """Adapter exposing ``fit``/``predict``/``predict_proba``/``score`` on a Keras model.

    The wrapped model is built by ``build_lstm_attention_model`` when ``lstm``
    is truthy and by ``create_tf_neural_network`` otherwise.
    """

    def __init__(self, input_shape, output_classes, lstm=False):
        # Pick the builder first, then call it with the shared arguments.
        builder = build_lstm_attention_model if lstm else create_tf_neural_network
        self.model = builder(input_shape, output_classes)

    def fit(self, X_train, y_train, epochs=10, batch_size=32):
        """Train the wrapped model; returns nothing."""
        self.model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size)

    def predict(self, X_test):
        """Return, per row, the index of the largest model output."""
        scores = self.model.predict(X_test)
        return scores.argmax(axis=1)

    def predict_proba(self, X_test):
        """Return the raw outputs of the wrapped model's ``predict``."""
        return self.model.predict(X_test)

    def score(self, X_test, y_test):
        """Return the second value (accuracy) reported by ``model.evaluate``."""
        _loss, test_accuracy = self.model.evaluate(X_test, y_test)
        return test_accuracy

def create_tf_neural_network(input_shape, output_classes):
    """Build and compile a small fully connected softmax classifier.

    Architecture: Dense(128, relu) -> Dropout(0.5) -> Dense(64, relu) ->
    Dense(output_classes, softmax), compiled with Adam and
    ``sparse_categorical_crossentropy`` (integer class labels).

    Args:
        input_shape: Shape of a single sample, without the batch axis. A bare
            integer (e.g. ``X.shape[1]``) is accepted and treated as
            ``(n_features,)``.
        output_classes: Number of units in the final softmax layer.

    Returns:
        The compiled ``tf.keras`` ``Sequential`` model.
    """
    # Callers in this file pass ``X_train.shape[1]`` (an int); Keras expects a
    # shape tuple, so normalise a scalar feature count to a 1-tuple.
    if isinstance(input_shape, (int, np.integer)):
        input_shape = (int(input_shape),)

    model = tf.keras.models.Sequential([
        tf.keras.layers.InputLayer(input_shape=input_shape),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(output_classes, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

class MLModel:
    """Wrapper that trains, evaluates and plots results for one classifier.

    The wrapped ``model`` must provide ``fit(X, y)``, ``predict(X)`` and
    ``predict_proba(X)``. Plot and report labels assume the three integer
    classes 0, 1, 2 shown as 'Red', 'Yellow', 'Green' (in that order).
    """

    # Integer class labels and their display names, index-aligned.
    CLASS_LABELS = [0, 1, 2]
    CLASS_NAMES = ['Red', 'Yellow', 'Green']

    def __init__(self, model):
        self.model = model
        self.training_time = None  # seconds spent in the last train() call
        self.y_pred = None  # predictions from the most recent predict() call

    @staticmethod
    def _save_current_figure(experiment_name, filename):
        """Save the current matplotlib figure as ``<experiment_name>/<filename>``."""
        os.makedirs(experiment_name, exist_ok=True)
        plt.savefig(f"{experiment_name}/{filename}")

    def train(self, X_train, y_train):
        """Fit the wrapped model and record the wall-clock training time."""
        start_time = time.time()
        self.model.fit(X_train, y_train)
        self.training_time = time.time() - start_time

    def predict(self, X_test):
        """Return the model's predictions for ``X_test`` and cache them in ``y_pred``."""
        self.y_pred = self.model.predict(X_test)
        return self.y_pred

    def evaluate(self, X_test, y_test):
        """Compute classification metrics on a held-out set.

        Args:
            X_test: Features passed to ``predict`` and ``predict_proba``.
            y_test: True labels, compared directly against the predictions.

        Returns:
            dict with keys ``predictions``, ``predict_proba``, ``report``,
            ``accuracy``, ``recall``, ``precision``, ``f1_score`` (all
            macro-averaged where applicable), ``auc`` (one-vs-one),
            ``training_time`` and ``model_object`` (this wrapper).
        """
        predictions = self.predict(X_test)
        predict_proba = self.model.predict_proba(X_test)

        report = classification_report(y_test, predictions, target_names=self.CLASS_NAMES)
        acc = accuracy_score(y_test, predictions)
        rec = recall_score(y_test, predictions, average='macro')
        prec = precision_score(y_test, predictions, average='macro')
        f1 = f1_score(y_test, predictions, average='macro')
        # Named auc_score so the imported sklearn ``auc`` function is not shadowed.
        auc_score = roc_auc_score(y_test, predict_proba, multi_class='ovo')
        return {
            'predictions': predictions,
            'predict_proba': predict_proba,
            'report': report,
            'accuracy': acc,
            'recall': rec,
            'precision': prec,
            'f1_score': f1,
            'auc': auc_score,
            'training_time': self.training_time,
            'model_object': self
        }

    def plot_confusion_matrix(self, X_test, y_test, experiment_name):
        """Plot the confusion matrix for ``X_test`` and save it as PNG.

        The image is written to ``<experiment_name>/confusion_matrix.png``;
        the directory is created when missing.
        """
        # Predict on the data that was actually passed in instead of relying
        # on whatever the last predict() call happened to cache.
        predictions = self.predict(X_test)
        cm = confusion_matrix(y_test, predictions)

        # Start a fresh figure; otherwise the heatmap is drawn on top of the
        # previously active figure (e.g. the ROC plot).
        plt.figure()
        plt.title('Confusion Matrix')
        sns.heatmap(cm, annot=True, fmt='d')
        # Heatmap cell i spans [i, i + 1]; centre the tick labels on the cells.
        tick_positions = np.arange(len(self.CLASS_NAMES)) + 0.5
        plt.xticks(tick_positions, self.CLASS_NAMES, rotation=45)
        plt.yticks(tick_positions, self.CLASS_NAMES)
        plt.xlabel('Predicted Label')
        plt.ylabel('True Label')
        self._save_current_figure(experiment_name, "confusion_matrix.png")

    def plot_roc(self, X_test, y_test, experiment_name):
        """Plot one-vs-rest ROC curves per class and save them as PNG.

        The image is written to ``<experiment_name>/roc.png``; the directory
        is created when missing.
        """
        # One indicator column per class label.
        y_true_bin = label_binarize(y_test, classes=self.CLASS_LABELS)
        n_classes = y_true_bin.shape[1]

        # ROC curve and area for each class, from the predicted probabilities.
        y_score = self.model.predict_proba(X_test)
        fpr, tpr, roc_auc = {}, {}, {}
        for i in range(n_classes):
            fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_score[:, i])
            roc_auc[i] = auc(fpr[i], tpr[i])

        plt.figure()
        colors = cycle(['aqua', 'darkorange', 'cornflowerblue'])
        for i, color in zip(range(n_classes), colors):
            plt.plot(fpr[i], tpr[i], color=color, lw=2,
                     label='ROC curve of class {0} (area = {1:0.2f})'.format(self.CLASS_NAMES[i], roc_auc[i]))

        # Diagonal reference line.
        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([-0.05, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate (FPR)')
        plt.ylabel('True Positive Rate (TPR)')
        plt.title('Receiver Operating Characteristic (ROC)')
        plt.legend(loc="lower right")
        self._save_current_figure(experiment_name, "roc.png")

    def save_results(self, results, experiment_name):
        """Pickle ``results`` to ``<experiment_name>/results.pkl``."""
        os.makedirs(experiment_name, exist_ok=True)
        with open(f"{experiment_name}/results.pkl", 'wb') as file:
            pickle.dump(results, file)

    def feature_importance(self, feature_names, experiment_name):
        """Plot the (up to) 15 most important features and save the plot.

        Uses ``feature_importances_`` when the wrapped model has it, otherwise
        the absolute values of ``coef_[0]``. Models with neither attribute are
        skipped with a printed message. The image is written to
        ``<experiment_name>/feature_importance.png``.
        """
        if hasattr(self.model, 'feature_importances_'):
            importances = self.model.feature_importances_
        elif hasattr(self.model, 'coef_'):
            # NOTE(review): only the first row of coef_ is used; for a
            # multi-class linear model that is presumably the first class
            # only -- confirm this is intended.
            importances = np.abs(self.model.coef_[0])
        else:
            print("Model does not have feature_importances_ or coef_ attribute")
            return

        top_n = min(15, len(feature_names))
        # Indices of the top_n largest importances, in descending order.
        indices = np.argsort(importances)[::-1][:top_n]

        plt.figure(figsize=(12, 6))
        plt.title("Feature Importances")
        plt.bar(range(top_n), importances[indices], color="b", align="center")
        # Rotated, right-aligned labels keep long feature names readable.
        plt.xticks(range(top_n), [feature_names[i] for i in indices], rotation=45, ha="right")
        plt.tick_params(axis='x', which='major', labelsize=9)
        plt.tight_layout()
        # Save before show(): show() may release the figure, after which
        # savefig() would write an empty image.
        self._save_current_figure(experiment_name, "feature_importance.png")
        plt.show()

def read_data():
    """Load every input table used by the experiments.

    The actual loading expressions are redacted in this copy of the notebook
    (``#***`` placeholders), so each assignment below must be filled in before
    the cell can run.

    Returns:
        Tuple ``(df_biobert, df_labels, df_numerical_vars,
        df_categorical_vars, df_categorical_vars_raw, df_target,
        y_one_hot_labels, y_int_labels)``, in that order.
    """
    # Data loading section (right-hand sides redacted).
    df_biobert = #***
    df_labels = #***
    df_numerical_vars = #***
    df_categorical_vars = #***
    df_categorical_vars_raw = #***
    df_target = #***
    y_one_hot_labels = #***
    y_int_labels = #***
    return df_biobert, df_labels, df_numerical_vars, df_categorical_vars, df_categorical_vars_raw, df_target, y_one_hot_labels, y_int_labels
# Load everything once at notebook scope; later cells read these globals.
df_biobert, df_labels, df_numerical_vars, df_categorical_vars, df_categorical_vars_raw, df_target, y_one_hot_labels, y_int_labels = read_data()

In [None]:
#'numerical', 'categorical', 'text_embeddings', 
#MLModel(RandomForestClassifier())
# 'GradientBoostingClassifier'
# MLModel(GradientBoostingClassifier())
# Experiment grid: every text embedding x feature subset x classifier.
data_count = "whole_data"
text_embedding_methods = ['BoW', 'Word2Vec100', 'Word2Vec768','BioBERT']
feature_set = ['all_features', 'all_features_except_text_embeddings', 'all_features_except_categorical', 'all_features_except_numerical']
# NOTE(review): the last label says 'XGBClassifier' but the fourth model
# trained below is GradientBoostingClassifier. Kept as-is because the label is
# part of the output directory name.
model_names = ['LogisticRegression', 'TFNeuralNetwork','RandomForestClassifier', 'XGBClassifier']

# Pickle location per embedding; anything not listed falls back to BioBERT.
embedding_pickles = {
    'BoW': '../pickles/input_data/bow_emb.pkl',
    'Word2Vec100': '../pickles/input_data/w2vec_dim100_emb.pkl',
    'Word2Vec768': '../pickles/input_data/w2vec_768dim_emb.pkl',
}
biobert_pickle = '../../../code/temp_data_v2/sentence_embeddings/BioBERT-mnli-snli-scinli-scitail-mednli-stsb-format-3.pickle'

# Feature-subset name -> callable building the design matrix. The callables
# read the notebook globals when invoked, so they always see the embedding
# loaded for the current outer-loop iteration.
feature_builders = {
    'numerical': lambda: df_numerical_vars,
    'categorical': lambda: df_categorical_vars,
    'text_embeddings': lambda: pd.DataFrame(df_biobert),
    'all_features': lambda: pd.concat([df_numerical_vars, df_categorical_vars,pd.DataFrame(df_biobert)], axis=1),
    'all_features_except_text_embeddings': lambda: pd.concat([df_numerical_vars, df_categorical_vars], axis=1),
    'all_features_except_categorical': lambda: pd.concat([df_numerical_vars,pd.DataFrame(df_biobert)], axis=1),
    'all_features_except_numerical': lambda: pd.concat([df_categorical_vars,pd.DataFrame(df_biobert)], axis=1),
}

for text_embedding_method in text_embedding_methods:
    # Load the embedding for this round (stored under the name df_biobert
    # regardless of which method produced it).
    df_biobert = pd.read_pickle(embedding_pickles.get(text_embedding_method, biobert_pickle))
    if text_embedding_method == 'BoW':
        # The bag-of-words pickle holds a scipy sparse matrix.
        df_biobert = pd.DataFrame.sparse.from_spmatrix(df_biobert)

    for features in feature_set:
        if features not in feature_builders:
            raise ValueError("Invalid feature set")
        X_train, X_test, y_train, y_test = train_test_split(
            feature_builders[features](), y_int_labels, test_size=0.2, random_state=42)

        # String column names and float32 values for every estimator.
        X_train.columns = X_train.columns.astype(str)
        X_test.columns = X_test.columns.astype(str)
        X_train, X_test = X_train.astype('float32'), X_test.astype('float32')

        candidates = [
            MLModel(LogisticRegression()),
            MLModel(TFNeuralNetwork(X_train.shape[1], 3)),
            MLModel(RandomForestClassifier()),
            MLModel(GradientBoostingClassifier()),
        ]
        i = 0  # position of the current model in model_names
        for model in candidates:
            model.train(X_train, y_train)
            results = model.evaluate(X_test, y_test)
            print(results)
            # NOTE(review): the trailing ')' in the template looks like a typo
            # but is part of the existing directory names, so it is preserved.
            experiment_name = 'experiment_{3}_{0}_n_{1}_text_embd_{2})'.format(model_names[i],data_count,text_embedding_method,features)
            output_dir = "../results/" + experiment_name
            model.plot_roc(X_test, y_test, output_dir)
            model.save_results(results, output_dir)
            model.plot_confusion_matrix(X_test, y_test, output_dir)
            model.feature_importance(X_train.columns, output_dir)
            i += 1
    