From f7218559f359effbb59be643623bfc199c9a81e4 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 29 Oct 2020 22:17:04 -0700
Subject: [PATCH 1/2] Make auto bucketed scan work with AQE

---
 .../adaptive/AdaptiveSparkPlanExec.scala      |  5 ++-
 .../DisableUnnecessaryBucketedScan.scala      |  4 ++-
 .../DisableUnnecessaryBucketedScanSuite.scala | 32 +++++++++++++++----
 3 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index a4a58dfe1de53..759da199dcfa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
+import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
 import org.apache.spark.sql.execution.exchange._
@@ -85,6 +86,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val removeRedundantProjects = RemoveRedundantProjects
   @transient private val removeRedundantSorts = RemoveRedundantSorts
   @transient private val ensureRequirements = EnsureRequirements
+  @transient private val disableUnnecessaryBucketedScan = DisableUnnecessaryBucketedScan

   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
@@ -92,7 +94,8 @@ case class AdaptiveSparkPlanExec(
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
     removeRedundantProjects,
     removeRedundantSorts,
-    ensureRequirements
+    ensureRequirements,
+    disableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules

   // A list of physical optimizer rules to be applied to a new stage before its execution. These
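With this hunk, DisableUnnecessaryBucketedScan runs as an AQE query-stage preparation rule instead of being skipped whenever the plan is wrapped in AdaptiveSparkPlanExec. A hypothetical spark-shell sketch of the combined behavior (the two config keys are existing SQLConf entries; the table name t is illustrative, not from the patch):

    // Sketch: enable AQE together with auto bucketed scan.
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.sources.bucketing.autoBucketedScan.enabled", "true")

    spark.range(10).write.bucketBy(8, "id").saveAsTable("t")

    // With no join/aggregate upstream that needs the hash partitioning on "id",
    // the rule can plan a regular (non-bucketed) file scan even under the AQE wrapper.
    spark.table("t").queryExecution.executedPlan
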
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
index 2bbd5f5d969dc..bb59f44abc761 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -101,7 +101,9 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
       case scan: FileSourceScanExec =>
         if (isBucketedScanWithoutFilter(scan)) {
           if (!withInterestingPartition || (withExchange && withAllowedNode)) {
-            scan.copy(disableBucketedScan = true)
+            val nonBucketedScan = scan.copy(disableBucketedScan = true)
+            scan.logicalLink.foreach(nonBucketedScan.setLogicalLink)
+            nonBucketedScan
           } else {
             scan
           }
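This is the key correctness fix: AQE re-optimizes from the logical plan, so a physical node produced by copy() must keep the logical link of the node it replaces, otherwise AdaptiveSparkPlanExec loses the mapping from the new scan back to its logical counterpart. A minimal sketch of the general pattern (logicalLink and setLogicalLink are the real SparkPlan accessors; the helper name is illustrative, not part of the patch):

    import org.apache.spark.sql.execution.SparkPlan

    // Carry the logical link over to the replacement node so AQE can still
    // map the new physical node back to its logical plan when re-optimizing.
    def replacePreservingLink(oldNode: SparkPlan, newNode: SparkPlan): SparkPlan = {
      oldNode.logicalLink.foreach(newNode.setLogicalLink)
      newNode
    }
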
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
index 70b74aed40eca..60845ce69820b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
@@ -28,7 +30,8 @@ import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}

 class DisableUnnecessaryBucketedScanWithoutHiveSupportSuite
   extends DisableUnnecessaryBucketedScanSuite
-  with SharedSparkSession {
+  with SharedSparkSession
+  with DisableAdaptiveExecutionSuite {

   protected override def beforeAll(): Unit = {
     super.beforeAll()
@@ -36,7 +39,22 @@ class DisableUnnecessaryBucketedScanWithoutHiveSupportSuite
   }
 }

-abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest with SQLTestUtils {
+class DisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE
+  extends DisableUnnecessaryBucketedScanSuite
+  with SharedSparkSession
+  with EnableAdaptiveExecutionSuite {
+
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    assert(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+  }
+}
+
+abstract class DisableUnnecessaryBucketedScanSuite
+  extends QueryTest
+  with SQLTestUtils
+  with AdaptiveSparkPlanHelper {
+
   import testImplicits._

   private lazy val df1 =
@@ -51,7 +69,7 @@ abstract class DisableUnnecessaryBucketedScanSuite

   def checkNumBucketedScan(query: String, expectedNumBucketedScan: Int): Unit = {
     val plan = sql(query).queryExecution.executedPlan
-    val bucketedScan = plan.collect { case s: FileSourceScanExec if s.bucketedScan => s }
+    val bucketedScan = collect(plan) { case s: FileSourceScanExec if s.bucketedScan => s }
     assert(bucketedScan.length == expectedNumBucketedScan)
   }

@@ -230,14 +248,14 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest with SQLTes
         assertCached(spark.table("t1"))

         // Verify cached bucketed table scan not disabled
-        val partitioning = spark.table("t1").queryExecution.executedPlan
-          .outputPartitioning
-        assert(partitioning match {
+        val inMemoryScan = find(spark.table("t1").queryExecution.executedPlan)(
+          _.isInstanceOf[InMemoryTableScanExec])
+        assert(inMemoryScan.get.outputPartitioning match {
           case HashPartitioning(Seq(column: AttributeReference), 8) if column.name == "i" => true
           case _ => false
         })
         val aggregateQueryPlan = sql("SELECT SUM(i) FROM t1 GROUP BY i").queryExecution.executedPlan
-        assert(aggregateQueryPlan.find(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
+        assert(find(aggregateQueryPlan)(_.isInstanceOf[ShuffleExchangeExec]).isEmpty)
       }
     }
   }

From 36815fc2d7ef1f436db3fc26b3dffa924d5b65ae Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 29 Oct 2020 23:40:35 -0700
Subject: [PATCH 2/2] Address comments

---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala | 13 ++++---------
 .../DisableUnnecessaryBucketedScanSuite.scala      |  6 +++---
 2 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 759da199dcfa9..4ae33311d5a24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -83,19 +83,14 @@ case class AdaptiveSparkPlanExec(
   // The logical plan optimizer for re-optimizing the current logical plan.
   @transient private val optimizer = new AQEOptimizer(conf)

-  @transient private val removeRedundantProjects = RemoveRedundantProjects
-  @transient private val removeRedundantSorts = RemoveRedundantSorts
-  @transient private val ensureRequirements = EnsureRequirements
-  @transient private val disableUnnecessaryBucketedScan = DisableUnnecessaryBucketedScan
-
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
-    removeRedundantProjects,
-    removeRedundantSorts,
-    ensureRequirements,
-    disableUnnecessaryBucketedScan
+    RemoveRedundantProjects,
+    RemoveRedundantSorts,
+    EnsureRequirements,
+    DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules

   // A list of physical optimizer rules to be applied to a new stage before its execution. These
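For context, the preparation rules are folded over the physical plan in order, so DisableUnnecessaryBucketedScan deliberately sits after EnsureRequirements and sees the plan once exchanges have been inserted. A simplified sketch of that application loop (illustrative only; the real AdaptiveSparkPlanExec.applyPhysicalRules helper also does plan-change logging):

    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan

    // Each rule receives the output of the previous one; a rule that inspects
    // exchanges must therefore run after the rule that inserts them.
    def applyRules(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan =
      rules.foldLeft(plan) { (p, rule) => rule.apply(p) }
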
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
index 60845ce69820b..1fdd3be88f782 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -248,9 +248,9 @@ abstract class DisableUnnecessaryBucketedScanSuite
         assertCached(spark.table("t1"))

         // Verify cached bucketed table scan not disabled
-        val inMemoryScan = find(spark.table("t1").queryExecution.executedPlan)(
-          _.isInstanceOf[InMemoryTableScanExec])
-        assert(inMemoryScan.get.outputPartitioning match {
+        val partitioning = stripAQEPlan(spark.table("t1").queryExecution.executedPlan)
+          .outputPartitioning
+        assert(partitioning match {
           case HashPartitioning(Seq(column: AttributeReference), 8) if column.name == "i" => true
           case _ => false
         })
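The revised assertion relies on stripAQEPlan from AdaptiveSparkPlanHelper, which unwraps the AdaptiveSparkPlanExec node so the test can check the inner plan's outputPartitioning directly, regardless of whether AQE is enabled. Conceptually it behaves like the sketch below (an assumption-level paraphrase of the test helper, not its verbatim source):

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

    // Unwrap the AQE wrapper, if present, to reach the underlying physical plan.
    def stripAQEPlan(p: SparkPlan): SparkPlan = p match {
      case a: AdaptiveSparkPlanExec => a.executedPlan
      case other => other
    }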