
[SPARK-4630][Core] Dynamically determine optimal number of partitions #4070

Closed
wants to merge 4 commits

Conversation

lianhuiwang
Contributor

Stages in an application process different amounts of data. If the user does not set numPartitions for a stage, Spark uses the same defaultParallelism for its number of partitions. In the DAGScheduler, the number of a stage's running tasks equals its number of partitions, so this value is usually the same for every stage. If a stage has too few partitions, each task has to process a large amount of data and slows down due to spilling or GC; if it has too many partitions, scheduling overhead becomes significant. To improve application performance, we need to determine the optimal number of partitions from the stage's input data size. There are two steps:

  1. Estimate the number of the stage's partitions.
    Given the input data size of its parent stages and the spark.reduce.per.partition.bytes configuration, we can determine the stage's number of partitions (see the sketch after this list).
    How do we get the parent stages' input data size?
    If the stage has no parents, we get its input size by summing the lengths of its input paths.
    Else, if it has parents but they are not yet available, we use the parents' input data size.
    Else, if its parents are available, we compute the parents' shuffle data size and use it as the stage's input size.
  2. Update the stage's Partitioner.
    First, update the number of partitions in the parents' shuffle dependencies so that each ShuffleMapTask writes the desired number of partition files.
    Then, update the stage's information, particularly the stage's RDD, so that the stage's ShuffledRDD can correctly pull data from the map tasks.
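
A minimal sketch of the estimation in step 1, assuming a hypothetical helper: the configuration key spark.reduce.per.partition.bytes is taken from this description, but the method name, parameter names, and the 64 MB default below are illustrative only, not the actual code in this patch.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper (not the patch itself): estimate a stage's partition
// count from its parents' input size and a per-partition byte target.
def estimateNumPartitions(conf: SparkConf, parentInputBytes: Long): Int = {
  // Target bytes per reduce partition; the 64 MB default here is only an assumption.
  val bytesPerPartition =
    conf.getLong("spark.reduce.per.partition.bytes", 64L * 1024 * 1024)
  // Round up so no partition exceeds the target, and keep at least one partition.
  math.max(1, math.ceil(parentInputBytes.toDouble / bytesPerPartition.toDouble).toInt)
}
```

For example, under these assumptions a stage whose parents produced about 10 GB of shuffle data would get roughly 10 GB / 64 MB ≈ 160 partitions.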

Finally, this feature can be turned on/off with a configuration option.
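
For illustration only: the exact key name of the on/off switch is not given in this description, so the flag below is a placeholder.

```scala
import org.apache.spark.SparkConf

// Placeholder key names; the real toggle introduced by this patch may differ.
val conf = new SparkConf()
  .set("spark.reduce.dynamic.partitions.enabled", "true")                   // turn the feature on
  .set("spark.reduce.per.partition.bytes", (128L * 1024 * 1024).toString)   // target ~128 MB per partition
```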

TODO:

  1. Consider spark.shuffle.memoryFraction when determining the spark.reduce.per.partition.bytes configuration.
  2. When the stage is the final stage, the resultHandler's value cannot be returned to the SparkContext because the partitions have been changed.
  3. When the number of a stage's tasks has changed, report the stage's new information to the UI. Before submitStage, the SparkListenerJobStart event that includes all of the job's stage infos has already been posted to the listenerBus.

@ksakellis @sryza @JoshRosen @rxin

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25658 has started for PR 4070 at commit fc652a5.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25658 has finished for PR 4070 at commit fc652a5.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class HashPartitioner(var partitions: Int) extends Partitioner

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25658/
Test FAILed.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25659 has started for PR 4070 at commit 668926c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25659 has finished for PR 4070 at commit 668926c.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class HashPartitioner(var partitions: Int) extends Partitioner

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25659/
Test FAILed.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25660 has started for PR 4070 at commit 8b7216f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25660 has finished for PR 4070 at commit 8b7216f.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class HashPartitioner(var partitions: Int) extends Partitioner

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25660/
Test FAILed.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25662 has started for PR 4070 at commit 622e45c.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Jan 16, 2015

Test build #25662 has finished for PR 4070 at commit 622e45c.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class HashPartitioner(var partitions: Int) extends Partitioner

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/25662/
Test FAILed.

@rxin
Contributor

rxin commented Jan 19, 2015

@lianhuiwang have you tried this in production on any jobs?

@lianhuiwang
Contributor Author

@rxin Yes, some ETL jobs with groupBy and join operators have been tried with this feature, and most of the time it determines the number of partitions very well. Why does Jenkins report "java.lang.RuntimeException: spark-core: Binary compatibility check failed!"? Can you tell me the reason for the failure? Thanks.
@ksakellis I took a look at your work on SPARK-4630. Can you give some suggestions about this PR? Thanks.

@srowen
Member

srowen commented May 18, 2015

I don't think this is the direction that the discussion in SPARK-4630 is leading. This is trying to use output size as a heuristic, and it isn't ideal. I'm also not sure of the implications of making the number of partitions mutable in RDDs. Do you mind closing this, as it hasn't been active in a while?
