Commit 4eeae6d: address comments

huaxingao committed Nov 16, 2021
1 parent 4fb313b
Showing 4 changed files with 16 additions and 5 deletions.
@@ -94,9 +94,20 @@ object AggregatePushDownUtils {

    if (aggregation.groupByColumns.nonEmpty &&
        partitionNames.size != aggregation.groupByColumns.length) {
      // If there are group by columns, we only push down if the group by columns are the same as
      // the partition columns. In theory, if the group by columns are a subset of the partition
      // columns, we should still be able to push down. e.g. if table t has partition columns
      // p1, p2, and p3, SELECT MAX(c) FROM t GROUP BY p1, p2 should still be able to push down.
      // However, the partial aggregation pushed down to the data source needs to be
      // SELECT p1, p2, p3, MAX(c) FROM t GROUP BY p1, p2, p3, and the Spark layer needs a final
      // aggregation such as SELECT MAX(c) FROM t GROUP BY p1, p2, so the pushed-down query schema
      // differs from the query schema at Spark. We keep aggregate push down simple and don't
      // handle this complicated case for now.
      return None
    }
    aggregation.groupByColumns.foreach { col =>
      // Don't push down if the group by columns are not the same as the partition columns (order
      // doesn't matter because reordering can be done at the data source layer).
      if (col.fieldNames.length != 1 || !isPartitionCol(col)) return None
      finalSchema = finalSchema.add(getStructFieldForCol(col))
    }
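
Taken together, the two checks above gate push down on the group-by columns being exactly the partition columns, with order handled later at the data source layer. A simplified, self-contained restatement of that guard (not the committed code: the Set membership below stands in for isPartitionCol, and the names are illustrative):

  // Simplified sketch of the guard: group-by push down is allowed only when the
  // group-by columns (each a single-part field name) are exactly the partition columns.
  def groupByMatchesPartitionCols(
      groupByCols: Seq[Seq[String]],   // each group-by column as its field-name path
      partitionNames: Set[String]): Boolean = {
    groupByCols.isEmpty ||
      (groupByCols.length == partitionNames.size &&
        groupByCols.forall(c => c.length == 1 && partitionNames.contains(c.head)))
  }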
@@ -150,8 +161,8 @@ object AggregatePushDownUtils {
   * Return the schema for aggregates only (excluding group by columns)
   */
  def getSchemaWithoutGroupingExpression(
-     aggregation: Aggregation,
-     aggSchema: StructType): StructType = {
+     aggSchema: StructType,
+     aggregation: Aggregation): StructType = {
    val numOfGroupByColumns = aggregation.groupByColumns.length
    if (numOfGroupByColumns > 0) {
      new StructType(aggSchema.fields.drop(numOfGroupByColumns))
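
Because fields.drop removes entries from the front of the schema, this relies on the aggregate schema listing the group-by columns first, followed by the aggregate columns. A minimal sketch of the helper's effect, with made-up field names:

  import org.apache.spark.sql.types._

  // Hypothetical schema for SELECT p1, p2, MAX(c) FROM t GROUP BY p1, p2:
  // group-by (partition) columns lead, aggregate columns follow.
  val aggSchema = new StructType()
    .add("p1", IntegerType)
    .add("p2", IntegerType)
    .add("max(c)", LongType)

  // With two group-by columns the helper reduces to this drop,
  // leaving only the aggregate field max(c).
  val withoutGroupBy = new StructType(aggSchema.fields.drop(2))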
@@ -460,7 +460,7 @@ object OrcUtils extends Logging {
    // If there are group by columns, we will build the result row first,
    // and then append the group by column values (partition column values) to the result row.
    val schemaWithoutGroupBy =
-     AggregatePushDownUtils.getSchemaWithoutGroupingExpression(aggregation, aggSchema)
+     AggregatePushDownUtils.getSchemaWithoutGroupingExpression(aggSchema, aggregation)

    val aggORCValues: Seq[WritableComparable[_]] =
      aggregation.aggregateExpressions.zipWithIndex.map {
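
The comment in this hunk (and the identical one in the Parquet hunk below) describes the assembly order: build the aggregate result row first, then append the group-by (partition) values. A rough sketch of that shape using Catalyst rows; this is a hypothetical helper, not the reader code itself:

  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}

  // Hypothetical helper: aggregate values first, group-by (partition) values
  // appended after, matching the order the comment describes.
  def appendGroupByValues(
      aggValues: Array[Any],
      groupByValues: Array[Any]): InternalRow =
    new JoinedRow(new GenericInternalRow(aggValues), new GenericInternalRow(groupByValues))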
@@ -170,7 +170,7 @@ object ParquetUtils {
    // If there are group by columns, we will build the result row first,
    // and then append the group by column values (partition column values) to the result row.
    val schemaWithoutGroupBy =
-     AggregatePushDownUtils.getSchemaWithoutGroupingExpression(aggregation, aggSchema)
+     AggregatePushDownUtils.getSchemaWithoutGroupingExpression(aggSchema, aggregation)

    val schemaConverter = new ParquetToSparkSchemaConverter
    val converter = new ParquetRowConverter(schemaConverter, parquetSchema, schemaWithoutGroupBy,
@@ -309,7 +309,7 @@ trait FileSourceAggregatePushDownSuite
      val expected_plan_fragment =
        "PushedAggregation: [COUNT(*), COUNT(value), MAX(value), MIN(value)]," +
          " PushedFilters: [], PushedGroupBy: [p1, p2, p3, p4]"
-     // checkKeywordsExistsInExplain(df, expected_plan_fragment)
+     checkKeywordsExistsInExplain(df, expected_plan_fragment)
    }
    checkAnswer(df, Seq(Row(1, 1, 5, 5, 8, 1, 5, 2), Row(1, 1, 4, 4, 9, 1, 4, 2),
      Row(2, 2, 6, 3, 8, 1, 4, 2), Row(4, 4, 10, 1, 6, 2, 5, 1),
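
For context, the re-enabled assertion checks the explain output of a query shaped roughly like the one below. The actual definition of df sits above this hunk in the suite; the table name t and the column order are assumptions, though the eight-column Rows in checkAnswer are consistent with four aggregates plus the four group-by columns:

  // Inferred shape of the query under test (not shown in this hunk);
  // sql(...) is available inside the suite.
  val df = sql(
    """SELECT count(*), count(value), max(value), min(value), p1, p2, p3, p4
      |FROM t
      |GROUP BY p1, p2, p3, p4""".stripMargin)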
