# Step 5: Machine Learning

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import os
from typing import Literal, Union

from imblearn.over_sampling import ADASYN, SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler

from sklearn.ensemble import HistGradientBoostingClassifier
import xgboost as xgb
import lightgbm as lgb

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, classification_report, ConfusionMatrixDisplay
import shap

# Set font family to a font that supports CJK characters
from matplotlib import rcParams
rcParams['font.sans-serif'] = ['PingFang HK']
rcParams['axes.unicode_minus'] = False  # Ensure minus sign renders correctly

# Ignore warnings 
import warnings
warnings.filterwarnings('ignore', category=DeprecationWarning)
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)

## 0. Global Variables

In [2]:
# Constants
LEARNING_RATE = 0.005
RANDOM_STATE = 101

# Study targets
TARGET_COLS = ["Premature Rupture of Membranes",    # 胎膜早破
               "Fetal Distress",                    # 胎儿宫内窘迫
               "Macrosomia",                        # 巨大儿
               "Amniotic Fluid Contamination"]      # 羊水污染

In [3]:
#Path to save all model metrics
BASE_DIR = os.path.join(os.getcwd(), "Model_Metrics")
if not os.path.isdir(BASE_DIR):
    os.makedirs(BASE_DIR)

#Path to load imputed datasets    
IMPUTED_DATASETS_DIR = os.path.join(os.getcwd(), "MICE", "Imputed_Datasets")
if not os.path.isdir(IMPUTED_DATASETS_DIR):
    raise IOError("Imputed datasets directory not found. Run MICE script first.")

## 1. Initialize and Wrap Models

In [4]:
# Function to initialize and wrap models
def get_models(is_balanced: bool=True, L1_regularization: float=1.0, L2_regularization: float=1.0) -> dict:
    ''' 
    Returns a dictionary `{Model_Name: Model}` with new instances of:
        * "XGB" - Extreme Gradient Boosting Classifier (XGBoost).
        * "LGBM" - Light Gradient Boosting Machine Classifier (LightGBM).
        * "HistGB" - Histogram-Based Gradient Boosting Classifier (HistGB).
        
    If working with imbalanced datasets, set `is_balanced=False`.
    '''
    
    # Initialize XGBoost Classifier
    xgb_model = xgb.XGBClassifier(
        n_estimators=200,
        max_depth=5,                  # tunable
        learning_rate=LEARNING_RATE,  # tunable
        subsample=0.8,                # tunable
        colsample_bytree=0.8,         # tunable
        random_state=RANDOM_STATE,     # added for reproducibility
        
        scale_pos_weight=1 if is_balanced else 8,           # tunable
        reg_alpha=L1_regularization,                 # tunable L1 regularization
        reg_lambda=L2_regularization,                # tunable L2 regularization
        eval_metric='aucpr',    # tunable, default is 'logloss'
    )
    
    # Initialize LightGBM Classifier
    lgbm_model = lgb.LGBMClassifier(
        n_estimators=200,
        learning_rate=LEARNING_RATE,  # tunable
        max_depth=5,                 # tunable
        subsample=0.8,                # tunable
        colsample_bytree=0.8,         # tunable
        random_state=RANDOM_STATE,     # added for reproducibility
        # force_col_wise=True,        #Use if there are more columns than rows
        verbose=-1,              #Requires TESTING
        
        class_weight=None if is_balanced else 'balanced',  # tunable
        # is_unbalance= not is_balanced,                     # tunable alternative to class_weight, choose one
        boosting_type='goss' if is_balanced else 'dart',   # tunable, DART requires more iterations and is slower
        reg_alpha=L1_regularization,                 # tunable L1 regularization
        reg_lambda=L2_regularization,                # tunable L2 regularization
    )
    
    # Initialize HistGradientBoostingClassifier
    hist_model = HistGradientBoostingClassifier(
        max_iter=200,                 # tunable
        learning_rate=LEARNING_RATE,  # tunable
        max_depth=5,                  # tunable
        min_samples_leaf=30,          # tunable
        random_state=RANDOM_STATE,    # added for reproducibility
        
        class_weight=None if is_balanced else 'balanced',  # tunable
        l2_regularization=L2_regularization,          # tunable
        scoring='loss' if is_balanced else 'balanced_accuracy',       # tunable
    )
    
    # Wrap classifiers with MultiOutputClassifier
    models = {
        "XGB": xgb_model,
        "LGBM": lgbm_model,
        "HistGB": hist_model
    }
    
    return models

## 2. Functions to split, scale, and resample data in a single dataset

In [5]:
# function to split data into train and test
def _split_data(features, target, test_size):
    X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=test_size, random_state=RANDOM_STATE, stratify=target)   
    return X_train, X_test, y_train, y_test

# function to standardize the data
def _standardize_data(train_features, test_features):
    scaler = StandardScaler()
    train_scaled = scaler.fit_transform(train_features)
    test_scaled = scaler.transform(test_features)
    return train_scaled, test_scaled

# Over-sample minority class (Positive cases) and return several single target datasets
def _resample(X_train_scaled: np.ndarray, y_train: pd.Series, 
                strategy: Literal[r"ADASYN", r'SMOTE', r'RANDOM', r'UNDERSAMPLE', None]=None):
    ''' 
    Oversample minority class or undersample majority class.
    
    Returns a Tuple `(Features: nD-Array, Target: 1D-array)`
    '''
    if strategy is None:
        return X_train_scaled, y_train
    elif strategy == 'SMOTE':
        resample_algorithm = SMOTE(random_state=RANDOM_STATE, k_neighbors=3)
    elif strategy == 'RANDOM':
        resample_algorithm = RandomOverSampler(random_state=RANDOM_STATE)
    elif strategy == 'UNDERSAMPLE':
        resample_algorithm = RandomUnderSampler(random_state=RANDOM_STATE)
    elif strategy == 'ADASYN':
        resample_algorithm = ADASYN(random_state=RANDOM_STATE, n_neighbors=3)
    else:
        raise ValueError(f"Invalid resampling strategy: {strategy}")
    
    X_res, y_res = resample_algorithm.fit_resample(X_train_scaled, y_train)
    return X_res, y_res


# DATASET PIPELINE
def dataset_pipeline(df_features: pd.DataFrame, df_target: pd.Series, resample_strategy: Union[str, None], test_size: float=0.2, debug: bool=False):
    ''' 
    1. Make Train/Test splits
    2. Standardize Train and Test Features
    3. Oversample imbalanced classes and create individual DataFrames
    
    Return a processed Tuple: (X_train, y_train, X_test, y_test)
    
    `(nD-array, 1D-array, nD-array, Series)`
    '''
    #DEBUG
    if debug:
        print(f"Split Dataframes Shapes - Features DF: {df_features.shape}, Target DF: {df_target.shape}")
        unique_values = df_target.unique()  # Get unique values for the target column
        print(f"\tUnique values for '{df_target.name}': {unique_values}")
    
    #Train test split
    X_train, X_test, y_train, y_test = _split_data(features=df_features, target=df_target, test_size=test_size)
    
    #DEBUG
    if debug:
        print(f"Shapes after train test split - X_train: {X_train.shape}, y_train: {y_train.shape}, X_test: {X_test.shape}, y_test: {y_test.shape}")
    
    # Standardize    
    X_train_scaled, X_test_scaled = _standardize_data(train_features=X_train, test_features=X_test)
    
    #DEBUG
    if debug:
        print(f"Shapes after scaling features - X_train: {X_train_scaled.shape}, y_train: {y_train.shape}, X_test: {X_test_scaled.shape}, y_test: {y_test.shape}")
 
    # Scale
    X_train_oversampled, y_train_oversampled = _resample(X_train_scaled=X_train_scaled, y_train=y_train, strategy=resample_strategy)
    
    #DEBUG
    if debug:
        print(f"Shapes after resampling - X_train: {X_train_oversampled.shape}, y_train: {y_train_oversampled.shape}, X_test: {X_test_scaled.shape}, y_test: {y_test.shape}")
    
    return X_train_oversampled, y_train_oversampled, X_test_scaled, y_test

## 3. Functions to train and evaluate the model

In [6]:
# Trainer function
def _train_model(model, train_features, train_target):
    model.fit(train_features, train_target)
    return model

# function to evaluate the model and save metrics
def _evaluate_model(model, model_name: str, dataset_id: str, x_test_scaled: np.ndarray, single_y_test: np.ndarray, target_id: str):
    model_dir = os.path.join(BASE_DIR, model_name)
    if not os.path.isdir(model_dir):
        os.makedirs(model_dir)
        
    dataset_dir = os.path.join(model_dir, dataset_id)
    if not os.path.isdir(dataset_dir):
        os.makedirs(dataset_dir)
    
    y_pred = model.predict(x_test_scaled)
    accuracy = accuracy_score(single_y_test, y_pred)
    report = classification_report(single_y_test, y_pred, target_names=["Negative", "Positive"])
    
    report_path = os.path.join(dataset_dir, "classification_reports.txt")
    # Open the file in append mode
    with open(report_path, "a") as f:
        f.write(f"{model_name} - {target_id}            Accuracy: {accuracy:.2f}\n")
        f.write(f"Classification Report:\n")
        f.write(report)
        f.write("\n\n")  # Extra spacing between entries
        
    #Generate confusion matrix
    disp = ConfusionMatrixDisplay.from_predictions(
        y_true=single_y_test, y_pred=y_pred,
        display_labels=["Negative", "Positive"],
        cmap=plt.cm.Blues,
        normalize="true"
    )
    plt.title(f"{model_name} - Confusion Matrix for {target_id}")
    plt.savefig(os.path.join(dataset_dir, f"Confusion_Matrix_{target_id}.png"))
    plt.close()

    return y_pred

# Get SHAP values
def get_shap_values(model, model_name: str, dataset_id: str, features_to_explain: np.ndarray, feature_names: list[str], target_id: str):
    """
    Generate and save SHAP summary plots for each target in a multi-output model.
    
    `features`: Use scaled values if the model was trained on scaled values.
    
    * Use `X_train` (or a subsample of it) to see how the model explains the data it was trained on. (Default)
	* Use `X_test` (or a hold-out set) to see how the model explains unseen data.
	* Use the entire dataset to get the global view. 
    """    
    # Check directories
    model_dir = os.path.join(BASE_DIR, model_name)
    if not os.path.isdir(model_dir):
        raise IOError(f"{model_dir} does not exist.\nTrain and evaluate the model first.")
    
    dataset_dir = os.path.join(model_dir, dataset_id)
    if not os.path.isdir(dataset_dir):
        raise IOError(f"{dataset_dir} does not exist.\nTrain and evaluate the model first.")
    

    # SHAP explainer for the target estimator
    explainer = shap.TreeExplainer(model)
    
    # Compute SHAP values
    shap_values = explainer.shap_values(features_to_explain)
    
    # TEST, WARNING for LGBM, list of arrays [negative, positive]
    # if isinstance(model, lgb.LGBMClassifier):
    #     shap_values = shap_values[1]
        

    shap.summary_plot(
        shap_values, 
        features_to_explain, 
        feature_names=feature_names,
        plot_type="bar",
        show=False
    )
    
    plt.gcf().set_size_inches(12, 20)  #(width, height)
    plt.xlabel("Mean Absolute SHAP Value")
    plt.title(f"{model_name} - SHAP Summary for {target_id}")
    file_name = os.path.join(dataset_dir, f"SHAP_{target_id}.png")
    plt.savefig(file_name, bbox_inches='tight')
    plt.close()
        
        
# TRAIN TEST PIPELINE
def train_test_pipeline(model, model_name: str, dataset_id: str, 
             train_features: np.ndarray, train_target: np.ndarray,
             test_features: np.ndarray, test_target: np.ndarray,
             feature_names: list[str], target_id: str,
             debug: bool=False):
    ''' 
    1. Train model.
    2. Evaluate model.
    3. SHAP values.
    
    Returns: Tuple(Trained model, Test-set Predictions)
    '''
    print(f"\tWorking with Model: {model_name} for Target: {target_id}...")
    trained_model = _train_model(model=model, train_features=train_features, train_target=train_target)
    y_pred = _evaluate_model(model=trained_model, model_name=model_name, dataset_id=dataset_id, 
                             x_test_scaled=test_features, single_y_test=test_target, target_id=target_id)
    get_shap_values(model=trained_model, model_name=model_name, dataset_id=dataset_id,
                    features_to_explain=train_features, feature_names=feature_names, target_id=target_id)
    print(f"\tprocess complete.")
    return trained_model, y_pred

## 4. Function to Load Datasets

In [7]:
#Load imputed datasets as a generator
def yield_imputed_dataframe(datasets_dir: str):
    '''
    Yields a tuple `(dataframe, dataframe_name)`
    '''
    dataset_filenames = [dataset for dataset in os.listdir(datasets_dir) if dataset.endswith(".csv")]
    if not dataset_filenames:
        raise IOError(f"No imputed datasets have been found at {datasets_dir}")
    
    for dataset_filename in dataset_filenames:
        full_path = os.path.join(datasets_dir, dataset_filename)
        df = pd.read_csv(full_path)
        #remove extension
        filename = os.path.splitext(os.path.basename(dataset_filename))[0]
        print(f"Working on file: {filename}")
        yield (df, filename)
        
#Split a dataset into features and targets datasets
def dataset_yielder(df: pd.DataFrame, target_cols: list[str]=TARGET_COLS):
    ''' 
    Yields one Tuple at a time: `(df_features, df_target, feature_names, target_name)`
    '''
    df_features = df.drop(columns=target_cols)
    feature_names = df_features.columns
    
    for target_col in target_cols:
        df_target = df[target_col]
        yield (df_features, df_target, feature_names, target_col)

## 5. Execution

In [8]:
#Main function
def main(imputed_datasets_dir: str, target_cols: list[str], resample_strategy: Literal[r"ADASYN", r'SMOTE', r'RANDOM', r'UNDERSAMPLE', None]=None, 
         test_size: float=0.2, debug:bool=False, L1_regularization: float=1.0, L2_regularization: float=1.0):
    #Increase L1 and L2 if model is overfitting
    #Yield imputed dataset
    for dataframe, dataframe_name in yield_imputed_dataframe(imputed_datasets_dir):
        #Yield features dataframe and target dataframe
        for df_features, df_target, feature_names, target_name in dataset_yielder(df=dataframe, target_cols=target_cols):
            #Dataset pipeline
            X_train, y_train, X_test, y_test = dataset_pipeline(df_features=df_features, df_target=df_target, resample_strategy=resample_strategy, test_size=test_size, debug=debug)
            #Get models
            models_dict = get_models(is_balanced=False if resample_strategy is None else True, L1_regularization=L1_regularization, L2_regularization=L2_regularization)
            for model_name, model in models_dict.items():
                train_test_pipeline(model=model, model_name=model_name, dataset_id=dataframe_name,
                                    train_features=X_train, train_target=y_train,
                                    test_features=X_test, test_target=y_test,
                                    feature_names=feature_names,target_id=target_name,
                                    debug=debug)
    print("\nTraining and evaluation complete.")

In [17]:
main(imputed_datasets_dir=IMPUTED_DATASETS_DIR, target_cols=TARGET_COLS, test_size=0.2, debug=False,
     resample_strategy=None, L1_regularization=1.5, L2_regularization=1.5)

Working on file: imputed_01
	Working with Model: XGB for Target: Premature Rupture of Membranes...
	process complete.
	Working with Model: LGBM for Target: Premature Rupture of Membranes...
	process complete.
	Working with Model: HistGB for Target: Premature Rupture of Membranes...
	process complete.
	Working with Model: XGB for Target: Fetal Distress...
	process complete.
	Working with Model: LGBM for Target: Fetal Distress...
	process complete.
	Working with Model: HistGB for Target: Fetal Distress...
	process complete.
	Working with Model: XGB for Target: Macrosomia...
	process complete.
	Working with Model: LGBM for Target: Macrosomia...
	process complete.
	Working with Model: HistGB for Target: Macrosomia...
	process complete.
	Working with Model: XGB for Target: Amniotic Fluid Contamination...
	process complete.
	Working with Model: LGBM for Target: Amniotic Fluid Contamination...
	process complete.
	Working with Model: HistGB for Target: Amniotic Fluid Contamination...
	process 