diff --git a/.github/workflows/development_CI.yaml b/.github/workflows/development_CI.yaml index e0f18f8..100c71e 100644 --- a/.github/workflows/development_CI.yaml +++ b/.github/workflows/development_CI.yaml @@ -16,10 +16,10 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up Python 3.8 + - name: Set up Python 3.9 uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: 3.9 - name: Install dependencies run: | diff --git a/Makefile b/Makefile index b31a1db..3b20397 100644 --- a/Makefile +++ b/Makefile @@ -18,16 +18,14 @@ test-unit: pytest tests @echo 'unit tests OK' -lint: - pylint cobra - @echo 'lint OK' +black-check: + black --diff cobra/ -lint-minimal: - pylint E cobra - @echo 'lint minimal OK' +black: + black cobra/ typecheck: - mypy cobra + mypy cobra --allow-redefinition --allow-untyped-globals --ignore-missing-imports @echo 'typecheck OK' codestyle: @@ -38,4 +36,4 @@ docstyle: pydocstyle cobra @echo 'docstyle OK' -code-qa: typecheck codestyle docstyle lint-minimal +code-qa: typecheck codestyle docstyle diff --git a/cobra/__init__.py b/cobra/__init__.py index 7152555..451287b 100644 --- a/cobra/__init__.py +++ b/cobra/__init__.py @@ -1 +1,3 @@ -from .version import __version__ \ No newline at end of file +"""Cobra module.""" + +from .version import __version__ diff --git a/cobra/evaluation/__init__.py b/cobra/evaluation/__init__.py index 1f8f487..8302ea9 100644 --- a/cobra/evaluation/__init__.py +++ b/cobra/evaluation/__init__.py @@ -1,3 +1,5 @@ +"""The evaluation module includes utils and plots to evaluate a created model.""" + from .pigs_tables import generate_pig_tables from .pigs_tables import compute_pig_table from .pigs_tables import plot_incidence @@ -11,12 +13,14 @@ # from .evaluator import Evaluator from .evaluator import ClassificationEvaluator, RegressionEvaluator -__all__ = ["generate_pig_tables", - "compute_pig_table", - "plot_incidence", - "plot_performance_curves", - "plot_variable_importance", - "plot_univariate_predictor_quality", - "plot_correlation_matrix", - "ClassificationEvaluator", - "RegressionEvaluator"] +__all__ = [ + "generate_pig_tables", + "compute_pig_table", + "plot_incidence", + "plot_performance_curves", + "plot_variable_importance", + "plot_univariate_predictor_quality", + "plot_correlation_matrix", + "ClassificationEvaluator", + "RegressionEvaluator", +] diff --git a/cobra/evaluation/evaluator.py b/cobra/evaluation/evaluator.py index b694a33..5e697e4 100644 --- a/cobra/evaluation/evaluator.py +++ b/cobra/evaluation/evaluator.py @@ -1,4 +1,6 @@ +"""Evaluate the created model.""" +from typing import Any, Union, cast import numpy as np import pandas as pd @@ -25,9 +27,12 @@ from sklearn.metrics import mean_squared_error from sklearn.metrics import r2_score -class ClassificationEvaluator(): - """Evaluator class encapsulating classification model metrics - and plotting functionality. + +DEFAULT_LABELS = ["0", "1"] + + +class ClassificationEvaluator: + """Evaluator class encapsulating classification model metrics and plotting functionality. Attributes ---------- @@ -56,28 +61,26 @@ class ClassificationEvaluator(): (by default 10, so deciles). 
""" - def __init__(self, - probability_cutoff: float=None, - lift_at: float=0.05, - n_bins: int = 10): - - self.y_true = None - self.y_pred = None + def __init__( + self, probability_cutoff: float = None, lift_at: float = 0.05, n_bins: int = 10 + ): + """Initialize the ClassificationEvaluator.""" + self.y_true: np.ndarray + self.y_pred: np.ndarray self.lift_at = lift_at self.probability_cutoff = probability_cutoff self.n_bins = n_bins # Placeholder to store fitted output - self.scalar_metrics = None - self.roc_curve = None - self.confusion_matrix = None - self.lift_curve = None - self.cumulative_gains = None + self.scalar_metrics: pd.Series + self.roc_curve: dict[str, Any] + self.confusion_matrix: np.ndarray + self.lift_curve: tuple[list[float], list[float], float] + self.cumulative_gains: tuple[np.ndarray, np.ndarray] def fit(self, y_true: np.ndarray, y_pred: np.ndarray): - """Fit the evaluator by computing the relevant evaluation metrics on - the inputs. + """Fit the evaluator by computing the relevant evaluation metrics on the inputs. Parameters ---------- @@ -90,20 +93,21 @@ def fit(self, y_true: np.ndarray, y_pred: np.ndarray): # if probability_cutoff is not set, take the optimal cut-off if not self.probability_cutoff: - self.probability_cutoff = (ClassificationEvaluator. - _compute_optimal_cutoff(fpr, tpr, - thresholds)) + self.probability_cutoff = ClassificationEvaluator._compute_optimal_cutoff( + fpr, tpr, thresholds + ) # Transform probabilities to binary array using cut-off - y_pred_b = np.array([0 if pred <= self.probability_cutoff else 1 - for pred in y_pred]) + y_pred_b = np.array( + [0 if pred <= self.probability_cutoff else 1 for pred in y_pred] + ) # Compute the various evaluation metrics - self.scalar_metrics = ClassificationEvaluator._compute_scalar_metrics( - y_true, - y_pred, - y_pred_b, - self.lift_at + self.scalar_metrics = cast( + pd.Series, + ClassificationEvaluator._compute_scalar_metrics( + y_true, y_pred, y_pred_b, self.lift_at + ), ) self.y_true = y_true @@ -111,16 +115,18 @@ def fit(self, y_true: np.ndarray, y_pred: np.ndarray): self.roc_curve = {"fpr": fpr, "tpr": tpr, "thresholds": thresholds} self.confusion_matrix = confusion_matrix(y_true, y_pred_b) - self.lift_curve = ClassificationEvaluator._compute_lift_per_bin(y_true, y_pred, self.n_bins) - self.cumulative_gains = ClassificationEvaluator._compute_cumulative_gains(y_true, y_pred) + self.lift_curve = ClassificationEvaluator._compute_lift_per_bin( + y_true, y_pred, self.n_bins + ) + self.cumulative_gains = ClassificationEvaluator._compute_cumulative_gains( + y_true, y_pred + ) @staticmethod - def _compute_scalar_metrics(y_true: np.ndarray, - y_pred: np.ndarray, - y_pred_b: np.ndarray, - lift_at: float) -> pd.Series: - """Convenient function to compute various scalar performance measures - and return them in a pd.Series. + def _compute_scalar_metrics( + y_true: np.ndarray, y_pred: np.ndarray, y_pred_b: np.ndarray, lift_at: float + ) -> pd.Series: + """Compute various scalar performance measures. Parameters ---------- @@ -144,21 +150,31 @@ def _compute_scalar_metrics(y_true: np.ndarray, F1 Matthews correlation coefficient Lift at given percentage + + Raises + ---------- + ValueError + The `column_order` and `pig_tables` parameters do not contain + the same set of variables. 
""" - return pd.Series({ - "accuracy": accuracy_score(y_true, y_pred_b), - "AUC": roc_auc_score(y_true, y_pred), - "precision": precision_score(y_true, y_pred_b), - "recall": recall_score(y_true, y_pred_b), - "F1": f1_score(y_true, y_pred_b, average=None)[1], - "matthews_corrcoef": matthews_corrcoef(y_true, y_pred_b), - "lift at {}".format(lift_at): np.round(ClassificationEvaluator - ._compute_lift(y_true=y_true, - y_pred=y_pred, - lift_at=lift_at), 2) - }) - - def plot_roc_curve(self, path: str=None, dim: tuple=(12, 8)): + return pd.Series( + { + "accuracy": accuracy_score(y_true, y_pred_b), + "AUC": roc_auc_score(y_true, y_pred), + "precision": precision_score(y_true, y_pred_b), + "recall": recall_score(y_true, y_pred_b), + "F1": f1_score(y_true, y_pred_b, average=None)[1], + "matthews_corrcoef": matthews_corrcoef(y_true, y_pred_b), + f"lift at {lift_at}": np.round( + ClassificationEvaluator._compute_lift( + y_true=y_true, y_pred=y_pred, lift_at=lift_at + ), + 2, + ), + } + ) + + def plot_roc_curve(self, path: str = None, dim: tuple = (12, 8)): """Plot ROC curve of the model. Parameters @@ -167,27 +183,40 @@ def plot_roc_curve(self, path: str=None, dim: tuple=(12, 8)): Path to store the figure. dim : tuple, optional Tuple with width and length of the plot. - """ + Raises + ---------- + NotFittedError + The instance is not fitted yet. + """ if self.roc_curve is None: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." + ) raise NotFittedError(msg.format(self.__class__.__name__)) auc = float(self.scalar_metrics.loc["AUC"]) with plt.style.context("seaborn-whitegrid"): - fig, ax = plt.subplots(figsize=dim) - ax.plot(self.roc_curve["fpr"], - self.roc_curve["tpr"], - color="cornflowerblue", linewidth=3, - label="ROC curve (area = {s:.3})".format(s=auc)) - - ax.plot([0, 1], [0, 1], color="darkorange", linewidth=3, - linestyle="--", label="random selection") + ax.plot( + self.roc_curve["fpr"], + self.roc_curve["tpr"], + color="cornflowerblue", + linewidth=3, + label="ROC curve (area = {s:.3})".format(s=auc), + ) + + ax.plot( + [0, 1], + [0, 1], + color="darkorange", + linewidth=3, + linestyle="--", + label="random selection", + ) ax.set_xlabel("False positive rate", fontsize=15) ax.set_ylabel("True positive rate", fontsize=15) ax.legend(loc="lower right") @@ -200,8 +229,9 @@ def plot_roc_curve(self, path: str=None, dim: tuple=(12, 8)): plt.show() - def plot_confusion_matrix(self, path: str=None, dim: tuple=(12, 8), - labels: list=["0", "1"]): + def plot_confusion_matrix( + self, path: str = None, dim: tuple = (12, 8), labels: list = None + ): """Plot the confusion matrix. Parameters @@ -212,29 +242,39 @@ def plot_confusion_matrix(self, path: str=None, dim: tuple=(12, 8), Tuple with width and length of the plot. labels : list, optional Optional list of labels, default "0" and "1". - """ + Raises + ---------- + NotFittedError + The instance is not fitted yet. + """ + labels = labels or DEFAULT_LABELS if self.confusion_matrix is None: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." 
+ ) raise NotFittedError(msg.format(self.__class__.__name__)) - fig, ax = plt.subplots(figsize=dim) - ax = sns.heatmap(self.confusion_matrix, - annot=self.confusion_matrix.astype(str), - fmt="s", cmap="Blues", - xticklabels=labels, yticklabels=labels) + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable + ax = sns.heatmap( + self.confusion_matrix, + annot=self.confusion_matrix.astype(str), + fmt="s", + cmap="Blues", + xticklabels=labels, + yticklabels=labels, + ) ax.set_title("Confusion matrix", fontsize=20) - plt.ylabel('True labels', fontsize=15) - plt.xlabel('Predicted labels', fontsize=15) + plt.ylabel("True labels", fontsize=15) + plt.xlabel("Predicted labels", fontsize=15) if path: plt.savefig(path, format="png", dpi=300, bbox_inches="tight") plt.show() - def plot_cumulative_response_curve(self, path: str=None, dim: tuple=(12, 8)): + def plot_cumulative_response_curve(self, path: str = None, dim: tuple = (12, 8)): """Plot cumulative response curve. Parameters @@ -243,30 +283,40 @@ def plot_cumulative_response_curve(self, path: str=None, dim: tuple=(12, 8)): Path to store the figure. dim : tuple, optional Tuple with width and length of the plot. - """ + Raises + ---------- + NotFittedError + The instance is not fitted yet. + """ if self.lift_curve is None: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." + ) raise NotFittedError(msg.format(self.__class__.__name__)) x_labels, lifts, inc_rate = self.lift_curve - - lifts = np.array(lifts)*inc_rate*100 + lifts = np.array(lifts) * inc_rate * 100 with plt.style.context("seaborn-ticks"): - fig, ax = plt.subplots(figsize=dim) + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable - plt.bar(x_labels[::-1], lifts, align="center", - color="cornflowerblue") + plt.bar(x_labels[::-1], lifts, align="center", color="cornflowerblue") plt.ylabel("Response (%)", fontsize=15) plt.xlabel("Decile", fontsize=15) ax.set_xticks(x_labels) ax.set_xticklabels(x_labels) - plt.axhline(y=inc_rate*100, color="darkorange", linestyle="--", - xmin=0.05, xmax=0.95, linewidth=3, label="incidence") + plt.axhline( + y=inc_rate * 100, + color="darkorange", + linestyle="--", + xmin=0.05, + xmax=0.95, + linewidth=3, + label="incidence", + ) # Legend ax.legend(loc="upper right") @@ -285,7 +335,7 @@ def plot_cumulative_response_curve(self, path: str=None, dim: tuple=(12, 8)): plt.show() - def plot_lift_curve(self, path: str=None, dim: tuple=(12, 8)): + def plot_lift_curve(self, path: str = None, dim: tuple = (12, 8)): """Plot lift per decile. Parameters @@ -294,28 +344,39 @@ def plot_lift_curve(self, path: str=None, dim: tuple=(12, 8)): Path to store the figure. dim : tuple, optional Tuple with width and length of the plot. - """ + Raises + ---------- + NotFittedError + The instance is not fitted yet. + """ if self.lift_curve is None: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." 
+ ) raise NotFittedError(msg.format(self.__class__.__name__)) x_labels, lifts, _ = self.lift_curve with plt.style.context("seaborn-ticks"): - fig, ax = plt.subplots(figsize=dim) + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable - plt.bar(x_labels[::-1], lifts, align="center", - color="cornflowerblue") + plt.bar(x_labels[::-1], lifts, align="center", color="cornflowerblue") plt.ylabel("Lift", fontsize=15) plt.xlabel("Decile", fontsize=15) ax.set_xticks(x_labels) ax.set_xticklabels(x_labels) - plt.axhline(y=1, color="darkorange", linestyle="--", - xmin=0.05, xmax=0.95, linewidth=3, label="baseline") + plt.axhline( + y=1, + color="darkorange", + linestyle="--", + xmin=0.05, + xmax=0.95, + linewidth=3, + label="baseline", + ) # Legend ax.legend(loc="upper right") @@ -334,7 +395,7 @@ def plot_lift_curve(self, path: str=None, dim: tuple=(12, 8)): plt.show() - def plot_cumulative_gains(self, path: str=None, dim: tuple=(12, 8)): + def plot_cumulative_gains(self, path: str = None, dim: tuple = (12, 8)): """Plot cumulative gains per decile. Parameters @@ -344,15 +405,24 @@ def plot_cumulative_gains(self, path: str=None, dim: tuple=(12, 8)): dim : tuple, optional Tuple with width and length of the plot. """ - with plt.style.context("seaborn-whitegrid"): - fig, ax = plt.subplots(figsize=dim) - - ax.plot(self.cumulative_gains[0]*100, self.cumulative_gains[1]*100, - color="cornflowerblue", linewidth=3, - label="cumulative gains") - ax.plot(ax.get_xlim(), ax.get_ylim(), linewidth=3, - ls="--", color="darkorange", label="random selection") + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable + + ax.plot( + self.cumulative_gains[0] * 100, + self.cumulative_gains[1] * 100, + color="cornflowerblue", + linewidth=3, + label="cumulative gains", + ) + ax.plot( + ax.get_xlim(), + ax.get_ylim(), + linewidth=3, + ls="--", + color="darkorange", + label="random selection", + ) ax.set_title("Cumulative Gains curve", fontsize=20) @@ -365,11 +435,11 @@ def plot_cumulative_gains(self, path: str=None, dim: tuple=(12, 8)): # Format ticks ticks_loc_y = ax.get_yticks().tolist() ax.yaxis.set_major_locator(mticker.FixedLocator(ticks_loc_y)) - ax.set_yticklabels(["{:3.0f}%".format(x) for x in ticks_loc_y]) + ax.set_yticklabels([f"{x:3.0f}%" for x in ticks_loc_y]) ticks_loc_x = ax.get_xticks().tolist() ax.xaxis.set_major_locator(mticker.FixedLocator(ticks_loc_x)) - ax.set_xticklabels(["{:3.0f}%".format(x) for x in ticks_loc_x]) + ax.set_xticklabels([f"{x:3.0f}%" for x in ticks_loc_x]) # Legend ax.legend(loc="lower right") @@ -379,10 +449,8 @@ def plot_cumulative_gains(self, path: str=None, dim: tuple=(12, 8)): plt.show() @staticmethod - def _find_optimal_cutoff(y_true: np.ndarray, - y_pred: np.ndarray) -> float: - """Find the optimal probability cut off point for a - classification model. Wrapper around _compute_optimal_cutoff. + def _find_optimal_cutoff(y_true: np.ndarray, y_pred: np.ndarray) -> float: + """Find the optimal probability cut off point for a classification model. Parameters ---------- @@ -396,14 +464,14 @@ def _find_optimal_cutoff(y_true: np.ndarray, float Optimal cut-off probability for the model. 
""" - return ClassificationEvaluator._compute_optimal_cutoff(roc_curve(y_true=y_true, - y_score=y_pred)) + fpr, tpr, thresholds = roc_curve(y_true=y_true, y_score=y_pred) + return ClassificationEvaluator._compute_optimal_cutoff(fpr, tpr, thresholds) @staticmethod - def _compute_optimal_cutoff(fpr: np.ndarray, tpr: np.ndarray, - thresholds: np.ndarray) -> float: - """Find the optimal probability cut-off point for a - classification model. + def _compute_optimal_cutoff( + fpr: np.ndarray, tpr: np.ndarray, thresholds: np.ndarray + ) -> float: + """Calculate the optimal probability cut-off point for a classification model. The optimal cut-off would be where TPR is high and FPR is low, hence TPR - (1-FPR) should be zero or close to zero for the optimal cut-off. @@ -422,7 +490,7 @@ def _compute_optimal_cutoff(fpr: np.ndarray, tpr: np.ndarray, float Optimal probability cut-off point. """ - temp = np.absolute(tpr - (1-fpr)) + temp = np.absolute(tpr - (1 - fpr)) # index for optimal value is the one for which temp is minimal optimal_index = np.where(temp == min(temp))[0] @@ -430,10 +498,10 @@ def _compute_optimal_cutoff(fpr: np.ndarray, tpr: np.ndarray, return thresholds[optimal_index][0] @staticmethod - def _compute_cumulative_gains(y_true: np.ndarray, - y_pred: np.ndarray) -> tuple: - """Compute cumulative gains of the model, returns percentages and - gains cumulative gains curves. + def _compute_cumulative_gains( + y_true: np.ndarray, y_pred: np.ndarray + ) -> tuple[np.ndarray, np.ndarray]: + """Compute cumulative gains of the model. Code from (https://github.com/reiinakano/scikit-plot/blob/ 2dd3e6a76df77edcbd724c4db25575f70abb57cb/ @@ -451,9 +519,8 @@ def _compute_cumulative_gains(y_true: np.ndarray, tuple With x-labels, and gains. """ - # make y_true a boolean vector - y_true = (y_true == 1) + y_true = y_true == 1 sorted_indices = np.argsort(y_pred)[::-1] y_true = y_true[sorted_indices] @@ -470,11 +537,10 @@ def _compute_cumulative_gains(y_true: np.ndarray, return percentages, gains @staticmethod - def _compute_lift_per_bin(y_true: np.ndarray, - y_pred: np.ndarray, - n_bins: int=10) -> tuple: - """Compute lift of the model for a given number of bins, returns x-labels, - lifts and the target incidence to create cumulative response curves. + def _compute_lift_per_bin( + y_true: np.ndarray, y_pred: np.ndarray, n_bins: int = 10 + ) -> tuple[list[float], list[float], float]: + """Compute lift of the model for a given number of bins. Parameters ---------- @@ -491,20 +557,22 @@ def _compute_lift_per_bin(y_true: np.ndarray, tuple Includes x-labels, lifts per decile, and target incidence. """ + lifts = [ + ClassificationEvaluator._compute_lift( + y_true=y_true, y_pred=y_pred, lift_at=perc_lift + ) + for perc_lift in np.linspace(1 / n_bins, 1, num=n_bins, endpoint=True) + ] - lifts = [ClassificationEvaluator._compute_lift(y_true=y_true, - y_pred=y_pred, - lift_at=perc_lift) - for perc_lift in np.linspace(1/n_bins, 1, num=n_bins, endpoint=True)] + x_labels = [len(lifts) - x for x in np.arange(0, len(lifts), 1)] - x_labels = [len(lifts)-x for x in np.arange(0, len(lifts), 1)] - - return x_labels, lifts, y_true.mean() + return x_labels, lifts, cast(float, y_true.mean()) @staticmethod - def _compute_lift(y_true: np.ndarray, y_pred: np.ndarray, - lift_at: float=0.05) -> float: - """Calculates lift given two arrays on specified level. + def _compute_lift( + y_true: np.ndarray, y_pred: np.ndarray, lift_at: float = 0.05 + ) -> float: + """Calculate lift on a specified level. 
Parameters ---------- @@ -520,7 +588,6 @@ def _compute_lift(y_true: np.ndarray, y_pred: np.ndarray, float Lift of the model. """ - # Make sure it is numpy array y_true_ = np.array(y_true) y_pred_ = np.array(y_pred) @@ -534,24 +601,20 @@ def _compute_lift(y_true: np.ndarray, y_pred: np.ndarray, # Calculate necessary variables nrows = len(y_data) - stop = int(np.floor(nrows*lift_at)) - avg_incidence = np.einsum("ij->j", y_true_)/float(len(y_true_)) + stop = int(np.floor(nrows * lift_at)) + avg_incidence = np.einsum("ij->j", y_true_) / float(len(y_true_)) # Sort and filter data - data_sorted = (y_data[y_data[:, 1].argsort()[::-1]][:stop, 0] - .reshape(stop, 1)) + data_sorted = y_data[y_data[:, 1].argsort()[::-1]][:stop, 0].reshape(stop, 1) # Calculate lift (einsum is a very fast way of summing, but needs specific shape) - inc_in_top_n = np.einsum("ij->j", data_sorted)/float(len(data_sorted)) - - lift = np.round(inc_in_top_n/avg_incidence, 2)[0] - + inc_in_top_n = np.einsum("ij->j", data_sorted) / float(len(data_sorted)) + lift = np.round(inc_in_top_n / avg_incidence, 2)[0] return lift -class RegressionEvaluator(): - """Evaluator class encapsulating regression model metrics - and plotting functionality. +class RegressionEvaluator: + """Evaluator class encapsulating regression model metrics and plotting functionality. Attributes ---------- @@ -566,7 +629,7 @@ class RegressionEvaluator(): """ def __init__(self): - + """Initialize the RegressionEvaluator.""" self.y_true = None self.y_pred = None @@ -575,8 +638,7 @@ def __init__(self): self.qq = None def fit(self, y_true: np.ndarray, y_pred: np.ndarray): - """Fit the evaluator by computing the relevant evaluation metrics on - the inputs. + """Fit the evaluator by computing the relevant evaluation metrics on the inputs. Parameters ---------- @@ -586,7 +648,9 @@ def fit(self, y_true: np.ndarray, y_pred: np.ndarray): Model scores. """ # Compute the various evaluation metrics - self.scalar_metrics = RegressionEvaluator._compute_scalar_metrics(y_true, y_pred) + self.scalar_metrics = RegressionEvaluator._compute_scalar_metrics( + y_true, y_pred + ) self.y_true = y_true self.y_pred = y_pred @@ -595,10 +659,8 @@ def fit(self, y_true: np.ndarray, y_pred: np.ndarray): self.qq = RegressionEvaluator._compute_qq_residuals(y_true, y_pred) @staticmethod - def _compute_scalar_metrics(y_true: np.ndarray, - y_pred: np.ndarray) -> pd.Series: - """Convenient function to compute various scalar performance measures - and return them in a pd.Series. + def _compute_scalar_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> pd.Series: + """Compute various scalar performance measures. Parameters ---------- @@ -616,18 +678,18 @@ def _compute_scalar_metrics(y_true: np.ndarray, Mean squared error (expected value of the quadratic error) Root mean squared error (sqrt of expected value of the quadratic error) """ - return pd.Series({ - "R2": r2_score(y_true, y_pred), - "MAE": mean_absolute_error(y_true, y_pred), - "MSE": mean_squared_error(y_true, y_pred), - "RMSE": sqrt(mean_squared_error(y_true, y_pred)) - }) + return pd.Series( + { + "R2": r2_score(y_true, y_pred), + "MAE": mean_absolute_error(y_true, y_pred), + "MSE": mean_squared_error(y_true, y_pred), + "RMSE": sqrt(mean_squared_error(y_true, y_pred)), + } + ) @staticmethod - def _compute_qq_residuals(y_true: np.ndarray, - y_pred: np.ndarray) -> pd.Series: - """Convenience function to compute various scalar performance measures - and return them in a pd.Series. 
+ def _compute_qq_residuals(y_true: np.ndarray, y_pred: np.ndarray) -> pd.Series: + """Compute various scalar performance measures. Parameters ---------- @@ -641,24 +703,28 @@ def _compute_qq_residuals(y_true: np.ndarray, pd.Series Theoretical quantiles and associated actual residuals. """ - ## also possible directly via statsmodels.api.qqplot() + # also possible directly via statsmodels.api.qqplot() n = len(y_true) df = pd.DataFrame({"res": sorted((y_true - y_pred))}) # ascending order m, s = df["res"].mean(), df["res"].std() - df["z_res"] = df["res"].apply(lambda x: (x-m)/s) - df["rank"] = df.index+1 - df["percentile"] = df["rank"].apply(lambda x: x/(n+1)) # divide by n+1 to avoid inf + df["z_res"] = df["res"].apply(lambda x: (x - m) / s) + df["rank"] = df.index + 1 + df["percentile"] = df["rank"].apply( + lambda x: x / (n + 1) + ) # divide by n+1 to avoid inf df["q_theoretical"] = norm.ppf(df["percentile"]) - return pd.Series({ - "quantiles": df["q_theoretical"].values, - "residuals": df["z_res"].values, - }) + return pd.Series( + { + "quantiles": df["q_theoretical"].values, + "residuals": df["z_res"].values, + } + ) - def plot_predictions(self, path: str=None, dim: tuple=(12, 8)): + def plot_predictions(self, path: str = None, dim: tuple = (12, 8)): """Plot predictions from the model against actual values. Parameters @@ -667,21 +733,30 @@ def plot_predictions(self, path: str=None, dim: tuple=(12, 8)): Path to store the figure. dim : tuple, optional Tuple with width and length of the plot. + + Raises + ---------- + NotFittedError + The instance is not fitted yet. """ if self.y_true is None and self.y_pred is None: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." + ) + raise NotFittedError(msg.format(self.__class__.__name__)) y_true = self.y_true y_pred = self.y_pred with plt.style.context("seaborn-whitegrid"): + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable - fig, ax = plt.subplots(figsize=dim) - - x = np.arange(1, len(y_true)+1) + x = np.arange(1, len(y_true) + 1) - ax.plot(x, y_true, ls="--", label="actuals", color="darkorange", linewidth=3) + ax.plot( + x, y_true, ls="--", label="actuals", color="darkorange", linewidth=3 + ) ax.plot(x, y_pred, label="predictions", color="cornflowerblue", linewidth=3) ax.set_xlabel("Index", fontsize=15) @@ -694,7 +769,7 @@ def plot_predictions(self, path: str=None, dim: tuple=(12, 8)): plt.show() - def plot_qq(self, path: str=None, dim: tuple=(12, 8)): + def plot_qq(self, path: str = None, dim: tuple = (12, 8)): """Display a Q-Q plot from the standardized prediction residuals. Parameters @@ -703,29 +778,36 @@ def plot_qq(self, path: str=None, dim: tuple=(12, 8)): Path to store the figure. dim : tuple, optional Tuple with width and length of the plot. - """ + Raises + ---------- + NotFittedError + The instance is not fitted yet. + """ if self.qq is None: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = "This {} instance is not fitted yet. Call 'fit' with appropriate arguments before using this method." 
raise NotFittedError(msg.format(self.__class__.__name__)) with plt.style.context("seaborn-whitegrid"): - - fig, ax = plt.subplots(figsize=dim) + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable x = self.qq["quantiles"] y = self.qq["residuals"] - ax.plot(x, x, ls="--", label="perfect model", color="darkorange", linewidth=3) + ax.plot( + x, x, ls="--", label="perfect model", color="darkorange", linewidth=3 + ) ax.plot(x, y, label="current model", color="cornflowerblue", linewidth=3) ax.set_xlabel("Theoretical quantiles", fontsize=15) - ax.set_xticks(range(int(np.floor(min(x))), int(np.ceil(max(x[x < float("inf")])))+1, 1)) + ax.set_xticks( + range(int(np.floor(min(x))), int(np.ceil(max(x[x < float("inf")]))) + 1) + ) ax.set_ylabel("Standardized residuals", fontsize=15) - ax.set_yticks(range(int(np.floor(min(y))), int(np.ceil(max(y[x < float("inf")])))+1, 1)) + ax.set_yticks( + range(int(np.floor(min(y))), int(np.ceil(max(y[x < float("inf")]))) + 1) + ) ax.legend(loc="best") ax.set_title("Q-Q plot", fontsize=20) @@ -733,4 +815,4 @@ def plot_qq(self, path: str=None, dim: tuple=(12, 8)): if path: plt.savefig(path, format="png", dpi=300, bbox_inches="tight") - plt.show() \ No newline at end of file + plt.show() diff --git a/cobra/evaluation/pigs_tables.py b/cobra/evaluation/pigs_tables.py index 4c58eaa..1fc58b9 100644 --- a/cobra/evaluation/pigs_tables.py +++ b/cobra/evaluation/pigs_tables.py @@ -1,3 +1,4 @@ +"""Create Predictor Insight Graph tables.""" import pandas as pd import matplotlib.pyplot as plt @@ -5,12 +6,15 @@ import numpy as np from matplotlib.ticker import FuncFormatter -import cobra.utils as utils +from cobra import utils -def generate_pig_tables(basetable: pd.DataFrame, - id_column_name: str, - target_column_name: str, - preprocessed_predictors: list) -> pd.DataFrame: + +def generate_pig_tables( + basetable: pd.DataFrame, + id_column_name: str, + target_column_name: str, + preprocessed_predictors: list, +) -> pd.DataFrame: """Compute PIG tables for all predictors in preprocessed_predictors. The output is a DataFrame with columns ``variable``, ``label``, @@ -34,10 +38,7 @@ def generate_pig_tables(basetable: pd.DataFrame, DataFrame containing a PIG table for all predictors. """ pigs = [ - compute_pig_table(basetable, - column_name, - target_column_name, - id_column_name) + compute_pig_table(basetable, column_name, target_column_name, id_column_name) for column_name in sorted(preprocessed_predictors) if column_name not in [id_column_name, target_column_name] ] @@ -45,10 +46,12 @@ def generate_pig_tables(basetable: pd.DataFrame, return output -def compute_pig_table(basetable: pd.DataFrame, - predictor_column_name: str, - target_column_name: str, - id_column_name: str) -> pd.DataFrame: +def compute_pig_table( + basetable: pd.DataFrame, + predictor_column_name: str, + target_column_name: str, + id_column_name: str, +) -> pd.DataFrame: """Compute the PIG table of a given predictor for a given target. Parameters @@ -72,33 +75,42 @@ def compute_pig_table(basetable: pd.DataFrame, # group by the binned variable, compute the incidence # (= mean of the target for the given bin) and compute the bin size # (e.g. COUNT(id_column_name)). 
After that, rename the columns - res = (basetable.groupby(predictor_column_name) - .agg({target_column_name: "mean", id_column_name: "size"}) - .reset_index() - .rename(columns={predictor_column_name: "label", - target_column_name: "avg_target", - id_column_name: "pop_size"})) + res = ( + basetable.groupby(predictor_column_name) + .agg({target_column_name: "mean", id_column_name: "size"}) + .reset_index() + .rename( + columns={ + predictor_column_name: "label", + target_column_name: "avg_target", + id_column_name: "pop_size", + } + ) + ) # add the column name to a variable column # add the average incidence # replace population size by a percentage of total population res["variable"] = utils.clean_predictor_name(predictor_column_name) res["global_avg_target"] = global_avg_target - res["pop_size"] = res["pop_size"]/len(basetable.index) + res["pop_size"] = res["pop_size"] / len(basetable.index) # make sure to always return the data with the proper column order - column_order = ["variable", "label", "pop_size", - "global_avg_target", "avg_target"] + column_order = ["variable", "label", "pop_size", "global_avg_target", "avg_target"] return res[column_order] -def plot_incidence(pig_tables: pd.DataFrame, - variable: str, - model_type: str, - column_order: list=None, - dim: tuple=(12, 8)): - """Plots a Predictor Insights Graph (PIG), a graph in which the mean +def plot_incidence( + pig_tables: pd.DataFrame, + variable: str, + model_type: str, + column_order: list = None, + dim: tuple = (12, 8), +): + """Plot a Predictor Insights Graph (PIG). + + A PIG is a graph in which the mean target value is plotted for a number of bins constructed from a predictor variable. When the target is a binary classification target, the plotted mean target value is a true incidence rate. @@ -120,28 +132,36 @@ def plot_incidence(pig_tables: pd.DataFrame, on the PIG. dim: tuple, default=(12, 8) Optional tuple to configure the width and length of the plot. + + Raises + ---------- + ValueError + The `column_order` and `pig_tables` parameters do not contain + the same set of variables. """ if model_type not in ["classification", "regression"]: - raise ValueError("An unexpected value was set for the model_type " - "parameter. Expected 'classification' or " - "'regression'.") + raise ValueError( + "An unexpected value was set for the model_type " + "parameter. Expected 'classification' or " + "'regression'." + ) - df_plot = pig_tables[pig_tables['variable'] == variable].copy() + df_plot = pig_tables[pig_tables["variable"] == variable].copy() if column_order is not None: - if not set(df_plot['label']) == set(column_order): + if not set(df_plot["label"]) == set(column_order): raise ValueError( - 'The column_order and pig_tables parameters do not contain ' - 'the same set of variables.') + "The column_order and pig_tables parameters do not contain " + "the same set of variables." 
+ ) - df_plot['label'] = df_plot['label'].astype('category') - df_plot['label'].cat.reorder_categories(column_order, - inplace=True) + df_plot["label"] = df_plot["label"].astype("category") + df_plot["label"].cat.reorder_categories(column_order, inplace=True) - df_plot.sort_values(by=['label'], ascending=True, inplace=True) + df_plot.sort_values(by=["label"], ascending=True, inplace=True) df_plot.reset_index(inplace=True) else: - df_plot.sort_values(by=['avg_target'], ascending=False, inplace=True) + df_plot.sort_values(by=["avg_target"], ascending=False, inplace=True) df_plot.reset_index(inplace=True) with plt.style.context("seaborn-ticks"): @@ -150,35 +170,49 @@ def plot_incidence(pig_tables: pd.DataFrame, # -------------------------- # Left axis - average target # -------------------------- - ax.plot(df_plot['label'], df_plot['avg_target'], - color="#00ccff", marker=".", - markersize=20, linewidth=3, - label='incidence rate per bin' if model_type == "classification" else "mean target value per bin", - zorder=10) - - ax.plot(df_plot['label'], df_plot['global_avg_target'], - color="#022252", linestyle='--', linewidth=4, - label='average incidence rate' if model_type == "classification" else "global mean target value", - zorder=10) + ax.plot( + df_plot["label"], + df_plot["avg_target"], + color="#00ccff", + marker=".", + markersize=20, + linewidth=3, + label="incidence rate per bin" + if model_type == "classification" + else "mean target value per bin", + zorder=10, + ) + + ax.plot( + df_plot["label"], + df_plot["global_avg_target"], + color="#022252", + linestyle="--", + linewidth=4, + label="average incidence rate" + if model_type == "classification" + else "global mean target value", + zorder=10, + ) # Dummy line to have label on second axis from first - ax.plot(np.nan, "#939598", linewidth=6, label='bin size') + ax.plot(np.nan, "#939598", linewidth=6, label="bin size") # Set labels & ticks - ax.set_ylabel('Incidence' if model_type == "classification" else "Mean target value", - fontsize=16) - ax.set_xlabel("Bins", fontsize=15) + ax.set_ylabel( + "incidence" if model_type == "classification" else "mean target value", + fontsize=16, + ) + ax.set_xlabel(f"{variable} bins" "", fontsize=16) ax.xaxis.set_tick_params(labelsize=14) - plt.setp(ax.get_xticklabels(), - rotation=45, ha="right", rotation_mode="anchor") + plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor") ax.yaxis.set_tick_params(labelsize=14) if model_type == "classification": # Mean target values are between 0 and 1 (target incidence rate), # so format them as percentages - ax.set_yticks(np.arange(0, max(df_plot['avg_target'])+0.05, 0.05)) - ax.yaxis.set_major_formatter( - FuncFormatter(lambda y, _: '{:.1%}'.format(y))) + ax.set_yticks(np.arange(0, max(df_plot["avg_target"]) + 0.05, 0.05)) + ax.yaxis.set_major_formatter(FuncFormatter(lambda y, _: f"{y:.1%}")) elif model_type == "regression": # If the difference between the highest avg. target of all bins # versus the global avg. target AND the difference between the @@ -190,40 +224,52 @@ def plot_incidence(pig_tables: pd.DataFrame, # the bins and versus the global avg. target. # (Motivation for the AND above: if on one end there IS enough # difference, the effect that we discuss here does not occur.) - global_avg_target = max(df_plot['global_avg_target']) # series of same number, for every bin. 
- if ((np.abs((max(df_plot['avg_target']) - global_avg_target)) / global_avg_target < 0.25) - and (np.abs((min(df_plot['avg_target']) - global_avg_target)) / global_avg_target < 0.25)): - ax.set_ylim(global_avg_target * 0.75, - global_avg_target * 1.25) + global_avg_target = max( + df_plot["global_avg_target"] + ) # series of same number, for every bin. + if ( + np.abs((max(df_plot["avg_target"]) - global_avg_target)) + / global_avg_target + < 0.25 + ) and ( + np.abs((min(df_plot["avg_target"]) - global_avg_target)) + / global_avg_target + < 0.25 + ): + ax.set_ylim(global_avg_target * 0.75, global_avg_target * 1.25) # Remove ticks but keep the labels - ax.tick_params(axis='both', which='both', length=0) - ax.tick_params(axis='y', colors="#00ccff") - ax.yaxis.label.set_color('#00ccff') + ax.tick_params(axis="both", which="both", length=0) + ax.tick_params(axis="y", colors="#00ccff") + ax.yaxis.label.set_color("#00ccff") # ----------------- # Right Axis - bins # ----------------- ax2 = ax.twinx() - ax2.bar(df_plot['label'], df_plot['pop_size'], - align='center', color="#939598", zorder=1) + ax2.bar( + df_plot["label"], + df_plot["pop_size"], + align="center", + color="#939598", + zorder=1, + ) # Set labels & ticks ax2.set_xlabel("Bins", fontsize=15) ax2.xaxis.set_tick_params(rotation=45, labelsize=14) ax2.yaxis.set_tick_params(labelsize=14) - ax2.yaxis.set_major_formatter( - FuncFormatter(lambda y, _: '{:.1%}'.format(y))) - ax2.set_ylabel('Population size', fontsize=15) - ax2.tick_params(axis='y', colors="#939598") - ax2.yaxis.label.set_color('#939598') + ax2.yaxis.set_major_formatter(FuncFormatter(lambda y, _: "{:.1%}".format(y))) + ax2.set_ylabel("Population size", fontsize=15) + ax2.tick_params(axis="y", colors="#939598") + ax2.yaxis.label.set_color("#939598") # Despine & prettify sns.despine(ax=ax, right=True, left=True) sns.despine(ax=ax2, left=True, right=False) - ax2.spines['right'].set_color('white') + ax2.spines["right"].set_color("white") ax2.grid(False) @@ -234,9 +280,15 @@ def plot_incidence(pig_tables: pd.DataFrame, title = "Mean target plot" fig.suptitle(title, fontsize=20) plt.title(variable, fontsize=17) - ax.legend(frameon=False, bbox_to_anchor=(0., 1.01, 1., .102), - loc=3, ncol=1, mode="expand", borderaxespad=0., - prop={"size": 14}) + ax.legend( + frameon=False, + bbox_to_anchor=(0.0, 1.01, 1.0, 0.102), + loc=3, + ncol=1, + mode="expand", + borderaxespad=0.0, + prop={"size": 14}, + ) # Set order of layers ax.set_zorder(1) diff --git a/cobra/evaluation/plotting_utils.py b/cobra/evaluation/plotting_utils.py index 8cac03c..cc7d59b 100644 --- a/cobra/evaluation/plotting_utils.py +++ b/cobra/evaluation/plotting_utils.py @@ -1,14 +1,20 @@ +"""Collection of plotting utils.""" # third party imports +from typing import cast import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns -def plot_univariate_predictor_quality(df_metric: pd.DataFrame, - dim: tuple=(12, 8), - path: str=None): + +DEFAULT_COLOURS = {"train": "#0099bf", "selection": "#ff9500", "validation": "#8064a2"} + + +def plot_univariate_predictor_quality( + df_metric: pd.DataFrame, dim: tuple = (12, 8), path: str = None +): """Plot univariate quality of the predictors. Parameters @@ -22,7 +28,6 @@ def plot_univariate_predictor_quality(df_metric: pd.DataFrame, path : str, optional Path to store the figure. 
""" - if "AUC selection" in df_metric.columns: metric = "AUC" ascending = False @@ -30,17 +35,21 @@ def plot_univariate_predictor_quality(df_metric: pd.DataFrame, metric = "RMSE" ascending = True - df = (df_metric[df_metric["preselection"]] - .sort_values(by=metric+" selection", ascending=ascending)) + df = df_metric[df_metric["preselection"]].sort_values( + by=metric + " selection", ascending=ascending + ) - df = pd.melt(df, id_vars=["predictor"], - value_vars=[metric+" train", metric+" selection"], - var_name="split", - value_name=metric) + df = pd.melt( + df, + id_vars=["predictor"], + value_vars=[metric + " train", metric + " selection"], + var_name="split", + value_name=metric, + ) # plot data with plt.style.context("seaborn-ticks"): - fig, ax = plt.subplots(figsize=dim) + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable ax = sns.barplot(x=metric, y="predictor", hue="split", data=df) ax.set_title("Univariate predictor quality", fontsize=20) @@ -60,9 +69,10 @@ def plot_univariate_predictor_quality(df_metric: pd.DataFrame, plt.show() -def plot_correlation_matrix(df_corr: pd.DataFrame, - dim: tuple=(12, 8), - path: str=None): + +def plot_correlation_matrix( + df_corr: pd.DataFrame, dim: tuple = (12, 8), path: str = None +): """Plot correlation matrix of the predictors. Parameters @@ -83,15 +93,15 @@ def plot_correlation_matrix(df_corr: pd.DataFrame, plt.show() -def plot_performance_curves(model_performance: pd.DataFrame, - dim: tuple=(12, 8), - path: str=None, - colors: dict={"train": "#0099bf", - "selection": "#ff9500", - "validation": "#8064a2"}, - metric_name: str=None): - """Plot performance curves generated by the forward feature selection - for the train-selection-validation sets. + +def plot_performance_curves( + model_performance: pd.DataFrame, + dim: tuple = (12, 8), + path: str = None, + colors: dict = None, + metric_name: str = None, +): + """Plot performance curves for the train-selection-validation sets. Parameters ---------- @@ -109,7 +119,7 @@ def plot_performance_curves(model_performance: pd.DataFrame, Defaults to RMSE in case of regression and AUC in case of classification. 
""" - + colors = colors or DEFAULT_COLOURS model_type = model_performance["model_type"][0] if metric_name is None: @@ -117,29 +127,50 @@ def plot_performance_curves(model_performance: pd.DataFrame, metric_name = "AUC" elif model_type == "regression": metric_name = "RMSE" + metric_name = cast(str, metric_name) - max_metric = np.round(max(max(model_performance['train_performance']), - max(model_performance['selection_performance']), - max(model_performance['validation_performance'])), 1) + max_metric = np.round( + max( + max(model_performance["train_performance"]), + max(model_performance["selection_performance"]), + max(model_performance["validation_performance"]), + ), + 1, + ) with plt.style.context("seaborn-whitegrid"): - fig, ax = plt.subplots(figsize=dim) - plt.plot(model_performance['train_performance'], marker=".", - markersize=20, linewidth=3, label="train", - color=colors["train"]) - plt.plot(model_performance['selection_performance'], marker=".", - markersize=20, linewidth=3, label="selection", - color=colors["selection"]) - plt.plot(model_performance['validation_performance'], marker=".", - markersize=20, linewidth=3, label="validation", - color=colors["validation"]) + plt.plot( + model_performance["train_performance"], + marker=".", + markersize=20, + linewidth=3, + label="train", + color=colors["train"], + ) + plt.plot( + model_performance["selection_performance"], + marker=".", + markersize=20, + linewidth=3, + label="selection", + color=colors["selection"], + ) + plt.plot( + model_performance["validation_performance"], + marker=".", + markersize=20, + linewidth=3, + label="validation", + color=colors["validation"], + ) # Set x- and y-ticks - ax.set_xticks(np.arange(len(model_performance['last_added_predictor']))) - ax.set_xticklabels(model_performance['last_added_predictor'].tolist(), - rotation=40, ha='right') + ax.set_xticks(np.arange(len(model_performance["last_added_predictor"]))) + ax.set_xticklabels( + model_performance["last_added_predictor"].tolist(), rotation=40, ha="right" + ) if model_type == "classification": ax.set_yticks(np.arange(0.5, max_metric + 0.02, 0.05)) @@ -147,24 +178,26 @@ def plot_performance_curves(model_performance: pd.DataFrame, # In regression, the scale of the y-axis can largely vary depending # on the dataset, it is easier to just set the y-axis bounds, # but not the tick distance. - ax.set_ylim(0, max_metric*1.1) + ax.set_ylim(0, max_metric * 1.1) # Make pretty - ax.legend(loc='lower right') - fig.suptitle('Performance curves forward feature selection', - fontsize=20) - plt.title("Metric: "+metric_name, fontsize=15, loc="left") - plt.ylabel('Model performance', fontsize=15) + ax.legend(loc="lower right") + fig.suptitle("Performance curves forward feature selection", fontsize=20) + plt.title("Metric: " + metric_name, fontsize=15, loc="left") + plt.ylabel("Model performance") if path is not None: plt.savefig(path, format="png", dpi=300, bbox_inches="tight") plt.show() -def plot_variable_importance(df_variable_importance: pd.DataFrame, - title: str=None, - dim: tuple=(12, 8), - path: str=None): + +def plot_variable_importance( + df_variable_importance: pd.DataFrame, + title: str = None, + dim: tuple = (12, 8), + path: str = None, +): """Plot variable importance of a given model. Parameters @@ -179,10 +212,13 @@ def plot_variable_importance(df_variable_importance: pd.DataFrame, Path to store the figure. 
""" with plt.style.context("seaborn-ticks"): - fig, ax = plt.subplots(figsize=dim) - ax = sns.barplot(x="importance", y="predictor", - data=df_variable_importance, - color="cornflowerblue") + fig, ax = plt.subplots(figsize=dim) # pylint: disable=unused-variable + ax = sns.barplot( + x="importance", + y="predictor", + data=df_variable_importance, + color="cornflowerblue", + ) if title: ax.set_title(title, fontsize=20) else: @@ -190,8 +226,8 @@ def plot_variable_importance(df_variable_importance: pd.DataFrame, # Make pretty axis sns.despine(ax=ax, right=True) - plt.ylabel('Predictor', fontsize=15) - plt.xlabel('Importance', fontsize=15) + plt.ylabel("Predictor", fontsize=15) + plt.xlabel("Importance", fontsize=15) # Remove white lines from the second axis ax.grid(False) diff --git a/cobra/model_building/__init__.py b/cobra/model_building/__init__.py index 7a646c3..c4d2a89 100644 --- a/cobra/model_building/__init__.py +++ b/cobra/model_building/__init__.py @@ -1,3 +1,5 @@ +"""This module includes utils to calculate the best features.""" + from .univariate_selection import compute_univariate_preselection from .univariate_selection import get_preselected_predictors from .univariate_selection import compute_correlations @@ -5,9 +7,11 @@ from .models import LogisticRegressionModel, LinearRegressionModel from .forward_selection import ForwardFeatureSelection -__all__ = ['compute_univariate_preselection', - 'get_preselected_predictors', - 'compute_correlations', - 'LogisticRegressionModel', - 'LinearRegressionModel', - 'ForwardFeatureSelection'] +__all__ = [ + "compute_univariate_preselection", + "get_preselected_predictors", + "compute_correlations", + "LogisticRegressionModel", + "LinearRegressionModel", + "ForwardFeatureSelection", +] diff --git a/cobra/model_building/forward_selection.py b/cobra/model_building/forward_selection.py index 29e06b3..1733616 100644 --- a/cobra/model_building/forward_selection.py +++ b/cobra/model_building/forward_selection.py @@ -1,6 +1,7 @@ +"""Feature forward selection.""" import logging -from typing import Callable, Optional +from typing import Callable, Optional, Set, Union, cast import pandas as pd from tqdm.auto import tqdm @@ -9,9 +10,16 @@ log = logging.getLogger(__name__) + +DEFAULT_SPLIT_NAMES = ["train", "selection", "validation"] +DEFAULT_FORCED_PREDICTORS = [] +DEFAULT_EXCLUDED_PREDICTORS = [] + +Model = Union[LinearRegressionModel, LogisticRegressionModel] + + class ForwardFeatureSelection: - """Perform forward feature selection for a given dataset using a given - algorithm. + """Perform forward feature selection for a given dataset using a given algorithm. Predictors are sequentially added to the model, starting with the one that has the highest univariate predictive power, and then proceeding with those that @@ -35,11 +43,13 @@ class ForwardFeatureSelection: List of fitted models. 
""" - def __init__(self, - model_type: str="classification", - max_predictors: int=50, - pos_only: bool=True): - + def __init__( + self, + model_type: str = "classification", + max_predictors: int = 50, + pos_only: bool = True, + ): + """Initialize the ForwardFeatureSelection class.""" self.model_type = model_type if model_type == "classification": self.MLModel = LogisticRegressionModel @@ -49,9 +59,9 @@ def __init__(self, self.max_predictors = max_predictors self.pos_only = pos_only - self._fitted_models = [] + self._fitted_models: list[Model] = [] - def get_model_from_step(self, step: int): + def get_model_from_step(self, step: int) -> Model: """Get fitted model from a particular step. Parameters @@ -70,19 +80,25 @@ def get_model_from_step(self, step: int): In case step is larger than the number of available models. """ if len(self._fitted_models) <= step: - raise ValueError(f"No model available for step {step}. " - "The first step starts from index 0.") + raise ValueError( + f"No model available for step {step}. " + "The first step starts from index 0." + ) return self._fitted_models[step] - def compute_model_performances(self, data: pd.DataFrame, - target_column_name: str, - splits: list=["train", "selection", "validation"], - metric: Optional[Callable]=None, - ) -> pd.DataFrame: - """Compute for each model the performance for different sets (e.g. - train-selection-validation) and return them along with a list of - predictors used in the model. Note that the computation of the + def compute_model_performances( + self, + data: pd.DataFrame, + target_column_name: str, + splits: list = None, + metric: Optional[Callable] = None, + ) -> pd.DataFrame: + """ + Compute for each model the performance for different sets. + + Different sets could be cross validation, train-selection-validation, ... + Note that the computation of the performance for each split is cached inside the model itself, so it is inexpensive to perform it multiple times! @@ -94,7 +110,7 @@ def compute_model_performances(self, data: pd.DataFrame, Name of the target column. splits : list, optional List of splits to compute performance on. - metric: Callable (function), optional + metric : Callable (function), optional Function that computes an evaluation metric to evaluate the model's performances, instead of the default metric (AUC for classification, RMSE for regression). @@ -108,31 +124,32 @@ def compute_model_performances(self, data: pd.DataFrame, Contains for each model the performance for train, selection and validation sets as well as the set of predictors used in this model. """ + splits = splits or DEFAULT_SPLIT_NAMES results = [] - predictor_set = set([]) + predictor_set: Set[str] = set() for model in self._fitted_models: - last_added_predictor = (set(model.predictors) - .difference(predictor_set)) + last_added_predictor = set(model.predictors).difference(predictor_set) tmp = { "predictors": model.predictors, - "last_added_predictor": list(last_added_predictor)[0] + "last_added_predictor": list(last_added_predictor)[0], } # Evaluate model on each dataset split, # e.g. 
train-selection-validation - tmp.update({ - f"{split}_performance": model.evaluate( - data[data["split"] == split], - data[data["split"] == split][target_column_name], - split=split, # parameter used for caching - metric=metric - ) - for split in splits - }) + tmp.update( + { + f"{split}_performance": model.evaluate( + data[data["split"] == split], + data[data["split"] == split][target_column_name], + split=split, # parameter used for caching + metric=metric, + ) + for split in splits + } + ) results.append(tmp) - predictor_set = predictor_set.union(set(model.predictors)) df = pd.DataFrame(results) @@ -140,9 +157,14 @@ def compute_model_performances(self, data: pd.DataFrame, return df - def fit(self, train_data: pd.DataFrame, target_column_name: str, - predictors: list, forced_predictors: list=[], - excluded_predictors: list=[]): + def fit( + self, + train_data: pd.DataFrame, + target_column_name: str, + predictors: list, + forced_predictors: list = None, + excluded_predictors: list = None, + ): """Fit the forward feature selection estimator. Parameters @@ -168,42 +190,63 @@ def fit(self, train_data: pd.DataFrame, target_column_name: str, In case the number of forced predictors is larger than the maximum number of allowed predictors in the model. """ - - assert "split" in train_data.columns, "The train_data input df does not include a split column." - assert len(set(["train", "selection"]).difference(set(train_data["split"].unique()))) == 0, \ - "The train_data input df does not include a 'train' and 'selection' split." + assert ( + "split" in train_data.columns + ), "The train_data input df does not include a split column." + assert ( + len( + set(["train", "selection"]).difference( + set(train_data["split"].unique()) + ) + ) + == 0 + ), "The train_data input df does not include a 'train' and 'selection' split." # remove excluded predictors from predictor lists - filtered_predictors = [var for var in predictors - if (var not in excluded_predictors and - var not in forced_predictors)] + forced_predictors = forced_predictors or DEFAULT_FORCED_PREDICTORS + excluded_predictors = excluded_predictors or DEFAULT_EXCLUDED_PREDICTORS + filtered_predictors = [ + var + for var in predictors + if (var not in excluded_predictors and var not in forced_predictors) + ] # checks on predictor lists and self.max_predictors attr if len(forced_predictors) > self.max_predictors: - raise ValueError("Size of forced_predictors cannot be bigger than " - "max_predictors.") + raise ValueError( + "Size of forced_predictors cannot be bigger than " "max_predictors." + ) elif len(forced_predictors) == self.max_predictors: - log.info("Size of forced_predictors equals max_predictors " - "only one model will be trained...") + log.info( + "Size of forced_predictors equals max_predictors " + "only one model will be trained..." 
+ ) # train model with all forced_predictors (only) - (self._fitted_models - .append(self._train_model(train_data[train_data["split"] == "train"], - target_column_name, - forced_predictors))) + self._fitted_models.append( + self._train_model( + train_data[train_data["split"] == "train"], + target_column_name, + forced_predictors, + ) + ) + else: - self._fitted_models = self._forward_selection(train_data, - target_column_name, - filtered_predictors, - forced_predictors) - - def _forward_selection(self, - train_data: pd.DataFrame, - target_column_name: str, - predictors: list, - forced_predictors: list = []) -> list: - """Perform the forward feature selection algorithm to compute a list - of models (with increasing performance). The length of the list, - i.e. the number of models, is bounded by the max_predictors class + self._fitted_models = self._forward_selection( + train_data, target_column_name, filtered_predictors, forced_predictors + ) + + def _forward_selection( + self, + train_data: pd.DataFrame, + target_column_name: str, + predictors: list, + forced_predictors: list = None, + ) -> list[Model]: + """Perform the forward feature selection algorithm. + + The algorithm will compute a list of models (with increasing performance). + The length of the list, i.e. the number of models, + is bounded by the max_predictors class attribute. Parameters @@ -223,32 +266,38 @@ def _forward_selection(self, List of fitted models where the index of the list indicates the number of predictors minus one (as indices start from 0). """ - fitted_models = [] - current_predictors = [] + forced_predictors = forced_predictors or DEFAULT_FORCED_PREDICTORS + fitted_models: list[Model] = [] + current_predictors: list[str] = [] - max_steps = 1 + min(self.max_predictors, - len(predictors) + len(forced_predictors)) + max_steps = 1 + min( + self.max_predictors, len(predictors) + len(forced_predictors) + ) - for step in tqdm(range(1, max_steps), desc="Sequentially adding best " - "predictor..."): + for step in tqdm( + range(1, max_steps), desc="Sequentially adding best predictor..." 
+ ): if step <= len(forced_predictors): # first, we go through the forced predictors - candidate_predictors = [var for var in forced_predictors - if var not in current_predictors] + candidate_predictors = [ + var for var in forced_predictors if var not in current_predictors + ] else: - candidate_predictors = [var for var in (predictors - + forced_predictors) - if var not in current_predictors] + candidate_predictors = [ + var + for var in (predictors + forced_predictors) + if var not in current_predictors + ] - model = self._find_next_best_model(train_data, - target_column_name, - candidate_predictors, - current_predictors) + model = self._find_next_best_model( + train_data, target_column_name, candidate_predictors, current_predictors + ) if model is not None: # Add new model predictors to the list of current predictors - current_predictors = list(set(current_predictors) - .union(set(model.predictors))) + current_predictors = list( + set(current_predictors).union(set(model.predictors)) + ) fitted_models.append(model) # else: @@ -262,12 +311,17 @@ def _forward_selection(self, return fitted_models - def _find_next_best_model(self, - train_data: pd.DataFrame, - target_column_name: str, - candidate_predictors: list, - current_predictors: list): - """Given a list of current predictors which are already selected to + def _find_next_best_model( + self, + train_data: pd.DataFrame, + target_column_name: str, + candidate_predictors: list, + current_predictors: list, + ) -> Model: + """ + Find the next best model with candidate predictors. + + Given a list of current predictors which are already selected to be include in the model, find amongst a list candidate predictors the predictor to add to the selected list so that the resulting model has the best performance. @@ -287,50 +341,68 @@ def _find_next_best_model(self, ------- self.MLModel Best performing model. + + Raises + ---------- + ValueError + The `column_order` and `pig_tables` parameters do not contain + the same set of variables. """ # placeholders best_model = None if self.MLModel == LogisticRegressionModel: - best_performance = -1 # AUC metric is used + best_performance = -1.0 # AUC metric is used elif self.MLModel == LinearRegressionModel: best_performance = float("inf") # RMSE metric is used else: - raise ValueError("No metric comparison method has been configured " - "for the given model_type specified as " - "ForwardFeatureSelection argument.") - - fit_data = train_data[train_data["split"] == "train"] # data to fit the models with - sel_data = train_data[train_data["split"] == "selection"] # data to compare the models with + raise ValueError( + "No metric comparison method has been configured " + "for the given model_type specified as " + "ForwardFeatureSelection argument." 
+ ) + + fit_data = train_data[ + train_data["split"] == "train" + ] # data to fit the models with + sel_data = train_data[ + train_data["split"] == "selection" + ] # data to compare the models with for pred in candidate_predictors: # Train a model with an additional predictor - model = self._train_model(fit_data, target_column_name, - (current_predictors + [pred])) + model = self._train_model( + fit_data, target_column_name, (current_predictors + [pred]) + ) # Evaluate the model - performance = (model - .evaluate(sel_data[current_predictors + [pred]], - sel_data[target_column_name], - split="selection")) + performance = model.evaluate( + sel_data[current_predictors + [pred]], + sel_data[target_column_name], + split="selection", + ) if self.pos_only and (not (model.get_coef() >= 0).all()): continue # Check if the model is better than the current best model # and if it is, replace the current best. - if self.MLModel == LogisticRegressionModel \ - and performance > best_performance: # AUC metric is used + if ( + self.MLModel == LogisticRegressionModel + and performance > best_performance + ): # AUC metric is used best_performance = performance best_model = model - elif self.MLModel == LinearRegressionModel \ - and performance < best_performance: # RMSE metric is used + elif ( + self.MLModel == LinearRegressionModel and performance < best_performance + ): # RMSE metric is used best_performance = performance best_model = model - return best_model + return cast(Model, best_model) - def _train_model(self, train_data: pd.DataFrame, target_column_name: str, - predictors: list): + def _train_model( + self, train_data: pd.DataFrame, target_column_name: str, predictors: list + ) -> Model: """Train the model with a given set of predictors. Parameters diff --git a/cobra/model_building/models.py b/cobra/model_building/models.py index 3a921c0..0da6f04 100644 --- a/cobra/model_building/models.py +++ b/cobra/model_building/models.py @@ -1,3 +1,4 @@ +"""Contains all types of models supported by Cobra.""" from typing import Callable, Optional @@ -5,17 +6,20 @@ import numpy as np import pandas as pd from scipy import stats -from sklearn.metrics import roc_auc_score, mean_squared_error +from sklearn.metrics import mean_squared_error, roc_auc_score, roc_curve from numpy import sqrt from sklearn.linear_model import LogisticRegression, LinearRegression -from sklearn.metrics import roc_curve # custom imports import cobra.utils as utils from cobra.evaluation import ClassificationEvaluator + class LogisticRegressionModel: - """Wrapper around the LogisticRegression class, with additional methods + """ + Cobra's LogisticRegression model. + + Wrapper around the LogisticRegression class, with additional methods implemented such as evaluation (using AUC), getting a list of coefficients, a dictionary of coefficients per predictor, ... for convenience. 
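
The reformatted `_find_next_best_model` above keeps the original selection rule: every candidate predictor is fitted on the train split, scored on the selection split, AUC is maximised for logistic models (RMSE minimised for linear ones), and models with negative coefficients are skipped when `pos_only` is set. A minimal, self-contained sketch of one such step using plain scikit-learn — the synthetic data and column names are illustrative, not cobra's own code:

# One forward-selection step: among candidate predictors, keep the model whose
# AUC on a held-out "selection" set is highest. Illustrative sketch only; the
# full algorithm repeats this, adding the winner to current_predictors.
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

rng = np.random.default_rng(42)
n = 1_000
df = pd.DataFrame({
    "var1_enc": rng.normal(size=n),
    "var2_enc": rng.normal(size=n),
    "var3_enc": rng.normal(size=n),
})
# target depends mostly on var1_enc, a bit on var2_enc
logit = 1.5 * df["var1_enc"] + 0.5 * df["var2_enc"]
df["target"] = (rng.uniform(size=n) < 1 / (1 + np.exp(-logit))).astype(int)
df["split"] = np.where(np.arange(n) % 5 == 0, "selection", "train")

fit_data = df[df["split"] == "train"]
sel_data = df[df["split"] == "selection"]

current_predictors = []
candidates = ["var1_enc", "var2_enc", "var3_enc"]

best_model, best_auc, best_candidate = None, -1.0, None
for pred in candidates:
    predictors = current_predictors + [pred]
    model = LogisticRegression(fit_intercept=True, C=1e9,
                               solver="liblinear", random_state=42)
    model.fit(fit_data[predictors], fit_data["target"])
    auc = roc_auc_score(sel_data["target"],
                        model.predict_proba(sel_data[predictors])[:, 1])
    # higher AUC is better for classification (RMSE would be minimised instead)
    if auc > best_auc:
        best_model, best_auc, best_candidate = model, auc, pred

print(best_candidate, round(best_auc, 3))  # var1_enc is expected to win
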
@@ -28,8 +32,10 @@ class LogisticRegressionModel: """ def __init__(self): - self.logit = LogisticRegression(fit_intercept=True, C=1e9, - solver='liblinear', random_state=42) + """Initialize the LogisticRegressionModel class.""" + self.logit = LogisticRegression( + fit_intercept=True, C=1e9, solver="liblinear", random_state=42 + ) self._is_fitted = False # placeholder to keep track of a list of predictors self.predictors = [] @@ -47,16 +53,18 @@ def serialize(self) -> dict: "meta": "logistic-regression", "predictors": self.predictors, "_eval_metrics_by_split": self._eval_metrics_by_split, - "params": self.logit.get_params() + "params": self.logit.get_params(), } if self._is_fitted: - serialized_model.update({ - "classes_": self.logit.classes_.tolist(), - "coef_": self.logit.coef_.tolist(), - "intercept_": self.logit.intercept_.tolist(), - "n_iter_": self.logit.n_iter_.tolist(), - }) + serialized_model.update( + { + "classes_": self.logit.classes_.tolist(), + "coef_": self.logit.coef_.tolist(), + "intercept_": self.logit.intercept_.tolist(), + "n_iter_": self.logit.n_iter_.tolist(), + } + ) return serialized_model @@ -73,7 +81,6 @@ def deserialize(self, model_dict: dict): ValueError In case JSON file is no valid serialized model. """ - if not self._is_valid_dict(model_dict): raise ValueError("No valid serialized model") @@ -86,8 +93,8 @@ def deserialize(self, model_dict: dict): self.predictors = model_dict["predictors"] self._eval_metrics_by_split = model_dict["_eval_metrics_by_split"] - def get_coef(self) -> np.array: - """Returns the model coefficients. + def get_coef(self) -> np.ndarray: + """Return the model coefficients. Returns ------- @@ -97,7 +104,7 @@ def get_coef(self) -> np.array: return self.logit.coef_[0] def get_intercept(self) -> float: - """Returns the intercept of the model. + """Return the intercept of the model. Returns ------- @@ -107,7 +114,7 @@ def get_intercept(self) -> float: return self.logit.intercept_[0] def get_coef_by_predictor(self) -> dict: - """Returns a dictionary mapping predictor (key) to coefficient (value). + """Return a dictionary mapping predictor (key) to coefficient (value). Returns ------- @@ -147,10 +154,17 @@ def score_model(self, X: pd.DataFrame) -> np.ndarray: # ensure we have the proper predictors and the proper order return self.logit.predict_proba(X[self.predictors])[:, 1] - def evaluate(self, X: pd.DataFrame, y: pd.Series, - split: str=None, - metric: Optional[Callable]=None) -> float: - """Evaluate the model on a given dataset (X, y). The optional split + def evaluate( + self, + X: pd.DataFrame, + y: pd.Series, + split: str = None, + metric: Optional[Callable] = None, + ) -> float: + """ + Evaluate the model on a given dataset (X, y). + + The optional split parameter is to indicate that the dataset belongs to (train, selection, validation), so that the computation on these sets can be cached! 
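
The `serialize`/`deserialize` pair above stores only plain lists (`classes_`, `coef_`, `intercept_`, `n_iter_`) plus the estimator params, so a fitted model can be persisted as JSON instead of a pickle. A rough sketch of that round trip with scikit-learn directly (toy data; not cobra's exact code, which also restores `predictors` and the cached evaluation metrics):

# Round-trip a fitted LogisticRegression through a JSON-friendly dict,
# mirroring what serialize()/deserialize() store. Illustrative sketch.
import json
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression

X, y = make_classification(n_samples=500, n_features=4, random_state=42)
model = LogisticRegression(fit_intercept=True, C=1e9,
                           solver="liblinear", random_state=42)
model.fit(X, y)

serialized = {
    "meta": "logistic-regression",
    "params": model.get_params(),
    "classes_": model.classes_.tolist(),
    "coef_": model.coef_.tolist(),
    "intercept_": model.intercept_.tolist(),
    "n_iter_": model.n_iter_.tolist(),
}
payload = json.dumps(serialized)  # everything above is JSON-serializable

# Rebuild the estimator from the dict and check the scores match.
restored_dict = json.loads(payload)
restored = LogisticRegression()
restored.set_params(**restored_dict["params"])
restored.classes_ = np.array(restored_dict["classes_"])
restored.coef_ = np.array(restored_dict["coef_"])
restored.intercept_ = np.array(restored_dict["intercept_"])
restored.n_iter_ = np.array(restored_dict["n_iter_"])

assert np.allclose(model.predict_proba(X), restored.predict_proba(X))
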
@@ -179,7 +193,9 @@ def evaluate(self, X: pd.DataFrame, y: pd.Series, y_pred = self.score_model(X) fpr, tpr, thresholds = roc_curve(y_true=y, y_score=y_pred) - cutoff = (ClassificationEvaluator._compute_optimal_cutoff(fpr, tpr, thresholds)) + cutoff = ClassificationEvaluator._compute_optimal_cutoff( + fpr, tpr, thresholds + ) y_pred_b = np.array([0 if pred <= cutoff else 1 for pred in y_pred]) performance = metric(y_true=y, y_pred=y_pred_b) @@ -198,8 +214,7 @@ def evaluate(self, X: pd.DataFrame, y: pd.Series, return self._eval_metrics_by_split[split] def compute_variable_importance(self, data: pd.DataFrame) -> pd.DataFrame: - """Compute the importance of each predictor in the model and return - it as a DataFrame. + """Compute the importance of each predictor in the model. Parameters ---------- @@ -211,28 +226,25 @@ def compute_variable_importance(self, data: pd.DataFrame) -> pd.DataFrame: pd.DataFrame DataFrame containing columns predictor and importance. """ - y_pred = self.score_model(data) importance_by_variable = { utils.clean_predictor_name(predictor): stats.pearsonr( - data[predictor], - y_pred - )[0] + data[predictor], y_pred + )[0] for predictor in self.predictors } - df = pd.DataFrame.from_dict(importance_by_variable, - orient="index").reset_index() + df = pd.DataFrame.from_dict( + importance_by_variable, orient="index" + ).reset_index() df.columns = ["predictor", "importance"] - return (df.sort_values(by="importance", ascending=False) - .reset_index(drop=True)) + return df.sort_values(by="importance", ascending=False).reset_index(drop=True) def _is_valid_dict(self, model_dict: dict) -> bool: - - if ("meta" not in model_dict - or model_dict["meta"] != "logistic-regression"): + """Check if the model dictionary is valid.""" + if "meta" not in model_dict or model_dict["meta"] != "logistic-regression": return False attr = ["classes_", "coef_", "intercept_", "n_iter_", "predictors"] @@ -240,15 +252,17 @@ def _is_valid_dict(self, model_dict: dict) -> bool: if not (key in model_dict or type(model_dict[key]) != list): return False - if ("params" not in model_dict - or "_eval_metrics_by_split" not in model_dict): + if "params" not in model_dict or "_eval_metrics_by_split" not in model_dict: return False return True class LinearRegressionModel: - """Wrapper around the LinearRegression class, with additional methods + """ + Cobra's LinearRegression model. + + Wrapper around the LinearRegression class, with additional methods implemented such as evaluation (using RMSE), getting a list of coefficients, a dictionary of coefficients per predictor, ... for convenience. @@ -278,14 +292,16 @@ def serialize(self) -> dict: "meta": "linear-regression", "predictors": self.predictors, "_eval_metrics_by_split": self._eval_metrics_by_split, - "params": self.linear.get_params() + "params": self.linear.get_params(), } if self._is_fitted: - serialized_model.update({ - "coef_": self.linear.coef_.tolist(), - "intercept_": self.linear.intercept_.tolist() - }) + serialized_model.update( + { + "coef_": self.linear.coef_.tolist(), + "intercept_": self.linear.intercept_.tolist(), + } + ) return serialized_model @@ -302,7 +318,6 @@ def deserialize(self, model_dict: dict): ValueError In case JSON file is no valid serialized model. 
""" - if not self._is_valid_dict(model_dict): raise ValueError("No valid serialized model") @@ -313,8 +328,8 @@ def deserialize(self, model_dict: dict): self.predictors = model_dict["predictors"] self._eval_metrics_by_split = model_dict["_eval_metrics_by_split"] - def get_coef(self) -> np.array: - """Returns the model coefficients. + def get_coef(self) -> np.ndarray: + """Return the model coefficients. Returns ------- @@ -324,7 +339,7 @@ def get_coef(self) -> np.array: return self.linear.coef_ def get_intercept(self) -> float: - """Returns the intercept of the model. + """Return the intercept of the model. Returns ------- @@ -334,7 +349,7 @@ def get_intercept(self) -> float: return self.linear.intercept_[0] def get_coef_by_predictor(self) -> dict: - """Returns a dictionary mapping predictor (key) to coefficient (value). + """Return a dictionary mapping predictor (key) to coefficient (value). Returns ------- @@ -374,10 +389,16 @@ def score_model(self, X: pd.DataFrame) -> np.ndarray: # ensure we have the proper predictors and the proper order return self.linear.predict(X[self.predictors]) - def evaluate(self, X: pd.DataFrame, y: pd.Series, - split: str=None, - metric: Optional[Callable]=None) -> float: - """Evaluate the model on a given dataset (X, y). The optional split + def evaluate( + self, + X: pd.DataFrame, + y: pd.Series, + split: str = None, + metric: Optional[Callable] = None, + ) -> float: + """Evaluate the model on a given dataset (X, y). + + The optional split parameter is to indicate that the dataset belongs to (train, selection, validation), so that the computation on these sets can be cached! @@ -414,14 +435,12 @@ def evaluate(self, X: pd.DataFrame, y: pd.Series, if split is None: return performance - else: - self._eval_metrics_by_split[split] = performance + self._eval_metrics_by_split[split] = performance return self._eval_metrics_by_split[split] def compute_variable_importance(self, data: pd.DataFrame) -> pd.DataFrame: - """Compute the importance of each predictor in the model and return - it as a DataFrame. + """Compute the importance of each predictor in the model. Parameters ---------- @@ -433,37 +452,34 @@ def compute_variable_importance(self, data: pd.DataFrame) -> pd.DataFrame: pd.DataFrame DataFrame containing columns predictor and importance. 
""" - y_pred = self.score_model(data) importance_by_variable = { utils.clean_predictor_name(predictor): stats.pearsonr( - data[predictor], - y_pred - )[0] + data[predictor], y_pred + )[0] for predictor in self.predictors } - df = pd.DataFrame.from_dict(importance_by_variable, - orient="index").reset_index() + df = pd.DataFrame.from_dict( + importance_by_variable, orient="index" + ).reset_index() df.columns = ["predictor", "importance"] - return (df.sort_values(by="importance", ascending=False) - .reset_index(drop=True)) - - def _is_valid_dict(self, model_dict: dict) -> bool: + return df.sort_values(by="importance", ascending=False).reset_index(drop=True) - if ("meta" not in model_dict - or model_dict["meta"] != "linear-regression"): + @staticmethod + def _is_valid_dict(model_dict: dict) -> bool: + """Check if the model dictionary is valid.""" + if "meta" not in model_dict or model_dict["meta"] != "linear-regression": return False attr = ["coef_", "intercept_", "predictors"] for key in attr: - if not (key in model_dict or type(model_dict[key]) != list): + if not (key in model_dict or not isinstance(model_dict[key], list)): return False - if ("params" not in model_dict - or "_eval_metrics_by_split" not in model_dict): + if "params" not in model_dict or "_eval_metrics_by_split" not in model_dict: return False return True diff --git a/cobra/model_building/univariate_selection.py b/cobra/model_building/univariate_selection.py index 2db4abb..d6c1901 100644 --- a/cobra/model_building/univariate_selection.py +++ b/cobra/model_building/univariate_selection.py @@ -1,20 +1,24 @@ - +"""Calculate the univariate quality of predictors.""" import pandas as pd from sklearn.metrics import roc_auc_score, mean_squared_error from numpy import sqrt import cobra.utils as utils -def compute_univariate_preselection(target_enc_train_data: pd.DataFrame, - target_enc_selection_data: pd.DataFrame, - predictors: list, - target_column: str, - model_type: str = "classification", - preselect_auc_threshold: float = 0.053, - preselect_rmse_threshold: float = 5, - preselect_overtrain_threshold: float = 0.05 - ) -> pd.DataFrame: - """Perform a preselection of predictors based on an AUC (in case of + +def compute_univariate_preselection( + target_enc_train_data: pd.DataFrame, + target_enc_selection_data: pd.DataFrame, + predictors: list, + target_column: str, + model_type: str = "classification", + preselect_auc_threshold: float = 0.053, + preselect_rmse_threshold: float = 5, + preselect_overtrain_threshold: float = 0.05, +) -> pd.DataFrame: + """Perform a preselection of predictors. 
+ + The preselection is based on an AUC (in case of classification) or a RMSE (in case of regression) threshold of a univariate model on a train and selection dataset and return a DataFrame containing for each variable the train and selection AUC or RMSE along with a @@ -71,15 +75,21 @@ def compute_univariate_preselection(target_enc_train_data: pd.DataFrame, auc_train = roc_auc_score( y_true=target_enc_train_data[target_column], - y_score=target_enc_train_data[predictor]) + y_score=target_enc_train_data[predictor], + ) auc_selection = roc_auc_score( y_true=target_enc_selection_data[target_column], - y_score=target_enc_selection_data[predictor]) + y_score=target_enc_selection_data[predictor], + ) - result.append({"predictor": cleaned_predictor, - "AUC train": auc_train, - "AUC selection": auc_selection}) + result.append( + { + "predictor": cleaned_predictor, + "AUC train": auc_train, + "AUC selection": auc_selection, + } + ) df_auc = pd.DataFrame(result) @@ -88,28 +98,40 @@ def compute_univariate_preselection(target_enc_train_data: pd.DataFrame, # Identify those variables for which the AUC difference between train # and selection is within a user-defined ratio - auc_overtrain = ((df_auc["AUC train"] - df_auc["AUC selection"]) - < preselect_overtrain_threshold) + preselect_overtrain = df_auc["AUC train"] - df_auc["AUC selection"] + auc_overtrain = preselect_overtrain < preselect_overtrain_threshold df_auc["preselection"] = auc_thresh & auc_overtrain - df_out = df_auc.sort_values(by="AUC selection", ascending=False).reset_index(drop=True) + df_out = df_auc.sort_values(by="AUC selection", ascending=False).reset_index( + drop=True + ) elif model_type == "regression": for predictor in predictors: cleaned_predictor = utils.clean_predictor_name(predictor) - rmse_train = sqrt(mean_squared_error( - y_true=target_enc_train_data[target_column], - y_pred=target_enc_train_data[predictor])) - - rmse_selection = sqrt(mean_squared_error( - y_true=target_enc_selection_data[target_column], - y_pred=target_enc_selection_data[predictor])) - - result.append({"predictor": cleaned_predictor, - "RMSE train": rmse_train, - "RMSE selection": rmse_selection}) + rmse_train = sqrt( + mean_squared_error( + y_true=target_enc_train_data[target_column], + y_pred=target_enc_train_data[predictor], + ) + ) + + rmse_selection = sqrt( + mean_squared_error( + y_true=target_enc_selection_data[target_column], + y_pred=target_enc_selection_data[predictor], + ) + ) + + result.append( + { + "predictor": cleaned_predictor, + "RMSE train": rmse_train, + "RMSE selection": rmse_selection, + } + ) df_rmse = pd.DataFrame(result) @@ -118,17 +140,22 @@ def compute_univariate_preselection(target_enc_train_data: pd.DataFrame, # Identify those variables for which the RMSE difference between train # and selection is within a user-defined ratio - rmse_overtrain = ((df_rmse["RMSE selection"] - df_rmse["RMSE train"]) # flip subtraction vs. AUC - < preselect_overtrain_threshold) + preselect_overtrain = ( + df_rmse["RMSE selection"] - df_rmse["RMSE train"] + ) # flip subtraction vs. 
AUC + rmse_overtrain = preselect_overtrain < preselect_overtrain_threshold df_rmse["preselection"] = rmse_thresh & rmse_overtrain - df_out = df_rmse.sort_values(by="RMSE selection", ascending=True).reset_index(drop=True) # lower is better + df_out = df_rmse.sort_values(by="RMSE selection", ascending=True).reset_index( + drop=True + ) # lower is better return df_out + def get_preselected_predictors(df_metric: pd.DataFrame) -> list: - """Wrapper function to extract a list of predictors from df_metric. + """Extract a list of predictors from df_metric. Parameters ---------- @@ -142,22 +169,26 @@ def get_preselected_predictors(df_metric: pd.DataFrame) -> list: list List of preselected predictors. """ - if "AUC selection" in df_metric.columns: - predictor_list = (df_metric[df_metric["preselection"]] - .sort_values(by="AUC selection", ascending=False) - .predictor.tolist()) + predictor_list = ( + df_metric[df_metric["preselection"]] + .sort_values(by="AUC selection", ascending=False) + .predictor.tolist() + ) elif "RMSE selection" in df_metric.columns: - predictor_list = (df_metric[df_metric["preselection"]] - .sort_values(by="RMSE selection", ascending=True) # lower is better - .predictor.tolist()) + predictor_list = ( + df_metric[df_metric["preselection"]] + .sort_values(by="RMSE selection", ascending=True) # lower is better + .predictor.tolist() + ) return [col + "_enc" for col in predictor_list] -def compute_correlations(target_enc_train_data: pd.DataFrame, - predictors: list) -> pd.DataFrame: - """Given a DataFrame and a list of predictors, compute the correlations - amongst the predictors in the DataFrame. + +def compute_correlations( + target_enc_train_data: pd.DataFrame, predictors: list +) -> pd.DataFrame: + """Compute the correlations amongst the predictors in the DataFrame. Parameters ---------- @@ -172,11 +203,11 @@ def compute_correlations(target_enc_train_data: pd.DataFrame, pd.DataFrame The correlation matrix of the training set. """ - correlations = target_enc_train_data[predictors].corr() - predictors_cleaned = [utils.clean_predictor_name(predictor) - for predictor in predictors] + predictors_cleaned = [ + utils.clean_predictor_name(predictor) for predictor in predictors + ] # Change index and columns with the cleaned version of the predictors # e.g. 
change "var1_enc" with "var1" diff --git a/cobra/preprocessing/__init__.py b/cobra/preprocessing/__init__.py index e02ad4c..cd8579a 100644 --- a/cobra/preprocessing/__init__.py +++ b/cobra/preprocessing/__init__.py @@ -1,9 +1,13 @@ +"""This module contains all preprocessing utils.""" + from .kbins_discretizer import KBinsDiscretizer from .target_encoder import TargetEncoder from .categorical_data_processor import CategoricalDataProcessor from .preprocessor import PreProcessor -__all__ = ['KBinsDiscretizer', - 'TargetEncoder', - 'CategoricalDataProcessor', - 'PreProcessor'] +__all__ = [ + "KBinsDiscretizer", + "TargetEncoder", + "CategoricalDataProcessor", + "PreProcessor", +] diff --git a/cobra/preprocessing/categorical_data_processor.py b/cobra/preprocessing/categorical_data_processor.py index 175bfb5..ba762ed 100644 --- a/cobra/preprocessing/categorical_data_processor.py +++ b/cobra/preprocessing/categorical_data_processor.py @@ -1,7 +1,8 @@ +"""Process categorical data.""" # standard lib imports import re -from typing import Optional +from typing import Any, Optional, Set, Union import logging # third party imports @@ -14,9 +15,9 @@ log = logging.getLogger(__name__) + class CategoricalDataProcessor(BaseEstimator): - """Regroups the categories of categorical variables based on significance - with target variable. + """Regroup categorical variables based on significance with target variable. This class implements the Python Prediction's way of dealing with categorical data preprocessing. There are three steps involved: @@ -60,22 +61,34 @@ class CategoricalDataProcessor(BaseEstimator): Whether contingency table should be scaled before chi^2. """ - valid_keys = ["model_type", "regroup", "regroup_name", "keep_missing", - "category_size_threshold", "p_value_threshold", - "scale_contingency_table", "forced_categories"] - - def __init__(self, - model_type: str="classification", - regroup: bool=True, - regroup_name: str="Other", - keep_missing: bool=True, - category_size_threshold: int=5, - p_value_threshold: float=0.001, - scale_contingency_table: bool=True, - forced_categories: dict={}): - + valid_keys = [ + "model_type", + "regroup", + "regroup_name", + "keep_missing", + "category_size_threshold", + "p_value_threshold", + "scale_contingency_table", + "forced_categories", + ] + + def __init__( + self, + model_type: str = "classification", + regroup: bool = True, + regroup_name: str = "Other", + keep_missing: bool = True, + category_size_threshold: int = 5, + p_value_threshold: float = 0.001, + scale_contingency_table: bool = True, + forced_categories: dict = {}, + ): + """Initialize the CategoricalDataProcessor.""" if model_type not in ["classification", "regression"]: - raise ValueError("An unexpected model_type was provided. A valid model_type is either 'classification' or 'regression'.") + raise ValueError( + "An unexpected model_type was provided. " + "A valid model_type is either 'classification' or 'regression'." + ) self.model_type = model_type self.regroup = regroup @@ -87,7 +100,7 @@ def __init__(self, self.forced_categories = forced_categories # dict to store fitted output in - self._cleaned_categories_by_column = {} + self._cleaned_categories_by_column: dict[str, Set[Any]] = {} def attributes_to_dict(self) -> dict: """Return the attributes of CategoricalDataProcessor as a dictionary. 
@@ -108,8 +121,7 @@ def attributes_to_dict(self) -> dict: return params def set_attributes_from_dict(self, params: dict): - """Set instance attributes from a dictionary of values with key the - name of the attribute. + """Set instance attributes from a dictionary of values with key the name of the attribute. Parameters ---------- @@ -125,9 +137,10 @@ def set_attributes_from_dict(self, params: dict): _fitted_output = params.pop("_cleaned_categories_by_column", {}) if type(_fitted_output) != dict: - raise ValueError("_cleaned_categories_by_column is expected to " - "be a dict but is of type {} instead" - .format(type(_fitted_output))) + raise ValueError( + "_cleaned_categories_by_column is expected to " + "be a dict but is of type {} instead".format(type(_fitted_output)) + ) # Clean out params dictionary to remove unknown keys (for safety!) params = {key: params[key] for key in params if key in self.valid_keys} @@ -142,8 +155,7 @@ def set_attributes_from_dict(self, params: dict): return self - def fit(self, data: pd.DataFrame, column_names: list, - target_column: str): + def fit(self, data: pd.DataFrame, column_names: list, target_column: str): """Fit the CategoricalDataProcessor. Parameters @@ -156,18 +168,17 @@ def fit(self, data: pd.DataFrame, column_names: list, target_column : str Column name of the target. """ - if not self.regroup: # We do not need to fit anything if regroup is set to False! log.info("regroup was set to False, so no fitting is required") return None - for column_name in tqdm(column_names, desc="Fitting category " - "regrouping..."): - + for column_name in tqdm(column_names, desc="Fitting category regrouping..."): if column_name not in data.columns: - log.warning("DataFrame has no column '{}', so it will be " - "skipped in fitting" .format(column_name)) + log.warning( + "DataFrame has no column '{}', so it will be " + "skipped in fitting".format(column_name) + ) continue cleaned_cats = self._fit_column(data, column_name, target_column) @@ -179,9 +190,11 @@ def fit(self, data: pd.DataFrame, column_names: list, # Add to _cleaned_categories_by_column for later use self._cleaned_categories_by_column[column_name] = cleaned_cats - def _fit_column(self, data: pd.DataFrame, column_name: str, - target_column) -> set: - """Compute which categories to regroup into "Other" + def _fit_column(self, data: pd.DataFrame, column_name: str, target_column) -> set: + """ + Fit all necessary columns into "Other". + + Computes which categories to regroup into "Other" for a particular column, and return those that need to be kept as-is. @@ -200,8 +213,10 @@ def _fit_column(self, data: pd.DataFrame, column_name: str, model_type = self.model_type if len(data[column_name].unique()) == 1: - log.warning(f"Predictor {column_name} is constant" - " and will be ignored in computation.") + log.warning( + f"Predictor {column_name} is constant" + " and will be ignored in computation." + ) return set(data[column_name].unique()) y = data[target_column] @@ -210,39 +225,34 @@ def _fit_column(self, data: pd.DataFrame, column_name: str, else: incidence = None - combined_categories = set() + combined_categories: Set[str] = set() # replace missings and get unique categories as a list - X = (CategoricalDataProcessor - ._replace_missings(data[column_name]) - .astype(object)) + X = CategoricalDataProcessor._replace_missings(data[column_name]).astype(object) unique_categories = list(X.unique()) # do not merge categories in case of dummies, i.e. 
0 and 1 # (and possibly "Missing") - if (len(unique_categories) == 2 - or (len(unique_categories) == 3 - and "Missing" in unique_categories)): + if len(unique_categories) == 2 or ( + len(unique_categories) == 3 and "Missing" in unique_categories + ): return set(unique_categories) # get small categories and add them to the merged category list # does not apply incidence factor when model_type = "regression" - small_categories = (CategoricalDataProcessor - ._get_small_categories( - X, - incidence, - self.category_size_threshold)) + small_categories = CategoricalDataProcessor._get_small_categories( + X, incidence, self.category_size_threshold + ) combined_categories = combined_categories.union(small_categories) for category in unique_categories: if category in small_categories: continue - pval = (CategoricalDataProcessor - ._compute_p_value(X, y, category, - model_type, - self.scale_contingency_table)) + pval = CategoricalDataProcessor._compute_p_value( + X, y, category, model_type, self.scale_contingency_table + ) # if not significant, add it to the list if pval > self.p_value_threshold: @@ -254,8 +264,7 @@ def _fit_column(self, data: pd.DataFrame, column_name: str, return set(unique_categories).difference(combined_categories) - def transform(self, data: pd.DataFrame, - column_names: list) -> pd.DataFrame: + def transform(self, data: pd.DataFrame, column_names: list) -> pd.DataFrame: """Transform the data. Parameters @@ -271,29 +280,25 @@ def transform(self, data: pd.DataFrame, pd.DataFrame Data with additional transformed variables. """ - if self.regroup and len(self._cleaned_categories_by_column) == 0: - msg = ("{} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "{} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." + ) raise NotFittedError(msg.format(self.__class__.__name__)) for column_name in column_names: if column_name not in data.columns: - log.warning("Unknown column '{}' will be skipped" - .format(column_name)) + log.warning("Unknown column '{}' will be skipped".format(column_name)) continue data = self._transform_column(data, column_name) return data - def _transform_column(self, data: pd.DataFrame, - column_name: str) -> pd.DataFrame: - """Given a DataFrame, a column name and a list of categories to - combine, create an additional column which combines these categories - into "Other". + def _transform_column(self, data: pd.DataFrame, column_name: str) -> pd.DataFrame: + """Create an additional column which combines categories into "Other". Parameters ---------- @@ -307,16 +312,13 @@ def _transform_column(self, data: pd.DataFrame, pd.DataFrame Original DataFrame with an added processed column. 
""" - column_name_clean = column_name + "_processed" data.loc[:, column_name_clean] = data[column_name].astype(object) # Fill missings first - data.loc[:, column_name_clean] = (CategoricalDataProcessor - ._replace_missings( - data, - column_name_clean - )) + data.loc[:, column_name_clean] = CategoricalDataProcessor._replace_missings( + data, column_name_clean + ) if self.regroup: categories = self._cleaned_categories_by_column.get(column_name) @@ -325,25 +327,27 @@ def _transform_column(self, data: pd.DataFrame, # Log warning if categories is None, which indicates it is # not in fitted output if categories is None: - log.warning("Column '{}' is not in fitted output " - "and will be skipped".format(column_name)) + log.warning( + "Column '{}' is not in fitted output " + "and will be skipped".format(column_name) + ) return data - data.loc[:, column_name_clean] = (CategoricalDataProcessor - ._replace_categories( - data[column_name_clean], - categories, - self.regroup_name)) + data.loc[ + :, column_name_clean + ] = CategoricalDataProcessor._replace_categories( + data[column_name_clean], categories, self.regroup_name + ) # change data to categorical - data.loc[:, column_name_clean] = (data[column_name_clean] - .astype("category")) + data.loc[:, column_name_clean] = data[column_name_clean].astype("category") return data - def fit_transform(self, data: pd.DataFrame, column_names: list, - target_column: str) -> pd.DataFrame: - """Fits the data, then transforms it. + def fit_transform( + self, data: pd.DataFrame, column_names: list, target_column: str + ) -> pd.DataFrame: + """Fit and transform the data. Parameters ---------- @@ -360,15 +364,16 @@ def fit_transform(self, data: pd.DataFrame, column_names: list, pd.DataFrame Data with additional transformed variables. """ - self.fit(data, column_names, target_column) return self.transform(data, column_names) @staticmethod - def _get_small_categories(predictor_series: pd.Series, - incidence: float, - category_size_threshold: int) -> set: - """Fetch categories with a size below a certain threshold. + def _get_small_categories( + predictor_series: pd.Series, incidence: float, category_size_threshold: int + ) -> set: + """ + Fetch categories with a size below a certain threshold. + Note that we use an additional weighting with the overall incidence. Parameters @@ -392,12 +397,13 @@ def _get_small_categories(predictor_series: pd.Series, factor = 1 # Get all categories with a count below a threshold - bool_mask = (category_counts*factor) <= category_size_threshold + bool_mask = (category_counts * factor) <= category_size_threshold return set(category_counts[bool_mask].index.tolist()) @staticmethod - def _replace_missings(data: pd.DataFrame, - column_names: Optional[list] = None) -> pd.DataFrame: + def _replace_missings( + data: pd.DataFrame, column_names: Optional[Union[list[str], str]] = None + ) -> pd.DataFrame: """Replace missing values (incl. empty strings). Parameters @@ -427,10 +433,17 @@ def _replace_missings(data: pd.DataFrame, return temp @staticmethod - def _compute_p_value(X: pd.Series, y: pd.Series, category: str, - model_type: str, - scale_contingency_table: bool) -> float: - """Calculates p-value in order to evaluate whether category of + def _compute_p_value( + X: pd.Series, + y: pd.Series, + category: str, + model_type: str, + scale_contingency_table: bool, + ) -> float: + """ + Calculate p-value. 
+ + Calculate p-value in order to evaluate whether category of interest is significantly different from the rest of the categories, given the target variable. @@ -461,31 +474,37 @@ def _compute_p_value(X: pd.Series, y: pd.Series, category: str, df["other_categories"] = np.where(X == category, 0, 1) if model_type == "classification": - contingency_table = pd.crosstab(index=df["other_categories"], columns=df["y"], - margins=False) + contingency_table = pd.crosstab( + index=df["other_categories"], columns=df["y"], margins=False + ) # if true, we scale the "other" categories if scale_contingency_table: size_other_cats = contingency_table.iloc[1].sum() incidence_mean = y.mean() - contingency_table.iloc[1, 0] = (1-incidence_mean) * size_other_cats + contingency_table.iloc[1, 0] = (1 - incidence_mean) * size_other_cats contingency_table.iloc[1, 1] = incidence_mean * size_other_cats contingency_table = contingency_table.values.astype(np.int64) pval = stats.chi2_contingency(contingency_table, correction=False)[1] elif model_type == "regression": - pval = stats.kruskal(df.y[df.other_categories == 0], - df.y[df.other_categories == 1])[1] + pval = stats.kruskal( + df.y[df.other_categories == 0], df.y[df.other_categories == 1] + )[1] return pval @staticmethod - def _replace_categories(data: pd.Series, categories: set, - replace_with: str) -> pd.Series: - """Replace categories in set with "Other" and transform the remaining - categories to strings to avoid type errors later on in the pipeline. + def _replace_categories( + data: pd.Series, categories: set, replace_with: str + ) -> pd.Series: + """ + Replace categories in set with "Other". + + Transforms the remaining categories to strings + to avoid type errors later on in the pipeline. Parameters ---------- @@ -501,5 +520,4 @@ def _replace_categories(data: pd.Series, categories: set, pd.Series Series with replaced categories. """ - return data.apply( - lambda x: str(x) if x in categories else replace_with) + return data.apply(lambda x: str(x) if x in categories else replace_with) diff --git a/cobra/preprocessing/kbins_discretizer.py b/cobra/preprocessing/kbins_discretizer.py index c30d7de..84fae51 100644 --- a/cobra/preprocessing/kbins_discretizer.py +++ b/cobra/preprocessing/kbins_discretizer.py @@ -1,7 +1,8 @@ - +"""Binning of continous data.""" # standard lib imports from copy import deepcopy -from typing import List +from this import d +from typing import Dict, List, Optional, Union import numbers import logging import math @@ -15,8 +16,12 @@ log = logging.getLogger(__name__) + class KBinsDiscretizer(BaseEstimator): - """Bin continuous data into intervals of predefined size. It provides a + """ + Discretize continuous values into categorical values. + + Bin continuous data into intervals of predefined size. It provides a way to partition continuous data into discrete values, i.e. transform continuous data into nominal data. 
This can make a linear model more expressive as it introduces nonlinearity to the model, while maintaining @@ -59,17 +64,27 @@ class KBinsDiscretizer(BaseEstimator): """ valid_strategies = ("uniform", "quantile") - valid_keys = ["n_bins", "strategy", "closed", "auto_adapt_bins", - "starting_precision", "label_format", - "change_endpoint_format"] - - def __init__(self, n_bins: int = 10, strategy: str = "quantile", - closed: str = "right", - auto_adapt_bins: bool = False, - starting_precision: int = 0, - label_format: str = "{} - {}", - change_endpoint_format: bool = False): - + valid_keys = [ + "n_bins", + "strategy", + "closed", + "auto_adapt_bins", + "starting_precision", + "label_format", + "change_endpoint_format", + ] + + def __init__( + self, + n_bins: int = 10, + strategy: str = "quantile", + closed: str = "right", + auto_adapt_bins: bool = False, + starting_precision: int = 0, + label_format: str = "{} - {}", + change_endpoint_format: bool = False, + ): + """Initialize the KBinsDiscretizer.""" # validate number of bins self._validate_n_bins(n_bins) @@ -85,8 +100,7 @@ def __init__(self, n_bins: int = 10, strategy: str = "quantile", self._bins_by_column = {} def _validate_n_bins(self, n_bins: int): - """Check if ``n_bins`` is of the proper type and if it is bigger - than two + """Check if ``n_bins`` is of the proper type and if it is bigger than one. Parameters ---------- @@ -99,17 +113,20 @@ def _validate_n_bins(self, n_bins: int): in case ``n_bins`` is not an integer or if ``n_bins < 2`` """ if not isinstance(n_bins, numbers.Integral): - raise ValueError("{} received an invalid n_bins type. " - "Received {}, expected int." - .format(KBinsDiscretizer.__name__, - type(n_bins).__name__)) + raise ValueError( + "{} received an invalid n_bins type. Received {}, expected int.".format( + KBinsDiscretizer.__name__, type(n_bins).__name__ + ) + ) if n_bins < 2: - raise ValueError("{} received an invalid number " - "of bins. Received {}, expected at least 2." - .format(KBinsDiscretizer.__name__, n_bins)) + raise ValueError( + "{} received an invalid number of bins. Received {}, expected at least 2.".format( + KBinsDiscretizer.__name__, n_bins + ) + ) def attributes_to_dict(self) -> dict: - """Return the attributes of KBinsDiscretizer in a dictionary + """Return the attributes of KBinsDiscretizer as a dictionary. Returns ------- @@ -127,8 +144,7 @@ def attributes_to_dict(self) -> dict: return params def set_attributes_from_dict(self, params: dict): - """Set instance attributes from a dictionary of values with key the - name of the attribute. + """Set instance attributes from a dictionary. Parameters ---------- @@ -144,9 +160,11 @@ def set_attributes_from_dict(self, params: dict): _bins_by_column = params.pop("_bins_by_column", {}) if type(_bins_by_column) != dict: - raise ValueError("_bins_by_column is expected to be a dict " - "but is of type {} instead" - .format(type(_bins_by_column))) + raise ValueError( + "_bins_by_column is expected to be a dict but is of type {} instead".format( + type(_bins_by_column) + ) + ) # Clean out params dictionary to remove unknown keys (for safety!) params = {key: params[key] for key in params if key in self.valid_keys} @@ -163,7 +181,7 @@ def set_attributes_from_dict(self, params: dict): return self def fit(self, data: pd.DataFrame, column_names: list): - """Fits the estimator + """Fit the estimator. 
Parameters ---------- @@ -172,19 +190,19 @@ def fit(self, data: pd.DataFrame, column_names: list): column_names : list Names of the columns of the DataFrame to discretize """ - if self.strategy not in self.valid_strategies: - raise ValueError("{}: valid options for 'strategy' are {}. " - "Got strategy={!r} instead." - .format(KBinsDiscretizer.__name__, - self.valid_strategies, self.strategy)) - - for column_name in tqdm(column_names, desc="Computing " - "discretization bins..."): + raise ValueError( + "{}: valid options for 'strategy' are {}. Got strategy={!r} instead.".format( + KBinsDiscretizer.__name__, self.valid_strategies, self.strategy + ) + ) + for column_name in tqdm(column_names, desc="Computing discretization bins..."): if column_name not in data.columns: - log.warning("DataFrame has no column '{}', so it will be " - "skipped in fitting" .format(column_name)) + log.warning( + "DataFrame has no column '{}', so it will be " + "skipped in fitting".format(column_name) + ) continue bins = self._fit_column(data, column_name) @@ -192,9 +210,10 @@ def fit(self, data: pd.DataFrame, column_names: list): # Add to bins_by_column for later use self._bins_by_column[column_name] = bins - def _fit_column(self, data: pd.DataFrame, - column_name: str) -> List[tuple]: - """Compute bins for a specific column in data + def _fit_column( + self, data: pd.DataFrame, column_name: str + ) -> Optional[List[tuple]]: + """Compute bins for a specific column in data. Parameters ---------- @@ -211,51 +230,61 @@ def _fit_column(self, data: pd.DataFrame, col_min, col_max = data[column_name].min(), data[column_name].max() if col_min == col_max: - log.warning("Predictor '{}' is constant and " - "will be ignored in computation".format(column_name)) + log.warning( + "Predictor '{}' is constant and will be ignored in computation".format( + column_name + ) + ) return None - prop_inf = (np.sum(np.isinf(data[column_name])) - / data[column_name].shape[0]) + prop_inf = np.sum(np.isinf(data[column_name])) / data[column_name].shape[0] if prop_inf > 0: - log.warning(f"Column {column_name} has " - f"{prop_inf:.1%} inf values, thus it was skipped. " - f"Consider dropping or transforming it.") + log.warning( + f"Column {column_name} has " + f"{prop_inf:.1%} inf values, thus it was skipped. " + f"Consider dropping or transforming it." + ) return None prop_nan = data[column_name].isna().sum() / data[column_name].shape[0] if prop_nan >= 0.99: - log.warning(f"Column {column_name} is" - f" {prop_nan:.1%}% NaNs, " - f"consider dropping or transforming it.") + log.warning( + f"Column {column_name} is" + f" {prop_nan:.1%}% NaNs, " + f"consider dropping or transforming it." 
+ ) n_bins = self.n_bins if self.auto_adapt_bins: size = len(data.index) - missing_pct = data[column_name].isnull().sum()/size + missing_pct = data[column_name].isnull().sum() / size n_bins = int(max(round((1 - missing_pct) * n_bins), 2)) - bin_edges = self._compute_bin_edges(data, column_name, n_bins, - col_min, col_max) + bin_edges = self._compute_bin_edges(data, column_name, n_bins, col_min, col_max) if len(bin_edges) < 3: - log.warning("Only 1 bin was found for predictor '{}' so it will " - "be ignored in computation".format(column_name)) + log.warning( + "Only 1 bin was found for predictor '{}' so it will " + "be ignored in computation".format(column_name) + ) return None if len(bin_edges) < n_bins + 1: - log.warning("The number of actual bins for predictor '{}' is {} " - "which is smaller than the requested number of bins " - "{}".format(column_name, len(bin_edges) - 1, n_bins)) + log.warning( + "The number of actual bins for predictor '{}' is {} " + "which is smaller than the requested number of bins " + "{}".format(column_name, len(bin_edges) - 1, n_bins) + ) return self._compute_bins_from_edges(bin_edges) - def transform(self, data: pd.DataFrame, - column_names: list) -> pd.DataFrame: - """Discretizes the data in the given list of columns by mapping each - number to the appropriate bin computed by the fit method + def transform(self, data: pd.DataFrame, column_names: list) -> pd.DataFrame: + """Discretize the data in the given list of columns. + + This is done by mapping each number to + the appropriate bin computed by the fit method. Parameters ---------- @@ -270,15 +299,18 @@ def transform(self, data: pd.DataFrame, data with additional discretized variables """ if len(self._bins_by_column) == 0: - msg = ("{} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "{} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." + ) raise NotFittedError(msg.format(self.__class__.__name__)) for column_name in tqdm(column_names, desc="Discretizing columns..."): if column_name not in self._bins_by_column: - log.warning("Column '{}' is not in fitted output " - "and will be skipped".format(column_name)) + log.warning( + "Column '{}' is not in fitted output " + "and will be skipped".format(column_name) + ) continue # can be None for a column with a constant value! @@ -288,12 +320,10 @@ def transform(self, data: pd.DataFrame, return data - def _transform_column(self, data: pd.DataFrame, - column_name: str, - bins: List[tuple]) -> pd.DataFrame: - """Given a DataFrame, a column name and a list of bins, - create an additional column which determines the bin in which the value - of column_name lies in. + def _transform_column( + self, data: pd.DataFrame, column_name: str, bins: List[tuple] + ) -> pd.DataFrame: + """Create a new column with binned values of column_name. 
Parameters ---------- @@ -309,20 +339,19 @@ def _transform_column(self, data: pd.DataFrame, pd.DataFrame original DataFrame with an added binned column """ - interval_idx = KBinsDiscretizer._create_index(bins, self.closed) column_name_bin = column_name + "_bin" # use pd.cut to compute bins - data.loc[:, column_name_bin] = pd.cut(x=data[column_name], - bins=interval_idx) + data.loc[:, column_name_bin] = pd.cut(x=data[column_name], bins=interval_idx) # Rename bins so that the output has a proper format bin_labels = self._create_bin_labels(bins) - data.loc[:, column_name_bin] = (data[column_name_bin] - .cat.rename_categories(bin_labels)) + data.loc[:, column_name_bin] = data[column_name_bin].cat.rename_categories( + bin_labels + ) if data[column_name_bin].isnull().sum() > 0: @@ -335,9 +364,8 @@ def _transform_column(self, data: pd.DataFrame, return data - def fit_transform(self, data: pd.DataFrame, - column_names: list) -> pd.DataFrame: - """Fits to data, then transform it + def fit_transform(self, data: pd.DataFrame, column_names: list) -> pd.DataFrame: + """Fit to data, then transform it. Parameters ---------- @@ -354,11 +382,15 @@ def fit_transform(self, data: pd.DataFrame, self.fit(data, column_names) return self.transform(data, column_names) - def _compute_bin_edges(self, data: pd.DataFrame, column_name: str, - n_bins: int, col_min: float, - col_max: float) -> list: - """Compute the bin edges for a given column, a DataFrame and the number - of required bins + def _compute_bin_edges( + self, + data: pd.DataFrame, + column_name: str, + n_bins: int, + col_min: float, + col_max: float, + ) -> list: + """Compute the desired bin edges. Parameters ---------- @@ -378,14 +410,19 @@ def _compute_bin_edges(self, data: pd.DataFrame, column_name: str, list list of bin edges from which to compute the bins """ - + # fmt: off bin_edges = [] if self.strategy == "quantile": - bin_edges = list(data[column_name] - .quantile(np.linspace(0, 1, n_bins + 1), - interpolation="linear")) + bin_edges = list( + data[column_name] + .quantile( + np.linspace(0, 1, n_bins + 1), + interpolation="linear" + ) + ) elif self.strategy == "uniform": bin_edges = list(np.linspace(col_min, col_max, n_bins + 1)) + # fmt: on # nans lead to unexpected behavior during sorting, # by replacing with inf we ensure these stay at the @@ -397,22 +434,23 @@ def _compute_bin_edges(self, data: pd.DataFrame, column_name: str, bin_edges[-1] = np.inf if np.isnan(bin_edges).sum() > 0: - log.warning(f"Column {column_name} " - "has NaNs present in bin definitions") + log.warning(f"Column {column_name} has NaNs present in bin definitions") - # Make absolutely sure bin edges are ordered, + # Make absolutely sure bin edges are ordered, # in very rare situations this wasn't the case - # due to rounding in quantile calculation (e.g. + # due to rounding in quantile calculation (e.g. # distributions with strong mass for same value) bin_edges = sorted(bin_edges) - + # Make sure the bin_edges are unique # and order remains the same return list(dict.fromkeys(bin_edges)) def _compute_minimal_precision_of_bin_edges(self, bin_edges: list) -> int: - """Compute the minimal precision of a list of bin_edges so that we end - up with a strictly ascending sequence of different numbers even when rounded. + """Compute the minimal precision of a list of bin_edges. + + This way we end up with a strictly ascending sequence of + different numbers even when rounded. The starting_precision attribute will be used as the initial precision. 
In case of a negative starting_precision, the bin edges will be rounded to the nearest 10, 100, ... (e.g. 5.55 -> 10, 246 -> 200, ...) @@ -427,7 +465,6 @@ def _compute_minimal_precision_of_bin_edges(self, bin_edges: list) -> int: int minimal precision for the bin edges """ - precision = self.starting_precision while True: cont = False @@ -443,8 +480,8 @@ def _compute_minimal_precision_of_bin_edges(self, bin_edges: list) -> int: return precision def _compute_bins_from_edges(self, bin_edges: list) -> List[tuple]: - """Given a list of bin edges, compute the minimal precision for which - we can make meaningful bins and make those bins + """ + Return bins with the minimal precision. Parameters ---------- @@ -460,7 +497,7 @@ def _compute_bins_from_edges(self, bin_edges: list) -> List[tuple]: # this can be a negative number, which then # rounds numbers to the nearest 10, 100, ... precision = self._compute_minimal_precision_of_bin_edges(bin_edges) - + bins = [] for a, b in zip(bin_edges, bin_edges[1:]): fmt_a = round(a, precision) @@ -471,9 +508,12 @@ def _compute_bins_from_edges(self, bin_edges: list) -> List[tuple]: return bins @staticmethod - def _create_index(intervals: List[tuple], - closed: str = "right") -> pd.IntervalIndex: - """Create an pd.IntervalIndex based on a list of tuples. + def _create_index( + intervals: List[tuple], closed: str = "right" + ) -> pd.IntervalIndex: + """ + Create an pd.IntervalIndex based on a list of tuples. + This is basically a wrapper around pd.IntervalIndex.from_tuples However, the lower bound of the first entry in the list (the lower bin) is replaced by -np.inf. Similarly, the upper bound of the last entry in @@ -492,13 +532,18 @@ def _create_index(intervals: List[tuple], pd.IntervalIndex Description """ - # check if closed is of the proper form + # fmt: off if closed not in ["left", "right"]: - raise ValueError("{}: valid options for 'closed' are {}. " - "Got strategy={!r} instead." - .format(KBinsDiscretizer.__name__, - ["left", "right"], closed)) + raise ValueError( + "{}: valid options for 'closed' are {}. " + "Got strategy={!r} instead." + .format( + KBinsDiscretizer.__name__, + ["left", "right"], closed + ) + ) + # fmt: on # deepcopy variable because we do not want to modify the content # of intervals (which is still used outside of this function) @@ -511,8 +556,8 @@ def _create_index(intervals: List[tuple], return pd.IntervalIndex.from_tuples(_intervals, closed) def _create_bin_labels(self, bins: List[tuple]) -> list: - """Given a list of bins, create a list of string containing the bins - as a string with a specific format (e.g. bin labels) + """ + Stringify the bin bounds to be used as bin labels. Parameters ---------- @@ -525,9 +570,16 @@ def _create_bin_labels(self, bins: List[tuple]) -> list: list of (formatted) bin labels """ bin_labels = [] + # fmt: off for interval in bins: - bin_labels.append(self.label_format.format(interval[0], - interval[1])) + bin_labels.append( + self.label_format + .format( + interval[0], + interval[1] + ) + ) + # fmt: on # Format first and last bin as < x and > y resp. 
if self.change_endpoint_format: diff --git a/cobra/preprocessing/preprocessor.py b/cobra/preprocessing/preprocessor.py index e03d352..c873b68 100644 --- a/cobra/preprocessing/preprocessor.py +++ b/cobra/preprocessing/preprocessor.py @@ -1,3 +1,4 @@ +"""Preprocess data.""" # standard lib imports import inspect @@ -6,6 +7,7 @@ import logging from random import shuffle from datetime import datetime +from typing import Any, Set # third party imports import pandas as pd @@ -19,8 +21,12 @@ log = logging.getLogger(__name__) + class PreProcessor(BaseEstimator): - """This class implements a so-called facade pattern to define a + """ + Preprocess data. + + This class implements a so-called facade pattern to define a higher-level interface to work with the CategoricalDataProcessor, KBinsDiscretizer and TargetEncoder classes, so that their fit and transform methods are called in the correct order. @@ -48,12 +54,14 @@ class PreProcessor(BaseEstimator): (``classification`` or ``regression``). """ - def __init__(self, - categorical_data_processor: CategoricalDataProcessor, - discretizer: KBinsDiscretizer, - target_encoder: TargetEncoder, - is_fitted: bool = False): - + def __init__( + self, + categorical_data_processor: CategoricalDataProcessor, + discretizer: KBinsDiscretizer, + target_encoder: TargetEncoder, + is_fitted: bool = False, + ): + """Initialize the PreProcessor class.""" self._categorical_data_processor = categorical_data_processor self._discretizer = discretizer self._target_encoder = target_encoder @@ -63,27 +71,28 @@ def __init__(self, self.model_type = categorical_data_processor.model_type @classmethod - def from_params(cls, - model_type: str="classification", - n_bins: int=10, - strategy: str="quantile", - closed: str="right", - auto_adapt_bins: bool=False, - starting_precision: int=0, - label_format: str="{} - {}", - change_endpoint_format: bool=False, - regroup: bool=True, - regroup_name: str="Other", - keep_missing: bool=True, - category_size_threshold: int=5, - p_value_threshold: float=0.001, - scale_contingency_table: bool=True, - forced_categories: dict={}, - weight: float=0.0, - imputation_strategy: str="mean"): - """Constructor to instantiate PreProcessor from all the parameters - that can be set in all its required (attribute) classes - along with good default values. + def from_params( + cls, + model_type: str = "classification", + n_bins: int = 10, + strategy: str = "quantile", + closed: str = "right", + auto_adapt_bins: bool = False, + starting_precision: int = 0, + label_format: str = "{} - {}", + change_endpoint_format: bool = False, + regroup: bool = True, + regroup_name: str = "Other", + keep_missing: bool = True, + category_size_threshold: int = 5, + p_value_threshold: float = 0.001, + scale_contingency_table: bool = True, + forced_categories: dict = {}, + weight: float = 0.0, + imputation_strategy: str = "mean", + ): + """ + Instantiate a PreProcessor from given or default params. Parameters ---------- @@ -147,29 +156,39 @@ def from_params(cls, PreProcessor Class encapsulating CategoricalDataProcessor, KBinsDiscretizer, and TargetEncoder instances. 
- """ - categorical_data_processor = CategoricalDataProcessor(model_type, - regroup, - regroup_name, keep_missing, - category_size_threshold, - p_value_threshold, - scale_contingency_table, - forced_categories) - - discretizer = KBinsDiscretizer(n_bins, strategy, closed, - auto_adapt_bins, - starting_precision, - label_format, - change_endpoint_format) - + """ + categorical_data_processor = CategoricalDataProcessor( + model_type, + regroup, + regroup_name, + keep_missing, + category_size_threshold, + p_value_threshold, + scale_contingency_table, + forced_categories, + ) + + discretizer = KBinsDiscretizer( + n_bins, + strategy, + closed, + auto_adapt_bins, + starting_precision, + label_format, + change_endpoint_format, + ) + target_encoder = TargetEncoder(weight, imputation_strategy) return cls(categorical_data_processor, discretizer, target_encoder) @classmethod def from_pipeline(cls, pipeline: dict): - """Constructor to instantiate PreProcessor from a (fitted) pipeline - which was stored as a JSON file and passed to this function as a dict. + """ + Instantiate a PreProcessor from a (fitted) pipeline. + + The pipeline should be stored as a JSON file and passed to this function + as a dict. Parameters ---------- @@ -187,10 +206,11 @@ def from_pipeline(cls, pipeline: dict): If the loaded pipeline does not have all required parameters and no others. """ - if not PreProcessor._is_valid_pipeline(pipeline): - raise ValueError("Invalid pipeline, as it does not " - "contain all and only the required parameters.") + raise ValueError( + "Invalid pipeline, as it does not " + "contain all and only the required parameters." + ) categorical_data_processor = CategoricalDataProcessor() categorical_data_processor.set_attributes_from_dict( @@ -204,11 +224,20 @@ def from_pipeline(cls, pipeline: dict): target_encoder = TargetEncoder() target_encoder.set_attributes_from_dict(pipeline["target_encoder"]) - return cls(categorical_data_processor, discretizer, target_encoder, - is_fitted=pipeline["_is_fitted"]) + return cls( + categorical_data_processor, + discretizer, + target_encoder, + is_fitted=pipeline["_is_fitted"], + ) - def fit(self, train_data: pd.DataFrame, continuous_vars: list, - discrete_vars: list, target_column_name: str): + def fit( + self, + train_data: pd.DataFrame, + continuous_vars: list, + discrete_vars: list, + target_column_name: str, + ): """Fit the data to the preprocessing pipeline. Parameters @@ -222,11 +251,10 @@ def fit(self, train_data: pd.DataFrame, continuous_vars: list, target_column_name : str Column name of the target. 
""" - # get list of all variables - preprocessed_variable_names = (PreProcessor - ._get_variable_list(continuous_vars, - discrete_vars)) + preprocessed_variable_names = PreProcessor._get_variable_list( + continuous_vars, discrete_vars + ) log.info("Starting to fit pipeline") start = time.time() @@ -240,35 +268,39 @@ def fit(self, train_data: pd.DataFrame, continuous_vars: list, if continuous_vars: begin = time.time() self._discretizer.fit(train_data, continuous_vars) - log.info("Fitting KBinsDiscretizer took {} seconds" - .format(time.time() - begin)) + log.info( + "Fitting KBinsDiscretizer took {} seconds".format(time.time() - begin) + ) - train_data = self._discretizer.transform(train_data, - continuous_vars) + train_data = self._discretizer.transform(train_data, continuous_vars) if discrete_vars: begin = time.time() - self._categorical_data_processor.fit(train_data, - discrete_vars, - target_column_name) - log.info("Fitting categorical_data_processor class took {} seconds" - .format(time.time() - begin)) - - train_data = (self._categorical_data_processor - .transform(train_data, discrete_vars)) + self._categorical_data_processor.fit( + train_data, discrete_vars, target_column_name + ) + log.info( + "Fitting categorical_data_processor class took {} seconds".format( + time.time() - begin + ) + ) + + train_data = self._categorical_data_processor.transform( + train_data, discrete_vars + ) begin = time.time() - self._target_encoder.fit(train_data, preprocessed_variable_names, - target_column_name) - log.info("Fitting TargetEncoder took {} seconds" - .format(time.time() - begin)) + self._target_encoder.fit( + train_data, preprocessed_variable_names, target_column_name + ) + log.info("Fitting TargetEncoder took {} seconds".format(time.time() - begin)) self._is_fitted = True # set fitted boolean to True - log.info("Fitting pipeline took {} seconds" - .format(time.time() - start)) + log.info("Fitting pipeline took {} seconds".format(time.time() - start)) - def transform(self, data: pd.DataFrame, continuous_vars: list, - discrete_vars: list) -> pd.DataFrame: + def transform( + self, data: pd.DataFrame, continuous_vars: list, discrete_vars: list + ) -> pd.DataFrame: """Transform the data by applying the preprocessing pipeline. Parameters @@ -290,37 +322,38 @@ def transform(self, data: pd.DataFrame, continuous_vars: list, NotFittedError In case PreProcessor was not fitted first. """ - start = time.time() if not self._is_fitted: - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") - + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." 
+ ) raise NotFittedError(msg.format(self.__class__.__name__)) - preprocessed_variable_names = (PreProcessor - ._get_variable_list(continuous_vars, - discrete_vars)) + preprocessed_variable_names = PreProcessor._get_variable_list( + continuous_vars, discrete_vars + ) if continuous_vars: data = self._discretizer.transform(data, continuous_vars) if discrete_vars: - data = self._categorical_data_processor.transform(data, - discrete_vars) + data = self._categorical_data_processor.transform(data, discrete_vars) - data = self._target_encoder.transform(data, - preprocessed_variable_names) + data = self._target_encoder.transform(data, preprocessed_variable_names) - log.info("Transforming data took {} seconds" - .format(time.time() - start)) + log.info("Transforming data took {} seconds".format(time.time() - start)) return data - def fit_transform(self, train_data: pd.DataFrame, continuous_vars: list, - discrete_vars: list, - target_column_name: str) -> pd.DataFrame: + def fit_transform( + self, + train_data: pd.DataFrame, + continuous_vars: list, + discrete_vars: list, + target_column_name: str, + ) -> pd.DataFrame: """Fit preprocessing pipeline and transform the data. Parameters @@ -339,19 +372,18 @@ def fit_transform(self, train_data: pd.DataFrame, continuous_vars: list, pd.DataFrame Transformed (preprocessed) data. """ - - self.fit(train_data, continuous_vars, discrete_vars, - target_column_name) + self.fit(train_data, continuous_vars, discrete_vars, target_column_name) return self.transform(train_data, continuous_vars, discrete_vars) @staticmethod - def train_selection_validation_split(data: pd.DataFrame, - train_prop: float=0.6, - selection_prop: float=0.2, - validation_prop: float=0.2) -> pd.DataFrame: - """Adds `split` column with train/selection/validation values - to the dataset. + def train_selection_validation_split( + data: pd.DataFrame, + train_prop: float = 0.6, + selection_prop: float = 0.2, + validation_prop: float = 0.2, + ) -> pd.DataFrame: + """Add `split` column with train/selection/validation values to the dataset. Train set = data on which the model is trained and on which the encoding is based. Selection set = data used for univariate and forward feature selection. Often called the validation set. @@ -374,8 +406,10 @@ def train_selection_validation_split(data: pd.DataFrame, DataFrame with additional split column. """ if not math.isclose(train_prop + selection_prop + validation_prop, 1.0): - raise ValueError("The sum of train_prop, selection_prop and " - "validation_prop must be 1.0.") + raise ValueError( + "The sum of train_prop, selection_prop and " + "validation_prop must be 1.0." 
+ ) if train_prop == 0.0: raise ValueError("train_prop cannot be zero!") @@ -387,21 +421,26 @@ def train_selection_validation_split(data: pd.DataFrame, size_train = int(train_prop * nrows) size_select = int(selection_prop * nrows) size_valid = int(validation_prop * nrows) - correction = nrows - (size_train+size_select+size_valid) + correction = nrows - (size_train + size_select + size_valid) - split = ['train'] * size_train \ - + ['train'] * correction \ - + ['selection'] * size_select \ - + ['validation'] * size_valid + split = ( + ["train"] * size_train + + ["train"] * correction + + ["selection"] * size_select + + ["validation"] * size_valid + ) shuffle(split) - data['split'] = split + data["split"] = split return data - def serialize_pipeline(self) -> dict: - """Serialize the preprocessing pipeline by writing all its required + def serialize_pipeline(self) -> dict[str, Any]: + """ + Serialize the preprocessing pipeline. + + This is done by writing all its required parameters to a dictionary to later store it as a JSON file. Returns @@ -409,19 +448,17 @@ def serialize_pipeline(self) -> dict: dict Return the pipeline as a dictionary. """ + pipeline: dict[str, Any] pipeline = { - "metadata": { - "timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S") - } + "metadata": {"timestamp": datetime.now().strftime("%d/%m/%Y %H:%M:%S")} } - pipeline["categorical_data_processor"] = (self - ._categorical_data_processor - .attributes_to_dict()) + pipeline[ + "categorical_data_processor" + ] = self._categorical_data_processor.attributes_to_dict() pipeline["discretizer"] = self._discretizer.attributes_to_dict() - pipeline["target_encoder"] = (self._target_encoder - .attributes_to_dict()) + pipeline["target_encoder"] = self._target_encoder.attributes_to_dict() pipeline["_is_fitted"] = True @@ -429,8 +466,7 @@ def serialize_pipeline(self) -> dict: @staticmethod def _is_valid_pipeline(pipeline: dict) -> bool: - """Validate the loaded pipeline by checking if all required parameters - are present (and no others!). + """Validate the loaded pipeline by checking if only the required parameters are present. Parameters ---------- @@ -438,13 +474,13 @@ def _is_valid_pipeline(pipeline: dict) -> bool: Loaded pipeline from JSON file. """ keys = inspect.getfullargspec(PreProcessor.from_params).args - valid_keys = set([key for key in keys - if key not in ["cls", "serialization_path"]]) + valid_keys = set( + [key for key in keys if key not in ["cls", "serialization_path"]] + ) - input_keys = set() + input_keys: Set[str] = set() for key in pipeline: - if key in ["categorical_data_processor", "discretizer", - "target_encoder"]: + if key in ["categorical_data_processor", "discretizer", "target_encoder"]: input_keys = input_keys.union(set(pipeline[key].keys())) elif key != "metadata": input_keys.add(key) @@ -456,8 +492,9 @@ def _is_valid_pipeline(pipeline: dict) -> bool: @staticmethod def _get_variable_list(continuous_vars: list, discrete_vars: list) -> list: - """Merge lists of continuous_vars and discrete_vars and add suffix - "_bin" resp. "_processed" to the predictors. + """Merge lists of continuous_vars and discrete_vars. + + Suffixes "_bin" resp. "_processed" are added to the predictors. Parameters ---------- @@ -476,8 +513,9 @@ def _get_variable_list(continuous_vars: list, discrete_vars: list) -> list: ValueError In case both lists are empty. 
""" - var_list = ([col + "_processed" for col in discrete_vars] - + [col + "_bin" for col in continuous_vars]) + var_list = [col + "_processed" for col in discrete_vars] + [ + col + "_bin" for col in continuous_vars + ] if not var_list: raise ValueError("Variable var_list is None or empty list.") diff --git a/cobra/preprocessing/target_encoder.py b/cobra/preprocessing/target_encoder.py index 3eda39d..7cd3f6a 100644 --- a/cobra/preprocessing/target_encoder.py +++ b/cobra/preprocessing/target_encoder.py @@ -1,3 +1,4 @@ +"""Target encoding.""" import logging @@ -6,10 +7,15 @@ from sklearn.base import BaseEstimator from sklearn.exceptions import NotFittedError + log = logging.getLogger(__name__) + class TargetEncoder(BaseEstimator): - """Target encoding for categorical features, inspired by + """ + Target encoding for categorical features. + + Inspired by http://contrib.scikit-learn.org/category_encoders/targetencoder.html. Replace each value of the categorical feature with the average of the @@ -62,29 +68,32 @@ class TargetEncoder(BaseEstimator): valid_imputation_strategies = ("mean", "min", "max") - def __init__(self, weight: float=0.0, - imputation_strategy: str="mean"): - + def __init__(self, weight: float = 0.0, imputation_strategy: str = "mean"): + """Initialize the TargetEncoder class.""" if weight < 0: raise ValueError("The value of weight cannot be smaller than zero.") elif imputation_strategy not in self.valid_imputation_strategies: - raise ValueError("Valid options for 'imputation_strategy' are {}." - " Got imputation_strategy={!r} instead." - .format(self.valid_imputation_strategies, - imputation_strategy)) + raise ValueError( + "Valid options for 'imputation_strategy' are {}. " + "Got imputation_strategy={!r} instead.".format( + self.valid_imputation_strategies, imputation_strategy + ) + ) if weight == 0: - log.warning("The target encoder's additive smoothing weight is " - "set to 0. This disables smoothing and may make the " - "encoding prone to overfitting. Increase the weight " - "if needed.") + log.warning( + "The target encoder's additive smoothing weight is " + "set to 0. This disables smoothing and may make the " + "encoding prone to overfitting. Increase the weight " + "if needed." + ) self.weight = weight self.imputation_strategy = imputation_strategy self._mapping = {} # placeholder for fitted output # placeholder for the global incidence of the data used for fitting - self._global_mean = None + self._global_mean: float def attributes_to_dict(self) -> dict: """Return the attributes of TargetEncoder in a dictionary. @@ -98,8 +107,7 @@ def attributes_to_dict(self) -> dict: params = self.get_params() params["_mapping"] = { - key: value.to_dict() - for key, value in self._mapping.items() + key: value.to_dict() for key, value in self._mapping.items() } params["_global_mean"] = self._global_mean @@ -107,8 +115,7 @@ def attributes_to_dict(self) -> dict: return params def set_attributes_from_dict(self, params: dict): - """Set instance attributes from a dictionary of values with key the - name of the attribute. + """Set instance attributes from a dictionary. 
Parameters ---------- @@ -119,8 +126,10 @@ def set_attributes_from_dict(self, params: dict): if "weight" in params and type(params["weight"]) == float: self.weight = params["weight"] - if ("imputation_strategy" in params and - params["imputation_strategy"] in self.valid_imputation_strategies): + if ( + "imputation_strategy" in params + and params["imputation_strategy"] in self.valid_imputation_strategies + ): self.imputation_strategy = params["imputation_strategy"] if "_global_mean" in params and type(params["_global_mean"]) == float: @@ -136,14 +145,12 @@ def dict_to_series(key, value): return s self._mapping = { - key: dict_to_series(key, value) - for key, value in _mapping.items() + key: dict_to_series(key, value) for key, value in _mapping.items() } return self - def fit(self, data: pd.DataFrame, column_names: list, - target_column: str): + def fit(self, data: pd.DataFrame, column_names: list, target_column: str): """Fit the TargetEncoder to the data. Parameters @@ -162,8 +169,10 @@ def fit(self, data: pd.DataFrame, column_names: list, for column in tqdm(column_names, desc="Fitting target encoding..."): if column not in data.columns: - log.warning("DataFrame has no column '{}', so it will be " - "skipped in fitting" .format(column)) + log.warning( + "DataFrame has no column '{}', so it will be " + "skipped in fitting".format(column) + ) continue self._mapping[column] = self._fit_column(data[column], y) @@ -191,15 +200,13 @@ def _fit_column(self, X: pd.Series, y: pd.Series) -> pd.Series: stats = y.groupby(X).agg(["mean", "count"]) # Note: if self.weight = 0, we have the ordinary incidence replacement - numerator = (stats["count"] * stats["mean"] - + self.weight * self._global_mean) + numerator = stats["count"] * stats["mean"] + self.weight * self._global_mean denominator = stats["count"] + self.weight return numerator / denominator - def transform(self, data: pd.DataFrame, - column_names: list) -> pd.DataFrame: + def transform(self, data: pd.DataFrame, column_names: list) -> pd.DataFrame: """Replace (e.g. encode) values of each categorical column with a new value (reflecting the corresponding average target value, optionally smoothed by a regularization weight), @@ -224,25 +231,27 @@ def transform(self, data: pd.DataFrame, method. """ if (len(self._mapping) == 0) or (self._global_mean is None): - msg = ("This {} instance is not fitted yet. Call 'fit' with " - "appropriate arguments before using this method.") + msg = ( + "This {} instance is not fitted yet. Call 'fit' with " + "appropriate arguments before using this method." + ) raise NotFittedError(msg.format(self.__class__.__name__)) for column in tqdm(column_names, desc="Applying target encoding..."): if column not in data.columns: - log.warning("Unknown column '{}' will be skipped." - .format(column)) + log.warning("Unknown column '{}' will be skipped.".format(column)) continue elif column not in self._mapping: - log.warning("Column '{}' is not in fitted output " - "and will be skipped.".format(column)) + log.warning( + "Column '{}' is not in fitted output " + "and will be skipped.".format(column) + ) continue data = self._transform_column(data, column) return data - def _transform_column(self, data: pd.DataFrame, - column_name: str) -> pd.DataFrame: + def _transform_column(self, data: pd.DataFrame, column_name: str) -> pd.DataFrame: """Replace (e.g. 
encode) values of a categorical column with a new value (reflecting the corresponding average target value, optionally smoothed by a regularization weight), @@ -265,8 +274,9 @@ def _transform_column(self, data: pd.DataFrame, # Convert dtype to float, because when the original dtype # is of type "category", the resulting dtype would otherwise also be of # type "category": - data[new_column] = (data[column_name].map(self._mapping[column_name]) - .astype("float")) + data[new_column] = ( + data[column_name].map(self._mapping[column_name]).astype("float") + ) # In case of categorical data, it could be that new categories will # emerge which were not present in the train set, so this will result @@ -274,20 +284,17 @@ def _transform_column(self, data: pd.DataFrame, # configured imputation strategy: if data[new_column].isnull().sum() > 0: if self.imputation_strategy == "mean": - data[new_column].fillna(self._global_mean, - inplace=True) + data[new_column].fillna(self._global_mean, inplace=True) elif self.imputation_strategy == "min": - data[new_column].fillna(data[new_column].min(), - inplace=True) + data[new_column].fillna(data[new_column].min(), inplace=True) elif self.imputation_strategy == "max": - data[new_column].fillna(data[new_column].max(), - inplace=True) + data[new_column].fillna(data[new_column].max(), inplace=True) return data - def fit_transform(self, data: pd.DataFrame, - column_names: list, - target_column: str) -> pd.DataFrame: + def fit_transform( + self, data: pd.DataFrame, column_names: list, target_column: str + ) -> pd.DataFrame: """Fit the encoder and transform the data. Parameters @@ -309,8 +316,11 @@ def fit_transform(self, data: pd.DataFrame, @staticmethod def _clean_column_name(column_name: str) -> str: - """Generate a name for the new column that this target encoder - generates in the given data, by removing "_bin", "_processed" or + """ + Generate a clean name. + + Cleans the name generated by the target encoder + in the given data, by removing "_bin", "_processed" or "_cleaned" from the original categorical column, and adding "_enc". Parameters diff --git a/cobra/utils.py b/cobra/utils.py index f394caf..4efee0d 100644 --- a/cobra/utils.py +++ b/cobra/utils.py @@ -1,7 +1,13 @@ +"""Cobra utils.""" + + def clean_predictor_name(predictor_name: str) -> str: - """Strip the redundant suffix (e.g. "_enc" or "_bin") off from the end - of the predictor name to return a clean version of the predictor """ - return (predictor_name.replace("_enc", "") - .replace("_bin", "") - .replace("_processed", "")) + Clean the predictor name. + + This is done by stripping the redundant suffix (e.g. 
"_enc" or "_bin") off + from the end of the predictor name to return a clean version of the predictor + """ + return ( + predictor_name.replace("_enc", "").replace("_bin", "").replace("_processed", "") + ) diff --git a/cobra/version.py b/cobra/version.py index 545d07d..a82b376 100644 --- a/cobra/version.py +++ b/cobra/version.py @@ -1 +1 @@ -__version__ = "1.1.1" \ No newline at end of file +__version__ = "1.1.1" diff --git a/requirements.dev.txt b/requirements.dev.txt index 3d87710..dc2121b 100644 --- a/requirements.dev.txt +++ b/requirements.dev.txt @@ -1,6 +1,7 @@ +black>=22.3.0 mypy>=0.942 pycodestyle>=2.8.0 pydocstyle>=6.1.1 -pylint>=2.13.7 pytest>=7.1.1 -pytest-mock>=3.7.0 \ No newline at end of file +pytest-mock>=3.7.0 +pytest-cov>=3.0.0 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..15fbabe --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[pycodestyle] +max-line-length = 120 \ No newline at end of file