Merge branch 'develop' into maintenance/great-464/great-610/refactor-rbp-config-types
cdkini committed Apr 19, 2022
2 parents a55a949 + d5c0393 commit 8ca2148
Showing 24 changed files with 1,695 additions and 1,361 deletions.
280 changes: 37 additions & 243 deletions great_expectations/rule_based_profiler/data_assistant/data_assistant.py
@@ -1,10 +1,7 @@
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union

import altair as alt
import pandas as pd

from great_expectations.core import ExpectationConfiguration, ExpectationSuite
from great_expectations.core import ExpectationSuite
from great_expectations.core.batch import Batch, BatchRequestBase
from great_expectations.data_context import BaseDataContext
from great_expectations.execution_engine.execution_engine import MetricDomainTypes
@@ -25,14 +22,10 @@
RuleBasedProfiler,
)
from great_expectations.rule_based_profiler.types import (
ALTAIR_DEFAULT_CONFIGURATION,
FULLY_QUALIFIED_PARAMETER_NAME_ATTRIBUTED_VALUE_KEY,
FULLY_QUALIFIED_PARAMETER_NAME_METADATA_KEY,
FULLY_QUALIFIED_PARAMETER_NAME_SEPARATOR_CHARACTER,
DataAssistantResult,
Domain,
ParameterNode,
)
from great_expectations.types import ColorPalettes, Colors
from great_expectations.util import measure_execution_time


@@ -51,9 +44,9 @@ class DataAssistant(ABC):
result: DataAssistantResult = data_assistant.run()
Then:
metrics: Dict[Domain, Dict[str, Any]] = result.metrics
expectation_configurations: List[ExpectationConfiguration] = result.expectation_configurations
metrics: Dict[Domain, Dict[str, ParameterNode]] = result.metrics
expectation_suite: ExpectationSuite = result.expectation_suite
expectation_configurations: List[ExpectationConfiguration] = result.expectation_suite.expectations
expectation_suite_meta: Dict[str, Any] = expectation_suite.meta
profiler_config: RuleBasedProfilerConfig = result.profiler_config
"""
@@ -72,7 +65,7 @@ def __init__(
Args:
name: the name of this DataAssistant object.
batch_request: specified specified for querying data Batch objects.
batch_request: specified for querying data Batch objects.
data_context: DataContext
"""
self._name = name
@@ -115,10 +108,10 @@ def _build_profiler(self) -> None:
expectation_configuration_builders: List[ExpectationConfigurationBuilder]

"""
For each Self-Initializing Expectation as specified by "DataAssistant.expectation_kwargs_by_expectation_type()"
interface method, retrieve its "RuleBasedProfiler" configuration, construct "Rule" object based on configuration
For each Self-Initializing Expectation as specified by "DataAssistant.expectation_kwargs_by_expectation_type"
interface property, retrieve its "RuleBasedProfiler" configuration, construct "Rule" object based on configuration
therein and incorporating metrics "ParameterBuilder" objects for "MetricDomainTypes", emitted by "DomainBuilder"
of comprised "Rule", specified by "DataAssistant.metrics_parameter_builders_by_domain_type()" interface method.
of comprised "Rule", specified by "DataAssistant.metrics_parameter_builders_by_domain_type" interface property.
Append this "Rule" object to overall DataAssistant "RuleBasedProfiler" object; incorporate "variables" as well.
"""
expectation_kwargs: Dict[str, Any]
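
The assembly loop described in the docstring above has roughly the following shape. This is a simplified structural sketch with stand-in types, not the actual implementation:

from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class SketchRule:  # stand-in for the real Rule type
    name: str
    domain_type: str
    parameter_builders: List[Any] = field(default_factory=list)

@dataclass
class SketchProfiler:  # stand-in for RuleBasedProfiler
    rules: List[SketchRule] = field(default_factory=list)

def build_profiler_sketch(
    expectation_kwargs_by_expectation_type: Dict[str, Dict[str, Any]],
    metrics_parameter_builders_by_domain_type: Dict[str, List[Any]],
) -> SketchProfiler:
    profiler = SketchProfiler()
    for expectation_type, kwargs in expectation_kwargs_by_expectation_type.items():
        # In the real code the domain type is emitted by the rule's DomainBuilder;
        # a plain kwargs entry stands in for it here.
        domain_type: str = kwargs["domain_type"]
        rule = SketchRule(name=expectation_type, domain_type=domain_type)
        # Incorporate the metric ParameterBuilders registered for this domain type.
        rule.parameter_builders.extend(
            metrics_parameter_builders_by_domain_type.get(domain_type, [])
        )
        profiler.rules.append(rule)
    return profiler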
@@ -163,10 +156,7 @@ def run(
expectation_suite_name: Optional[str] = None,
include_citation: bool = True,
) -> DataAssistantResult:
data_assistant_cls: type = type(self)
result: DataAssistantResult = DataAssistantResult(
data_assistant_cls=data_assistant_cls, execution_time=0.0
)
result: DataAssistantResult = DataAssistantResult(execution_time=0.0)
run_profiler_on_data(
data_assistant=self,
data_assistant_result=result,
@@ -181,181 +171,6 @@ def run(
)
return result

def _plot(self, charts: List[alt.Chart]):
"""
Display each chart passed in Jupyter Notebook
"""
for c in charts:
c.configure(**ALTAIR_DEFAULT_CONFIGURATION).display()

def get_line_chart(
self,
df: pd.DataFrame,
metric: str,
metric_type: str,
x_axis: str,
x_axis_type: str,
line_color: Optional[str] = Colors.BLUE_2.value,
point_color: Optional[str] = Colors.GREEN.value,
point_color_condition: Optional[alt.condition] = None,
tooltip: Optional[List[alt.Tooltip]] = None,
) -> alt.Chart:
metric_title: str = metric.replace("_", " ").title()
x_axis_title: str = x_axis.title()
title: str = f"{metric_title} by {x_axis_title}"

batch_id: str = "batch_id"
batch_id_type: str = "nominal"

if tooltip is None:
tooltip: List[alt.Tooltip] = [
alt.Tooltip(field=batch_id, type=batch_id_type),
alt.Tooltip(field=metric, type=metric_type, format=","),
]

line: alt.Chart = (
alt.Chart(data=df, title=title)
.mark_line(color=line_color)
.encode(
x=alt.X(
x_axis,
type=x_axis_type,
title=x_axis_title,
),
y=alt.Y(metric, type=metric_type, title=metric_title),
tooltip=tooltip,
)
)

if point_color_condition is not None:
points: alt.Chart = (
alt.Chart(data=df, title=title)
.mark_point(opacity=1.0)
.encode(
x=alt.X(
x_axis,
type=x_axis_type,
title=x_axis_title,
),
y=alt.Y(metric, type=metric_type, title=metric_title),
stroke=point_color_condition,
fill=point_color_condition,
tooltip=tooltip,
)
)
else:
points: alt.Chart = (
alt.Chart(data=df, title=title)
.mark_point(stroke=point_color, fill=point_color, opacity=1.0)
.encode(
x=alt.X(
x_axis,
type=x_axis_type,
title=x_axis_title,
),
y=alt.Y(metric, type=metric_type, title=metric_title),
tooltip=tooltip,
)
)

return line + points
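
For readers unfamiliar with the Altair pattern used by get_line_chart: it layers a line mark and a point mark over the same data and combines them with "+". A toy, self-contained version:

import altair as alt
import pandas as pd

df = pd.DataFrame({"batch": ["b1", "b2", "b3"], "row_count": [100, 120, 95]})
line = (
    alt.Chart(df)
    .mark_line()
    .encode(
        x=alt.X("batch", type="nominal", title="Batch"),
        y=alt.Y("row_count", type="quantitative", title="Row Count"),
        tooltip=[alt.Tooltip(field="row_count", type="quantitative", format=",")],
    )
)
points = (
    alt.Chart(df)
    .mark_point(opacity=1.0)
    .encode(
        x=alt.X("batch", type="nominal"),
        y=alt.Y("row_count", type="quantitative"),
    )
)
(line + points).display()  # layered chart; renders in a Jupyter notebook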

def get_expect_domain_values_to_be_between_chart(
self,
df: pd.DataFrame,
metric: str,
metric_type: str,
x_axis: str,
x_axis_type: str,
) -> alt.Chart:
opacity: float = 0.9
line_color: alt.HexColor = alt.HexColor(ColorPalettes.HEATMAP.value[4])
fill_color: alt.HexColor = alt.HexColor(ColorPalettes.HEATMAP.value[5])

metric_title: str = metric.replace("_", " ").title()
x_axis_title: str = x_axis.title()

batch_id: str = "batch_id"
batch_id_type: str = "nominal"
min_value: str = "min_value"
min_value_type: str = "quantitative"
max_value: str = "max_value"
max_value_type: str = "quantitative"

tooltip: list[alt.Tooltip] = [
alt.Tooltip(field=batch_id, type=batch_id_type),
alt.Tooltip(field=metric, type=metric_type, format=","),
alt.Tooltip(field=min_value, type=min_value_type, format=","),
alt.Tooltip(field=max_value, type=max_value_type, format=","),
]

lower_limit: alt.Chart = (
alt.Chart(data=df)
.mark_line(color=line_color, opacity=opacity)
.encode(
x=alt.X(
x_axis,
type=x_axis_type,
title=x_axis_title,
),
y=alt.Y(min_value, type=metric_type, title=metric_title),
tooltip=tooltip,
)
)

upper_limit: alt.Chart = (
alt.Chart(data=df)
.mark_line(color=line_color, opacity=opacity)
.encode(
x=alt.X(
x_axis,
type=x_axis_type,
title=x_axis_title,
),
y=alt.Y(max_value, type=metric_type, title=metric_title),
tooltip=tooltip,
)
)

band: alt.Chart = (
alt.Chart(data=df)
.mark_area(fill=fill_color, fillOpacity=opacity)
.encode(
x=alt.X(
x_axis,
type=x_axis_type,
title=x_axis_title,
),
y=alt.Y(min_value, title=metric_title, type=metric_type),
y2=alt.Y2(max_value, title=metric_title),
)
)

predicate = (
(alt.datum.min_value > alt.datum.table_row_count)
& (alt.datum.max_value > alt.datum.table_row_count)
) | (
(alt.datum.min_value < alt.datum.table_row_count)
& (alt.datum.max_value < alt.datum.table_row_count)
)
point_color_condition: alt.condition = alt.condition(
predicate=predicate,
if_false=alt.value(Colors.GREEN.value),
if_true=alt.value(Colors.PINK.value),
)
anomaly_coded_line: alt.Chart = self.get_line_chart(
self,
df=df,
metric=metric,
metric_type=metric_type,
x_axis=x_axis,
x_axis_type=x_axis_type,
point_color_condition=point_color_condition,
tooltip=tooltip,
)

return band + lower_limit + upper_limit + anomaly_coded_line
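
The alt.condition predicate above marks a batch anomalous when the [min_value, max_value] band falls entirely on one side of the observed value. A pure-Python restatement of the same logic:

def band_misses_value(min_value: float, max_value: float, actual: float) -> bool:
    # True (anomalous, pink) when the band lies entirely above or entirely
    # below the observed value (here, the table row count).
    return (min_value > actual and max_value > actual) or (
        min_value < actual and max_value < actual
    )

assert band_misses_value(10, 20, 25)      # band entirely below the value: anomaly
assert not band_misses_value(10, 20, 15)  # value inside the band: as expected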

@property
def name(self) -> str:
return self._name
@@ -413,19 +228,34 @@ def rules(self) -> Optional[List[Rule]]:
"""
pass

def get_metrics(self) -> Dict[Domain, Dict[str, Any]]:
@abstractmethod
def plot(
self,
result: DataAssistantResult,
prescriptive: bool = False,
) -> None:
"""
Use contents of "DataAssistantResult" object to display mentrics and other detail for visualization purposes.
Args:
result: "DataAssistantResult", obtained by executing "DataAssistant.run()" method (contains available data).
prescriptive: Type of plot to generate.
"""
pass

def get_metrics_by_domain(self) -> Dict[Domain, Dict[str, ParameterNode]]:
"""
Obtain subset of all parameter values for fully-qualified parameter names by Domain, available from entire
Obtain subset of all parameter values for fully-qualified parameter names by domain, available from entire
"RuleBasedProfiler" state, where "Domain" types are among keys included in provisions as proscribed by return
value of "DataAssistant.metrics_parameter_builders_by_domain_type()" interface method and actual fully-qualified
parameter names match interface properties of "ParameterBuilder" objects, corresponding to these "Domain" types.
value of "DataAssistant.metrics_parameter_builders_by_domain_type" interface property and actual fully-qualified
parameter names match interface properties of "ParameterBuilder" objects, corresponding to these "domain" types.
Returns:
Dictionaries of values for fully-qualified parameter names by Domain for metrics, computed by "RuleBasedProfiler" state.
Dictionaries of values for fully-qualified parameter names by domain for metrics, computed by "RuleBasedProfiler" state.
"""
# noinspection PyTypeChecker
parameter_values_for_fully_qualified_parameter_names_by_domain: Dict[
Domain, Dict[str, Any]
Domain, Dict[str, ParameterNode]
] = dict(
filter(
lambda element: element[0].domain_type
@@ -445,21 +275,13 @@ def get_metrics(self) -> Dict[Domain, Dict[str, Any]]:
domain_type,
parameter_builders,
) in self.metrics_parameter_builders_by_domain_type.items():
fully_qualified_metrics_parameter_names_by_domain_type[domain_type] = []
for parameter_builder in parameter_builders:
fully_qualified_metrics_parameter_names_by_domain_type[
domain_type
].append(
f"{parameter_builder.fully_qualified_parameter_name}{FULLY_QUALIFIED_PARAMETER_NAME_SEPARATOR_CHARACTER}{FULLY_QUALIFIED_PARAMETER_NAME_ATTRIBUTED_VALUE_KEY}"
)
fully_qualified_metrics_parameter_names_by_domain_type[
domain_type
].append(
f"{parameter_builder.fully_qualified_parameter_name}{FULLY_QUALIFIED_PARAMETER_NAME_SEPARATOR_CHARACTER}{FULLY_QUALIFIED_PARAMETER_NAME_METADATA_KEY}"
)
fully_qualified_metrics_parameter_names_by_domain_type[domain_type] = [
parameter_builder.fully_qualified_parameter_name
for parameter_builder in parameter_builders
]

domain: Domain
parameter_values_for_fully_qualified_parameter_names: Dict[str, Any]
parameter_values_for_fully_qualified_parameter_names: Dict[str, ParameterNode]
# noinspection PyTypeChecker
parameter_values_for_fully_qualified_parameter_names_by_domain = {
domain: dict(
@@ -476,27 +298,6 @@ def get_metrics(self) -> Dict[Domain, Dict[str, Any]]:

return parameter_values_for_fully_qualified_parameter_names_by_domain
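
Given a concrete data_assistant as in the earlier sketch, the returned mapping is keyed by Domain, then by fully-qualified parameter name (the parameter name shown in the comment is illustrative):

metrics_by_domain = data_assistant.get_metrics_by_domain()
for domain, parameter_values in metrics_by_domain.items():
    for fully_qualified_parameter_name, parameter_node in parameter_values.items():
        # e.g. a table-domain entry might pair a name such as
        # "$parameter.table_row_count" (hypothetical) with its ParameterNode.
        print(domain.domain_type, fully_qualified_parameter_name, parameter_node)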

def get_expectation_suite_meta(
self,
expectation_suite: Optional[ExpectationSuite] = None,
expectation_suite_name: Optional[str] = None,
include_citation: bool = True,
) -> Dict[str, Any]:
"""
Args:
expectation_suite: An existing "ExpectationSuite" to update.
expectation_suite_name: A name for returned "ExpectationSuite".
include_citation: Whether or not to include the Profiler config in the metadata for "ExpectationSuite" produced by "RuleBasedProfiler"
Returns:
Dictionary corresponding to meta property of "ExpectationSuite" using "ExpectationConfiguration" objects, computed by "RuleBasedProfiler" state.
"""
return self.profiler.get_expectation_suite_meta(
expectation_suite=expectation_suite,
expectation_suite_name=expectation_suite_name,
include_citation=include_citation,
)

def get_expectation_suite(
self,
expectation_suite: Optional[ExpectationSuite] = None,
@@ -518,13 +319,6 @@ def get_expectation_suite(
include_citation=include_citation,
)

def get_expectation_configurations(self) -> List[ExpectationConfiguration]:
"""
Returns:
List of "ExpectationConfiguration" objects, computed by "RuleBasedProfiler" state.
"""
return self.profiler.get_expectation_configurations()


@measure_execution_time(
execution_time_holder_object_reference_name="data_assistant_result",
@@ -560,7 +354,7 @@ def run_profiler_on_data(
)
result: DataAssistantResult = data_assistant_result
result.profiler_config = profiler.config
result.metrics = data_assistant.get_metrics()
result.metrics_by_domain = data_assistant.get_metrics_by_domain()
result.expectation_suite = data_assistant.get_expectation_suite(
expectation_suite=expectation_suite,
expectation_suite_name=expectation_suite_name,
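
The measure_execution_time decorator applied above receives the name of a keyword argument and records the call's duration on that object, which is how run_profiler_on_data populates result.execution_time. One plausible reading of the mechanism, sketched under that assumption (not the actual implementation in great_expectations.util):

import functools
import time
from typing import Any, Callable

def measure_execution_time_sketch(
    execution_time_holder_object_reference_name: str,
) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Find the holder object among the keyword arguments and store
            # the wall-clock duration of the wrapped call on it.
            holder = kwargs[execution_time_holder_object_reference_name]
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                holder.execution_time = time.perf_counter() - start
        return wrapper
    return decorator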
