In [None]:
# import standard libraries #
from dataclasses import dataclass
import itertools
import random
import time
from typing import Any

# import 3rd party packages #
from matplotlib import pyplot as plt
import matplotlib.lines
import numpy as np
import scipy.sparse
import sklearn.base
import sklearn.datasets
import sklearn.discriminant_analysis
import sklearn.ensemble
import sklearn.feature_extraction
import sklearn.linear_model
import sklearn.metrics
import sklearn.model_selection
import sklearn.naive_bayes
import sklearn.neural_network
import sklearn.pipeline
import sklearn.preprocessing
import sklearn.tree
import sklearn.compose
import tensorflow_hub
import xgboost

# Specify Training Parameters

In [None]:
@dataclass
class trainConfig:
    """Specifies training process settings"""

    k_cross_valid_folds: int = 10  # number of cross-validation folds
    use_n_cores: int = (
        6  # passed to n_jobs arg of sklearn.model_selection.cross_validate
    )
    train_undersample_frac: float = (
        0.5  # I only used 50% of the training data because I was sick of waiting for models to finish training
    )


training_config: trainConfig = trainConfig()

# Import Data

In [None]:
# import the data #
train_data_dict = sklearn.datasets.fetch_20newsgroups(
    subset="train",
    shuffle=True,
    random_state=69,
    remove=("headers", "footers", "quotes"),
    # return_X_y=True,
)

test_data_dict = sklearn.datasets.fetch_20newsgroups(
    subset="test",
    shuffle=True,
    random_state=69,
    remove=("headers", "footers", "quotes"),
    # return_X_y=True,
)

label_encoder = sklearn.preprocessing.LabelEncoder()

n_train_samples = int(
    len(train_data_dict["data"]) * training_config.train_undersample_frac
)
x_train = train_data_dict["data"][:n_train_samples]
x_test = test_data_dict["data"]

y_labels_train = [
    train_data_dict["target_names"][y_idx]
    for y_idx in train_data_dict["target"][:n_train_samples]
]
y_labels_test = [
    test_data_dict["target_names"][y_idx] for y_idx in test_data_dict["target"]
]

label_encoder.fit(y_labels_train)
y_codes_train = label_encoder.transform(y_labels_train)
y_codes_test = label_encoder.transform(y_labels_test)

del train_data_dict, test_data_dict

In [None]:
# look at a random example #
random_idx = random.randint(0, len(x_train))
print("<", y_labels_train[random_idx], ">")
print(x_train[random_idx])

# Feature Engineering

In [None]:
# specify tokenization strategy for bag-of-words (n-gram) features #
tokenizer = sklearn.feature_extraction.text.CountVectorizer(
    strip_accents="ascii",
    lowercase=True,
    ngram_range=(1, 3),
    max_features=10_000,  # only use the top "max_features" features
    max_df=0.1,  # don't consider n-grams more common that this
)

# create bag-of-words (n-gram) features #
tokenizer.fit(x_train)
x_train_bag_of_words: scipy.sparse._csr.csr_matrix = tokenizer.transform(x_train)
x_test_bag_of_words: scipy.sparse._csr.csr_matrix = tokenizer.transform(x_test)

In [None]:
# load document embedding model #
universal_sentence_encoder = tensorflow_hub.load(
    "https://tfhub.dev/google/universal-sentence-encoder/4"
)

# create document embeddings #
x_train_embeddings: np.ndarray = universal_sentence_encoder(x_train).numpy()
x_test_embeddings: np.ndarray = universal_sentence_encoder(x_test).numpy()

In [None]:
# combine bag-of-words and embeddings into a single dataset #
# this requires the (dense) embeddings to be converted to a sparse matrix #
# (this seemed wiser than converting the very sparse data to dense) #
x_train_embeddings_sparse = scipy.sparse.csr_matrix(x_train_embeddings)
x_train_bagofwords_and_embeddings_sparse = scipy.sparse.hstack(
    (x_train_bag_of_words, x_train_embeddings_sparse)
)

# Specify Experiments to Run

In [None]:
# specify models to fit #
models: dict[str, Any] = {
    "adaboost": sklearn.ensemble.AdaBoostClassifier(),
    "decision_tree": sklearn.tree.DecisionTreeClassifier(),
    "extremely_random_trees": sklearn.ensemble.ExtraTreesClassifier(),
    # "gbm": sklearn.ensemble.GradientBoostingClassifier(), # too slow
    # "hist_gbm": HistGradientBoostingClassifier(), # doesn't support sparse X
    "logistic_regression": sklearn.linear_model.LogisticRegression(
        penalty=None,
        max_iter=1_000,
    ),
    # "naive_bayes": sklearn.naive_bayes.MultinomialNB(), # requires non-negative X
    "neural_net": sklearn.neural_network.MLPClassifier(
        hidden_layer_sizes=(50, 30, 10, 10), activation="relu", max_iter=5_000
    ),
    # "qda": sklearn.discriminant_analysis.QuadraticDiscriminantAnalysis(), # doesn't support sparse X
    "random_forest": sklearn.ensemble.RandomForestClassifier(),
    "xg_boost": xgboost.XGBClassifier(objective="multi:softmax"),
}

models["stacked_ensemble"] = sklearn.ensemble.StackingClassifier(
    estimators=[
        (
            model_name,
            sklearn.base.clone(models[model_name]),
        )
        for model_name in models.keys()
    ],
    final_estimator=xgboost.XGBClassifier(objective="multi:softmax"),
    cv=5,
    stack_method="predict_proba",
    n_jobs=1,
    passthrough=False,  # False=train final_estimator on estimators preds only (i.e. exclude original covariates)
)

# specify datasets to fit #
datasets_to_fit: list[str] = [
    "bag_of_words_only",
    "embeddings_only",
    "bag_of_words_and_embeddings",
]

# list out all model/dataset combinations #
experiments_to_run = tuple(itertools.product(datasets_to_fit, models.keys()))
print("-- All Experiments To Run --")
for experiment_num, experiment_contents in enumerate(experiments_to_run):
    dataset_name, model_name = experiment_contents
    print(
        f"experiment_id=[{experiment_num}]  dataset=[{dataset_name}], model=[{model_name}]"
    )

The metric calculated in each holdout fold in order to evaluate overall model performance on that fold is called [roc_auc_ovr](https://scikit-learn.org/stable/modules/model_evaluation.html).

This metric is generated by calculating a [ROC AUC](https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve) score for each unique class (using a [one-vs-rest](https://scikit-learn.org/stable/auto_examples/model_selection/plot_roc.html#one-vs-rest-multiclass-roc) approach), and then taking a macro-average (unweighted mean) over these scores to obtain a single final [roc_auc_ovr](https://scikit-learn.org/stable/modules/model_evaluation.html) score.

Note that this approach may be a bad idea if there is severe class-imbalance in the data, and can also hide poor model performance on specific classes.

# Run k-Fold Cross Validation

(to identify the best-performing model(s) and feature engineering strategy)

In [None]:
# run k-fold cross-validation on all model/dataset combinations #
k_fold_cv_results: dict[str, Any] = {}
for experiment_num, experiment_contents in enumerate(experiments_to_run):
    start_time = time.perf_counter()
    dataset_name, model_name = experiment_contents
    print(
        f"""
    Running experiment        {experiment_num}
    Dataset:                  {dataset_name}
    Model:                    {model_name} 
"""
    )
    match dataset_name:
        case "bag_of_words_only":
            temp_x: scipy.sparse._csr.csr_matrix = x_train_bag_of_words
        case "embeddings_only":
            temp_x: np.ndarray = x_train_embeddings
        case "bag_of_words_and_embeddings":
            temp_x: scipy.sparse._csr.csr_matrix = (
                x_train_bagofwords_and_embeddings_sparse
            )
    k_fold_cv_results[
        (model_name, dataset_name)
    ] = sklearn.model_selection.cross_validate(
        estimator=sklearn.base.clone(models[model_name]),
        X=temp_x,
        y=y_codes_train,
        scoring="roc_auc_ovr",  # see https://scikit-learn.org/stable/modules/model_evaluation.html
        cv=training_config.k_cross_valid_folds,
        return_train_score=False,
        return_estimator=False,
        n_jobs=training_config.use_n_cores,
    )
    minutes_elapsed = (time.perf_counter() - start_time) / 60
    test_score_per_label = k_fold_cv_results[(model_name, dataset_name)].get(
        "test_score"
    )
    print(
        "   Scores on holdout folds: "
        + f"(mean={np.mean(test_score_per_label):.3f}) (min={min(test_score_per_label):.3f}) (max={max(test_score_per_label):.3f}) scores: "
        + ", ".join([f"{test_score:.2f}" for test_score in test_score_per_label])
    )
    print(f"    ...done ({minutes_elapsed:.2f} minutes)")

In [None]:
# calculate summary statistics for each model/dataset combination cross-validation result #
for key, value in k_fold_cv_results.items():
    value["mean_test_score"] = value["test_score"].mean()
    value["min_test_score"] = value["test_score"].min()
    value["max_test_score"] = value["test_score"].max()

# sort results by mean test score #
k_fold_cv_results = {
    k: v
    for k, v in sorted(
        k_fold_cv_results.items(),
        key=lambda item: item[1].get("mean_test_score"),
        reverse=True,
    )
}

# Plot k-Fold Cross-Validation Performance for each Experiment

In [None]:
plt.figure(figsize=(10, 8))
# draw in each group mean #
plt.scatter(
    y=[f"model={key[0]}, dataset={key[1]}" for key in k_fold_cv_results.keys()],
    x=[value.get("mean_test_score") for value in k_fold_cv_results.values()],
    color="red",
    alpha=0.5,
)
# draw in performance on each fold #
plt.scatter(
    y=tuple(
        itertools.chain.from_iterable(
            [
                [f"model={key[0]}, dataset={key[1]}"] * trainConfig.k_cross_valid_folds
                for key in k_fold_cv_results.keys()
            ]
        )
    ),
    x=tuple(
        itertools.chain.from_iterable(
            [value.get("test_score").tolist() for value in k_fold_cv_results.values()]
        )
    ),
    marker="|",
)
plt.xlabel("Macro-Averaged ROC AUC (OVR)")
plt.grid()
plt.title(
    f"Per-Fold Model Performance (Macro-Averaged ROC AUC (OVR)) ({training_config.k_cross_valid_folds} Folds)"
)
# draw legend #
plt.legend(
    handles=[
        matplotlib.lines.Line2D(
            [0],
            [0],
            marker="o",
            color="red",
            alpha=0.5,
            label="Average (Mean)",
            markerfacecolor="red",
            markersize=10,
        ),
    ]
)

# Train Final Chosen Model

In [None]:
final_model = models.get("stacked_ensemble")
final_model.fit(X=x_train_bagofwords_and_embeddings_sparse, y=y_codes_train)

# Create Prediction Pipeline

(contains both feature engineering and trained model within a single useable/saveable/loadable scikit-learn **pipeline** object)

In [None]:
# create model prediction pipeline #
embedder = sklearn.preprocessing.FunctionTransformer(
    lambda x: universal_sentence_encoder(x), feature_names_out="one-to-one"
)
prediction_pipeline = sklearn.pipeline.Pipeline(
    steps=[
        (
            "feature_prep",
            sklearn.pipeline.FeatureUnion(
                [("bag_of_words", tokenizer), ("document_embedding", embedder)]
            ),
        ),
        ("model", final_model),
    ]
)

# Evaluate Final Model on Test Data

In [None]:
# get predictions on test data #
test_pred_codes = prediction_pipeline.predict(X=x_test)
test_preds_proba = prediction_pipeline.predict_proba(X=x_test)

In [None]:
# plot confusion matrix on test data #
# fig, ax = plt.subplots(figsize=(10, 10))
confusion_matrix: np.ndarray = sklearn.metrics.confusion_matrix(
    y_true=y_codes_test, y_pred=test_pred_codes
)
confusion_matrix_display = sklearn.metrics.ConfusionMatrixDisplay(
    confusion_matrix, display_labels=label_encoder.classes_
)
confusion_matrix_display.plot()
fig = confusion_matrix_display.ax_.get_figure()
fig.set_figwidth(9)
fig.set_figheight(9)
plt.title("Confusion Matrix (Test Set Predictions)")
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.xticks(rotation=90)

In [None]:
# plot model test set performance (accuracy) for different confidence thresholds #
# i.e. if we consider model predictions with a low probability value as "model doesn't know",
#       then what accuracy can we achieve over only "high confidence" predictions
#       (and for what proportion of the data do we have "high confidence" predictions)
test_max_pred_proba = test_preds_proba.max(axis=1)
prob_thresholds = np.arange(test_preds_proba.min(), test_preds_proba.max(), 0.01)
test_accuracy: list[float] = []
percent_of_obs: list[float] = []
for prob_thresh in prob_thresholds:
    idx_mask = test_max_pred_proba > prob_thresh
    temp_y_true = y_codes_test[idx_mask]
    temp_y_pred = test_pred_codes[idx_mask]
    test_accuracy.append((temp_y_pred == temp_y_true).sum() / len(temp_y_true))
    percent_of_obs.append(idx_mask.sum() / len(test_max_pred_proba))
plt.figure(figsize=(12, 7))
plt.plot(prob_thresholds, test_accuracy, label="model accuracy")
plt.plot(prob_thresholds, percent_of_obs, label="% of total data")
plt.xticks(np.arange(0.0, 1.1, 0.1))
plt.yticks(np.arange(0.0, 1.1, 0.1))
plt.grid()
plt.legend()
plt.title(
    "Test Set Performance: Model Accuracy under Different Minimum Required Confidence Thresholds"
)
plt.xlabel("Confidence Threshold (Minimum Acceptable Probability of Predicted Label)")
plt.ylabel("Model Accuracy / % of Total Data")