# Setup

This is the source file for classes and functions used throughout the project.

In [5]:
# Optional: try to load the lab_black extension (Black autoformatter).
# A missing module is reported with a message and does not stop the notebook.
try:
    %load_ext lab_black
except ModuleNotFoundError:
    print("Couldn't load Black autoformatter.")

In [6]:
import numpy as np
from IPython.core.display import display
from numpy import random
import pandas as pd
import os, sys
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.cm

In [7]:
# Folder (relative to the working directory) that receives exported figures
PROJECT_ROOT_DIR = "."
PROJECT_SAVE_DIR = "figs"

# Report whether the folder is already there; create it when it is not
if os.path.isdir("/".join((PROJECT_ROOT_DIR, PROJECT_SAVE_DIR))):
    print("Figure directory exists.")
else:
    print("Figure directory did not exist, creating now.")
    os.mkdir("/".join((PROJECT_ROOT_DIR, PROJECT_SAVE_DIR)))

Figure directory exists.


In [8]:
# Read in target (ENM) model feature data
# (tab-separated file; the first column becomes the index)
X_enm = pd.read_csv(
    "./data/ENM-preprocessed-feats.csv", sep="\t", header="infer", index_col=0
)

# Read in source (organics) model feature data
# (tab-separated file; the first column becomes the index)
X_source = pd.read_csv(
    "./data/organics-preprocessed-feats.csv", sep="\t", header="infer", index_col=0
)

# Read in ENM labels (maximum_weight_fraction); only the column at
# position 4 of the comma-separated file is loaded
y_enm = pd.read_csv("./data/ENM-clean.csv", sep=",", header="infer", usecols=[4])

# Read in organics labels (maximum_weight_fraction)
y_source = pd.read_csv(
    "./data/organics-preprocessed-WF.csv", sep="\t", header="infer", index_col=0
)
# Give the labels the same index as the feature table; this assumes both files
# list the same samples in the same order -- TODO confirm
y_source.index = X_source.index

# Utility classes, functions

In [5]:
def savepdf(fig, name):
    """Export a figure to the project figure directory as `<name>.pdf`

    The matplotlib font settings are changed globally before saving so
    that text stays editable in the exported PDF.
    """

    # Font settings that keep text editable in PDFs
    plt.rcParams.update(
        {
            "font.family": "sans-serif",
            "font.sans-serif": "Helvetica",
            "pdf.fonttype": 42,
            "ps.fonttype": 42,
        }
    )

    # Assemble the output path and write the file
    target = "/".join((PROJECT_ROOT_DIR, PROJECT_SAVE_DIR, name + ".pdf"))
    fig.savefig(target, bbox_inches="tight", transparent=True)

In [6]:
def custom_latex(df, file_name, tex_label="INSERT_LABEL", caption="INSERT_CAPTION"):
    """
    Write a DataFrame to a text file as a LaTeX table

    The table uses a `tabularx` environment with one `X` column per
    DataFrame column and is stored as `./results/<file_name>.txt`. The
    `results` folder is created when missing. Only column names and cell
    values are written; the DataFrame index is left out.
    """

    # Output folder
    out_dir = "." + "/" + "results"
    if os.path.isdir(out_dir):
        print("Results directory exists.")
    else:
        print("Results directory did not exist, creating now.")
        os.mkdir(out_dir)

    file_path = out_dir + "/" + file_name + ".txt"
    print(file_path)

    with open(file_path, "w") as handle:
        # Preamble and header row
        column_spec = " ".join("X" for _ in df.columns)
        header_cells = ["\\textbf{" + str(name) + "}" for name in df.columns]
        lines = [
            "\\begin{table}[htb]\n",
            "\\begin{tabularx}{\\textwidth}{" + column_spec + "}\n",
            "\\toprule\n",
            " & ".join(header_cells) + " \\\\\n",
            "\\midrule\n",
        ]
        # One table line per DataFrame row
        for _, record in df.iterrows():
            lines.append(" & ".join(map(str, record.values)) + " \\\\\n")
        # Closing lines
        lines += [
            "\\bottomrule\n",
            "\\end{tabularx}\n",
            "\\caption{" + caption + "}\n",
            "\\label{tab:" + tex_label + "}\n",
            "\\end{table}\n",
        ]
        handle.writelines(lines)

In [14]:
from scipy.stats import shapiro

def norm_test(data):
    """
    Run a Shapiro-Wilk normality test and print the outcome

    Prints the test statistic and p-value, followed by a one-line
    interpretation at a significance level of 0.05. Nothing is returned.

    Source:
    https://machinelearningmastery.com/a-gentle-introduction-to-normality-tests-in-python/
    """
    alpha = 0.05

    stat, p = shapiro(data)
    print("Statistics=%.3f, p=%.3f" % (stat, p))

    # H0: the sample was drawn from a normal distribution
    verdict = (
        "Sample looks Gaussian (fail to reject H0)"
        if p > alpha
        else "Sample does not look Gaussian (reject H0)"
    )
    print(verdict)

In [7]:
class HiddenPrints:
    """
    Context manager that suppresses print output inside a `with` block.

    While the block runs, `sys.stdout` points at the null device. On exit
    the stream that was active on entry is put back and the null-device
    handle is closed. Exceptions raised inside the block are not swallowed.

    Source:
    https://stackoverflow.com/questions/8391411/suppress-calls-to-print-python
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        # Keep our own handle so __exit__ closes exactly the file opened here
        self._devnull = open(os.devnull, "w")
        sys.stdout = self._devnull
        # Returning self makes `with HiddenPrints() as hp:` usable
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore first, then close our handle: code inside the block may have
        # replaced sys.stdout, and that replacement must not be closed here
        sys.stdout = self._original_stdout
        self._devnull.close()

# Pre-processing functions

In [8]:
def bins(row):
    """Map a continuous weight fraction onto an integer bin

    Reads `row["maximum_weight_fraction"]` and returns 0 (low) for values
    up to and including 0.01, 2 (high) for values above 0.10, and
    1 (medium) for everything else.

    Class ranges are different from those used by Isaacs et al. 2016.
    """

    wf = row["maximum_weight_fraction"]
    if wf <= 0.01:
        return 0  # low
    if wf > 0.10:
        return 2  # high
    return 1  # medium

In [9]:
# Turn the continuous weight-fraction labels into integer bin labels by
# applying `bins` to every row; the results are kept as NumPy arrays
bin_enm = np.asarray(y_enm.apply(bins, axis=1))
bin_source = np.asarray(y_source.apply(bins, axis=1))

In [13]:
def pca_precheck(X, n_components, thres_btm=0.75, thres_top=0.90):
    """List PCA components for hyperparameterization

    Find a reasonable range of n_components to try during hyperparameterization.
    The data are min-max scaled, a PCA is fitted, and the cumulative explained
    variance ratio is printed and plotted together with both thresholds.

    Arguments
    ---------
    X : DataFrame
        Feature data for dimension reduction
    n_components : int
        A generous number of PCA components to test
    thres_btm : float (default=0.75)
        The minimum cumulative explained variance threshold to aim for; a fraction
    thres_top : float (default=0.90)
        The maximum cumulative explained variance threshold to aim for; a fraction

    Returns
    -------
    list of int
        Component counts whose cumulative explained variance lies strictly
        between `thres_btm` and `thres_top`. If there are none, a one-item
        list with the first count that exceeds `thres_top`; if no count
        exceeds it either, a one-item list with the number of fitted components.
    """

    from sklearn.preprocessing import MinMaxScaler
    from sklearn import decomposition

    # Scale the data first (e.g., chemical properties) from 0 to 1
    X_scaled = MinMaxScaler().fit_transform(X)

    # Apply PCA
    pca = decomposition.PCA(n_components=n_components)
    pca.fit(X_scaled)
    cum_evr = np.cumsum(pca.explained_variance_ratio_)
    print(cum_evr)
    n_fitted = len(cum_evr)

    # Figure out number of components to achieve desired cumulative explained variance
    in_window = (cum_evr > thres_btm) & (cum_evr < thres_top)
    component_list = (np.where(in_window)[0] + 1).tolist()

    if not component_list:
        above_top = np.where(cum_evr > thres_top)[0]
        if above_top.size:
            # The curve jumps over the window: take the first count past the top
            component_list = [int(above_top[0]) + 1]
        else:
            # Even all fitted components stay below the window; fall back to
            # the largest count instead of failing with StopIteration
            component_list = [n_fitted]

    # Plot just to double check / visualize
    plt.figure()
    xi = np.arange(1, n_fitted + 1, step=1)
    plt.plot(xi, cum_evr, ".-", label="pca")
    plt.plot([0, n_fitted], [thres_btm, thres_btm], "k", label=thres_btm)
    plt.plot([0, n_fitted], [thres_top, thres_top], "r", label=thres_top)
    plt.xlabel("Coefficient Number")
    plt.ylabel("Cumulative Explained Variance Ratio")
    plt.grid()
    plt.legend()
    plt.show()

    print(component_list)
    return component_list

# Plot functions

In [10]:
def bar_graph_bins(label_data, data_composition):
    """Bar graph of weight fraction bins

    Create a bar graph of weight fraction bins, save it as a PDF via
    `savepdf`, and print the count and frequency for each bin.

    Arguments
    ---------
    label_data : int array of shape [n,]
        Binned wf data
    data_composition : string
        Describes the chemical composition of label_data; e.g., `ENM`,
        `Organics`. Used (lower-cased, spaces removed) in the file name.
    """

    import matplotlib.pyplot as plt

    # Find the count, frequency of WF bins
    unique, counts = np.unique(label_data, return_counts=True)
    total = np.sum(counts)
    freq = [(count / total).round(2) for count in counts]

    bin_names = ["low", "medium", "high"]
    if len(unique) == 4:
        bin_names = ["xlow"] + bin_names

    def name_of(bin_value):
        """Name a bin by its integer value; fall back to the raw value."""
        try:
            pos = int(bin_value)
        except (TypeError, ValueError):
            return str(bin_value)
        if pos == bin_value and 0 <= pos < len(bin_names):
            return bin_names[pos]
        return str(bin_value)

    # Label bars by bin value, not by position, so that a bin without any
    # observations cannot shift the names or break the tick/label count
    tick_labels = [name_of(value) for value in unique]

    # Plot
    fig, ax = plt.subplots()
    positions = range(len(unique))
    ax.bar(positions, counts, align="center")
    ax.set_xticks(positions)
    ax.set_xticklabels(tick_labels)
    ax.set_xlabel("Weight fraction bin")
    ax.set_ylabel("Frequency of observations")
    savepdf(fig, "histogram_bins_%s" % data_composition.lower().replace(" ", ""))
    plt.show()

    print("Label bin: ", unique)
    print("Count    : ", counts)
    print("Frequency: ", freq)

In [16]:
def plot_piechart(
    data, feat_subset_prefix, save_fig_name, figsize=[3, 2.5], labels=None
):
    """Draw and save a pie chart for the columns matching a name fragment

    Columns of `data` whose name contains `feat_subset_prefix` are selected.
    With exactly one match the chart shows non-zero vs. remaining rows
    (`labels` may name the two wedges); with any other number of matches it
    shows the column sums, labelled by the second "_"-separated token of
    each column name. The figure is saved through `savepdf`.
    """

    # Aesthetic settings
    plt.rcParams["font.family"] = "sans-serif"
    plt.rcParams["font.sans-serif"] = "Helvetica"
    text_style = dict(fontsize=7, fontname="Helvetica")
    palette = [
        "tab:blue",  # sky blue
        "#E59400",  # orange
        "tab:purple",  # purple-blue
        "#C0504D",  # red
        "tab:olive",
        "#323299",  # dark blue
        "#7a307a",  # violet
        "seagreen",
        "sienna",
        "gold",
    ]

    # Wedge sizes and names
    matching = [col for col in data.columns if feat_subset_prefix in col]
    if len(matching) == 1:
        n_positive = np.count_nonzero(data[matching])
        values = [n_positive, len(data[matching]) - n_positive]
        if labels is None:
            labels = [matching[0].split("_")[1], "other"]
    else:
        values = data[matching].sum(axis=0)
        labels = [col.split("_")[1] for col in matching]

    # Plot
    fig, ax = plt.subplots(figsize=figsize)
    pie_options = dict(
        x=values,
        autopct="%1.1f%%",
        colors=palette,
        labels=labels,
        pctdistance=0.9,
        labeldistance=1.05,
        startangle=90,
        counterclock=False,
        textprops=dict(text_style),
    )
    ax.pie(**pie_options)
    ax.axis("equal")

    file_stub = save_fig_name.lower().replace(" ", "_").replace(")", "")
    savepdf(fig, "pie_%s" % file_stub)
    plt.show()

In [3]:
def correlation_matrix(df, figsize=(5, 5), save_fig_name=None):
    """Heatmap showing feature correlation (nonparametric)

    Plots the pairwise Spearman correlation of the columns of `df` on a
    fixed colour scale from -1 to 1.

    Arguments
    ---------
    df : DataFrame
        Feature data; tick labels use the second "_"-separated token of
        each column name (the full name when there is no "_")
    figsize : tuple (default=(5, 5))
        Figure size passed to matplotlib
    save_fig_name : string (default=None)
        Suffix for the exported file `feature_correlation_<save_fig_name>.pdf`;
        `None` indicates that the figure should not be saved
    """
    fig, ax = plt.subplots(figsize=figsize)
    # pyplot.get_cmap replaces matplotlib.cm.get_cmap, which was deprecated in
    # Matplotlib 3.7 and removed in 3.9
    cmap = plt.get_cmap("coolwarm", 24)
    cax = ax.imshow(df.corr("spearman"), cmap=cmap, vmin=-1, vmax=1)
    ax.set_title("Feature Correlation")

    labels = []
    for col in df.columns.tolist():
        parts = str(col).split("_")
        labels.append(parts[1] if len(parts) > 1 else parts[0])

    ax.set_xticks(range(len(df.columns)))
    ax.set_yticks(range(len(df.columns)))
    ax.set_xticklabels(labels, fontsize=8, rotation=90)
    ax.set_yticklabels(labels, fontsize=8)
    fig.colorbar(cax)

    if save_fig_name is not None:
        savepdf(fig, "feature_correlation_%s" % save_fig_name)

    plt.show()

# Modeling classes, functions

In [18]:
from sklearn import model_selection
from sklearn.model_selection import GridSearchCV
from sklearn.preprocessing import MinMaxScaler
import multiprocessing


class EstimatorSelectionHelper:
    """
    Set up grid search across multiple estimators, pipelines; automatically
    performs stratified CV if labels are multiclass.

    `models` and `params` are dictionaries that share their keys: one
    estimator and one parameter grid per key. `fit` runs one `GridSearchCV`
    per key and `score_summary` collects the per-split test scores of all
    searches in a single table.

    By David Batista:
    http://www.davidsbatista.net/blog/2018/02/23/model_optimization/
    """

    def __init__(self, models, params):
        """Store the estimators and grids; raise ValueError if a grid is missing."""
        missing_params = sorted(set(models.keys()) - set(params.keys()), key=str)
        if missing_params:
            raise ValueError(
                "Some estimators are missing parameters: %s" % missing_params
            )
        self.models = models
        self.params = params
        self.keys = models.keys()
        self.grid_searches = {}

    def fit(self, X, y, cv=5, n_jobs=1, verbose=1, scoring=None, refit=False):
        """Run a grid search for every estimator and keep the fitted searches.

        All keyword arguments are forwarded to `GridSearchCV`; the fitted
        objects are stored in `self.grid_searches` under the estimator key.
        """
        for key in self.keys:
            print("Running GridSearchCV for %s." % key)
            gs = GridSearchCV(
                self.models[key],
                self.params[key],
                cv=cv,
                n_jobs=n_jobs,
                # String form is evaluated against the effective number of
                # jobs, so it also works for n_jobs=None and n_jobs=-1
                pre_dispatch="2*n_jobs",
                verbose=verbose,
                scoring=scoring,
                refit=refit,
                return_train_score=True,
            )
            gs.fit(X, y)
            self.grid_searches[key] = gs

    def score_summary(self, sort_by="mean_score"):
        """Tabulate min/mean/max/std of the test scores per parameter set.

        Arguments
        ---------
        sort_by : string (default="mean_score")
            Score column used for sorting (descending); ties are broken by
            "max_score"

        Returns a DataFrame with one row per estimator and parameter
        combination; the score columns come first, then the parameters.
        """

        def row(key, scores, params):
            d = {
                "estimator": key,
                "min_score": min(scores),
                "max_score": max(scores),
                "mean_score": np.mean(scores),
                "std_score": np.std(scores),
            }
            return pd.Series({**params, **d})

        rows = []
        for k, gs in self.grid_searches.items():
            print(k)
            params = gs.cv_results_["params"]
            # n_splits_ is the real number of folds even when `cv` was given
            # as a splitter object or left as None
            n_splits = getattr(gs, "n_splits_", gs.cv)
            # One column per split, one row per parameter combination
            all_scores = np.column_stack(
                [
                    gs.cv_results_["split{}_test_score".format(i)]
                    for i in range(n_splits)
                ]
            )
            rows.extend(row(k, s, p) for p, s in zip(params, all_scores))

        sort_keys = [sort_by]
        if sort_by != "max_score":
            sort_keys.append("max_score")
        df = pd.concat(rows, axis=1).T.sort_values(sort_keys, ascending=False)

        columns = ["estimator", "min_score", "mean_score", "max_score", "std_score"]
        columns = columns + [c for c in df.columns if c not in columns]

        return df[columns]

In [9]:
def plot_conf_matrix(
    cm,
    classes,
    normalize=True,
    showxlabel=False,
    showylabel=False,
    cmap=matplotlib.cm.Blues,
):
    """Plot a confusion matrix on the current matplotlib figure

    Normalization can be applied by setting `normalize=True`; each row is
    then divided by its sum (rows without observations are shown as 0).

    Arguments
    ---------
    cm : ndarray of shape [n_classes, n_classes]
        Confusion matrix; rows are true classes, columns are predictions
    classes : list
        Tick labels for both axes
    normalize : bool (default=True)
        Plot row-wise proportions instead of the raw entries
    showxlabel, showylabel : bool (default=False)
        Add the "Predicted" / "True weight fraction" axis label
    cmap : matplotlib colormap (default=matplotlib.cm.Blues)

    Adapted from:
    http://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html
    """

    import matplotlib.pyplot as plt
    import itertools

    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Skip empty rows so they are drawn as 0 instead of NaN
        cm = np.divide(
            cm.astype("float"),
            row_totals,
            out=np.zeros(cm.shape, dtype=float),
            where=row_totals != 0,
        )
        color_max = 1
    else:
        # Raw counts need a colour scale that reaches the largest count;
        # a fixed upper limit of 1 would paint every non-empty cell alike
        color_max = max(cm.max(), 1)

    np.set_printoptions(precision=2)
    plt.rc("font", size=12)  # controls default text size
    plt.imshow(cm, interpolation="nearest", cmap=cmap, vmin=0, vmax=color_max)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    # White text on dark cells, black text on light cells
    fmt = ".2f" if normalize else "d"
    thresh = cm.max() / 2.0
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(
            j,
            i,
            format(cm[i, j], fmt),
            horizontalalignment="center",
            color="white" if cm[i, j] > thresh else "black",
        )

    plt.gcf().subplots_adjust(bottom=0.2)
    if showylabel:
        plt.ylabel("True weight fraction")
    if showxlabel:
        plt.xlabel("Predicted weight fraction")

References for implementing the augmentation functions: 
* https://stackoverflow.com/questions/34226400/find-the-index-of-the-k-smallest-values-of-a-numpy-array
* https://stackoverflow.com/questions/22117834/how-do-i-return-a-list-of-the-3-lowest-values-in-another-list
* http://dataaspirant.com/2015/04/11/five-most-popular-similarity-measures-implementation-in-python/
* https://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.cosine_distances.html

In [21]:
import random as pyrandom
from numpy import random
from sklearn.metrics.pairwise import cosine_distances
from sklearn.preprocessing import MinMaxScaler

# Define feature mask for data augmentation
feat_names = X_enm.columns
# One boolean per feature: False for columns whose name contains "cprp",
# True otherwise. The matching-based augmentation functions multiply the
# feature tables by this mask before computing cosine distances.
col_mask = ["cprp" not in name for name in feat_names]


# Functions for different data augmentation methods


def random_augment(k, X_source, y_source, random_state, X, y):
    """Pad the target data with randomly drawn source samples.

    Seeds Python's and NumPy's global random generators with
    `random_state`, draws `k * len(X)` row positions from the source data
    with `np.random.choice`, and appends those rows/labels to `X` and `y`.
    With `k == 0` the inputs are returned untouched.
    """

    if k == 0:
        return X, y

    # Seed both global generators before drawing
    for seed_fn in (pyrandom.seed, np.random.seed):
        seed_fn(random_state)

    # Row positions of the sampled source data
    picked = np.random.choice(len(X_source), k * len(X))

    # Target data first, sampled source data after it
    X_aug = np.concatenate((X, X_source.iloc[picked, :]))
    y_aug = np.concatenate((y, y_source[picked]))
    assert (
        X_aug.shape[0] == y_aug.shape[0]
    ), f"X_aug.shape={X_aug.shape}, y_aug.shape={y_aug.shape}"

    return X_aug, y_aug


def unsupervised_augment(k, X_source, y_source, random_state, X, y):
    """
    Unsupervised data augmentation

    Match "k" most similar source data samples to each target data sample
    based on the smallest cosine distance between target and source data
    samples (i.e., in an unsupervised fashion; labels are not consulted).
    Distances are computed after multiplying both feature tables by the
    module-level `col_mask`.

    Returns `(X_aug, y_aug)`: the target data followed by the matched
    source rows. With `k == 0` the inputs are returned untouched. If `k`
    is at least the number of source samples, every source sample is
    matched to every target sample.
    """

    if k == 0:
        return X, y

    pyrandom.seed(random_state)
    np.random.seed(random_state)

    # Cosine distance matrix using feature mask
    # (rows: source samples, columns: target samples)
    cosdist_samples = cosine_distances(X_source * col_mask, X * col_mask)
    n_source = cosdist_samples.shape[0]
    # Loop over distance matrix in search of k-smallest distances
    idx_match = []
    for col in cosdist_samples.T:
        if k >= n_source:
            # argpartition needs k < n_source; here all source rows qualify
            matches = np.arange(n_source)
        else:
            # Find organics data indices of k-smallest distances
            matches = np.argpartition(col, k)[:k]
        idx_match.extend(matches)
    # Select matching rows from source data
    X_match = X_source.iloc[idx_match, :]
    y_match = y_source[idx_match]
    # Append sampled source data to target data
    X_aug = np.concatenate((X, X_match))
    y_aug = np.concatenate((y, y_match))

    return X_aug, y_aug


def supervised_augment(k, X_source, y_source, random_state, X, y):
    """
    Supervised data augmentation

    Match "k" most similar source data samples to each target data sample
    based on the smallest weighted average of cosine distance between
    samples and distance between WF labels (i.e., in a supervised fashion).
    Feature distance is weighted 0.95 and label distance 0.05.

    Returns `(X_aug, y_aug)`: the target data followed by the matched
    source rows. With `k == 0` the inputs are returned untouched. If `k`
    is at least the number of source samples, every source sample is
    matched to every target sample.
    """

    if k == 0:
        return X, y

    pyrandom.seed(random_state)
    np.random.seed(random_state)

    # Relative weights of feature distance and label distance
    weight_features = 0.95
    weight_labels = 0.05

    # Cosine distance matrix using feature mask
    # (rows: source samples, columns: target samples)
    cosdist_samples = cosine_distances(X_source * col_mask, X * col_mask)
    # For supervised matching augmentation, also consider WF labels
    # Turn 1D label arrays into 2D arrays shaped like the distance matrix
    y_2d = np.tile(y, (len(y_source), 1))
    y_source_2d = np.tile(y_source, (len(y), 1)).transpose()
    # Get normalized distance between ENM and organics labels
    # (MinMaxScaler rescales each column, i.e. each target sample, to 0-1)
    scaler = MinMaxScaler()
    dist_y = scaler.fit_transform(np.abs(y_2d - y_source_2d).astype(float))
    # Weighted average distances of features and labels
    dist_matrix = (weight_features * cosdist_samples) + (weight_labels * dist_y)
    n_source = dist_matrix.shape[0]
    # Loop over distance matrix in search of k-smallest distances
    idx_match = []
    for col in dist_matrix.T:
        if k >= n_source:
            # argpartition needs k < n_source; here all source rows qualify
            matches = np.arange(n_source)
        else:
            # Find organics data indices of k-smallest distances
            matches = np.argpartition(col, k)[:k]
        idx_match.extend(matches)
    # Select matching rows from source data
    X_match = X_source.iloc[idx_match, :]
    y_match = y_source[idx_match]
    # Append sampled source data to target data
    X_aug = np.concatenate((X, X_match))
    y_aug = np.concatenate((y, y_match))

    return X_aug, y_aug

In [22]:
from sklearn.pipeline import Pipeline
import sys


class AugmentingPipeline(Pipeline):
    """
    Pipeline that augments the training data before fitting.

    `fit` and `fit_transform` first pass `(X, y)` through the configured
    augmentation function and then defer to `Pipeline`; `transform` never
    augments, so data passed to it is left as-is.

    Parameters
    ----------
    steps : list
        Forwarded to `Pipeline`.
    augmentation_type : callable or None
        Called as `augmentation_type(k, X_source, y_source, random_state, X, y)`
        and expected to return `(X_aug, y_aug)`. `None` disables augmentation.
    augmentation_k, augmentation_X_source, augmentation_y_source,
    augmentation_random_state
        Passed to `augmentation_type` in the order shown above.
    **kwargs
        Any further keyword arguments are forwarded to `Pipeline`.
    """

    # Names of the parameters added on top of those of Pipeline
    _augmentation_params = (
        "augmentation_type",
        "augmentation_k",
        "augmentation_X_source",
        "augmentation_y_source",
        "augmentation_random_state",
    )

    def __init__(
        self,
        steps,
        *,
        augmentation_type=None,
        augmentation_k=None,
        augmentation_X_source=None,
        augmentation_y_source=None,
        augmentation_random_state=None,
        **kwargs,
    ):
        self.augmentation_type = augmentation_type
        self.augmentation_k = augmentation_k
        self.augmentation_X_source = augmentation_X_source
        self.augmentation_y_source = augmentation_y_source
        self.augmentation_random_state = augmentation_random_state
        super().__init__(steps, **kwargs)

    def get_params(self, deep=True):
        """Return the Pipeline parameters plus the augmentation settings."""
        params = super().get_params(deep=deep)
        own = {name: getattr(self, name) for name in self._augmentation_params}
        return {**own, **params}

    def set_params(self, **kwargs):
        """Set augmentation settings explicitly, then defer to Pipeline."""
        for name in self._augmentation_params:
            if name in kwargs:
                setattr(self, name, kwargs[name])
        super().set_params(**kwargs)
        return self

    def _augment(self, X, y):
        """Apply specified data augmentation function"""

        # No augmentation function configured: pass the data through
        if self.augmentation_type is None:
            return X, y

        return self.augmentation_type(
            self.augmentation_k,
            self.augmentation_X_source,
            self.augmentation_y_source,
            self.augmentation_random_state,
            X,
            y,
        )

    def fit(self, X, y=None, **fit_params):
        """fit but with data augmentation. Only applies to training data."""
        X, y = self._augment(X, y)  # Apply data augmentation
        return super().fit(X, y, **fit_params)

    def fit_transform(self, X, y=None, **fit_params):
        """fit_transform but with data augmentation. Only applies to training data."""
        print("fit_transform was called.", file=sys.stderr)
        X, y = self._augment(X, y)  # Apply data augmentation
        return super().fit_transform(X, y, **fit_params)

    def transform(self, X, y=None, **fit_params):
        """Transform without augmentation; `y` is accepted but not used."""
        print("transform was called.", file=sys.stderr)
        # Pipeline.transform takes X as its only positional argument, so y
        # must not be forwarded
        return super().transform(X, **fit_params)

In [23]:
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import balanced_accuracy_score


def apply_model_opt(
    classifiers,
    params,
    X_target=X_enm,
    y_target=bin_enm,
    cust_folds=None,
    random_state=None,
    n_jobs=3,
):
    """
    Grid-search classifier parameters and tabulate the scores.

    Runs `EstimatorSelectionHelper` with balanced accuracy as the score and
    returns its summary table, with every column name reduced to the part
    after the last "__". The number of CV folds is the size of the smallest
    class in `y_target` unless a truthy `cust_folds` is given.
    `random_state` is accepted but not used by this function.
    """

    # Default fold count: size of the smallest class; cust_folds overrides it
    smallest_class = min(np.unique(y_target, return_counts=True)[1])
    n_folds = cust_folds if cust_folds else smallest_class

    # Search every estimator/grid pair
    search = EstimatorSelectionHelper(classifiers, params)
    search.fit(
        X_target,
        y_target,
        n_jobs=n_jobs,
        cv=n_folds,
        scoring="balanced_accuracy",
    )

    # Summarize and shorten "step__param" column names to "param"
    summary = search.score_summary(sort_by="mean_score")
    summary.columns = [name.split("__")[-1] for name in summary.columns]

    return summary.infer_objects()

In [24]:
def model_eval(
    classifier,
    featreducer,
    augmentation_type,
    augmentation_k,
    X=X_enm,
    y=bin_enm,
    X_source=X_source,
    y_source=bin_source,
    random_state=np.arange(100),
    cust_folds=None,
    save_fig_name=None,
    show_conf_matrix=True,
):
    """Fit, execute and evaluate a classifier using hyperparameters.

    1) Fit custom model pipeline that performs data augmentation and
    normalization on training data using optimized parameters and
    stratified k-fold cross validation;
    2) Execute optimized model and summarize its accuracy in a confusion
    matrix broken down by WF bins. Formatted confusion matrices are saved as
    .pdf files.

    The whole cross validation is repeated once per entry of `random_state`;
    each entry seeds the fold shuffling, the classifier and the augmentation.

    Arguments
    ---------
    classifier : estimator
        Classifier with a `random_state` parameter; note that this object is
        modified in place (`set_params`) and refitted for every fold
    featreducer : transformer
        Dimension reduction step placed between scaling and classifier
    augmentation_type : function
        Augmentation function handed to `AugmentingPipeline`
    augmentation_k : int
        The number of organics samples to match with each ENM sample.
    X : DataFrame (default=X_enm)
        Target feature data.
    y : ndarray (default=bin_enm)
        Target WF bin data
    X_source : DataFrame (default=X_source)
        Source feature data.
    y_source : ndarray (default=bin_source)
        Source WF bin data.
    random_state : ndarray (default=np.arange(100))
        Seeds; one repetition of the cross validation is run per seed.
    cust_folds : int (default=None)
        Number of CV folds; `None` uses the size of the smallest class in `y`
    save_fig_name : string (default=None)
        A unique string used at the end of the confusion matrix file name
        for exporting the figure as .pdf; `None` indicates that no figure
        should be saved
    show_conf_matrix : bool (default=True)
        `False` closes the confusion matrix figure after it has been
        plotted (and optionally saved)

    Returns
    -------
    arr_norm_state_avg : list of float
        Balanced accuracy averaged over the folds, one entry per seed
    std_err : list of float
        Standard error of the per-fold balanced accuracies, one entry per seed
    """
    from sklearn.model_selection import StratifiedKFold
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.metrics import confusion_matrix
    import matplotlib.pyplot as plt

    def standard_error(bal_accu):
        """Calculate standard error from an array of balanced accuracies."""
        n = bal_accu.size
        samp_var = np.var(bal_accu, ddof=1)
        return np.sqrt(samp_var / n)

    # Positional indexing below needs plain arrays for both X and y
    X = np.array(X)
    y = np.asarray(y)
    class_labels, class_counts = np.unique(y, return_counts=True)
    n_b = len(class_labels)  # Number of bins

    # Set smallest class size as number of CV folds unless overridden
    n_folds = cust_folds if cust_folds else int(min(class_counts))

    # Placeholders for confusion matrix (cm) and per-seed results
    cm_cum_state = np.zeros([n_b, n_b])
    arr_norm_state_avg = []
    std_err = []

    for state in random_state:
        # Plain int for the augmentation functions: they seed Python's
        # `random` module, which does not accept NumPy integer types on
        # recent Python versions
        aug_seed = None if state is None else int(state)
        # CV settings
        skfold = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=state)
        # Placeholders
        arr_cm_norm_fold = np.zeros([n_folds, n_b, n_b])
        arr_norm_fold_avg = []

        # Fit and run pipeline
        for i, (train_index, test_index) in enumerate(skfold.split(X, y)):
            # Split data
            X_train, y_train = X[train_index], y[train_index]
            X_test, y_test = X[test_index], y[test_index]
            # Pipeline with data augmentation (applied to training data only)
            pipe = AugmentingPipeline(
                [
                    ("scaler", MinMaxScaler()),
                    ("dimreducer", featreducer),
                    ("estimator", classifier.set_params(random_state=state)),
                ],
                augmentation_X_source=X_source,
                augmentation_y_source=y_source,
                augmentation_type=augmentation_type,
                augmentation_k=augmentation_k,
                # Forward the seed so that augmentation is reproducible
                augmentation_random_state=aug_seed,
            )
            pipe.fit(X_train, y_train)

            # Write prediction results to confusion matrix; passing the
            # labels keeps the matrix n_b x n_b even if a class is absent
            # from both the test fold and the predictions
            cm_fold = confusion_matrix(
                y_test, pipe.predict(X_test), labels=class_labels
            )
            # Normalize each row (true class) to proportions
            arr_cm_norm_fold[i, :, :] = (
                cm_fold.astype("float") / cm_fold.sum(axis=1)[:, np.newaxis]
            )
            # Balanced accuracy: mean proportion of correct classifications
            arr_norm_fold_avg.append(arr_cm_norm_fold[i, :, :].diagonal().mean())

        # Accumulate fold averages over multiple trials
        cm_cum_state += np.average(arr_cm_norm_fold, axis=0)
        arr_norm_state_avg.append(np.average(arr_norm_fold_avg))
        std_err.append(standard_error(bal_accu=np.array(arr_norm_fold_avg)))

    # Average over multiple trials
    cm_avg_all = cm_cum_state / len(random_state)

    # Plot and save normalized confusion matrix
    # NOTE: the tick labels assume the three-bin (low/mid/high) scheme
    fig = plt.figure()
    plot_conf_matrix(cm_avg_all, classes=["low", "mid", "high"])
    if save_fig_name is not None:
        savepdf(fig, "confusion_norm_%s" % save_fig_name)
    if not show_conf_matrix:
        plt.close(fig)

    return arr_norm_state_avg, std_err

In [25]:
def run_hyperparams(
    df_params,
    random_state=np.arange(100),
    X=X_enm,
    y=bin_enm,
    cust_folds=None,
    fig_name_prefix=None,
):
    """
    Apply hyperparameters for model evaluation.

    Relies on `model_eval' function. For every unique combination of
    estimator, augmentation_k and augmentation_type the first row of
    `df_params` is kept (i.e., the best one if the table is sorted by
    score) and evaluated. The input table itself is not modified.

    Arguments
    ---------
    df_params : DataFrame
        Table of parameter sets with the columns "estimator",
        "augmentation_k", "augmentation_type" and "n_components"; the last
        three columns are taken to be the classifier parameters
    random_state : ndarray (default=np.arange(100))
        Seeds passed on to `model_eval`
    X : DataFrame (default=X_enm)
        Target feature data
    y : ndarray (default=bin_enm)
        Target WF bin data
    cust_folds : int (default=None)
        Number of CV folds passed on to `model_eval`
    fig_name_prefix : string (default=None)
        Optional prefix for the names of the saved figures

    Returns
    -------
    DataFrame with the columns "estimator", "augmentation_type",
    "augmentation_k", "score_avg", "score_list" and "std_err".
    """

    from sklearn.decomposition import PCA

    # Augmentation functions by name
    dispatcher = {
        "random_augment": random_augment,
        "unsupervised_augment": unsupervised_augment,
        "supervised_augment": supervised_augment,
    }

    # Work on a copy so that the caller's table stays untouched
    df_params = df_params.copy()

    # Store augmentation functions by name; entries that already are names
    # are kept as they are
    df_params["augmentation_type"] = [
        getattr(aug, "__name__", str(aug))
        for aug in df_params.loc[:, "augmentation_type"]
    ]

    # Select for highest performing parameters;
    # note that these are sorted by best mean score
    feat_subset = ["estimator", "augmentation_k", "augmentation_type"]
    df_params = df_params.drop_duplicates(subset=feat_subset, keep="first")
    df_params = df_params.sort_values(by=feat_subset, ascending=[False, True, True])
    df_params.columns = [col.split("__")[-1] for col in df_params.columns]
    df_params = df_params.reset_index(drop=True)

    # Note: Adjust if grid has more/less than 3 classifier parameters
    n_cls_params = 3

    # Results are collected in lists and added as columns after the loop
    score_avg, score_list, std_err = [], [], []

    for row in df_params.index:
        # Dictionary of classifier parameters, dropping N/A parameter(s);
        # the index was reset above, so labels equal positions
        cls_kwargs = df_params.iloc[row, -n_cls_params:].dropna().to_dict()
        # Define classifier using parameters dictionary
        if df_params.loc[row, "estimator"] == "SVC":
            classifier = SVC(kernel="rbf", class_weight="balanced", **cls_kwargs)
        else:
            classifier = RandomForestClassifier(class_weight="balanced", **cls_kwargs)
        # Define feature reducer
        pca = PCA(n_components=df_params.loc[row, "n_components"], copy=True)
        # Look up the augmentation function by its name
        augmentation_type = dispatcher[df_params.loc[row, "augmentation_type"]]
        # Name figures using concatenated strings
        save_fig_name = "_".join(map(str, list(df_params.loc[row, feat_subset])))
        if fig_name_prefix:
            save_fig_name = fig_name_prefix + "_" + save_fig_name

        score, err = model_eval(
            classifier=classifier,
            featreducer=pca,
            augmentation_type=augmentation_type,
            augmentation_k=df_params.loc[row, "augmentation_k"],
            X=X,
            y=y,
            random_state=random_state,
            cust_folds=cust_folds,
            save_fig_name=save_fig_name,
        )
        score_list.append(score)
        std_err.append(err)
        score_avg.append(np.average(score))

    # Object columns so that a cell can hold a whole list
    df_params["score_avg"] = pd.Series(score_avg, index=df_params.index, dtype=object)
    df_params["score_list"] = pd.Series(
        score_list, index=df_params.index, dtype=object
    )
    df_params["std_err"] = pd.Series(std_err, index=df_params.index, dtype=object)

    cols_report = [
        "estimator",
        "augmentation_type",
        "augmentation_k",
        "score_avg",
        "score_list",
        "std_err",
    ]

    return df_params[cols_report]

# Export

In [10]:
# Convert this notebook (functions.ipynb) into a Python script with nbconvert;
# presumably so that the definitions above can be imported elsewhere.
# The guard restricts the conversion to runs where this code is the main module.
if __name__ == "__main__":
    !jupyter nbconvert --to script functions.ipynb

[NbConvertApp] Converting notebook functions.ipynb to script
[NbConvertApp] Writing 31716 bytes to functions.py
