-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
column_pair_domain_builder.py
86 lines (72 loc) · 2.86 KB
/
column_pair_domain_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from typing import Dict, List, Optional, Union
import great_expectations.exceptions as ge_exceptions
from great_expectations.execution_engine.execution_engine import MetricDomainTypes
from great_expectations.rule_based_profiler.domain_builder import ColumnDomainBuilder
from great_expectations.rule_based_profiler.types import Domain, ParameterContainer
class ColumnPairDomainBuilder(ColumnDomainBuilder):
    """
    DomainBuilder that emits a single column-pair domain from exactly two column names.

    The two effective column names are sorted and assigned, in that order, to the
    "column_A" and "column_B" domain kwargs.
    """

    def __init__(
        self,
        include_column_names: Optional[Union[str, Optional[List[str]]]] = None,
        data_context: Optional["BaseDataContext"] = None,  # noqa: F821
    ):
        """
        Args:
            include_column_names: Explicitly specified exactly two desired columns
            data_context: BaseDataContext associated with this DomainBuilder
        """
        super().__init__(
            data_context=data_context,
            # Explicit inclusion is the only column selection forwarded to the parent class.
            include_column_names=include_column_names,
            # Every remaining parent-class filter option is passed as None.
            exclude_column_names=None,
            include_column_name_suffixes=None,
            exclude_column_name_suffixes=None,
            semantic_type_filter_module_name=None,
            semantic_type_filter_class_name=None,
            include_semantic_types=None,
            exclude_semantic_types=None,
        )

    @property
    def domain_type(self) -> MetricDomainTypes:
        """Type of domain produced by this builder (always COLUMN_PAIR)."""
        return MetricDomainTypes.COLUMN_PAIR

    def _get_domains(
        self,
        variables: Optional[ParameterContainer] = None,
    ) -> List[Domain]:
        """Build the single column-pair domain for the two effective columns.

        Args:
            variables: Optional variables to substitute when evaluating.

        Returns:
            One-element list holding a Domain whose kwargs map "column_A" and "column_B"
            to the two effective column names in sorted order.

        Raises:
            ProfilerExecutionError: if the number of effective column names is not exactly two.
        """
        column_names: List[str] = self.get_effective_column_names(
            batch_ids=self.get_batch_ids(variables=variables),
            validator=self.get_validator(variables=variables),
            variables=variables,
        )

        # Guard clause: a missing/empty result or any count other than two is rejected.
        if not column_names or len(column_names) != 2:
            raise ge_exceptions.ProfilerExecutionError(
                message=(
                    f"Error: Columns specified for {self.__class__.__name__} in sorted order "
                    'must correspond to "column_A" and "column_B" (in this exact order).\n'
                )
            )

        # Sorting makes the A/B assignment independent of the order the names arrived in.
        first_column, second_column = sorted(column_names)

        return [
            Domain(
                domain_type=self.domain_type,
                domain_kwargs={
                    "column_A": first_column,
                    "column_B": second_column,
                },
            ),
        ]