Skip to content

Conversation

@amanomer
Copy link
Contributor

What changes were proposed in this pull request?

Added a new rule NestColumnAliasing.Overaggregate which will help pushdown nested columns wrapped inside Aggregate.

Why are the changes needed?

Since, spark is supporting nested schema pushdown when used with Project (SELECT query), we also need to support same pushdown ability when user perform aggregation (such as sum) on nested columns.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added test cases.

@amanomer
Copy link
Contributor Author

amanomer commented Jan 2, 2020

@cloud-fan @HyukjinKwon @maropu kindly review this approach for nested schema pruning.

@maropu
Copy link
Member

maropu commented Jan 2, 2020

also, cc: @viirya @dbtsai

@maropu
Copy link
Member

maropu commented Jan 2, 2020

ok to test

case _ => false
}

object OverAggregate {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AggregateNestedColumnAliasing?

case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) =>
a.copy(child = prunedChild(child, a.references))
// case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) =>
// a.copy(child = prunedChild(child, a.references))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

checkAnswer(query, Row("Y.", 1) :: Row("X.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
}

testSchemaPruning("Spark-27217: Push nested column when used in Aggregate") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: plz capitalize Spark in the head.

@SparkQA
Copy link

SparkQA commented Jan 2, 2020

Test build #116032 has finished for PR 27056 at commit 32dd333.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 2, 2020

Test build #116034 has finished for PR 27056 at commit bb77e86.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@amanomer
Copy link
Contributor Author

amanomer commented Jan 3, 2020

@cloud-fan kindly give feedback on current approach

@amanomer
Copy link
Contributor Author

amanomer commented Jan 3, 2020

cc @cloud-fan @wangyum
Kindly review this PR

Comment on lines +189 to +210
val (nestedFieldReferences, otherRootReferences) =
allExpressions.flatMap(collectRootReferenceAndExtractValue).partition {
case _: ExtractValue => true
case _ => false
}

val aliasSub = nestedFieldReferences.asInstanceOf[Seq[ExtractValue]]
.filter(!_.references.subsetOf(AttributeSet(otherRootReferences)))
.groupBy(_.references.head).flatMap {
case (attr, nestedFields: Seq[ExtractValue]) =>
val nestedFieldToAlias = nestedFields.distinct.map { f =>
Alias(f, f.sql)()
}

if (nestedFieldToAlias.nonEmpty &&
nestedFieldToAlias.length < totalFieldNum(attr.dataType)) {
Some(nestedFieldToAlias)
} else {
None
}
}
val newProjectList: Seq[NamedExpression] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code seems be copied from NestedColumnAliasing. I think we can reuse the methods like getAliasSubMap.

Comment on lines -589 to -590
case a @ Aggregate(_, _, child) if !child.outputSet.subsetOf(a.references) =>
a.copy(child = prunedChild(child, a.references))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove that? This is for top-level column pruning.

Comment on lines +302 to +305
testSchemaPruning("SPARK-27217: Push nested column when used in Aggregate") {
val query = sql("select sum(employer.id) from contacts")
checkScan(query, "struct<employer:struct<id:INT>>")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is not a bug. We may not need a JIRA ticket prefix.

@viirya
Copy link
Member

viirya commented Jan 3, 2020

Actually I'm thinking to add nested column pruning rule for these logical operators. I think it should be feasible to have a more general one instead of adding one by one for each operator.

@dongjoon-hyun
Copy link
Member

Retest this please

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amanomer . Could you fix all UT failures?

@SparkQA
Copy link

SparkQA commented Jan 5, 2020

Test build #116122 has finished for PR 27056 at commit bb77e86.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@amanomer
Copy link
Contributor Author

amanomer commented Jan 6, 2020

Actually I'm thinking to add nested column pruning rule for these logical operators. I think it should be feasible to have a more general one instead of adding one by one for each operator.

I will make this rule general after resolving issues for Aggregate.

@maropu
Copy link
Member

maropu commented Jan 8, 2020

Yea, as @viirya said above, I also like more general one.

@dongjoon-hyun
Copy link
Member

Gentle ping, @amanomer .

@nandini57
Copy link

Hi Spark Team, I am inclined to add this change as a custom logical rule by copying over the Aggregate Nesting object in my spark project.We are using Spark 2.3 version and no immediate plans to move over to 3.x. Do you think it is a viable approach ?Any guidance much appreciated

@maropu
Copy link
Member

maropu commented Feb 23, 2020

Probably, you'd be better to ask that in the spark mailing list. Anyway, we already have SparkSessionExtensions (or SparkSession.experimental.extraOptimizations) for injecting custom rules in 3rd-party projects. So, I think you can do so by using these interfaces (they are experimental interfaces though).

@nandini57
Copy link

nandini57 commented Feb 23, 2020 via email

@ntlanglois
Copy link

This optimization in aggregates would greatly benefit some of our most expensive queries against our nested schema. We've seen up to 8x performance improvement against the same schema outside of aggregates, and to see anywhere close to this for our aggregation queries would be amazing!

There are numerous other optimizations in 3.0 that we're very excited for, but this SPARK-27217 seems like the only thing left that would hold back some of those optimizations from realizing their full potential in aggregations.

Thanks so much for everyone's time and work on this so far. Just patiently wondering, @amanomer, are there are any plans to re-open this pull request with the requested changes in the near future?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants