Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE] Support to include ID/PK in validation result for each row - SQL #6448

Merged
merged 56 commits into from
Dec 5, 2022
Merged
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
017350c
Before the fix
Nov 17, 2022
2af5d8d
Merge branch 'develop' into b/dx-67/bugfix-metrics-return-empty-value
Nov 17, 2022
43d30ac
cleaned up db references
Nov 18, 2022
0c93b5d
Merge branch 'develop' into b/dx-67/bugfix-metrics-return-empty-value
Nov 18, 2022
015b84f
bugfix
Nov 18, 2022
65c3e3a
Merge branch 'b/dx-67/bugfix-metrics-return-empty-value' of https://g…
Nov 18, 2022
61867c2
updated etst to not include extra comments or output
Nov 18, 2022
c2043bd
Update test_map_metric.py
Nov 28, 2022
c06b838
Merge branch 'develop' into b/dx-67/bugfix-metrics-return-empty-value
Nov 28, 2022
583f89d
Update test_map_metric.py
Nov 28, 2022
93ae465
Update test_map_metric.py
Nov 29, 2022
3b983a6
update column names
Nov 29, 2022
712d9e2
Merge branch 'develop' into b/dx-67/bugfix-metrics-return-empty-value
Nov 29, 2022
cb54172
Merge branch 'develop' into b/dx-67/bugfix-metrics-return-empty-value
Nov 29, 2022
7dd34c3
oops
Nov 29, 2022
75cc9b2
Sql Metrics added from other PR
Nov 29, 2022
5a3b22f
Merge branch 'develop' into b/dx-67/sql-pk-id-metric
Nov 30, 2022
f5d1a05
Update test_map_metric.py
Nov 30, 2022
6ab5d3e
see if this works for now
Nov 30, 2022
1cfc4df
push before final check
Nov 30, 2022
9bfdde1
fixed
Dec 1, 2022
2447162
Merge branch 'develop' into b/dx-67/sql-pk-id-metric
Dec 1, 2022
5a11c0e
cleaned up
Dec 1, 2022
75134e9
cleaned up final function
Dec 1, 2022
7f03f3d
very large
Dec 1, 2022
c4c78c7
final clean up
Dec 1, 2022
1c4d0b9
make it shorter
Dec 1, 2022
4b18bc7
Update map_metric_provider.py
Dec 1, 2022
ab9be1f
simplifying the filtering
Dec 2, 2022
94e54a9
whoohoo
Dec 2, 2022
a2c7fcb
let's add some tests
Dec 2, 2022
5d9c923
Merge branch 'develop' into b/dx-67/sql-pk-id-metric
Dec 2, 2022
b8c06b1
Update test_metrics_util.py
Dec 2, 2022
145c806
Update map_metric_provider.py
Dec 2, 2022
1b1163f
Update test_metrics_util.py
Dec 2, 2022
1bf4bf8
last fix
Dec 3, 2022
f75ea66
AB testing
Dec 3, 2022
a8a5997
final set
Dec 3, 2022
1090b07
much cleaner
Dec 3, 2022
159966f
athena needs special treatment
Dec 3, 2022
97cf510
Merge branch 'develop' into b/dx-67/sql-pk-id-metric
Dec 3, 2022
2783c04
monkeypatch everything
Dec 3, 2022
8ca9b3f
Update test_metrics_util.py
Dec 3, 2022
47caaca
wow what was that
Dec 3, 2022
9e2c00d
ok so we here go again
Dec 3, 2022
ed131f1
i think i finally haev it this time
Dec 3, 2022
6deb72e
Update metrics_test.db
Dec 4, 2022
72494f9
Update test_metrics_util.py
Dec 4, 2022
7d63cd2
now the tests are fixed
Dec 5, 2022
071cf19
q1
Dec 5, 2022
40cf1f6
clean up of tests
Dec 5, 2022
ba177f6
Merge branch 'develop' into b/dx-67/sql-pk-id-metric
Dec 5, 2022
c5e9741
Revert "clean up of tests"
Dec 5, 2022
2eb7aa9
Revert "Revert "clean up of tests""
Dec 5, 2022
2abc598
updated after synchronous review
Dec 5, 2022
2fe0627
Merge branch 'develop' into b/dx-67/sql-pk-id-metric
Dec 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions great_expectations/expectations/expectation.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@
InvalidExpectationConfigurationError,
InvalidExpectationKwargsError,
)
from great_expectations.execution_engine import ExecutionEngine, PandasExecutionEngine
from great_expectations.execution_engine import (
ExecutionEngine,
PandasExecutionEngine,
SqlAlchemyExecutionEngine,
)
from great_expectations.expectations.registry import (
_registered_metrics,
_registered_renderers,
Expand Down Expand Up @@ -2307,7 +2311,6 @@ def get_validation_dependencies(
assert (
self.metric_dependencies == tuple()
), "ColumnMapExpectation must be configured using map_metric, and cannot have metric_dependencies declared."
# convenient name for updates

metric_kwargs: dict

Expand Down Expand Up @@ -2425,7 +2428,34 @@ def get_validation_dependencies(
metric_value_kwargs=metric_kwargs["metric_value_kwargs"],
),
)

if isinstance(execution_engine, SqlAlchemyExecutionEngine):
if "unexpected_index_column_names" in validation_dependencies.result_format:
metric_kwargs = get_metric_kwargs(
f"{self.map_metric}.unexpected_index_list",
configuration=configuration,
runtime_configuration=runtime_configuration,
)
validation_dependencies.set_metric_configuration(
metric_name=f"{self.map_metric}.unexpected_index_list",
metric_configuration=MetricConfiguration(
metric_name=f"{self.map_metric}.unexpected_index_list",
metric_domain_kwargs=metric_kwargs["metric_domain_kwargs"],
metric_value_kwargs=metric_kwargs["metric_value_kwargs"],
),
)
metric_kwargs = get_metric_kwargs(
f"{self.map_metric}.unexpected_index_query",
configuration=configuration,
runtime_configuration=runtime_configuration,
)
validation_dependencies.set_metric_configuration(
metric_name=f"{self.map_metric}.unexpected_index_query",
metric_configuration=MetricConfiguration(
metric_name=f"{self.map_metric}.unexpected_index_query",
metric_domain_kwargs=metric_kwargs["metric_domain_kwargs"],
metric_value_kwargs=metric_kwargs["metric_value_kwargs"],
),
)
return validation_dependencies

def _validate(
Expand Down Expand Up @@ -2458,6 +2488,9 @@ def _validate(
unexpected_index_list: Optional[List[int]] = metrics.get(
f"{self.map_metric}.unexpected_index_list"
)
unexpected_index_query: Optional[str] = metrics.get(
f"{self.map_metric}.unexpected_index_query"
)
unexpected_rows = None
if include_unexpected_rows:
unexpected_rows = metrics.get(f"{self.map_metric}.unexpected_rows")
Expand Down Expand Up @@ -2488,6 +2521,7 @@ def _validate(
unexpected_list=unexpected_values,
unexpected_index_list=unexpected_index_list,
unexpected_rows=unexpected_rows,
unexpected_index_query=unexpected_index_query,
)


Expand Down Expand Up @@ -2552,8 +2586,6 @@ def get_validation_dependencies(
assert (
self.metric_dependencies == tuple()
), "ColumnPairMapExpectation must be configured using map_metric, and cannot have metric_dependencies declared."
# convenient name for updates

metric_kwargs: dict

metric_kwargs = get_metric_kwargs(
Expand Down Expand Up @@ -2937,6 +2969,7 @@ def _format_map_output(
unexpected_count: Optional[int] = None,
unexpected_list: Optional[List[Any]] = None,
unexpected_index_list: Optional[List[int]] = None,
unexpected_index_query: Optional[str] = None,
unexpected_rows=None,
) -> Dict:
"""Helper function to construct expectation result objects for map_expectations (such as column_map_expectation
Expand Down Expand Up @@ -3060,7 +3093,8 @@ def _format_map_output(
return_obj["result"].update({"unexpected_list": unexpected_list})
if unexpected_index_list is not None:
return_obj["result"].update({"unexpected_index_list": unexpected_index_list})

if unexpected_index_query is not None:
return_obj["result"].update({"unexpected_index_query": unexpected_index_query})
if result_format["result_format"] == "COMPLETE":
return return_obj

Expand Down
149 changes: 149 additions & 0 deletions great_expectations/expectations/metrics/map_metric_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
Label,
Select,
get_dbms_compatible_column_names,
get_sqlalchemy_source_table_and_schema,
sql_statement_with_post_compile_to_string,
verify_column_names_exist,
)
from great_expectations.expectations.registry import (
Expand Down Expand Up @@ -2377,6 +2379,134 @@ def _sqlalchemy_map_condition_rows(
)


def _sqlalchemy_map_condition_query(
cls,
execution_engine: SqlAlchemyExecutionEngine,
metric_domain_kwargs: Dict,
metric_value_kwargs: Dict,
metrics: Dict[str, Any],
**kwargs,
) -> Optional[str]:
"""
Returns query that will return all rows which do not meet an expected Expectation condition for instances
of ColumnMapExpectation.

Requires `unexpected_index_column_names` to be part of `result_format` dict to specify primary_key columns
to return, along with column the Expectation is run on.
"""
(
unexpected_condition,
compute_domain_kwargs,
accessor_domain_kwargs,
) = metrics.get("unexpected_condition")

domain_kwargs: dict = dict(**compute_domain_kwargs, **accessor_domain_kwargs)
result_format: dict = metric_value_kwargs["result_format"]

column_selector: List[sa.Column] = [sa.column(domain_kwargs["column"])]
all_table_columns: List[str] = metrics.get("table.columns")
unexpected_index_column_names: List[str] = result_format.get(
"unexpected_index_column_names"
)
for column_name in unexpected_index_column_names:
if column_name not in all_table_columns:
raise ge_exceptions.InvalidMetricAccessorDomainKwargsKeyError(
message=f'Error: The unexpected_index_column: "{column_name}" in does not exist in SQL Table. '
f"Please check your configuration and try again."
)
column_selector.append(sa.column(column_name))

unexpected_condition_query_with_selected_columns: sa.select = sa.select(
column_selector
).where(unexpected_condition)
source_table_and_schema: sa.Table = get_sqlalchemy_source_table_and_schema(
execution_engine
)

source_table_and_schema_as_selectable: Union[
sa.Table, sa.Select
] = get_sqlalchemy_selectable(source_table_and_schema)
final_select_statement: sa.select = (
unexpected_condition_query_with_selected_columns.select_from(
source_table_and_schema_as_selectable
)
)

query_as_string: str = sql_statement_with_post_compile_to_string(
engine=execution_engine, select_statement=final_select_statement
)
return query_as_string


def _sqlalchemy_map_condition_index(
cls,
execution_engine: PandasExecutionEngine,
metric_domain_kwargs: Dict,
metric_value_kwargs: Dict,
metrics: Dict[str, Any],
**kwargs,
) -> List[Dict[str, Any]]:
"""
Returns indices of the metric values which do not meet an expected Expectation condition for instances
of ColumnMapExpectation.

Requires `unexpected_index_column_names` to be part of `result_format` dict to specify primary_key columns
to return.
"""
(
unexpected_condition,
compute_domain_kwargs,
accessor_domain_kwargs,
) = metrics.get("unexpected_condition")

domain_kwargs: dict = dict(**compute_domain_kwargs, **accessor_domain_kwargs)
result_format: dict = metric_value_kwargs["result_format"]

column_selector: List[sa.Column] = []
all_table_columns: List[str] = metrics.get("table.columns")
unexpected_index_column_names: List[str] = result_format.get(
"unexpected_index_column_names"
)
for column_name in unexpected_index_column_names:
if column_name not in all_table_columns:
raise ge_exceptions.InvalidMetricAccessorDomainKwargsKeyError(
message=f'Error: The unexpected_index_column: "{column_name}" in does not exist in SQL Table. '
f"Please check your configuration and try again."
)
column_selector.append(sa.column(column_name))

domain_records_as_selectable: sa.Selectable = execution_engine.get_domain_records(
domain_kwargs=domain_kwargs
)
unexpected_condition_query_with_selected_columns: sa.select = sa.select(
column_selector
).where(unexpected_condition)

if not MapMetricProvider.is_sqlalchemy_metric_selectable(map_metric_provider=cls):
domain_records_as_selectable: Union[
sa.Table, sa.Select
] = get_sqlalchemy_selectable(domain_records_as_selectable)

# since SQL tables can be **very** large, truncate query_result values at 10, or at `partial_unexpected_count`
final_query: sa.select = (
unexpected_condition_query_with_selected_columns.select_from(
domain_records_as_selectable
).limit(result_format["partial_unexpected_count"])
)
query_result: List[tuple] = execution_engine.engine.execute(final_query).fetchall()

unexpected_index_list: Optional[List[Dict[str, Any]]] = []

for row in query_result:
primary_key_dict: Dict[str, Any] = {}
for index in range(len(unexpected_index_column_names)):
name: str = unexpected_index_column_names[index]
primary_key_dict[name] = row[index]
unexpected_index_list.append(primary_key_dict)

return unexpected_index_list


def _spark_map_condition_unexpected_count_aggregate_fn(
cls,
execution_engine: SparkDFExecutionEngine,
Expand Down Expand Up @@ -2909,6 +3039,24 @@ def _register_metric_functions(cls):
metric_provider=_sqlalchemy_map_condition_rows,
metric_fn_type=MetricFunctionTypes.VALUE,
)
register_metric(
metric_name=f"{metric_name}.unexpected_index_list",
metric_domain_keys=metric_domain_keys,
metric_value_keys=(*metric_value_keys, "result_format"),
execution_engine=engine,
metric_class=cls,
metric_provider=_sqlalchemy_map_condition_index,
metric_fn_type=MetricFunctionTypes.VALUE,
)
register_metric(
metric_name=f"{metric_name}.unexpected_index_query",
metric_domain_keys=metric_domain_keys,
metric_value_keys=(*metric_value_keys, "result_format"),
execution_engine=engine,
metric_class=cls,
metric_provider=_sqlalchemy_map_condition_query,
metric_fn_type=MetricFunctionTypes.VALUE,
)
if metric_fn_type == MetricPartialFunctionTypes.MAP_CONDITION_FN:
if domain_type == MetricDomainTypes.COLUMN:
register_metric(
Expand Down Expand Up @@ -3190,6 +3338,7 @@ def _get_evaluation_dependencies(
for metric_suffix in [
".unexpected_count.aggregate_fn",
".unexpected_value_counts",
".unexpected_index_query",
".unexpected_index_list",
".filtered_row_count",
".unexpected_values",
Expand Down
71 changes: 70 additions & 1 deletion great_expectations/expectations/metrics/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@

try:
import sqlalchemy as sa
from sqlalchemy import Table
from sqlalchemy.dialects import registry
from sqlalchemy.engine import Engine, reflection
from sqlalchemy.engine import Connection, Engine, reflection
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.exc import OperationalError
from sqlalchemy.sql import Insert, Select, TableClause
Expand All @@ -47,6 +48,7 @@
sa = None
registry = None
Engine = None
Connection = None
reflection = None
Dialect = None
Insert = None
Expand Down Expand Up @@ -992,3 +994,70 @@ def is_valid_continuous_partition_object(partition_object):
and np.all(np.diff(partition_object["bins"]) > 0)
and np.allclose(np.sum(comb_weights), 1.0)
)


def sql_statement_with_post_compile_to_string(
engine: SqlAlchemyExecutionEngine, select_statement: "sqlalchemy.sql.Select"
) -> str:
"""
Util method to compile SQL select statement with post-compile parameters into a string. Logic lifted directly
from sqlalchemy documentation.

https://docs.sqlalchemy.org/en/14/faq/sqlexpressions.html#rendering-postcompile-parameters-as-bound-parameters

Used by _sqlalchemy_map_condition_index() in map_metric_provider to build query that will allow you to
return unexpected_index_values.

Args:
engine (sqlalchemy.engine.Engine): Sqlalchemy engine used to do the compilation.
select_statement (sqlalchemy.sql.Select): Select statement to compile into string.
Returns:
String representation of select_statement

"""
sqlalchemy_connection: "sa.engine.base.Connection" = engine.engine
compiled = select_statement.compile(
sqlalchemy_connection,
compile_kwargs={"render_postcompile": True},
dialect=engine.dialect,
)
dialect_name: str = engine.dialect_name

if dialect_name in ["sqlite", "trino", "mssql"]:
params = (repr(compiled.params[name]) for name in compiled.positiontup)
query_as_string = re.sub(r"\?", lambda m: next(params), str(compiled))

else:
params = (repr(compiled.params[name]) for name in list(compiled.params.keys()))
query_as_string = re.sub(r"%\(.*?\)s", lambda m: next(params), str(compiled))

# bigquery inserts extra '`' character for compiled statement.
# clean up string before returning
if dialect_name == "bigquery":
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: adjust test to compare the query from bigquery, rather than adjust the query from bigquery to match the test

query_as_string = re.sub(r"`", "", query_as_string)

return query_as_string
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return SQL query according to backend



def get_sqlalchemy_source_table_and_schema(
engine: SqlAlchemyExecutionEngine,
) -> "Table":
"""
Util method to return table name that is associated with current batch.

This is used by `_sqlalchemy_map_condition_query()` which returns a query that allows users to return
unexpected_index_values.

Args:
engine (SqlAlchemyExecutionEngine): Engine that is currently being used to calculate the Metrics
Returns:
SqlAlchemy Table that is the source table and schema.
"""
schema_name = engine.batch_manager.active_batch_data.source_schema_name
table_name = engine.batch_manager.active_batch_data.source_table_name
selectable = sa.Table(
table_name,
sa.MetaData(),
schema=schema_name,
)
return selectable
Loading