[SPARK-34952][SQL] DSv2 Aggregate push down APIs #33352
Conversation
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141047 has finished for PR 33352 at commit
/**
 * A mix-in interface for {@link ScanBuilder}. Data source can implement this interface to
 * push down aggregates. Depends on the data source implementation, the aggregates may not
A mix-in interface for {@link ScanBuilder}. Data sources can implement this interface to
push down aggregates. Spark assumes that the data source can't fully complete the
grouping work, and will group the data source output again. For queries like
"SELECT min(value) AS m FROM t GROUP BY key", after pushing down the aggregate
to the data source, the data source can still output data with duplicated keys, which is OK
as Spark will do GROUP BY key again. The final query plan can be something like this:
{{{
Aggregate [key#1], [min(min(value)#2) AS m#3]
+- RelationV2[key#1, min(value)#2]
}}}
Similarly, if there is no grouping expression, the data source can still output more than one row.
Also let's use valid Java doc syntax, e.g., add <p>
between paragraphs, properly format code blocks, etc.
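Pulling the review comments on this interface together, here is a self-contained sketch of how the javadoc and the documented return value could read. This is an illustration only: `ScanBuilder` and `Aggregation` are empty stand-ins, and the wording of the `@return` clause is a suggestion, not text from the PR.

```java
// Sketch only: the real interface lives in org.apache.spark.sql.connector.read.
// ScanBuilder and Aggregation are empty stand-ins so this file compiles on its own.
interface ScanBuilder {}

class Aggregation {}

/**
 * A mix-in interface for {@link ScanBuilder}. Data sources can implement this
 * interface to push down aggregates.
 * <p>
 * Spark assumes that the data source can't fully complete the grouping work,
 * and will group the data source output again. For queries like
 * "SELECT min(value) AS m FROM t GROUP BY key", after pushing down the
 * aggregate to the data source, the data source can still output data with
 * duplicated keys, which is OK as Spark will do GROUP BY key again. The final
 * query plan can be something like this:
 * <pre>
 *   Aggregate [key#1], [min(min(value)#2) AS m#3]
 *   +- RelationV2[key#1, min(value)#2]
 * </pre>
 * Similarly, if there is no grouping expression, the data source can still
 * output more than one row.
 */
interface SupportsPushDownAggregates extends ScanBuilder {

  /**
   * Pushes down an Aggregation to the data source.
   *
   * @return true if the aggregation was accepted by the source (it will be
   *         evaluated at least partially there), false if it cannot be pushed.
   */
  boolean pushAggregation(Aggregation aggregation);
}
```

Since the mix-in ends up with a single abstract method, a test double can be a lambda, which keeps unit tests of the pushdown rule lightweight.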
case class Max(column: Expression, dataType: DataType) extends AggregateFunc
case class Sum(column: Expression, dataType: DataType, isDistinct: Boolean)
  extends AggregateFunc
case class Count(column: Expression, dataType: DataType, isDistinct: Boolean)
Sorry to change my mind at the last second. I think it's very unlikely that a data source can support something like `max(a + b)`, `group by a + b`. I think it's clearer to use `NamedReference` instead of `Expression` here.

For v2 partitioning, it's always named, e.g. `CREATE TABLE ... PARTITIONED BY year(ts)`: the partitioning has a name and you can get it by `DESC TABLE`, which calls `SupportsPartitionManagement.partitionSchema`.
For `count(1)`, let's create a special class `CountOne`.
BTW, `count` doesn't need a data type? It always returns long.
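Following these two comments, `Count` could drop the data type entirely (COUNT always returns long) and `count(1)` could get its own class. A minimal sketch, with `FieldReference` stubbed so the example stands alone — the exact shapes are illustrative, not the PR's final code:

```java
// Stub: the real FieldReference is a DSv2 expression naming a (possibly nested) column.
final class FieldReference {
  private final String name;
  FieldReference(String name) { this.name = name; }
  String describe() { return name; }
}

interface AggregateFunc {}

// No dataType field: COUNT always returns long, so no data type is needed.
final class Count implements AggregateFunc {
  private final FieldReference column;
  private final boolean isDistinct;

  Count(FieldReference column, boolean isDistinct) {
    this.column = column;
    this.isDistinct = isDistinct;
  }

  FieldReference column() { return column; }
  boolean isDistinct() { return isDistinct; }
}

// Special case suggested above for count(1): there is no column to reference.
final class CountOne implements AggregateFunc {}
```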
@@ -870,6 +870,12 @@ object SQLConf {
      .checkValue(threshold => threshold >= 0, "The threshold must not be negative.")
      .createWithDefault(10)

  val PARQUET_AGGREGATE_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.aggregatePushdown")
I think the config is per source. For this API-only PR, we don't need any config.
// Aggregate Functions in SQL statement.
// e.g. SELECT COUNT(EmployeeID), Max(salary) FROM dept GROUP BY deptID
// aggregateExpressions are (COUNT(EmployeeID), Max(salary)), groupByColumns are (deptID)
case class Aggregation(aggregateExpressions: Seq[AggregateFunc],
This is public DS v2 API, can we write it in Java?
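A minimal Java translation of the Scala case class above could look like the following. Group-by columns are shown as plain strings here purely for simplicity; the actual PR may use a richer reference type, and the accessor names mirror the Scala fields:

```java
import java.io.Serializable;

interface AggregateFunc extends Serializable {}

// Java version of the Scala case class: holds the pushed-down aggregate
// functions and the GROUP BY columns. For
//   SELECT COUNT(EmployeeID), MAX(salary) FROM dept GROUP BY deptID
// aggregateExpressions = [COUNT(EmployeeID), MAX(salary)], groupByColumns = [deptID].
final class Aggregation implements Serializable {
  private final AggregateFunc[] aggregateExpressions;
  private final String[] groupByColumns;

  Aggregation(AggregateFunc[] aggregateExpressions, String[] groupByColumns) {
    this.aggregateExpressions = aggregateExpressions;
    this.groupByColumns = groupByColumns;
  }

  AggregateFunc[] aggregateExpressions() { return aggregateExpressions; }
  String[] groupByColumns() { return groupByColumns; }
}
```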
// No need to do column pruning because only the aggregate columns are used as
// DataSourceV2ScanRelation output columns. All the other columns are not
// included in the output. Since PushDownUtils.pruneColumns is not called,
// ScanBuilder.requiredSchema is not pruned, but ScanBuilder.requiredSchema is
There is no `ScanBuilder.requiredSchema`.
""".stripMargin)

val scanRelation = DataSourceV2ScanRelation(sHolder.relation, scan, output)
assert(scanRelation.output.length ==
can we check this earlier? right after `val newOutput = scan.readSchema().toAttributes`
val dialect = JdbcDialects.get(jdbcOptions.url)
val compiledAgg = JDBCRDD.compileAggregates(aggregation.getAggregateExpressions, dialect)

var pushedSchema = new StructType()
`outputSchema` is a better name
  f.pushedFilters()
case _ => Array.empty[sources.Filter]
}
V1ScanWrapper(v1, Array.empty[sources.Filter], pushedFilters, aggregation)
`V1ScanWrapper.translatedFilters` is always Nil now?
It seems like it's only for display purposes. I'm OK to remove it, but let's do it more explicitly and remove this field from `V1ScanWrapper`.
@@ -332,6 +334,7 @@ object DataSourceStrategy
  l.output.toStructType,
  Set.empty,
  Set.empty,
  Option.empty[Aggregation],
super nit: we can just write `None`
case aggNode @ Aggregate(groupingExpressions, resultExpressions, child) =>
  child match {
    case ScanOperation(project, filters, sHolder: ScanBuilderHolder)
        if project.forall(_.isInstanceOf[AttributeReference]) =>
nit: we can add `filters.isEmpty` here, instead of writing `if (filters.length == 0)` in the body
if (!jdbcOptions.pushDownAggregate) return false

val dialect = JdbcDialects.get(jdbcOptions.url)
val compiledAgg = JDBCRDD.compileAggregates(aggregation.aggregateExpressions, dialect)
shall we return false earlier if there are nested fields? Otherwise we will hit assertion error in compileAggregates
Right, we should return false earlier if there are nested fields. Fixed. Please check one more time.
var outputSchema = new StructType()
aggregation.groupByColumns.foreach { col =>
  val structField = getStructFieldForCol(col)
  outputSchema = outputSchema.add(StructField(structField.name, structField.dataType))
`outputSchema.add(structField)`?
@@ -214,4 +237,204 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession {
    checkAnswer(sql("SELECT name, id FROM h2.test.abc"), Row("bob", 4))
  }
}

test("scan with aggregate push-down") {
  val df1 = sql("select MAX(SALARY), MIN(BONUS) FROM h2.test.employee where dept > 0" +
Can we create one test for each of them? Then we can have a title for each test, e.g. this one is aggregate pushdown with GROUP BY, and the next is aggregate pushdown without GROUP BY.
LGTM except for some minor comments, thanks for your patience!
thanks, merging to master/3.2!
### What changes were proposed in this pull request?
Add interfaces and APIs to push down Aggregates to V2 Data Source

### Why are the changes needed?
improve performance

### Does this PR introduce _any_ user-facing change?
SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED was added. If this is set to true, Aggregates are pushed down to Data Source.

### How was this patch tested?
New tests were added to test aggregates push down in #32049. The original PR is split into two PRs. This PR doesn't contain new tests.

Closes #33352 from huaxingao/aggPushDownInterface.

Authored-by: Huaxin Gao <huaxin_gao@apple.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit c561ee6)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@@ -143,6 +172,8 @@ object JDBCRDD extends Logging {
 * @param parts - An array of JDBCPartitions specifying partition ids and
 *                per-partition WHERE clauses.
 * @param options - JDBC options that contains url, table and other information.
 * @param requiredSchema - The schema of the columns to SELECT.
 * @param aggregation - The pushed down aggregation
Is the param doc correct? I don't see an `aggregation` parameter, but `outputSchema` and `groupByColumns`.
 * be: grouping columns, aggregate columns (in the same order as the aggregate functions in
 * the given Aggregation).
 */
boolean pushAggregation(Aggregation aggregation);
For public API, we should document what the returned value means.
+1. What is the returned boolean for?
BTW, although there is an option to control JDBC aggregate pushdown, do we need an overall SQL config to control it? Say we have other data sources implementing the API; we may not have an option to disable it?
Sorry for reviewing this late! I was interested in taking a look but missed it. Added a bunch of comments, and I saw @viirya also left some. Perhaps we can address them in a separate PR?
Also, it seems this PR not only added APIs but also an implementation for JDBC data sources? If so, it's better to update the PR description accordingly.
 */
@Evolving
public final class Aggregation implements Serializable {
  private AggregateFunc[] aggregateExpressions;
nit: mark these as `final`?
public FieldReference column() {
  return column;
}
public boolean isDinstinct() {
typo: `isDinstinct` -> `isDistinct`.
 */
@Evolving
public final class Count implements AggregateFunc {
  private FieldReference column;
ditto: make these final
private boolean isDistinct;

public Count(FieldReference column, boolean isDistinct) {
    this.column = column;
2 space indentation?
}

public FieldReference column() {
    return column;
ditto
 * "SELECT min(value) AS m FROM t GROUP BY key", after pushing down the aggregate
 * to the data source, the data source can still output data with duplicated keys, which is OK
 * as Spark will do GROUP BY key again. The final query plan can be something like this:
 * {{{
this is not properly rendered, you can use:
* <pre>
* Aggregate [key#1], [min(min(value)#2) AS m#3]
* +- RelationV2[key#1, min(value)#2]
* </pre>
* Similarly, if there is no grouping expression, the data source can still output more than one
* row.
instead. Note that the following <p>
is also removed.
 * be: grouping columns, aggregate columns (in the same order as the aggregate functions in
 * the given Aggregation).
 */
boolean pushAggregation(Aggregation aggregation);
+1. What is the returned boolean for?
}

val agg = new Aggregation(translatedAggregates.toArray, translatedGroupBys.toArray)
if (r.pushAggregation(agg)) {
nit: you can just use `Some(agg).filter(r.pushAggregation)`
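For readers following along in the Java API sketches: the suggested Scala one-liner replaces an if/else, yielding `Some(agg)` only when the source accepts the pushdown. The same shape exists in Java via `Optional.filter`; in this illustration `Aggregation` and `pushAggregation` are stubs, not the PR's real classes.

```java
import java.util.Optional;

class Aggregation {}

class PushdownDemo {
  // Stub standing in for SupportsPushDownAggregates.pushAggregation.
  static boolean pushAggregation(Aggregation agg) {
    return true; // pretend the source accepts every aggregation
  }

  // Scala: Some(agg).filter(r.pushAggregation)
  // Java:  Optional.of(agg).filter(predicate)
  static Optional<Aggregation> tryPush(Aggregation agg) {
    return Optional.of(agg).filter(PushdownDemo::pushAggregation);
  }
}
```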
scanBuilder match {
  case r: SupportsPushDownAggregates =>
    val translatedAggregates = aggregates.map(DataSourceStrategy.translateAggregate).flatten
nit: can use `flatMap` instead of `map` + `flatten`.
filterCondition.map(Filter(_, sHolder)).getOrElse(sHolder)
}

def pushdownAggregate(plan: LogicalPlan): LogicalPlan = plan.transform {
nit: rename to `pushDownAggregate` to keep it consistent with `pushDownFilters`?
@huaxingao created a followup #33526.
The config is per data source. For example, when we implement the parquet aggregate push down later, we will add a config for parquet, something like
### What changes were proposed in this pull request?
This is a followup of #33352, to simplify the JDBC aggregate pushdown:
1. We should get the schema of the aggregate query by asking the JDBC server, instead of calculating it by ourselves. This can simplify the code a lot, and is also more robust: the data type of SUM may vary in different databases, it's fragile to assume they are always the same as Spark.
2. Because of 1, now we can remove the `dataType` property from the public `Sum` expression.

This PR also contains some small improvements:
1. Spark should deduplicate the aggregate expressions before pushing them down.
2. Improve the `toString` of public aggregate expressions to make them more SQL.

### Why are the changes needed?
code and API simplification

### Does this PR introduce _any_ user-facing change?
this API is not released yet.

### How was this patch tested?
existing tests

Closes #33579 from cloud-fan/dsv2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
### What changes were proposed in this pull request?
Add interfaces and APIs to push down Aggregates to V2 Data Source.
Also add JDBC implementation so we can test the new APIs.
Parquet implementation will be added using a separate PR.

### Why are the changes needed?
improve performance

### Does this PR introduce _any_ user-facing change?
`SupportsPushDownAggregates` is added. Data sources can implement this interface to push down aggregates. `JDBC_PUSHDOWN_AGGREGATE` is added. If set to true, we will push down aggregates to the JDBC data source.

### How was this patch tested?
Add new tests in `JDBCV2Suite` to test aggregate push down.