[SPARK-9851] Support submitting map stages individually in DAGScheduler #8180

Closed · wants to merge 9 commits

@mateiz (Contributor) commented Aug 13, 2015

This patch adds support for submitting map stages in a DAG individually so that we can make downstream decisions after seeing statistics about their output, as part of SPARK-9850. I also added more comments to many of the key classes in DAGScheduler. By itself, the patch is not super useful except maybe to switch between a shuffle and broadcast join, but with the other subtasks of SPARK-9850 we'll be able to make more interesting decisions.

The main entry point is SparkContext.submitMapStage, which lets you run a map stage and see stats about the map output sizes. Other stats could also be collected through accumulators. See AdaptiveSchedulingSuite for a short example.
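
Below is a minimal sketch of what driving this entry point can look like, modeled loosely on that suite; the RDD, partitioner, and local master are illustrative, and the return type and field names are assumed from the description above rather than quoted from the patch:

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkContext}

val sc = new SparkContext("local", "submitMapStage-example")

// A pair RDD and an explicit shuffle dependency over it.
val rdd = sc.parallelize(1 to 100, 4).map(x => (x % 10, x))
val dep = new ShuffleDependency[Int, Int, Int](rdd, new HashPartitioner(2))

// Run only the map side of the shuffle; the returned future completes once
// the map output tracker has sizes for every map task.
val stats = Await.result(sc.submitMapStage(dep), Duration.Inf)

// Estimated bytes per reduce partition: the raw material for downstream
// decisions such as choosing between a shuffle join and a broadcast join.
println(stats.bytesByPartitionId.mkString(", "))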

@mateiz (Contributor, author) commented Aug 13, 2015

retest this please

@SparkQA commented Aug 14, 2015

Test build #40822 has finished for PR 8180 at commit 5092c54.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 14, 2015

Test build #40829 has finished for PR 8180 at commit 732c4ed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mateiz (Contributor, author) commented Aug 14, 2015

retest this please

@SparkQA commented Aug 14, 2015

Test build #40848 has finished for PR 8180 at commit 732c4ed.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Aug 15, 2015

Test build #40926 has finished for PR 8180 at commit d301862.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class StringIndexerModel (
    • implicit class StringToColumn(val sc: StringContext)

@SparkQA commented Aug 19, 2015

Test build #41285 has finished for PR 8180 at commit 4a47919.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class PromotePrecision(child: Expression) extends UnaryExpression
    • case class CheckOverflow(child: Expression, dataType: DecimalType) extends UnaryExpression

@SparkQA commented Aug 20, 2015

Test build #41286 has finished for PR 8180 at commit 210ca43.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Aug 21, 2015

@zsxwing if you have time, can you review this PR?

* boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
* fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
* executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
* Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.

Review comment (Contributor):

It's nice to see these expanded comments, but I think we really need to add a section on stage attempts. That is probably the most confusing part of the DAGScheduler and where most bugs occur.

@squito (Contributor) commented Aug 21, 2015

I'm super nervous about adding this, given our current inability to deal with the complexity of the scheduler.

@markhamstra (Contributor) commented:

@squito Agree that this is high risk, but SPARK-9851 is also very high reward. On balance, I'm excited about this, but also expecting some difficult debugging.

* @param shuffleId ID of the shuffle
* @param numPartitions number of map output partitions
*/
def getStatistics(shuffleId: Int, numPartitions: Int): MapOutputStatistics = {

Review comment (Contributor):

It's a little strange that you need to pass numPartitions in here, since that is a property of the map output. Maybe instead of just taking shuffleId, it should take a ShuffleHandle, so you can get the number of map output partitions with shuffleHandle.dependency.partitioner.numPartitions?

Reply from @mateiz (author):

Good point. It's a bit weird that we don't remember numPartitions anywhere else, but it would be clearer this way.
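
For illustration, a sketch of the reshaped method this thread converges on, deriving the partition count from the dependency instead of a separate argument (getStatuses and the MapOutputStatistics constructor are assumptions here, not necessarily the merged code):

def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
  // The number of reduce partitions is a property of the shuffle itself.
  val totalSizes = new Array[Long](dep.partitioner.numPartitions)
  val statuses = getStatuses(dep.shuffleId)  // assumed helper: Array[MapStatus]
  for (s <- statuses; i <- totalSizes.indices) {
    totalSizes(i) += s.getSizeForBlock(i)  // estimated bytes for reduce partition i
  }
  new MapOutputStatistics(dep.shuffleId, totalSizes)
}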

@mateiz (Contributor, author) commented Aug 21, 2015

Hey Imran, I'm curious, have you actually worked on stuff in the scheduler? I don't know what you mean about inability to deal with complexity in it, but it has gotten simpler over time (e.g. eliminating the ability for jobs to execute locally). If you look in this patch, it's only 100-200 lines of changes to the scheduler; the rest is docs and tests. And as a whole, the DAGScheduler is only around 1600 lines.

@squito (Contributor) commented Aug 24, 2015

I've only recently looked at making changes to the scheduler, but it seems to me there is widespread agreement among committers that it is very error-prone. For example, consider Andrew Or's plea in SPARK-8987, which starts with:

DAGScheduler is one of the most monstrous piece of code in Spark.

Other recent examples of similar sentiments are in this discussion on backporting SPARK-8103, or the confusion in the earlier versions of SPARK-5945, SPARK-7308, and SPARK-8103. Even this seemingly innocuous three-line change inadvertently introduced SPARK-9809 (it's really lucky that somebody stumbled on that before the release).

I’ve been working on issues related to fault-tolerance, primarily SPARK-8103 & SPARK-8029, which came from real customer escalations. Those took me a long time to wrap my head around, after painfully trying to make sense of user logs, create a reproduction, propose a fix, convince others there really was something wrong, and get lots of help to make the right fix.

I did a bit of fault-injection testing as well, and things seemed to pass consistently after my fixes, so I was hoping that would be the end of the story. But then I dug through some existing jiras, and found SPARK-5259. I couldn't believe it had been open since January! A community member had discovered it and even very clearly described exactly how it happened, but still we haven't fixed it. We're about to release spark 1.5 with it still broken, which means that's at least 3 releases where fault tolerance is knowingly broken. I find that embarrassing.

I'm not saying all of this to denigrate the effort that everyone has already put into it, but I just want to be clear that I really do mean it: we are unable to deal with the complexity of the scheduler. IMO, the highest reward would be to fix the fault-tolerance issues, and focus on testing the scheduler so we gain more confidence in it. So while this feature is interesting, I think we should proceed very cautiously.

// Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
// from, then TaskSet 3 will run the reduce stage
scheduler.resubmitFailedStages()
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))

Review comment (Contributor):

As part of SPARK-5945, @ilganeli is introducing a few util methods that make this a little clearer, e.g. completeNextShuffleMapSuccessfully. IMO that helps make these test cases a bit more readable -- right now it's a bit hard for the reader to know what taskSets(2) is. Also, if someone makes a change that breaks things, it's much clearer if they can see, e.g., that they never launched attempt 1 for stage 0, rather than just getting some strange behavior later on.
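
As a sketch of what such a helper could look like (the name and signature are assumed; complete and makeMapStatus are the suite's existing utilities):

// Completes every task in a TaskSet as a successful shuffle map task, so a
// test reads "complete this map stage" rather than indexing into taskSets.
private def completeShuffleMapStageSuccessfully(
    taskSet: TaskSet,
    host: String,
    numReducePartitions: Int): Unit = {
  complete(taskSet, taskSet.tasks.indices.map { _ =>
    (Success, makeMapStatus(host, numReducePartitions))
  })
}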

@andrewor14 (Contributor) commented:

For example, consider Andrew Or’s plea in SPARK-8987 which starts with [...]

OK, my sentence was quoted outside its original context. It was intended to motivate the increase in test coverage in the scheduler, not to prevent additional changes to it. This patch adds a bunch of tests for the functionality it introduces, so that's not a problem. I would say my issue is mostly unrelated to this patch.

@SparkQA commented Sep 3, 2015

Test build #41947 has finished for PR 8180 at commit dac96b7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen (Member) commented Sep 3, 2015

@andrewor14 listening from the sidelines, I think @squito was only pointing out that you have also noted how hard it is to change the scheduler with confidence. I don't think he meant to or did imply you are against this change; clearly the linked JIRA shows you are just favoring more tests as a remedy to the problem you describe.

There does seem to be some disconnect between SPARK-8987 and @mateiz's view that there's not much problem changing the scheduler. FWIW it does strike me as a big change to code that has been a source of subtle bugs in recent memory. So, maybe a little more nervousness is called for.

We can't stop changing the scheduler and I'm sure that the motivations for this change are worthy. What's the best way forward -- @squito pointed out some aspects that make him most nervous; would a few more tests help de-stress? I tend to respect his nervousness as he takes the brunt of these problems from big deployments, and getting this working for large deployments reliably is of course something everyone needs.

@mateiz (Contributor, author) commented Sep 3, 2015

Before deciding whether it's a big change, do also take a look at the change itself. As I said, it's only about 100-200 lines of actual changes; the rest is comments for the existing code and tests. And the actual mechanism by which this works already existed in the DAGScheduler design (the scheduler already tracked map stages, including sharing them between jobs, etc., but there was no way to submit a map stage by itself). If you want to make it seem less scary, I can send a PR without the comments in the original scheduler :).

By the way, we have a working prototype of SPARK-9850 end-to-end (including prototypes of the other JIRAs in it) so it's not like this is untested either.

@squito (Contributor) commented Sep 4, 2015

Sorry Andrew, I certainly didn't mean to imply that I knew your opinion on this particular patch -- as Sean said, I was just trying to point out the general situation. My point is just that new features add complexity and make it harder to fix bugs; and we have plenty of complexity and bugs now. I don't mean to block the patch, it's a cool feature. I'm just nervous about any change to the DAG scheduler (e.g., even my own proposal in #8427, which is 30 lines + 100 lines of tests, and most likely still needs more testing). Perhaps I err too much on the side of caution; I'm just providing a counterpoint.

Thanks for adding the additional tests, Matei. Btw, there is an example test for skipped stages that you should be able to copy more or less here: #8402

@SparkQA commented Sep 4, 2015

Test build #42004 has finished for PR 8180 at commit bb5190f.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mateiz (Contributor, author) commented Sep 4, 2015

Alright, I think this is ready to review now. Changes made:

  • Added more docs to DAGScheduler about how stages may be re-attempted
  • Added tests on:
    • More complex lineage graphs with multiple map jobs and result jobs pending
    • Fetch failures in the above, which also lead to resubmitted stages
    • Executor failure during a stage, and late task completion messages
  • Better handling of getting the MapOutputStatistics object -- the DAGScheduler now gets it at a point when it knows the map output tracker has outputs for all tasks, whereas before the tracker might lose some outputs by the time SparkContext.submitMapStage queried it (see the sketch below).
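
A rough sketch of what that last bullet means inside the scheduler's handling of a completed ShuffleMapStage (names such as mapStageJobs and markMapStageJobAsFinished are assumptions for illustration):

// All map outputs are registered at this moment, so compute the statistics
// inside the scheduler's event loop, before an executor loss can drop any.
if (shuffleStage.isAvailable && shuffleStage.mapStageJobs.nonEmpty) {
  val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
  for (job <- shuffleStage.mapStageJobs) {
    markMapStageJobAsFinished(job, stats)  // completes the map-stage job with its stats
  }
}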

@SparkQA commented Sep 5, 2015

Test build #42015 timed out for PR 8180 at commit 9d23757 after a configured wait of 250m.

@SparkQA commented Sep 5, 2015

Test build #42016 timed out for PR 8180 at commit 082b134 after a configured wait of 250m.

@mateiz (Contributor, author) commented Sep 5, 2015

retest this please

@SparkQA commented Sep 5, 2015

Test build #42032 has finished for PR 8180 at commit 082b134.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mateiz (Contributor, author) commented Sep 7, 2015

@zsxwing / @squito can you take a second look at this?

* Here's a checklist to use when making or reviewing changes to this class:
*
* - All data structures should be cleared when the jobs involving them end to avoid indefinite
* accumulation of state in long-runnin programs.

Review comment (Contributor):

nit: long-running

@zsxwing (Member) commented Sep 8, 2015

LGTM except some nits.

@mateiz (Contributor, author) commented Sep 13, 2015

Thanks for the comments; I've made the fixes. Let me know if anyone else has other comments.

@SparkQA commented Sep 13, 2015

Test build #42385 has finished for PR 8180 at commit 8bfce4d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    • case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
    • class CoGroupedRDD[K: ClassTag](
    • class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    • class ExecutorLossReason(val message: String) extends Serializable
    • case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
    • case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
    • case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
    • class MultilayerPerceptronClassifier(JavaEstimator, HasFeaturesCol, HasLabelCol, HasPredictionCol,
    • class MultilayerPerceptronClassificationModel(JavaModel):
    • class MinMaxScaler(JavaEstimator, HasInputCol, HasOutputCol):
    • class MinMaxScalerModel(JavaModel):
    • class StringIndexer(JavaEstimator, HasInputCol, HasOutputCol, HasHandleInvalid):
    • ("thresholds", "Thresholds in multi-class classification to adjust the probability of " +
    • class HasHandleInvalid(Params):
    • class HasElasticNetParam(Params):
    • class HasFitIntercept(Params):
    • class HasStandardization(Params):
    • class HasThresholds(Params):
    • thresholds = Param(Params._dummy(), "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • self.thresholds = Param(self, "thresholds", "Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values >= 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class' threshold.")
    • case class Stddev(child: Expression) extends StddevAgg(child)
    • case class StddevPop(child: Expression) extends StddevAgg(child)
    • case class StddevSamp(child: Expression) extends StddevAgg(child)
    • abstract class StddevAgg(child: Expression) extends AlgebraicAggregate
    • abstract class StddevAgg1(child: Expression) extends UnaryExpression with PartialAggregate1
    • case class Stddev(child: Expression) extends StddevAgg1(child)
    • case class StddevPop(child: Expression) extends StddevAgg1(child)
    • case class StddevSamp(child: Expression) extends StddevAgg1(child)
    • case class ComputePartialStd(child: Expression) extends UnaryExpression with AggregateExpression1
    • case class ComputePartialStdFunction (
    • case class MergePartialStd(
    • case class MergePartialStdFunction(
    • case class StddevFunction(
    • case class ConvertToSafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class ConvertToUnsafeNode(conf: SQLConf, child: LocalNode) extends UnaryLocalNode(conf)
    • case class FilterNode(conf: SQLConf, condition: Expression, child: LocalNode)
    • case class HashJoinNode(
    • case class IntersectNode(conf: SQLConf, left: LocalNode, right: LocalNode)
    • case class LimitNode(conf: SQLConf, limit: Int, child: LocalNode) extends UnaryLocalNode(conf)
    • abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    • abstract class LeafLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class UnaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • abstract class BinaryLocalNode(conf: SQLConf) extends LocalNode(conf)
    • case class ProjectNode(conf: SQLConf, projectList: Seq[NamedExpression], child: LocalNode)
    • case class SampleNode(
    • case class SeqScanNode(conf: SQLConf, output: Seq[Attribute], data: Seq[InternalRow])
    • case class TakeOrderedAndProjectNode(
    • case class UnionNode(conf: SQLConf, children: Seq[LocalNode]) extends LocalNode(conf)

@asfgit closed this in 1a09552 on Sep 15, 2015
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Oct 27, 2015

Author: Matei Zaharia <matei@databricks.com>

Closes apache#8180 from mateiz/spark-9851.
markhamstra pushed a commit to markhamstra/spark that referenced this pull request Nov 11, 2015