
[MAINTENANCE] Performance improvement refactor for Spark unexpected values #3368

Merged
Commits (32 total; changes shown from 28 commits)
ec72b07
[MAINTENANCE] Performance improvement refactor for helper _spark_colu…
Sep 8, 2021
5ad62bc
Merge branch 'develop' into working-branch/DEVREL-154/improve_SparkDF…
NathanFarmer Sep 8, 2021
778051e
Merge branch 'develop' into working-branch/DEVREL-154/improve_SparkDF…
NathanFarmer Sep 16, 2021
4473092
[MAINTENANCE] Performance improvement refactor for helper _spark_colu…
Sep 16, 2021
a784897
Merge branch 'working-branch/DEVREL-154/improve_SparkDFExecutionEngin…
Sep 16, 2021
66b5ae3
Change log
Sep 16, 2021
21eea41
[MAINTENANCE] This test no longer applies to spark because we stopped…
Sep 17, 2021
a39aa19
[MAINTENANCE] Remove all sorting logic from spark provider helpers (#…
Sep 17, 2021
abf47b8
[MAINTENANCE] Sort dictionaries in tests for comparisons (#3368).
Sep 17, 2021
a276eab
Merge branch 'develop' into working-branch/DEVREL-154/improve_SparkDF…
NathanFarmer Sep 17, 2021
fea4e71
Linting
Sep 17, 2021
bd8fc07
Merge branch 'working-branch/DEVREL-154/improve_SparkDFExecutionEngin…
Sep 17, 2021
8050ce9
Clean up
Sep 17, 2021
522a3f3
[MAINTENANCE] Incorrect source of __lt__ in comment (#3368).
Sep 17, 2021
fcb22b8
[MAINTENANCE] Clarify how sorting works for each data type (#3368).
Sep 17, 2021
dd56b24
[MAINTENANCE] Lambda instead of itemgetter for consistency/simplicity…
Sep 17, 2021
ce2105e
Linting
Sep 17, 2021
8b48961
Accidentally re-used variable name
Sep 17, 2021
2e2ff25
Linting
Sep 17, 2021
7a72aaa
[MAINTENANCE] Change final use of boolean_mapped_unexpected_values to…
Sep 21, 2021
19f3bd4
[MAINTENANCE] Helper function for sorting unexpected_values during te…
Sep 21, 2021
7adf3b9
[MAINTENANCE] When exact_match_out is True we still need to sort unex…
Sep 21, 2021
b9bb7cf
[MAINTENANCE] Moved sort logic into helper function (#3368).
Sep 21, 2021
099802a
Cleanup
Sep 21, 2021
1f9d8d3
[MAINTENANCE] Sort should also be applied to partial_unexpected_list …
Sep 21, 2021
93b5d25
[MAINTENANCE] Revert broken test back to its original state (#3368).
Sep 21, 2021
80074e9
Linting
Sep 21, 2021
e1ddfee
Merge branch 'develop' into working-branch/DEVREL-154/improve_SparkDF…
NathanFarmer Sep 21, 2021
341b769
[MAINTENANCE] Consolidate sorting to make it clear that we do it whet…
Sep 21, 2021
d338d03
Merge branch 'working-branch/DEVREL-154/improve_SparkDFExecutionEngin…
Sep 21, 2021
51f2005
Linting
Sep 21, 2021
753e44d
Merge branch 'develop' into working-branch/DEVREL-154/improve_SparkDF…
NathanFarmer Sep 21, 2021
2 changes: 1 addition & 1 deletion docs_rtd/changelog.rst
@@ -7,6 +7,7 @@ Changelog

develop
-----------------
* [MAINTENANCE] Spark performance improvement for metrics that return unexpected values (#3368)

0.13.34
-----------------
@@ -64,7 +65,6 @@ develop
* [MAINTENANCE] Tests for RuntimeDataConnector at Datasource-level (Spark and Pandas) (#3318)
* [MAINTENANCE] Various doc patches (#3326)
* [MAINTENANCE] clean up imports and method signatures (#3337)
>>>>>>> 9208de453238af6d673aa9184c865b8422165172

0.13.31
-----------------
69 changes: 21 additions & 48 deletions great_expectations/expectations/metrics/map_metric_provider.py
@@ -2338,8 +2338,11 @@ def _spark_map_condition_unexpected_count_value(
df = execution_engine.get_domain_records(
domain_kwargs=domain_kwargs,
)

# withColumn is required to transform window functions returned by some metrics to boolean mask
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

return filtered.count()
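
Note: a minimal PySpark sketch (not part of this diff) of the comment above. The duplicate-count condition below is a hypothetical stand-in for a metric's unexpected_condition; it shows why a window-function Column cannot be passed to filter() directly.

    # Sketch only: Spark rejects window functions inside WHERE/filter clauses,
    # so the condition is first materialized into a boolean column via withColumn.
    from pyspark.sql import SparkSession, Window
    from pyspark.sql import functions as F

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1,), (2,), (2,), (3,)], ["a"])

    # Hypothetical unexpected_condition built from a window function:
    # a value is "unexpected" if it appears more than once in its column.
    unexpected_condition = F.count("a").over(Window.partitionBy("a")) > 1

    # df.filter(unexpected_condition) would raise an AnalysisException here;
    # materializing the condition first works for window and non-window cases alike.
    data = df.withColumn("__unexpected", unexpected_condition)
    filtered = data.filter(F.col("__unexpected") == True).drop("__unexpected")
    print(filtered.count())  # 2 (the two rows where a == 2)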


@@ -2373,17 +2376,9 @@ def _spark_column_map_condition_values(
message=f'Error: The column "{column_name}" in BatchData does not exist.'
)

data = (
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1))))
.withColumn("__unexpected", unexpected_condition)
.orderBy(F.col("__row_number"))
)

filtered = (
data.filter(F.col("__unexpected") == True)
.drop(F.col("__unexpected"))
.drop(F.col("__row_number"))
)
# withColumn is required to transform window functions returned by some metrics to boolean mask
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

result_format = metric_value_kwargs["result_format"]
if result_format["result_format"] == "COMPLETE":
@@ -2411,7 +2406,6 @@ def _spark_column_map_condition_value_counts(
df = execution_engine.get_domain_records(
domain_kwargs=compute_domain_kwargs,
)
data = df.withColumn("__unexpected", unexpected_condition)

if "column" not in accessor_domain_kwargs:
raise ValueError(
@@ -2427,9 +2421,12 @@
message=f'Error: The column "{column_name}" in BatchData does not exist.'
)

# withColumn is required to transform window functions returned by some metrics to boolean mask
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

result_format = metric_value_kwargs["result_format"]

filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

alexsherstinsky (Contributor) commented:
@NathanFarmer I believe that we can leave the filtered = statement where it was (because an exception could be raised earlier, so there is no need to have it before). Would you agree? Thanks.

NathanFarmer (Contributor, Author) replied:
@alexsherstinsky Sure, we can leave this line where it is, but then I would advocate moving lines 2408-2409 down below as well for consistency.

alexsherstinsky (Contributor) replied:
@NathanFarmer The way I am seeing it now seems good. Thank you.

value_counts = filtered.groupBy(F.col(column_name)).count()
if result_format["result_format"] == "COMPLETE":
rows = value_counts.collect()
@@ -2458,17 +2455,9 @@ def _spark_map_condition_rows(
domain_kwargs=domain_kwargs,
)

data = (
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1))))
.withColumn("__unexpected", unexpected_condition)
.orderBy(F.col("__row_number"))
)

filtered = (
data.filter(F.col("__unexpected") == True)
.drop(F.col("__unexpected"))
.drop(F.col("__row_number"))
)
# withColumn is required to transform window functions returned by some metrics to boolean mask
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

result_format = metric_value_kwargs["result_format"]

@@ -2488,7 +2477,7 @@ def _spark_column_pair_map_condition_values(
):
"""Return values from the specified domain that match the map-style metric in the metrics dictionary."""
(
boolean_mapped_unexpected_values,
unexpected_condition,
compute_domain_kwargs,
accessor_domain_kwargs,
) = metrics["unexpected_condition"]
@@ -2514,17 +2503,9 @@
message=f'Error: The column "{column_name}" in BatchData does not exist.'
)

data = (
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1))))
.withColumn("__unexpected", boolean_mapped_unexpected_values)
.orderBy(F.col("__row_number"))
)

filtered = (
data.filter(F.col("__unexpected") == True)
.drop(F.col("__unexpected"))
.drop(F.col("__row_number"))
)
# withColumn is required to transform window functions returned by some metrics to boolean mask
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

result_format = metric_value_kwargs["result_format"]
if result_format["result_format"] == "COMPLETE":
@@ -2585,7 +2566,7 @@ def _spark_multicolumn_map_condition_values(
):
"""Return values from the specified domain that match the map-style metric in the metrics dictionary."""
(
boolean_mapped_unexpected_values,
unexpected_condition,

alexsherstinsky (Contributor) commented:
@NathanFarmer Could we please keep this variable named boolean_mapped_unexpected_values as it was before? (I was following the previous style in such methods across all execution engines and would like to keep it consistent, unless you have a strong reason for changing it now.) Thanks!

NathanFarmer (Contributor, Author) replied:
The reason for changing this is that metrics do not return booleans in all cases. Cases that return a window function would fail for the 1-line solution filtered = df.filter(boolean_mapped_unexpected_values). The 2-line solution using withColumn creates a new boolean-mapped column from this variable. You can see how I left it alone on line 2504 because it actually is a boolean mapping for all cases there.

alexsherstinsky (Contributor) replied:
@NathanFarmer I cannot find all the line numbers involved (the UI here is confusing), but I follow your logic. Thank you! P.S.: Should we standardize all cases to use unexpected_condition? Or do we want to preserve a special spot and variable name for the situations where it will always be strictly boolean-mapped? Thoughts welcome. Thank you.

NathanFarmer (Contributor, Author) replied:
@alexsherstinsky If we were to standardize all cases to use unexpected_condition, that would future-proof metrics that return a window function. The tradeoff is that using boolean_mapped_unexpected_values with df.filter (the 1-line solution) is measurably faster where it is possible. I would propose that we leave boolean_mapped_unexpected_values as-is in places where it is not currently needed; if a new metric requires it to be changed, it can be done at that time.

alexsherstinsky (Contributor) replied:
@NathanFarmer Perhaps we need to clean up our code? Can you please help us out here -- I see in Pandas, SQL, and Spark the pattern that in some cases defines unexpected_condition and in other cases defines boolean_mapped_unexpected_values as the variable name, but ultimately these variables are used the same way: in a WHERE-type clause. So are we simply misnaming this variable in one case for Spark, because it is truly an (unexpected) condition and not merely boolean-mapped (unexpected) values? If this is correct, then please go ahead and fix as you deem appropriate. Thanks so much!

NathanFarmer (Contributor, Author) replied:
@alexsherstinsky It is correct that the metric always returns an unexpected_condition for Spark, so for semantic correctness I have changed the one remaining use of Spark boolean_mapped_unexpected_values to unexpected_condition. In most Spark cases unexpected_condition can be passed directly to a WHERE clause, except in the case of a window function. I went ahead and changed:

    data = df.filter(unexpected_condition)

to:

    data = df.withColumn("__unexpected", unexpected_condition)
    filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

for consistency in this single case.

alexsherstinsky (Contributor) replied:
@NathanFarmer Thanks -- at least it is consistent, and we can revise later. We now need to solve the remaining problem: the equivalency of results when the sort order is not enforced. If we can do this, it is a big gain!

compute_domain_kwargs,
accessor_domain_kwargs,
) = metrics["unexpected_condition"]
@@ -2613,17 +2594,9 @@ def _spark_multicolumn_map_condition_values(
message=f'Error: The column "{column_name}" in BatchData does not exist.'
)

data = (
df.withColumn("__row_number", F.row_number().over(Window.orderBy(F.lit(1))))
.withColumn("__unexpected", boolean_mapped_unexpected_values)
.orderBy(F.col("__row_number"))
)

filtered = (
data.filter(F.col("__unexpected") == True)
.drop(F.col("__unexpected"))
.drop(F.col("__row_number"))
)
# withColumn is required to transform window functions returned by some metrics to boolean mask
data = df.withColumn("__unexpected", unexpected_condition)
filtered = data.filter(F.col("__unexpected") == True).drop(F.col("__unexpected"))

column_selector = [F.col(column_name) for column_name in column_list]

76 changes: 68 additions & 8 deletions great_expectations/self_check/util.py
@@ -1766,6 +1766,30 @@ def generate_expectation_tests(
return parametrized_tests


def sort_unexpected_values(test_value_list, result_value_list):
# check if value can be sorted; if so, sort so arbitrary ordering of results does not cause failure
if (isinstance(test_value_list, list)) & (len(test_value_list) >= 1):
# __lt__ is not implemented for python dictionaries making sorting trickier
# in our case, we will sort on the values for each key sequentially
if isinstance(test_value_list[0], dict):
test_value_list = sorted(
test_value_list,
key=lambda x: tuple(x[k] for k in list(test_value_list[0].keys())),
)
result_value_list = sorted(
result_value_list,
key=lambda x: tuple(x[k] for k in list(test_value_list[0].keys())),
)
# if python built-in class has __lt__ then sorting can always work this way
elif type(test_value_list[0].__lt__(test_value_list[0])) != type(
NotImplemented
):
test_value_list = sorted(test_value_list, key=lambda x: str(x))
result_value_list = sorted(result_value_list, key=lambda x: str(x))

return test_value_list, result_value_list
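
Note: a hedged usage sketch of the helper above (not part of the diff; the sample values are made up).

    # Lists of dicts are sorted on their values, key by key, because dict does not
    # implement __lt__; the expected and observed lists end up in the same order.
    expected = [{"a": 2, "b": "y"}, {"a": 1, "b": "x"}]
    observed = [{"a": 1, "b": "x"}, {"a": 2, "b": "y"}]
    expected, observed = sort_unexpected_values(expected, observed)
    assert expected == observed

    # Values that do implement __lt__ are sorted by their string representation,
    # so Spark's arbitrary return order no longer affects the comparison.
    expected, observed = sort_unexpected_values([3, 1, 2], [2, 3, 1])
    assert expected == observed == [1, 2, 3]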


def evaluate_json_test(data_asset, expectation_type, test):
"""
This method will evaluate the result of a test build using the Great Expectations json test format.
@@ -1879,7 +1903,32 @@ def evaluate_json_test_cfe(validator, expectation_type, test):

def check_json_test_result(test, result, data_asset=None):
# Check results
if test["exact_match_out"] is True:
# For Spark we cannot guarantee the order in which values are returned, so we sort for testing purposes

alexsherstinsky (Contributor) commented:
@NathanFarmer Naive thought: why not sort for all backends? I do not see any detriment in pre-sorting these unexpected values in a well-defined order for all backends indiscriminately. What do you think? Thanks!

if (test["exact_match_out"] is True) and isinstance(
data_asset, (SparkDFDataset, SparkDFBatchData)
):
if ("unexpected_list" in result["result"]) and (
"unexpected_list" in test["out"]["result"]
):
(
test["out"]["result"]["unexpected_list"],
result["result"]["unexpected_list"],
) = sort_unexpected_values(
test["out"]["result"]["unexpected_list"],
result["result"]["unexpected_list"],
)
if ("partial_unexpected_list" in result["result"]) and (
"partial_unexpected_list" in test["out"]["result"]
):
(
test["out"]["result"]["partial_unexpected_list"],
result["result"]["partial_unexpected_list"],
) = sort_unexpected_values(
test["out"]["result"]["partial_unexpected_list"],
result["result"]["partial_unexpected_list"],
)
assert result == expectationValidationResultSchema.load(test["out"])
elif test["exact_match_out"] is True:
assert result == expectationValidationResultSchema.load(test["out"])
else:
# Convert result to json since our tests are reading from json so cannot easily contain richer types (e.g. NaN)
@@ -1928,13 +1977,9 @@ def check_json_test_result(test, result, data_asset=None):
assert result["result"]["unexpected_index_list"] == value

elif key == "unexpected_list":
# check if value can be sorted; if so, sort so arbitrary ordering of results does not cause failure
if (isinstance(value, list)) & (len(value) >= 1):
if type(value[0].__lt__(value[0])) != type(NotImplemented):
value = sorted(value, key=lambda x: str(x))
result["result"]["unexpected_list"] = sorted(
result["result"]["unexpected_list"], key=lambda x: str(x)
)
value, result["result"]["unexpected_list"] = sort_unexpected_values(
value, result["result"]["unexpected_list"]
)

assert result["result"]["unexpected_list"] == value, (
"expected "
@@ -1943,6 +1988,21 @@
+ str(result["result"]["unexpected_list"])
)

elif key == "partial_unexpected_list":
(
value,
result["result"]["partial_unexpected_list"],
) = sort_unexpected_values(
value, result["result"]["partial_unexpected_list"]
)

assert result["result"]["partial_unexpected_list"] == value, (
"expected "
+ str(value)
+ " but got "
+ str(result["result"]["partial_unexpected_list"])
)

elif key == "details":
assert result["result"]["details"] == value
