In [None]:
import numpy as np 
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline 
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, RocCurveDisplay, make_scorer
from sklearn.model_selection import learning_curve
import datetime
# Adapted from https://scikit-learn.org/stable/auto_examples/model_selection/plot_learning_curve.html
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import learning_curve
from imblearn.pipeline import Pipeline

In [None]:
def plot_learning_curve(
    title,
    train_sizes,
    train_scores,
    test_scores,
    fit_times,
    score_times,
    axes=None,
    ylim=None,
):
    """
    Generate 3 plots: the test and training learning curve, the training
    samples vs fit times curve, the fit times vs score curve.

    Parameters
    ----------
    estimator : estimator instance
        An estimator instance implementing `fit` and `predict` methods which
        will be cloned for each validation.

    title : str
        Title for the chart.

    X : array-like of shape (n_samples, n_features)
        Training vector, where ``n_samples`` is the number of samples and
        ``n_features`` is the number of features.

    y : array-like of shape (n_samples) or (n_samples, n_features)
        Target relative to ``X`` for classification or regression;
        None for unsupervised learning.

    axes : array-like of shape (3,), default=None
        Axes to use for plotting the curves.

    ylim : tuple of shape (2,), default=None
        Defines minimum and maximum y-values plotted, e.g. (ymin, ymax).

    cv : int, cross-validation generator or an iterable, default=None
        Determines the cross-validation splitting strategy.
        Possible inputs for cv are:

          - None, to use the default 5-fold cross-validation,
          - integer, to specify the number of folds.
          - :term:`CV splitter`,
          - An iterable yielding (train, test) splits as arrays of indices.

        For integer/None inputs, if ``y`` is binary or multiclass,
        :class:`StratifiedKFold` used. If the estimator is not a classifier
        or if ``y`` is neither binary nor multiclass, :class:`KFold` is used.

        Refer :ref:`User Guide <cross_validation>` for the various
        cross-validators that can be used here.

    n_jobs : int or None, default=None
        Number of jobs to run in parallel.
        ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
        ``-1`` means using all processors. See :term:`Glossary <n_jobs>`
        for more details.

    train_sizes : array-like of shape (n_ticks,)
        Relative or absolute numbers of training examples that will be used to
        generate the learning curve. If the ``dtype`` is float, it is regarded
        as a fraction of the maximum size of the training set (that is
        determined by the selected validation method), i.e. it has to be within
        (0, 1]. Otherwise it is interpreted as absolute sizes of the training
        sets. Note that for classification the number of samples usually have
        to be big enough to contain at least one sample from each class.
        (default: np.linspace(0.1, 1.0, 5))
    """
    if axes is None:
        fig, axes = plt.subplots(3, 2, figsize=(10, 15))

    axes = axes.reshape(-1)
    fig.subplots_adjust(hspace=0.5, wspace=0.5)
    fig = fig.delaxes(axes[-1])

    axes[0].set_title(title)
    if ylim is not None:
        axes[0].set_ylim(*ylim)
    axes[0].set_xlabel("Training examples")
    axes[0].set_ylabel("Score")

    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)
    fit_times_mean = np.mean(fit_times, axis=1)
    fit_times_std = np.std(fit_times, axis=1)
    score_times_mean = np.mean(score_times, axis=1)
    score_times_std = np.std(score_times, axis=1)

    # Plot learning curve
    axes[0].grid()
    axes[0].fill_between(
        train_sizes,
        train_scores_mean - train_scores_std,
        train_scores_mean + train_scores_std,
        alpha=0.1,
        color="r",
    )
    axes[0].fill_between(
        train_sizes,
        test_scores_mean - test_scores_std,
        test_scores_mean + test_scores_std,
        alpha=0.1,
        color="g",
    )
    axes[0].plot(
        train_sizes, train_scores_mean, "o-", color="r", label="Training score"
    )
    axes[0].plot(
        train_sizes, test_scores_mean, "o-", color="g", label="Cross-validation score"
    )
    axes[0].legend(loc="best")

    # Plot n_samples vs fit_times
    axes[1].grid()
    axes[1].plot(train_sizes, fit_times_mean, "o-")
    axes[1].fill_between(
        train_sizes,
        fit_times_mean - fit_times_std,
        fit_times_mean + fit_times_std,
        alpha=0.1,
    )
    axes[1].set_xlabel("Training examples")
    axes[1].set_ylabel("fit_times")
    axes[1].set_title("Scalability of the model")

    # Plot fit_time vs score
    fit_time_argsort = fit_times_mean.argsort()
    fit_time_sorted = fit_times_mean[fit_time_argsort]
    test_scores_mean_sorted = test_scores_mean[fit_time_argsort]
    test_scores_std_sorted = test_scores_std[fit_time_argsort]
    axes[2].grid()
    axes[2].plot(fit_time_sorted, test_scores_mean_sorted, "o-")
    axes[2].fill_between(
        fit_time_sorted,
        test_scores_mean_sorted - test_scores_std_sorted,
        test_scores_mean_sorted + test_scores_std_sorted,
        alpha=0.1,
    )
    axes[2].set_xlabel("fit_times")
    axes[2].set_ylabel("Score")
    axes[2].set_title("Performance of the model")

    # Plot n_samples vs score_times
    axes[3].grid()
    axes[3].plot(train_sizes, score_times_mean, "o-")
    axes[3].fill_between(
        train_sizes,
        score_times_mean - score_times_std,
        score_times_mean + score_times_std,
        alpha=0.1,
    )
    axes[3].set_xlabel("Training examples")
    axes[3].set_ylabel("score_times")
    axes[3].set_title("Scalability of the model")

    # Plot score_time vs score
    score_time_argsort = score_times_mean.argsort()
    score_time_sorted = score_times_mean[score_time_argsort]
    test_scores_mean_sorted = test_scores_mean[score_time_argsort]
    test_scores_std_sorted = test_scores_std[score_time_argsort]
    axes[4].grid()
    axes[4].plot(score_time_sorted, test_scores_mean_sorted, "o-")
    axes[4].fill_between(
        score_time_sorted,
        test_scores_mean_sorted - test_scores_std_sorted,
        test_scores_mean_sorted + test_scores_std_sorted,
        alpha=0.1,
    )
    axes[4].set_xlabel("score_times")
    axes[4].set_ylabel("Score")
    axes[4].set_title("Performance of the model")

    return plt

In [None]:
# The following helper functions are for training and evaluating the model


def show_confusion_matrix(cms, target_names):
    """
    This helper function plots the confusion matrices calculated when evaluating the model.
    """
    fig, ax = plt.subplots(figsize=(4, 4))
    fig.subplots_adjust(hspace=0.3, wspace=0.3)

    gnames = ["True Negative", "False Positive", "False Negative", "True Positive"]
    gcounts = [f"{v:0.0f}" for v in cms.flatten()]
    gpercentages = [f"{v:.2%}" for v in cms.flatten() / np.sum(cms)]
    annot = np.asarray(
        [
            f"{name}\n{count}\n{percentage}"
            for name, count, percentage in zip(gnames, gcounts, gpercentages)
        ]
    ).reshape(2, 2)

    sns.heatmap(
        cms,
        ax=ax,
        annot=annot,
        fmt="",
        cmap="Blues",
        xticklabels=target_names,
        yticklabels=target_names,
    )
    ax.set_ylabel("Actual")
    ax.set_xlabel("Predicted")
    ax.xaxis.tick_top()
    ax.xaxis.set_label_position("top")


def evaluate_model(model, X_test, y_test, sample_weight=None):
    """
    This helper function prints the report and evaluation metrics for the model.
    """
    predictions = model.predict(X_test)

    print("-" * 50)
    print(f"Evaluation metrics for {model.__class__.__name__}")
    print("-" * 50)

    score = model.score(X_test, y_test)
    print(f"{model.__class__.__name__}'s default score metric: {score}")

    print("Classification report")
    print(
        classification_report(
            y_test,
            predictions,
            sample_weight=sample_weight,
            digits=4,
            zero_division=1,
        )
    )

    accuracy = accuracy_score(y_test, predictions, sample_weight=sample_weight)
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1 Macro-Score: {f1_score(y_test, predictions, average='macro'):.4f}")

    print("-" * 50)


def train_and_evaluate(
    X,
    y,
    model,
    params,
    scoring,
    n_iter=None,
    sample_weight=None,
    random_state=42,
):
    """
    Trains the model and prints the evaluation metrics, as well as the confusion matrices, and learning and scalability plots.
    """

    X_train, X_test = X
    y_train, y_test = y

    pipeline = Pipeline([("model", model)])

    scoring = make_scorer(scoring, needs_proba=True)

    if n_iter == None:
        clf = GridSearchCV(pipeline, params, n_jobs=-1, scoring=scoring, verbose=0)
    else:
        clf = RandomizedSearchCV(
            pipeline,
            params,
            n_iter=n_iter,
            scoring=scoring,
            n_jobs=-1,
            random_state=random_state,
            verbose=0,
        )

    train_sizes, train_scores, test_scores, fit_times, score_times = learning_curve(
        clf,
        X_train,
        y_train,
        return_times=True,
        cv=5,
        n_jobs=-1,
        random_state=random_state,
    )

    plot_learning_curve(
        f"Learning curves for {model.__class__.__name__}",
        train_sizes,
        train_scores,
        test_scores,
        fit_times,
        score_times,
    )

    res_clf = clf.fit(X_train, y_train)

    print(f"Best params for {model.__class__.__name__}: {clf.best_params_}")
    evaluate_model(clf, X_test, y_test, sample_weight=sample_weight)

    return res_clf.best_estimator_["model"]


def predict_model(df, X_test, model):
    """
    A more convenient wrapper around train_and_evaluate, albeit less general.
    """

    results = df.loc[df["partition"] != "train", "utt"].copy()
    results["utt_predicted"] = model.predict(X_test)
    results.to_csv(f"output/predictive/{model.__class__.__name__}_results.csv", index=False)

    return model


def train_and_use_model(
    df,
    X,
    y,
    model,
    params,
    scoring="roc_auc",
    n_iter=None,
    sample_weight=None,
    random_state=42,
):
    """
    A more convenient wrapper around train_and_evaluate, albeit less general.
    """

    X_train, X_test = X
    y_train, y_test = y

    clf = train_and_evaluate(
        (X_train, X_test),
        (y_train, y_test),
        model,
        params,
        sample_weight=sample_weight,
        n_iter=n_iter,
        random_state=random_state,
        scoring=scoring,
    )
    return predict_model(df, X_test, clf)