-
Notifications
You must be signed in to change notification settings - Fork 246
/
drift.py
173 lines (134 loc) · 5.92 KB
/
drift.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# ----------------------------------------------------------------------------
# Copyright (C) 2021-2022 Deepchecks (https://www.deepchecks.com)
#
# This file is part of Deepchecks.
# Deepchecks is distributed under the terms of the GNU Affero General
# Public License (version 3 or later).
# You should have received a copy of the GNU Affero General Public License
# along with Deepchecks. If not, see <http://www.gnu.org/licenses/>.
# ----------------------------------------------------------------------------
#
"""Common utilities for distribution checks."""
from typing import Tuple, Union, Hashable, Callable
from scipy.stats import wasserstein_distance
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from deepchecks.utils.distribution.plot import drift_score_bar_traces, feature_distribution_traces
from deepchecks.utils.distribution.preprocessing import preprocess_2_cat_cols_to_same_bins
from deepchecks.core.errors import DeepchecksValueError
# Lower bound applied to every bin percentage inside psi(); capping at this
# value keeps the ratio and its logarithm finite when a bin has zero mass.
PSI_MIN_PERCENTAGE = 0.01
# Names exported by ``from <this module> import *``.
__all__ = ['calc_drift_and_plot']
def psi(expected_percents: np.ndarray, actual_percents: np.ndarray):
    """
    Calculate the PSI (Population Stability Index).

    See https://www.lexjansen.com/wuss/2017/47_Final_Paper_PDF.pdf

    Parameters
    ----------
    expected_percents: np.ndarray
        array of percentages of each value in the expected distribution.
    actual_percents: np.ndarray
        array of percentages of each value in the actual distribution.
        Must have the same shape as ``expected_percents``.

    Returns
    -------
    psi
        The PSI score: the sum over all bins of
        ``(expected - actual) * log(expected / actual)``.

    Raises
    ------
    DeepchecksValueError
        If the two arrays do not have the same shape.
    """
    expected = np.asarray(expected_percents, dtype=float)
    actual = np.asarray(actual_percents, dtype=float)
    if expected.shape != actual.shape:
        # An index loop over one array would silently drop the extra bins of the other.
        raise DeepchecksValueError(
            'expected_percents and actual_percents must have the same shape, '
            f'got {expected.shape} and {actual.shape}'
        )

    # In order for the value not to diverge, we cap our min percentage value
    expected = np.maximum(expected, PSI_MIN_PERCENTAGE)
    actual = np.maximum(actual, PSI_MIN_PERCENTAGE)

    return np.sum((expected - actual) * np.log(expected / actual))
def earth_movers_distance(dist1: Union[np.ndarray, pd.Series], dist2: Union[np.ndarray, pd.Series]):
    """
    Calculate the Earth Movers Distance (Wasserstein distance).

    See https://en.wikipedia.org/wiki/Wasserstein_metric

    Function is for numerical data only. Both samples are min-max scaled with the
    minimum and maximum taken over the union of the two samples, so the result
    does not depend on the scale of the original values.

    Parameters
    ----------
    dist1 : Union[np.ndarray, pd.Series]
        array of numerical values.
    dist2 : Union[np.ndarray, pd.Series]
        array of numerical values to compare dist1 to.

    Returns
    -------
    Any
        the Wasserstein distance between the two scaled distributions, or 0 when
        all values in both samples are identical.

    Raises
    ------
    ValueError
        If both samples are empty. An empty sample paired with a non-constant
        one is passed on to ``wasserstein_distance`` unchanged.
    """
    dist1 = np.asarray(dist1, dtype=float)
    dist2 = np.asarray(dist2, dtype=float)

    # Only non-empty samples contribute to the shared value range.
    non_empty = [dist for dist in (dist1, dist2) if dist.size > 0]
    if not non_empty:
        raise ValueError('Cannot calculate Earth Movers Distance of two empty distributions.')

    # Direct reductions instead of sorting both samples just to find the extremes.
    val_max = max(dist.max() for dist in non_empty)
    val_min = min(dist.min() for dist in non_empty)

    # A single shared value means there is nothing to move (and avoids dividing by zero).
    if val_max == val_min:
        return 0.0

    value_range = val_max - val_min
    dist1 = (dist1 - val_min) / value_range
    dist2 = (dist2 - val_min) / value_range

    return wasserstein_distance(dist1, dist2)
def calc_drift_and_plot(train_column: pd.Series, test_column: pd.Series, plot_title: Hashable,
                        column_type: str, max_num_categories: int = 10) -> Tuple[float, str, go.Figure]:
    """
    Calculate drift score per column.

    Missing values are dropped from both columns before the score and the plot
    are computed.

    Parameters
    ----------
    train_column : pd.Series
        column from train dataset
    test_column : pd.Series
        same column from test dataset
    plot_title : Hashable
        title of plot
    column_type : str
        type of column (either "numerical" or "categorical")
    max_num_categories : int , default: 10
        Max number of allowed categories. If there are more, they are binned into an "Other" category.

    Returns
    -------
    Tuple[float, str, go.Figure]
        - drift score of the difference between the two columns' distributions (Earth movers distance for
          numerical, PSI for categorical)
        - name of the scoring method that was used ("Earth Mover's Distance" or "PSI")
        - figure with two stacked subplots: a bar showing the drift score, and a graph comparing the two
          distributions (density for numerical, stack bar for categorical)

    Raises
    ------
    DeepchecksValueError
        If ``column_type`` is neither "numerical" nor "categorical".
    """
    train_dist = train_column.dropna().values.reshape(-1)
    test_dist = test_column.dropna().values.reshape(-1)

    if column_type == 'numerical':
        scorer_name = "Earth Mover's Distance"

        train_dist = train_dist.astype('float')
        test_dist = test_dist.astype('float')

        score = earth_movers_distance(dist1=train_dist, dist2=test_dist)
        bar_traces, bar_x_axis, bar_y_axis = drift_score_bar_traces(score)

        dist_traces, dist_x_axis, dist_y_axis = feature_distribution_traces(train_dist, test_dist)
    elif column_type == 'categorical':
        scorer_name = 'PSI'

        # Both columns are binned into the same categories so their percentages line up.
        expected_percents, actual_percents, _ = \
            preprocess_2_cat_cols_to_same_bins(dist1=train_dist, dist2=test_dist, max_num_categories=max_num_categories)

        score = psi(expected_percents=expected_percents, actual_percents=actual_percents)
        bar_traces, bar_x_axis, bar_y_axis = drift_score_bar_traces(score, bar_max=1)

        dist_traces, dist_x_axis, dist_y_axis = feature_distribution_traces(train_dist, test_dist, is_categorical=True,
                                                                            max_num_categories=max_num_categories)
    else:
        # Should never reach here
        raise DeepchecksValueError(f'Unsupported column type for drift: {column_type}')

    # Row 1 holds the thin drift-score bar, row 2 the distribution comparison.
    fig = make_subplots(rows=2, cols=1, vertical_spacing=0.2, shared_yaxes=False, shared_xaxes=False,
                        row_heights=[0.1, 0.9],
                        subplot_titles=['Drift Score - ' + scorer_name, 'Distribution Plot'])

    fig.add_traces(bar_traces, rows=[1] * len(bar_traces), cols=[1] * len(bar_traces))
    fig.add_traces(dist_traces, rows=[2] * len(dist_traces), cols=[1] * len(dist_traces))

    # xaxis/yaxis style the score bar subplot, xaxis2/yaxis2 the distribution subplot.
    shared_layout = go.Layout(
        xaxis=bar_x_axis,
        yaxis=bar_y_axis,
        xaxis2=dist_x_axis,
        yaxis2=dist_y_axis,
        legend=dict(
            title='Dataset',
            yanchor='top',
            y=0.6),
        width=700,
        height=400,
        title=plot_title
    )

    fig.update_layout(shared_layout)

    return score, scorer_name, fig