Skip to content

Commit

Permalink
[FEATURE] ParameterBuilder for Computing Average Unexpected Values Fr…
Browse files Browse the repository at this point in the history
…actions for any Map Metric (#4340)
  • Loading branch information
alexsherstinsky committed Mar 7, 2022
1 parent daa626b commit bb306e0
Show file tree
Hide file tree
Showing 12 changed files with 542 additions and 14 deletions.
Expand Up @@ -188,8 +188,8 @@ def _generate_metric_configurations_to_check_cardinality(
MetricConfiguration(
metric_name=limit_mode.metric_name_defining_limit,
metric_domain_kwargs={
"batch_id": batch_id,
"column": column_name,
"batch_id": batch_id,
},
metric_value_kwargs=None,
metric_dependencies=None,
Expand Down
@@ -0,0 +1,191 @@
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np

from great_expectations.core.batch import Batch, BatchRequest, RuntimeBatchRequest
from great_expectations.rule_based_profiler.helpers.util import (
get_parameter_value_and_validate_return_type,
)
from great_expectations.rule_based_profiler.parameter_builder import (
MetricMultiBatchParameterBuilder,
)
from great_expectations.rule_based_profiler.parameter_builder.parameter_builder import (
MetricValues,
)
from great_expectations.rule_based_profiler.types import (
Domain,
ParameterContainer,
ParameterNode,
)


class MeanUnexpectedMapMetricMultiBatchParameterBuilder(
MetricMultiBatchParameterBuilder
):
"""
Compute mean unexpected count ratio (as a fraction) of a specified map-style metric across all specified batches.
"""

def __init__(
self,
name: str,
map_metric_name: str,
total_count_parameter_builder_name: str,
null_count_parameter_builder_name: Optional[str] = None,
metric_domain_kwargs: Optional[Union[str, dict]] = None,
metric_value_kwargs: Optional[Union[str, dict]] = None,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: Union[str, bool] = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Args:
name: the name of this parameter -- this is user-specified parameter name (from configuration);
it is not the fully-qualified parameter name; a fully-qualified parameter name must start with "$parameter."
and may contain one or more subsequent parts (e.g., "$parameter.<my_param_from_config>.<metric_name>").
map_metric_name: the name of a map metric (must be a supported and registered map metric); the suffix
".unexpected_count" will be appended to "map_metric_name" to be used in MetricConfiguration to get values.
total_count_parameter_builder_name: name of parameter that computes total_count (of rows in Batch).
null_count_parameter_builder_name: name of parameter that computes null_count (of domain values in Batch).
metric_domain_kwargs: used in MetricConfiguration
metric_value_kwargs: used in MetricConfiguration
batch_list: explicitly passed Batch objects for parameter computation (take precedence over batch_request).
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
name=name,
metric_name=f"{map_metric_name}.unexpected_count",
metric_domain_kwargs=metric_domain_kwargs,
metric_value_kwargs=metric_value_kwargs,
enforce_numeric_metric=True,
replace_nan_with_zero=True,
reduce_scalar_metric=True,
batch_list=batch_list,
batch_request=batch_request,
json_serialize=json_serialize,
data_context=data_context,
)

self._map_metric_name = map_metric_name
self._total_count_parameter_builder_name = total_count_parameter_builder_name
self._null_count_parameter_builder_name = null_count_parameter_builder_name

@property
def map_metric_name(self) -> str:
return self._map_metric_name

@property
def total_count_parameter_builder_name(self) -> str:
return self._total_count_parameter_builder_name

@property
def null_count_parameter_builder_name(self) -> Optional[str]:
return self._null_count_parameter_builder_name

def _build_parameters(
self,
parameter_container: ParameterContainer,
domain: Domain,
variables: Optional[ParameterContainer] = None,
parameters: Optional[Dict[str, ParameterContainer]] = None,
) -> Tuple[Any, dict]:
"""
Builds ParameterContainer object that holds ParameterNode objects with attribute name-value pairs and optional
details.
return: Tuple containing computed_parameter_value and parameter_computation_details metadata.
"""
# Obtain total_count_parameter_builder_name from "rule state" (i.e., variables and parameters); from instance variable otherwise.
total_count_parameter_builder_name: str = (
get_parameter_value_and_validate_return_type(
domain=domain,
parameter_reference=self.total_count_parameter_builder_name,
expected_return_type=str,
variables=variables,
parameters=parameters,
)
)

fully_qualified_total_count_parameter_builder_name: str = (
f"$parameter.{total_count_parameter_builder_name}"
)
# Obtain total_count from "rule state" (i.e., variables and parameters); from instance variable otherwise.
total_count_parameter_node: ParameterNode = (
get_parameter_value_and_validate_return_type(
domain=domain,
parameter_reference=fully_qualified_total_count_parameter_builder_name,
expected_return_type=None,
variables=variables,
parameters=parameters,
)
)
total_count_values: MetricValues = total_count_parameter_node.value

# Obtain null_count_parameter_builder_name from "rule state" (i.e., variables and parameters); from instance variable otherwise.
null_count_parameter_builder_name: str = (
get_parameter_value_and_validate_return_type(
domain=domain,
parameter_reference=self.null_count_parameter_builder_name,
expected_return_type=str,
variables=variables,
parameters=parameters,
)
)

batch_ids: Optional[List[str]] = self.get_batch_ids(
domain=domain,
variables=variables,
parameters=parameters,
)
num_batch_ids: int = len(batch_ids)

null_count_values: MetricValues
if null_count_parameter_builder_name is None:
null_count_values = np.zeros(shape=(num_batch_ids,))
else:
fully_qualified_null_count_parameter_builder_name: str = (
f"$parameter.{null_count_parameter_builder_name}"
)
# Obtain null_count from "rule state" (i.e., variables and parameters); from instance variable otherwise.
null_count_parameter_node: ParameterNode = get_parameter_value_and_validate_return_type(
domain=domain,
parameter_reference=fully_qualified_null_count_parameter_builder_name,
expected_return_type=None,
variables=variables,
parameters=parameters,
)
null_count_values = null_count_parameter_node.value

nonnull_count_values: np.ndarray = total_count_values - null_count_values

# Compute "unexpected_count" corresponding to "map_metric_name" (given as argument to this "ParameterBuilder").
super().build_parameters(
parameter_container=parameter_container,
domain=domain,
variables=variables,
parameters=parameters,
parameter_computation_impl=super()._build_parameters,
)

# Retrieve "unexpected_count" corresponding to "map_metric_name" (given as argument to this "ParameterBuilder").
parameter_node: ParameterNode = get_parameter_value_and_validate_return_type(
domain=domain,
parameter_reference=self.fully_qualified_parameter_name,
expected_return_type=None,
variables=variables,
parameters=parameters,
)
unexpected_count_values: MetricValues = parameter_node.value

unexpected_count_ratio_values: np.ndarray = (
unexpected_count_values / nonnull_count_values
)
mean_unexpected_count_ratio: np.float64 = np.mean(unexpected_count_ratio_values)

return (
mean_unexpected_count_ratio,
parameter_node.details,
)
Expand Up @@ -31,6 +31,7 @@ def __init__(
reduce_scalar_metric: Union[str, bool] = True,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: Union[str, bool] = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Expand All @@ -47,13 +48,15 @@ def __init__(
reduce_scalar_metric: if True (default), then reduces computation of 1-dimensional metric to scalar value.
batch_list: explicitly passed Batch objects for parameter computation (take precedence over batch_request).
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
name=name,
data_context=data_context,
batch_list=batch_list,
batch_request=batch_request,
json_serialize=json_serialize,
data_context=data_context,
)

self._metric_name = metric_name
Expand Down
Expand Up @@ -70,6 +70,7 @@ def __init__(
] = None,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: Union[str, bool] = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Expand All @@ -96,12 +97,14 @@ def __init__(
(i.e., lower_bound, upper_bound) to take on values outside the specified bounds when packaged on output.
batch_list: explicitly passed Batch objects for parameter computation (take precedence over batch_request).
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
name=name,
batch_list=batch_list,
batch_request=batch_request,
json_serialize=json_serialize,
data_context=data_context,
)

Expand Down
Expand Up @@ -92,6 +92,7 @@ def __init__(
name: str,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: Union[str, bool] = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Expand All @@ -103,6 +104,7 @@ def __init__(
and may contain one or more subsequent parts (e.g., "$parameter.<my_param_from_config>.<metric_name>").
batch_list: explicitly passed Batch objects for parameter computation (take precedence over batch_request).
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
Expand All @@ -113,6 +115,8 @@ def __init__(

self._name = name

self._json_serialize = json_serialize

def build_parameters(
self,
parameter_container: ParameterContainer,
Expand All @@ -137,9 +141,20 @@ def build_parameters(
parameters=parameters,
)

# Obtain json_serialize directive from "rule state" (i.e., variables and parameters); from instance variable otherwise.
json_serialize: bool = get_parameter_value_and_validate_return_type(
domain=domain,
parameter_reference=self.json_serialize,
expected_return_type=bool,
variables=variables,
parameters=parameters,
)

parameter_values: Dict[str, Any] = {
self.fully_qualified_parameter_name: {
"value": convert_to_json_serializable(data=computed_parameter_value),
"value": convert_to_json_serializable(data=computed_parameter_value)
if json_serialize
else computed_parameter_value,
"details": parameter_computation_details,
},
}
Expand All @@ -157,6 +172,14 @@ def fully_qualified_parameter_name(self) -> str:
def name(self) -> str:
return self._name

@property
def json_serialize(self) -> bool:
return self._json_serialize

@property
def data_context(self) -> "DataContext": # noqa: F821
return self._data_context

"""
Full getter/setter accessors for "batch_request" and "batch_list" are for configuring ParameterBuilder dynamically.
"""
Expand All @@ -179,10 +202,6 @@ def batch_list(self) -> Optional[List[Batch]]:
def batch_list(self, value: List[Batch]) -> None:
self._batch_list = value

@property
def data_context(self) -> "DataContext": # noqa: F821
return self._data_context

@abstractmethod
def _build_parameters(
self,
Expand Down
Expand Up @@ -49,6 +49,7 @@ def __init__(
candidate_regexes: Optional[Union[Iterable[str], str]] = None,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: Union[str, bool] = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Expand All @@ -61,13 +62,15 @@ def __init__(
candidate_regexes: a list of candidate regex strings that will REPLACE the default
batch_list: Optional[List[Batch]] = None,
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
name=name,
batch_list=batch_list,
data_context=data_context,
batch_request=batch_request,
json_serialize=json_serialize,
data_context=data_context,
)

self._metric_domain_kwargs = metric_domain_kwargs
Expand Down
Expand Up @@ -94,6 +94,7 @@ def __init__(
candidate_strings: Optional[Union[Iterable[str], str]] = None,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: bool = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Expand All @@ -108,13 +109,15 @@ def __init__(
candidate_strings: a list of candidate date format strings that will replace the default
batch_list: explicitly passed Batch objects for parameter computation (take precedence over batch_request).
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
name=name,
data_context=data_context,
batch_list=batch_list,
batch_request=batch_request,
json_serialize=json_serialize,
data_context=data_context,
)

self._metric_domain_kwargs = metric_domain_kwargs
Expand Down
Expand Up @@ -40,6 +40,7 @@ def __init__(
metric_value_kwargs: Optional[Union[str, dict]] = None,
batch_list: Optional[List[Batch]] = None,
batch_request: Optional[Union[BatchRequest, RuntimeBatchRequest, dict]] = None,
json_serialize: Union[str, bool] = True,
data_context: Optional["DataContext"] = None, # noqa: F821
):
"""
Expand All @@ -51,6 +52,7 @@ def __init__(
metric_value_kwargs: used in MetricConfiguration
batch_list: explicitly passed Batch objects for parameter computation (take precedence over batch_request).
batch_request: specified in ParameterBuilder configuration to get Batch objects for parameter computation.
json_serialize: If True (default), convert computed value to JSON prior to saving results.
data_context: DataContext
"""
super().__init__(
Expand All @@ -63,6 +65,7 @@ def __init__(
reduce_scalar_metric=False,
batch_list=batch_list,
batch_request=batch_request,
json_serialize=json_serialize,
data_context=data_context,
)

Expand Down

0 comments on commit bb306e0

Please sign in to comment.