-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
data_assistant.py
374 lines (334 loc) · 15.8 KB
/
data_assistant.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import Batch, BatchRequestBase
from great_expectations.execution_engine.execution_engine import MetricDomainTypes
from great_expectations.rule_based_profiler.domain_builder import DomainBuilder
from great_expectations.rule_based_profiler.expectation_configuration_builder import (
ExpectationConfigurationBuilder,
)
from great_expectations.rule_based_profiler.helpers.configuration_reconciliation import (
DEFAULT_RECONCILATION_DIRECTIVES,
)
from great_expectations.rule_based_profiler.helpers.util import (
convert_variables_to_dict,
)
from great_expectations.rule_based_profiler.parameter_builder import ParameterBuilder
from great_expectations.rule_based_profiler.rule import Rule
from great_expectations.rule_based_profiler.rule_based_profiler import (
BaseRuleBasedProfiler,
RuleBasedProfiler,
)
from great_expectations.rule_based_profiler.types import Domain, ParameterNode
from great_expectations.rule_based_profiler.types.data_assistant_result import (
DataAssistantResult,
)
from great_expectations.util import measure_execution_time
from great_expectations.validator.validator import Validator
class DataAssistant(ABC):
    """
    DataAssistant is an application built on top of the Rule-Based Profiler component.
    DataAssistant subclasses provide exploration and validation of particular aspects of specified data Batch objects.

    DataAssistant usage (e.g., in Jupyter notebook) adheres to the following pattern:

        data_assistant: DataAssistant = VolumeDataAssistant(
            name="my_volume_data_assistant",
            validator=validator,
        )
        result: DataAssistantResult = data_assistant.run(
            expectation_suite=None,
            expectation_suite_name="my_suite",
            include_citation=True,
            save_updated_expectation_suite=False,
        )

    Then:

        metrics_by_domain: Dict[Domain, Dict[str, ParameterNode]] = result.metrics_by_domain
        expectation_suite: ExpectationSuite = result.expectation_suite
        expectation_configurations: List[ExpectationConfiguration] = result.expectation_suite.expectations
        expectation_suite_meta: Dict[str, Any] = expectation_suite.meta
        profiler_config: RuleBasedProfilerConfig = result.profiler_config
    """

    def __init__(
        self,
        name: str,
        validator: Validator,
    ):
        """
        DataAssistant subclasses guide "RuleBasedProfiler" to contain Rule configurations to embody profiling behaviors,
        corresponding to intended exploration and validation goals.  Then executing "RuleBasedProfiler.run()" yields
        "RuleBasedProfilerResult" object, containing metrics by "Domain", list of "ExpectationConfiguration" objects,
        and overall "ExpectationSuite" object, immediately available for validating underlying data "Batch" objects.

        Args:
            name: the name of this DataAssistant object
            validator: Validator object, containing loaded Batch objects as well as Expectation and Metric operations
        """
        self._name = name
        self._validator = validator

        # Start from an empty profiler; "_build_profiler()" populates it with "Rule" objects and "variables".
        self._profiler = RuleBasedProfiler(
            name=self.name,
            config_version=1.0,
            variables=None,
            data_context=self._validator.data_context,
        )
        self._build_profiler()

    def _build_profiler(self) -> None:
        """
        Builds "RuleBasedProfiler", corresponding to present DataAssistant use case.

        Starts with empty "RuleBasedProfiler" (initialized in constructor) and adds Rule objects.

        Subclasses can add custom "Rule" objects as appropriate for their respective particular DataAssistant use cases.
        """
        variables: dict = {}

        profiler: Optional[BaseRuleBasedProfiler]
        rule: Rule
        domain_builder: DomainBuilder
        parameter_builders: List[ParameterBuilder]
        expectation_configuration_builders: List[ExpectationConfigurationBuilder]

        # For each Self-Initializing "Expectation" as specified by
        # "DataAssistant.expectation_kwargs_by_expectation_type" interface property, retrieve its "RuleBasedProfiler"
        # configuration, construct "Rule" object based on it, while incorporating metrics "ParameterBuilder" objects
        # for "MetricDomainTypes", emitted by "DomainBuilder" of comprised "Rule", specified by
        # "DataAssistant.metrics_parameter_builders_by_domain_type" interface property.  Append this "Rule" object to
        # overall DataAssistant "RuleBasedProfiler" object; incorporate "variables" as well.
        expectation_type: str
        expectation_kwargs: Dict[str, Any]
        for (
            expectation_type,
            expectation_kwargs,
        ) in self.expectation_kwargs_by_expectation_type.items():
            profiler = self._validator.build_rule_based_profiler_for_expectation(
                expectation_type=expectation_type
            )(**expectation_kwargs)
            variables.update(convert_variables_to_dict(variables=profiler.variables))
            for rule in profiler.rules:
                domain_builder = rule.domain_builder
                # Copy, so that "ParameterBuilder" list owned by source "Rule" is not mutated in place.
                parameter_builders = list(rule.parameter_builders or [])
                # "Domain" types without auxiliary metrics configured contribute nothing (consistent with
                # "get_metrics_by_domain()", which treats this mapping as covering only a subset of "Domain" types).
                parameter_builders.extend(
                    self.metrics_parameter_builders_by_domain_type.get(
                        domain_builder.domain_type, []
                    )
                )
                expectation_configuration_builders = (
                    rule.expectation_configuration_builders or []
                )
                self.profiler.add_rule(
                    rule=Rule(
                        name=rule.name,
                        variables=rule.variables,
                        domain_builder=domain_builder,
                        parameter_builders=parameter_builders,
                        expectation_configuration_builders=expectation_configuration_builders,
                    )
                )

        self.profiler.variables = self.profiler.reconcile_profiler_variables(
            variables=variables,
            reconciliation_strategy=DEFAULT_RECONCILATION_DIRECTIVES.variables,
        )

    def run(
        self,
        expectation_suite: Optional[ExpectationSuite] = None,
        expectation_suite_name: Optional[str] = None,
        include_citation: bool = True,
        save_updated_expectation_suite: bool = False,
    ) -> DataAssistantResult:
        """
        Run the DataAssistant as it is currently configured.

        Args:
            expectation_suite: An existing "ExpectationSuite" to update
            expectation_suite_name: A name for returned "ExpectationSuite"
            include_citation: Flag, which controls whether or not effective Profiler configuration should be included
            as a citation in metadata of the "ExpectationSuite" computed and returned by "RuleBasedProfiler"
            save_updated_expectation_suite: Flag, controlling whether or not updated "ExpectationSuite" must be saved

        Returns:
            DataAssistantResult: The result object for the DataAssistant
        """
        # Base result object is filled in (in place) by "run_profiler_on_data()".
        data_assistant_result: DataAssistantResult = DataAssistantResult(
            execution_time=0.0
        )
        run_profiler_on_data(
            data_assistant=self,
            data_assistant_result=data_assistant_result,
            profiler=self.profiler,
            variables=self.variables,
            rules=self.rules,
            batch_list=list(self._validator.batches.values()),
            batch_request=None,
            expectation_suite=expectation_suite,
            expectation_suite_name=expectation_suite_name,
            include_citation=include_citation,
            save_updated_expectation_suite=save_updated_expectation_suite,
        )
        return self._build_data_assistant_result(
            data_assistant_result=data_assistant_result
        )

    @property
    def name(self) -> str:
        """
        Returns:
            Name of this DataAssistant object (as supplied to constructor).
        """
        return self._name

    @property
    def profiler(self) -> BaseRuleBasedProfiler:
        """
        Returns:
            "RuleBasedProfiler" object, created in constructor and owned by this DataAssistant object.
        """
        return self._profiler

    @property
    @abstractmethod
    def expectation_kwargs_by_expectation_type(self) -> Dict[str, Dict[str, Any]]:
        """
        DataAssistant subclasses implement this method to return relevant Self-Initializing Expectations with "kwargs".

        Returns:
            Dictionary of Expectation "kwargs", keyed by "expectation_type".
        """

    @property
    @abstractmethod
    def metrics_parameter_builders_by_domain_type(
        self,
    ) -> Dict[MetricDomainTypes, List[ParameterBuilder]]:
        """
        DataAssistant subclasses implement this method to return "ParameterBuilder" objects for "MetricDomainTypes", for
        every "Domain" type, for which generating metrics of interest is desired.  These metrics will be computed in
        addition to metrics already computed as part of "Rule" evaluation for every "Domain", emitted by "DomainBuilder"
        of comprised "Rule"; these auxiliary metrics are aimed entirely for data exploration / visualization purposes.

        Returns:
            Dictionary of "ParameterBuilder" objects, keyed by members of "MetricDomainTypes" Enum.
        """

    @property
    @abstractmethod
    def variables(self) -> Optional[Dict[str, Any]]:
        """
        Returns:
            Optional "variables" configuration attribute name/value pairs (overrides), commonly-used in Builder objects.
        """

    @property
    @abstractmethod
    def rules(self) -> Optional[List[Rule]]:
        """
        Returns:
            Optional custom list of "Rule" objects (overrides) can be added by subclasses (return "None" if not needed).
        """

    @abstractmethod
    def _build_data_assistant_result(
        self, data_assistant_result: DataAssistantResult
    ) -> DataAssistantResult:
        """
        DataAssistant subclasses implement this method to return subclasses of DataAssistantResult object, which imbue
        base DataAssistantResult class with methods, pertaining to specifics of particular DataAssistantResult subclass.

        Args:
            data_assistant_result: Base DataAssistantResult result object of DataAssistant (contains only data fields)

        Returns:
            DataAssistantResult: The appropriate subclass of base DataAssistantResult result object of the DataAssistant
        """

    def get_metrics_by_domain(self) -> Dict[Domain, Dict[str, ParameterNode]]:
        """
        Obtain subset of all parameter values for fully-qualified parameter names by domain, available from entire
        "RuleBasedProfiler" state, where "Domain" types are among keys included in provisions as prescribed by return
        value of "DataAssistant.metrics_parameter_builders_by_domain_type" interface property and actual fully-qualified
        parameter names match interface properties of "ParameterBuilder" objects, corresponding to these "domain" types.

        Returns:
            Dictionaries of values for fully-qualified parameter names by Domain for metrics, from "RuleBasedProfiler"
        """
        # Read interface property once; only its keys and "ParameterBuilder" names are consulted below.
        metrics_parameter_builders_by_domain_type: Dict[
            MetricDomainTypes, List[ParameterBuilder]
        ] = self.metrics_parameter_builders_by_domain_type

        domain_type: MetricDomainTypes
        parameter_builders: List[ParameterBuilder]
        parameter_builder: ParameterBuilder
        fully_qualified_metrics_parameter_names_by_domain_type: Dict[
            MetricDomainTypes, List[str]
        ] = {
            domain_type: [
                parameter_builder.fully_qualified_parameter_name
                for parameter_builder in parameter_builders
            ]
            for domain_type, parameter_builders in metrics_parameter_builders_by_domain_type.items()
        }

        domain: Domain
        parameter_values_for_fully_qualified_parameter_names: Dict[str, ParameterNode]
        fully_qualified_parameter_name: str
        parameter_node: ParameterNode
        # Keep only "Domain" objects, whose type has metrics configured, and, within each such "Domain", only
        # parameters produced by metrics "ParameterBuilder" objects, configured for that "Domain" type.
        return {
            domain: {
                fully_qualified_parameter_name: parameter_node
                for fully_qualified_parameter_name, parameter_node in parameter_values_for_fully_qualified_parameter_names.items()
                if fully_qualified_parameter_name
                in fully_qualified_metrics_parameter_names_by_domain_type[
                    domain.domain_type
                ]
            }
            for domain, parameter_values_for_fully_qualified_parameter_names in self.profiler.get_parameter_values_for_fully_qualified_parameter_names_by_domain().items()
            if domain.domain_type in fully_qualified_metrics_parameter_names_by_domain_type
        }

    def get_expectation_suite(
        self,
        expectation_suite: Optional[ExpectationSuite] = None,
        expectation_suite_name: Optional[str] = None,
        include_citation: bool = True,
        save_updated_expectation_suite: bool = False,
    ) -> ExpectationSuite:
        """
        Args:
            expectation_suite: An existing "ExpectationSuite" to update
            expectation_suite_name: A name for returned "ExpectationSuite"
            include_citation: Flag, which controls whether or not effective "RuleBasedProfiler" configuration should be
            included as a citation in metadata of the "ExpectationSuite" computed and returned by "RuleBasedProfiler"
            save_updated_expectation_suite: Flag, controlling whether or not updated "ExpectationSuite" must be saved

        Returns:
            "ExpectationSuite" using "ExpectationConfiguration" objects, computed by "RuleBasedProfiler" state
        """
        return self.profiler.get_expectation_suite(
            expectation_suite=expectation_suite,
            expectation_suite_name=expectation_suite_name,
            include_citation=include_citation,
            save_updated_expectation_suite=save_updated_expectation_suite,
        )
@measure_execution_time(
    execution_time_holder_object_reference_name="data_assistant_result",
    execution_time_property_name="execution_time",
    pretty_print=False,
)
def run_profiler_on_data(
    data_assistant: DataAssistant,
    data_assistant_result: DataAssistantResult,
    profiler: BaseRuleBasedProfiler,
    variables: Optional[Dict[str, Any]] = None,
    rules: Optional[List[Rule]] = None,
    batch_list: Optional[List[Batch]] = None,
    batch_request: Optional[Union[BatchRequestBase, dict]] = None,
    expectation_suite: Optional[ExpectationSuite] = None,
    expectation_suite_name: Optional[str] = None,
    include_citation: bool = True,
    save_updated_expectation_suite: bool = False,
) -> None:
    """
    Run "profiler" on specified data and store outcome on "data_assistant_result" (modified in place).

    NOTE(review): "measure_execution_time" decorator is configured with "data_assistant_result" / "execution_time";
    presumably it records elapsed time of this call on that attribute of the result object -- confirm against decorator.

    Args:
        data_assistant: DataAssistant object, whose metrics and "ExpectationSuite" accessors are queried after the run
        data_assistant_result: Result object, whose "profiler_config", "metrics_by_domain", and "expectation_suite"
        attributes are populated by this function
        profiler: "RuleBasedProfiler" object to execute
        variables: Optional "variables" overrides, passed through to "profiler.run()"
        rules: Optional list of custom "Rule" objects (overrides); "None" is treated as empty list
        batch_list: Optional list of "Batch" objects, passed through to "profiler.run()"
        batch_request: Optional batch request, passed through to "profiler.run()"
        expectation_suite: An existing "ExpectationSuite" to update
        expectation_suite_name: A name for returned "ExpectationSuite"
        include_citation: Flag, passed through to "DataAssistant.get_expectation_suite()"
        save_updated_expectation_suite: Flag, passed through to "DataAssistant.get_expectation_suite()"
    """
    rule: Rule
    # "profiler.run()" receives "Rule" overrides as JSON-style configuration dictionaries, keyed by "Rule" name.
    rules_configs: Dict[str, Dict[str, Any]] = {
        rule.name: rule.to_json_dict() for rule in (rules or [])
    }
    profiler.run(
        variables=variables,
        rules=rules_configs,
        batch_list=batch_list,
        batch_request=batch_request,
        recompute_existing_parameter_values=False,
        reconciliation_directives=DEFAULT_RECONCILATION_DIRECTIVES,
    )

    data_assistant_result.profiler_config = profiler.config
    data_assistant_result.metrics_by_domain = data_assistant.get_metrics_by_domain()
    data_assistant_result.expectation_suite = data_assistant.get_expectation_suite(
        expectation_suite=expectation_suite,
        expectation_suite_name=expectation_suite_name,
        include_citation=include_citation,
        save_updated_expectation_suite=save_updated_expectation_suite,
    )