
[SPARK-19274][SQL] Make GlobalLimit without shuffling data to single partition #16633

Closed
wants to merge 2 commits

Conversation

viirya
Member

@viirya viirya commented Jan 18, 2017

What changes were proposed in this pull request?

A logical Limit is actually performed by two physical operations, LocalLimit and GlobalLimit.

Most of the time, before GlobalLimit we perform a shuffle exchange that moves all the data to a single partition. When the limit number is not trivially small, this shuffle is costly.

This change tries to perform GlobalLimit without shuffling data to a single partition. The approach is similar to SparkPlan.executeTake: it iterates over part of the partitions until it has collected enough rows.
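The iteration strategy can be illustrated outside Spark. The following Python is a hypothetical sketch, not Spark code: plain lists stand in for RDD partitions, and the quadrupling factor mirrors the retry strategy of SparkPlan.executeTake.

```python
# Hypothetical sketch (not Spark's code) of the executeTake-style
# iteration: scan a few partitions first and, while the limit is not
# yet satisfied, grow the number of partitions to try and continue.
def incremental_take(partitions, limit, scale_up_factor=4):
    """`partitions` is a list of row lists standing in for RDD
    partitions; returns the first `limit` rows while scanning as few
    partitions as possible."""
    taken = []
    parts_scanned = 0
    num_parts_to_try = 1
    while len(taken) < limit and parts_scanned < len(partitions):
        end = min(parts_scanned + num_parts_to_try, len(partitions))
        for part in partitions[parts_scanned:end]:
            taken.extend(part[:limit - len(taken)])
        parts_scanned = end
        # quadruple the partition count for the next round, mirroring
        # the retry strategy of SparkPlan.executeTake
        num_parts_to_try *= scale_up_factor
    return taken
```

For example, `incremental_take([[1, 2], [3, 4], [5, 6]], 3)` returns the first 3 rows without ever touching the third partition.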

How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

numToReduce -= toReduce
}
resultRDD = childRDD.mapPartitionsWithIndexInternal { case (index, iter) =>
  if (index < partsScanned) {
Contributor

Actually, the partitions of childRDD with index < partsScanned are computed twice, so it's hard to say this must be better than the old version.

Member Author

Not exactly. Here we only drop a few redundant rows from a few partitions; most partitions are not computed at all. Previously, you had to compute all partitions and then shuffle all of them.

Contributor

Previously: compute all partitions + shuffle all partitions of the child RDD.
Now: compute some partitions of the child RDD twice, once to get partsScanned and the total row count, and once to get the limit number of elements from those partitions.

I mean the second is not always better than the first if the recompute cost is high.

Contributor

An example: select xxx from table where xxx > 99 limit 1000.
If the table is big and the real number of rows with xxx > 99 is less than the limit, you still need to compute all the partitions, and you will run the filter and scan the big table twice.

Member Author

The second pass you mean is just dropping a few elements. I think its cost is trivial.

Member Author

When there are no rows to drop, it just returns the iterator without consuming it, so no scan will happen.

Contributor

It will be consumed by the parent plan.

Member Author

I think that is the issue of the broken RDD job chain that @rxin pointed out.

Contributor

Can you explain more? I think that without the job of

val res = sc.runJob(childRDD,
         (it: Iterator[InternalRow]) => Array[Int](it.size), p)

the job chain is not broken.

Member Author

Yes. The broken RDD job chain causes an extra partition scan.

@SparkQA

SparkQA commented Jan 18, 2017

Test build #71605 has finished for PR 16633 at commit b26488f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@rxin
Contributor

rxin commented Jan 19, 2017

This breaks the RDD job chain, doesn't it?

@viirya
Member Author

viirya commented Jan 19, 2017

@rxin Can you explain it more? I don't get it. Why does it break the RDD job chain?

@viirya viirya force-pushed the globallimit-without-shuffle branch from b26488f to 6ba8b28 Compare January 19, 2017 02:10
val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
val sc = sqlContext.sparkContext
val res = sc.runJob(childRDD,
  (it: Iterator[InternalRow]) => Array[Int](it.size), p)
Contributor

@viirya it.size will trigger the scan for the big table, right?

Member Author

We scan only part of the partitions in each iteration.

Contributor

Yes, I know, but for the partitions selected in each iteration, it will trigger computation of those partitions, right?

Member Author

Yes.

@viirya
Member Author

viirya commented Jan 19, 2017

@rxin OK, I see what you mean by breaking the RDD job chain.

@viirya
Member Author

viirya commented Jan 19, 2017

@rxin Even if it breaks the RDD job chain, I think it is still useful in some cases, for example when the number of partitions is large and you only need one or a few partitions to satisfy the limit.

A broken RDD job chain means we do an extra scan of those few partitions, but you save the time of scanning all the other partitions and of shuffling.

@scwf
Contributor

scwf commented Jan 19, 2017

@viirya @rxin I support the idea of @wzhfy in the mailing list thread http://apache-spark-developers-list.1001551.n3.nabble.com/Limit-Query-Performance-Suggestion-td20570.html; it solves the single-partition issue in the global limit without breaking the job chain.

For local limit it still computes all partitions. I think we can consider resolving the local-limit issue with some changes in the core scheduler in the future; we could provide a mechanism to not compute all the tasks in a stage if some condition is satisfied for the stage.

What do you think?

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf The main issue the user posted in the mailing list is that the limit or the partition number is big enough to cause a performance bottleneck when shuffling the output of the local limit. But @wzhfy's idea also involves shuffling.

Another issue is: how do you ensure a uniform distribution of the local limit's output? Each local limit can produce a different number of rows.

@SparkQA

SparkQA commented Jan 19, 2017

Test build #71627 has finished for PR 16633 at commit 6ba8b28.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class LocalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode with CodegenSupport
  • case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode

@scwf
Contributor

scwf commented Jan 19, 2017

To be clear, we now have these issues:

  1. The local limit computes all partitions; that means it launches many tasks, but actually a few tasks may be enough.
  2. The global limit has a single-partition issue: it currently shuffles all the data to one partition, so if the limit number is very big it causes a performance bottleneck.

It would be perfect if we could combine the global limit and local limit into one stage and avoid the shuffle, but for now I cannot find a very good solution (one with no performance regression) that does not change Spark core/scheduler. Your solution is trying to do that, but as I suggested, there are cases where the performance may be worse.

@wzhfy's idea just resolves the single-partition issue: still a shuffle, still a local limit on all the partitions, but it does not bring performance down in those cases compared with the current code path.

Another issue is, how do you make sure you create a uniform distribution of the result of local limit. Each local limit can produce different number of rows.

It uses a special partitioner to do this. The partitioner, like row_number in SQL, gives each row a uniform partition id, so on the reduce side each task handles a nearly equal number of rows.

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf

it use a special partitioner to do this, the partitioner like the row_numer in sql it give each row a uniform partitionid, so in the reduce task, each task handle num of rows very closely.

I see @wzhfy wants to use a partitioner to uniformly distribute the rows of each local limit. However, because each local limit can produce a different number of rows, you can't get a truly uniform distribution. So in the global limit operation, you can't know how many partitions you need in order to satisfy the final limit number.

@scwf
Contributor

scwf commented Jan 19, 2017

Referring to the mailing list:

One issue left is how to decide shuffle partition number.
We can have a config of the maximum number of elements for each GlobalLimit task to process,
then do a factorization to get a number most close to that config.
E.g. the config is 2000:
if limit=10000, 10000 = 2000 * 5, we shuffle to 5 partitions
if limit=9999, 9999 = 1111 * 9, we shuffle to 9 partitions
if limit is a prime number, we just fall back to single partition

You mean for the prime number case?
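The factorization heuristic quoted above can be sketched as follows. This is an illustrative Python sketch, not code from Spark or from the proposal; the function name and the brute-force divisor search are my own.

```python
# Sketch of the mailing-list heuristic: pick a shuffle partition
# count so that each GlobalLimit task processes at most
# `max_rows_per_task` rows, falling back to a single partition when
# the limit has no suitable divisor (e.g. it is prime).
def shuffle_partitions(limit, max_rows_per_task=2000):
    if limit <= max_rows_per_task:
        return 1
    best = 1  # largest divisor of `limit` that fits in one task
    for d in range(2, int(limit ** 0.5) + 1):
        if limit % d == 0:
            for cand in (d, limit // d):
                if cand <= max_rows_per_task:
                    best = max(best, cand)
    if best == 1:
        return 1  # prime limit: fall back to a single partition
    return limit // best
```

With the config of 2000 from the quote, `shuffle_partitions(10000)` gives 5 and `shuffle_partitions(9999)` gives 9, matching the examples above.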

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf No. A simple example: suppose there are 5 local limits which produce 1, 2, 1, 1, 1 rows when the limit is 10. If you shuffle to 5 partitions, the distribution for each local limit looks like:

1: (1, 0, 0, 0, 0)
2: (1, 1, 0, 0, 0)
3: (1, 0, 0, 0, 0)
4: (1, 0, 0, 0, 0)
5: (1, 0, 0, 0, 0)

So the final row counts in the 5 partitions are (5, 1, 0, 0, 0), which is not uniformly distributed.

You don't know how many rows each local limit will produce. So how do you know how many partitions to use and how many rows to retrieve from each partition?
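The skew in this example is easy to reproduce with a small simulation. The Python sketch below is hypothetical; it assumes each map-side local limit assigns partition ids 0, 1, 2, ... round-robin (row_number-style) independently of the others, and computes the resulting reduce-side row counts.

```python
# Simulate the example above: each map task's local limit numbers
# its own rows from 0 and assigns partition id = row_number mod
# num_partitions, so low-numbered partitions collect most rows.
def reduce_side_counts(rows_per_map_task, num_partitions):
    counts = [0] * num_partitions
    for n in rows_per_map_task:
        for i in range(n):
            counts[i % num_partitions] += 1
    return counts
```

`reduce_side_counts([1, 2, 1, 1, 1], 5)` returns `[5, 1, 0, 0, 0]`, the skewed distribution described above.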

@SparkQA

SparkQA commented Jan 19, 2017

Test build #71633 has finished for PR 16633 at commit 3cbd6ee.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@scwf
Contributor

scwf commented Jan 19, 2017

Yes, you are right, we cannot ensure a uniform distribution for the global limit.
An idea: instead of using a special partitioner, after the shuffle we could get map-output statistics for the row count of each bucket, and decide how many elements each global limit task should take.

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf I don't think that would work. Map-output statistics only give the approximate number of output bytes; you can't use them to get the correct row count.

@scwf
Contributor

scwf commented Jan 19, 2017

We would need to define a new kind of map-output statistics to do this.

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf Even if it works eventually, I don't think it is better in performance.

Let's calculate it simply. Assume the limit number is n, the partition number is N, and each partition has n / r rows on average.

For this change, in a worst case, suppose the first scanned partition returns 0 rows; we then quadruple the partitions to scan, and each partition returns n / r rows. So we scan 4 * n / r + n = n * (4 + r) / r rows in total.

Suppose you could know how many elements to retrieve from each partition back to a single partition for the global limit operation. You would need to produce all rows in all partitions, N * n / r, plus shuffle n rows to a single partition.

If we don't consider the shuffling cost, compare n * (4 + r) / r with N * n / r: your solution scans fewer rows only if N < 4 + r. For example, if each partition has n / 2 rows, N must be less than 6. So your solution performs better only when the partition number is relatively small.
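The arithmetic above can be checked numerically. This Python sketch simply encodes the two cost formulas under the stated assumptions (a worst-case first pass returning 0 rows, a quadrupling retry, and n / r rows per partition); the function names are mine.

```python
# Rows scanned by the incremental approach in this PR, worst case:
# 4 retried partitions of n / r rows each, plus the n rows taken.
# This equals n * (4 + r) / r.
def rows_scanned_this_pr(n, r):
    return 4 * n / r + n

# Rows scanned by the shuffle-based plan: every one of the N
# partitions of the child is computed, at n / r rows each.
def rows_scanned_with_shuffle(n, r, N):
    return N * n / r
```

Ignoring the shuffle itself, the shuffle-based plan scans fewer rows only when N < 4 + r; with n = 100 and r = 2, the incremental approach scans 300 rows while a 10-partition shuffle plan scans 500.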

@scwf
Contributor

scwf commented Jan 19, 2017

I don't get you, but let me explain more. If we use map-output statistics to decide how many elements each global limit task should take:

  1. the local limit shuffles with the mailing-list partitioner and returns the map-output statistics;
  2. each global limit partition takes or drops some rows (just like what you do in this PR) based on the statistics.

Then:

  1. the shuffle cost is almost the same as now;
  2. the global limit has no single-partition issue when the limit number is big.

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf I understand your point. But the main issue is that you can't save the local-limit cost or the shuffling cost: you still need to process all rows in all partitions and shuffle some of them to a single partition.

Simply put, my previous comment shows that in most cases your proposal won't perform better than this change. Basically, your proposal should not outperform the current approach.

@scwf
Contributor

scwf commented Jan 19, 2017

I think before comparing our proposals, we should first make sure a proposal does not bring a performance regression.

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf That is fair. Anyway, I don't think a proposal that can't improve any aspect of these issues is worth so many requested changes.

@scwf
Contributor

scwf commented Jan 19, 2017

Your proposal avoids the cost of computing and shuffling all partitions for the local limit, but introduces recomputation of some partitions in the local-limit stage.

We cannot decide which cost is cheaper (even in most cases); note that the computation logic of the local-limit stage may be very complex and costly.

@viirya
Member Author

viirya commented Jan 19, 2017

@scwf You still don't get the point. Although a few partitions need to be recomputed in the local limit, most of the other partitions are saved from computation. In most cases it is worth it; you can refer to my previous comment, which shows a simple calculation.

It doesn't matter whether the local limit is simple or complex. In simple terms: one approach recomputes 1 partition, the other computes all 100 partitions; which one is better?

@scwf
Contributor

scwf commented Jan 19, 2017

I think the local-limit cost is important. Assume the number of recomputed partitions is m and the total number of partitions is n.
m = 1, n = 100 is a favorable case, but there are also cases where m is very close to n (even m = n).

Our customers have this scenario, so I care a lot about this issue.

@viirya
Member Author

viirya commented Jan 19, 2017

That case only happens when the total row count across all partitions is less than or (nearly) equal to the limit number, so it needs to scan (almost) all partitions.

One possible way to deal with this case is to use row-count statistics to decide whether to do this global limit without shuffle, or the old global limit.

@scwf
Contributor

scwf commented Jan 20, 2017

@viirya My team member posted the mailing-list thread; we actually mean the case I listed above. The main issue is the single-partition issue in the global limit; if you fall back to the old global limit in that case, it is still unresolved.

@viirya
Member Author

viirya commented Jan 20, 2017

@scwf Can you describe the single-partition issue based on your understanding? If you read my reply in the mailing list, you should understand that it is not an issue by itself. The problem is the shuffling.

@scwf
Contributor

scwf commented Jan 20, 2017

Assume the local limit outputs 100,000,000 rows; then the global limit will take them in a single partition, so it is very slow and cannot use other free cores to improve parallelism.

@viirya
Member Author

viirya commented Jan 20, 2017

@scwf So it sounds like the problem is the shuffling.

@scwf
Contributor

scwf commented Jan 20, 2017

I think the shuffle is OK, but shuffling to one partition leads to the performance issue.

@viirya
Member Author

viirya commented Jan 20, 2017

That is why I propose avoiding the shuffle to a single partition. So I don't know what you are against.

@scwf
Contributor

scwf commented Jan 20, 2017

Again, to be clear, I am against the performance regression in the following case:

  1. the limit number is 100,000,000;
  2. the original table is very big, much larger than 100,000,000 rows;
  3. after the local-limit stage, the output row count is about/nearly 100,000,000 rows.

@viirya
Member Author

viirya commented Jan 20, 2017

Do you mean the total rows across all partitions after the local limit are about/nearly 100,000,000? Or that each partition after the local limit has about/nearly 100,000,000 rows?

@scwf
Contributor

scwf commented Jan 20, 2017

The total across all partitions after the local limit is about/nearly 100,000,000 rows.

@viirya
Member Author

viirya commented Jan 20, 2017

OK, I think it is clearer now. We have two cases to solve:

  1. After the local limit, the total rows in all partitions are (much) more than the limit number.
  2. After the local limit, the total rows in all partitions are nearly the limit number.

For 1, the current change in this PR is effective: we can save the shuffle and most of the local-limit processing.

For 2, the current change will re-process all the rows, so it is not efficient. Falling back to the old global limit degrades parallelism, so if the limit number is big, performance will be bad. One solution is to get the exact number of rows in each partition after the local limit via modified map-output statistics, and take only the partitions with the required number of rows.

@scwf What do you think?
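The "take only the partitions with the required number of rows" step for case 2 could look like the following Python sketch, assuming a hypothetical modified map-output statistics that reports exact per-partition row counts after the local limit.

```python
# Given exact per-partition row counts (assumed to come from a
# modified map-output statistics), compute how many rows to take
# from each partition so that exactly `limit` rows survive overall,
# without shuffling everything to a single partition.
def take_per_partition(row_counts, limit):
    takes = []
    remaining = limit
    for count in row_counts:
        take = min(count, remaining)  # greedily fill from the front
        takes.append(take)
        remaining -= take
    return takes
```

For example, with partitions holding 3, 5, and 4 rows and a limit of 7, the plan would take 3, 4, and 0 rows respectively.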

@scwf
Contributor

scwf commented Jan 20, 2017

For 1, my idea is not to use the proposal in this PR:

  1. How do you determine that the total rows in all partitions are (much) more than the limit number, so as to go into this code path, and how do you decide what counts as "much more"? We cannot use CBO estimated stats here, because the LocalLimit plan may be complex and we cannot ensure the accuracy of the estimated row number.

  2. As @rxin suggested, this breaks the RDD chain.

So for 1, I think it needs some improvement of Spark core and the scheduler, as I mentioned above.

For 2 it is OK to me; the solution is the same as I described above (still shuffle + shuffle to multiple partitions + modified map-output statistics), right?

@viirya
Member Author

viirya commented Jan 20, 2017 via email

@scwf
Contributor

scwf commented Jan 20, 2017

@viirya I suggest fixing case 2 in this PR; let's wait for some comments on case 1. /cc @rxin and @wzhfy, who may comment on the first case.

@wzhfy
Contributor

wzhfy commented Jan 20, 2017

Hi @viirya, the main concern of @scwf is that we can't afford a performance regression in any customer scenario. I think you can understand that :)

I went through the discussion above; it seems we have some solution for both cases you mentioned here, so the talking points become the following two:

  1. how to get the output row count and decide the threshold between the two cases;
  2. the RDD chain is broken.

Let's wait for @rxin's comment on the second point.

Here I'm just interested in the first one.
One possible way to get the count is to modify the map-output statistics, as suggested by @scwf.
For CBO, if the computing logic before the limit is complex, it's hard to get an accurate estimation, e.g. joins of filtered tables, where join keys and filter keys are probably different (that would need column-correlation info).
You mentioned we can get an estimated number and a confidence; can you describe how?

@viirya
Member Author

viirya commented Jan 20, 2017

@wzhfy Thanks for comment.

I meant that we can deal only with the cases where we are confident that the total rows in all partitions must be much larger than the limit number. I am not sure whether CBO can tell in which cases the estimation is more accurate; can we obtain such a parameter from the CBO framework?

If we can't be sure we won't hit the case where the total rows are nearly the same as the limit number, then the change proposed here can't be used, given that we don't want to bring a performance regression for those cases.

@viirya
Member Author

viirya commented Jan 20, 2017

@wzhfy @scwf Thanks for comment.

Until we have a way to avoid that defect, I will close this.

@viirya viirya closed this Jan 20, 2017
@viirya viirya deleted the globallimit-without-shuffle branch December 27, 2023 18:20