diff --git a/docs/refs.bib b/docs/refs.bib
index 51c2860ca..5258734ca 100644
--- a/docs/refs.bib
+++ b/docs/refs.bib
@@ -425,4 +425,13 @@ @article{yeh2009comparisons
 @misc{uscode2011title15chapter41subchapteriv,
   title={United States Code 2011 Edition - Title 15 Commerce and Trade - Chapter 41 Consumer Credit Protection - Subchapter IV—Equal Credit Opportunity},
   url={https://www.govinfo.gov/content/pkg/USCODE-2011-title15/html/USCODE-2011-title15-chap41-subchapIV.htm}
-}
\ No newline at end of file
+}
+
+@misc{cruz2023unprocessing,
+  title={Unprocessing Seven Years of Algorithmic Fairness},
+  author={Andr\'{e} F. Cruz and Moritz Hardt},
+  year={2023},
+  eprint={2306.07261},
+  archivePrefix={arXiv},
+  primaryClass={cs.LG}
+}
diff --git a/examples/plot_relaxed_equalized_odds.py b/examples/plot_relaxed_equalized_odds.py
new file mode 100644
index 000000000..5ad35afa2
--- /dev/null
+++ b/examples/plot_relaxed_equalized_odds.py
@@ -0,0 +1,191 @@
+# Copyright (c) Fairlearn contributors.
+# Licensed under the MIT License.
+
+"""
+==========================================
+RelaxedThresholdOptimizer with Census Data
+==========================================
+"""
+
+# %%
+# Load and preprocess the data set
+# --------------------------------
+# We download the data set using the `fetch_adult` function in
+# `fairlearn.datasets`. We start by importing the various modules we're going
+# to use:
+#
+
+import numpy as np
+import pandas as pd
+from sklearn import metrics as skm
+from sklearn.ensemble import GradientBoostingClassifier
+from sklearn.model_selection import train_test_split
+from sklearn.preprocessing import LabelEncoder, StandardScaler
+
+from fairlearn.datasets import fetch_adult
+from fairlearn.metrics import (
+    MetricFrame,
+    equalized_odds_difference,
+    true_positive_rate,
+    false_positive_rate,
+    count,
+    plot_model_comparison,
+)
+
+# %%
+# We can now load and inspect the data by using the `fairlearn.datasets` module:
+
+data = fetch_adult()
+X_raw = data.data
+Y = (data.target == ">50K") * 1
+X_raw
+
+# %%
+# We are going to treat the sex of each individual as a sensitive feature
+# (with values 'Female' and 'Male'), and in this particular case we are
+# going to separate this feature out and drop it from the main data. We
+# then perform some standard data preprocessing steps to convert the data
+# into a format suitable for the ML algorithms.
+
+A = X_raw["sex"]
+X = pd.get_dummies(X_raw)
+
+sc = StandardScaler()
+X_scaled = sc.fit_transform(X)
+X_scaled = pd.DataFrame(X_scaled, columns=X.columns)
+
+le = LabelEncoder()
+Y = le.fit_transform(Y)
+
+# %%
+# Finally, we split the data into training, validation, and test sets:
+X_train, X_other, Y_train, Y_other, A_train, A_other = train_test_split(
+    X_scaled, Y, A, test_size=0.4, random_state=0, stratify=Y,
+)
+
+# Split (X_other, Y_other, A_other) into validation and test
+X_test, X_val, Y_test, Y_val, A_test, A_val = train_test_split(
+    X_other, Y_other, A_other, test_size=0.5, random_state=0, stratify=Y_other,
+)
+
+# Work around indexing bug
+X_train = X_train.reset_index(drop=True)
+A_train = A_train.reset_index(drop=True)
+X_test = X_test.reset_index(drop=True)
+A_test = A_test.reset_index(drop=True)
+X_val = X_val.reset_index(drop=True)
+A_val = A_val.reset_index(drop=True)
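+
+# %%
+# As a quick, illustrative sanity check (not part of the postprocessing
+# method itself), we can confirm that stratification kept the label
+# prevalence comparable across the three splits:
+print(f"Train prevalence: {Y_train.mean():.3f}")
+print(f"Validation prevalence: {Y_val.mean():.3f}")
+print(f"Test prevalence: {Y_test.mean():.3f}")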
+
+# %%
+# Training a fairness-unaware predictor
+# -------------------------------------
+# To show the effect of Fairlearn we will first train a standard ML predictor
+# that does not incorporate fairness. For this example, we use a
+# :class:`sklearn.ensemble.GradientBoostingClassifier`:
+
+unmitigated_predictor = GradientBoostingClassifier(n_estimators=500)
+
+# %%
+unmitigated_predictor.fit(X_train, Y_train)
+
+# %%
+# Compute predictions
+y_test_pred_scores = unmitigated_predictor.predict_proba(X_test)[:, -1]
+y_test_pred_binary = y_test_pred_scores >= 0.5  # threshold = 0.5
+
+# %%
+# We can start to assess the predictor's fairness using the `MetricFrame`:
+metric_frame = MetricFrame(
+    metrics={
+        "accuracy": skm.accuracy_score,
+        "true_positive_rate": true_positive_rate,
+        "false_positive_rate": false_positive_rate,
+        "count": count,
+    },
+    sensitive_features=A_test,
+    y_true=Y_test,
+    y_pred=y_test_pred_binary,
+)
+print(metric_frame.overall)
+print(metric_frame.by_group)
+metric_frame.by_group.plot.bar(
+    subplots=True,
+    layout=[4, 1],
+    legend=False,
+    figsize=[12, 8],
+    title="Accuracy and error rates by group",
+)
+
+
+# %%
+unmitigated_equalized_odds_diff = equalized_odds_difference(
+    y_true=Y_test, y_pred=y_test_pred_binary, sensitive_features=A_test,
+)
+
+print(f"Equalized odds difference for unmitigated classifier: {unmitigated_equalized_odds_diff:.3}")
+
+# %%
+from fairlearn.postprocessing._cvxpy_threshold_optimizer import _RelaxedThresholdOptimizer
+
+fair_clf = _RelaxedThresholdOptimizer(
+    # predictor=unmitigated_predictor,   # TODO: use this when we no longer rely on callables
+    # predict_method="predict_proba",
+    # NOTE: pass real-valued scores (not binarized predictions) to the postprocessor
+    predictor=lambda *args, **kwargs: unmitigated_predictor.predict_proba(*args, **kwargs)[:, -1],
+    predict_method="__call__",
+    constraint="equalized_odds",
+    tolerance=0,
+)
+
+# %%
+fair_clf.fit(X_val, Y_val, sensitive_features=A_val)
+
+
+# %%
+y_test_pred_postprocessed = fair_clf.predict(X_test, sensitive_features=A_test)
+
+# %%
+postprocessed_equalized_odds_diff = equalized_odds_difference(
+    y_true=Y_test, y_pred=y_test_pred_postprocessed, sensitive_features=A_test,
+)
+
+print(f"Equalized odds difference after postprocessing: {postprocessed_equalized_odds_diff:.3}")
+
+# %%
+# Add the unconstrained/unmitigated classifier predictions
+all_model_predictions = {"unconstrained": y_test_pred_binary}
+
+
+# Helper to get different thresholdings for different tolerance values
+def compute_test_predictions_with_relaxed_constraints(tolerance: float) -> np.ndarray:
+    # Instantiate
+    clf = _RelaxedThresholdOptimizer(
+        predictor=lambda *args, **kwargs: unmitigated_predictor.predict_proba(*args, **kwargs)[:, -1],
+        predict_method="__call__",
+        constraint="equalized_odds",
+        tolerance=tolerance,
+        random_state=23,
+    )
+
+    # Fit
+    clf.fit(X_train, Y_train, sensitive_features=A_train)
+
+    return clf.predict(X_test, sensitive_features=A_test)
+
+
+# Compute predictions at different levels of tolerance
+all_model_predictions.update({
+    f"train tolerance={tol:.1}": compute_test_predictions_with_relaxed_constraints(tol)
+    for tol in np.arange(0, unmitigated_equalized_odds_diff, 1e-2)
+})
+
+# %%
+# Plot all models in the fairness-accuracy landscape
+plot_model_comparison(
+    x_axis_metric=skm.accuracy_score,
+    y_axis_metric=equalized_odds_difference,
+    y_true=Y_test,
+    y_preds=all_model_predictions,
+    sensitive_features=A_test,
+    point_labels=True,
+    show_plot=True,
+)
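+
+# %%
+# As a final illustrative check (all names here are defined above in this
+# example), we can also query the fitted optimizer for the theoretical
+# solution found by the LP: its predicted cost, its constraint violation,
+# and the group-wise ROC points it will attempt to realize.
+print(f"Theoretical cost of the solution: {fair_clf.cost():.3}")
+print(f"Theoretical constraint violation: {fair_clf.constraint_violation():.3}")
+print(f"Group-wise ROC points:\n{fair_clf.groupwise_roc_points}")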
postprocessing plots."
 )
+_CVXPY_IMPORT_ERROR_MESSAGE = (
+    "Please make sure to install `cvxpy` to use postprocessing with relaxed "
+    "fairness constraint fulfillment."
+)
 BASE_ESTIMATOR_NONE_ERROR_MESSAGE = "The base estimator cannot be `None`."
 BASE_ESTIMATOR_NOT_FITTED_WARNING = (
     "The value of `prefit` is `True`, but `check_is_fitted` raised `NotFittedError` on"
diff --git a/fairlearn/postprocessing/_cvxpy_threshold_optimizer.py b/fairlearn/postprocessing/_cvxpy_threshold_optimizer.py
new file mode 100644
index 000000000..93094493a
--- /dev/null
+++ b/fairlearn/postprocessing/_cvxpy_threshold_optimizer.py
@@ -0,0 +1,662 @@
+# Copyright (c) Fairlearn contributors.
+# Licensed under the MIT License.
+
+"""Threshold optimizer with relaxed fairness constraints.
+
+TODO
+----
+- Add option for constraining equality of positive predictions (independence
+  criterion, aka demographic parity);
+- Add option to use l1 or other distance functions for maximum tolerance between
+  points (currently l-inf is in use).
+
+TODO for PR #1248
+-----------------
+- [ ] Adapt all data inputs to common fairlearn accepted types (e.g., numpy
+  arrays, pandas DFs, lists, ...)
+- [ ] Try to substitute the classifier helpers under `_randomized_classifiers`
+  with the InterpolatedThresholder.
+  - Triangulating target ROC points should still be necessary, but the
+    returned classifier can now be of type `InterpolatedThresholder` instead
+    of our own classifier types.
+- [ ] Currently the use of our `_randomized_classifiers` API is only compatible
+  with `predict_method="__call__"`; this should be fixed either in our API
+  or by substituting our classes with the `InterpolatedThresholder`.
+
+"""
+from __future__ import annotations
+
+import logging
+from typing import Any
+from itertools import product
+from collections import abc
+
+import numpy as np
+from sklearn.metrics import roc_curve
+from sklearn.base import BaseEstimator, MetaEstimatorMixin
+
+from fairlearn.utils._input_validation import _validate_and_reformat_input
+
+from fairlearn.utils._common import _get_soft_predictions
+from fairlearn.utils._common import unpack_fp_fn_costs
+
+from ._cvxpy_utils import (
+    compute_fair_optimum,
+    ALL_CONSTRAINTS,
+    NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE,
+)
+from ._roc_utils import (
+    roc_convex_hull,
+    calc_cost_of_point,
+)
+
+# TODO: try to use InterpolatedThreshold instead of our classifier API
+from ._randomized_classifiers import (
+    RandomizedClassifier,
+    EnsembleGroupwiseClassifiers,
+)
+
+
+class _RelaxedThresholdOptimizer(BaseEstimator, MetaEstimatorMixin):
+    r"""Compute the optimal postprocessing to fulfill fairness constraints
+    with some optional tolerance.
+
+    The method amounts to finding the set of (potentially randomized)
+    group-specific decision thresholds that maximize some objective (e.g.,
+    accuracy), given a maximum tolerance (or slack) on the fairness
+    constraint fulfillment.
+
+    This optimization problem amounts to a Linear Program (LP) as detailed in
+    :footcite:ct:`cruz2023unprocessing`. Solving the LP requires installing
+    `cvxpy`.
+
+    Read more in the :ref:`User Guide `.
+
+    Parameters
+    ----------
+    predictor : object
+        A prefit `scikit-learn compatible estimator `_  # noqa
+        whose output will be postprocessed.
+        The predictor should output real-valued scores, as postprocessing
+        results will be extremely poor when performed over binarized
+        predictions.
+
+    constraint : str, default='equalized_odds'
+        Fairness constraint under which threshold optimization is performed.
+        Possible inputs currently are:
+
+            'equalized_odds'
+                match true positive and false positive rates across groups
+
+    tolerance : float, default=0.0
+        The absolute tolerance for the equalized odds fairness constraint.
+        Will allow for at most `tolerance` distance between group-wise ROC
+        points (where distance is measured using the l-infinity norm). The
+        provided value must be in the closed interval [0, 1].
+
+    objective_costs : dict, optional
+        A dictionary detailing the cost for false positives and false
+        negatives, of the form :code:`{'fp': <fp_cost>, 'fn': <fn_cost>}`.
+        Will use the 0-1 loss by default (maximum accuracy).
+
+    grid_size : int, optional
+        The maximum number of ticks (points) in each group's ROC curve, by
+        default 1000. This corresponds to the maximum number of different
+        thresholds to use over a predictor.
+
+    predict_method : {'auto', 'predict_proba', 'decision_function', 'predict'\
+            }, default='auto'
+
+        Defines which method of the ``estimator`` is used to get the output
+        values.
+
+        'auto'
+            use one of :code:`predict_proba`, :code:`decision_function`, or
+            :code:`predict`, in that order.
+
+        'predict_proba'
+            use the second column from the output of :code:`predict_proba`.
+            It is assumed that the second column represents the positive
+            outcome.
+
+        'decision_function'
+            use the raw values given by the :code:`decision_function`.
+
+        'predict'
+            use the hard values reported by the :code:`predict` method if
+            estimator is a classifier, and the regression values if
+            estimator is a regressor.
+            Warning: postprocessing may lead to poor results when using
+            :code:`predict_method='predict'` with classifiers, as that will
+            binarize predictions.
+
+    random_state : int, optional
+        A random seed used for reproducibility when producing randomized
+        classifiers, by default None (non-reproducible behavior).
+
+    Raises
+    ------
+    ValueError
+        A ValueError will be raised if constructor arguments are not valid.
+
+    Notes
+    -----
+    The procedure for relaxed fairness constraint fulfillment is detailed in
+    :footcite:ct:`cruz2023unprocessing`.
+
+    The underlying threshold optimization algorithm is based on
+    :footcite:ct:`hardt2016equality`.
+
+    This method is also implemented in its
+    `standalone Python package `_.  # noqa
+
+    """
+
+    def __init__(
+        self,
+        *,
+        predictor: BaseEstimator,
+        constraint: str = "equalized_odds",
+        tolerance: float = 0.0,
+        objective_costs: dict = None,
+        grid_size: int = 1000,
+        predict_method: str = "auto",
+        random_state: int = None,
+    ):
+        # Save arguments
+        self.predictor = predictor
+        self.constraint = constraint
+        self.tolerance = tolerance
+        self.objective_costs = objective_costs or {"fp": 1, "fn": 1}
+        self.grid_size = grid_size
+        self.predict_method = predict_method
+        self.random_state = random_state
+
+        # Validate constraint
+        if self.constraint not in ALL_CONSTRAINTS:
+            raise ValueError(NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE)
+
+        # Validate constraint tolerance
+        if (
+            not isinstance(self.tolerance, (float, int))
+            or self.tolerance < 0
+            or self.tolerance > 1
+        ):
+            raise ValueError(
+                "Invalid `tolerance` provided: received "
+                f"tolerance={self.tolerance}, but value should be in range "
+                "[0, 1]."
+ ) + + # Check objective costs + unpack_fp_fn_costs(self.objective_costs) + + # Initialize instance variables + self._idx_to_sensitive: dict[int, Any] = None + self._sensitive_to_idx: dict[Any, int] = None + self._all_roc_data: dict = None + self._all_roc_hulls: dict = None + self._groupwise_roc_points: np.ndarray = None + self._global_roc_point: np.ndarray = None + self._global_prevalence: float = None + self._realized_classifier: EnsembleGroupwiseClassifiers = None + + @property + def false_pos_cost(self) -> np.ndarray: + fp_cost, _fn_cost = unpack_fp_fn_costs(self.objective_costs) + return fp_cost + + @property + def false_neg_cost(self) -> np.ndarray: + _fp_cost, fn_cost = unpack_fp_fn_costs(self.objective_costs) + return fn_cost + + @property + def groupwise_roc_points(self) -> np.ndarray: + return self._groupwise_roc_points + + @property + def global_roc_point(self) -> np.ndarray: + return self._global_roc_point + + def cost( + self, + *, + false_pos_cost: float = 1.0, + false_neg_cost: float = 1.0, + ) -> float: + """Compute the theoretical cost of the solution found. + + Use false_pos_cost=false_neg_cost=1 for the 0-1 loss (the standard error + rate), which amounts to maximizing accuracy. + + You can find the cost realized from the LP optimization by calling: + >>> obj.cost( + >>> false_pos_cost=obj.false_pos_cost, + >>> false_neg_cost=obj.false_neg_cost, + >>> ) + + Parameters + ---------- + false_pos_cost : float, optional + The cost of a FALSE POSITIVE error, by default 1. + false_neg_cost : float, optional + The cost of a FALSE NEGATIVE error, by default 1. + + Returns + ------- + float + The cost of the solution found. + """ + self._check_fit_status() + global_fpr, global_tpr = self.global_roc_point + + return calc_cost_of_point( + fpr=global_fpr, + fnr=1 - global_tpr, + prevalence=self._global_prevalence, + false_pos_cost=false_pos_cost, + false_neg_cost=false_neg_cost, + ) + + def constraint_violation(self) -> float: + """Constraint violation of the LP solution found. + + Returns + ------- + float + The fairness constraint violation. + """ + self._check_fit_status() + + if self.constraint not in ALL_CONSTRAINTS: + raise ValueError(NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE) + + if self.constraint == "equalized_odds": + return self.equalized_odds_violation() + + elif self.constraint.endswith("rate_parity"): + constraint_to_error_type = { + "true_positive_rate_parity": "fn", + "false_positive_rate_parity": "fp", + "true_negative_rate_parity": "fp", + "false_negative_rate_parity": "fn", + } + + return self.error_rate_parity_constraint_violation( + error_type=constraint_to_error_type[self.constraint], + ) + + else: + raise NotImplementedError( + "Standalone constraint violation not yet computed for " + f"constraint='{self.constraint}'." + ) + + def error_rate_parity_constraint_violation(self, error_type: str) -> float: + """Compute the theoretical violation of an error-rate parity constraint. + + Parameters + ---------- + error_type : str + One of the following values: + "fp", for false positive errors (FPR or TNR parity); + "fn", for false negative errors (TPR or FNR parity). + + Returns + ------- + float + The maximum constraint violation among all groups. + """ + self._check_fit_status() + valid_error_types = ("fp", "fn") + if error_type not in valid_error_types: + raise ValueError( + f"Invalid error_type='{error_type}', must be one of " + f"{valid_error_types}." 
+ ) + + roc_idx_of_interest = 0 if error_type == "fp" else 1 + + return self._max_l_inf_between_points( + points=[ + roc_point[roc_idx_of_interest] + for roc_point in self.groupwise_roc_points + ], + ) + + def equalized_odds_violation(self) -> float: + """Compute the theoretical violation of the equalized odds constraint. + + That is, the maximum l-inf distance between the ROC point of any pair + of groups. + + Returns + ------- + float + The equal-odds constraint violation. + """ + self._check_fit_status() + + # Compute l-inf distance between each pair of groups + return self._max_l_inf_between_points( + points=self.groupwise_roc_points, + ) + + @staticmethod + def _max_l_inf_between_points(points: list[float | np.ndarray]) -> float: + # Number of points (should correspond to the number of groups) + n_points = len(points) + + # Compute l-inf distance between each pair of groups + l_inf_constraint_violation = [ + (np.linalg.norm(points[i] - points[j], ord=np.inf), (i, j)) + for i, j in product(range(n_points), range(n_points)) + if i < j + ] + + # Return the maximum + max_violation, (groupA, groupB) = max(l_inf_constraint_violation) + logging.info( + ( + "Maximum fairness violation is between " + "group=%d (p=%s) and " + "group=%d (p=%s);" + ), + groupA, + points[groupA], + groupB, + points[groupB], + ) + + return max_violation + + def _parse_sensitive_features( + self, + sensitive_features: abc.Iterable[abc.Hashable], + expect_prebuilt_mapping: bool = False, + ) -> np.ndarray: + """Convert the given sensitive_features to the expected format. + + Expected format is composed of integer elements, sequentially numbered, + starting at zero. That is, if there are four different sensitive groups, + these will take the values: [0, 1, 2, 3]. + + Parameters + ---------- + sensitive_features : numpy.ndarray, pandas.DataFrame, pandas.Series, or list + The sensitive features in any format (numeric, string, etc.). + Elements of `sensitive_features` must be hashable. + + expect_prebuilt_mapping : bool + Whether to expect the sensitive features mapping to have already + been built before this call. + + Returns + ------- + sensitive_features_numeric : np.ndarray[int] + The sensitive features in numeric format, with values sequentially + numbered from zero up to `num_groups-1`, [0, 1, ..., num_groups-1]. + """ + # Check if sensitive_features have the expected format + if ( + isinstance(sensitive_features, np.ndarray) + and np.issubdtype(sensitive_features.dtype, np.number) + and len(np.unique(sensitive_features)) == np.max(sensitive_features) + 1 + ): + return sensitive_features + + # Otherwise, convert to expected format + # Check if mapping has been built + if self._sensitive_to_idx is None: + if expect_prebuilt_mapping: + raise RuntimeError( + "Trying to parse `sensitive_features` but mapping has not " + "yet been built; must call `classifier.fit(...)` before." 
+ ) + + self._build_sensitive_to_idx_mapping(sensitive_features) + + return np.array( + [self._sensitive_to_idx[sens_val] for sens_val in sensitive_features], + dtype=int, + ) + + def _build_sensitive_to_idx_mapping(self, sensitive_features): + """Build an inner mapping from sensitive feature names to indices.""" + if self._sensitive_to_idx is not None or self._idx_to_sensitive is not None: + logging.warning("Re-building sensitive feature map!") + + # Sorted unique groups + unique_groups = sorted(np.unique(sensitive_features)) + + # Mapping (index: int) -> (sensitive_value: Any) + self._idx_to_sensitive = dict(enumerate(unique_groups)) + + # Mapping (sensitive_value: Any) -> (index: int) + self._sensitive_to_idx = { + group: idx for idx, group in self._idx_to_sensitive.items() + } + + def fit( + self, + X, + y, + *, + sensitive_features, + y_scores: np.ndarray = None, + **kwargs, + ): + """Find the optimal fair postprocessing. + + That is, it finds the postprocessing that minimizes loss, while + fulfilling the (possibly relaxed) fairness constraint on the provided + data. + + Parameters + ---------- + X : numpy.ndarray or pandas.DataFrame + The feature matrix. + + y : numpy.ndarray, pandas.DataFrame, pandas.Series, or list + The input labels. + + sensitive_features : numpy.ndarray, pandas.DataFrame, pandas.Series, or list + The sensitive features (group membership) of each sample. + + y_scores : np.ndarray, optional + The pre-computed model predictions on this data. + + Returns + ------- + callable + Returns self. + """ + # Validate input + _, y, sensitive_feature_vector, _ = _validate_and_reformat_input( + X, + y, + sensitive_features=sensitive_features, + enforce_binary_labels=True, + ) + + # Parse sensitive_features to numeric format + sensitive_features = self._parse_sensitive_features(sensitive_feature_vector) + + # Compute group stats + self._global_prevalence = np.sum(y) / len(y) + + unique_groups = np.unique(sensitive_features) + num_groups = len(unique_groups) + if np.max(unique_groups) > num_groups - 1: + raise ValueError( + "Groups should be numbered starting at 0, and up to " + f"num_groups-1. Got {num_groups} groups, but max value is " + f"{np.max(unique_groups)} != num_groups-1 == {num_groups-1}." + ) + + # Relative group sizes for LN and LP samples + group_sizes_label_neg = np.array( + [np.sum(1 - y[sensitive_features == g]) for g in unique_groups] + ) + group_sizes_label_pos = np.array( + [np.sum(y[sensitive_features == g]) for g in unique_groups] + ) + + if np.sum(group_sizes_label_neg) + np.sum(group_sizes_label_pos) != len(y): + raise RuntimeError( + "Failed input validation. Are you using non-binary labels?" 
+ ) + + # Convert to relative sizes + group_sizes_label_neg = group_sizes_label_neg.astype(float) / np.sum( + group_sizes_label_neg + ) + group_sizes_label_pos = group_sizes_label_pos.astype(float) / np.sum( + group_sizes_label_pos + ) + + # Compute predictions (if `y_scores` not provided) + if y_scores is None: + y_scores = _get_soft_predictions( + self.predictor, X, self.predict_method, **kwargs + ) + + else: + if not isinstance(y_scores, np.ndarray): + y_scores = np.array(y_scores) + + if y_scores.shape != y.shape: + raise ValueError( + f"`y_scores.shape={y_scores.shape}` must match labels shape " + f"`y.shape={y.shape}`;" + ) + + # Compute group-wise ROC curves + self._all_roc_data = dict() + for g in unique_groups: + group_filter = sensitive_features == g + + roc_curve_data = roc_curve( + y[group_filter], + y_scores[group_filter], + ) + + # Check if max_roc_ticks is exceeded + fpr, tpr, thrs = roc_curve_data + if self.grid_size is not None and len(fpr) > self.grid_size: + indices_to_keep = np.arange( + 0, len(fpr), len(fpr) / self.grid_size + ).astype(int) + + # Bottom-left (0,0) and top-right (1,1) points must be kept + indices_to_keep[-1] = len(fpr) - 1 + roc_curve_data = ( + fpr[indices_to_keep], + tpr[indices_to_keep], + thrs[indices_to_keep], + ) + + self._all_roc_data[g] = roc_curve_data + + # Compute convex hull of each ROC curve + self._all_roc_hulls = dict() + for g in unique_groups: + group_fpr, group_tpr, _group_thresholds = self._all_roc_data[g] + + curr_roc_points = np.stack((group_fpr, group_tpr), axis=1) + curr_roc_points = np.vstack( + (curr_roc_points, [1, 0]) + ) # Add point (1, 0) to ROC curve + + self._all_roc_hulls[g] = roc_convex_hull(curr_roc_points) + + # Find the group-wise optima that fulfill the fairness criteria + self._groupwise_roc_points, self._global_roc_point = compute_fair_optimum( + fairness_constraint=self.constraint, + groupwise_roc_hulls=self._all_roc_hulls, + tolerance=self.tolerance, + group_sizes_label_pos=group_sizes_label_pos, + group_sizes_label_neg=group_sizes_label_neg, + global_prevalence=self._global_prevalence, + false_positive_cost=self.false_pos_cost, + false_negative_cost=self.false_neg_cost, + ) + + # Construct each group-specific classifier + all_rand_clfs = { + g: RandomizedClassifier.construct_at_target_ROC( # TODO: check InterpolatedThresholder + predictor=self.predictor, + roc_curve_data=self._all_roc_data[g], + target_roc_point=self._groupwise_roc_points[g], + seed=self.random_state, + ) + for g in unique_groups + } + + # Construct the global classifier (can be used for all groups) + # TODO: check InterpolatedThresholder + self._realized_classifier = EnsembleGroupwiseClassifiers( + group_to_clf=all_rand_clfs + ) + return self + + def _check_fit_status(self, raise_error: bool = True) -> bool: + """Check whether this classifier has been fit on some data. + + Parameters + ---------- + raise_error : bool, optional + Whether to raise an error if the classifier is uninitialized + (otherwise will just return False), by default True. + + Returns + ------- + is_fit : bool + Whether the classifier was already fit on some data. + + Raises + ------ + RuntimeError + If `raise_error==True`, raises an error if the classifier is + uninitialized. + """ + if self._realized_classifier is None: + if not raise_error: + return False + + raise RuntimeError( + "This classifier has not yet been fitted to any data. " + "Call clf.fit(...) before this method." 
+ ) + + return True + + def predict(self, X, *, sensitive_features) -> np.ndarray: + """Compute predicted binary labels using the fitted postprocessing. + + Parameters + ---------- + X : numpy.ndarray or pandas.DataFrame + The feature matrix. + + sensitive_features : numpy.ndarray, pandas.DataFrame, pandas.Series, or list + The sensitive features (group membership) of each sample. + + Returns + ------- + np.ndarray + The predicted binary labels. + """ + _, _, sensitive_features_vector, _ = _validate_and_reformat_input( + X=X, + sensitive_features=sensitive_features, + expect_y=False, + expect_sensitive_features=True, + ) + sensitive_features = self._parse_sensitive_features( + sensitive_features_vector, + expect_prebuilt_mapping=True, + ) + return self._realized_classifier(X, sensitive_features=sensitive_features) diff --git a/fairlearn/postprocessing/_cvxpy_utils.py b/fairlearn/postprocessing/_cvxpy_utils.py new file mode 100644 index 000000000..66827cf50 --- /dev/null +++ b/fairlearn/postprocessing/_cvxpy_utils.py @@ -0,0 +1,465 @@ +# Copyright (c) Fairlearn contributors. +# Licensed under the MIT License. + +"""A set of helper functions for defining cvxpy LP objective and constraints.""" + +from __future__ import annotations +import logging +from itertools import product + +import numpy as np + +from ._constants import _CVXPY_IMPORT_ERROR_MESSAGE +from ._roc_utils import calc_cost_of_point, compute_global_roc_from_groupwise + + +# Set of all fairness constraints with a cvxpy LP implementation +ALL_CONSTRAINTS = { + "equalized_odds", + "true_positive_rate_parity", + "false_positive_rate_parity", + "true_negative_rate_parity", + "false_negative_rate_parity", +} + +NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE = ( + "Currently only the following constraints are supported: {}.".format( + ", ".join(sorted(ALL_CONSTRAINTS)) + ) +) + + +# Maximum distance from solution to feasibility or optimality +SOLUTION_TOLERANCE = 1e-9 + + +def _import_cvxpy_if_available(): + """Will try to import `cvxpy` and raise an error if it's not installed.""" + try: + import cvxpy as cp # noqa + except ImportError: + raise RuntimeError(_CVXPY_IMPORT_ERROR_MESSAGE) + + +def compute_line(p1: np.ndarray, p2: np.ndarray) -> tuple[float, float]: + """Compute the slope and intercept of a line given two points. + + The intercept is the value at `x=0` (or NaN for vertical lines). + + For vertical lines just use the x-value of one of the points to find the + value of `x` at y=0 (intersection with the x-axis). + + Parameters + ---------- + p1 : np.ndarray + A 2-D point. + + p2 : np.ndarray + A 2-D point. + + Returns + ------- + tuple[float, float] + A tuple pair with (slope, intercept) of the line that goes through p1 + and p2. + + Raises + ------ + ValueError + Raised when input is invalid, e.g., when p1 == p2. + """ + p1x, p1y = p1 + p2x, p2y = p2 + if all(p1 == p2): + raise ValueError("Invalid points: p1==p2;") + + # Vertical line + if np.isclose(p2x, p1x): + slope = np.inf + intercept = np.nan + + # Diagonal or horizontal line + else: + slope = (p2y - p1y) / (p2x - p1x) + intercept = p1y - slope * p1x + + return slope, intercept + + +def compute_halfspace_inequality( + p1: np.ndarray, + p2: np.ndarray, +) -> tuple[float, float, float]: + """Compute the half-space inequality defined by the vector p1->p2. + + That is, computes the inequality that enforces that all points must lie on + the LEFT of the line defined by the p1->p2 vector. 
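+
+    As an illustrative example: for :code:`p1 = (0, 0)` and
+    :code:`p2 = (1, 1)`, the points on the left of the p1->p2 vector are
+    those with :math:`y >= x`, i.e., :math:`x - y <= 0`, which corresponds
+    to the tuple :code:`(1, -1, 0)` in the inequality form defined next.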
+ + Will define the inequality in the form :math:`Ax + b <= 0`, and return a + tuple with :code:`(A_1, A_2, ..., b)` with shape :code:`n_dims + 1`. + + Input points are assumed to be in COUNTER CLOCK-WISE order (right-hand + rule). + + Parameters + ---------- + p1 : np.ndarray + A point in the half-space (or line for 2D). + p2 : np.ndarray + Another point in the half-space (or line for 2D). + + Returns + ------- + tuple[float, float, float] + Returns a tuple of :code:`length=(n_dims + 1)`, with format + :code:`(*A; b)`, representing the inequality :math:`Ax + b <= 0`. + + Raises + ------ + RuntimeError + Thrown in case of inconsistent internal state variables. + """ + slope, intercept = compute_line(p1, p2) + + # Unpack the points for ease of use + p1x, p1y = p1 + p2x, p2y = p2 + + # if slope is infinity, the constraint only applies to the values of x; + # > the halfspace's b intercept value will correspond to this value of x; + if np.isinf(slope): + # Validating vertical line + if not np.isclose(p1x, p2x): + raise RuntimeError( + "Got infinite slope for line containing two points with " + "different x-axis coordinates." + ) + + # Vector pointing downwards? then, x >= b + if p2y < p1y: + return [-1, 0, p1x] + + # Vector pointing upwards? then, x <= b + elif p2y > p1y: + return [1, 0, -p1x] + + # elif slope is zero, the constraint only applies to the values of y; + # > the halfspace's b intercept value will correspond to this value of y; + elif np.isclose(slope, 0.0): + # Validating horizontal line + if not np.isclose(p1y, p2y) or not np.isclose(p1y, intercept): + raise RuntimeError( + "Invalid horizontal line; points p1 and p2 should have same " + f"y-axis value as intercept ({p1y}, {p2y}, {intercept})." + ) + + # Vector pointing leftwards? then, y <= b + if p2x < p1x: + return [0, 1, -p1y] + + # Vector pointing rightwards? then, y >= b + elif p2x > p1x: + return [0, -1, p1y] + + # else, we have a standard diagonal line + else: + # Vector points left? + # then, y <= mx + b <=> -mx + y - b <= 0 + if p2x < p1x: + return [-slope, 1, -intercept] + + # Vector points right? + # then, y >= mx + b <=> mx - y + b <= 0 + elif p2x > p1x: + return [slope, -1, intercept] + + logging.error("No constraint can be concluded from points p1=%s and p2=%s;", p1, p2) + return [0, 0, 0] + + +def make_cvxpy_halfspace_inequality( + p1: np.ndarray, + p2: np.ndarray, + cvxpy_point, +): + """Create a `cvxpy` constraint to enforce a point to be inside a half-space. + + That is, creates a single cvxpy inequality constraint that enforces the + given variable/point, `cvxpy_point`, to lie on the left of the vector p1->p2 + (on the left of the half-space defined by the vector p1->p2). + + Points must be sorted in counter clock-wise order! + + Parameters + ---------- + p1 : np.ndarray + A point p1. + p2 : np.ndarray + Another point p2. + cvxpy_point : cvxpy.Variable + The cvxpy variable over which the constraint will be applied. + + Returns + ------- + cvxpy.Expression + A linear inequality constraint of type Ax + b <= 0. + """ + x_coeff, y_coeff, b = compute_halfspace_inequality(p1, p2) + return np.array([x_coeff, y_coeff]) @ cvxpy_point + b <= 0 + + +def make_cvxpy_point_in_polygon_constraints( + polygon_vertices: np.ndarray, + cvxpy_point, +) -> list: + """Create a set of `cvxpy` constraints for a point to be inside a polygon. 
+ + That is, creates the set of :code:`cvxpy.Expression` constraints that + enforce the given :code:`cvxpy_point: cvxpy.Variable` to lie within the + polygon defined by the given :code:`polygon_vertices` vertices. + + Parameters + ---------- + polygon_vertices : np.ndarray + A sequence of points that make up a polygon. + Points must be sorted in COUNTER CLOCK-WISE order! (right-hand rule) + + cvxpy_point : cvxpy.Variable + A cvxpy variable representing a point, over which the constraints will + be applied. + + Returns + ------- + list[cvxpy.Expression] + A list of cvxpy constraints. + """ + return [ + make_cvxpy_halfspace_inequality( + polygon_vertices[i], + polygon_vertices[(i + 1) % len(polygon_vertices)], + cvxpy_point, + ) + for i in range(len(polygon_vertices)) + ] + + +def compute_fair_optimum( + *, + fairness_constraint: str, + tolerance: float, + groupwise_roc_hulls: dict[int, np.ndarray], + group_sizes_label_pos: np.ndarray, + group_sizes_label_neg: np.ndarray, + global_prevalence: float, + false_positive_cost: float = 1.0, + false_negative_cost: float = 1.0, +) -> tuple[np.ndarray, np.ndarray]: + """Compute the solution to finding the optimal fair (equal odds) classifier. + + Can relax the equal odds constraint by some given tolerance. + + Parameters + ---------- + fairness_constraint : str + The name of the fairness constraint under which the LP will be + optimized. Possible inputs are: + + 'equalized_odds' + match true positive and false positive rates across groups + + tolerance : float + A value for the tolerance when enforcing the fairness constraint. + + groupwise_roc_hulls : dict[int, np.ndarray] + A dict mapping each group to the convex hull of the group's ROC curve. + The convex hull is an np.array of shape (n_points, 2), containing the + points that form the convex hull of the ROC curve, sorted in COUNTER + CLOCK-WISE order. + + group_sizes_label_pos : np.ndarray + The relative or absolute number of positive samples in each group. + + group_sizes_label_neg : np.ndarray + The relative or absolute number of negative samples in each group. + + global_prevalence : float + The global prevalence of positive samples. + + false_positive_cost : float, optional + The cost of a FALSE POSITIVE error, by default 1. + + false_negative_cost : float, optional + The cost of a FALSE NEGATIVE error, by default 1. + + Returns + ------- + (groupwise_roc_points, global_roc_point) : tuple[np.ndarray, np.ndarray] + A tuple pair, (<1>, <2>), containing: + 1: an array with the group-wise ROC points for the solution. + 2: an array with the single global ROC point for the solution. 
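+
+    Notes
+    -----
+    For ``n_groups`` groups, the LP below uses ``2 * (n_groups + 1)``
+    variables (one 2-D ROC point per group plus the global 2-D ROC point);
+    the number of constraints grows with the number of convex-hull vertices
+    and with the number of group pairs.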
+ """ + _import_cvxpy_if_available() + import cvxpy as cp + + if fairness_constraint not in ALL_CONSTRAINTS: + raise ValueError(NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE) + + n_groups = len(groupwise_roc_hulls) + if n_groups != len(group_sizes_label_neg) or n_groups != len(group_sizes_label_pos): + raise ValueError( + "Invalid arguments; all of the following should have the same " + "length: groupwise_roc_hulls, group_sizes_label_neg, group_sizes_label_pos;" + ) + + # Group-wise ROC points + groupwise_roc_points_vars = [ + cp.Variable(shape=2, name=f"ROC point for group {i}", nonneg=True) + for i in range(n_groups) + ] + + # Define global ROC point as a linear combination of the group-wise ROC points + global_roc_point_var = cp.Variable(shape=2, name="Global ROC point", nonneg=True) + constraints = [ + # Global FPR is the average of group FPRs weighted by LNs in each group + global_roc_point_var[0] + == ( + group_sizes_label_neg @ np.array([p[0] for p in groupwise_roc_points_vars]) + ), + # Global TPR is the average of group TPRs weighted by LPs in each group + global_roc_point_var[1] + == ( + group_sizes_label_pos @ np.array([p[1] for p in groupwise_roc_points_vars]) + ), + ] + + # START OF: applying fairness constraints + # If "equalized_odds" + # > i.e., constrain l-inf distance between any two groups' ROCs being less than `tolerance` + if fairness_constraint == "equalized_odds": + constraints += [ + cp.norm_inf(groupwise_roc_points_vars[i] - groupwise_roc_points_vars[j]) + <= tolerance + for i, j in product(range(n_groups), range(n_groups)) + if i < j + ] + + # If some rate parity, i.e., parity of one of {TPR, FPR, TNR, FNR} + # i.e., constrain absolute distance between any two groups' rate metric + elif fairness_constraint.endswith("rate_parity"): + roc_idx_of_interest: int + if ( + fairness_constraint == "true_positive_rate_parity" + or fairness_constraint == "false_negative_rate_parity" + ): + roc_idx_of_interest = 1 + + elif ( + fairness_constraint == "false_positive_rate_parity" + or fairness_constraint == "false_negative_rate_parity" + ): + roc_idx_of_interest = 0 + + else: + # This point should never be reached as fairness constraint was previously validated + raise ValueError(NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE) + + constraints += [ + cp.abs( + groupwise_roc_points_vars[i][roc_idx_of_interest] + - groupwise_roc_points_vars[j][roc_idx_of_interest] + ) + <= tolerance + for i, j in product(range(n_groups), range(n_groups)) + if i < j + ] + + # TODO: implement other constraints here + else: + raise NotImplementedError(NOT_SUPPORTED_CONSTRAINTS_ERROR_MESSAGE) + # END OF: applying fairness constraints + + # Constraints for points in respective group-wise ROC curves + for idx in range(n_groups): + constraints += make_cvxpy_point_in_polygon_constraints( + polygon_vertices=groupwise_roc_hulls[idx], + cvxpy_point=groupwise_roc_points_vars[idx], + ) + + # Define cost function + obj = cp.Minimize( + calc_cost_of_point( + fpr=global_roc_point_var[0], + fnr=1 - global_roc_point_var[1], + prevalence=global_prevalence, + false_pos_cost=false_positive_cost, + false_neg_cost=false_negative_cost, + ) + ) + + # Define cvxpy problem + prob = cp.Problem(obj, constraints) + + # Run solver + prob.solve(solver=cp.ECOS, abstol=SOLUTION_TOLERANCE, feastol=SOLUTION_TOLERANCE) + # NOTE: these tolerances are supposed to be smaller than the default np.isclose tolerances + # (useful when comparing if two points are the same, within the cvxpy accuracy tolerance) + + # Log solution + logging.info( + "cvxpy 
solver took %fs; status is %s.", + prob.solver_stats.solve_time, + prob.status, + ) + + if prob.status not in ["infeasible", "unbounded"]: + # Otherwise, problem.value is inf or -inf, respectively. + logging.info("Optimal solution value: %s", prob.value) + for variable in prob.variables(): + logging.info("Variable %s: value %s", variable.name(), variable.value) + else: + # This line should never be reached (there are always trivial fair + # solutions in the ROC diagonal) + raise ValueError(f"cvxpy problem has no solution; status={prob.status}") + + groupwise_roc_points = np.vstack([p.value for p in groupwise_roc_points_vars]) + global_roc_point = global_roc_point_var.value + + # Validating solution cost + solution_cost = calc_cost_of_point( + fpr=global_roc_point[0], + fnr=1 - global_roc_point[1], + prevalence=global_prevalence, + false_pos_cost=false_positive_cost, + false_neg_cost=false_negative_cost, + ) + + if not np.isclose(solution_cost, prob.value): + logging.error( + ( + "Solution was found but cost did not pass validation! " + "Found solution ROC point %s with theoretical cost %s, " + "but actual cost is %s;" + ), + global_roc_point, + prob.value, + solution_cost, + ) + + # Validating congruency between group-wise ROC points and global ROC point + global_roc_from_groupwise = compute_global_roc_from_groupwise( + groupwise_roc_points=groupwise_roc_points, + groupwise_label_pos_weight=group_sizes_label_pos, + groupwise_label_neg_weight=group_sizes_label_neg, + ) + if not all(np.isclose(global_roc_from_groupwise, global_roc_point)): + logging.error( + ( + "Solution: global ROC point (%s) does not seem to " + "match group-wise ROC points; global should be " + "(%s) to be consistent with group-wise;" + ), + global_roc_point, + global_roc_from_groupwise, + ) + + return groupwise_roc_points, global_roc_point diff --git a/fairlearn/postprocessing/_plotting.py b/fairlearn/postprocessing/_plotting.py index a5a608d42..111d5269f 100644 --- a/fairlearn/postprocessing/_plotting.py +++ b/fairlearn/postprocessing/_plotting.py @@ -7,6 +7,7 @@ from ._constants import _MATPLOTLIB_IMPORT_ERROR_MESSAGE from ._threshold_optimizer import ThresholdOptimizer +from ._cvxpy_threshold_optimizer import _RelaxedThresholdOptimizer _debug_colors = None _debug_ncolors = 10 @@ -63,7 +64,17 @@ def _plot_curve(ax, sensitive_feature, x_col, y_col, points): def _raise_if_not_threshold_optimizer(obj): - if not isinstance(obj, ThresholdOptimizer): + if isinstance(obj, ThresholdOptimizer): + return # OK + + elif isinstance(obj, _RelaxedThresholdOptimizer): + # TODO: implement plotting functionality for postprocessing w/ relaxed constraints + raise NotImplementedError( + "Plotting functionality is not yet implemented for objects of " + f"type {_RelaxedThresholdOptimizer.__name__}." + ) + + else: raise ValueError( "Argument {} needs to be of type {}.".format( obj.__name__, ThresholdOptimizer.__name__ diff --git a/fairlearn/postprocessing/_randomized_classifiers.py b/fairlearn/postprocessing/_randomized_classifiers.py new file mode 100644 index 000000000..1083a2212 --- /dev/null +++ b/fairlearn/postprocessing/_randomized_classifiers.py @@ -0,0 +1,523 @@ +# Copyright (c) Fairlearn contributors. +# Licensed under the MIT License. + +"""Helper functions to construct and use randomized classifiers. + +TODO: this module will probably be substituted by the InterpolatedThresholder +already implemented in fairlearn. 
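+
+Sketch of the intended composition (all names are defined in this module):
+a `BinaryClassifier` thresholds a score predictor; a `RandomizedClassifier`
+mixes several classifiers with given probabilities; and an
+`EnsembleGroupwiseClassifiers` dispatches each sample to the classifier for
+its sensitive group.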
+ +""" +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod +from typing import Callable + +import numpy as np +from scipy.spatial import ConvexHull + + +class _Classifier(ABC): + """Public API for a classifier.""" + + @abstractmethod + def __call__( + self, X: np.ndarray, *, sensitive_features: np.ndarray = None + ) -> np.ndarray: + """Return predicted class, Y, for the given input features, X.""" + raise NotImplementedError + + +class BinaryClassifier(_Classifier): + """A deterministic binary classifier.""" + + def __init__( + self, + score_predictor: Callable, + threshold: float, + ): + """Construct a binary classifier by thresholding score predictor. + + Parameters + ---------- + score_predictor : Callable + A real-valued score predictor. + + threshold : float + A threshold value. + """ + self.score_predictor = score_predictor + self.threshold = threshold + + def __call__( + self, X: np.ndarray, *, sensitive_features: np.ndarray = None + ) -> np.ndarray: + """Compute predictions for the given samples, X. + + Parameters + ---------- + X : np.ndarray + The input samples, in shape (num_samples, num_features). + sensitive_features : None, optional + None. This argument will be ignored by this classifier as it does + not consider sensitive attributes. + + Returns + ------- + y_pred_binary : np.ndarray[int] + The predicted class for each input sample. + """ + return (self.score_predictor(X) >= self.threshold).astype(int) + + +class BinaryClassifierAtROCDiagonal(_Classifier): + """A dummy classifier with FPR=TPR. + + That is, a classifier whose predictions have no correlation with the input + features, but achieves whichever target FPR or TPR you want (on ROC + diagonal). + """ + + def __init__( + self, + target_fpr: float = None, + target_tpr: float = None, + seed: int = 42, + ): + err_msg = ( + "Must provide exactly one of 'target_fpr' or 'target_tpr', " + f"got target_fpr={target_fpr}, target_tpr={target_tpr}." + ) + if target_fpr is not None and target_tpr is not None: + raise ValueError(err_msg) + + # Provided FPR + if target_fpr is not None: + self.target_fpr = target_fpr + self.target_tpr = target_fpr + + # Provided TPR + elif target_tpr is not None: + self.target_tpr = target_tpr + self.target_fpr = target_tpr + + # Provided neither! + else: + raise ValueError(err_msg) + + # Initiate random number generator + self.rng = np.random.default_rng(seed) + + def __call__( + self, + X: np.ndarray, + *, + sensitive_features: np.ndarray = None, + ) -> np.ndarray: + """Compute (randomized) predictions. + + Parameters + ---------- + X : np.ndarray + Input features (will be ignored as predictions are random). + + sensitive_features : np.ndarray, optional + Sensitive features (will be ignored if passed). + + Returns + ------- + np.ndarray + The predicted classes for each input sample. + """ + return (self.rng.random(size=len(X)) >= (1 - self.target_fpr)).astype(int) + + +class EnsembleGroupwiseClassifiers(_Classifier): + """Construct a classifier from a set of group-specific classifiers.""" + + def __init__(self, group_to_clf: dict[int | str, Callable]): + """Construct a classifier from a set of group-specific classifiers. + + Must be provided exactly one classifier per unique group value. + + Parameters + ---------- + group_to_clf : dict[int | str, callable] + A mapping of group value to the classifier that should handle + predictions for that specific group. 
+ """ + self.group_to_clf = group_to_clf + + def __call__(self, X: np.ndarray, *, sensitive_features: np.ndarray) -> np.ndarray: + """Compute predictions for the given input samples. + + For the given samples :code:`X` and their sensitive group membership + :code:`sensitive_features`, compute their predicted classes using + group-specific classifiers. + + Parameters + ---------- + X : np.ndarray + Input samples, with shape (num_samples, num_features). + + group : np.ndarray, optional + The sensitive attribute value for each input sample. + + Returns + ------- + y_pred : np.ndarray + The predictions, where the prediction for each sample is handed off + to a group-specific classifier for that sample. + """ + if len(X) != len(sensitive_features): + raise ValueError("Invalid input sizes len(X) != len(group)") + + # Array to store predictions + num_samples = len(X) + y_pred = np.zeros(num_samples) + + # Filter to keep track of all samples that received a prediction + cumulative_filter = np.zeros(num_samples).astype(bool) + + for group_value, group_clf in self.group_to_clf.items(): + group_filter = sensitive_features == group_value + y_pred[group_filter] = group_clf(X[group_filter]) + cumulative_filter |= group_filter + + if np.sum(cumulative_filter) != num_samples: + raise RuntimeError( + f"Computed group-wise predictions for {np.sum(cumulative_filter)} " + f"samples, but got {num_samples} input samples." + ) + + return y_pred + + +class RandomizedClassifier(_Classifier): + """A randomized classifier that interpolates multiple classifiers.""" + + def __init__( + self, + classifiers: list[Callable], + probabilities: list[float], + seed: int = 42, + ): + """Construct a randomized classifier that interpolates multiple classifiers. + + Parameters + ---------- + classifiers : list[Callable] + A list of classifiers + probabilities : list[float] + A list of probabilities for each given classifier, where + probabilities[idx] is the probability of using the prediction from + classifiers[idx]. + seed : int, optional + A random seed, by default 42. + + Returns + ------- + Callable + The corresponding randomized classifier. + """ + if len(classifiers) != len(probabilities): + raise ValueError( + "Invalid arguments: len(classifiers) != len(probabilities); " + f"({len(classifiers)} != {len(probabilities)});" + ) + + self.classifiers = classifiers + self.probabilities = probabilities + self.rng = np.random.default_rng(seed) + + def __call__(self, X: np.ndarray, *, sensitive_features: np.ndarray = None) -> int: + """Compute interpolated predictions.""" + # Assign each sample to a classifier + clf_idx = self.rng.choice( + np.arange(len(self.classifiers)), # possible choices + size=len(X), # size of output array + p=self.probabilities, # prob. of each choice + ) + + # Run predictions for all classifiers on all samples + y_pred_choices = [clf(X) for clf in self.classifiers] + # TODO: + # we could actually just run the classifier for the samples that get + # matched with it... similar to the EnsembleGroupwiseClassifiers call + # method. + + return np.choose(clf_idx, y_pred_choices) + + @staticmethod + def find_weights_given_two_points( + point_A: np.ndarray, + point_B: np.ndarray, + target_point: np.ndarray, + ): + """Find the interpolation of points that achieves the target point. + + That is, given two ROC points corresponding to existing binary + classifiers, :code:`point_A, point_B`, finds the weights that result in + a classifier whose ROC point is the target :code:`target_point`. 
+ + May need to interpolate the two given points with a third point corresponding + to a random classifier (random uniform distribution with different thresholds). + + Returns + ------- + tuple[np.ndarray, np.ndarray] + Returns a tuple of numpy arrays (Ws, Ps), such that Ws @ Ps == target_point. + The 1st array, Ws, corresponds to the weights of each point in the 2nd array, Ps. + """ + # Check if the target point is actually point A or B + if all(np.isclose(point_A, target_point)): + return np.array([1]), np.expand_dims(point_A, axis=0) + + if all(np.isclose(point_B, target_point)): + return np.array([1]), np.expand_dims(point_B, axis=0) + + # If not, we'll have to triangulate the target using A and B + point_A_fpr, point_A_tpr = point_A + point_B_fpr, point_B_tpr = point_B + target_fpr, target_tpr = target_point + if not (point_A_fpr <= target_fpr <= point_B_fpr): + raise ValueError( + "Invalid input. FPR should fulfill: " + f"({point_A_fpr} point_A_FPR) <= ({target_fpr} target_fpr) <= " + f"({point_B_fpr} point_B_fpr)" + ) + + # Calculate weights for points A and B + weight_A = (target_fpr - point_B_fpr) / (point_A_fpr - point_B_fpr) + + # Result of projecting target point P directly UPWARDS towards the AB line + weights_AB = np.array([weight_A, 1 - weight_A]) + point_P_upwards = weights_AB @ np.vstack((point_A, point_B)) + if not np.isclose(point_P_upwards[0], target_fpr): + raise RuntimeError( + "Failed projecting target_fpr to ROC hull frontier. " + f"Got proj. FPR={point_P_upwards[0]}; target FPR={target_fpr};" + ) + + # Check if the target point lies in the AB line (and return if so) + if all(np.isclose(point_P_upwards, target_point)): + return weights_AB, np.vstack((point_A, point_B)) + + # Result of projecting target point P directly DOWNWARDS towards the diagonal tpr==fpr + point_P_downwards = np.array([target_fpr, target_fpr]) + + # Calculate weights for P upwards and P downwards + weight_P_upwards = (target_tpr - point_P_downwards[1]) / ( + point_P_upwards[1] - point_P_downwards[1] + ) + + # Validating triangulation results + all_points = np.vstack((point_A, point_B, point_P_downwards)) + all_weights = np.hstack((weight_P_upwards * weights_AB, 1 - weight_P_upwards)) + + if not np.isclose(all_weights.sum(), 1): + raise RuntimeError( + f"Sum of linear interpolation weights was {all_weights.sum()}, " + "should be 1!" + ) + + if not all(np.isclose(target_point, all_weights @ all_points)): + raise RuntimeError( + "Triangulation of target point failed. " + f"Target was {target_point}; got {all_weights @ all_points}." + ) + + return all_weights, all_points + + @staticmethod + def construct_at_target_ROC( + predictor: Callable, + roc_curve_data: tuple, + target_roc_point: np.ndarray, + seed: int = 42, + ) -> "RandomizedClassifier": + """Construct a classifier at the target ROC point. + + That is, constructs a (possibly randomized) classifier in the interior + of the convex hull of the predictor's ROC curve, at a given target ROC + point. + + Parameters + ---------- + predictor : Callable + A predictor that outputs real-valued scores in range [0; 1]. + + roc_curve_data : tuple[np.array...] + The ROC curve of the given classifier, as a tuple of + (FPR values; TPR values; threshold values). + + target_roc_point : np.ndarray + The target ROC point in (FPR, TPR). + + Returns + ------- + rand_clf : Callable + A (randomized) binary classifier whose expected FPR and TPR + corresponds to the given target ROC point. 
+ """ + # Unpack useful constants + target_fpr, target_tpr = target_roc_point + fpr, tpr, thrs = roc_curve_data + + # Check if we have more than two ROC points + # (3 minimum to compute convex hull) + if len(fpr) <= 1: + raise ValueError( + f"Invalid ROC curve data (only has one point): fpr:{fpr}; tpr:{tpr}." + ) + + if len(fpr) == 2: + logging.warning( + "Got ROC data with only 2 points: producing a random classifier..." + ) + if not np.isclose(target_roc_point[0], target_roc_point[1]): + logging.error( + ( + "Invalid target ROC point (%s) is not in diagonal ROC line, " + "but a random-classifier ROC was provided." + ), + target_roc_point, + ) + + return BinaryClassifierAtROCDiagonal(target_fpr=target_roc_point[0]) + + # Compute hull of ROC curve + roc_curve_points = np.stack((fpr, tpr), axis=1) + hull = ConvexHull(roc_curve_points) + + # Filter out ROC points in the interior of the convex hull and other suboptimal points + points_above_diagonal = np.argwhere(tpr >= fpr).ravel() + useful_points_idx = np.array( + sorted(set(hull.vertices) & set(points_above_diagonal)) + ) + + fpr = fpr[useful_points_idx] + tpr = tpr[useful_points_idx] + thrs = thrs[useful_points_idx] + + # Find points A and B to construct the randomized classifier from + # > point A is the last point with FPR smaller or equal to the target + point_A_idx = 0 + if target_fpr > 0: + point_A_idx = max(np.argwhere(fpr <= target_fpr).ravel()) + point_A_roc = roc_curve_points[useful_points_idx][point_A_idx] + + # > point B is the first point with FPR larger than the target + point_B_idx = min(point_A_idx + 1, len(thrs) - 1) + point_B_roc = roc_curve_points[useful_points_idx][point_B_idx] + + weights, points = RandomizedClassifier.find_weights_given_two_points( + point_A=point_A_roc, + point_B=point_B_roc, + target_point=target_roc_point, + ) + + if max(weights) > 1: + logging.error("Got triangulation weights over 100%: w=%s;", weights) + + # Instantiate classifiers for points A and B + clf_a = BinaryClassifier(predictor, threshold=thrs[point_A_idx]) + clf_b = BinaryClassifier(predictor, threshold=thrs[point_B_idx]) + + # Check if most of the probability mass is on a single classifier + if np.isclose(max(weights), 1.0): + if all(np.isclose(target_roc_point, point_A_roc)): + return clf_a + + elif all(np.isclose(target_roc_point, point_B_roc)): + return clf_b + + else: + # differences from target point to A or B are significant enough + # to warrant triangulating between multiple points + pass + + # If only one point returned, then that point should have weight==1.0 + # (hence, should've been caught by the previous if statement) + if len(weights) == 1: + raise RuntimeError("Invalid triangulation.") + + # If there are two points, return a randomized classifier between the two + elif len(weights) == 2: + return RandomizedClassifier( + classifiers=[clf_a, clf_b], + probabilities=weights, + seed=seed, + ) + + # If it's in the interior of the ROC curve, requires instantiating a + # randomized classifier at the diagonal + elif len(weights) == 3: + fpr_rand, tpr_rand = points[2] + if not np.isclose(fpr_rand, tpr_rand): + raise RuntimeError( + "Triangulation point at ROC diagonal has FPR != TPR " + f"({fpr_rand} != {tpr_rand}); " + ) + + # >>> BUG this would be better but for some reason it doesn't work! + # rng = np.random.default_rng(42) + # clf_rand = lambda X: (rng.random(size=len(X)) >= (1 - fpr_rand)).astype(int) + # # or... 
+ # clf_rand = BinaryClassifierAtROCDiagonal(target_fpr=fpr_rand) + # <<< + def clf_rand(X): + return (np.random.random(size=len(X)) >= (1 - fpr_rand)).astype(int) + + return RandomizedClassifier( + classifiers=[clf_a, clf_b, clf_rand], probabilities=weights, seed=seed + ) + + else: + raise RuntimeError( + "Invalid triangulation of classifiers; " + f"weights: {weights}; points: {points};" + ) + + @staticmethod + def find_points_for_target_ROC(roc_curve_data, target_roc_point): + """Retrieve the realizable points that interpolate the target ROC point. + + That is, retrieves a set of realizable points (and respective weights) + in the provided ROC curve that can be used to realize any target ROC in + the interior of the ROC curve. + + NOTE: this method is a bit redundant -- has functionality in common with + RandomizedClassifier.construct_at_target_ROC() + """ + # Unpack useful constants + target_fpr, target_tpr = target_roc_point + fpr, tpr, thrs = roc_curve_data + + # Compute hull of ROC curve + roc_curve_points = np.stack((fpr, tpr), axis=1) + hull = ConvexHull(roc_curve_points) + + # Filter out ROC points in the interior of the convex hull and other suboptimal points + points_above_diagonal = np.argwhere(tpr >= fpr).ravel() + useful_points_idx = np.array( + sorted(set(hull.vertices) & set(points_above_diagonal)) + ) + + fpr = fpr[useful_points_idx] + tpr = tpr[useful_points_idx] + thrs = thrs[useful_points_idx] + + # Find points A and B to construct the randomized classifier from + # > point A is the last point with FPR smaller or equal to the target + point_A_idx = max(np.argwhere(fpr <= target_fpr).ravel()) + # > point B is the first point with FPR larger than the target + point_B_idx = point_A_idx + 1 + + weights, points = RandomizedClassifier.find_weights_given_two_points( + point_A=roc_curve_points[useful_points_idx][point_A_idx], + point_B=roc_curve_points[useful_points_idx][point_B_idx], + target_point=target_roc_point, + ) + + return weights, points diff --git a/fairlearn/postprocessing/_roc_utils.py b/fairlearn/postprocessing/_roc_utils.py new file mode 100644 index 000000000..fcb394bce --- /dev/null +++ b/fairlearn/postprocessing/_roc_utils.py @@ -0,0 +1,170 @@ +# Copyright (c) Fairlearn contributors. +# Licensed under the MIT License. + +"""Helper functions for threshold optimization methods. + +NOTE +---- +- Most utils defined here likely have a similar counter-part already implemented + somewhere in the `fairlearn` code-base. +- With time they will probably be substituted by that counter-part, and these + implementations removed. +""" +import logging +import numpy as np +from scipy.spatial import ConvexHull +from sklearn.metrics import confusion_matrix + + +def calc_cost_of_point( + fpr: float, + fnr: float, + prevalence: float, + *, + false_pos_cost: float = 1.0, + false_neg_cost: float = 1.0, +) -> float: + """Calculate the cost of the given ROC point. + + Parameters + ---------- + fpr : float + The false positive rate (FPR). + fnr : float + The false negative rate (FNR). + prevalence : float + The prevalence of positive samples in the dataset, + i.e., np.sum(y_true) / len(y_true) + false_pos_cost : float, optional + The cost of a false positive error, by default 1. + false_neg_cost : float, optional + The cost of a false negative error, by default 1. + + Returns + ------- + cost : float + The cost of the given ROC point (divided by the size of the dataset). 
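+
+    As an illustrative example: with unit costs, ``fpr=0.2``, ``fnr=0.4``,
+    and ``prevalence=0.5``, the cost is ``0.5 * 0.2 + 0.5 * 0.4 = 0.3``,
+    i.e., the balanced error rate.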
+    """
+    cost_vector = np.array([false_pos_cost, false_neg_cost])
+    weight_vector = np.array([1 - prevalence, prevalence])
+    return (cost_vector * weight_vector) @ np.array([fpr, fnr])
+
+
+def compute_roc_point_from_predictions(y_true, y_pred_binary):
+    """Compute the ROC point associated with the provided binary predictions.
+
+    Parameters
+    ----------
+    y_true : np.ndarray
+        The true labels.
+    y_pred_binary : np.ndarray
+        The binary predictions.
+
+    Returns
+    -------
+    tuple[float, float]
+        The resulting ROC point, i.e., a tuple (FPR, TPR).
+    """
+    tn, fp, fn, tp = confusion_matrix(y_true, y_pred_binary).ravel()
+
+    # FPR = FP / LN (false positives over label negatives)
+    fpr = fp / (fp + tn)
+
+    # TPR = TP / LP (true positives over label positives)
+    tpr = tp / (tp + fn)
+
+    return (fpr, tpr)
+
+
+def compute_global_roc_from_groupwise(
+    groupwise_roc_points: np.ndarray,
+    groupwise_label_pos_weight: np.ndarray,
+    groupwise_label_neg_weight: np.ndarray,
+) -> np.ndarray:
+    """Compute the global ROC point that corresponds to the provided group-wise ROC points.
+
+    The global ROC point is a linear combination of the group-wise points,
+    with different weights for FPR and TPR: the global FPR is weighted by each
+    group's share of label negatives (LNs), and the global TPR by each group's
+    share of label positives (LPs).
+
+    Parameters
+    ----------
+    groupwise_roc_points : np.ndarray
+        An array of shape (n_groups, n_roc_dims) containing one ROC point per
+        group.
+    groupwise_label_pos_weight : np.ndarray
+        The relative size of each group in terms of its label POSITIVE samples
+        (out of all POSITIVE samples, how many are in each group).
+    groupwise_label_neg_weight : np.ndarray
+        The relative size of each group in terms of its label NEGATIVE samples
+        (out of all NEGATIVE samples, how many are in each group).
+
+    Returns
+    -------
+    global_roc_point : np.ndarray
+        A single point that corresponds to the global outcome of the given
+        group-wise ROC points.
+    """
+    n_groups, _ = groupwise_roc_points.shape
+
+    # Validate input shapes
+    if (
+        len(groupwise_label_pos_weight) != len(groupwise_label_neg_weight)
+        or len(groupwise_label_pos_weight) != n_groups
+    ):
+        raise ValueError(
+            "Invalid input shapes: length of all arguments must be equal (the "
+            "number of different sensitive groups)."
+        )
+
+    # Normalize group LP (/LN) weights so they sum to 1
+    # (without mutating the caller's arrays)
+    if not np.isclose(groupwise_label_pos_weight.sum(), 1.0):
+        groupwise_label_pos_weight = (
+            groupwise_label_pos_weight / groupwise_label_pos_weight.sum()
+        )
+    if not np.isclose(groupwise_label_neg_weight.sum(), 1.0):
+        groupwise_label_neg_weight = (
+            groupwise_label_neg_weight / groupwise_label_neg_weight.sum()
+        )
+
+    # Compute global FPR (weighted by the relative number of LNs in each group)
+    global_fpr = groupwise_label_neg_weight @ groupwise_roc_points[:, 0]
+
+    # Compute global TPR (weighted by the relative number of LPs in each group)
+    global_tpr = groupwise_label_pos_weight @ groupwise_roc_points[:, 1]
+
+    global_roc_point = np.array([global_fpr, global_tpr])
+    return global_roc_point
+
+
+def roc_convex_hull(roc_points: np.ndarray) -> np.ndarray:
+    """Compute the convex hull of the provided ROC points.
+
+    Parameters
+    ----------
+    roc_points : np.ndarray
+        An array of shape (n_points, n_dims) containing all points
+        of a provided ROC curve.
+
+    Returns
+    -------
+    hull_points : np.ndarray
+        An array of shape (n_hull_points, n_dims) containing all
+        points in the convex hull of the ROC curve.
+    """
+    # Save initial number of points, just for logging
+    init_num_points, _dims = roc_points.shape
+
+    # Compute convex hull
+    hull = ConvexHull(roc_points)
+
+    # NOTE: discarding points below the diagonal seems to lead to bugs later
+    # on (reason currently unknown)...
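+    # (For reference: scipy's ConvexHull lists the vertices of a 2-D hull in
+    # counterclockwise order; the full hull is returned below, sub-diagonal
+    # points included, and any further filtering is left to the caller.)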
+    # Discard points in the interior of the convex hull,
+    # and other useless points (below the main diagonal)
+    # points_above_diagonal = np.argwhere(roc_points[:, 1] >= roc_points[:, 0]).ravel()
+    # hull_indices = sorted(set(hull.vertices) & set(points_above_diagonal))
+
+    hull_indices = hull.vertices
+
+    logging.info(
+        "ROC convex hull contains %.1f%% of the original points.",
+        (len(hull_indices) / init_num_points) * 100,
+    )
+
+    return roc_points[hull_indices]
diff --git a/fairlearn/postprocessing/_threshold_optimizer.py b/fairlearn/postprocessing/_threshold_optimizer.py
index e3642915b..cac9006f3 100644
--- a/fairlearn/postprocessing/_threshold_optimizer.py
+++ b/fairlearn/postprocessing/_threshold_optimizer.py
@@ -130,6 +130,9 @@ class ThresholdOptimizer(BaseEstimator, MetaEstimatorMixin):
     estimator : object
         A `scikit-learn compatible estimator `_  # noqa
         whose output is postprocessed.
+        The estimator should output real-valued scores, as postprocessing
+        results will be extremely poor when performed over binarized
+        predictions.
 
     constraints : str, default='demographic_parity'
         Fairness constraints under which threshold optimization is performed.
@@ -180,15 +183,15 @@ class ThresholdOptimizer(BaseEstimator, MetaEstimatorMixin):
         'auto'
             use one of :code:`predict_proba`, :code:`decision_function`, or
             :code:`predict`, in that order.
-
+
         'predict_proba'
             use the second column from the output of :code:`predict_proba`.
             It is assumed that the second column represents the positive
            outcome.
-
+
         'decision_function'
             use the raw values given by the :code:`decision_function`.
-
+
         'predict'
             use the hard values reported by the :code:`predict` method if
            estimator is a classifier, and the regression values if
diff --git a/fairlearn/reductions/_moments/error_rate.py b/fairlearn/reductions/_moments/error_rate.py
index ab390d5d4..27ad27a89 100644
--- a/fairlearn/reductions/_moments/error_rate.py
+++ b/fairlearn/reductions/_moments/error_rate.py
@@ -4,15 +4,11 @@
 import numpy as np
 import pandas as pd
 
+from fairlearn.utils._common import unpack_fp_fn_costs
 from fairlearn.utils._input_validation import _validate_and_reformat_input
 
 from .moment import _ALL, _LABEL, ClassificationMoment
 
-_MESSAGE_BAD_COSTS = (
-    "costs needs to be a dictionary with keys "
-    "'fp' and 'fn' containing non-negative values, which are not both zero"
-)
-
 
 class ErrorRate(ClassificationMoment):
     r"""Misclassification error as a moment.
@@ -48,17 +44,8 @@ def __init__(self, *, costs=None):
         if costs is None:
             self.fp_cost = 1.0
             self.fn_cost = 1.0
-        elif (
-            type(costs) is dict
-            and costs.keys() == {"fp", "fn"}
-            and costs["fp"] >= 0.0
-            and costs["fn"] >= 0.0
-            and costs["fp"] + costs["fn"] > 0.0
-        ):
-            self.fp_cost = costs["fp"]
-            self.fn_cost = costs["fn"]
         else:
-            raise ValueError(_MESSAGE_BAD_COSTS)
+            self.fp_cost, self.fn_cost = unpack_fp_fn_costs(costs)
 
     def load_data(self, X, y, *, sensitive_features, control_features=None):
         """Load the specified data into the object."""
diff --git a/fairlearn/utils/_common.py b/fairlearn/utils/_common.py
index bc76f33fc..de978c153 100644
--- a/fairlearn/utils/_common.py
+++ b/fairlearn/utils/_common.py
@@ -1,6 +1,51 @@
 # Copyright (c) Fairlearn contributors.
 # Licensed under the MIT License.
 
+from __future__ import annotations
+
+_MESSAGE_BAD_COSTS = (
+    "costs needs to be a dictionary with keys "
+    "'fp' and 'fn' containing non-negative values, which are not both zero"
+)
+
+
+def unpack_fp_fn_costs(costs: dict) -> tuple[float, float]:
+    """Validate and unpack the given `costs`.
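+
+    For example, :code:`unpack_fp_fn_costs({"fp": 2.0, "fn": 1.0})` returns
+    :code:`(2.0, 1.0)`, while passing costs that are both zero raises a
+    :code:`ValueError`.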
+
+    Parameters
+    ----------
+    costs : dict
+        A dictionary detailing the cost of false positives and false negatives,
+        of the form :code:`{'fp': fp_cost, 'fn': fn_cost}`.
+
+    Returns
+    -------
+    tuple[float, float]
+        A tuple :code:`(fp_cost, fn_cost)`, composed respectively of the cost
+        of false positives and the cost of false negatives.
+
+    Raises
+    ------
+    ValueError
+        Raised when the provided costs are invalid (e.g., missing keys
+        in the provided dict, or negative costs).
+    """
+    if (
+        type(costs) is dict
+        and costs.keys() == {"fp", "fn"}
+        and costs["fp"] >= 0.0
+        and costs["fn"] >= 0.0
+        and costs["fp"] + costs["fn"] > 0.0
+    ):
+        fp_cost = costs["fp"]
+        fn_cost = costs["fn"]
+
+    else:
+        raise ValueError(_MESSAGE_BAD_COSTS)
+
+    return fp_cost, fn_cost
+
 
 def _get_soft_predictions(estimator, X, predict_method):
     r"""Return soft predictions of a classifier.
diff --git a/test/unit/reductions/moments/test_moments_error_rate.py b/test/unit/reductions/moments/test_moments_error_rate.py
index 870a71fb5..34659c1b9 100644
--- a/test/unit/reductions/moments/test_moments_error_rate.py
+++ b/test/unit/reductions/moments/test_moments_error_rate.py
@@ -4,7 +4,7 @@
 import pytest
 
 from fairlearn.reductions import ErrorRate
-from fairlearn.reductions._moments.error_rate import _MESSAGE_BAD_COSTS
+from fairlearn.utils._common import _MESSAGE_BAD_COSTS
 
 BAD_COSTS_EXAMPLES = [
     {"fp": 0.0, "fn": 0.0},