segment_performance.py
# ----------------------------------------------------------------------------
# Copyright (C) 2021-2023 Deepchecks (https://www.deepchecks.com)
#
# This file is part of Deepchecks.
# Deepchecks is distributed under the terms of the GNU Affero General
# Public License (version 3 or later).
# You should have received a copy of the GNU Affero General Public License
# along with Deepchecks. If not, see <http://www.gnu.org/licenses/>.
# ----------------------------------------------------------------------------
#
"""Module of segment performance check."""
import warnings
from typing import TYPE_CHECKING, Callable, Dict, List, Optional, Tuple, Union, cast
import numpy as np
import plotly.express as px
from deepchecks.core import CheckResult
from deepchecks.core.errors import DatasetValidationError, DeepchecksValueError
from deepchecks.tabular import Context, SingleDatasetCheck
from deepchecks.utils.docref import doclink
from deepchecks.utils.performance.partition import partition_column
from deepchecks.utils.strings import format_number
from deepchecks.utils.typing import Hashable

if TYPE_CHECKING:
    from deepchecks.core.checks import CheckConfig

__all__ = ['SegmentPerformance']


class SegmentPerformance(SingleDatasetCheck):
    """Display the performance score segmented by the top 2 (or given) features in a heatmap.

    .. deprecated:: 0.8.1
        The SegmentPerformance check is deprecated and will be removed in version 0.11. Please use the
        WeakSegmentsPerformance check instead.

    Parameters
    ----------
    feature_1 : Optional[Hashable] , default: None
        Feature to segment by on the y-axis.
    feature_2 : Optional[Hashable] , default: None
        Feature to segment by on the x-axis.
    alternative_scorer : Tuple[str, Union[str, Callable]] , default: None
        Score to show, either a function or an sklearn scorer name.
        If not given, a default scorer (per the model type) will be used.
    max_segments : int , default: 10
        Maximal number of segments to split each feature's values into.
    n_samples : int , default: 1_000_000
        Number of samples to use for this check.
    random_state : int , default: 42
        Random seed for all check internals.
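
    Examples
    --------
    A minimal usage sketch. ``train_df`` (a pandas DataFrame with a ``'target'`` label column),
    ``model`` (a fitted scikit-learn-compatible model) and the feature names below are placeholders:

    >>> from deepchecks.tabular import Dataset
    >>> train_ds = Dataset(train_df, label='target')
    >>> check = SegmentPerformance(feature_1='age', feature_2='hours-per-week', max_segments=5)
    >>> result = check.run(train_ds, model)  # result.value holds 'scores', 'counts', 'feature_1', 'feature_2'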
"""
feature_1: Optional[Hashable]
feature_2: Optional[Hashable]
alternative_scorer: Optional[Dict[str, Union[str, Callable]]]
max_segments: int

    def __init__(
        self,
        feature_1: Optional[Hashable] = None,
        feature_2: Optional[Hashable] = None,
        alternative_scorer: Optional[Tuple[str, Union[str, Callable]]] = None,
        max_segments: int = 10,
        n_samples: int = 1_000_000,
        random_state: int = 42,
        **kwargs
    ):
        super().__init__(**kwargs)
        warnings.warn('The SegmentPerformance check is deprecated and will be removed in version 0.11. '
                      'Please use the WeakSegmentsPerformance check instead.', DeprecationWarning)

        # Having both features as None is allowed - they will then be inferred from feature importance.
        if feature_1 and feature_1 == feature_2:
            raise DeepchecksValueError('"feature_1" must be different than "feature_2"')
        self.feature_1 = feature_1
        self.feature_2 = feature_2
        self.n_samples = n_samples
        self.random_state = random_state

        if not isinstance(max_segments, int) or max_segments <= 0:
            raise DeepchecksValueError('"max_segments" must be a positive integer')
        self.max_segments = max_segments
        self.alternative_scorer = dict([alternative_scorer]) if alternative_scorer else None

    def run_logic(self, context: Context, dataset_kind) -> CheckResult:
        """Run check."""
        dataset = context.get_data_by_kind(dataset_kind).sample(self.n_samples, random_state=self.random_state)
        model = context.model
        scorer = context.get_single_scorer(self.alternative_scorer)

        dataset.assert_features()
        features = dataset.features
        if len(features) < 2:
            raise DatasetValidationError('Dataset must have at least 2 features')

        if self.feature_1 is None and self.feature_2 is None:
            # Use feature importance to select features if none were defined
            feature_importance = context.feature_importance
            if feature_importance is None:
                self.feature_1, self.feature_2, *_ = features
            else:
                feature_importance.sort_values(ascending=False, inplace=True)
                self.feature_1, self.feature_2, *_ = cast(List[Hashable], list(feature_importance.keys()))
        elif self.feature_1 is None or self.feature_2 is None:
            raise DeepchecksValueError('Must define both "feature_1" and "feature_2" or none of them')
        else:
            # If both are defined, they must be in the dataset columns
            columns = dataset.data.columns
            if self.feature_1 not in columns or self.feature_2 not in columns:
                raise DeepchecksValueError('"feature_1" and "feature_2" must be in dataset columns')

        if self.feature_1 not in (dataset.numerical_features + dataset.cat_features):
            raise DeepchecksValueError('"feature_1" must be numerical or categorical, but it is neither.')
        if self.feature_2 not in (dataset.numerical_features + dataset.cat_features):
            raise DeepchecksValueError('"feature_2" must be numerical or categorical, but it is neither.')
        feature_1_filters = partition_column(dataset, self.feature_1, max_segments=self.max_segments)
        feature_2_filters = partition_column(dataset, self.feature_2, max_segments=self.max_segments)

        scores = np.empty((len(feature_1_filters), len(feature_2_filters)), dtype=float)
        counts = np.empty((len(feature_1_filters), len(feature_2_filters)), dtype=int)

        for i, feature_1_filter in enumerate(feature_1_filters):
            data = dataset.data
            feature_1_df = feature_1_filter.filter(data)
            for j, feature_2_filter in enumerate(feature_2_filters):
                feature_2_df = feature_2_filter.filter(feature_1_df)
                # Run on filtered data and save to matrix
                if feature_2_df.empty:
                    score = np.NaN
                else:
                    score = scorer(model, dataset.copy(feature_2_df))
                scores[i, j] = score
                counts[i, j] = len(feature_2_df)

        x = [v.label for v in feature_2_filters]
        y = [v.label for v in feature_1_filters]

        scores_text = [[0] * scores.shape[1] for _ in range(scores.shape[0])]

        for i in range(len(y)):
            for j in range(len(x)):
                score = scores[i, j]
                if not np.isnan(score):
                    scores_text[i][j] = f'{format_number(score)}\n({counts[i, j]})'
                elif counts[i, j] == 0:
                    scores_text[i][j] = ''
                else:
                    scores_text[i][j] = f'{score}\n({counts[i, j]})'

        # Plotly FigureWidget has a bug with numpy nan, so replace it with python None
        scores = scores.astype(object)
        scores[np.isnan(scores.astype(float))] = None
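
        # The check's raw value: score and count matrices plus the two segmented feature names.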
        value = {'scores': scores, 'counts': counts, 'feature_1': self.feature_1, 'feature_2': self.feature_2}

        if context.with_display:
            fig = px.imshow(scores, x=x, y=y, color_continuous_scale='rdylgn')
            fig.update_traces(text=scores_text, texttemplate='%{text}')
            fig.update_layout(
                title=f'{scorer.name} (count) by features {self.feature_1}/{self.feature_2}',
                height=600
            )
            fig.update_xaxes(title=self.feature_2, showgrid=False, tickangle=-30, side='bottom')
            fig.update_yaxes(title=self.feature_1, autorange='reversed', showgrid=False)
        else:
            fig = None

        return CheckResult(value, display=fig)

    def config(self, include_version: bool = True, include_defaults: bool = True) -> 'CheckConfig':
        """Return check instance config."""
        if self.alternative_scorer is not None:
            for k, v in self.alternative_scorer.items():
                if not isinstance(v, str):
                    reference = doclink(
                        'supported-metrics-by-string',
                        template='For a list of built-in scorers please refer to {link}. '
                    )
                    raise ValueError(
                        'Only built-in scorers are allowed when serializing check instances. '
                        f'{reference}Scorer name: {k}'
                    )
        return super().config(include_version, include_defaults=include_defaults)