
[SPARK-34796][SQL] Initialize counter variable for LIMIT code-gen in doProduce() #31892

Closed
wants to merge 1 commit

Conversation


@c21 c21 commented Mar 19, 2021

What changes were proposed in this pull request?

This PR fixes the LIMIT code-gen bug tracked in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from BaseLimitExec is used in generated code without being initialized. The limit counter is referenced by upstream operators (the LIMIT's child plan, e.g. the ColumnarToRowExec operator, for early termination), but within the same stage some operators take a shortcut and never call BaseLimitExec's doConsume(), e.g. HashJoin.codegenInner. So for a query whose stage contains LocalLimit - BroadcastHashJoin - FileScan, whole-stage code-gen compilation fails.

Here is an example:

  test("failed limit query") {
    withTable("left_table", "empty_right_table", "output_table") {
      spark.range(5).toDF("k").write.saveAsTable("left_table")
      spark.range(0).toDF("k").write.saveAsTable("empty_right_table")
         
      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
        spark.sql("CREATE TABLE output_table (k INT) USING parquet")
        spark.sql(
          s"""
             |INSERT INTO TABLE output_table
             |SELECT t1.k FROM left_table t1
             |JOIN empty_right_table t2
             |ON t1.k = t2.k
             |LIMIT 3
             |""".stripMargin)
      }
    }
  }

Query plan:

Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable(
Database: default
Table: output_table
Created Time: Thu Mar 18 21:46:26 PDT 2021
Last Access: UNKNOWN
Created By: Spark 3.2.0-SNAPSHOT
Type: MANAGED
Provider: parquet
Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table
Schema: root
 |-- k: integer (nullable = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndex@b25d08b, [k]
+- *(3) Project [ansi_cast(k#228L as int) AS k#231]
   +- *(3) GlobalLimit 3
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#179]
         +- *(2) LocalLimit 3
            +- *(2) Project [k#228L]
               +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false
                  :- *(2) Filter isnotnull(k#228L)
                  :  +- *(2) ColumnarToRow
                  :     +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#173]
                     +- *(1) Filter isnotnull(k#229L)
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>

Codegen failure - https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .

The uninitialized variable _limit_counter_1 from LocalLimitExec is referenced in ColumnarToRowExec, but BroadcastHashJoinExec does not call LocalLimitExec.doConsume() to initialize the counter variable.

The fix is to move the counter variable initialization into doProduce(): in the whole-stage code-gen framework, an operator's doProduce() is guaranteed to run whenever upstream operators' doProduce()/doConsume() run, so the declaration is always emitted.

Note: this only happens when AQE is disabled, because with AQE enabled the optimization rule EliminateUnnecessaryJoin rewrites the whole query to an empty LocalRelation when the inner join's broadcast side is empty.
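The guarantee the fix relies on can be illustrated with a small self-contained sketch (hypothetical names and a string buffer standing in for Spark's code-gen context, not Spark's actual API): declarations emitted from doProduce() always reach the generated code, even when a downstream operator short-circuits and skips the parent's doConsume().

```scala
object CodegenSketch {
  // Buffer standing in for the generated-code string builder.
  val generated = scala.collection.mutable.ListBuffer[String]()

  // Before the fix, the limit counter was declared inside doConsume(); the fix
  // moves the declaration into doProduce(), which runs for every operator in
  // the stage when whole-stage code-gen produces the stage's code.
  def limitDoProduce(): Unit =
    generated += "int _limit_counter_1 = 0;"

  // A join operator that short-circuits: with an empty broadcast side it never
  // calls the parent's doConsume(), mirroring the HashJoin.codegenInner
  // shortcut described above.
  def hashJoinConsume(buildSideEmpty: Boolean, parentDoConsume: () => Unit): Unit =
    if (!buildSideEmpty) parentDoConsume()

  def main(args: Array[String]): Unit = {
    limitDoProduce() // always runs: every operator's doProduce() is invoked
    hashJoinConsume(buildSideEmpty = true,
      () => generated += "if (_limit_counter_1 < 3) { /* consume row */ }")
    // The counter declaration is present even though doConsume() was skipped,
    // so generated code referencing _limit_counter_1 elsewhere still compiles.
    println(generated.mkString("\n"))
  }
}
```

With the pre-fix placement (declaration inside doConsume()), the empty-build-side path would leave `generated` without the declaration while other operators still reference `_limit_counter_1`, which is exactly the compilation failure shown in the gist.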

Why are the changes needed?

Fix query failure.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added unit test in SQLQuerySuite.scala.

c21 commented Mar 19, 2021

cc @cloud-fan, @maropu and @HyukjinKwon to take a look if you have time, thanks.

maropu commented Mar 19, 2021

How about branch-3.0 and branch-2.4? They have this issue, too?

@cloud-fan cloud-fan left a comment

good catch!

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/40818/

@AmplabJenkins

Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136236/

c21 commented Mar 19, 2021

How about branch-3.0 and branch-2.4? They have this issue, too?

@maropu - Based on my knowledge I don't think so. The shortcut for HashJoin (not calling parent's doConsume()) was added in 3.1 - 08b951b . I am not aware of any other operator's shortcut execution.

maropu commented Mar 19, 2021

@maropu - Based on my knowledge I don't think so. The shortcut for HashJoin (not calling parent's doConsume()) was added in 3.1 - 08b951b . I am not aware of any other operator's shortcut execution.

Ah, I see. It looks interesting. I had thought that doConsume would always be called from an upper node, but that assumption turns out to be wrong.

@github-actions github-actions bot added the SQL label Mar 19, 2021
@maropu maropu closed this in 2ff0032 Mar 20, 2021
maropu commented Mar 20, 2021

Thanks! Merged to master. The commit has a conflict with branch-3.1, so could you open a backport PR for that?

c21 commented Mar 20, 2021

Thank you @maropu and @cloud-fan for review. Will submit a PR shortly for branch 3.1. Thanks.

@c21 c21 deleted the limit-fix branch March 20, 2021 03:49
c21 added a commit to c21/spark that referenced this pull request Mar 21, 2021