# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import logging

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, MetaEstimatorMixin

from ._constants import _ACCURACY_MUL, _REGRET_CHECK_START_T, _REGRET_CHECK_INCREASE_T, \
    _SHRINK_REGRET, _SHRINK_ETA, _MIN_T, _RUN_LP_STEP, _PRECISION, _INDENTATION
from ._lagrangian import _Lagrangian
from fairlearn.reductions._moments import ClassificationMoment
from fairlearn._input_validation import _validate_and_reformat_input

logger = logging.getLogger(__name__)


class ExponentiatedGradient(BaseEstimator, MetaEstimatorMixin):
    """An Estimator which implements the exponentiated gradient approach to reductions.

    The exponentiated gradient algorithm is described in detail by
    `Agarwal et al. (2018) <https://arxiv.org/abs/1803.02453>`_.

    :param estimator: An estimator implementing methods :code:`fit(X, y, sample_weight)` and
        :code:`predict(X)`, where `X` is the matrix of features, `y` is the vector of labels,
        and `sample_weight` is a vector of weights; labels `y` and predictions returned by
        :code:`predict(X)` are either 0 or 1.
    :type estimator: estimator
    :param constraints: The disparity constraints expressed as moments
    :type constraints: fairlearn.reductions.Moment
    :param eps: Allowed fairness constraint violation; the solution is guaranteed to have
        error within :code:`2*best_gap` of the best error under the constraint `eps`; the
        constraint violation is at most :code:`2*(eps+best_gap)`
    :type eps: float
    :param T: Maximum number of iterations
    :type T: int
    :param nu: Convergence threshold for the duality gap; if None, it is set automatically
        to a conservative value based on the statistical uncertainty in measuring
        classification error
    :type nu: float
    :param eta_mul: Initial setting of the learning rate
    :type eta_mul: float
    """

    def __init__(self, estimator, constraints, eps=0.01, T=50, nu=None, eta_mul=2.0):  # noqa: D103
        self._estimator = estimator
        self._constraints = constraints
        self._eps = eps
        self._T = T
        self._nu = nu
        self._eta_mul = eta_mul
        self._best_gap = None
        self._predictors = None
        self._weights = None
        self._last_t = None
        self._best_t = None
        self._n_oracle_calls = 0
        self._n_oracle_calls_dummy_returned = 0
        self._oracle_execution_times = None
        self._lambda_vecs = pd.DataFrame()
        self._lambda_vecs_LP = pd.DataFrame()
        self._lambda_vecs_lagrangian = pd.DataFrame()

    def fit(self, X, y, **kwargs):
        """Return a fair classifier under specified fairness constraints.

        :param X: The feature matrix
        :type X: numpy.ndarray or pandas.DataFrame
        :param y: The label vector
        :type y: numpy.ndarray, pandas.DataFrame, pandas.Series, or list
        """
        if isinstance(self._constraints, ClassificationMoment):
            logger.debug("Classification problem detected")
            is_classification_reduction = True
        else:
            logger.debug("Regression problem detected")
            is_classification_reduction = False

        _, y_train, sensitive_features = _validate_and_reformat_input(
            X, y, enforce_binary_labels=is_classification_reduction, **kwargs)
        n = y_train.shape[0]

        logger.debug("...Exponentiated Gradient STARTING")

        B = 1 / self._eps
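        # Interpretation (from Agarwal et al. 2018, not stated in the original
        # source): B bounds the 1-norm of the Lagrange multiplier vector, and
        # tying it to 1/eps follows the paper's analysis.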
        lagrangian = _Lagrangian(X, sensitive_features, y_train, self._estimator,
                                 self._constraints, self._eps, B)

        theta = pd.Series(0, lagrangian.constraints.index)
        Qsum = pd.Series(dtype="float64")
        gaps_EG = []
        gaps = []
        Qs = []

        last_regret_checked = _REGRET_CHECK_START_T
        last_gap = np.inf
        for t in range(self._T):
            logger.debug("...iter=%03d", t)

            # set lambdas for every constraint
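            # Exponentiated-gradient step: each lambda_i is proportional to
            # exp(theta_i), normalized so the multipliers sum to less than B
            # (the remaining mass implicitly sits on an inactive coordinate).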
            lambda_vec = B * np.exp(theta) / (1 + np.exp(theta).sum())
            self._lambda_vecs[t] = lambda_vec
            lambda_EG = self._lambda_vecs.mean(axis=1)

            # select classifier according to best_h method
            h, h_idx = lagrangian.best_h(lambda_vec)

            if t == 0:
                if self._nu is None:
                    self._nu = _ACCURACY_MUL * (h(X) - y_train).abs().std() / np.sqrt(n)
                eta_min = self._nu / (2 * B)
                eta = self._eta_mul / B
                logger.debug("...eps=%.3f, B=%.1f, nu=%.6f, T=%d, eta_min=%.6f",
                             self._eps, B, self._nu, self._T, eta_min)

            if h_idx not in Qsum.index:
                Qsum.at[h_idx] = 0.0
            Qsum[h_idx] += 1.0
            gamma = lagrangian.gammas[h_idx]
            Q_EG = Qsum / Qsum.sum()
            result_EG = lagrangian.eval_gap(Q_EG, lambda_EG, self._nu)
            gap_EG = result_EG.gap()
            gaps_EG.append(gap_EG)

            if t == 0 or not _RUN_LP_STEP:
                gap_LP = np.inf
            else:
                # saddle point optimization over the convex hull of
                # classifiers returned so far
                Q_LP, self._lambda_vecs_LP[t], result_LP = lagrangian.solve_linprog(self._nu)
                gap_LP = result_LP.gap()

            # keep values from exponentiated gradient or linear programming
            if gap_EG < gap_LP:
                Qs.append(Q_EG)
                gaps.append(gap_EG)
            else:
                Qs.append(Q_LP)
                gaps.append(gap_LP)

            logger.debug("%seta=%.6f, L_low=%.3f, L=%.3f, L_high=%.3f, gap=%.6f, disp=%.3f, "
                         "err=%.3f, gap_LP=%.6f",
                         _INDENTATION, eta, result_EG.L_low, result_EG.L, result_EG.L_high,
                         gap_EG, result_EG.gamma.max(), result_EG.error, gap_LP)

            if (gaps[t] < self._nu) and (t >= _MIN_T):
                # solution found
                break

            # update regret
            if t >= last_regret_checked * _REGRET_CHECK_INCREASE_T:
                best_gap = min(gaps_EG)
                if best_gap > last_gap * _SHRINK_REGRET:
                    eta *= _SHRINK_ETA
                last_regret_checked = t
                last_gap = best_gap

            # update theta based on learning rate
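            # theta accumulates the constraint violations (gamma - eps), so
            # constraints that remain violated receive exponentially growing
            # weight in the next lambda update.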
            theta += eta * (gamma - self._eps)
        # retain relevant result data
        gaps_series = pd.Series(gaps)
        gaps_best = gaps_series[gaps_series <= gaps_series.min() + _PRECISION]
        self._best_t = gaps_best.index[-1]
        self._best_gap = gaps[self._best_t]
        self._weights = Qs[self._best_t]
        self._hs = lagrangian.hs
        for h_idx in self._hs.index:
            if h_idx not in self._weights.index:
                self._weights.at[h_idx] = 0.0

        self._last_t = len(Qs) - 1
        self._predictors = lagrangian.classifiers
        self._n_oracle_calls = lagrangian.n_oracle_calls
        self._n_oracle_calls_dummy_returned = lagrangian.n_oracle_calls_dummy_returned
        self._oracle_execution_times = lagrangian.oracle_execution_times
        self._lambda_vecs_lagrangian = lagrangian.lambdas

        logger.debug("...eps=%.3f, B=%.1f, nu=%.6f, T=%d, eta_min=%.6f",
                     self._eps, B, self._nu, self._T, eta_min)
        logger.debug("...last_t=%d, best_t=%d, best_gap=%.6f, n_oracle_calls=%d, n_hs=%d",
                     self._last_t, self._best_t, self._best_gap, lagrangian.n_oracle_calls,
                     len(lagrangian.classifiers))

        # return the fitted estimator, per scikit-learn convention
        return self

    def predict(self, X):
        """Provide a prediction for the given input data.

        Note that the prediction is non-deterministic: the exponentiated
        gradient algorithm produces a distribution over classifiers, so
        repeated calls on the same data may return different labels.

        :param X: Feature data
        :type X: numpy.ndarray or pandas.DataFrame
        :return: The prediction. If `X` represents the data for a single example
            the result will be a scalar. Otherwise the result will be a vector.
        :rtype: Scalar or vector
        """
        positive_probs = self._pmf_predict(X)[:, 1]
        return (positive_probs >= np.random.rand(len(positive_probs))) * 1

    def _pmf_predict(self, X):
        """Probability mass function for the given input data.

        :param X: Feature data
        :type X: numpy.ndarray or pandas.DataFrame
        :return: Two-column array with the probabilities of predicting 0 and 1
            for each example.
        :rtype: numpy.ndarray
        """
        pred = pd.DataFrame()
        for t in range(len(self._hs)):
            pred[t] = self._hs[t](X)
        # mixture over the classifiers found during fit, weighted by Q
        positive_probs = pred[self._weights.index].dot(self._weights).to_frame()
        return np.concatenate((1 - positive_probs, positive_probs), axis=1)
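

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module): shows how
# this estimator is typically driven. It assumes fairlearn's public
# DemographicParity moment and scikit-learn's LogisticRegression; the synthetic
# data below is made up for demonstration.
if __name__ == "__main__":
    from sklearn.linear_model import LogisticRegression
    from fairlearn.reductions import DemographicParity

    rng = np.random.RandomState(0)
    X_demo = rng.normal(size=(200, 3))
    sensitive = rng.randint(0, 2, size=200)  # a single binary sensitive feature
    y_demo = (X_demo[:, 0] + 0.5 * sensitive > 0).astype(int)

    mitigator = ExponentiatedGradient(
        estimator=LogisticRegression(solver="liblinear"),
        constraints=DemographicParity(),
        eps=0.01)
    mitigator.fit(X_demo, y_demo, sensitive_features=sensitive)

    # predictions are randomized draws from the learned distribution over
    # classifiers, so repeated calls may differ
    print(mitigator.predict(X_demo)[:10])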