Skip to content

Commit

Permalink
[SPARK-30721][SQL][TESTS] Fix DataFrameAggregateSuite when enabling AQE
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

update `DataFrameAggregateSuite` to make it pass with AQE

### Why are the changes needed?

We don't need to turn off AQE in `DataFrameAggregateSuite`

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

run `DataFrameAggregateSuite` locally with AQE on.

Closes #27451 from cloud-fan/aqe-test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
cloud-fan authored and dongjoon-hyun committed Feb 5, 2020
1 parent 4938905 commit 3b26f80
Showing 1 changed file with 7 additions and 8 deletions.
Expand Up @@ -615,34 +615,33 @@ class DataFrameAggregateSuite extends QueryTest
Seq((true, true), (true, false), (false, true), (false, false))) {
withSQLConf(
(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString),
(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString),
(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false")) {
// When enable AQE, the WholeStageCodegenExec is added during QueryStageExec.
(SQLConf.USE_OBJECT_HASH_AGG.key, useObjectHashAgg.toString)) {

val df = Seq(("1", 1), ("1", 2), ("2", 3), ("2", 4)).toDF("x", "y")

// test case for HashAggregate
val hashAggDF = df.groupBy("x").agg(c, sum("y"))
hashAggDF.collect()
val hashAggPlan = hashAggDF.queryExecution.executedPlan
if (wholeStage) {
assert(hashAggPlan.find {
assert(find(hashAggPlan) {
case WholeStageCodegenExec(_: HashAggregateExec) => true
case _ => false
}.isDefined)
} else {
assert(hashAggPlan.isInstanceOf[HashAggregateExec])
assert(stripAQEPlan(hashAggPlan).isInstanceOf[HashAggregateExec])
}
hashAggDF.collect()

// test case for ObjectHashAggregate and SortAggregate
val objHashAggOrSortAggDF = df.groupBy("x").agg(c, collect_list("y"))
val objHashAggOrSortAggPlan = objHashAggOrSortAggDF.queryExecution.executedPlan
objHashAggOrSortAggDF.collect()
val objHashAggOrSortAggPlan =
stripAQEPlan(objHashAggOrSortAggDF.queryExecution.executedPlan)
if (useObjectHashAgg) {
assert(objHashAggOrSortAggPlan.isInstanceOf[ObjectHashAggregateExec])
} else {
assert(objHashAggOrSortAggPlan.isInstanceOf[SortAggregateExec])
}
objHashAggOrSortAggDF.collect()
}
}
}
Expand Down

0 comments on commit 3b26f80

Please sign in to comment.