
SPARK-1937: fix issue with task locality #892

Closed
lirui-apache wants to merge 15 commits into apache/spark from lirui-intel/delaySchedule

Conversation

lirui-apache

Don't check executor/host availability when creating a TaskSetManager. The executors may not have registered yet when the TaskSetManager is created, in which case all tasks would be considered to have no preferred locations, losing data locality in later scheduling.
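
To make the failure mode concrete, here is a small self-contained Scala sketch (toy code, not the actual TaskSetManager) of what goes wrong when the pending lists are built from executor liveness at creation time:

```scala
import scala.collection.mutable

// Toy illustration (not Spark code): if executor liveness is consulted only once, at
// TaskSetManager creation, preferences for hosts that register later are silently
// dropped and the tasks behave as if they had no preference at all.
object LocalityLostAtCreation {
  case class Task(id: Int, preferredHosts: Seq[String])

  def main(args: Array[String]): Unit = {
    val tasks = Seq(Task(0, Seq("host1")), Task(1, Seq("host2")))
    val aliveHosts = mutable.Set.empty[String]        // no executors registered yet

    // Pending lists computed once, at "creation" time.
    val (nodeLocal, noPrefs) = tasks.partition(_.preferredHosts.exists(aliveHosts))
    println(s"node-local candidates: $nodeLocal")      // empty
    println(s"treated as no-pref:    $noPrefs")        // both tasks

    aliveHosts ++= Seq("host1", "host2")               // executors register shortly after,
    // but the lists above are never recomputed, so data locality is already lost
  }
}
```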

@AmplabJenkins

Can one of the admins verify this patch?

@mridulm
Contributor

mridulm commented May 27, 2014

This is better handled in user code, not in the scheduler.
We need a way to surface how many executors have registered; once that is available, user code can spin until some minimum threshold of executors is up and only then start computations.

Of course, if there is interest, this could be abstracted out for common use in contrib.

Currently, for our jobs, we introduce an arbitrary sleep based on the expected cluster load.
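
For illustration, a user-code wait could look something like the following (a sketch only; it assumes SparkContext.getExecutorMemoryStatus, which reports one entry per registered executor plus the driver, is an acceptable way to observe registration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WaitForExecutors {
  // Block until at least `minExecutors` executors have registered, or the timeout expires.
  def waitForExecutors(sc: SparkContext, minExecutors: Int, timeoutMs: Long = 60000L): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    // getExecutorMemoryStatus includes an entry for the driver, hence the "+ 1".
    while (sc.getExecutorMemoryStatus.size < minExecutors + 1 &&
           System.currentTimeMillis() < deadline) {
      Thread.sleep(200)
    }
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("wait-for-executors"))
    waitForExecutors(sc, minExecutors = 4)   // only then launch locality-sensitive jobs
    // ... run jobs ...
    sc.stop()
  }
}
```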

One impact not mentioned in the PR description is that default parallelism is determined (in YARN mode at least) by the number of available executors.
So the number of reducers used for the first job (or few jobs) can end up very low, even though more executors may have registered by the time the reducers actually kick in.

@rxin
Contributor

rxin commented May 27, 2014

Tagging @kayousterhout ...

@kayousterhout
Contributor

Yeah I agree with @mridulm. It seems like, with this change, you're counting on the executors to finish registering before the delay threshold runs out or before all of the tasks have been run. It seems cleaner to just wait until enough executors have started before launching jobs.

@lirui-apache
Author

If I understand correctly, the application cannot control how many executors to launch (at least in standalone mode). So I suppose it's difficult for the user to determine when he/she has "enough" executors.

Another possible way to solve this issue: when the TaskScheduler finds new executors/hosts in the resource offers, it can inform the TaskSetManager to re-compute the pending lists. TaskSetManager currently handles events like "executor lost"; maybe it should also handle "executor added"?
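
A rough sketch of that notification path (the trait and method names below are illustrative, not the actual Spark API of the time):

```scala
import scala.collection.mutable

// Sketch: the scheduler tracks which executors it has already seen in resource offers
// and notifies every active task set when a new one shows up, mirroring the way
// "executor lost" is already propagated.
trait PendingListHolder {
  def executorAdded(execId: String, host: String): Unit   // e.g. recompute pending lists
}

class OfferTracker(taskSets: Seq[PendingListHolder]) {
  private val knownExecutors = mutable.HashSet.empty[String]

  def resourceOffers(offers: Seq[(String, String)]): Unit = {   // (execId, host) pairs
    for ((execId, host) <- offers if knownExecutors.add(execId)) {
      taskSets.foreach(_.executorAdded(execId, host))
    }
  }
}
```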

I have another concern about pendingTasksWithNoPrefs in TaskSetManager. This list now contains two kinds of tasks: tasks that truly have no preferred locations, and tasks whose preferred locations are not available. Tasks in this list are considered PROCESS_LOCAL, which means they take precedence over tasks in the NODE_LOCAL and RACK_LOCAL lists. Therefore in some scenarios (when the allowed locality is PROCESS_LOCAL), TaskSetManager may prefer tasks whose desired locations are unavailable over tasks that could run with some locality. Not sure if this behavior is intended?

@lirui-apache
Author

I've made some modifications, please help to see if this makes sense :)
@mridulm @rxin @kayousterhout

// Re-compute the pending lists. This should be called when new executor is added
def reAddPendingTasks() {
logInfo("Re-computing pending task lists.")
for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0
Contributor

Why do you filter out tasks that already have a copy running? Seems like you might as well use the better location in the event of task failure or speculation

Author

Actually this is the same filter used in "findTaskFromList". TaskSetManager will re-add failed tasks to the pending lists, so they pick up the updated executor availability. For speculative tasks, TaskSetManager doesn't check the availability of executors in "findSpeculativeTask". Please correct me if I misunderstand anything.
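
For reference, the filter in question looks roughly like this (a sketch; the two array parameters stand in for TaskSetManager's per-task-index bookkeeping fields):

```scala
import scala.collection.mutable.ArrayBuffer

// A task index is only handed out if no copy of it is currently running and it has
// not already finished successfully; stale entries are dropped from the list lazily.
def findTaskFromList(list: ArrayBuffer[Int],
                     copiesRunning: Array[Int],
                     successful: Array[Boolean]): Option[Int] = {
  while (list.nonEmpty) {
    val index = list.last
    list.trimEnd(1)
    if (copiesRunning(index) == 0 && !successful(index)) {
      return Some(index)
    }
  }
  None
}
```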

@kayousterhout
Contributor

It looks like when you re-add tasks after an executor is added, tasks that newly have one of their preferred locations available are not removed from the pendingTasksWithNoPrefs list? Doesn't that mean that the task's locality constraints can still get ignored?

@@ -111,6 +111,8 @@ private[spark] class TaskSchedulerImpl(
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

private val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
Contributor

I'm curious, why are you adding a flag for this instead of always leaving it on? It seems like something we should have on all the time.

@mateiz
Contributor

mateiz commented Jun 5, 2014

Hey @lirui-intel, I talked about this a bit with Kay and we think there's a simpler and more general approach.

  • In addPendingTask, always add the task to the host-, rack- and executor-specific lists even if those executors aren't currently up (i.e. remove the current if statements that prevent that); then add it to pendingTasksWithNoPrefs only if there were no such executors up
  • When a new executor comes up, remove any tasks from pendingTasksWithNoPrefs that have preferred locations on that executor / host / rack; now locality-aware scheduling will automatically happen for those

One other thing to watch out for will be the time complexity of these operations -- we don't want linear-time stuff in the size of pendingTasksWithNoPrefs if we can help it because there might be hundreds of executors coming up over time. But we can worry about that once we have an implementation.

This feature is great to have overall though because it will allow dynamic resizing of clusters (e.g. on YARN we might scale up and down based on whether we have pending task sets).
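
A toy model of the construction described above (hypothetical names, not this patch), including a host-indexed removal that keeps the "executor added" path from scanning all of pendingTasksWithNoPrefs:

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

// Sketch of the suggested bookkeeping: preferences are always recorded, noPrefs only
// holds tasks with no *currently live* preferred host, and a newly registered host
// pulls its tasks back out of noPrefs so locality-aware scheduling applies to them.
class PendingLists {
  val pendingTasksForHost = HashMap.empty[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = HashSet.empty[Int]
  private val aliveHosts = HashSet.empty[String]

  def addPendingTask(index: Int, preferredHosts: Seq[String]): Unit = {
    // (1) always file the task under its preferred hosts, live or not
    preferredHosts.foreach { h =>
      pendingTasksForHost.getOrElseUpdate(h, ArrayBuffer.empty) += index
    }
    // (2) fall back to noPrefs only if none of those hosts is up right now
    if (!preferredHosts.exists(aliveHosts)) {
      pendingTasksWithNoPrefs += index
    }
  }

  def executorAdded(host: String): Unit = {
    aliveHosts += host
    // Only the tasks filed under this host need to be touched, not all of noPrefs.
    pendingTasksForHost.get(host).foreach(_.foreach(pendingTasksWithNoPrefs -= _))
  }
}
```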

@lirui-apache
Author

Yes @mateiz, great idea. One quick question: tasks in pendingTasksWithNoPrefs are considered PROCESS_LOCAL. Suppose we have no tasks in pendingTasksForExecutor, but some tasks in pendingTasksForHost which are NODE_LOCAL, and also some tasks in pendingTasksWithNoPrefs whose preferred locations are not available. Now if the current allowed locality is PROCESS_LOCAL, TaskSetManager will pick tasks from pendingTasksWithNoPrefs rather than pendingTasksForHost.

@mateiz
Contributor

mateiz commented Jun 5, 2014

I see, we might be able to fix that as a separate issue. We could count pendingTasksWithNoPrefs as being at a "special" level we try only after we try the real locality levels. But that seems independent of this patch, maybe you can report it as a separate JIRA.
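
One way to express that "special" level (a hypothetical sketch of the idea, not something this patch adds) is an extra enum value that is only allowed after the real locality levels:

```scala
// Hypothetical ordering: unavailable-preference tasks get their own NO_PREF level that
// is tried only after PROCESS_LOCAL/NODE_LOCAL/RACK_LOCAL, instead of being lumped in
// with PROCESS_LOCAL as they are today.
object TaskLocality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, NO_PREF, ANY = Value

  // A candidate level is allowed if it is no "worse" than the currently allowed level.
  def isAllowed(constraint: Value, candidate: Value): Boolean = candidate <= constraint
}
```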

@lirui-apache
Author

Hi @mateiz, I think we should distinguish between tasks that truly have no preference and tasks whose preferences are unavailable when picking tasks from pendingTasksWithNoPrefs. If a task really has no preference, it should indeed be considered PROCESS_LOCAL because it runs equally well anywhere, and it should take precedence over NODE_LOCAL tasks. Tasks whose preferences are unavailable should wait until we have tried all the real locality levels.
Maybe we should first figure out how to deal with pendingTasksWithNoPrefs? Otherwise it could be difficult to even verify that the patch works.

@kayousterhout
Contributor

My understanding is that the problem mentioned only exists when a job has a combination of tasks with constraints and tasks without constraints. AFAIK this is not a common case at all -- since a stage will either be the first in the job, in which case all tasks have locality constraints on the input data, or will be a reduce stage, in which case none of the tasks have locality constraints.

@lirui-apache
Author

That's true @kayousterhout, but my point is: what if all the tasks specify only a NODE_LOCAL preference (a common case when the RDD is created from an HDFS file)? Then tasks in pendingTasksWithNoPrefs (considered PROCESS_LOCAL) will get priority to run. I ran into this while testing the patch.
Is it possible to set a timeout for waiting for new executors to come up? Before the timeout, only tasks without any preference in pendingTasksWithNoPrefs can run as PROCESS_LOCAL. After this period, we assume it's unlikely that new executors will come up, so all tasks in pendingTasksWithNoPrefs can be treated as PROCESS_LOCAL (pendingTasksWithNoPrefs can still be re-computed if new executors do come up later).

@lirui-apache
Author

I've revised the patch as @mateiz suggested: tasks are added to the corresponding lists even when their preferred locations are unavailable, in which case they are also added to pendingTasksWithNoPrefs. Later, when a new executor comes up, pendingTasksWithNoPrefs is re-computed to remove tasks that now have available preferred locations.
I also added a "waiting for new executors" period. During that period, tasks in pendingTasksWithNoPrefs wait until we have tried all other locality levels (unless they really have no constraints). After this period, tasks left in pendingTasksWithNoPrefs are scheduled as PROCESS_LOCAL.
Please help review this, thanks.

val conf = sched.sc.conf

// The period we wait for new executors to come up
// After this period, tasks in pendingTasksWithNoPrefs will be considered as PROCESS_LOCAL
private val WAIT_NEW_EXEC_TIMEOUT = conf.getLong("spark.scheduler.waitNewExecutorTime", 3000L)
Contributor

Is this something the application can just do: if it wants to wait 3 seconds before scheduling anything on non-local executors, just sleep for 3 seconds before trying to launch any jobs? I'm wary of adding more config options to the scheduler.

Author

This waiting period is only intended for pendingTasksWithNoPrefs. Suppose pendingTasksWithNoPrefs contains tasks whose preferences are unavailable. Within this waiting period, we want to try pendingTasksForExecutor, pendingTasksForHost and pendingTasksForRack first, because tasks in those lists do have some locality. And when an executor is added, we remove tasks that newly have locality from pendingTasksWithNoPrefs. After the waiting period, we assume no executor will come up for the tasks still remaining in pendingTasksWithNoPrefs, so they can be scheduled as PROCESS_LOCAL.
Note that tasks in pendingTasksForHost can still get scheduled even within the period; we're only holding back pendingTasksWithNoPrefs. I think that's better than holding back the whole application and scheduling nothing.
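
In other words, the gate being discussed is roughly this predicate (illustrative names, not the exact patch code):

```scala
// A task parked in pendingTasksWithNoPrefs is eligible right away only if it truly has
// no preferred locations; one whose preferences are merely unavailable becomes eligible
// once the "waiting for new executors" window has elapsed.
def noPrefTaskEligible(hasRealPreferences: Boolean,
                       taskSetStartTime: Long,
                       waitNewExecTimeoutMs: Long,
                       now: Long): Boolean = {
  val stillWaitingForExecutors = now - taskSetStartTime < waitNewExecTimeoutMs
  !hasRealPreferences || !stillWaitingForExecutors
}
```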

Contributor

It doesn't make sense to put this here because it will apply to every TaskSet, no matter how late into the application it was submitted, so you'll get a 3-second latency on every TaskSet that is missing one of its preferred nodes. Can we not add this as part of this patch, and simply make the change to put tasks in the node- and rack-local lists even if no nodes are available in those right now? Then later we can update the code that calls resourceOffer to treat tasks that have preferred locations but are missing executors for them specially.

@kayousterhout
Contributor

@lirui-intel I still don't understand the problem you mentioned: in TaskSetManager.findTask(), it tries to schedule process local, node local, and rack local tasks before looking at tasks from pendingTasksWithNoPrefs. So how can a task from pendingTasksWithNoPrefs be scheduled before a PROCESS_LOCAL one? (I see that tasks from pendingTasksWithNoPrefs are marked with locality level PROCESS_LOCAL when they're launched -- but this doesn't look like it affects the scheduling order)

@lirui-apache
Author

Hi @kayousterhout, let's consider a map stage whose tasks all have a NODE_LOCAL preference. So pendingTasksForExecutor is empty and all tasks are added to pendingTasksForHost. If at the beginning no node is available, then all tasks will also be added to pendingTasksWithNoPrefs (as @mateiz suggested). When we look for a task to launch, we always try to launch a PROCESS_LOCAL one first. Suppose now an executor comes in that can satisfy some tasks in pendingTasksForHost. However, since we always try PROCESS_LOCAL first, pendingTasksForHost will simply be skipped in TaskSetManager.findTask, and we end up picking a task from pendingTasksWithNoPrefs. You can see there's an if statement when trying to pick tasks from pendingTasksForHost and pendingTasksForRack, to test whether we currently allow such a "low" locality level. But tasks in pendingTasksForExecutor and pendingTasksWithNoPrefs are picked unconditionally since they are considered to have the highest level.
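
For readers following along, the lookup order being described is roughly the following (a sketch in the shape of that era's TaskSetManager.findTask, relying on its internal fields and helpers, so illustrative rather than standalone code):

```scala
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] = {
  // Executor-local tasks are always allowed.
  for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
  // Host- and rack-level lists are consulted only if the current allowed level permits.
  if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
    for (index <- findTaskFromList(getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }
  if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
    for (rack <- sched.getRackForHost(host);
         index <- findTaskFromList(getPendingTasksForRack(rack))) {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }
  // No locality check here: this branch can hand out a task whose preferred node exists
  // but was skipped above because the allowed level is still PROCESS_LOCAL.
  for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
  if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
    for (index <- findTaskFromList(allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }
  None
}
```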

@@ -725,10 +723,12 @@ private[spark] class TaskSetManager(
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
Contributor

Nit: should be indented with 2 more spaces:

if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
    pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
  levels += PROCESS_LOCAL
}

@mateiz
Contributor

mateiz commented Jun 21, 2014

Hey @lirui-intel, sorry for taking a while to get back to this, but it looks good to me. Iterating through only the TaskSet's valid locality levels sounds fine. I made some small comments on the code to improve it but once those are done it should be good to merge.

@lirui-apache
Author

Sorry about the code style, and thanks @mateiz for pointing it out. I've updated the patch.

@mateiz
Contributor

mateiz commented Jun 23, 2014

Jenkins, this is ok to test

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16037/

@asfgit asfgit closed this in 924b708 Jun 24, 2014
@mateiz
Contributor

mateiz commented Jun 24, 2014

Thanks Rui. I've merged this in now.

@lirui-apache
Author

Thanks everybody :-)

pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
This test ensures that when there are no
alive executors that satisfy a particular locality level,
the TaskSetManager doesn't ever use that as the maximum
allowed locality level (this optimization ensures that a
job doesn't wait extra time in an attempt to satisfy
a scheduling locality level that is impossible).

@mateiz and @lirui-intel this unit test illustrates an issue
with apache#892 (it fails with that patch).

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes apache#1024 from kayousterhout/scheduler_unit_test and squashes the following commits:

de6a08f [Kay Ousterhout] Added a TaskSetManager unit test.
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Don't check executor/host availability when creating a TaskSetManager. Because the executors may haven't been registered when the TaskSetManager is created, in which case all tasks will be considered "has no preferred locations", and thus losing data locality in later scheduling.

Author: Rui Li <rui.li@intel.com>
Author: lirui-intel <rui.li@intel.com>

Closes apache#892 from lirui-intel/delaySchedule and squashes the following commits:

8444d7c [Rui Li] fix code style
fafd57f [Rui Li] keep locality constraints within the valid levels
18f9e05 [Rui Li] restrict allowed locality
5b3fb2f [Rui Li] refine UT
99f843e [Rui Li] add unit test and fix bug
fff4123 [Rui Li] fix computing valid locality levels
685ed3d [Rui Li] remove delay shedule for pendingTasksWithNoPrefs
7b0177a [Rui Li] remove redundant code
c7b93b5 [Rui Li] revise patch
3d7da02 [lirui-intel] Update TaskSchedulerImpl.scala
cab4c71 [Rui Li] revised patch
539a578 [Rui Li] fix code style
cf0d6ac [Rui Li] fix code style
3dfae86 [Rui Li] re-compute pending tasks when new host is added
a225ac2 [Rui Li] SPARK-1937: fix issue with task locality
@lirui-apache lirui-apache deleted the delaySchedule branch August 25, 2014 06:17
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Agirish pushed a commit to HPEEzmeral/apache-spark that referenced this pull request May 5, 2022
wangyum added a commit that referenced this pull request May 26, 2023
* [CARMEL-5851] Push partial aggregate through join  (#999)

* [CARMEL-5851] Push partial aggregate through join (#977)

* [CARMEL-5851] Make partial aggregation adaptive (#892)

* Make partial aggregation adaptive

* Fix codegen

* fix

* Support group only

* Fix test error

* Only support deterministic

* Add another config

* Fix data issue

* fix

* Remove isSupportPartialAgg

* Fix

* Deduplicate right side of left semi anti join (#893)

* DeduplicateRightSideOfLeftSemiAntiJoin

* Fix test

* Add test

* Introduce stats

* Fix

* PushPartialAggregationThroughJoin

* PushPartialAggregationThroughJoin

* isPartialAgg = true,

* push project through join

* Add PullOutGroupingExpressions and reduce changes

* val (leftProjectList, rightProjectList, remainingProjectList) =
      split(projectList ++ join.condition.map(_.references.toSeq).getOrElse(Nil),
        join.left, join.right)

* Fix java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression cannot be cast to org.apache.spark.sql.catalyst.expressions.NamedExpression
	at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:297)
	at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:392)
	at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.$anonfun$main$1(BenchmarkAndVerifyResult.scala:156)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike.map(TraversableLike.scala:238)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
	at scala.collection.immutable.List.map(List.scala:298)
	at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.main(BenchmarkAndVerifyResult.scala:144)
	at com.ebay.carmel.spark.BenchmarkAndVerifyResult.main(BenchmarkAndVerifyResult.scala)

* Fix incorrect TPCDS q3 result:
```
0: jdbc:hive2://10.211.174.151:10000/access_v> SELECT
. . . . . . . . . . . . . . . . . . . . . . .>   dt.d_year,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand_id brand_id,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand brand,
. . . . . . . . . . . . . . . . . . . . . . .>   SUM(cast(ss_ext_sales_price as decimal(17, 2))) sum_agg
. . . . . . . . . . . . . . . . . . . . . . .> FROM date_dim dt, store_sales, item
. . . . . . . . . . . . . . . . . . . . . . .> WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND store_sales.ss_item_sk = item.i_item_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND item.i_manufact_id = 128
. . . . . . . . . . . . . . . . . . . . . . .>   AND dt.d_moy = 11
. . . . . . . . . . . . . . . . . . . . . . .> GROUP BY dt.d_year, item.i_brand, item.i_brand_id
. . . . . . . . . . . . . . . . . . . . . . .> ORDER BY dt.d_year, sum_agg DESC, brand_id, brand
. . . . . . . . . . . . . . . . . . . . . . .> LIMIT 10;
+---------+-----------+---------------------+--------------+
| d_year  | brand_id  |        brand        |   sum_agg    |
+---------+-----------+---------------------+--------------+
| 1998    | 2003001   | exportiimporto #1   | 43900603.69  |
| 1998    | 1002001   | importoamalg #1     | 35836273.32  |
| 1998    | 1004001   | edu packamalg #1    | 35775953.92  |
| 1998    | 5001001   | amalgscholar #1     | 35538345.92  |
| 1998    | 4001001   | amalgedu pack #1    | 35317861.64  |
| 1998    | 5004001   | edu packscholar #1  | 35302613.66  |
| 1998    | 3003001   | exportiexporti #1   | 35006929.11  |
| 1998    | 2004001   | edu packimporto #1  | 26473180.83  |
| 1998    | 4002001   | importoedu pack #1  | 26176292.12  |
| 1998    | 2002001   | importoimporto #1   | 26171441.74  |
+---------+-----------+---------------------+--------------+
10 rows selected (5.041 seconds)
0: jdbc:hive2://10.211.174.151:10000/access_v> SELECT
. . . . . . . . . . . . . . . . . . . . . . .>   dt.d_year,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand_id brand_id,
. . . . . . . . . . . . . . . . . . . . . . .>   item.i_brand brand,
. . . . . . . . . . . . . . . . . . . . . . .>   SUM(ss_ext_sales_price) sum_agg
. . . . . . . . . . . . . . . . . . . . . . .> FROM date_dim dt, store_sales, item
. . . . . . . . . . . . . . . . . . . . . . .> WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND store_sales.ss_item_sk = item.i_item_sk
. . . . . . . . . . . . . . . . . . . . . . .>   AND item.i_manufact_id = 128
. . . . . . . . . . . . . . . . . . . . . . .>   AND dt.d_moy = 11
. . . . . . . . . . . . . . . . . . . . . . .> GROUP BY dt.d_year, item.i_brand, item.i_brand_id
. . . . . . . . . . . . . . . . . . . . . . .> ORDER BY dt.d_year, sum_agg DESC, brand_id, brand
. . . . . . . . . . . . . . . . . . . . . . .> LIMIT 10;
+---------+-----------+----------------------+--------------+
| d_year  | brand_id  |        brand         |   sum_agg    |
+---------+-----------+----------------------+--------------+
| 1998    | 2003001   | exportiimporto #1    | 15851205.06  |
| 1998    | 3003001   | exportiexporti #1    | 12790869.96  |
| 1998    | 5001001   | amalgscholar #1      | 12763633.47  |
| 1998    | 1004001   | edu packamalg #1     | 12603183.68  |
| 1998    | 4001001   | amalgedu pack #1     | 12268486.99  |
| 1998    | 5004001   | edu packscholar #1   | 11667142.19  |
| 1998    | 1002001   | importoamalg #1      | 11336379.88  |
| 1998    | 2004001   | edu packimporto #1   | 9165179.56   |
| 1998    | 2002001   | importoimporto #1    | 9148014.59   |
| 1998    | 4004001   | edu packedu pack #1  | 8314235.98   |
+---------+-----------+----------------------+--------------+
10 rows selected (8.508 seconds)
```

Try to fix BigInteger out of long range
22/04/23 00:08:59 ERROR Executor: Exception in task 294.3 in stage 3.0 of app application_1644958298137_48668 (TID 471)
java.lang.ArithmeticException: BigInteger out of long range
at java.math.BigInteger.longValueExact(BigInteger.java:4632)
at org.apache.spark.sql.types.Decimal.toUnscaledLong(Decimal.scala:220)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.setDecimal(UnsafeRow.java:281)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregate_sum_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeysOutput_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:50)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:730)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.rdd.RDD$$anon$2.hasNext(RDD.scala:332)
at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:176)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:486)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1391)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

* Fix tpcds q93 RuntimeException: Couldn't find sr_return_quantity#34 in [ss_item_sk#3,ss_customer_sk#4,ss_ticket_number#10L,sr_item_sk#26,sr_reason_sk#32,sr_ticket_number#33]
              	at scala.sys.package$.error(package.scala:30)
              	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
              	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
              	... 217 more

* Fix bbensid q1 TreeNodeException:
```
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: item_id#1077
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:329)
```
```sql
SELECT

	u.user_cntry_id AS byr_cntry_id,

	l.item_site_id AS list_site_id,

	l.auct_type_code,

	cat.sap_category_id AS sap_id,

	bb.src_cre_dt,

	COUNT(bb.item_id) AS blocked_bids,

	COUNT(DISTINCT u.prmry_user_id) AS blocked_buyers

FROM

	access_views.dw_tns_blkd_bid bb

	INNER JOIN access_views.dw_users u ON (bb.byr_id = u.user_id)

	INNER JOIN access_views.dw_lstg_item l ON (bb.item_id = l.item_id)

	INNER JOIN access_views.dw_category_groupings cat ON (l.leaf_categ_id = cat.leaf_categ_id AND l.item_site_id = cat.site_id)

WHERE

	bb.src_cre_dt BETWEEN '2021-07-12' AND '2021-07-15'

	AND l.auct_end_dt >= '2021-07-12'

GROUP BY

	1,2,3,4,5
```

* Fix bbensid q194 NPE:
```
    spark.sql("create table t1(a bigint, b string) using parquet")
    spark.sql("create table t2(x bigint, y string) using parquet")

    spark.sql("insert into t1 values(1, 1), (2, 2)")
    spark.sql("insert into t2 values(1, 1)")

    sql("SELECT distinct COALESCE(t2.y, '100') AS rev_rollup2 FROM t1 left JOIN t2 ON t1.a = t2.x").collect().foreach(println)
    sql("SELECT distinct rev_rollup2 FROM t1 left JOIN (select x,COALESCE(t2.y, '100') AS rev_rollup2 from t2) t2 ON t1.a = t2.x").collect().foreach(println)
```

```
0: jdbc:hive2://10.211.174.26:10000/access_vi> create table t1(a bigint, b string) using parquet;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.816 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> create table t2(x bigint, y string) using parquet;
+---------+
| Result  |
+---------+
+---------+
No rows selected (0.95 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> insert into t1 values(1, 1), (2, 2);
+---------+
| Result  |
+---------+
+---------+
No rows selected (14.762 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> insert into t2 values(1, 1);
+---------+
| Result  |
+---------+
+---------+
No rows selected (20.527 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> SELECT distinct COALESCE(t2.y, '100') AS rev_rollup2 FROM t1 left JOIN t2 ON t1.a = t2.x;
+--------------+
| rev_rollup2  |
+--------------+
| 100          |
| 1            |
+--------------+
2 rows selected (8.685 seconds)
0: jdbc:hive2://10.211.174.26:10000/access_vi> SELECT distinct rev_rollup2 FROM t1 left JOIN (select x,COALESCE(t2.y, '100') AS rev_rollup2 from t2) t2 ON t1.a = t2.x;
+--------------+
| rev_rollup2  |
+--------------+
| NULL         |
| 1            |
+--------------+
2 rows selected (2.02 seconds)
```

* Enhance: OUTER joins are supported for group by without aggregate functions

* ColumnPruning and CollapseProject support PartialAggregate

* Fix incorrect bbendis q65 result:
```sql
spark-sql> SELECT
         >     w.src_cre_dt,
         >     w.site_id,
         >     l.auct_type_code,
         >     w.vstr_yn_id,
         >     COUNT(w.item_id) AS watches,
         >     count(*)
         > FROM
         >     access_views.dw_myebay_wtch_trk w
         >     INNER JOIN access_views.dw_lstg_item l ON (w.item_id = l.item_id)
         > WHERE
         >     w.src_cre_dt BETWEEN '2021-07-08' AND '2021-07-15'
         >     AND l.auct_end_dt >= '2021-07-08'
         >     AND w.cnvrted_yn_id = 0
         > GROUP BY
         >     1,2,3,4
         > ORDER by 1,2,3,4 limit 5;
```
2021-07-08	0	1	0	4929026	4930784
2021-07-08	0	7	0	711405	711413
2021-07-08	0	8	0	154	154
2021-07-08	0	9	0	6097525	6097948
2021-07-08	0	13	0	123415	123482

rewrite:
```sql
SELECT
	w.src_cre_dt,
	w.site_id,
	l.auct_type_code,
	w.vstr_yn_id,
	COUNT(w.item_id) AS watches,
	sum(w.cnt * l.cnt) AS watches2
FROM
	(select src_cre_dt, site_id, vstr_yn_id, item_id, COUNT(item_id) as cnt from access_views.dw_myebay_wtch_trk where src_cre_dt BETWEEN '2021-07-08' AND '2021-07-15' and cnvrted_yn_id = 0 group by src_cre_dt, site_id, vstr_yn_id, item_id) w
	INNER JOIN (select auct_type_code, item_id, COUNT(*) as cnt from access_views.dw_lstg_item where auct_end_dt >= '2021-07-08' group by auct_type_code, item_id) l ON (w.item_id = l.item_id)

GROUP BY
	1,2,3,4
```

* Fix should not push count aggregate expression if groupingExpressions is empty:
```
spark-sql> create table t1(id int) using parquet;
spark-sql> select count(*) from t1;
0
spark-sql> select sum(0) from t1;
NULL
```

* Fix bbendis q323 RuntimeException:
```sql
0: jdbc:hive2://10.211.174.151:10000/access_v> SELECT
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    COALESCE(u.prmry_user_id, a.user_id) AS parent_uid,
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    CAST(modified_date AS DATE) AS modified_date
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 FROM
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    access_views.dw_user_past_aliases a
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                    INNER JOIN access_views.dw_users u ON (a.user_id = u.user_id)
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 WHERE
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 a.alias_flag = '2'
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 GROUP BY
. . . . . . . . . . . . . . . . . . . . . . .>
. . . . . . . . . . . . . . . . . . . . . . .>                 1,2
. . . . . . . . . . . . . . . . . . . . . . .> limit 1;
Error: Error running query: java.lang.RuntimeException: Couldn't find _groupingexpression#174653 in [_groupingexpression#174654] (state=,code=0)
```

* Refactor the code

* Support range join case:
```sql

use access_views;

CREATE TEMPORARY TABLE DATE_RANGE AS
(
SELECT
  CAL_DT,
  RETAIL_WEEK,
  RETAIL_YEAR
, TRIM(CAST(RETAIL_YEAR AS INT)) || 'W' || TRIM(SUBSTR(CAST(CAST(RETAIL_WEEK+1000 AS INT) AS VARCHAR(20)), 3)) AS WEEK_ID
, RTL_WEEK_BEG_DT AS WEEK_BEG_DT
, RETAIL_WK_END_DATE AS WEEK_END_DT
, MONTH_BEG_DT, MONTH_END_DT, MONTH_ID
, QTR_BEG_DT, QTR_END_DT, QTR_ID
, YEAR_ID
FROM DW_CAL_DT
WHERE 1=1
AND CAL_DT BETWEEN DATE'2020-01-01' AND CURRENT_DATE
);

create temp table t11 using parquet as
SELECT D.CAL_DT,
	   COUNT(DISTINCT LSTG.ITEM_ID) ITEM_NUM
FROM DATE_RANGE D
INNER JOIN
	(SELECT HOT.AUCT_START_DT,
		   HOT.AUCT_END_DT,
		   HOT.ITEM_ID
	FROM  ACCESS_VIEWS.DW_LSTG_ITEM HOT
	INNER JOIN ACCESS_VIEWS.DW_CATEGORY_GROUPINGS CATE ON CATE.SITE_ID = HOT.ITEM_SITE_ID AND CATE.LEAF_CATEG_ID = HOT.LEAF_CATEG_ID
	INNER JOIN ACCESS_VIEWS.SSA_CURNCY_PLAN_RATE_DIM FX ON FX.CURNCY_ID = HOT.LSTG_CURNCY_ID
	INNER JOIN ACCESS_VIEWS.DW_ITEMS_SHIPPING SHIP ON HOT.ITEM_ID=SHIP.ITEM_ID AND HOT.ITEM_VRSN_ID=SHIP.ITEM_VRSN_ID
	WHERE 1 = 1
	AND HOT.AUCT_TYPE_CODE NOT IN (10,15)
	AND HOT.ITEM_SITE_ID <> 223
	AND CATE.SAP_CATEGORY_ID NOT IN (5,7,23,41,-999)) LSTG
ON D.CAL_DT BETWEEN LSTG.AUCT_START_DT AND LSTG.AUCT_END_DT
GROUP BY 1;

```

* Fix bbendis q311 introduce 2 PartialAggregates:
```sql
== Optimized Logical Plan ==
Aggregate [session_start_dt#84199, site_id#84163, _groupingexpression#84731], [session_start_dt#84199, site_id#84163, count(if ((gid#84733 = 4)) CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737 else null) AS sign_in_visit#84095L, count(if ((gid#84733 = 5)) CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735 else null) AS access_succ#84096L, count(if ((gid#84733 = 6)) CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736 else null) AS sign_in_succ#84097L, count(if ((gid#84733 = 1)) CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739 else null) AS reg_succ#84098L, count(if ((gid#84733 = 2)) CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738 else null) AS gxo_succ#84099L, count(if ((gid#84733 = 3)) s165.`guid`#84734 else null) AS tot_visitors#84100L, _groupingexpression#84731 AS experience#84101], Statistics(sizeInBytes=2.96E+38 B)
+- Aggregate [session_start_dt#84199, site_id#84163, _groupingexpression#84731, s165.`guid`#84734, CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735, CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736, CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737, CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738, CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739, gid#84733], [session_start_dt#84199, site_id#84163, _groupingexpression#84731, s165.`guid`#84734, CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735, CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736, CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737, CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738, CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739, gid#84733], Statistics(sizeInBytes=5.70E+38 B)
   +- Expand [Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, null, null, null, CASE WHEN (type_1#84089 = reg_suc) THEN guid#80355 END, 1), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, null, null, CASE WHEN (type_1#84089 = gxo_suc) THEN guid#80355 END, null, 2), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, guid#84161, null, null, null, null, null, 3), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, null, CASE WHEN (type_1#84089 = sign_in_visit) THEN guid#80355 END, null, null, 4), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, CASE WHEN type_1#84089 IN (sign_in_suc,reg_suc,gxo_suc) THEN guid#80355 END, null, null, null, null, 5), Vector(session_start_dt#84199, site_id#84163, _groupingexpression#84731, null, null, CASE WHEN (type_1#84089 = sign_in_suc) THEN guid#80355 END, null, null, null, 6)], [session_start_dt#84199, site_id#84163, _groupingexpression#84731, s165.`guid`#84734, CASE WHEN (av.`type_1` IN ('sign_in_suc', 'reg_suc', 'gxo_suc')) THEN spark_catalog.ubi_t.ubi_event.`guid` END#84735, CASE WHEN (av.`type_1` = 'sign_in_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84736, CASE WHEN (av.`type_1` = 'sign_in_visit') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84737, CASE WHEN (av.`type_1` = 'gxo_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84738, CASE WHEN (av.`type_1` = 'reg_suc') THEN spark_catalog.ubi_t.ubi_event.`guid` END#84739, gid#84733], Statistics(sizeInBytes=5.70E+38 B)
      +- Project [guid#84161, site_id#84163, session_start_dt#84199, guid#80355, type_1#84089, CASE WHEN (cobrand#84164 = 0) THEN dWeb WHEN (cobrand#84164 = 7) THEN FSoM WHEN ((cobrand#84164 = 6) AND primary_app_id#84182 IN (1462,2878)) THEN iOS WHEN ((cobrand#84164 = 6) AND (primary_app_id#84182 = 2571)) THEN Android WHEN ((cobrand#84164 = 6) AND (primary_app_id#84182 = 3564)) THEN mWeb ELSE Other END AS _groupingexpression#84731], Statistics(sizeInBytes=5.65E+37 B)
         +- Join LeftOuter, ((((guid#80355 = guid#84161) AND (session_skey#84201L = session_skey#84162L)) AND (session_start_dt#84203 = session_start_dt#84199)) AND (site_id#84714 = cast(site_id#84163 as decimal(10,0)))), Statistics(sizeInBytes=6.43E+37 B)
            :- Project [guid#84161, session_skey#84162L, site_id#84163, cobrand#84164, session_start_dt#84199, primary_app_id#84182], Statistics(sizeInBytes=11.9 GiB)
            :  +- Filter (((((isnotnull(exclude#84189) AND (exclude#84189 = 0)) AND isnotnull(session_start_dt#84199)) AND NOT cast(cobrand#84164 as int) IN (2,3,4,5,9)) AND (session_start_dt#84199 >= 2021-07-14)) AND (session_start_dt#84199 <= 2021-07-15)), Statistics(sizeInBytes=77.0 GiB)
            :     +- Relation p_soj_cl_t.clav_session[guid#84161,session_skey#84162L,site_id#84163,cobrand#84164,cguid#84165,buyer_site_id#84166,lndg_page_id#84167,start_timestamp#84168,end_timestamp#84169,exit_page_id#84170,valid_page_count#84171,gr_cnt#84172,gr_1_cnt#84173,vi_cnt#84174,homepage_cnt#84175,myebay_cnt#84176,signin_cnt#84177,min_sc_seqnum#84178,max_sc_seqnum#84179,signedin_user_id#84180,mapped_user_id#84181,primary_app_id#84182,agent_id#84183L,session_cntry_id#84184,... 15 more fields] parquet, Statistics(sizeInBytes=77.0 GiB)
            +- Union, Statistics(sizeInBytes=5.05E+27 B)
               :- Aggregate [session_start_dt#84203, guid#80355, session_skey#84201L, site_id#84205, _groupingexpression#84732], [session_start_dt#84203, guid#80355, session_skey#84201L, cast(site_id#84205 as decimal(10,0)) AS site_id#84714, _groupingexpression#84732 AS type_1#84089], Statistics(sizeInBytes=8.0 TiB)
               :  +- Project [GUID#80355, SESSIONSKEY#80356L AS SESSION_SKEY#84201L, cast(concat(substr(dt#80385, 0, 4), -, substr(dt#80385, 5, 2), -, substr(dt#80385, 7, 2)) as date) AS SESSION_START_DT#84203, SITEID#80361 AS SITE_ID#84205, CASE WHEN ((PAGEID#80363 IN (4853,2487283,2487285) AND (RDT#80376 = 0)) OR PAGEID#80363 IN (2050445,2050533)) THEN sign_in_visit WHEN PAGEID#80363 IN (2052190,2053938) THEN reg_suc WHEN PAGEID#80363 IN (4852,2051246,2266111) THEN sign_in_suc END AS _groupingexpression#84732], Statistics(sizeInBytes=7.5 TiB)
               :     +- Filter (((((isnotnull(SESSIONSKEY#80356L) AND isnotnull(cast(SITEID#80361 as decimal(10,0)))) AND isnotnull(guid#80355)) AND (cast(concat(substr(dt#80385, 0, 4), -, substr(dt#80385, 5, 2), -, substr(dt#80385, 7, 2)) as date) >= 2021-07-14)) AND (cast(concat(substr(dt#80385, 0, 4), -, substr(dt#80385, 5, 2), -, substr(dt#80385, 7, 2)) as date) <= 2021-07-15)) AND ((((((PAGEID#80363 = 4853) AND (PAGENAME#80364 = signin2)) AND ((lower(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnTabClick)) = signin) OR isnull(lower(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnTabClick))))) OR (PAGEID#80363 IN (2052190,2053938) AND (lower(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,type)) = reg_confirm))) OR (((PAGEID#80363 = 4852) AND isnotnull(cast(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,uid) as decimal(18,0)))) AND isnull(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnFastFYPReset)))) OR (((((PAGEID#80363 = 2266111) AND (cast(HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnStatus) as int) = 0)) AND (HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,sgnChannelType) IN (0,2) AND NOT (HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,authMethod) = guest_id_token))) OR PAGEID#80363 IN (2050445,2050533,2051246)) OR (PAGEID#80363 IN (2487283,2487285) AND NOT (HiveSimpleUDF#com.ebay.hadoop.udf.soj.SojTagFetcher(APPLICATIONPAYLOAD#80370,SigninRedirect) = V4))))), Statistics(sizeInBytes=46.7 TiB)
               :        +- Relation ubi_t.ubi_event[guid#80355,sessionskey#80356L,seqnum#80357,sessionstartdt#80358L,sojdatadt#80359L,clickid#80360,siteid#80361,version#80362,pageid#80363,pagename#80364,refererhash#80365L,eventtimestamp#80366L,urlquerystring#80367,clientdata#80368,cookies#80369,applicationpayload#80370,webserver#80371,referrer#80372,userid#80373,itemid#80374L,flags#80375,rdt#80376,regu#80377,sqr#80378,... 8 more fields] parquet, Statistics(sizeInBytes=46.7 TiB)
               +- Aggregate [sess_session_start_dt#84669, sess_guid#84665, sess_session_skey#84666L, sess_site_id#84670], [sess_session_start_dt#84669 AS session_start_dt#84090, sess_guid#84665 AS guid#84091, sess_session_skey#84666L AS session_skey#84092L, cast(sess_site_id#84670 as decimal(10,0)) AS site_id#84715, gxo_suc AS type_1#84094], Statistics(sizeInBytes=5.05E+27 B)
                  +- Project [sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=3.56E+27 B)
                     +- Join Inner, ((((item_id#84428 = item_id#84639) AND (transaction_id#84436 = transaction_id#84640)) AND (auct_end_dt#84429 = auct_end_dt#84641)) AND (created_dt#84440 = created_dt#84650)), Statistics(sizeInBytes=7.12E+27 B)
                        :- PartialAggregate [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440], [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440], Statistics(sizeInBytes=206.1 TiB)
                        :  +- Project [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440], Statistics(sizeInBytes=206.1 TiB)
                        :     +- Join LeftOuter, (cast(lstg_curncy_id#84474 as decimal(9,0)) = curncy_id#72721), Statistics(sizeInBytes=309.2 TiB)
                        :        :- PartialAggregate [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440, lstg_curncy_id#84474], [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440, lstg_curncy_id#84474], Statistics(sizeInBytes=330.8 GiB)
                        :        :  +- Project [item_id#84428, auct_end_dt#84429, transaction_id#84436, created_dt#84440, lstg_curncy_id#84474], Statistics(sizeInBytes=330.8 GiB)
                        :        :     +- Filter ((((((((isnotnull(created_dt#84440) AND isnotnull(auct_end_dt#84429)) AND isnotnull(CHECKOUT_FLAGS4#84486)) AND (created_dt#84440 >= 2021-07-14)) AND (created_dt#84440 <= 2021-07-16)) AND (auct_end_dt#84429 >= 2021-07-14)) AND isnotnull(item_id#84428)) AND isnotnull(transaction_id#84436)) AND ((cast(CHECKOUT_FLAGS4#84486 as bigint) & 2) > 0)), Statistics(sizeInBytes=10.2 TiB)
                        :        :        +- Relation gdw_tables.dw_checkout_trans[item_id#84428,auct_end_dt#84429,site_id#84430,leaf_categ_id#84431,seller_id#84432,slr_cntry_id#84433,buyer_id#84434,byr_cntry_id#84435,transaction_id#84436,shipping_address_id#84437,sale_type#84438,created_time#84439,created_dt#84440,last_modified#84441,last_modified_dt#84442,checkout_flags#84443,checkout_status#84444,checkout_status_details#84445,payment_method#84446,shipping_fee#84447,shipping_xfee#84448,tax#84449,tax_state#84450,instruction_flag#84451,... 110 more fields] parquet, Statistics(sizeInBytes=10.2 TiB)
                        :        +- PartialAggregate [curncy_id#72721], [curncy_id#72721], Statistics(sizeInBytes=957.0 B)
                        :           +- Project [curncy_id#72721], Statistics(sizeInBytes=957.0 B)
                        :              +- Filter isnotnull(curncy_id#72721), Statistics(sizeInBytes=4.4 KiB)
                        :                 +- Relation gdw_tables.ssa_curncy_plan_rate_dim[CURNCY_ID#72721,CURNCY_PLAN_RATE#72722,CRE_DATE#72723,CRE_USER#72724,UPD_DATE#72725,UPD_USER#72726] parquet, Statistics(sizeInBytes=4.4 KiB)
                        +- PartialAggregate [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=28.6 TiB)
                           +- PartialAggregate [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=28.6 TiB)
                              +- Project [item_id#84639, transaction_id#84640, auct_end_dt#84641, created_dt#84650, sess_guid#84665, sess_session_skey#84666L, sess_session_start_dt#84669, sess_site_id#84670], Statistics(sizeInBytes=28.6 TiB)
                                 +- Filter ((((((((((((isnotnull(sess_session_start_dt#84669) AND isnotnull(created_dt#84650)) AND isnotnull(auct_end_dt#84641)) AND (created_dt#84650 >= 2021-07-14)) AND (created_dt#84650 <= 2021-07-16)) AND (sess_session_start_dt#84669 >= 2021-07-14)) AND (sess_session_start_dt#84669 <= 2021-07-15)) AND (auct_end_dt#84641 >= 2021-07-14)) AND isnotnull(item_id#84639)) AND isnotnull(transaction_id#84640)) AND isnotnull(sess_session_skey#84666L)) AND isnotnull(cast(sess_site_id#84670 as decimal(10,0)))) AND isnotnull(sess_guid#84665)), Statistics(sizeInBytes=318.1 TiB)
                                    +- Relation p_soj_cl_t.checkout_metric_item[item_id#84639,transaction_id#84640,auct_end_dt#84641,item_site_id#84642,trans_site_id#84643,auct_type_code#84644,leaf_categ_id#84645,seller_id#84646,buyer_id#84647,seller_country_id#84648,buyer_country_id#84649,created_dt#84650,created_time#84651,item_price#84652,quantity#84653,lstg_curncy_exchng_rate#84654,lstg_curncy_id#84655,ck_wacko_yn#84656,variation_id#84657,version_id#84658,app_id#84659,format_flags64#84660L,auct_start_dt#84661,leaf_categ_id2#84662,... 51 more fields] parquet, Statistics(sizeInBytes=318.1 TiB)
```

* Fix tpcds q24a DecimalAggregates issue:
```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.DecimalAggregates ===
 Subquery false                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           Subquery false
 +- Aggregate [CheckOverflow((0.050000 * promote_precision(avg(netpaid#166))), DecimalType(24,8), true) AS (CAST(0.05 AS DECIMAL(21,6)) * CAST(avg(netpaid) AS DECIMAL(21,6)))#176]                                                                                                                                                                                                                                                                                                                                       +- Aggregate [CheckOverflow((0.050000 * promote_precision(avg(netpaid#166))), DecimalType(24,8), true) AS (CAST(0.05 AS DECIMAL(21,6)) * CAST(avg(netpaid) AS DECIMAL(21,6)))#176]
!   +- Aggregate [c_last_name#121, c_first_name#120, s_store_name#66, ca_state#138, s_state#85, i_color#107, i_current_price#95, i_manager_id#110, i_units#108, i_size#105], [sum(ss_net_paid#37, None) AS netpaid#166]                                                                                                                                                                                                                                                                                                      +- Aggregate [c_last_name#121, c_first_name#120, s_store_name#66, ca_state#138, s_state#85, i_color#107, i_current_price#95, i_manager_id#110, i_units#108, i_size#105], [MakeDecimal(sum(UnscaledValue(ss_net_paid#37), None),17,2) AS netpaid#166]
       +- Project [ss_net_paid#37, s_store_name#66, s_state#85, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110, c_first_name#120, c_last_name#121, ca_state#138]                                                                                                                                                                                                                                                                                                                                    +- Project [ss_net_paid#37, s_store_name#66, s_state#85, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110, c_first_name#120, c_last_name#121, ca_state#138]
          +- Join Inner, ((s_zip#86 = ca_zip#139) AND (c_birth_country#126 = upper(ca_country#140)))
             :- Project [s_store_name#66, s_state#85, s_zip#86, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]
             :  +- Join Inner, ((ss_item_sk#19 = sr_item_sk#42) AND (ss_ticket_number#26L = sr_ticket_number#49L))
             :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_ticket_number#26L, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]
             :     :  +- Join Inner, (ss_item_sk#19 = i_item_sk#90)
             :     :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_ticket_number#26L, ss_net_paid#37, c_first_name#120, c_last_name#121, c_birth_country#126]
             :     :     :  +- Join Inner, (ss_customer_sk#20 = c_customer_sk#112)
             :     :     :     :- Project [s_store_name#66, s_state#85, s_zip#86, ss_item_sk#19, ss_customer_sk#20, ss_ticket_number#26L, ss_net_paid#37]
             :     :     :     :  +- Join Inner, (ss_store_sk#24 = s_store_sk#61)
             :     :     :     :     :- Project [s_store_sk#61, s_store_name#66, s_state#85, s_zip#86]
             :     :     :     :     :  +- Filter ((((s_market_id#71 = 8) AND isnotnull(s_market_id#71)) AND isnotnull(s_zip#86)) AND isnotnull(s_store_sk#61))
             :     :     :     :     :     +- Relation hermes_tpcds5t.store[s_store_sk#61,s_store_id#62,s_rec_start_date#63,s_rec_end_date#64,s_closed_date_sk#65,s_store_name#66,s_number_employees#67,s_floor_space#68,s_hours#69,s_manager#70,s_market_id#71,s_geography_class#72,s_market_desc#73,s_market_manager#74,s_division_id#75,s_division_name#76,s_company_id#77,s_company_name#78,s_street_number#79,s_street_name#80,s_street_type#81,s_suite_number#82,s_city#83,s_county#84,... 5 more fields] parquet
             :     :     :     :     +- Project [ss_item_sk#19, ss_customer_sk#20, ss_store_sk#24, ss_ticket_number#26L, ss_net_paid#37]
             :     :     :     :        +- Filter (((isnotnull(ss_customer_sk#20) AND isnotnull(ss_store_sk#24)) AND isnotnull(ss_ticket_number#26L)) AND isnotnull(ss_item_sk#19))
             :     :     :     :           +- Relation hermes_tpcds5t.store_sales[ss_sold_time_sk#18,ss_item_sk#19,ss_customer_sk#20,ss_cdemo_sk#21,ss_hdemo_sk#22,ss_addr_sk#23,ss_store_sk#24,ss_promo_sk#25,ss_ticket_number#26L,ss_quantity#27,ss_wholesale_cost#28,ss_list_price#29,ss_sales_price#30,ss_ext_discount_amt#31,ss_ext_sales_price#32,ss_ext_wholesale_cost#33,ss_ext_list_price#34,ss_ext_tax#35,ss_coupon_amt#36,ss_net_paid#37,ss_net_paid_inc_tax#38,ss_net_profit#39,ss_sold_date_sk#40] parquet
             :     :     :     +- Project [c_customer_sk#112, c_first_name#120, c_last_name#121, c_birth_country#126]
             :     :     :        +- Filter (isnotnull(c_birth_country#126) AND isnotnull(c_customer_sk#112))
             :     :     :           +- Relation hermes_tpcds5t.customer[c_customer_sk#112,c_customer_id#113,c_current_cdemo_sk#114,c_current_hdemo_sk#115,c_current_addr_sk#116,c_first_shipto_date_sk#117,c_first_sales_date_sk#118,c_salutation#119,c_first_name#120,c_last_name#121,c_preferred_cust_flag#122,c_birth_day#123,c_birth_month#124,c_birth_year#125,c_birth_country#126,c_login#127,c_email_address#128,c_last_review_date#129] parquet
             :     :     +- Project [i_item_sk#90, i_current_price#95, i_size#105, i_color#107, i_units#108, i_manager_id#110]
             :     :        +- Filter isnotnull(i_item_sk#90)
             :     :           +- Relation hermes_tpcds5t.item[i_item_sk#90,i_item_id#91,i_rec_start_date#92,i_rec_end_date#93,i_item_desc#94,i_current_price#95,i_wholesale_cost#96,i_brand_id#97,i_brand#98,i_class_id#99,i_class#100,i_category_id#101,i_category#102,i_manufact_id#103,i_manufact#104,i_size#105,i_formulation#106,i_color#107,i_units#108,i_container#109,i_manager_id#110,i_product_name#111] parquet
             :     +- Project [sr_item_sk#42, sr_ticket_number#49L]
             :        +- Filter (isnotnull(sr_ticket_number#49L) AND isnotnull(sr_item_sk#42))
             :           +- Relation hermes_tpcds5t.store_returns[sr_return_time_sk#41,sr_item_sk#42,sr_customer_sk#43,sr_cdemo_sk#44,sr_hdemo_sk#45,sr_addr_sk#46,sr_store_sk#47,sr_reason_sk#48,sr_ticket_number#49L,sr_return_quantity#50,sr_return_amt#51,sr_return_tax#52,sr_return_amt_inc_tax#53,sr_fee#54,sr_return_ship_cost#55,sr_refunded_cash#56,sr_reversed_charge#57,sr_store_credit#58,sr_net_loss#59,sr_returned_date_sk#60] parquet
             +- Project [ca_state#138, ca_zip#139, ca_country#140]
                +- Filter (isnotnull(ca_country#140) AND isnotnull(ca_zip#139))
                   +- Relation hermes_tpcds5t.customer_address[ca_address_sk#130,ca_address_id#131,ca_street_number#132,ca_street_name#133,ca_street_type#134,ca_suite_number#135,ca_city#136,ca_county#137,ca_state#138,ca_zip#139,ca_country#140,ca_gmt_offset#141,ca_location_type#142] parquet

```

* 1. Fix TPC-DS q82 not being able to add a runtime filter
2. Fix a Statistics issue

* Fix a bug

* Support avg
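
A minimal sketch of what supporting avg in a partial/final split amounts to, assuming the usual rewrite into sum and count (the actual rule is not shown in this excerpt; the SparkSession, table and column names below are illustrative):

```scala
// Illustrative only: avg(x) cannot be merged directly, but sum(x) and count(x)
// can, and the final stage derives avg = sum / count.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, count}

val spark = SparkSession.builder().master("local[*]").appName("avg-split-sketch").getOrCreate()
import spark.implicits._

val df = Seq((1, 10.0), (1, 20.0), (2, 30.0)).toDF("k", "x")

// Partial aggregation: keep the mergeable pieces, not avg itself.
val partial = df.groupBy($"k").agg(sum($"x").as("sum_x"), count($"x").as("cnt_x"))

// Final aggregation: merge the partial states, then derive the average.
val finalAvg = partial.groupBy($"k")
  .agg(sum($"sum_x").as("sum_x"), sum($"cnt_x").as("cnt_x"))
  .select($"k", ($"sum_x" / $"cnt_x").as("avg_x"))

finalAvg.show()
```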

* 1. Support pushdown when an AggregateExpression contains complex expressions
2. Deduplicate and reorder aggregate expressions to find more ReuseExchange opportunities (illustrated below)
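
A hedged illustration of point 2 (the table `t` and the queries are made up, not taken from the PR): once duplicate aggregate expressions are removed and the remaining ones are put in a canonical order, branches that compute the same aggregates in a different order produce identical plans, so their exchanges can be reused.

```scala
// Both queries compute the same aggregates; after deduplicating the repeated
// sum(x) and reordering, both reduce to Aggregate [k], [k, max(x), sum(x)],
// which is what allows a shared exchange to be reused.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
Seq((1, 10), (1, 20), (2, 30)).toDF("k", "x").createOrReplaceTempView("t")

val left  = spark.sql("SELECT k, max(x) AS mx, sum(x) AS sx, sum(x) AS sx2 FROM t GROUP BY k")
val right = spark.sql("SELECT k, sum(x) AS sx, max(x) AS mx FROM t GROUP BY k")

left.explain()
right.explain()
```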

* Fix the issue where TPC-DS v2.7 q57 and q67a can't reuse exchanges:
```scala
import java.io.File

// Plan stability suite with stats injected; checks the simplified sf100 plans
// for the listed TPC-DS v2.7 queries.
class TPCDSV2_7_PlanStabilityWithStatsSuite extends PlanStabilitySuite with TPCDSBase {
  override def injectStats: Boolean = true

  override val goldenFilePath: String =
    new File(baseResourcePath, s"approved-plans-v2_7").getAbsolutePath

  Seq(
    // "q5a", "q6", "q10a", "q11", "q12", "q14", "q14a",
    // "q18a",
    "q51a",
    "q57",
    "q67a").foreach { q =>
    test(s"check simplified sf100 (tpcds-v2.7.0/$q)") {
      testQuery("tpcds-v2.7.0", q, ".sf100")
    }
  }
}
```

* Fix bbensid2 q12 java.lang.RuntimeException: Couldn't find date_confirm#25512

```
java.sql.SQLException: Error running query: java.lang.RuntimeException: Couldn't find date_confirm#25512 in sum#31527L,sum#31528L,count#31529L,sum#31530L,user_site_id#25498,half_origin_user#31347,_groupingexpression#31516,_groupingexpression#31517,_groupingexpression#31518,_groupingexpression#31519,pushed_count(user_id#25495)#31520L,pushed_count(date_confirm#25512)#31521L,pushed_sum(CASE WHEN CASE WHEN (flagsex6#25555 = -999) THEN false ELSE (((cast(flagsex6#25555 as bigint) & 65536) >= 1) <=> true) END THEN 1 ELSE 0 END, None)#31522L,site_name#30052,cnt#31525L
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:297)
at org.apache.hive.jdbc.HiveStatement.executeQuery(HiveStatement.java:392)
at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.$anonfun$main$1(BenchmarkAndVerifyResult.scala:162)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at com.ebay.carmel.spark.BenchmarkAndVerifyResult$.main(BenchmarkAndVerifyResult.scala:146)
at com.ebay.carmel.spark.BenchmarkAndVerifyResult.main(BenchmarkAndVerifyResult.scala)
```

https://jirap.corp.ebay.com/browse/CARMEL-5966

* Merge code from Apache Spark

* Split Aggregate into a Partial Agg and a Final Agg.
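
As a rough sketch of the shape this produces (the tables and names are illustrative, and in practice the optimizer performs the rewrite rather than user code): the Partial Agg shrinks the fact-side input before the join, and the Final Agg merges the partial states afterwards.

```scala
// Conceptual two-stage aggregation written out as explicit DataFrame steps.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{sum, count, lit}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq((1, 10.0), (1, 20.0), (2, 5.0)).toDF("item_sk", "price")
val items = Seq((1, "red"), (2, "blue")).toDF("item_sk", "color")

// Partial Agg: pre-aggregate the large side before joining.
val partial = sales.groupBy($"item_sk")
  .agg(sum($"price").as("p_sum"), count(lit(1)).as("p_cnt"))

// Final Agg: join the reduced input, then merge the partial states per group.
val result = partial.join(items, "item_sk")
  .groupBy($"color")
  .agg(sum($"p_sum").as("total_price"), sum($"p_cnt").as("row_cnt"))

result.show()
```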

* Enhance supportPushedAgg so that it does not downgrade TPC-DS q4 performance

* Do not downgrade bbendis 367 performance:
```
MAX(CASE WHEN sojlib.soj_extract_flag(sojlib.soj_nvl(e.soj, 'cflgs'), 15) = 1 THEN 1 ELSE 0 END) AS gbh_yn,
```

* Do not push down if the aggregate contains a count distinct (partial counts of distinct values cannot be merged correctly at the final stage)

* Simplify the code

* Fix bug

* Fix: org.apache.spark.sql.hive.execution.ObjectHashAggregateSuite.randomized aggregation test - [with distinct] - without grouping keys - with empty input

```
Error Message
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c3#7123

Stacktrace
sbt.ForkMain$ForkError: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: c3#7123
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
	at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:324)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:329)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:414)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:252)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:412)
```

* Simplify the code

* Add config spark.sql.optimizer.partialAggregationOptimization.enabled
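
A hedged usage note, assuming an existing SparkSession `spark` (the default value of the flag is not shown in this excerpt):

```scala
// Toggle the partial aggregation optimization like any other SQL conf.
spark.conf.set("spark.sql.optimizer.partialAggregationOptimization.enabled", "true")
// or, equivalently, from SQL:
spark.sql("SET spark.sql.optimizer.partialAggregationOptimization.enabled=false")
```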

* Fix test

* Add tests

* Add partialAggregationOptimization.benefitRatio and partialAggregationOptimization.fallbackReductionRatio
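
Assuming these two knobs live under the same `spark.sql.optimizer.partialAggregationOptimization.*` prefix as the flag above (the exact keys, defaults and semantics are not shown in this excerpt), they would be tuned the same way; the values below are placeholders:

```scala
// Hypothetical keys and values controlling when the partial aggregation
// pushdown is considered beneficial and when it falls back.
spark.conf.set("spark.sql.optimizer.partialAggregationOptimization.benefitRatio", "0.5")
spark.conf.set("spark.sql.optimizer.partialAggregationOptimization.fallbackReductionRatio", "0.9")
```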

* Port [SPARK-39248][SQL] Improve divide performance for decimal type

* fix

* Support sum(1)
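
For context (the table `t` below is illustrative): `sum(1)` adds one per row, so it effectively counts rows per group and its partial sums can be merged with a plain sum at the final stage, much like count.

```scala
// sum(1) over each group counts its rows, so partial sums merge like counts.
spark.sql("SELECT k, sum(1) AS cnt FROM t GROUP BY k").explain()
```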

* Take an aggregate expression's references from its Alias

* sync code

* fix test

* fix

* Fix avg data issue

* Fix test error

* fix

* 1. Only push down if it has a benefit (see the sketch below)
2. Introduce FinalAggregate
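
A hedged sketch of what "only push down if it has a benefit" could look like; `worthPushingDown`, its arguments, and the thresholds are placeholders rather than the PR's actual code. The idea is to compare the estimated number of groups against the input row count and skip the rewrite when pre-aggregation would barely shrink the data.

```scala
// Illustrative benefit check, not the real optimizer rule.
def worthPushingDown(
    inputRows: BigInt,
    estimatedGroups: BigInt,
    benefitRatio: Double): Boolean = {
  if (inputRows <= 0) {
    false
  } else {
    // e.g. 1,000,000 rows collapsing to 10,000 groups -> ratio 0.01 <= 0.5.
    estimatedGroups.toDouble / inputRows.toDouble <= benefitRatio
  }
}

assert(worthPushingDown(BigInt(1000000), BigInt(10000), benefitRatio = 0.5))   // clearly beneficial
assert(!worthPushingDown(BigInt(1000000), BigInt(900000), benefitRatio = 0.5)) // barely reduces rows
```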

* Fix