From 2ff0032e018abc3f799357a222c31fce745e8b4b Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Sat, 20 Mar 2021 11:20:52 +0900
Subject: [PATCH] [SPARK-34796][SQL] Initialize counter variable for LIMIT code-gen in doProduce()

### What changes were proposed in this pull request?

This PR fixes the LIMIT code-gen bug in https://issues.apache.org/jira/browse/SPARK-34796, where the counter variable from `BaseLimitExec` is used in code-gen without being initialized. The limit counter variable is referenced by upstream operators (operators in LIMIT's child plan, e.g. the `ColumnarToRowExec` operator, for early termination), but within the same stage some operators can take a shortcut and never call `BaseLimitExec`'s `doConsume()`, e.g. [HashJoin.codegenInner](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L402). So for a query with `LocalLimit - BroadcastHashJoin - FileScan` in the same stage, whole-stage code-gen compilation fails. Here is an example:

```
test("failed limit query") {
  withTable("left_table", "empty_right_table", "output_table") {
    spark.range(5).toDF("k").write.saveAsTable("left_table")
    spark.range(0).toDF("k").write.saveAsTable("empty_right_table")
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      spark.sql("CREATE TABLE output_table (k INT) USING parquet")
      spark.sql(
        s"""
           |INSERT INTO TABLE output_table
           |SELECT t1.k FROM left_table t1
           |JOIN empty_right_table t2
           |ON t1.k = t2.k
           |LIMIT 3
           |""".stripMargin)
    }
  }
}
```

Query plan:

```
Execute InsertIntoHadoopFsRelationCommand file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table, false, Parquet, Map(path -> file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table), Append, CatalogTable(
Database: default
Table: output_table
Created Time: Thu Mar 18 21:46:26 PDT 2021
Last Access: UNKNOWN
Created By: Spark 3.2.0-SNAPSHOT
Type: MANAGED
Provider: parquet
Location: file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sql.SQLQuerySuite/output_table
Schema: root
 |-- k: integer (nullable = true)
), org.apache.spark.sql.execution.datasources.InMemoryFileIndex@b25d08b, [k]
+- *(3) Project [ansi_cast(k#228L as int) AS k#231]
   +- *(3) GlobalLimit 3
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#179]
         +- *(2) LocalLimit 3
            +- *(2) Project [k#228L]
               +- *(2) BroadcastHashJoin [k#228L], [k#229L], Inner, BuildRight, false
                  :- *(2) Filter isnotnull(k#228L)
                  :  +- *(2) ColumnarToRow
                  :     +- FileScan parquet default.left_table[k#228L] Batched: true, DataFilters: [isnotnull(k#228L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#173]
                     +- *(1) Filter isnotnull(k#229L)
                        +- *(1) ColumnarToRow
                           +- FileScan parquet default.empty_right_table[k#229L] Batched: true, DataFilters: [isnotnull(k#229L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/chengsu/spark/sql/core/spark-warehouse/org.apache.spark.sq..., PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:bigint>
```

Codegen failure: https://gist.github.com/c21/ea760c75b546d903247582be656d9d66 .
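To make the failure mechanism concrete before the root-cause discussion below, here is a minimal, runnable toy model of the whole-stage code-gen produce/consume protocol (plain Scala; `Scan`, `EmptyBuildJoin`, `Limit`, and every other name here are illustrative stand-ins, not Spark's actual API): `produce()` always reaches every operator on the way down the stage, while `consume()` runs on the way back up only if every downstream operator cooperates, so state registered only in `consume()` can end up referenced but never declared:

```scala
// A runnable toy model of the whole-stage code-gen produce/consume protocol.
// Everything here (Scan, EmptyBuildJoin, Limit, ...) is an illustrative
// stand-in, NOT Spark's actual API.
import scala.collection.mutable.ArrayBuffer

object LimitCodegenSketch {
  // Stand-in for CodegenContext.addMutableState: field declarations
  // collected for the generated class.
  val declarations = ArrayBuffer.empty[String]
  // Stand-in for the generated method body.
  val body = new StringBuilder

  trait Op {
    var parent: Op = null
    // Walking down the chain: reaches every operator in the stage.
    def produce(p: Op): Unit
    // Walking back up: runs only if every operator below cooperates.
    def consume(): Unit = ()
  }

  // Like ColumnarToRowExec: its loop guard references the limit counter
  // (the equivalent of limitNotReachedChecks) before any consume() runs.
  class Scan(counter: String) extends Op {
    def produce(p: Op): Unit = {
      parent = p
      body ++= s"while ($counter < 3 && hasNext()) { row = next(); "
      parent.consume()
      body ++= "}\n"
    }
  }

  // Like HashJoin.codegenInner with an empty broadcast side: the join is
  // known to produce no rows, so it short-circuits and never calls its
  // parent's consume().
  class EmptyBuildJoin(child: Op) extends Op {
    def produce(p: Op): Unit = { parent = p; child.produce(this) }
    override def consume(): Unit = () // short-circuit: emit nothing upward
  }

  // The limit operator: `declareInProduce` toggles between the buggy
  // placement (declare the counter in consume()) and the fixed one.
  class Limit(child: Op, counter: String, declareInProduce: Boolean) extends Op {
    def produce(p: Op): Unit = {
      parent = p
      if (declareInProduce) declarations += s"private int $counter;" // the fix
      child.produce(this)
    }
    override def consume(): Unit = {
      if (!declareInProduce) declarations += s"private int $counter;" // buggy
      body ++= s"$counter += 1; "
    }
  }

  def main(args: Array[String]): Unit = {
    for (fixed <- Seq(false, true)) {
      declarations.clear(); body.clear()
      val counter = "_limit_counter_1"
      new Limit(new EmptyBuildJoin(new Scan(counter)), counter, fixed).produce(null)
      println(s"declareInProduce=$fixed declarations=$declarations\nbody=$body")
    }
  }
}
```

Running the sketch prints an empty declaration list for the buggy placement, i.e. the generated body references `_limit_counter_1` with no matching field declaration, which is the same shape as the Janino failure in the gist above, and a populated list for the fixed placement.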
Concretely, the uninitialized variable `_limit_counter_1` from `LocalLimitExec` is referenced in `ColumnarToRowExec`, but `BroadcastHashJoinExec` never calls `LocalLimitExec.doConsume()`, so the counter variable is never initialized. The fix is to move the counter variable initialization into `doProduce()`: in the whole-stage code-gen framework, an operator's `doProduce()` is guaranteed to run whenever its upstream operators' `doProduce()`/`doConsume()` run.

Note: this only happens with AQE disabled, because with AQE enabled the optimization rule [EliminateUnnecessaryJoin](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala#L69) rewrites the whole query to an empty `LocalRelation` when the broadcast side of an inner join is empty.

### Why are the changes needed?

Fix query failure.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `SQLQuerySuite.scala`.

Closes #31892 from c21/limit-fix.

Authored-by: Cheng Su
Signed-off-by: Takeshi Yamamuro
---
 .../apache/spark/sql/execution/limit.scala   | 12 ++++++++----
 .../org/apache/spark/sql/SQLQuerySuite.scala | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 0b74a2667a273..d8f67fb7357e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -132,14 +132,18 @@ trait BaseLimitExec extends LimitExec with CodegenSupport {
   }
 
   protected override def doProduce(ctx: CodegenContext): String = {
-    child.asInstanceOf[CodegenSupport].produce(ctx, this)
-  }
-
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
     // The counter name is already obtained by the upstream operators via `limitNotReachedChecks`.
     // Here we have to inline it to not change its name. This is fine as we won't have many limit
     // operators in one query.
+    //
+    // Note: create counter variable here instead of `doConsume()` to avoid compilation error,
+    // because upstream operators might not call `doConsume()` here
+    // (e.g. `HashJoin.codegenInner()`).
     ctx.addMutableState(CodeGenerator.JAVA_INT, countTerm, forceInline = true, useFreshName = false)
+    child.asInstanceOf[CodegenSupport].produce(ctx, this)
+  }
+
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f709d80345606..00cbd73533ab9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4097,6 +4097,25 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       checkAnswer(df2, Seq(Row(2, 1, 1), Row(4, 2, 2)))
     }
   }
+
+  test("SPARK-34796: Avoid code-gen compilation error for LIMIT query") {
+    withTable("left_table", "empty_right_table", "output_table") {
+      spark.range(5).toDF("k").write.saveAsTable("left_table")
+      spark.range(0).toDF("k").write.saveAsTable("empty_right_table")
+
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        spark.sql("CREATE TABLE output_table (k INT) USING parquet")
+        spark.sql(
+          """
+            |INSERT INTO TABLE output_table
+            |SELECT t1.k FROM left_table t1
+            |JOIN empty_right_table t2
+            |ON t1.k = t2.k
+            |LIMIT 3
+          """.stripMargin)
+      }
+    }
+  }
 }
 
 case class Foo(bar: Option[String])
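Postscript (not part of the patch): assuming the standard sbt-based Spark development setup, the new test can be run in isolation with ScalaTest's `-z` filter:

```
build/sbt "sql/testOnly *SQLQuerySuite -- -z SPARK-34796"
```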