Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MAINTENANCE] Small refactor of ExecutionEngine.resolve_metrics() for better code readability (and miscellaneous additional clean up) #6587

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion great_expectations/core/batch_manager.py
Expand Up @@ -98,7 +98,7 @@ def active_batch_id(self) -> Optional[str]:
the list of "Batch" objects, each of whose BatchData has already been loaded (and cached). Since BatchData IDs
are from the same name space as Batch IDs, this helps avoid unnecessary loading of data from different backends.
"""
active_batch_data_id: Optional[str] = self._active_batch_data_id
active_batch_data_id: Optional[str] = self.active_batch_data_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this a bug? What is the difference between these two attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NathanFarmer Using full property takes advantage of the checks that property method performs -- this is more robust.

if active_batch_data_id != self._active_batch_id:
logger.warning(
"ID of active Batch and ID of active loaded BatchData differ."
Expand Down
61 changes: 36 additions & 25 deletions great_expectations/execution_engine/execution_engine.py
Expand Up @@ -347,6 +347,9 @@ def resolve_metrics(
Returns:
resolved_metrics (Dict): a dictionary with the values for the metrics that have just been resolved.
"""
if not metrics_to_resolve:
return metrics or {}

metric_fn_direct_configurations: List[MetricComputationConfiguration]
metric_fn_bundle_configurations: List[MetricComputationConfiguration]
(
Expand All @@ -357,13 +360,10 @@ def resolve_metrics(
metrics=metrics,
runtime_configuration=runtime_configuration,
)
resolved_metrics: Dict[
Tuple[str, str, str], MetricValue
] = self._process_direct_and_bundled_metric_computation_configurations(
return self._process_direct_and_bundled_metric_computation_configurations(
metric_fn_direct_configurations=metric_fn_direct_configurations,
metric_fn_bundle_configurations=metric_fn_bundle_configurations,
)
return resolved_metrics

def resolve_metric_bundle(
self, metric_fn_bundle
Expand Down Expand Up @@ -464,7 +464,8 @@ def _build_direct_and_bundled_metric_computation_configurations(
metrics: Optional[Dict[Tuple[str, str, str], MetricValue]] = None,
runtime_configuration: Optional[dict] = None,
) -> Tuple[
List[MetricComputationConfiguration], List[MetricComputationConfiguration]
List[MetricComputationConfiguration],
List[MetricComputationConfiguration],
]:
"""
This method organizes "metrics_to_resolve" ("MetricConfiguration" objects) into two lists: direct and bundled.
Expand All @@ -479,12 +480,18 @@ def _build_direct_and_bundled_metric_computation_configurations(
Returns:
Tuple with two elements: directly-computable and bundled "MetricComputationConfiguration" objects
"""
if metrics is None:
metrics = {}

metric_fn_direct_configurations: List[MetricComputationConfiguration] = []
metric_fn_bundle_configurations: List[MetricComputationConfiguration] = []

if not metrics_to_resolve:
return (
metric_fn_direct_configurations,
metric_fn_bundle_configurations,
)

if metrics is None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably want to keep with the pattern used elsewhere: if not metrics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NathanFarmer In this case we are actually focusing on "None" (as opposed to "having nothing in it" -- i.e., being "Falsy"). So this is a stronger statement. Happy to discuss. Thanks!

metrics = {}

resolved_metric_dependencies_by_metric_name: Dict[
str, Union[MetricValue, Tuple[Any, dict, dict]]
]
Expand Down Expand Up @@ -562,7 +569,10 @@ def _build_direct_and_bundled_metric_computation_configurations(
)
)

return metric_fn_direct_configurations, metric_fn_bundle_configurations
return (
metric_fn_direct_configurations,
metric_fn_bundle_configurations,
)

def _get_computed_metric_evaluation_dependencies_by_metric_name(
self,
Expand Down Expand Up @@ -623,6 +633,7 @@ def _process_direct_and_bundled_metric_computation_configurations(
resolved_metrics: Dict[Tuple[str, str, str], MetricValue] = {}

metric_computation_configuration: MetricComputationConfiguration

for metric_computation_configuration in metric_fn_direct_configurations:
try:
resolved_metrics[
Expand All @@ -638,22 +649,22 @@ def _process_direct_and_bundled_metric_computation_configurations(
),
) from e

if len(metric_fn_bundle_configurations) > 0:
try:
# an engine-specific way of computing metrics together
resolved_metric_bundle: Dict[
Tuple[str, str, str], MetricValue
] = self.resolve_metric_bundle(
metric_fn_bundle=metric_fn_bundle_configurations
)
resolved_metrics.update(resolved_metric_bundle)
except Exception as e:
raise ge_exceptions.MetricResolutionError(
message=str(e),
failed_metrics=[
x.metric_configuration for x in metric_fn_bundle_configurations
],
) from e
try:
# an engine-specific way of computing metrics together
resolved_metric_bundle: Dict[
Tuple[str, str, str], MetricValue
] = self.resolve_metric_bundle(
metric_fn_bundle=metric_fn_bundle_configurations
)
resolved_metrics.update(resolved_metric_bundle)
except Exception as e:
raise ge_exceptions.MetricResolutionError(
message=str(e),
failed_metrics=[
metric_computation_configuration.metric_configuration
for metric_computation_configuration in metric_fn_bundle_configurations
],
) from e

if self._caching:
self._metric_cache.update(resolved_metrics)
Expand Down
Expand Up @@ -444,11 +444,13 @@ def _get_reader_fn(self, reader_method=None, path=None):
f'Unable to find reader_method "{reader_method}" in pandas.'
)

def resolve_metric_bundle( # type: ignore[empty-body]
def resolve_metric_bundle(
self, metric_fn_bundle
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexsherstinsky can you add an annotation (other than Any) for metric_fn_bundle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Kilo59 No -- this has already been discussed before -- it can be Callable, any of a great many of SqlAlchemy functions, and any of a great many Spark functions. I do not see an easy possibility here. Happy to discuss. Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Callable is better than no annotation.

) -> Dict[Tuple[str, str, str], Any]:
"""Resolve a bundle of metrics with the same compute domain as part of a single trip to the compute engine."""
pass # This method is NO-OP for PandasExecutionEngine (no bundling for direct execution computational backend).
return (
{}
) # This is NO-OP for "PandasExecutionEngine" (no bundling for direct execution computational backend).

def get_domain_records( # noqa: C901 - 17
self,
Expand Down