[SPARK-6006][SQL]: Optimize count distinct for high cardinality columns #4764
Conversation
@marmbrus can you please guide me on how to rewrite this in a better way?
test this please
Test build #27960 has finished for PR 4764 at commit
Can we test this again please?
Jenkins, retest this please.
Test build #27994 has finished for PR 4764 at commit
Fixed the null count test failure. The optimization works only in the case of a single count distinct in the select clause.
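For example (illustrative queries with a hypothetical sqlContext and table t, not taken from the PR), the first query below would be eligible while the second would not, since a single hash partitioning on the distinct column cannot serve two different columns at once:

// Hypothetical sqlContext and table t; illustrates the constraint only.
sqlContext.sql("SELECT COUNT(DISTINCT colA) FROM t")                       // eligible: one count distinct
sqlContext.sql("SELECT COUNT(DISTINCT colA), COUNT(DISTINCT colB) FROM t") // not eligible: two distinct columns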
please retest
please retest
ok to test
Test build #29635 has finished for PR 4764 at commit
… distinct in select clause (since it relies on hash partitioning of column values)
Fixed the test case of zero count when there is no data. Rebased with the latest master. Please retest.
Test FAILed.
Fixed test failures caused by class cast exceptions. Please retest.
Thanks for working on this, and sorry for the delay in reviewing it. My high-level feedback is that I think we should optimize the handling of distinct aggregation, but there are already plans to do this more holistically instead of as a point solution. If this is really important to you for some specific production workload, we could consider adding something simple now and removing it later, but otherwise I'd prefer to wait for the full solution. More specifically, I have some advice on how I would structure this if we were to move forward with this approach.
As a very rough sketch (this is totally untested and I'm probably missing cases), I'd hope the solution could look something like the following:

object OptimizeSimpleDistincts extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Only whole-table aggregates with a single aggregate expression qualify.
    case a @ Aggregate(Nil, Seq(agg), child) =>
      val rewritten = agg transform {
        case CountDistinct(Seq(expr)) => Count(expr)
        case SumDistinct(expr) => Sum(expr)
      }
      if (rewritten != agg) {
        // Push the distinct into the child so it is planned as a regular
        // (distributed) aggregate, then apply the plain Count/Sum on top.
        Aggregate(Nil, rewritten.asInstanceOf[NamedExpression] :: Nil, Distinct(child))
      } else {
        a // nothing was rewritten; keep the original node
      }
  }
}

With tests of course :) See
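For intuition, the rule is the logical-plan analogue of the following SQL rewrite (a sketch using a hypothetical sqlContext, table t, and column c; like the rough sketch above, it assumes the aggregate's child produces only the column being aggregated):

// Hypothetical names; illustration only.
val before = sqlContext.sql("SELECT COUNT(DISTINCT c) FROM t")
val after  = sqlContext.sql("SELECT COUNT(c) FROM (SELECT DISTINCT c FROM t) d")
// Both return the same answer, but the second form plans the distinct as a
// regular hash-partitioned aggregate, so deduplication stays distributed
// instead of funnelling every value through one partition.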
Hi @marmbrus, can you share the other plans for modifying aggregates that you mentioned earlier? Can I help with that? Otherwise I'll modify this one for now as you have suggested.
Here is the JIRA: SPARK-4366. Unless you think you will have something in the next day or two, would you mind closing this PR? I'd like to keep the PR queue limited to active issues so that we don't miss things. Thanks!
Thanks @marmbrus. Let me refactor this then and open another PR later.
Currently, the plan for count distinct looks like this:
Aggregate false, [snAppProtocol#448], [CombineAndCount(partialSets#513) AS _c0#437L]
Exchange SinglePartition
Aggregate true, [snAppProtocol#448], [snAppProtocol#448,AddToHashSet(snAppProtocol#448) AS partialSets#513]
!OutputFaker [snAppProtocol#448]
ParquetTableScan [snAppProtocol#587], (ParquetRelation hdfs://192.168.160.57:9000/data/collector/13/11/14, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6b1ed434, [ptime#443], ptime=2014-11-13 00%3A55%3A00), []
This can be slow when a column has many distinct values, because all the partial sets are merged in a single partition (the Exchange SinglePartition step above). This PR changes the above plan to:
Aggregate false, [], [SUM(_c0#437L) AS totalCount#514L]
Exchange SinglePartition
Aggregate false, [snAppProtocol#448], [CombineAndCount(partialSets#513) AS _c0#437L]
Exchange (HashPartitioning [snAppProtocol#448], 200)
Aggregate true, [snAppProtocol#448], [snAppProtocol#448,AddToHashSet(snAppProtocol#448) AS partialSets#513]
!OutputFaker [snAppProtocol#448]
ParquetTableScan [snAppProtocol#587], (ParquetRelation hdfs://192.168.160.57:9000/data/collector/13/11/14, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.hive.HiveContext@6b1ed434, [ptime#443], ptime=2014-11-13 00%3A55%3A00), []
This way, even if there are many distinct values, we insert them into hash-partitioned partial sets, and the computation remains distributed and thus faster.
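For intuition, here is a minimal RDD-level sketch of the same scheme (a hypothetical helper, not the PR's code): hash-partition the values so each distinct value lands on exactly one partition, count the distinct values within each partition, and sum the partial counts, mirroring the Exchange (HashPartitioning ...), CombineAndCount, and SUM steps in the plan above.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical helper mirroring the optimized plan; for intuition only.
def distributedCountDistinct(values: RDD[String], numPartitions: Int = 200): Long = {
  values
    .map(v => (v, ()))                               // key each row by the column value
    .partitionBy(new HashPartitioner(numPartitions)) // Exchange (HashPartitioning [col], 200)
    .mapPartitions { iter =>                         // partial aggregate: a hash set per partition
      val seen = scala.collection.mutable.HashSet.empty[String]
      iter.foreach { case (v, _) => seen += v }
      Iterator(seen.size.toLong)                     // CombineAndCount within the partition
    }
    .sum()                                           // final Aggregate: SUM of the partial counts
    .toLong
}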