-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
categorical_column_domain_builder.py
362 lines (313 loc) · 14.4 KB
/
categorical_column_domain_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
from great_expectations.execution_engine.execution_engine import MetricDomainTypes
from great_expectations.rule_based_profiler.domain_builder import ColumnDomainBuilder
from great_expectations.rule_based_profiler.helpers.cardinality_checker import (
AbsoluteCardinalityLimit,
CardinalityChecker,
CardinalityLimitMode,
RelativeCardinalityLimit,
validate_input_parameters,
)
from great_expectations.rule_based_profiler.helpers.util import (
build_simple_domains_from_column_names,
get_parameter_value_and_validate_return_type,
get_resolved_metrics_by_key,
)
from great_expectations.rule_based_profiler.types import (
Domain,
ParameterContainer,
SemanticDomainTypes,
)
from great_expectations.validator.metric_configuration import MetricConfiguration
class CategoricalColumnDomainBuilder(ColumnDomainBuilder):
    """
    This DomainBuilder uses column cardinality to identify domains.

    Columns whose cardinality metric is within the configured limit for every
    Batch are emitted as column domains.  Columns whose inferred semantic type
    is listed in "allowed_semantic_types_passthrough" skip the cardinality
    check and are emitted unconditionally.
    """

    # "cardinality_checker" is derived state (built in "_get_domains()"), so it
    # is added to the field names inherited from ColumnDomainBuilder.
    exclude_field_names: Set[str] = ColumnDomainBuilder.exclude_field_names | {
        "cardinality_checker",
    }

    # Convenience alias, so that callers can write
    # CategoricalColumnDomainBuilder.cardinality_limit_modes.<MODE>.
    cardinality_limit_modes: CardinalityLimitMode = CardinalityLimitMode

    def __init__(
        self,
        include_column_names: Optional[Union[str, Optional[List[str]]]] = None,
        exclude_column_names: Optional[Union[str, Optional[List[str]]]] = None,
        include_column_name_suffixes: Optional[Union[str, Iterable, List[str]]] = None,
        exclude_column_name_suffixes: Optional[Union[str, Iterable, List[str]]] = None,
        semantic_type_filter_module_name: Optional[str] = None,
        semantic_type_filter_class_name: Optional[str] = None,
        include_semantic_types: Optional[
            Union[str, SemanticDomainTypes, List[Union[str, SemanticDomainTypes]]]
        ] = None,
        exclude_semantic_types: Optional[
            Union[str, SemanticDomainTypes, List[Union[str, SemanticDomainTypes]]]
        ] = None,
        allowed_semantic_types_passthrough: Optional[
            Union[str, SemanticDomainTypes, List[Union[str, SemanticDomainTypes]]]
        ] = None,
        limit_mode: Optional[Union[CardinalityLimitMode, str]] = None,
        max_unique_values: Optional[Union[str, int]] = None,
        max_proportion_unique: Optional[Union[str, float]] = None,
        data_context: Optional["BaseDataContext"] = None,  # noqa: F821
    ):
        """Create column domains where cardinality is within the specified limit.

        Cardinality refers to the number of unique values in a given domain.
        Categorical generally refers to columns with relatively limited
        number of unique values.
        Limit mode can be absolute (number of unique values) or relative
        (proportion of unique values). You can choose one of: limit_mode,
        max_unique_values or max_proportion_unique to specify the cardinality
        limit.  The three limit arguments are only stored here; they are
        resolved and validated together when domains are computed.
        Note that the limit must be met for each Batch separately.
        If other Batch objects contain additional columns, these will not be considered.

        Args:
            include_column_names: Explicitly specified desired columns (if None, it is computed based on active Batch).
            exclude_column_names: If provided, these columns are pre-filtered and excluded from consideration.
                If None, defaults to ["id"].
            include_column_name_suffixes: Explicitly specified desired suffixes for corresponding columns to match.
            exclude_column_name_suffixes: Explicitly specified desired suffixes for corresponding columns to not match.
            semantic_type_filter_module_name: module_name containing class that implements SemanticTypeFilter interfaces
            semantic_type_filter_class_name: class_name of class that implements SemanticTypeFilter interfaces
            include_semantic_types: single/multiple type specifications using SemanticDomainTypes (or str equivalents)
                to be included
            exclude_semantic_types: single/multiple type specifications using SemanticDomainTypes (or str equivalents)
                to be excluded.  If None, defaults to BINARY, CURRENCY and IDENTIFIER.
            allowed_semantic_types_passthrough: single/multiple type specifications using SemanticDomainTypes
                (or str equivalents) to be allowed without processing, if encountered among available column_names.
                If None, defaults to LOGIC.
            limit_mode: CardinalityLimitMode or string name of the mode
                defining the maximum allowable cardinality to use when
                filtering columns.
                Accessible for convenience via CategoricalColumnDomainBuilder.cardinality_limit_modes e.g.:
                limit_mode=CategoricalColumnDomainBuilder.cardinality_limit_modes.VERY_FEW,
            max_unique_values: number of max unique rows for a custom
                cardinality limit to use when filtering columns.
            max_proportion_unique: proportion of unique values for a
                custom cardinality limit to use when filtering columns.
            data_context: BaseDataContext associated with this DomainBuilder
        """
        if exclude_column_names is None:
            exclude_column_names = [
                "id",
            ]

        if exclude_semantic_types is None:
            exclude_semantic_types = [
                SemanticDomainTypes.BINARY,
                SemanticDomainTypes.CURRENCY,
                SemanticDomainTypes.IDENTIFIER,
            ]

        if allowed_semantic_types_passthrough is None:
            allowed_semantic_types_passthrough = [
                SemanticDomainTypes.LOGIC,
            ]

        self._allowed_semantic_types_passthrough = allowed_semantic_types_passthrough

        super().__init__(
            include_column_names=include_column_names,
            exclude_column_names=exclude_column_names,
            include_column_name_suffixes=include_column_name_suffixes,
            exclude_column_name_suffixes=exclude_column_name_suffixes,
            semantic_type_filter_module_name=semantic_type_filter_module_name,
            semantic_type_filter_class_name=semantic_type_filter_class_name,
            include_semantic_types=include_semantic_types,
            exclude_semantic_types=exclude_semantic_types,
            data_context=data_context,
        )

        self._limit_mode = limit_mode
        self._max_unique_values = max_unique_values
        self._max_proportion_unique = max_proportion_unique

        # Built in "_get_domains()", once the limit parameters have been resolved.
        self._cardinality_checker = None

    @property
    def domain_type(self) -> MetricDomainTypes:
        """Type of the domains produced by this builder (always COLUMN)."""
        return MetricDomainTypes.COLUMN

    @property
    def allowed_semantic_types_passthrough(
        self,
    ) -> Optional[
        Union[str, SemanticDomainTypes, List[Union[str, SemanticDomainTypes]]]
    ]:
        """Semantic types whose columns bypass the cardinality check (as configured, unresolved)."""
        return self._allowed_semantic_types_passthrough

    @allowed_semantic_types_passthrough.setter
    def allowed_semantic_types_passthrough(
        self,
        value: Optional[
            Union[str, SemanticDomainTypes, List[Union[str, SemanticDomainTypes]]]
        ],
    ):
        self._allowed_semantic_types_passthrough = value

    @property
    def limit_mode(self) -> Optional[Union[CardinalityLimitMode, str]]:
        """Cardinality limit mode as passed to the constructor (unresolved)."""
        return self._limit_mode

    @property
    def max_unique_values(self) -> Optional[Union[str, int]]:
        """Absolute cardinality limit as passed to the constructor (unresolved)."""
        return self._max_unique_values

    @property
    def max_proportion_unique(self) -> Optional[Union[str, float]]:
        """Relative cardinality limit as passed to the constructor (unresolved)."""
        return self._max_proportion_unique

    @property
    def cardinality_checker(self) -> Optional[CardinalityChecker]:
        """CardinalityChecker built by the latest "_get_domains()" call; None before the first call."""
        return self._cardinality_checker

    def _get_domains(
        self,
        variables: Optional[ParameterContainer] = None,
    ) -> List[Domain]:
        """Return domains matching the selected limit_mode.

        As a side effect, (re)builds "self._cardinality_checker" from the
        resolved limit parameters.

        Args:
            variables: Optional variables to substitute when evaluating.

        Returns:
            List of domains that match the desired cardinality, followed by
            domains for columns admitted through the semantic type passthrough.
        """
        batch_ids: List[str] = self.get_batch_ids(variables=variables)

        validator: "Validator" = self.get_validator(variables=variables)  # noqa: F821

        effective_column_names: List[str] = self.get_effective_column_names(
            batch_ids=batch_ids,
            validator=validator,
            variables=variables,
        )

        # Obtain limit_mode from "rule state" (i.e., variables and parameters); from instance variable otherwise.
        limit_mode: Optional[
            Union[CardinalityLimitMode, str]
        ] = get_parameter_value_and_validate_return_type(
            domain=None,
            parameter_reference=self.limit_mode,
            expected_return_type=None,
            variables=variables,
            parameters=None,
        )

        # Obtain max_unique_values from "rule state" (i.e., variables and parameters); from instance variable otherwise.
        max_unique_values: Optional[int] = get_parameter_value_and_validate_return_type(
            domain=None,
            parameter_reference=self.max_unique_values,
            expected_return_type=None,
            variables=variables,
            parameters=None,
        )

        # Obtain max_proportion_unique from "rule state" (i.e., variables and parameters); from instance variable otherwise.
        max_proportion_unique: Optional[
            float
        ] = get_parameter_value_and_validate_return_type(
            domain=None,
            parameter_reference=self.max_proportion_unique,
            expected_return_type=None,
            variables=variables,
            parameters=None,
        )

        validate_input_parameters(
            limit_mode=limit_mode,
            max_unique_values=max_unique_values,
            max_proportion_unique=max_proportion_unique,
        )

        self._cardinality_checker = CardinalityChecker(
            limit_mode=limit_mode,
            max_unique_values=max_unique_values,
            max_proportion_unique=max_proportion_unique,
        )

        # Obtain allowed_semantic_types_passthrough from "rule state" (i.e., variables and parameters); from instance variable otherwise.
        allowed_semantic_types_passthrough: Union[
            str, SemanticDomainTypes, List[Union[str, SemanticDomainTypes]]
        ] = get_parameter_value_and_validate_return_type(
            domain=None,
            parameter_reference=self.allowed_semantic_types_passthrough,
            expected_return_type=None,
            variables=variables,
            parameters=None,
        )
        allowed_semantic_types_passthrough = (
            self.semantic_type_filter.parse_semantic_domain_type_argument(
                semantic_types=allowed_semantic_types_passthrough
            )
        )

        inferred_semantic_type_by_column_name = (
            self.semantic_type_filter.table_column_name_to_inferred_semantic_domain_type_mapping
        )

        column_name: str

        # Columns of a passthrough semantic type are accepted without a cardinality check.
        allowed_column_names_passthrough: List[str] = [
            column_name
            for column_name in effective_column_names
            if inferred_semantic_type_by_column_name[column_name]
            in allowed_semantic_types_passthrough
        ]

        # A set gives constant-time membership tests; iterating the original
        # list keeps the column order of the remaining names unchanged.
        passthrough_lookup: Set[str] = set(allowed_column_names_passthrough)
        effective_column_names = [
            column_name
            for column_name in effective_column_names
            if column_name not in passthrough_lookup
        ]

        metrics_for_cardinality_check: Dict[
            str, List[MetricConfiguration]
        ] = self._generate_metric_configurations_to_check_cardinality(
            batch_ids=batch_ids, column_names=effective_column_names
        )

        candidate_column_names: List[
            str
        ] = self._column_names_meeting_cardinality_limit(
            validator=validator,
            metrics_for_cardinality_check=metrics_for_cardinality_check,
        )
        candidate_column_names.extend(allowed_column_names_passthrough)

        return build_simple_domains_from_column_names(
            column_names=candidate_column_names,
            domain_type=self.domain_type,
        )

    def _generate_metric_configurations_to_check_cardinality(
        self,
        batch_ids: List[str],
        column_names: List[str],
    ) -> Dict[str, List[MetricConfiguration]]:
        """Generate metric configurations used to compute metrics for checking cardinality.

        Reads "self.cardinality_checker", so it must be called after the
        checker has been built (as "_get_domains()" does).

        Args:
            batch_ids: List of batch_ids used to create metric configurations.
            column_names: List of column_names used to create metric configurations.

        Returns:
            Dictionary of the form {
                "my_column_name": List[MetricConfiguration],
            }
            with one MetricConfiguration per batch_id for every column name.
        """
        limit_mode: Union[
            AbsoluteCardinalityLimit, RelativeCardinalityLimit
        ] = self.cardinality_checker.limit_mode

        batch_id: str
        metric_configurations: Dict[str, List[MetricConfiguration]] = {
            column_name: [
                MetricConfiguration(
                    # The limit object names the metric that measures it.
                    metric_name=limit_mode.metric_name_defining_limit,
                    metric_domain_kwargs={
                        "column": column_name,
                        "batch_id": batch_id,
                    },
                    metric_value_kwargs=None,
                    metric_dependencies=None,
                )
                for batch_id in batch_ids
            ]
            for column_name in column_names
        }

        return metric_configurations

    def _column_names_meeting_cardinality_limit(
        self,
        validator: "Validator",  # noqa: F821
        metrics_for_cardinality_check: Dict[str, List[MetricConfiguration]],
    ) -> List[str]:
        """Compute cardinality and return column names meeting cardinality limit.

        A column qualifies only if every one of its resolved metric values is
        within the limit; a column with no resolved metric values qualifies
        trivially.

        Args:
            validator: Validator used to compute column cardinality.
            metrics_for_cardinality_check: metric configurations used to compute cardinality.

        Returns:
            List of column names meeting cardinality.
        """
        column_name: str
        resolved_metrics: Dict[Tuple[str, str, str], Any]
        metric_value: Any

        resolved_metrics_by_column_name: Dict[
            str, Dict[Tuple[str, str, str], Any]
        ] = get_resolved_metrics_by_key(
            validator=validator,
            metric_configurations_by_key=metrics_for_cardinality_check,
        )

        # A generator (rather than a list) lets "all()" stop at the first
        # metric value that exceeds the limit.
        candidate_column_names: List[str] = [
            column_name
            for column_name, resolved_metrics in resolved_metrics_by_column_name.items()
            if all(
                self.cardinality_checker.cardinality_within_limit(
                    metric_value=metric_value
                )
                for metric_value in resolved_metrics.values()
            )
        ]

        return candidate_column_names