New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
[MAINTENANCE] Performance improvement refactor for Spark unexpected values #3368
Changes from 10 commits
ec72b07
5ad62bc
778051e
4473092
a784897
66b5ae3
21eea41
a39aa19
abf47b8
a276eab
fea4e71
bd8fc07
8050ce9
522a3f3
fcb22b8
dd56b24
ce2105e
8b48961
2e2ff25
7a72aaa
19f3bd4
7adf3b9
b9bb7cf
099802a
1f9d8d3
93b5d25
80074e9
e1ddfee
341b769
d338d03
51f2005
753e44d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2338,6 +2338,7 @@ def _spark_map_condition_unexpected_count_value( | |
df = execution_engine.get_domain_records( | ||
domain_kwargs=domain_kwargs, | ||
) | ||
# withColumn is required to transform window functions returned by some metrics to boolean mask | ||
data = df.withColumn("__unexpected", unexpected_condition) | ||
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected")) | ||
return filtered.count() | ||
|
@@ -2373,17 +2374,9 @@ def _spark_column_map_condition_values( | |
message=f'Error: The column "{column_name}" in BatchData does not exist.' | ||
) | ||
|
||
data = ( | ||
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1)))) | ||
.withColumn("__unexpected", unexpected_condition) | ||
.orderBy(F.col("__row_number")) | ||
) | ||
|
||
filtered = ( | ||
data.filter(F.col("__unexpected") == True) | ||
.drop(F.col("__unexpected")) | ||
.drop(F.col("__row_number")) | ||
) | ||
# withColumn is required to transform window functions returned by some metrics to boolean mask | ||
data = df.withColumn("__unexpected", unexpected_condition) | ||
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected")) | ||
|
||
result_format = metric_value_kwargs["result_format"] | ||
if result_format["result_format"] == "COMPLETE": | ||
|
@@ -2411,7 +2404,10 @@ def _spark_column_map_condition_value_counts( | |
df = execution_engine.get_domain_records( | ||
domain_kwargs=compute_domain_kwargs, | ||
) | ||
|
||
# withColumn is required to transform window functions returned by some metrics to boolean mask | ||
data = df.withColumn("__unexpected", unexpected_condition) | ||
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected")) | ||
|
||
if "column" not in accessor_domain_kwargs: | ||
raise ValueError( | ||
|
@@ -2429,7 +2425,6 @@ def _spark_column_map_condition_value_counts( | |
|
||
result_format = metric_value_kwargs["result_format"] | ||
|
||
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected")) | ||
value_counts = filtered.groupBy(F.col(column_name)).count() | ||
if result_format["result_format"] == "COMPLETE": | ||
rows = value_counts.collect() | ||
|
@@ -2458,17 +2453,9 @@ def _spark_map_condition_rows( | |
domain_kwargs=domain_kwargs, | ||
) | ||
|
||
data = ( | ||
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1)))) | ||
.withColumn("__unexpected", unexpected_condition) | ||
.orderBy(F.col("__row_number")) | ||
) | ||
|
||
filtered = ( | ||
data.filter(F.col("__unexpected") == True) | ||
.drop(F.col("__unexpected")) | ||
.drop(F.col("__row_number")) | ||
) | ||
# withColumn is required to transform window functions returned by some metrics to boolean mask | ||
data = df.withColumn("__unexpected", unexpected_condition) | ||
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected")) | ||
|
||
result_format = metric_value_kwargs["result_format"] | ||
|
||
|
@@ -2514,17 +2501,7 @@ def _spark_column_pair_map_condition_values( | |
message=f'Error: The column "{column_name}" in BatchData does not exist.' | ||
) | ||
|
||
data = ( | ||
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1)))) | ||
.withColumn("__unexpected", boolean_mapped_unexpected_values) | ||
.orderBy(F.col("__row_number")) | ||
) | ||
|
||
filtered = ( | ||
data.filter(F.col("__unexpected") == True) | ||
.drop(F.col("__unexpected")) | ||
.drop(F.col("__row_number")) | ||
) | ||
filtered = df.filter(boolean_mapped_unexpected_values) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer Can we please preserve the pattern for having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my other comments about that pattern being measurably slower. We can discuss the tradeoffs in our face-to-face testing discussion. |
||
|
||
result_format = metric_value_kwargs["result_format"] | ||
if result_format["result_format"] == "COMPLETE": | ||
|
@@ -2585,7 +2562,7 @@ def _spark_multicolumn_map_condition_values( | |
): | ||
"""Return values from the specified domain that match the map-style metric in the metrics dictionary.""" | ||
( | ||
boolean_mapped_unexpected_values, | ||
unexpected_condition, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer Could we please keep this variable named There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason for changing this was that metrics are not returning booleans in all cases. Cases that return a window function would fail for the 1-line solution There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer I cannot find all the line numbers involved (the UI here is confusing, but I follow your logic). Thank you! P.S.: Should we standardize all cases to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alexsherstinsky If we were to standardize all cases to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer Perhaps we need to clean up our code? Can you please help us out here -- I see in Pandas, SQL, and Spark the pattern that in some cases defines There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alexsherstinsky It is correct that the metric always returns an
to:
for consistency in this single case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer Thanks -- at least it is consistent, and we can revise later. We now need to solve the remaining problem: the equivalency of results, when the sort order is not enforced. If we can do this, then it is a big gain! |
||
compute_domain_kwargs, | ||
accessor_domain_kwargs, | ||
) = metrics["unexpected_condition"] | ||
|
@@ -2613,17 +2590,9 @@ def _spark_multicolumn_map_condition_values( | |
message=f'Error: The column "{column_name}" in BatchData does not exist.' | ||
) | ||
|
||
data = ( | ||
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1)))) | ||
.withColumn("__unexpected", boolean_mapped_unexpected_values) | ||
.orderBy(F.col("__row_number")) | ||
) | ||
|
||
filtered = ( | ||
data.filter(F.col("__unexpected") == True) | ||
.drop(F.col("__unexpected")) | ||
.drop(F.col("__row_number")) | ||
) | ||
# withColumn is required to transform window functions returned by some metrics to boolean mask | ||
data = df.withColumn("__unexpected", unexpected_condition) | ||
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected")) | ||
|
||
column_selector = [F.col(column_name) for column_name in column_list] | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,6 +10,7 @@ | |
from functools import wraps | ||
from types import ModuleType | ||
from typing import Dict, List, Optional, Union | ||
from operator import itemgetter | ||
|
||
import numpy as np | ||
import pandas as pd | ||
|
@@ -1930,6 +1931,13 @@ def check_json_test_result(test, result, data_asset=None): | |
elif key == "unexpected_list": | ||
# check if value can be sorted; if so, sort so arbitrary ordering of results does not cause failure | ||
if (isinstance(value, list)) & (len(value) >= 1): | ||
# dictionary handling isn't implemented in great_expectations.core.data_context_key.__lt__ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer I ❤️ the idea here; I have two questions about it:
Thank you! I love how this idea absolves us from the need for the computationally-prohibitive There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I clarified the lt comment (it is actually looking at the built-in Python class that is passed). I also changed |
||
# but values still need to be sorted since spark metrics return unordered | ||
if isinstance(value[0], dict): | ||
value = sorted(value, key=itemgetter(*list(value[0].keys()))) | ||
result["result"]["unexpected_list"] = sorted( | ||
result["result"]["unexpected_list"], key=itemgetter(*list(value[0].keys())) | ||
) | ||
if type(value[0].__lt__(value[0])) != type(NotImplemented): | ||
value = sorted(value, key=lambda x: str(x)) | ||
result["result"]["unexpected_list"] = sorted( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -213,7 +213,7 @@ | |
},{ | ||
"title": "unexpected_values_exact_match_out_without_unexpected_index_list", | ||
"exact_match_out" : true, | ||
"suppress_test_for": ["pandas"], | ||
"only_for": ["sqlalchemy"], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer I feel that we must make it (or an equivalent, additional test work for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In self_check/util.py#L1882 when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: There is a test above this one for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @NathanFarmer I feel that we should discuss this in order to find a solution for testing the functionality for the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alexsherstinsky I was able to make changes to |
||
"in": { | ||
"column_list": ["a", "b"] | ||
}, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@NathanFarmer I believe that we can leave the
filtered =
statement where it was (because there could be an exception raised earlier, so no need to have it before). Would you agree or not? Thanks.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alexsherstinsky Sure, we can leave this line where it is, but then I would advocate moving lines 2408-2409 down as well for consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@NathanFarmer The way I am seeing it now seems good. Thank you.