[SPARK-36646][SQL] Push down group by partition column for aggregate #34445
Conversation
Thanks @huaxingao. I think core logic looks good, just have some minor comments. cc @viirya.
(Resolved review comments on AggregatePushDownUtils.scala, OrcUtils.scala, and FileSourceAggregatePushDownSuite.scala.)
Looks good, just a few nits.
(Resolved review comments on AggregatePushDownUtils.scala, OrcUtils.scala, ParquetUtils.scala, and FileSourceAggregatePushDownSuite.scala.)
if (!partitionSchema.names.sameElements(groupByColNames)) {
  groupByColNames.foreach { col =>
    val index = partitionSchema.names.indexOf(col)
    val v = partitionValues.asInstanceOf[GenericInternalRow].values(index)
Just curious: is this always guaranteed to be GenericInternalRow?
Seems to me that the partitionValues comes from PartitionPath, which always contains GenericInternalRow.
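For context, here is a minimal, self-contained sketch of the lookup pattern discussed in this thread. The partition schema, values, and column names below are made up for illustration; only the access pattern mirrors the snippet above.

```scala
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object PartitionValueLookupSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical partition schema with two partition columns, p1 and p2.
    val partitionSchema = StructType(Seq(
      StructField("p1", StringType),
      StructField("p2", IntegerType)))

    // Partition values as a GenericInternalRow, the representation produced
    // by partition discovery (PartitionPath) for one partition directory.
    val partitionValues = new GenericInternalRow(
      Array[Any](UTF8String.fromString("2021-11-01"), 7))

    // The group by columns may list the partition columns in a different
    // order, so each value is looked up by its index in the partition schema.
    val groupByColNames = Seq("p2", "p1")
    groupByColNames.foreach { col =>
      val index = partitionSchema.names.indexOf(col)
      val v = partitionValues.values(index)
      println(s"$col -> $v")
    }
  }
}
```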
(Resolved review comments on AggregatePushDownUtils.scala and ParquetPartitionReaderFactory.scala.)
Thanks @huaxingao, looks good, just a few more nits after which it's ready to merge, I think.
  return None
}

if (aggregation.groupByColumns.nonEmpty &&
nit: maybe add some comments explaining why we have this check and only support the case where the group by columns are the same as the partition columns. What if the number of group by columns is smaller than the number of partition columns?
    partitionNames.size != aggregation.groupByColumns.length) {
  return None
}
aggregation.groupByColumns.foreach { col =>
nit: maybe also add some comments here - it's not that easy to understand, and comments can help with maintaining this code.
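To make the two nits above concrete, here is a hedged, self-contained sketch of the guard being discussed, written against plain Scala collections rather than the connector Aggregation API; the helper name and signature are illustrative, not the PR's actual code.

```scala
// Push down is only supported when the group by columns are exactly the
// partition columns (an empty group by list is always fine). If only a
// subset of the partition columns were grouped on, rows from different
// partitions would still have to be merged after the scan, which a
// per-file pushed-down aggregate cannot do, so such queries are rejected.
def groupByMatchesPartitionColumns(
    groupByColNames: Seq[String],
    partitionNames: Seq[String]): Boolean = {
  groupByColNames.isEmpty ||
    (groupByColNames.length == partitionNames.length &&
      groupByColNames.forall(partitionNames.contains))
}

// Grouping on all partition columns (in any order) is eligible for push down;
// grouping on a strict subset of them is not.
assert(groupByMatchesPartitionColumns(Seq("p2", "p1"), Seq("p1", "p2")))
assert(!groupByMatchesPartitionColumns(Seq("p1"), Seq("p1", "p2")))
assert(groupByMatchesPartitionColumns(Seq.empty, Seq("p1", "p2")))
```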
 */
def getSchemaWithoutGroupingExpression(
    aggregation: Aggregation,
    aggSchema: StructType): StructType = {
nit: maybe swap the order of aggSchema and aggregation here, as we're modifying the schema here with the info from aggregation.
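As a rough illustration of what a helper like this does (not the PR's actual implementation), the aggregate output schema can be pruned by dropping the group by columns by name, since their values come from the partition directory rather than from the file reader:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Hedged sketch: remove the group by columns from an aggregate output schema,
// so the file-level reader only needs to produce the aggregated values; the
// group by (partition) column values are filled in from the partition values.
def schemaWithoutGroupingColumns(
    aggSchema: StructType,
    groupByColNames: Set[String]): StructType = {
  StructType(aggSchema.fields.filterNot(f => groupByColNames.contains(f.name)))
}

// Example for "SELECT count(*), max(value), p1 FROM t GROUP BY p1":
val aggSchema = StructType(Seq(
  StructField("count(*)", LongType),
  StructField("max(value)", LongType),
  StructField("p1", StringType)))
val pruned = schemaWithoutGroupingColumns(aggSchema, Set("p1"))
assert(pruned.fieldNames.toSeq == Seq("count(*)", "max(value)"))
```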
val expected_plan_fragment =
  "PushedAggregation: [COUNT(*), COUNT(value), MAX(value), MIN(value)]," +
  " PushedFilters: [], PushedGroupBy: [p1, p2, p3, p4]"
// checkKeywordsExistsInExplain(df, expected_plan_fragment)
nit: remove this?
LGTM pending CI, thanks @huaxingao! It'd be great if you could add a bit more detail in the PR description too.
withTempView("tmp") { | ||
spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp"); | ||
Seq("false", "true").foreach { enableVectorizedReader => | ||
withSQLConf(aggPushDownEnabledKey -> "true", |
Hmm, can you test both aggPushDownEnabledKey as true and false and see if the results are the same?
withTempView("tmp") { | ||
spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp"); | ||
Seq("false", "true").foreach { enableVectorizedReader => | ||
withSQLConf(aggPushDownEnabledKey -> "true", |
Here too. We should make sure aggPushDownEnabledKey won't change results.
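A hedged sketch of the comparison being requested, written in the style of the suite's tests; spark, withSQLConf, checkAnswer, aggPushDownEnabledKey, vectorizedReaderEnabledKey, and the tmp view are assumed to be provided by the surrounding suite, and the query itself is illustrative:

```scala
import org.apache.spark.sql.Row

// Collect the expected answer with aggregate push down explicitly disabled ...
val query = "SELECT count(*), max(value), min(value), p1 FROM tmp GROUP BY p1"
var expected: Seq[Row] = Seq.empty
withSQLConf(aggPushDownEnabledKey -> "false") {
  expected = spark.sql(query).collect().toSeq
}

// ... then re-run with push down enabled, for both vectorized reader settings,
// and verify the flag does not change the results.
Seq("false", "true").foreach { enableVectorizedReader =>
  withSQLConf(
      aggPushDownEnabledKey -> "true",
      vectorizedReaderEnabledKey -> enableVectorizedReader) {
    checkAnswer(spark.sql(query), expected)
  }
}
```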
  val filePath = new Path(new URI(file.filePath))

  if (aggregation.nonEmpty) {
-   return buildReaderWithAggregates(filePath, conf)
+   return buildReaderWithAggregates(file, conf)
  }
Seems filePath can be created after the if block:
if (aggregation.nonEmpty) {
  return buildReaderWithAggregates(file, conf)
}
val filePath = new Path(new URI(file.filePath))
  if (aggregation.nonEmpty) {
-   return buildColumnarReaderWithAggregates(filePath, conf)
+   return buildColumnarReaderWithAggregates(file, conf)
  }
ditto.
@@ -250,8 +261,7 @@ object ParquetUtils {
       schemaName = "count(" + count.column.fieldNames.head + ")"
       rowCount += block.getRowCount
       var isPartitionCol = false
-      if (partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive))
-        .toSet.contains(count.column.fieldNames.head)) {
+      if (partitionSchema.fields.map(_.name).toSet.contains(count.column.fieldNames.head)) {
Don't we need to check case sensitivity now?
Seems to me there's no need to check case sensitivity, because I have normalized the aggregates and group by columns in V2ScanRelationPushDown.
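For context, a simplified, self-contained stand-in for that normalization step (the real logic lives in V2ScanRelationPushDown; the helper below is only illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Resolve a user-written column name against the schema case-insensitively and
// return the schema's exact spelling, so later comparisons can be exact.
def normalizeColumnName(name: String, schema: StructType): Option[String] =
  schema.fields.collectFirst {
    case f if f.name.equalsIgnoreCase(name) => f.name
  }

// "P1" written by the user normalizes to the schema's "p1", which is why the
// later partition-column lookups can rely on plain name equality.
val schema = StructType(Seq(StructField("p1", IntegerType)))
assert(normalizeColumnName("P1", schema).contains("p1"))
```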
spark.read.format(format).load(dir.getCanonicalPath).createOrReplaceTempView("tmp")
val query = "SELECT count(*), count(value), max(value), min(value)," +
  " p4, p2, p3, p1 FROM tmp GROUP BY p1, p2, p3, p4"
val expected = sql(query).collect
Hmm, if we enable aggregate push down by default one day, this test might quietly lose its original purpose. Should we explicitly set aggPushDownEnabledKey to false when collecting the expected results?
Looks good. One remaining question about the test.
LGTM
Last commit is test-only and GA passed. Merging to master. Thanks.
Thank you all |
What changes were proposed in this pull request?
Lift the restriction on aggregate push down for Parquet and ORC when the group by columns are the same as the partition columns.
Why are the changes needed?
Previously, if there were group by columns, we did not push down the aggregate to the data source.
After this change, if the group by columns are the same as the partition columns, we push down the aggregates.
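For illustration, a hedged end-to-end example of the kind of query this enables, assuming the Parquet DSv2 read path; the paths and data are made up, and the config keys (spark.sql.parquet.aggregatePushdown, spark.sql.sources.useV1SourceList) should be double-checked against the Spark version in use:

```scala
// Write a small Parquet dataset partitioned by p1 and p2 (illustrative data).
spark.range(0, 100)
  .selectExpr("id AS value", "id % 2 AS p1", "id % 5 AS p2")
  .write
  .partitionBy("p1", "p2")
  .parquet("/tmp/agg_pushdown_demo")

// Aggregate push down requires the DSv2 Parquet reader and the push down flag.
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")

spark.read.parquet("/tmp/agg_pushdown_demo").createOrReplaceTempView("t")

// The group by columns are exactly the partition columns, so COUNT/MIN/MAX are
// answered from footer statistics and partition values; the physical plan shows
// PushedAggregation and PushedGroupBy entries.
val df = spark.sql(
  "SELECT count(*), max(value), min(value), p1, p2 FROM t GROUP BY p1, p2")
df.explain()
df.show()
```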
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New tests.