Skip to content

Commit

Permalink
[MAINTENANCE] Small refactor of ExecutionEngine.resolve_metrics() for better code readability (and miscellaneous additional clean up) (#6587)
Browse files (browse the repository at this point in the history)
  • Loading branch information
alexsherstinsky committed Dec 15, 2022
1 parent 86cb443 commit 2d38384
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 28 deletions.
2 changes: 1 addition & 1 deletion great_expectations/core/batch_manager.py
Expand Up @@ -98,7 +98,7 @@ def active_batch_id(self) -> Optional[str]:
the list of "Batch" objects, each of whose BatchData has already been loaded (and cached). Since BatchData IDs
are from the same name space as Batch IDs, this helps avoid unnecessary loading of data from different backends.
"""
active_batch_data_id: Optional[str] = self._active_batch_data_id
active_batch_data_id: Optional[str] = self.active_batch_data_id
if active_batch_data_id != self._active_batch_id:
logger.warning(
"ID of active Batch and ID of active loaded BatchData differ."
Expand Down
61 changes: 36 additions & 25 deletions great_expectations/execution_engine/execution_engine.py
Expand Up @@ -347,6 +347,9 @@ def resolve_metrics(
Returns:
resolved_metrics (Dict): a dictionary with the values for the metrics that have just been resolved.
"""
if not metrics_to_resolve:
return metrics or {}

metric_fn_direct_configurations: List[MetricComputationConfiguration]
metric_fn_bundle_configurations: List[MetricComputationConfiguration]
(
Expand All @@ -357,13 +360,10 @@ def resolve_metrics(
metrics=metrics,
runtime_configuration=runtime_configuration,
)
resolved_metrics: Dict[
Tuple[str, str, str], MetricValue
] = self._process_direct_and_bundled_metric_computation_configurations(
return self._process_direct_and_bundled_metric_computation_configurations(
metric_fn_direct_configurations=metric_fn_direct_configurations,
metric_fn_bundle_configurations=metric_fn_bundle_configurations,
)
return resolved_metrics

def resolve_metric_bundle(
self, metric_fn_bundle
Expand Down Expand Up @@ -464,7 +464,8 @@ def _build_direct_and_bundled_metric_computation_configurations(
metrics: Optional[Dict[Tuple[str, str, str], MetricValue]] = None,
runtime_configuration: Optional[dict] = None,
) -> Tuple[
List[MetricComputationConfiguration], List[MetricComputationConfiguration]
List[MetricComputationConfiguration],
List[MetricComputationConfiguration],
]:
"""
This method organizes "metrics_to_resolve" ("MetricConfiguration" objects) into two lists: direct and bundled.
Expand All @@ -479,12 +480,18 @@ def _build_direct_and_bundled_metric_computation_configurations(
Returns:
Tuple with two elements: directly-computable and bundled "MetricComputationConfiguration" objects
"""
if metrics is None:
metrics = {}

metric_fn_direct_configurations: List[MetricComputationConfiguration] = []
metric_fn_bundle_configurations: List[MetricComputationConfiguration] = []

if not metrics_to_resolve:
return (
metric_fn_direct_configurations,
metric_fn_bundle_configurations,
)

if metrics is None:
metrics = {}

resolved_metric_dependencies_by_metric_name: Dict[
str, Union[MetricValue, Tuple[Any, dict, dict]]
]
Expand Down Expand Up @@ -562,7 +569,10 @@ def _build_direct_and_bundled_metric_computation_configurations(
)
)

return metric_fn_direct_configurations, metric_fn_bundle_configurations
return (
metric_fn_direct_configurations,
metric_fn_bundle_configurations,
)

def _get_computed_metric_evaluation_dependencies_by_metric_name(
self,
Expand Down Expand Up @@ -623,6 +633,7 @@ def _process_direct_and_bundled_metric_computation_configurations(
resolved_metrics: Dict[Tuple[str, str, str], MetricValue] = {}

metric_computation_configuration: MetricComputationConfiguration

for metric_computation_configuration in metric_fn_direct_configurations:
try:
resolved_metrics[
Expand All @@ -638,22 +649,22 @@ def _process_direct_and_bundled_metric_computation_configurations(
),
) from e

if len(metric_fn_bundle_configurations) > 0:
try:
# an engine-specific way of computing metrics together
resolved_metric_bundle: Dict[
Tuple[str, str, str], MetricValue
] = self.resolve_metric_bundle(
metric_fn_bundle=metric_fn_bundle_configurations
)
resolved_metrics.update(resolved_metric_bundle)
except Exception as e:
raise ge_exceptions.MetricResolutionError(
message=str(e),
failed_metrics=[
x.metric_configuration for x in metric_fn_bundle_configurations
],
) from e
try:
# an engine-specific way of computing metrics together
resolved_metric_bundle: Dict[
Tuple[str, str, str], MetricValue
] = self.resolve_metric_bundle(
metric_fn_bundle=metric_fn_bundle_configurations
)
resolved_metrics.update(resolved_metric_bundle)
except Exception as e:
raise ge_exceptions.MetricResolutionError(
message=str(e),
failed_metrics=[
metric_computation_configuration.metric_configuration
for metric_computation_configuration in metric_fn_bundle_configurations
],
) from e

if self._caching:
self._metric_cache.update(resolved_metrics)
Expand Down
Expand Up @@ -444,11 +444,13 @@ def _get_reader_fn(self, reader_method=None, path=None):
f'Unable to find reader_method "{reader_method}" in pandas.'
)

def resolve_metric_bundle( # type: ignore[empty-body]
def resolve_metric_bundle(
self, metric_fn_bundle
) -> Dict[Tuple[str, str, str], Any]:
"""Resolve a bundle of metrics with the same compute domain as part of a single trip to the compute engine."""
pass # This method is NO-OP for PandasExecutionEngine (no bundling for direct execution computational backend).
return (
{}
) # This is NO-OP for "PandasExecutionEngine" (no bundling for direct execution computational backend).

def get_domain_records( # noqa: C901 - 17
self,
Expand Down

0 comments on commit 2d38384

Please sign in to comment.