-
Notifications
You must be signed in to change notification settings - Fork 109
/
data_repairer.py
190 lines (166 loc) · 6.51 KB
/
data_repairer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from typing import Optional
import numpy as np
import pandas as pd
from aequitas.flow.methods.preprocessing.preprocessing import PreProcessing
class DataRepairer(PreProcessing):
    """
    Repairs features so they are less dependent on the sensitive attribute.

    Transforms the data distribution so that a given feature distribution is
    more or less independent of the sensitive attribute s.

    This is achieved by matching the conditional distribution P(X|s) to the
    global variable distribution P(X), matching the values of quantiles.

    Parameters
    ----------
    repair_level : float
        How much will the data be transformed to the global distribution.
        The repaired value is
        ``global_value * repair_level + original * (1 - repair_level)``, so
        1.0 maps a value fully onto the global distribution and 0.0 leaves it
        unchanged. Defaults to 1.0.
    columns : list[str], optional
        Which columns to transform. If left empty, ``fit`` replaces it with
        every numeric, non-boolean column of X.
    definition : int
        How many quantiles to calculate. Defaults to 101.

    Attributes
    ----------
    _quantile_points : np.ndarray
        Quantile levels in [0, 1]. The length is determined by the definition.
    _global_quantiles : dict[str, np.ndarray]
        Per column, the quantile values of the feature over all rows.
    _group_quantiles : dict[str, dict[group, np.ndarray]]
        Per column and per group label of s, the quantile values of the
        feature restricted to that group.

    Methods
    -------
    fit(X, y, s=None)
        Calculates the quantiles in the dataset for each feature and group.
    transform(X, y, s=None)
        Transform the features to match the global distribution.
    """

    def __init__(
        self,
        repair_level: float = 1.0,
        columns: Optional[list[str]] = None,
        definition: int = 101,
    ):
        self.repair_level = repair_level
        self.columns = columns
        self.definition = definition
        self.used_in_inference = True

    def fit(self, X: pd.DataFrame, y: pd.Series, s: Optional[pd.Series] = None) -> None:
        """
        Calculates the quantiles in the dataset for each feature and group.

        Parameters
        ----------
        X : pd.DataFrame
            The feature matrix.
        y : pd.Series
            The labels. Not used by this method; only forwarded to the parent
            class to follow its method signature.
        s : pd.Series, optional
            The sensitive attribute. Although optional in the signature, it
            is required by this method.

        Raises
        ------
        ValueError
            If s is None.
        """
        super().fit(X, y, s)

        # Validate before touching any state so a failed fit leaves the
        # instance exactly as it was.
        if s is None:
            raise ValueError("s must be passed.")

        if self.columns is None:
            # Quantile matching is only defined for numeric data. Booleans
            # count as numeric for pandas, so they are excluded explicitly.
            self.columns = [
                column
                for column in X.columns
                if pd.api.types.is_numeric_dtype(X[column].dtype)
                and not pd.api.types.is_bool_dtype(X[column].dtype)
            ]

        self._quantile_points = np.linspace(0, 1, self.definition)
        # Quantile values of each feature over the whole dataset.
        self._global_quantiles = {}
        # Quantile values of each feature within each group of s.
        self._group_quantiles = {}

        for column in self.columns:
            self._global_quantiles[column] = (
                X[column].quantile(self._quantile_points).to_numpy(dtype=float)
            )
            self._group_quantiles[column] = self._get_group_quantiles(X, s, column)

    def _get_group_quantiles(
        self,
        X: pd.DataFrame,
        s: pd.Series,
        column: str,
    ) -> dict[str, np.ndarray]:
        """
        Calculates the quantiles of one feature separately for each group.

        Parameters
        ----------
        X : pd.DataFrame
            The feature matrix.
        s : pd.Series
            The sensitive attribute.
        column : str
            The feature to calculate the quantiles.

        Returns
        -------
        dict
            Maps each group label found in s to the array of quantile values
            of the feature within that group, one per entry of
            ``_quantile_points``.
        """
        # observed=True keeps unobserved categories of a categorical s from
        # producing all-NaN quantile arrays that would later be used silently.
        grouped = X[column].groupby(s, observed=True)
        return {
            group: values.quantile(self._quantile_points).to_numpy(dtype=float)
            for group, values in grouped
        }

    def transform(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        s: Optional[pd.Series] = None,
    ) -> tuple[pd.DataFrame, pd.Series, pd.Series]:
        """
        Transform the features conditioned on the protected attribute to match
        the global distribution.

        Rows are matched to their group by position, so s must have one entry
        per row of X, in the same order.

        Parameters
        ----------
        X : pd.DataFrame
            The features.
        y : pandas.Series
            The labels. Returned unchanged.
        s : pandas.Series, optional
            The protected attribute. Although optional in the signature, it
            is required by this method.

        Returns
        -------
        pd.DataFrame, pd.Series, pd.Series
            Transformed features (a copy of X), labels, and sensitive
            attribute.

        Raises
        ------
        ValueError
            If s is None.
        KeyError
            If s contains a group (or missing value) that has no quantiles
            from ``fit``.
        """
        super().transform(X, y, s)

        if s is None:
            raise ValueError("s must be passed.")

        X_repaired = X.copy()
        points = self._quantile_points
        groups = list(s.unique())
        # Row masks depend only on s, so they are built once and shared by
        # every column.
        masks = {}

        for column in self.columns:
            group_quantiles = self._group_quantiles[column]
            global_quantiles = self._global_quantiles[column]

            unseen = [group for group in groups if group not in group_quantiles]
            if unseen:
                raise KeyError(
                    f"Groups {unseen!r} in s have no quantiles from fit "
                    f"for column {column!r}."
                )

            # Cast once to float: negating an unsigned integer column below
            # would otherwise wrap around and corrupt the interpolation.
            values = X[column].to_numpy(dtype=float)
            repaired = values.copy()

            for group in groups:
                if group not in masks:
                    masks[group] = (s == group).to_numpy(dtype=bool)
                mask = masks[group]

                group_values = values[mask]
                quantiles = group_quantiles[group]

                # Quantile level of every point within its own group. When
                # quantile values are tied, the ascending and descending
                # lookups land on different ends of the tie; averaging them
                # takes the midpoint.
                levels = (
                    np.interp(group_values, quantiles, points)
                    + np.interp(-group_values, -quantiles[::-1], points[::-1])
                ) / 2

                # Value of the global distribution at that quantile level.
                global_values = np.interp(levels, points, global_quantiles)

                repaired[mask] = global_values * self.repair_level + group_values * (
                    1 - self.repair_level
                )

            X_repaired[column] = repaired

        return X_repaired, y, s