
[SPARK-732][SPARK-3628][CORE][RESUBMIT] eliminate duplicate update on accumulator #2524

Closed
wants to merge 15 commits

Conversation

CodingCat
Contributor

https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every successfully finished task, even when the task is from a resubmitted stage, which makes the accumulator's value counter-intuitive.

In this patch, I changed the way the DAGScheduler updates the accumulator.

DAGScheduler maintains a hash table mapping each stage ID to the received <accumulator_id, value> pairs. When a task finishes, we check whether the hash table already contains that stage ID; the <accumulator_id, value> pair is saved only when the task is the first finished task of a new stage or the stage is running its first attempt. Only when the stage becomes independent (no job needs it any more) do we accumulate the values of the <accumulator_id, value> pairs.
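The idea above can be sketched as follows. This is a minimal, illustrative model of the de-duplication scheme, not the actual DAGScheduler code: the names `AccumulatorDedupSketch`, `recordTask`, and `drainStage` are invented for this example, and accumulator values are simplified to `Long`.

```scala
import scala.collection.mutable

// Illustrative sketch of the scheme described above: updates from a task
// are recorded at most once per (stage, partition), and the per-stage
// table is merged and dropped only when the stage is no longer needed.
object AccumulatorDedupSketch {
  // stageId -> (partitionId -> (accumulatorId -> value))
  private val stageIdToAccumulators =
    mutable.HashMap.empty[Int, mutable.HashMap[Int, Map[Long, Long]]]

  /** Record a finished task's updates; a later attempt for the same
    * (stage, partition) is ignored, so resubmissions cannot double-count. */
  def recordTask(stageId: Int, partitionId: Int, updates: Map[Long, Long]): Unit = {
    val perStage = stageIdToAccumulators.getOrElseUpdate(stageId, mutable.HashMap.empty)
    if (!perStage.contains(partitionId)) {
      perStage(partitionId) = updates
    }
  }

  /** When no job needs the stage any more, merge its entries once and drop them. */
  def drainStage(stageId: Int): Map[Long, Long] =
    stageIdToAccumulators.remove(stageId) match {
      case Some(perStage) =>
        perStage.values.foldLeft(Map.empty[Long, Long]) { (acc, upd) =>
          upd.foldLeft(acc) { case (a, (id, v)) => a.updated(id, a.getOrElse(id, 0L) + v) }
        }
      case None => Map.empty[Long, Long]
    }
}
```

With this shape, recording the same partition of the same stage twice (a resubmitted attempt) contributes only once to the merged result.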

@CodingCat CodingCat changed the title [SPARK-732][RESUBMIT] make if allowing duplicate update as an option of accumulator [SPARK-732][SPARK-3628][RESUBMIT] make if allowing duplicate update as an option of accumulator Sep 24, 2014
@CodingCat
Contributor Author

@mateiz @mridulm @kayousterhout @markhamstra @pwendell @JoshRosen I proposed this as a resubmission of #228

Looking forward to your review

@SparkQA

SparkQA commented Sep 24, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20760/

@SparkQA

SparkQA commented Sep 24, 2014

QA tests have started for PR 2524 at commit 13a190e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 24, 2014

QA tests have finished for PR 2524 at commit 13a190e.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T],

@SparkQA

SparkQA commented Sep 24, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20761/

@SparkQA

SparkQA commented Sep 24, 2014

QA tests have started for PR 2524 at commit 9fbe39a.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 24, 2014

QA tests have finished for PR 2524 at commit 9fbe39a.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T],

@SparkQA

SparkQA commented Sep 24, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20763/

@CodingCat
Contributor Author

OK...I will make MIMA happy.....

@SparkQA

SparkQA commented Sep 24, 2014

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20768/

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have started for PR 2524 at commit af7ff02.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 25, 2014

QA tests have finished for PR 2524 at commit af7ff02.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T],

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/20775/

@CodingCat CodingCat changed the title [SPARK-732][SPARK-3628][RESUBMIT] make if allowing duplicate update as an option of accumulator [SPARK-732][SPARK-3628][CORE][RESUBMIT] make if allowing duplicate update as an option of accumulator Sep 25, 2014
@CodingCat
Contributor Author

BTW, if we don't want to de-duplicate in shuffle stages, we can just move the necessary part to TaskSetManager

@mateiz
Contributor

mateiz commented Sep 28, 2014

Let's not de-duplicate in shuffle stages please. That complicates the patch a lot and I'm not sure why people would necessarily use it.

Also, why did you add a duplicate flag to Accumulator? IMO we shouldn't expose this as an option. Again it adds complexity in what should just be a bug fix.

@mateiz
Contributor

mateiz commented Sep 28, 2014

Basically it would be great to get a really simple patch that only fixes SPARK-3628 and adds no new data structures in DAGScheduler.

@CodingCat
Contributor Author

The drawback of not de-duplicating in shuffle stages is that it makes accumulator usage very tricky...

It sounds like users are discouraged from using an accumulator in a transformation, especially when the involved stage is shared by multiple jobs or the cluster is not that stable....

As for the flag, it just provides flexibility for users to choose whether they accept duplicate updates....

@CodingCat
Contributor Author

I can simply monitor the accumulator update in TaskSetManager, just not sure whether that fully resolves the problem.....

@mateiz
Contributor

mateiz commented Sep 28, 2014

It's probably easiest to move the accumulator update to TaskSetManager or to the part of DAGScheduler that reports the result to the user. It's right below the current update in the code:

if (!job.finished(rt.outputId)) {
  job.finished(rt.outputId) = true
  ...

That happens only once per task, so it's a good place to do the update for ResultTask. For ShuffleMapTask you can do it in the corresponding match statement as well.
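The suggestion above can be sketched as follows. This is a hedged, self-contained model rather than the actual DAGScheduler code: `OncePerOutputSketch`, `JobState`, and `handleResultTaskCompletion` are invented names, and accumulator values are simplified to `Long`.

```scala
// Illustrative sketch: the `job.finished(outputId)` guard fires at most once
// per output partition, so merging accumulator updates inside it
// de-duplicates updates from resubmitted ResultTasks for free.
object OncePerOutputSketch {
  final class JobState(numOutputs: Int) {
    val finished: Array[Boolean] = Array.fill(numOutputs)(false)
  }

  val totals = scala.collection.mutable.HashMap.empty[Long, Long]

  def handleResultTaskCompletion(job: JobState, outputId: Int,
                                 accumUpdates: Map[Long, Long]): Unit = {
    if (!job.finished(outputId)) {
      job.finished(outputId) = true
      // Merge accumulator updates exactly once per output partition.
      accumUpdates.foreach { case (id, v) =>
        totals(id) = totals.getOrElse(id, 0L) + v
      }
    }
  }
}
```

A second completion event for the same `outputId` (e.g. from a resubmitted stage) falls through the guard and leaves the totals unchanged.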

@@ -112,6 +112,10 @@ class DAGScheduler(
// stray messages to detect.
private val failedEpoch = new HashMap[String, Long]

// stageId => (SplitId -> (accumulatorId, accumulatorValue))
private[scheduler] val stageIdToAccumulators = new HashMap[Int,
Contributor

This may cause a memory leak?

Contributor

How?

@CodingCat
Contributor Author

I think it should work...I'm trying this

@SparkQA

SparkQA commented Sep 30, 2014

QA tests have started for PR 2524 at commit 92d5fec.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 30, 2014

QA tests have finished for PR 2524 at commit 92d5fec.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21011/

@CodingCat
Contributor Author

OK, Jenkins said OK.

Finished the modifications:

  1. Removed the option for users to choose whether the accumulator accepts duplicate updates (this may need a highlight in the release notes)
  2. Still de-duplicate in DAGScheduler: since we agreed to de-duplicate shuffle stages and a new TaskSetManager is created for a resubmitted stage, DAGScheduler is the only place we can work in
  3. Still need to add a data structure in DAGScheduler, because ShuffleMapTask has nothing like an outputId that could associate a resubmitted task with the original one... StageId + partitionId is the only key we can use...
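Point 3 can be illustrated with a small sketch. This is a hypothetical model, not the patch itself: `ShuffleDedupKeySketch` and `firstCompletion` are invented names showing why a (stageId, partitionId) pair works as a de-duplication key when no outputId exists.

```scala
import scala.collection.mutable

// Illustrative sketch: since a ShuffleMapTask has no outputId, a
// (stageId, partitionId) pair serves as the de-duplication key, so a
// resubmitted attempt of the same partition is recognised as a duplicate.
object ShuffleDedupKeySketch {
  private val seen = mutable.HashSet.empty[(Int, Int)]

  /** Returns true only for the first completion of a given partition of a stage. */
  def firstCompletion(stageId: Int, partitionId: Int): Boolean =
    seen.add((stageId, partitionId))
}
```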

@@ -901,6 +900,33 @@ class DAGScheduler(
}
}

private def updateAccumulator(event: CompletionEvent): Unit = {
Contributor

Call this updateAccumulators and add a comment saying
/** Merge updates from a task to our local accumulator values */

@mateiz
Contributor

mateiz commented Nov 26, 2014

@CodingCat thanks for the update, this looks good. I just made a few small comments.

@SparkQA

SparkQA commented Nov 26, 2014

Test build #23893 has started for PR 2524 at commit b233737.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 26, 2014

Test build #23893 has finished for PR 2524 at commit b233737.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23893/

@CodingCat
Contributor Author

Hey @mateiz, thank you very much for the review.

I addressed all of them except the "lastId" one, as MiMa wants me to keep that since it's public.....

Also, a question for you: shall I submit the patch to the older version branches, since there are merge conflicts preventing the patch from applying there directly?

@SparkQA

SparkQA commented Nov 26, 2014

Test build #23895 has started for PR 2524 at commit 1433e6f.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Nov 26, 2014

Test build #23895 has finished for PR 2524 at commit 1433e6f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23895/

@mateiz
Contributor

mateiz commented Nov 26, 2014

Can you just not change Accumulator.scala then? That change isn't fixing any kind of bug, it's just a small optimization. Just remove it from this patch.

@SparkQA

SparkQA commented Nov 26, 2014

Test build #23908 has started for PR 2524 at commit 701a1e8.

  • This patch merges cleanly.

@CodingCat
Contributor Author

@mateiz sure, just rolled back the changes...how about the question of applying the patch to other branches?

@mateiz
Contributor

mateiz commented Nov 27, 2014

Don't worry about the other branches now, we can figure it out if we want to backport it.

@SparkQA

SparkQA commented Nov 27, 2014

Test build #23908 has finished for PR 2524 at commit 701a1e8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23908/

@asfgit asfgit closed this in 5af53ad Nov 27, 2014
@mateiz
Contributor

mateiz commented Nov 27, 2014

Alright, thanks! I've merged this in.

asfgit pushed a commit that referenced this pull request Nov 27, 2014
… accumulator

https://issues.apache.org/jira/browse/SPARK-3628

In the current implementation, the accumulator is updated for every successfully finished task, even when the task is from a resubmitted stage, which makes the accumulator's value counter-intuitive.

In this patch, I changed the way the DAGScheduler updates the accumulator.

DAGScheduler maintains a hash table mapping each stage ID to the received <accumulator_id, value> pairs. When a task finishes, we check whether the hash table already contains that stage ID; the <accumulator_id, value> pair is saved only when the task is the first finished task of a new stage or the stage is running its first attempt. Only when the stage becomes independent (no job needs it any more) do we accumulate the values of the <accumulator_id, value> pairs.

Author: CodingCat <zhunansjtu@gmail.com>

Closes #2524 from CodingCat/SPARK-732-1 and squashes the following commits:

701a1e8 [CodingCat] roll back change on Accumulator.scala
1433e6f [CodingCat] make MIMA happy
b233737 [CodingCat] address Matei's comments
02261b8 [CodingCat] rollback  some changes
6b0aff9 [CodingCat] update document
2b2e8cf [CodingCat] updateAccumulator
83b75f8 [CodingCat] style fix
84570d2 [CodingCat] re-enable  the bad accumulator guard
1e9e14d [CodingCat] add NPE guard
21b6840 [CodingCat] simplify the patch
88d1f03 [CodingCat] fix rebase error
f74266b [CodingCat] add test case for resubmitted result stage
5cf586f [CodingCat] de-duplicate on task level
138f9b3 [CodingCat] make MIMA happy
67593d2 [CodingCat] make if allowing duplicate update as an option of accumulator

(cherry picked from commit 5af53ad)
Signed-off-by: Matei Zaharia <matei@databricks.com>
@aarondav
Contributor

@mateiz @CodingCat Apologies, but can I confirm that the scope of this change is strictly to ensure that actions/result stages never duplicate accumulator updates? The PR title and description are more general than this, but the associated JIRAs suggest the restricted scope.

@CodingCat
Contributor Author

Yes. Originally I tried to do it for both ShuffleMapTask and ResultTask, but later @mateiz convinced me that we actually cannot handle the transformation case.

So the current change only involves ResultTask.

Apologies for not changing the PR title in time.

@mateiz
Contributor

mateiz commented Nov 28, 2014

Yes, it should be only SPARK-3628.
