[SPARK-21871][SQL] Fix infinite loop when bytecode size is larger than spark.sql.codegen.hugeMethodLimit

## What changes were proposed in this pull request?
When the compiled bytecode exceeds `spark.sql.codegen.hugeMethodLimit`, the runtime falls back to the Volcano iterator model. This could cause an infinite loop when `FileSourceScanExec` can use the columnar batch to read the data, because the batch scan's own fallback path goes back through `WholeStageCodegenExec`. This PR fixes the issue.
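The loop arises because a batch file source scan's fallback execution path itself routes through whole-stage codegen: `FileSourceScanExec.doExecute` wraps the scan in a `WholeStageCodegenExec` when `supportsBatch` is true, so the pre-patch fallback `return child.execute()` rebuilt the very node that had just failed the size check. Below is a minimal, self-contained model of that mutual recursion; `Scan` and `Codegen` are hypothetical stand-ins for `FileSourceScanExec` and `WholeStageCodegenExec`, not Spark's API:

```scala
object InfiniteFallbackModel {
  sealed trait Plan { def execute(): Unit }

  // Mirrors FileSourceScanExec.doExecute: a batch-capable scan produces rows
  // by wrapping itself in a codegen node.
  case class Scan(supportsBatch: Boolean) extends Plan {
    def execute(): Unit =
      if (supportsBatch) Codegen(this).execute()
      else println("row-based scan")
  }

  // Mirrors the pre-patch WholeStageCodegenExec.doExecute: when the generated
  // code is too large, fall back to the child plan.
  case class Codegen(child: Plan) extends Plan {
    private val exceedsHugeMethodLimit = true // pretend compilation always blows the limit
    def execute(): Unit =
      if (exceedsHugeMethodLimit) child.execute() // recurses forever for a batch Scan
      else println("run generated code")
  }

  def main(args: Array[String]): Unit =
    Codegen(Scan(supportsBatch = true)).execute() // throws StackOverflowError: the bug
}
```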

## How was this patch tested?
Added a test

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19440 from gatorsmile/testt.
gatorsmile committed Oct 6, 2017
1 parent ae61f18 commit 83488cc
Showing 2 changed files with 29 additions and 6 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -392,12 +392,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {

     // Check if compiled code has a too large function
     if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
-      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
-        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+      logInfo(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size ($maxCodeSize) is above the limit " +
         s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
         s"for this plan. To avoid this, you can raise the limit " +
-        s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
-      return child.execute()
+        s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
+      child match {
+        // The fallback solution of batch file source scan still uses WholeStageCodegenExec
+        case f: FileSourceScanExec if f.supportsBatch => // do nothing
+        case _ => return child.execute()
+      }
     }

     val references = ctx.references.toArray
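As the updated log message suggests, users who hit this code path legitimately can instead raise the threshold. A hedged example, assuming an existing `SparkSession` named `spark`; the value is illustrative (HotSpot's own `-XX:HugeMethodLimit`, which this config mirrors, defaults to 8000 bytecode bytes):

```scala
// Raise the limit the log message points at (illustrative value).
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "65535")
// Or at submission time:
//   spark-submit --conf spark.sql.codegen.hugeMethodLimit=65535 ...
```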
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution

-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -28,7 +28,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

-class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
+class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {

   test("range/filter should be combined") {
     val df = spark.range(10).filter("id = 1").selectExpr("id + 1")
@@ -185,4 +185,23 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
     assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
+
+  test("bytecode of batch file scan exceeds the limit of WHOLESTAGE_HUGE_METHOD_LIMIT") {
+    import testImplicits._
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(10).select(Seq.tabulate(201) { i => ('id + i).as(s"c$i") } : _*)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "202",
+        SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key -> "2000") {
+        // The wide-table batch scan makes the generated bytecode exceed the limit of
+        // WHOLESTAGE_HUGE_METHOD_LIMIT
+        val df2 = spark.read.parquet(path)
+        val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
+        assert(fileScan2.asInstanceOf[FileSourceScanExec].supportsBatch)
+        checkAnswer(df2, df)
+      }
+    }
+  }
 }
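For completeness, the same scenario can be reproduced outside the test harness with a plain `SparkSession`. A sketch adapted from the test above; the master, output path, and config values are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.SparkSession

object Spark21871Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-21871 repro")
      // Permit whole-stage codegen over 201 columns, then make the compiled
      // method trip the huge-method check (same values as the test).
      .config("spark.sql.codegen.maxFields", "202")
      .config("spark.sql.codegen.hugeMethodLimit", "2000")
      .getOrCreate()

    val path = "/tmp/spark-21871-wide" // illustrative output path
    spark.range(10)
      .selectExpr((0 until 201).map(i => s"id + $i AS c$i"): _*)
      .write.mode("overwrite").parquet(path)

    // Before the patch, this read recursed between FileSourceScanExec and
    // WholeStageCodegenExec; with the patch it completes normally.
    val rows = spark.read.parquet(path).collect()
    println(s"read ${rows.length} rows")
    spark.stop()
  }
}
```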
