# A Machine Learning Framework for Stroke Prediction: Balancing Precision and Recall in Healthcare Analytics (Worker Notebook)

## Importing the libraries and loading the dataset

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import SMOTE, ADASYN, KMeansSMOTE, RandomOverSampler
from imblearn.under_sampling import NearMiss, RandomUnderSampler
from sklearn.naive_bayes import GaussianNB
from imblearn.combine import SMOTETomek
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from imblearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import MinMaxScaler,StandardScaler
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, precision_recall_curve, auc, roc_curve, matthews_corrcoef
from sklearn.metrics import roc_auc_score
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import mutual_info_classif, chi2
import scrapbook
import joblib
import seaborn as sns
import math
from sklearn.model_selection import GridSearchCV
import tensorflow as tf
import random

In [None]:
# Seed NumPy, the stdlib RNG and TensorFlow with one shared value so that
# repeated runs of the notebook draw the same random numbers.
seed = 42
for seed_rng in (np.random.seed, random.seed, tf.random.set_seed):
    seed_rng(seed)

In [None]:
# Load the pre-split train / validation / test tables from the working directory.
train_csv, val_csv, test_csv = 'dataset_train.csv', 'dataset_val.csv', 'dataset_test.csv'
dataset_train, dataset_val, dataset_test = map(pd.read_csv, (train_csv, val_csv, test_csv))

# Last expression of the cell: renders the training table in the notebook.
dataset_train

## Configurations

In [None]:
# Run configuration for this worker notebook. Every later cell branches on
# these values.

# Resampling applied to the training split only:
#   1 = SMOTE, 2 = RandomUnderSampler, 3 = RandomOverSampler,
#   4 = RandomUnderSampler(0.1) followed by SMOTE(1); any other value = none.
imbalanced_action = 2
# NOTE(review): no cell in this part of the notebook reads use_PCA — confirm it is still needed.
use_PCA = False
# Apply MinMaxScaler to the numerical features (skipped when feature_binning is on).
normalize = True
# Apply StandardScaler to the numerical features (skipped when feature_binning is on).
scale = True
# Replace age / bmi / avg_glucose_level by binned categorical columns.
feature_binning = False
# Number of top-ranked features kept by the feature-selection step.
k_features = 5
# 0 = filter methods (Chi-square + ANOVA), 1 = random-forest importances;
# any other value keeps all features.
feature_selection_mode = 1
# Probability cut-off used to turn predicted probabilities into class labels.
threshold = 0.5
# Index (0-29) of the Keras architecture built in the model-training section.
model_type = 10
# 0 = binary cross-entropy, 1 = weighted BCE, 2 = focal, 3 = Tversky,
# 4 = focal Tversky, 5 = BCE + Dice, 6 = F-beta.
loss_type = 2
# Learning rate intended for the optimizer — TODO confirm it is wired into model.compile.
learning_rate = 0.001
# Destinations of the artifacts written with joblib.dump.
model_file_path = '/slaves/models/model_x.pkl'
# NOTE(review): no visible cell writes a PCA artifact — confirm this path is used.
pca_file_path = '/slaves/models/pca_x.pkl'
scaler_file_path = '/slaves/models/scaler_x.pkl'
normalizer_file_path = '/slaves/models/normalizer_x.pkl'
encoder_file_path = '/slaves/models/encoder_x.pkl'


## Feature Binning

In [None]:
if feature_binning:
    # The same bin edges are applied to every split so the derived categories
    # mean the same thing in train, validation and test.
    all_splits = (dataset_train, dataset_val, dataset_test)

    age_bins = [0, 30, 45, 60, 80, np.inf]
    age_labels = ['Under30', '30-45', '45-60', '60-80', 'Over80']
    for split in all_splits:
        split['age_bin'] = pd.cut(split['age'], bins=age_bins, labels=age_labels)

    # BMI is optional: it is only binned when the training table has the column.
    if 'bmi' in dataset_train.columns:
        bmi_bins = [0, 18.5, 25, 30, np.inf]
        bmi_labels = ['Underweight', 'Normal', 'Overweight', 'Obese']
        for split in all_splits:
            split['bmi_bin'] = pd.cut(split['bmi'], bins=bmi_bins, labels=bmi_labels)

    glucose_bins = [0, 70, 99, 125, np.inf]
    glucose_labels = ['Low', 'Normal', 'Prediabetic', 'Diabetic']
    for split in all_splits:
        split['glucose_bin'] = pd.cut(
            split['avg_glucose_level'],
            bins=glucose_bins,
            labels=glucose_labels,
        )

In [None]:
if feature_binning:
    # Plot only the binned columns that actually exist: 'bmi_bin' is created
    # only when the data carries a 'bmi' column.
    categorical_cols = [
        col for col in ('age_bin', 'bmi_bin', 'glucose_bin')
        if col in dataset_train.columns
    ]
    plots_per_row = 3
    rows = math.ceil(len(categorical_cols) / plots_per_row)

    fig, axes = plt.subplots(rows, plots_per_row, figsize=(5 * plots_per_row, 4 * rows))
    # A single-row grid comes back 1-D; force 2-D so axes[r][c] always works.
    axes = np.atleast_2d(axes)

    for i, col in enumerate(categorical_cols):
        r, c = divmod(i, plots_per_row)

        sns.countplot(x=col, hue='stroke', data=dataset_train, palette='viridis', ax=axes[r][c])
        axes[r][c].set_title(f"Distribution of {col} by Stroke Status")
        axes[r][c].tick_params(axis='x', rotation=45)

    # Hide the grid cells that did not receive a plot.
    total_subplots = rows * plots_per_row
    for j in range(len(categorical_cols), total_subplots):
        r, c = divmod(j, plots_per_row)
        axes[r][c].set_visible(False)

    fig.tight_layout()
    plt.show()

### When feature binning is enabled, drop the raw age, avg_glucose_level and bmi columns (their binned versions age_bin, glucose_bin and bmi_bin are used instead)

In [None]:
if feature_binning:
    # The binned columns replace the raw measurements. 'bmi' is optional in
    # the data (the binning cell guards on it too), so only drop the columns
    # that are actually present.
    raw_columns = [
        col for col in ('age', 'avg_glucose_level', 'bmi')
        if col in dataset_train.columns
    ]
    dataset_train = dataset_train.drop(raw_columns, axis=1)
    dataset_val = dataset_val.drop(raw_columns, axis=1)
    dataset_test = dataset_test.drop(raw_columns, axis=1)

In [None]:
# Split the feature columns into categorical and numerical ones and store
# their POSITIONS, because later cells index X_train / X_val / X_test with
# .iloc. The target is excluded by name, and positions are taken relative to
# the feature columns only, so the result does not depend on where 'stroke'
# sits in the table.
target_column = 'stroke'
feature_columns = dataset_train.columns.drop(target_column)

# A column counts as categorical when it holds strings or has at most
# 10 distinct values.
categorical_features = [
    col for col in feature_columns
    if dataset_train[col].dtype == 'object' or dataset_train[col].nunique() <= 10
]

numerical_features = [col for col in feature_columns if col not in categorical_features]

categorical_features = [feature_columns.get_loc(col) for col in categorical_features]
numerical_features = [feature_columns.get_loc(col) for col in numerical_features]
categorical_features, numerical_features

In [None]:
# Label-encode every string / categorical column. Each column gets its OWN
# encoder: refitting a single LabelEncoder per column would leave only the
# last column's classes in the saved artifact, making it unusable for
# encoding the other columns at inference time.
encoders = {}

for column in dataset_train.select_dtypes(include=['object', 'category']).columns:
    encoder = LabelEncoder()
    # Fit on train only; validation and test reuse the learned mapping.
    dataset_train[column] = encoder.fit_transform(dataset_train[column])
    dataset_val[column] = encoder.transform(dataset_val[column])
    dataset_test[column] = encoder.transform(dataset_test[column])
    encoders[column] = encoder

# The artifact is a dict mapping column name -> fitted LabelEncoder.
joblib.dump(encoders, encoder_file_path)

dataset_train

In [None]:
# Separate the 'stroke' target from the feature columns in every split.
X_train = dataset_train.drop(columns='stroke')
y_train = dataset_train['stroke']

X_val = dataset_val.drop(columns='stroke')
y_val = dataset_val['stroke']

X_test = dataset_test.drop(columns='stroke')
y_test = dataset_test['stroke']

## Preprocessing

### Combating imbalanced dataset

In [None]:
# Pick the resampler requested by `imbalanced_action`; only the training
# split is resampled, validation and test keep their original distribution.
resampler = None
if imbalanced_action == 1:
    resampler = SMOTE(random_state=42)
elif imbalanced_action == 2:
    resampler = RandomUnderSampler(random_state=42)
elif imbalanced_action == 3:
    resampler = RandomOverSampler(random_state=42)
elif imbalanced_action == 4:
    # First undersample the majority class to a 0.1 ratio, then let SMOTE
    # oversample the minority class up to a 1:1 ratio.
    resampler = Pipeline(steps=[
        ('u', RandomUnderSampler(sampling_strategy = 0.1, random_state=42)),
        ('o', SMOTE(sampling_strategy = 1, random_state=42)),
    ])

# Any other value leaves the training data untouched.
if resampler is not None:
    X_train, y_train = resampler.fit_resample(X_train, y_train)

### Selection of most important features

#### Method 1: via Filter Methods

#### Checking the categorical features first, using the mutual information score and the Chi-square test

In [None]:
if feature_selection_mode == 0:
    # Colormap shared by the heatmaps of the filter-method cells.
    colors = 'coolwarm'
    # One heatmap needs exactly one axes; creating a 1x2 grid and then calling
    # plt.subplot(1, 1, 1) on top of it leaves stray empty axes in the figure.
    fig, ax = plt.subplots(figsize = (12,5))

    features = X_train.iloc[:, categorical_features]
    target = y_train

    # k='all' keeps every feature: only the scores are of interest here.
    best_features = SelectKBest(score_func = mutual_info_classif,k = 'all')
    fit = best_features.fit(features,target)

    featureScores = pd.DataFrame(data = fit.scores_,index = list(features.columns),columns = ['Mutual Information Score'])
    sns.heatmap(featureScores.sort_values(ascending = False,by = 'Mutual Information Score'),annot = True,cmap = colors,linewidths = 0.4,linecolor = 'black',fmt = '.2f',ax = ax)
    ax.set_title('Categorical Feature Importances using Mutual Information Score')

In [None]:
if feature_selection_mode == 0:
    # A single axes for the single heatmap (a 1x2 grid overridden by
    # plt.subplot(1, 1, 1) leaves stray empty axes behind).
    fig, ax = plt.subplots(figsize = (12,5))

    features = X_train.iloc[:, categorical_features]
    target = y_train

    # k='all' keeps every feature: only the scores are of interest here.
    best_features = SelectKBest(score_func = chi2,k = 'all')
    fit = best_features.fit(features,target)

    # featureScores_Chi is read again by the cell that keeps the top k_features.
    featureScores_Chi = pd.DataFrame(data = fit.scores_,index = list(features.columns),columns = ['Chi-Square Test'])
    # `colors` is defined by the mutual-information cell above.
    sns.heatmap(featureScores_Chi.sort_values(ascending = False,by = 'Chi-Square Test'),annot = True,cmap = colors,linewidths = 0.4,linecolor = 'black',fmt = '.2f',ax = ax)
    ax.set_title('Feature Importances using Chi-Square Test')


#### Now checking for the numerical features

In [None]:
from sklearn.feature_selection import f_classif

if feature_selection_mode == 0 and not feature_binning:
    # A single axes for the single heatmap (a 1x2 grid overridden by
    # plt.subplot(1, 1, 1) leaves stray empty axes behind).
    fig, ax = plt.subplots(figsize = (12,5))

    features = X_train.iloc[:,numerical_features]
    target = y_train

    # k='all' keeps every feature: only the ANOVA F-scores are of interest here.
    best_features = SelectKBest(score_func = f_classif,k = 'all')
    fit = best_features.fit(features,target)

    # featureScores_ANOVA is read again by the cell that keeps the top k_features.
    featureScores_ANOVA = pd.DataFrame(data = fit.scores_,index = list(features.columns),columns = ['ANOVA Score'])
    # `colors` is defined by the mutual-information cell above.
    sns.heatmap(featureScores_ANOVA.sort_values(ascending = False,by = 'ANOVA Score'),annot = True,cmap = colors,linewidths = 0.4,linecolor = 'black',fmt = '.2f',ax = ax)
    ax.set_title('Selection of Numerical Features')


#### Keeping only the columns whose Chi-Square score (categorical) or ANOVA score (numerical) is among the top k_features

In [None]:
if feature_selection_mode == 0:
    # Capture the column NAMES behind the positional lists before the frames
    # are narrowed, so the positions can be recomputed afterwards.
    categorical_features_names = X_train.columns[categorical_features]
    numerical_features_names = X_train.columns[numerical_features]

    chi2_features = (
        featureScores_Chi
        .sort_values(ascending = False,by = 'Chi-Square Test')
        .head(k_features)
        .index
    )
    # ANOVA scores only exist when the raw numerical columns were kept.
    if feature_binning:
        anova_features = []
    else:
        anova_features = (
            featureScores_ANOVA
            .sort_values(ascending = False,by = 'ANOVA Score')
            .head(k_features)
            .index
        )

    # Keep the union of both top-k lists in every split.
    selected_columns = chi2_features.union(anova_features)
    X_train = X_train[selected_columns]
    X_val = X_val[selected_columns]
    X_test = X_test[selected_columns]

    # Re-derive the positional lists against the narrowed frame.
    surviving = X_train.columns
    numerical_features = [surviving.get_loc(col) for col in numerical_features_names if col in surviving]
    categorical_features = [surviving.get_loc(col) for col in categorical_features_names if col in surviving]

In [None]:
if feature_selection_mode == 1:
    # Remember which columns are numerical / categorical by NAME before the
    # frames are narrowed; the positional lists become stale afterwards.
    numerical_features_names = X_train.columns[numerical_features]
    categorical_features_names = X_train.columns[categorical_features]

    # Rank the features by random-forest impurity importance.
    clf = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)
    clf.fit(X_train, y_train)

    feature_importances = pd.Series(clf.feature_importances_, index=X_train.columns)
    feature_importances.nlargest(10).plot(kind='barh')

    # Keep only the k_features most important columns in every split.
    most_important_features = feature_importances.nlargest(k_features).index
    X_train = X_train[most_important_features]
    X_val = X_val[most_important_features]
    X_test = X_test[most_important_features]

    # Re-derive the positions against the narrowed frame, preserving the
    # numerical / categorical distinction so that the scaling cells only touch
    # the numerical columns (same approach as the filter-method cell).
    numerical_features = [X_train.columns.get_loc(col) for col in numerical_features_names if col in X_train.columns]
    categorical_features = [X_train.columns.get_loc(col) for col in categorical_features_names if col in X_train.columns]


In [None]:
# Min-max normalise the numerical columns. Skipped when there is nothing to
# normalise, because the scaler cannot be fitted on zero columns.
if normalize and not feature_binning and len(numerical_features) > 0:
    numeric_columns = X_train.columns[numerical_features]
    normalizer = MinMaxScaler()

    # Work on copies and replace whole columns: the frames may be slices of
    # another frame, and the numerical columns may be integer-typed, so
    # writing floats into them in place is unreliable.
    X_train, X_val, X_test = X_train.copy(), X_val.copy(), X_test.copy()

    # Fit on train only; validation and test reuse the learned ranges.
    X_train[numeric_columns] = normalizer.fit_transform(X_train[numeric_columns])
    X_val[numeric_columns] = normalizer.transform(X_val[numeric_columns])
    X_test[numeric_columns] = normalizer.transform(X_test[numeric_columns])
    joblib.dump(normalizer, normalizer_file_path)

In [None]:
# Standardise the numerical columns. Skipped when there is nothing to scale,
# because the scaler cannot be fitted on zero columns.
if scale and not feature_binning and len(numerical_features) > 0:
    numeric_columns = X_train.columns[numerical_features]
    scaler = StandardScaler()

    # Work on copies and replace whole columns: the frames may be slices of
    # another frame, and the numerical columns may be integer-typed, so
    # writing floats into them in place is unreliable.
    X_train, X_val, X_test = X_train.copy(), X_val.copy(), X_test.copy()

    # Fit on train only; validation and test reuse the learned statistics.
    X_train[numeric_columns] = scaler.fit_transform(X_train[numeric_columns])
    X_val[numeric_columns] = scaler.transform(X_val[numeric_columns])
    X_test[numeric_columns] = scaler.transform(X_test[numeric_columns])
    joblib.dump(scaler, scaler_file_path)

## Model training

In [None]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Dense, Dropout, BatchNormalization, Input,
                                     Conv1D, Flatten, LSTM, GRU, Bidirectional,
                                     LeakyReLU, PReLU, ELU, Add, Concatenate, Lambda,
                                     Multiply, Average)
from tensorflow.keras.regularizers import l2
import tensorflow.keras.backend as K


# Build the Keras model selected by `model_type`. Every architecture ends in
# a sigmoid unit that outputs the stroke probability (type 26 has two such
# heads). An unknown `model_type` fails here with a clear message instead of
# surfacing later as a NameError on `model`.
num_features = X_train.shape[1]

# 0: one hidden layer.
if model_type == 0:
    model = Sequential([
        Dense(32, activation='relu', input_dim=num_features),
        Dense(1, activation='sigmoid')
    ])

# 1: two hidden layers.
elif model_type == 1:
    model = Sequential([
        Dense(64, activation='relu', input_dim=num_features),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid')
    ])

# 2: three hidden layers.
elif model_type == 2:
    model = Sequential([
        Dense(128, activation='relu', input_dim=num_features),
        Dense(64, activation='relu'),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid')
    ])

# 3: two hidden layers with dropout after the first.
elif model_type == 3:
    model = Sequential([
        Dense(64, activation='relu', input_dim=num_features),
        Dropout(0.5),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid')
    ])

# 4: three hidden layers with dropout after the first two.
elif model_type == 4:
    model = Sequential([
        Dense(128, activation='relu', input_dim=num_features),
        Dropout(0.4),
        Dense(64, activation='relu'),
        Dropout(0.4),
        Dense(32, activation='relu'),
        Dense(1, activation='sigmoid')
    ])

# 5: batch normalisation after each hidden layer.
elif model_type == 5:
    model = Sequential([
        Dense(64, input_dim=num_features),
        BatchNormalization(),
        Dense(32, activation='relu'),
        BatchNormalization(),
        Dense(1, activation='sigmoid')
    ])

# 6: batch normalisation plus dropout.
elif model_type == 6:
    model = Sequential([
        Dense(128, activation='relu', input_dim=num_features),
        BatchNormalization(),
        Dropout(0.5),
        Dense(64, activation='relu'),
        BatchNormalization(),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

# 7: L2-regularised hidden layers.
elif model_type == 7:
    model = Sequential([
        Dense(64, activation='relu', kernel_regularizer=l2(0.01), input_dim=num_features),
        Dense(32, activation='relu', kernel_regularizer=l2(0.01)),
        Dense(1, activation='sigmoid')
    ])

# 8: LeakyReLU activations.
elif model_type == 8:
    model = Sequential([
        Dense(64, input_dim=num_features),
        LeakyReLU(alpha=0.1),
        Dense(32),
        LeakyReLU(alpha=0.1),
        Dense(1, activation='sigmoid')
    ])

# 9: PReLU activations.
elif model_type == 9:
    model = Sequential([
        Dense(64, input_dim=num_features),
        PReLU(),
        Dense(32),
        PReLU(),
        Dense(1, activation='sigmoid')
    ])

# 10: ELU activations.
elif model_type == 10:
    model = Sequential([
        Dense(64, input_dim=num_features),
        ELU(alpha=1.0),
        Dense(32),
        ELU(alpha=1.0),
        Dense(1, activation='sigmoid')
    ])

# 11-18 declare an input of shape (num_features, 1).
# NOTE(review): the fit cell passes the 2-D feature table as is — confirm the
# input is reshaped (or accepted) for these sequence-style models.

# 11: 1-D convolution over the feature axis.
elif model_type == 11:
    input_layer = Input(shape=(num_features, 1))
    conv = Conv1D(32, kernel_size=3, activation='relu', padding='same')(input_layer)
    flat = Flatten()(conv)
    output = Dense(1, activation='sigmoid')(flat)
    model = Model(inputs=input_layer, outputs=output)

# 12: single LSTM.
elif model_type == 12:
    input_layer = Input(shape=(num_features, 1))
    lstm = LSTM(32)(input_layer)
    output = Dense(1, activation='sigmoid')(lstm)
    model = Model(inputs=input_layer, outputs=output)

# 13: single GRU.
elif model_type == 13:
    input_layer = Input(shape=(num_features, 1))
    gru = GRU(32)(input_layer)
    output = Dense(1, activation='sigmoid')(gru)
    model = Model(inputs=input_layer, outputs=output)

# 14: bidirectional LSTM.
elif model_type == 14:
    input_layer = Input(shape=(num_features, 1))
    bi_lstm = Bidirectional(LSTM(32))(input_layer)
    output = Dense(1, activation='sigmoid')(bi_lstm)
    model = Model(inputs=input_layer, outputs=output)

# 15: two stacked LSTMs.
elif model_type == 15:
    input_layer = Input(shape=(num_features, 1))
    lstm1 = LSTM(64, return_sequences=True)(input_layer)
    lstm2 = LSTM(32)(lstm1)
    output = Dense(1, activation='sigmoid')(lstm2)
    model = Model(inputs=input_layer, outputs=output)

# 16: two stacked GRUs.
elif model_type == 16:
    input_layer = Input(shape=(num_features, 1))
    gru1 = GRU(64, return_sequences=True)(input_layer)
    gru2 = GRU(32)(gru1)
    output = Dense(1, activation='sigmoid')(gru2)
    model = Model(inputs=input_layer, outputs=output)

# 17: LSTM whose per-step outputs are combined with softmax weights.
elif model_type == 17:
    input_layer = Input(shape=(num_features, 1))
    lstm_out = LSTM(64, return_sequences=True)(input_layer)
    attention = Dense(1, activation='tanh')(lstm_out)
    attention = Flatten()(attention)
    attention = Dense(num_features, activation='softmax')(attention)
    attention = Lambda(lambda x: K.expand_dims(x, axis=-1))(attention)
    context = Lambda(lambda x: x[0] * x[1])([lstm_out, attention])
    context = Lambda(lambda x: K.sum(x, axis=1))(context)
    output = Dense(1, activation='sigmoid')(context)
    model = Model(inputs=input_layer, outputs=output)

# 18: multi-head self-attention with a residual connection.
elif model_type == 18:
    from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization
    input_layer = Input(shape=(num_features, 1))
    proj = Dense(64)(input_layer)
    attn = MultiHeadAttention(num_heads=4, key_dim=16)(proj, proj)
    attn = LayerNormalization()(attn + proj)
    flat = Flatten()(attn)
    output = Dense(1, activation='sigmoid')(flat)
    model = Model(inputs=input_layer, outputs=output)

# 19: softmax feature gating followed by a dense layer.
elif model_type == 19:
    input_layer = Input(shape=(num_features,))
    attention_probs = Dense(num_features, activation='softmax')(input_layer)
    attended = Multiply()([input_layer, attention_probs])
    dense = Dense(64, activation='relu')(attended)
    output = Dense(1, activation='sigmoid')(dense)
    model = Model(inputs=input_layer, outputs=output)

# 20: wide (linear) branch added to a deep branch.
elif model_type == 20:
    input_layer = Input(shape=(num_features,))
    wide = Dense(1, activation='linear')(input_layer)
    deep = Dense(64, activation='relu')(input_layer)
    deep = Dense(32, activation='relu')(deep)
    deep = Dense(1, activation='linear')(deep)
    combined = Add()([wide, deep])
    output = Dense(1, activation='sigmoid')(combined)
    model = Model(inputs=input_layer, outputs=output)

# 21: dense block with a skip connection.
elif model_type == 21:
    input_layer = Input(shape=(num_features,))
    x = Dense(64, activation='relu')(input_layer)
    shortcut = x
    x = Dense(64, activation='relu')(x)
    x = Add()([x, shortcut])
    x = Dense(32, activation='relu')(x)
    output = Dense(1, activation='sigmoid')(x)
    model = Model(inputs=input_layer, outputs=output)

# 22: each layer receives the input and all earlier layer outputs.
elif model_type == 22:
    input_layer = Input(shape=(num_features,))
    x1 = Dense(32, activation='relu')(input_layer)
    x2 = Dense(32, activation='relu')(Concatenate()([input_layer, x1]))
    x3 = Dense(32, activation='relu')(Concatenate()([input_layer, x1, x2]))
    output = Dense(1, activation='sigmoid')(x3)
    model = Model(inputs=input_layer, outputs=output)

# 23: three parallel dense branches, concatenated.
elif model_type == 23:
    input_layer = Input(shape=(num_features,))
    branch1 = Dense(32, activation='relu')(input_layer)
    branch2 = Dense(32, activation='relu')(input_layer)
    branch3 = Dense(32, activation='relu')(input_layer)
    merged = Concatenate()([branch1, branch2, branch3])
    output = Dense(1, activation='sigmoid')(merged)
    model = Model(inputs=input_layer, outputs=output)

# 24: encoder layer followed by a small classifier.
elif model_type == 24:
    input_layer = Input(shape=(num_features,))
    encoded = Dense(32, activation='relu')(input_layer)
    classifier = Dense(16, activation='relu')(encoded)
    output = Dense(1, activation='sigmoid')(classifier)
    model = Model(inputs=input_layer, outputs=output)

# 25: classifier on the z_mean projection; z_log_var is built but does not
# feed the output.
elif model_type == 25:
    input_layer = Input(shape=(num_features,))
    hidden = Dense(64, activation='relu')(input_layer)
    z_mean = Dense(16)(hidden)
    z_log_var = Dense(16)(hidden)
    latent = z_mean
    output = Dense(1, activation='sigmoid')(latent)
    model = Model(inputs=input_layer, outputs=output)

# 26: shared trunk with two sigmoid heads ('stroke' and 'other').
# NOTE(review): the fit cell supplies a single target — confirm this
# two-output model is trained with matching targets.
elif model_type == 26:
    input_layer = Input(shape=(num_features,))
    shared = Dense(64, activation='relu')(input_layer)
    task1 = Dense(32, activation='relu')(shared)
    output1 = Dense(1, activation='sigmoid', name='stroke')(task1)
    task2 = Dense(32, activation='relu')(shared)
    output2 = Dense(1, activation='sigmoid', name='other')(task2)
    model = Model(inputs=input_layer, outputs=[output1, output2])

# 27: two hidden layers, dropout after each.
elif model_type == 27:
    model = Sequential([
        Dense(64, activation='relu', input_dim=num_features),
        Dropout(0.5),
        Dense(32, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])

# 28: average of two independent sigmoid branches.
elif model_type == 28:
    input_layer = Input(shape=(num_features,))
    branch1 = Dense(64, activation='relu')(input_layer)
    branch1 = Dense(1, activation='sigmoid')(branch1)
    branch2 = Dense(32, activation='relu')(input_layer)
    branch2 = Dense(1, activation='sigmoid')(branch2)
    ensemble_output = Average()([branch1, branch2])
    model = Model(inputs=input_layer, outputs=ensemble_output)

# 29: MLP branch concatenated with a Conv1D branch over the same input.
elif model_type == 29:
    input_layer = Input(shape=(num_features,))
    mlp = Dense(64, activation='relu')(input_layer)
    mlp = Dense(32, activation='relu')(mlp)
    reshaped = Lambda(lambda x: K.expand_dims(x, axis=-1))(input_layer)
    cnn = Conv1D(32, kernel_size=3, activation='relu', padding='same')(reshaped)
    cnn = Flatten()(cnn)
    combined = Concatenate()([mlp, cnn])
    output = Dense(1, activation='sigmoid')(combined)
    model = Model(inputs=input_layer, outputs=output)

else:
    raise ValueError(
        f"Unsupported model_type {model_type!r}; expected an integer from 0 to 29"
    )

In [None]:
def weighted_binary_crossentropy(weights):
    """Build a binary cross-entropy loss with a separate weight per class.

    Args:
        weights: Indexable with ``0`` and ``1``; ``weights[0]`` scales the
            negative-class term and ``weights[1]`` the positive-class term.

    Returns:
        A ``loss(y_true, y_pred)`` callable returning the mean weighted
        cross-entropy over all elements.
    """
    def loss(y_true, y_pred):
        labels = K.cast(y_true, y_pred.dtype)
        # K.epsilon() keeps the logarithms finite at predictions of 0 or 1.
        positive_term = -weights[1] * labels * K.log(y_pred + K.epsilon())
        negative_term = weights[0] * (1 - labels) * K.log(1 - y_pred + K.epsilon())
        return K.mean(positive_term - negative_term)
    return loss

In [None]:
def focal_loss(gamma=2., alpha=0.25):
    """Build a binary focal loss.

    Args:
        gamma: Exponent of the modulating factor; larger values down-weight
            well-classified examples more strongly.
        alpha: Weight of the positive-class term; the negative-class term is
            weighted by ``1 - alpha``.

    Returns:
        A ``loss(y_true, y_pred)`` callable returning the mean focal loss.
    """
    def loss(y_true, y_pred):
        # Labels may arrive as integers; bring them to the prediction dtype
        # (as weighted_binary_crossentropy does) so the products are valid.
        y_true = K.cast(y_true, y_pred.dtype)
        # Clip so that neither logarithm sees exactly 0.
        y_pred = tf.clip_by_value(y_pred, K.epsilon(), 1. - K.epsilon())
        loss_value = -y_true * alpha * K.pow(1 - y_pred, gamma) * K.log(y_pred) - \
                     (1 - y_true) * (1 - alpha) * K.pow(y_pred, gamma) * K.log(1 - y_pred)
        return K.mean(loss_value)
    return loss

In [None]:
def tversky_loss(y_true, y_pred, alpha=0.5, beta=0.5, smooth=1e-6):
    """Return ``1 - Tversky index`` computed over the whole batch.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted probabilities.
        alpha: Weight of the (soft) false negatives.
        beta: Weight of the (soft) false positives.
        smooth: Added to numerator and denominator to avoid division by zero.
    """
    # Labels may arrive as integers; match the prediction dtype before mixing.
    y_true = K.cast(y_true, y_pred.dtype)
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    tp = K.sum(y_true_f * y_pred_f)
    fn = K.sum(y_true_f * (1 - y_pred_f))
    fp = K.sum((1 - y_true_f) * y_pred_f)
    tversky_index = (tp + smooth) / (tp + alpha * fn + beta * fp + smooth)
    return 1 - tversky_index

def focal_tversky_loss(y_true, y_pred, alpha=0.5, beta=0.5, gamma=1.0, smooth=1e-6):
    """Return ``(1 - Tversky index) ** gamma`` computed over the whole batch.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted probabilities.
        alpha: Weight of the (soft) false negatives.
        beta: Weight of the (soft) false positives.
        gamma: Focusing exponent; with the default of 1.0 the result equals
            the plain Tversky loss.
        smooth: Added to numerator and denominator to avoid division by zero.
    """
    # Labels may arrive as integers; match the prediction dtype before mixing.
    y_true = K.cast(y_true, y_pred.dtype)
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    tp = K.sum(y_true_f * y_pred_f)
    fn = K.sum(y_true_f * (1 - y_pred_f))
    fp = K.sum((1 - y_true_f) * y_pred_f)
    tversky_index = (tp + smooth) / (tp + alpha * fn + beta * fp + smooth)
    return K.pow((1 - tversky_index), gamma)

def dice_loss(y_true, y_pred, smooth=1e-6):
    """Return ``1 - Dice coefficient`` computed over the whole batch.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted probabilities.
        smooth: Added to numerator and denominator to avoid division by zero.
    """
    # Labels may arrive as integers; match the prediction dtype before mixing.
    y_true = K.cast(y_true, y_pred.dtype)
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    dice_coeff = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    return 1 - dice_coeff

def bce_dice_loss(y_true, y_pred, smooth=1e-6):
    """Return binary cross-entropy plus the Dice loss.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted probabilities.
        smooth: Smoothing term forwarded to ``dice_loss``.
    """
    # Labels may arrive as integers; cast once so both terms see floats.
    y_true = K.cast(y_true, y_pred.dtype)
    bce = tf.keras.losses.binary_crossentropy(y_true, y_pred)
    d_loss = dice_loss(y_true, y_pred, smooth)
    return bce + d_loss

def fbeta_loss(y_true, y_pred, beta=2, smooth=1e-6):
    """Return ``1 - soft F-beta`` computed over the whole batch.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted probabilities.
        beta: Relative weight of recall versus precision (2 favours recall).
        smooth: Added to the denominator to avoid division by zero.
    """
    # Labels may arrive as integers; match the prediction dtype before mixing.
    y_true = K.cast(y_true, y_pred.dtype)
    y_pred = K.clip(y_pred, K.epsilon(), 1 - K.epsilon())
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    tp = K.sum(y_true_f * y_pred_f)
    fp = K.sum((1 - y_true_f) * y_pred_f)
    fn = K.sum(y_true_f * (1 - y_pred_f))
    fbeta = (1 + beta**2) * tp / ((1 + beta**2) * tp + beta**2 * fn + fp + smooth)
    return 1 - fbeta

In [None]:
from sklearn.utils import class_weight
y_train_np = np.array(y_train).flatten()

# 'balanced' weights are inversely proportional to the class frequencies of
# the (possibly resampled) training labels.
classes = np.unique(y_train_np)
weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=classes,
    y=y_train_np
)

# Key each weight by its actual class label rather than by its position, so
# the mapping stays correct even if the labels are not exactly 0..n-1.
class_weights = {label.item(): weight for label, weight in zip(classes, weights)}
print("Class weights:", class_weights)

In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

def f2_metric(y_true, y_pred):
    """Batch-wise F2 score of the predictions rounded to 0/1.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted probabilities; rounded to hard labels before
            counting.

    Returns:
        The mean of the F2 values obtained by summing the counts along axis 0.
    """
    labels = K.cast(y_true, 'float32')
    hard_preds = K.round(K.cast(y_pred, 'float32'))

    # Confusion counts, reduced along the batch axis.
    tp = K.sum(labels * hard_preds, axis=0)
    fp = K.sum((1 - labels) * hard_preds, axis=0)
    fn = K.sum(labels * (1 - hard_preds), axis=0)

    # K.epsilon() guards every division against an empty denominator.
    precision = tp / (tp + fp + K.epsilon())
    recall = tp / (tp + fn + K.epsilon())

    beta = 2
    beta_sq = beta**2
    numerator = (1 + beta_sq) * (precision * recall)
    denominator = beta_sq * precision + recall + K.epsilon()
    return K.mean(numerator / denominator)

# Map every supported `loss_type` to a factory for its loss. The factories
# are lazy, so only the selected loss is actually constructed.
loss_factories = {
    0: lambda: 'binary_crossentropy',
    1: lambda: weighted_binary_crossentropy(weights=class_weights),
    2: lambda: focal_loss(gamma=2., alpha=0.25),
    3: lambda: tversky_loss,
    4: lambda: focal_tversky_loss,
    5: lambda: bce_dice_loss,
    6: lambda: fbeta_loss,
}

# Fail loudly on an unknown value instead of leaving the model uncompiled.
if loss_type not in loss_factories:
    raise ValueError(
        f"Unsupported loss_type {loss_type!r}; expected one of {sorted(loss_factories)}"
    )

loss = loss_factories[loss_type]()

# Use the configured learning rate instead of the optimizer's built-in
# default, so that changing `learning_rate` actually takes effect.
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
    loss=loss,
    metrics=[f2_metric],
)


In [None]:
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, TensorBoard, CSVLogger, TerminateOnNaN, LearningRateScheduler

# Stop once the validation loss has not improved for 5 epochs and roll back
# to the best weights seen.
early_stopping = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True, verbose=1)
# Halve the learning rate after 3 epochs without improvement (floor 1e-6).
lr_on_plateau = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=1e-6, verbose=1)

callbacks = [early_stopping, lr_on_plateau, TerminateOnNaN()]

# The epoch budget is an upper bound; early stopping normally ends the run.
fit_options = dict(
    epochs=1000,
    batch_size=128,
    validation_data=(X_val, y_val),
    callbacks=callbacks,
)
history = model.fit(X_train, y_train, **fit_options)

In [None]:
# Predicted stroke probabilities on the validation split, turned into hard
# labels with the `threshold` set in the configuration cell (the value is no
# longer overridden locally, so changing the configuration takes effect).
y_pred_probabilities = model.predict(X_val)
y_pred = (y_pred_probabilities > threshold).astype(int)

## Evaluation Metrics on Training Set

In [None]:
# Hard predictions on the training split, using the `threshold` set in the
# configuration cell (the value is no longer overridden locally).
y_pred_train = model.predict(X_train)
y_pred_train = (y_pred_train > threshold).astype(int)

In [None]:
# Training-set metrics. y_train is the training target after any resampling
# applied earlier. Each value is printed and recorded with scrapbook under
# the given key.

# Precision of the hard training predictions.
precision_train = precision_score(y_train, y_pred_train)
print(f"Train Precision: {precision_train:.2f}")
scrapbook.glue("precision_train", precision_train)

In [None]:
# Recall of the hard training predictions.
recall_train = recall_score(y_train, y_pred_train)
print(f"Train Recall: {recall_train:.2f}")
scrapbook.glue("recall_train", recall_train)

In [None]:
# F1 score of the hard training predictions.
f1_score_train = f1_score(y_train, y_pred_train)
print(f"Train F1 Score: {f1_score_train:.2f}")
scrapbook.glue("f1_score_train", f1_score_train)

In [None]:
# Accuracy of the hard training predictions.
accuracy_score_train = accuracy_score(y_train, y_pred_train)
print(f"Train Accuracy: {accuracy_score_train:.2f}")
scrapbook.glue("accuracy_score_train", accuracy_score_train)

## Evaluation Metrics on Validation Set

In [None]:
# Per-class precision / recall / F1 text report on the validation split,
# recorded with scrapbook and printed.
report = classification_report(y_val, y_pred)
scrapbook.glue("classification_report", report)
print(report)

In [None]:
# Confusion matrix of the validation predictions at the default threshold.
cm = confusion_matrix(y_val, y_pred)

# Keep a handle on the figure so the very same object can be glued below.
fig = plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='d')
plt.title('Confusion Matrix on Validation Set')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()
# Glue the figure object itself: under the notebook inline backend
# plt.show() closes the figure, so plt.gcf() would return a new, empty one.
scrapbook.glue("confusion_matrix", fig, encoder='display')

In [None]:
# Validation-set metrics for the current hard predictions `y_pred`. Each
# value is printed and recorded with scrapbook under the given key.

# Precision on the validation split.
precision = precision_score(y_val, y_pred)
print(f"Precision: {precision}")
scrapbook.glue("precision", precision)

In [None]:
# Recall on the validation split.
recall = recall_score(y_val, y_pred)
print(f"Recall: {recall}")
scrapbook.glue("recall", recall)

In [None]:
# F1 score on the validation split.
f1_score_v = f1_score(y_val, y_pred)
print(f"F1 Score: {f1_score_v}")
scrapbook.glue("f1_score", f1_score_v)

In [None]:
# Accuracy on the validation split.
accuracy = accuracy_score(y_val, y_pred)
print(f"Accuracy: {accuracy}")
scrapbook.glue("accuracy", accuracy)

In [None]:
# Matthews correlation coefficient on the validation split.
mcc = matthews_corrcoef(y_val, y_pred)
print(f"Matthews correlation coefficient: {mcc}")
scrapbook.glue("mcc", mcc)

In [None]:
# Precision / recall at every candidate threshold on the validation split.
# From here on `precision` and `recall` are arrays (they replace the scalar
# values computed above); the threshold-search cells below read them.
precision, recall, thresholds = precision_recall_curve(y_val, y_pred_probabilities)
pr_auc = auc(recall, precision)

# Keep a handle on the figure so the very same object can be glued below.
fig = plt.figure()
plt.plot(recall, precision, label=f'PR curve (AUC = {pr_auc:.2f})')
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
plt.legend()
plt.show()
# Glue the figure object itself: under the notebook inline backend
# plt.show() closes the figure, so plt.gcf() would return a new, empty one.
scrapbook.glue("precision_recall_curve", fig, encoder='display')
scrapbook.glue("pr_auc", pr_auc)

## Computing validation metrics using best threshold for F1-score (balanced recall and precision)

In [None]:
# F1 at every candidate threshold. The precision / recall arrays carry one
# more entry than `thresholds`, hence the [:-1].
precision_at, recall_at = precision[:-1], recall[:-1]
f1_scores = 2 * (precision_at * recall_at) / (precision_at + recall_at)

# Points where precision + recall == 0 evaluate to NaN; drop them together
# with their thresholds.
valid_points = ~np.isnan(f1_scores)
thresholds_ = thresholds[valid_points]
f1_scores = f1_scores[valid_points]

best_idx = f1_scores.argmax()
best_threshold = thresholds_[best_idx]

print(f"The threshold that maximizes F1 (and thus balances precision and recall) is: {best_threshold}")
scrapbook.glue("best_threshold_f1", float(best_threshold))

In [None]:
# Re-label the validation split with the F1-optimal threshold.
y_pred_probabilties = model.predict(X_val)
# precision_recall_curve evaluates each threshold as `score >= threshold`, so
# the comparison must be inclusive to reproduce the operating point that was
# selected (a strict `>` drops the sample sitting exactly on the threshold).
y_pred = (y_pred_probabilties >= best_threshold).astype(int)

In [None]:
# Confusion matrix of the validation predictions at the F1-optimal threshold.
cm = confusion_matrix(y_val, y_pred)

# Keep a handle on the figure so the very same object can be glued below.
fig = plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='d')
plt.title('Confusion Matrix on Validation Set')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()
# Glue the figure object itself: under the notebook inline backend
# plt.show() closes the figure, so plt.gcf() would return a new, empty one.
scrapbook.glue("confusion_matrix_f1", fig, encoder='display')

In [None]:
# Validation metrics for the predictions made with the F1-optimal threshold.
# Each value is printed and recorded with scrapbook under the given key.

# Precision at the F1-optimal threshold.
precision_ = precision_score(y_val, y_pred)
print(f"Precision (Optimal): {precision_}")
scrapbook.glue("precision_optimal_f1", precision_)

In [None]:
# Recall at the F1-optimal threshold.
recall_ = recall_score(y_val, y_pred)
print(f"Recall (Optimal): {recall_}")
scrapbook.glue("recall_optimal_f1", recall_)

In [None]:
# F1 score at the F1-optimal threshold.
f1_score_v = f1_score(y_val, y_pred)
print(f"F1 Score Best: {f1_score_v}")
scrapbook.glue("f1_score_optimal", f1_score_v)

In [None]:
# Matthews correlation coefficient at the F1-optimal threshold.
mcc = matthews_corrcoef(y_val, y_pred)
print(f"Matthews correlation coefficient: {mcc}")
scrapbook.glue("mcc_optimal_f1", mcc)

In [None]:
# Accuracy at the F1-optimal threshold.
accuracy = accuracy_score(y_val, y_pred)
print(f"Accuracy (Optimal): {accuracy}")
scrapbook.glue("accuracy_optimal_f1", accuracy)

## Computing validation metrics using best threshold for F2-score (recall more important than precision)

In [None]:
# F2 (beta = 2, recall weighted above precision) at every candidate
# threshold. The precision / recall arrays carry one more entry than
# `thresholds`, hence the [:-1].
precision_at, recall_at = precision[:-1], recall[:-1]
f2_scores = 5 * (precision_at * recall_at) / (4 * precision_at + recall_at)

# Points with a zero denominator evaluate to NaN; drop them together with
# their thresholds.
valid_points = ~np.isnan(f2_scores)
thresholds_ = thresholds[valid_points]
f2_scores = f2_scores[valid_points]

best_idx = f2_scores.argmax()
best_threshold = thresholds_[best_idx]

print(f"The threshold that maximizes F2 (more recall importance than precision) is: {best_threshold}")
scrapbook.glue("best_threshold_f2", float(best_threshold))

In [None]:
# Re-label the validation split with the F2-optimal threshold.
y_pred_probabilties = model.predict(X_val)
# precision_recall_curve evaluates each threshold as `score >= threshold`, so
# the comparison must be inclusive to reproduce the operating point that was
# selected (a strict `>` drops the sample sitting exactly on the threshold).
y_pred = (y_pred_probabilties >= best_threshold).astype(int)

In [None]:
# Confusion matrix of the validation predictions at the F2-optimal threshold.
cm = confusion_matrix(y_val, y_pred)

# Keep a handle on the figure so the very same object can be glued below.
fig = plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='d')
plt.title('Confusion Matrix on Validation Set')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()
# Glue the figure object itself: under the notebook inline backend
# plt.show() closes the figure, so plt.gcf() would return a new, empty one.
scrapbook.glue("confusion_matrix_f2", fig, encoder='display')

In [None]:
# Validation metrics for the predictions made with the F2-optimal threshold.
# Each value is printed and recorded with scrapbook under the given key.

# Precision at the F2-optimal threshold.
precision_ = precision_score(y_val, y_pred)
print(f"Precision (Optimal): {precision_}")
scrapbook.glue("precision_optimal_f2", precision_)

In [None]:
# Recall at the F2-optimal threshold.
recall_ = recall_score(y_val, y_pred)
print(f"Recall (Optimal): {recall_}")
scrapbook.glue("recall_optimal_f2", recall_)

In [None]:
# F2 computed from the precision_ / recall_ values of the two cells above.
# NOTE(review): the denominator is zero when precision_ and recall_ are both 0 — confirm that case cannot occur here.
f2_score = 5 * (precision_ * recall_) / (4 * precision_ + recall_)
print(f"F2 Score Best: {f2_score}")
scrapbook.glue("f2_score_optimal_thres", f2_score)

In [None]:
# Matthews correlation coefficient at the F2-optimal threshold.
mcc = matthews_corrcoef(y_val, y_pred)
print(f"Matthews correlation coefficient: {mcc}")
scrapbook.glue("mcc_optimal_f2", mcc)

In [None]:
# Accuracy at the F2-optimal threshold.
accuracy = accuracy_score(y_val, y_pred)
print(f"Accuracy: {accuracy}")
scrapbook.glue("accuracy_optimal_f2", accuracy)

In [None]:
# Persist the trained model with joblib at the configured path.
# NOTE(review): `model` is a Keras model and its loss may be a closure (e.g. the one returned by focal_loss) — confirm that joblib can serialise and reload it; Keras' own model.save may be the safer format.
joblib.dump(model, model_file_path)

## Computing test metrics

In [None]:
# Label the test split with the most recently selected threshold (the
# F2-optimal one when the notebook is run top to bottom).
y_pred_probabilties = model.predict(X_test)
# precision_recall_curve evaluates each threshold as `score >= threshold`, so
# the comparison is inclusive to match how the threshold was selected.
y_pred = (y_pred_probabilties >= best_threshold).astype(int)

In [None]:
# Confusion matrix of the test-set predictions.
cm = confusion_matrix(y_test, y_pred)

# Keep a handle on the figure so the very same object can be glued below.
fig = plt.figure(figsize=(6, 4))
sns.heatmap(cm, annot=True, cmap='Blues', fmt='d')
plt.title('Confusion Matrix on Test Set')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.show()
# Glue the figure object itself: under the notebook inline backend
# plt.show() closes the figure, so plt.gcf() would return a new, empty one.
scrapbook.glue("confusion_matrix_test", fig, encoder='display')

In [None]:
# Test-set metrics for the hard predictions `y_pred` computed on X_test.
# Each value is printed and recorded with scrapbook under the given key.

# Precision on the test split.
precision_ = precision_score(y_test, y_pred)
print(f"Precision (Test): {precision_}")
scrapbook.glue("precision_test", precision_)

In [None]:
# Recall on the test split.
recall_ = recall_score(y_test, y_pred)
print(f"Recall (Test): {recall_}")
scrapbook.glue("recall_test", recall_)

In [None]:
# F1 score on the test split.
f1_score_v = f1_score(y_test, y_pred)
print(f"F1 Score (Test): {f1_score_v}")
scrapbook.glue("f1_score_test", f1_score_v)

In [None]:
# F2 computed from the precision_ / recall_ values of the cells above.
# NOTE(review): the denominator is zero when precision_ and recall_ are both 0 — confirm that case cannot occur here.
f2_score = 5 * (precision_ * recall_) / (4 * precision_ + recall_)
print(f"F2 Score (Test): {f2_score}")
scrapbook.glue("f2_score_test", f2_score)

In [None]:
# Matthews correlation coefficient on the test split.
mcc_test = matthews_corrcoef(y_test, y_pred)
print(f"Matthews correlation coefficient: {mcc_test}")
scrapbook.glue("mcc_test", mcc_test)

In [None]:
# Accuracy on the test split.
accuracy_score_test = accuracy_score(y_test, y_pred)
print(f"Accuracy (Test): {accuracy_score_test}")
scrapbook.glue("accuracy_score_test", accuracy_score_test)