[SPARK-37262][SQL] Don't log empty aggregate and group by in JDBCScan
### What changes were proposed in this pull request?
Currently, empty pushed aggregates and pushed group-by columns are logged in the EXPLAIN output for JDBCScan:
```
Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$172e75786 [NAME#1,SALARY#2] PushedAggregates: [], PushedFilters: [IsNotNull(SALARY), GreaterThan(SALARY,100.00)], PushedGroupby: [], ReadSchema: struct<NAME:string,SALARY:decimal(20,2)>
```

After the fix, the JDBCScan output becomes:
```
Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$172e75786 [NAME#1,SALARY#2] PushedFilters: [IsNotNull(SALARY), GreaterThan(SALARY,100.00)], ReadSchema: struct<NAME:string,SALARY:decimal(20,2)>
```
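For context, here is a minimal way to reproduce this scan output (a sketch, not part of the patch: it assumes a JDBC v2 catalog registered as `h2` against an H2 database, mirroring the setup in `JDBCV2Suite`; the table name is illustrative):

```scala
// Assumes spark.sql.catalog.h2 is configured as a JDBCTableCatalog backed by
// H2, as in JDBCV2Suite; the table name is illustrative.
val df = spark.sql("SELECT NAME, SALARY FROM h2.test.employee WHERE SALARY > 100.00")
df.explain()
// Before this fix the scan line printed "PushedAggregates: []" and
// "PushedGroupby: []" even though no aggregate was pushed down; afterwards
// those entries are simply omitted.
```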

### Why are the changes needed?
This addresses the review comment #34451 (comment).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #34540 from huaxingao/aggExplain.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
huaxingao authored and cloud-fan committed Nov 11, 2021
1 parent f1532a2 commit f9713b6
Showing 2 changed files with 20 additions and 26 deletions.
14 changes: 4 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
```diff
@@ -134,13 +134,6 @@ case class RowDataSourceScanExec(
 
     def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
 
-    val (aggString, groupByString) = if (pushedDownOperators.aggregation.nonEmpty) {
-      (seqToString(pushedDownOperators.aggregation.get.aggregateExpressions),
-        seqToString(pushedDownOperators.aggregation.get.groupByColumns))
-    } else {
-      ("[]", "[]")
-    }
-
     val markedFilters = if (filters.nonEmpty) {
       for (filter <- filters) yield {
         if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
@@ -151,9 +144,10 @@
 
     Map(
       "ReadSchema" -> requiredSchema.catalogString,
-      "PushedFilters" -> seqToString(markedFilters.toSeq),
-      "PushedAggregates" -> aggString,
-      "PushedGroupby" -> groupByString) ++
+      "PushedFilters" -> seqToString(markedFilters.toSeq)) ++
+      pushedDownOperators.aggregation.fold(Map[String, String]()) { v =>
+        Map("PushedAggregates" -> seqToString(v.aggregateExpressions),
+          "PushedGroupByColumns" -> seqToString(v.groupByColumns))} ++
       pushedDownOperators.limit.map(value => "PushedLimit" -> s"LIMIT $value") ++
       pushedDownOperators.sample.map(v => "PushedSample" ->
         s"SAMPLE (${(v.upperBound - v.lowerBound) * 100}) ${v.withReplacement} SEED(${v.seed})"
```
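The key change above replaces the unconditional `aggString`/`groupByString` entries with `Option.fold`, so the aggregate keys only appear in the metadata map when an aggregate was actually pushed down (the group-by key is also renamed from `PushedGroupby` to `PushedGroupByColumns`). A self-contained sketch of the same pattern, using a stand-in `Aggregation` type rather than Spark's own:

```scala
// Illustration only: Aggregation here is a stand-in case class, not Spark's
// org.apache.spark.sql.connector.expressions.aggregate.Aggregation.
case class Aggregation(aggregateExpressions: Seq[String], groupByColumns: Seq[String])

def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")

def scanMetadata(aggregation: Option[Aggregation]): Map[String, String] =
  Map("PushedFilters" -> "[IsNotNull(SALARY)]") ++
    aggregation.fold(Map.empty[String, String]) { v =>
      Map("PushedAggregates" -> seqToString(v.aggregateExpressions),
        "PushedGroupByColumns" -> seqToString(v.groupByColumns))
    }

// Nothing pushed: the keys are absent instead of rendering as "[]".
assert(!scanMetadata(None).contains("PushedAggregates"))
// Aggregate pushed: both keys appear with their contents.
assert(scanMetadata(Some(Aggregation(Seq("MAX(SALARY)"), Seq("DEPT"))))("PushedGroupByColumns") == "[DEPT]")
```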
32 changes: 16 additions & 16 deletions sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
```diff
@@ -328,7 +328,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
           "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
-          "PushedGroupby: [DEPT]"
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(10000, 1000), Row(12000, 1200), Row(12000, 1200)))
@@ -345,7 +345,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [MAX(ID), MIN(ID)], " +
           "PushedFilters: [IsNotNull(ID), GreaterThan(ID,0)], " +
-          "PushedGroupby: []"
+          "PushedGroupByColumns: []"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(2, 1)))
@@ -424,7 +424,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [SUM(SALARY)], " +
           "PushedFilters: [], " +
-          "PushedGroupby: [DEPT]"
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
@@ -437,7 +437,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [SUM(DISTINCT SALARY)], " +
           "PushedFilters: [], " +
-          "PushedGroupby: [DEPT]"
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(19000), Row(22000), Row(12000)))
@@ -455,7 +455,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
           "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
-          "PushedGroupby: [DEPT, NAME]"
+          "PushedGroupByColumns: [DEPT, NAME]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(9000, 1200), Row(12000, 1200), Row(10000, 1300),
@@ -474,7 +474,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [MAX(SALARY), MIN(BONUS)], " +
           "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
-          "PushedGroupby: [DEPT]"
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(12000, 1200), Row(12000, 1200)))
@@ -489,7 +489,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [MIN(SALARY)], " +
           "PushedFilters: [], " +
-          "PushedGroupby: [DEPT]"
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(1, 9000), Row(2, 10000), Row(6, 12000)))
@@ -512,7 +512,7 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         val expected_plan_fragment =
           "PushedAggregates: [SUM(SALARY)], " +
           "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
-          "PushedGroupby: [DEPT]"
+          "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(query, expected_plan_fragment)
     }
     checkAnswer(query, Seq(Row(6, 12000), Row(1, 19000), Row(2, 22000)))
@@ -536,10 +536,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val df1 = sql("select * from h2.test.employee").toDF(cols: _*)
     val df2 = df1.groupBy().sum("c")
     df2.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: []" // aggregate over alias not push down
-        checkKeywordsExistsInExplain(df2, expected_plan_fragment)
+      case relation: DataSourceV2ScanRelation => relation.scan match {
+        case v1: V1ScanWrapper =>
+          assert(v1.pushedDownOperators.aggregation.isEmpty)
+      }
     }
     checkAnswer(df2, Seq(Row(53000.00)))
   }
@@ -554,10 +554,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       .filter(name($"shortName"))
       .agg(sum($"SALARY").as("sum_salary"))
     query.queryExecution.optimizedPlan.collect {
-      case _: DataSourceV2ScanRelation =>
-        val expected_plan_fragment =
-          "PushedAggregates: []"
-        checkKeywordsExistsInExplain(query, expected_plan_fragment)
+      case relation: DataSourceV2ScanRelation => relation.scan match {
+        case v1: V1ScanWrapper =>
+          assert(v1.pushedDownOperators.aggregation.isEmpty)
+      }
     }
     checkAnswer(query, Seq(Row(29000.0)))
   }
```
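Because the empty `PushedAggregates: []` fragment no longer appears in EXPLAIN, the last two tests above switch from matching explain text to asserting directly on the scan's pushed operators. The pattern, extracted for readability (assuming the suite's imports of `DataSourceV2ScanRelation` and `V1ScanWrapper`, and some DataFrame `df` whose aggregate cannot be pushed down):

```scala
// Same assertion style as the updated tests; df is any query whose aggregate
// the JDBC source could not accept (e.g. an aggregate over an aliased column).
df.queryExecution.optimizedPlan.collect {
  case relation: DataSourceV2ScanRelation => relation.scan match {
    case v1: V1ScanWrapper =>
      // An empty Option means nothing was pushed, and nothing is logged either.
      assert(v1.pushedDownOperators.aggregation.isEmpty)
  }
}
```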
