In [1]:
import pandas as pd
import numpy as np

In [24]:
def load_xls_to_pandas(filepath, sheet_name):
    """Load one worksheet of an Excel workbook and promote one of its rows to the header.

    Parameters
    ----------
    filepath : str or path-like
        Location of the workbook.
    sheet_name : str or int
        Worksheet to read; forwarded unchanged to ``pd.read_excel``.

    Returns
    -------
    pd.DataFrame
        The parsed rows from position 2 onward, labelled with the values found
        in the row at position 1. The original row index is kept as-is.
    """
    # The context manager closes the workbook handle once the sheet is parsed;
    # previously the ExcelFile object was left open.
    with pd.ExcelFile(filepath) as xls:
        df = pd.read_excel(xls, sheet_name)

    # The row at position 1 of the parsed frame holds the column names.
    df.columns = df.iloc[1]
    # Drop the two leading rows (positions 0 and 1).
    return df.iloc[2:]

# Read the discretized worksheet and take a first look at it.
df_raw = load_xls_to_pandas(
    filepath='data/hp_retro_data.xls',
    sheet_name='Discretized Data (Final)',
)
print(df_raw.shape)
df_raw.head(10)

In [3]:
# num_cols = ["TEMP", "HEART_RATE", "AGE", "DURATION", "WBC", "ESR"]
# cat_cols = ["SEX", "PREV_VISIT", "HX_TRAUMA", "COMPLAINT_SITE", "HX_ILLNESS",
#             "GAIT_REPORTED", "APPEARANCE", "HIP_REST", "HIP_ROM", "HIP_INT_ROT",
#             "HIP_INT_ROT", "HIP_FLEXION", "GAIT_OBSERVED", "PAIN_ROM_HIP", "OTHER_PAIN_SITE",
#             "PAIN_PALPATION", "SWELLING", "CURRENT_ILLNESS", "PREV_PROBLEMS"]

# Hand-written value -> integer code maps. preproc_df() adds an entry for every
# other column it encounters, so this dict grows as frames are processed.
cat_bins = {
        "TRIAGE": {"DISCHARGE": 0, "XRAY": 0, "LAB_XRAY_BSCAN": 1},
        "OTHER_PAIN_SITE": {np.nan: 0, 'NO': 1, 'OTHER': 2,
                            'PELVIS': 3,'BACK': 4, 'LEG': 5},
        }


def _ordered_categories(series):
    """Return the distinct values of ``series`` in a reproducible order, NaN first if present."""
    uniques = series.unique()
    non_null = [value for value in uniques if not pd.isna(value)]
    try:
        ordered = sorted(non_null)
    except TypeError:
        # Mixed, mutually incomparable types: order by type name, then by text.
        ordered = sorted(non_null, key=lambda value: (type(value).__name__, str(value)))
    has_missing = len(non_null) < len(uniques)
    return ([np.nan] if has_missing else []) + ordered


def preproc_df(df):
    """Replace '?' with NaN and integer-encode every column that has a mapping.

    Columns already listed in the module-level ``cat_bins`` use that mapping.
    For every other column except ``NUMBER`` and ``TRIAGE`` a mapping is
    generated: NaN (when present) gets code 0 and the remaining distinct values
    get consecutive codes in sorted order, so the encoding no longer depends on
    set iteration order and is identical across kernel restarts.

    NOTE: generated mappings are stored in ``cat_bins``; a mapping created on
    one call is reused by later calls for a column of the same name.

    Parameters
    ----------
    df : pd.DataFrame
        Raw frame. It is not modified; ``replace`` returns a new frame that is
        then encoded.

    Returns
    -------
    pd.DataFrame
        Encoded copy of ``df``. Values missing from a column's mapping become
        NaN (the behavior of ``Series.map`` with a dict).
    """
    df = df.replace('?', np.nan)
    print(df.isnull().sum())

    # create maps for all variables that do not have one yet
    for col in df.columns:
        if col in ("NUMBER", "TRIAGE") or col in cat_bins:
            continue
        cat_bins[col] = {
            value: code for code, value in enumerate(_ordered_categories(df[col]))
        }

    # map categorical variables using cat_bins
    for col in df.columns:
        if col in cat_bins:
            df[col] = df[col].map(cat_bins[col])

    return df

# Encode a copy (DataFrame.copy is deep by default) so df_raw stays as loaded.
df = preproc_df(df_raw.copy())
df.head(10)

In [22]:
# Class balance of the encoded TRIAGE target.
triage_counts = df['TRIAGE'].value_counts()
print(triage_counts)

# plot
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid")
# Pass x and y explicitly: a bare Series as the first positional argument is
# read as `x` by older seaborn releases and as `data` by newer ones.
ax = sns.barplot(x=triage_counts.index, y=triage_counts.values, alpha=0.8)
ax.set_ylabel('Number of examples')
ax.set_xlabel('Encoded triage class')
plt.title('Triage class distribution')
plt.show()

# Label-based lookups (.loc) make it explicit that 0 and 1 are class codes,
# not positions in the count-sorted Series.
triage_share = triage_counts / triage_counts.sum()
print("Percent of classes:")
print(f"{triage_share.loc[0]:.2%}")
print(f"{triage_share.loc[1]:.2%}")

In [5]:
# Summary statistics of the frame returned by preproc_df.
df.describe()

In [6]:
# (rows, columns) of the frame returned by preproc_df.
print(df.shape)

## Split, oversample (SMOTE), encode (one-hot)

In [7]:
from imblearn.over_sampling import SMOTE
from collections import Counter
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import KFold, RepeatedStratifiedKFold
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns

# Target is the encoded TRIAGE column; predictors are everything except it and NUMBER.
y = df['TRIAGE']
X = df.drop(['NUMBER', 'TRIAGE'], axis=1)

print('Original dataset shape %s' % Counter(y))


In [8]:
def show_correlations(df: pd.DataFrame) -> None:
    """Draw an annotated heatmap of the pairwise Kendall correlations between ``df``'s columns.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose columns are correlated with ``DataFrame.corr(method="kendall")``.
    """
    _, ax = plt.subplots(figsize=(24, 10))
    sns.heatmap(df.corr(method="kendall"), annot=True, cmap="Blues", ax=ax)


# Put the target next to the predictors so the heatmap also shows feature/TRIAGE correlations.
show_correlations(pd.concat([X, y], axis=1))

In [9]:
# split
# Derive predictors/target from `df` instead of reading `X`/`y`: this cell
# rebinds `X` and `y` to encoded arrays below, so reading them here would make
# a second run of the cell fail on `.values`.
features = df.drop(columns=['NUMBER', 'TRIAGE'])
target = df['TRIAGE']

X_train, X_test, y_train, y_test = train_test_split(
    features, target, test_size=0.2, random_state=42, stratify=target
)
print('Train and test datasets shape %s %s' % (Counter(y_train), Counter(y_test)))

sm = SMOTE(random_state=42)

# One-hot encode the integer-coded predictors; from here on X is the encoded
# matrix and y a plain array, both indexable by the fold indices used later.
encoder = OneHotEncoder()
X, y = encoder.fit_transform(features.values), target.values

k_folds = 5
kf = RepeatedStratifiedKFold(n_splits=k_folds, n_repeats=5, random_state=42)

In [23]:
# Shape of the feature matrix (one-hot encoded once the split/encode cell has run).
X.shape

In [10]:
from sklearn.metrics import confusion_matrix, roc_auc_score, roc_curve, precision_recall_curve, auc

def calculate_risk_thresholds(y: np.ndarray, y_pred: np.ndarray) -> tuple[float, float]:
    """Pick two score cut-offs from the ROC curve of ``y_pred`` against ``y``.

    Parameters
    ----------
    y : np.ndarray
        True binary labels, passed to ``roc_curve``.
    y_pred : np.ndarray
        Predicted scores, passed to ``roc_curve``.

    Returns
    -------
    tuple
        ``(medium_risk, high_risk)`` where ``medium_risk`` is the threshold of
        the first ROC point with sensitivity >= 0.99 and ``high_risk`` is the
        threshold of the last ROC point with specificity >= 0.90. Either entry
        is ``None`` when no ROC point satisfies its condition.
    """
    fpr, tpr, thresholds = roc_curve(y, y_pred)

    # Indices of the ROC points meeting each requirement (specificity = 1 - fpr).
    sensitive_enough = np.flatnonzero(tpr >= 0.99)
    specific_enough = np.flatnonzero(1 - fpr >= 0.90)

    medium_risk = thresholds[sensitive_enough[0]] if sensitive_enough.size else None
    high_risk = thresholds[specific_enough[-1]] if specific_enough.size else None
    return medium_risk, high_risk

def make_prediction(y_pred_proba: np.ndarray, medium_risk: float, high_risk: float) -> np.ndarray:
    """Turn predicted scores into negative / positive / undecided labels.

    Parameters
    ----------
    y_pred_proba : np.ndarray
        Predicted scores (any array-like of numbers is accepted).
    medium_risk : float
        Scores strictly below this value are labelled negative.
    high_risk : float
        Scores at or above this value (and not below ``medium_risk``) are
        labelled positive.

    Returns
    -------
    np.ndarray
        float64 array of the same shape as the input holding ``0.0``
        (negative), ``1.0`` (positive) or ``nan`` (undecided). The dtype is
        always float, including when no sample is undecided, so
        ``np.isnan`` / ``== True`` / ``== False`` checks behave uniformly.
    """
    scores = np.asarray(y_pred_proba, dtype=float)
    decisions = np.full(scores.shape, np.nan)
    decisions[scores >= high_risk] = 1.0
    # Assigned last so the negative rule wins if the two bands overlap,
    # matching the order of the original conditional.
    decisions[scores < medium_risk] = 0.0
    return decisions

def calculate_rates(y: np.ndarray, y_pred: np.ndarray) -> tuple[float, float, float, float]:
    """Compute the four confusion-matrix rates for 0/1-coded labels.

    Parameters
    ----------
    y : np.ndarray
        True labels, coded 0 (negative) / 1 (positive).
    y_pred : np.ndarray
        Predicted labels with the same coding.

    Returns
    -------
    tuple
        ``(tpr, fnr, fpr, tnr)``: sensitivity, miss rate, fall-out and
        specificity. A rate whose denominator is zero (no actual positives,
        or no actual negatives) is ``nan``.
    """
    # Pinning the label set keeps the matrix 2x2 even when only one class is
    # present; without it the 4-way unpack below fails.
    tn, fp, fn, tp = confusion_matrix(y, y_pred, labels=[0, 1]).ravel()
    positives, negatives = tp + fn, tn + fp

    tpr = tp / positives if positives else np.nan  # sensitivity
    fnr = fn / positives if positives else np.nan  # miss_rate
    fpr = fp / negatives if negatives else np.nan  # fall_out
    tnr = tn / negatives if negatives else np.nan  # specificity

    return tpr, fnr, fpr, tnr

def auprc_score(y: np.ndarray, y_pred: np.ndarray) -> float:
    """Return the area under the precision-recall curve of ``y_pred`` against ``y``.

    The curve comes from ``precision_recall_curve`` and is integrated with
    ``auc`` using recall as the x-axis and precision as the y-axis.
    """
    curve = precision_recall_curve(y, y_pred)
    precision, recall = curve[0], curve[1]
    return auc(recall, precision)

In [11]:
def assess_classifier(X, y, classifier, cv=None):
    """Score an already-fitted classifier over cross-validation folds.

    For every fold, the risk thresholds are derived from the classifier's
    scores on the fold's training part, then applied to its scores on the
    fold's test part.

    NOTE(review): ``classifier`` is only queried with ``predict_proba``; it is
    never refit here. If it was trained on rows of ``X``, every fold whose test
    part contains those rows is scored on seen data.

    Parameters
    ----------
    X : indexable feature matrix
        Must support ``X[index_array]`` and ``.shape``.
    y : np.ndarray
        Binary labels aligned with ``X``.
    classifier : fitted estimator
        Object exposing ``predict_proba``; column 1 is used as the score.
    cv : splitter, optional
        Object with a ``split(X, y)`` method. Defaults to the notebook-level
        ``kf``.

    Returns
    -------
    dict
        Fold means of ``auprc``, ``auroc``, the fractions of ``negatives``,
        ``positives`` and ``unknowns`` among test samples, and ``rates``
        (tpr, fnr, fpr, tnr over decided samples). Folds where a rate is
        undefined are skipped when averaging that rate.
    """
    splitter = kf if cv is None else cv

    auprc, auroc = [], []
    negatives, positives, unknowns = [], [], []
    rates = []

    for train, test in splitter.split(X, y):
        X_train, X_test = X[train], X[test]
        y_train, y_test = y[train], y[test]

        # Thresholds come from the training part of the fold only.
        y_pred_proba_train = classifier.predict_proba(X_train)[:, 1]
        medium_risk, high_risk = calculate_risk_thresholds(y_train, y_pred_proba_train)

        y_pred_proba = classifier.predict_proba(X_test)[:, 1]
        auprc.append(auprc_score(y_test, y_pred_proba))
        auroc.append(roc_auc_score(y_test, y_pred_proba))

        predicted = make_prediction(y_pred_proba, medium_risk, high_risk)
        undecided = np.isnan(predicted)

        number_of_samples = X_test.shape[0]
        # Element-wise comparisons on purpose: `predicted` is a NumPy array.
        negatives.append(np.sum(predicted == False) / number_of_samples)
        positives.append(np.sum(predicted == True) / number_of_samples)
        unknowns.append(np.sum(undecided) / number_of_samples)

        decided = ~undecided
        if decided.any():
            rates.append(calculate_rates(y_test[decided], predicted[decided]))
        else:
            # Every test sample fell between the thresholds: no rate is defined.
            rates.append((np.nan, np.nan, np.nan, np.nan))

    return {
        "auprc": np.mean(auprc),
        "auroc": np.mean(auroc),
        "negatives": np.mean(negatives),
        "positives": np.mean(positives),
        "unknowns": np.mean(unknowns),
        # nanmean so one fold with an undefined rate does not turn the mean into nan.
        "rates": np.nanmean(rates, axis=0),
        }

## Model

### baseline model - logistic regression

In [12]:
from sklearn.linear_model import LogisticRegression 

# Baseline: logistic regression fitted on a stratified 80/20 hold-out split.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

baseline_model = LogisticRegression(max_iter=1000, random_state=42)
baseline_model.fit(X_train, y_train)
classifier = baseline_model  # keep the notebook-level `classifier` name bound to the model

# NOTE(review): the fitted model is assessed on folds of the full X, which
# include the rows in X_train it was fitted on.
baseline_assessment = assess_classifier(X, y, baseline_model)
baseline_assessment


## Best model

In [13]:
from sklearn.ensemble import RandomForestClassifier 
import matplotlib.pyplot as plt

# Train one random forest per CV fold (on SMOTE-resampled training data) and
# keep the one with the highest mean AUROC.
# NOTE(review): each forest is assessed with assess_classifier on the full
# X, y rather than on its own held-out `test` indices, so the selection metric
# includes rows the forest was trained on.

baseline_models_assessments = []
rf_models = []
for train, test in kf.split(X, y):
    X_train, y_train = X[train], y[train]
    X_test, y_test = X[test], y[test]

    # Oversample the training part only, then fit a fresh forest on it.
    X_train_res, y_train_res = sm.fit_resample(X_train, y_train)
    classifier = RandomForestClassifier(random_state=42).fit(X_train_res, y_train_res)

    rf_models.append(
        dict(model=classifier, assesment=assess_classifier(X, y, classifier))
    )

best_model = max(rf_models, key=lambda candidate: candidate['assesment']['auroc'])

best_model['assesment']

In [14]:
# Assessments to plot, keyed by the name used in figure titles and file names.
kfold_results = dict(
    base=baseline_assessment,
    best=best_model['assesment'],
)

In [15]:
def _annotated_bar(ax, labels, values, colors, xlabel, ylabel, title):
    """Draw one bar chart on ``ax`` and write each bar's value above it as a percentage."""
    ax.bar(labels, values, color=colors)
    ax.grid(axis="y", linestyle="--", alpha=0.25)
    for position, value in enumerate(values):
        ax.text(position, value, f"{value * 100:.2f}%", ha="center", va="bottom")
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    ax.set_title(title)


def show_result(result: dict, classifier: str) -> None:
    """Plot an assessment dict as three bar charts, save the figure and show it.

    Parameters
    ----------
    result : dict
        Must provide the keys ``auprc``, ``auroc``, ``negatives``,
        ``positives``, ``unknowns`` and ``rates`` (four values in the order
        TPR, FNR, FPR, TNR).
    classifier : str
        Name used in the figure title (capitalized) and as the PNG file name
        under ``./resources/figures/hp_retro/``.
    """
    from pathlib import Path

    two_colors, three_colors, four_colors = [
        sns.color_palette("magma", number) for number in (2, 3, 4)
    ]
    fig, axs = plt.subplots(2, 2, figsize=(12, 8), tight_layout=True)
    ax1, ax2, ax3, ax4 = axs.flatten()

    _annotated_bar(
        ax1,
        ["AUPRC", "AUROC"],
        [result["auprc"], result["auroc"]],
        two_colors,
        xlabel="metric",
        ylabel="score",
        title="Scores",
    )
    _annotated_bar(
        ax2,
        ["Negatives", "Positives", "Unknowns"],
        [result["negatives"], result["positives"], result["unknowns"]],
        three_colors,
        xlabel="class",
        ylabel="percentage",
        title="Classification Distribution",
    )
    _annotated_bar(
        ax3,
        ["TPR", "FNR", "FPR", "TNR"],
        result["rates"],
        four_colors,
        xlabel="type",
        ylabel="rate",
        title="Classification Rates",
    )

    # The fourth panel is intentionally left blank.
    ax4.axis("off")

    fig.suptitle(f"Results for {classifier.capitalize()}")

    # Create the output directory first; savefig does not create missing folders.
    figure_dir = Path("./resources/figures/hp_retro")
    figure_dir.mkdir(parents=True, exist_ok=True)
    fig.savefig(figure_dir / f"{classifier}.png")
    plt.show()


# Use a dedicated loop variable: iterating with `classifier` would rebind the
# notebook-level fitted model of that name to a plain string.
for model_name, result in kfold_results.items():
    show_result(result, model_name)

In [28]:
import pickle

# Persist both fitted models. The context managers close each file once the
# dump finishes; the bare open() calls used before never closed their handles.
with open('baseline_model.p', 'wb') as model_file:
    pickle.dump(baseline_model, model_file)

with open('best_model.p', 'wb') as model_file:
    pickle.dump(best_model['model'], model_file)