[BUG] test_dpp_bypass / test_dpp_via_aggregate_subquery failures in CI Databricks 13.3 #10548

Closed
NvTimLiu opened this issue Mar 5, 2024 · 3 comments · Fixed by #10551
Labels
bug Something isn't working

Comments

NvTimLiu (Collaborator) commented Mar 5, 2024

Describe the bug

[24.02] test_dpp_bypass / test_dpp_via_aggregate_subquery failures in CI Databricks 13.3

 FAILED ../../src/main/python/dpp_test.py::test_dpp_bypass[true-5-parquet][DATAGEN_SEED=1709607357, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
 FAILED ../../src/main/python/dpp_test.py::test_dpp_bypass[true-5-orc][DATAGEN_SEED=1709607357, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
 FAILED ../../src/main/python/dpp_test.py::test_dpp_via_aggregate_subquery[true-5-parquet][DATAGEN_SEED=1709607357, INJECT_OOM, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...
 FAILED ../../src/main/python/dpp_test.py::test_dpp_via_aggregate_subquery[true-5-orc][DATAGEN_SEED=1709607357, IGNORE_ORDER] - py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.s...


-----------------------------------------------------------------------------------------------------------------------------------


----------------------------- Captured stdout call -----------------------------
### CPU RUN ###
### GPU RUN ###
_________________ test_dpp_via_aggregate_subquery[true-5-orc] __________________
[gw1] linux -- Python 3.10.12 /usr/bin/python

spark_tmp_table_factory = <conftest.TmpTableFactory object at 0x7ff1e5f22d40>
store_format = 'orc', s_index = 5, aqe_enabled = 'true'

    @ignore_order
    @pytest.mark.parametrize('store_format', ['parquet', 'orc'], ids=idfn)
    @pytest.mark.parametrize('s_index', list(range(len(_statements))), ids=idfn)
    @pytest.mark.parametrize('aqe_enabled', [
        'false',
        pytest.param('true', marks=pytest.mark.skipif(is_before_spark_320() and not is_databricks_runtime(),
                                                      reason='Only in Spark 3.2.0+ AQE and DPP can be both enabled'))
    ], ids=idfn)
    def test_dpp_via_aggregate_subquery(spark_tmp_table_factory, store_format, s_index, aqe_enabled):
        fact_table, dim_table = spark_tmp_table_factory.get(), spark_tmp_table_factory.get()
        create_fact_table(fact_table, store_format)
        filter_val = create_dim_table(dim_table, store_format)
        statement = _statements[s_index].format(fact_table, dim_table, filter_val)
>       assert_cpu_and_gpu_are_equal_collect_with_capture(
            lambda spark: spark.sql(statement),
            # SubqueryExec appears if we plan extra subquery for DPP
            exist_classes='DynamicPruningExpression,SubqueryExec',
            conf=dict(_no_exchange_reuse_conf + [('spark.sql.adaptive.enabled', aqe_enabled)]))

../../src/main/python/dpp_test.py:255: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../src/main/python/asserts.py:410: in assert_cpu_and_gpu_are_equal_collect_with_capture
    jvm.org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(gpu_df._jdf, clz)
/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
/databricks/spark/python/pyspark/errors/exceptions/captured.py:188: in deco
    return f(*a, **kw)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

answer = 'xro342590'
gateway_client = <py4j.clientserver.JavaClient object at 0x7ff2000e5b70>
target_id = 'z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback'
name = 'assertContains'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.
    
        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.
    
        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
>                   raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
E                   py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains.
E                   : java.lang.AssertionError: assertion failed: Could not find DynamicPruningExpression in the Spark plan
E                   AdaptiveSparkPlan isFinalPlan=true
E                   +- == Final Plan ==
E                      ResultQueryStage 4, Statistics(sizeInBytes=0.0 B, ColumnStat: N/A)
E                      +- LocalTableScan <empty>, [key#119441, max(value)#119457L]
E                   +- == Initial Plan ==
E                      Sort [key#119441 ASC NULLS FIRST, max(value)#119457L ASC NULLS FIRST], true, 0
E                      +- Exchange rangepartitioning(key#119441 ASC NULLS FIRST, max(value)#119457L ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [plan_id=208239]
E                         +- HashAggregate(keys=[key#119441], functions=[finalmerge_max(merge max#119507L) AS max(value#119442L)#119456L], output=[key#119441, max(value)#119457L])
E                            +- Exchange hashpartitioning(key#119441, 4), ENSURE_REQUIREMENTS, [plan_id=208237]
E                               +- HashAggregate(keys=[key#119441], functions=[partial_max(value#119442L) AS max#119507L], output=[key#119441, max#119507L])
E                                  +- Union
E                                     :- Project [key#119350 AS key#119441, value#119446L AS value#119442L]
E                                     :  +- BroadcastHashJoin [key#119350], [key#119352], Inner, BuildRight, false
E                                     :     :- HashAggregate(keys=[key#119350], functions=[finalmerge_sum(merge sum#119509L) AS sum(value#119349)#119450L], output=[key#119350, value#119446L])
E                                     :     :  +- Exchange hashpartitioning(key#119350, 4), ENSURE_REQUIREMENTS, [plan_id=208223]
E                                     :     :     +- HashAggregate(keys=[key#119350], functions=[partial_sum(value#119349) AS sum#119509L], output=[key#119350, sum#119509L])
E                                     :     :        +- Project [value#119349, key#119350]
E                                     :     :           +- Filter (isnotnull(value#119349) AND (value#119349 > 0))
E                                     :     :              +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_0[value#119349,key#119350,skey#119351] Batched: true, DataFilters: [isnotnull(value#119349), (value#119349 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-20240..., PartitionFilters: [isnotnull(key#119350), dynamicpruning#119503 119502], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
E                                     :     +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=208202]
E                                     :        +- Project [key#119352]
E                                     :           +- Filter ((((isnotnull(ex_key#119354) AND isnotnull(filter#119356)) AND (ex_key#119354 = 3)) AND (filter#119356 = 451)) AND isnotnull(key#119352))
E                                     :              +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_1[key#119352,ex_key#119354,filter#119356] Batched: true, DataFilters: [isnotnull(ex_key#119354), isnotnull(filter#119356), (ex_key#119354 = 3), (filter#119356 = 451), ..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-202403..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,451), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
E                                     +- Project [key#119490, value#119493L]
E                                        +- BroadcastHashJoin [key#119490], [key#119494], Inner, BuildRight, false
E                                           :- HashAggregate(keys=[key#119490], functions=[finalmerge_sum(merge sum#119511L) AS sum(value#119489)#119450L], output=[key#119490, value#119493L])
E                                           :  +- Exchange hashpartitioning(key#119490, 4), ENSURE_REQUIREMENTS, [plan_id=208231]
E                                           :     +- HashAggregate(keys=[key#119490], functions=[partial_sum(value#119489) AS sum#119511L], output=[key#119490, sum#119511L])
E                                           :        +- Project [value#119489, key#119490]
E                                           :           +- Filter (isnotnull(value#119489) AND (value#119489 > 0))
E                                           :              +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_0[value#119489,key#119490,skey#119491] Batched: true, DataFilters: [isnotnull(value#119489), (value#119489 > 0)], Format: ORC, Location: InMemoryFileIndex(50 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-20240..., PartitionFilters: [isnotnull(key#119490), dynamicpruning#119505 119504], PushedFilters: [IsNotNull(value), GreaterThan(value,0)], ReadSchema: struct<value:int>
E                                           +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=208208]
E                                              +- Project [key#119494]
E                                                 +- Filter ((((isnotnull(ex_key#119496) AND isnotnull(filter#119498)) AND (ex_key#119496 = 3)) AND (filter#119498 = 451)) AND isnotnull(key#119494))
E                                                    +- FileScan orc spark_catalog.default.tmp_table_gw1_142524449_1[key#119494,ex_key#119496,filter#119498] Batched: true, DataFilters: [isnotnull(ex_key#119496), isnotnull(filter#119498), (ex_key#119496 = 3), (filter#119498 = 451), ..., Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/home/ubuntu/spark-rapids/integration_tests/target/run_dir-202403..., PartitionFilters: [], PushedFilters: [IsNotNull(ex_key), IsNotNull(filter), EqualTo(ex_key,3), EqualTo(filter,451), IsNotNull(key)], ReadSchema: struct<key:int,ex_key:int,filter:int>
E                   
E                       at scala.Predef$.assert(Predef.scala:223)
E                       at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:170)
E                       at org.apache.spark.sql.rapids.ShimmedExecutionPlanCaptureCallbackImpl.assertContains(ShimmedExecutionPlanCaptureCallbackImpl.scala:175)
E                       at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback$.assertContains(ExecutionPlanCaptureCallback.scala:76)
E                       at org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertContains(ExecutionPlanCaptureCallback.scala)
E                       at sun.reflect.GeneratedMethodAccessor422.invoke(Unknown Source)
E                       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E                       at java.lang.reflect.Method.invoke(Method.java:498)
E                       at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E                       at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:397)
E                       at py4j.Gateway.invoke(Gateway.java:306)
E                       at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E                       at py4j.commands.CallCommand.execute(CallCommand.java:79)
E                       at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
E                       at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
E                       at java.lang.Thread.run(Thread.java:750)

/databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:326: Py4JJavaError
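
For context, assert_cpu_and_gpu_are_equal_collect_with_capture asks ExecutionPlanCaptureCallback.assertContains to verify that each class listed in exist_classes appears in the captured (post-AQE) GPU plan. In this failure the final adaptive plan has been reduced to LocalTableScan <empty>, while the dynamicpruning filters are only visible in the initial plan's PartitionFilters, which appears to be why the search for DynamicPruningExpression comes up empty. The sketch below is a rough approximation of that kind of check using only public PySpark APIs; it is not the plugin's implementation, and the plan_contains helper and the trivial query are hypothetical, for illustration only.

# Minimal sketch (assumes a local SparkSession): look for a class/expression name
# in the string form of a query's executed plan, loosely mirroring what
# assertContains does. The plugin's real callback captures and walks the GPU plan
# after execution; this simplified version only searches the plan text.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def plan_contains(df, class_name):
    # Reach through to the Java DataFrame's executed plan and search its text.
    plan_str = df._jdf.queryExecution().executedPlan().toString()
    return class_name in plan_str

# Hypothetical trivial query; a real DPP check needs a partitioned fact/dim join.
df = spark.range(10).selectExpr("id AS key")
assert not plan_contains(df, "DynamicPruningExpression")  # no DPP in this plan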
NvTimLiu added the bug (Something isn't working) and ? - Needs Triage (Need team to review and classify) labels Mar 5, 2024
NvTimLiu (Collaborator, Author) commented Mar 5, 2024

Similar to #10182.

NVnavkumar (Collaborator) commented

Same failure occurs in Apache Spark 3.2.4.

NvTimLiu (Collaborator, Author) commented Mar 6, 2024

Closing as #10551 has been merged.

NvTimLiu closed this as completed Mar 6, 2024