
[SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators #10835

Closed

andrewor14 wants to merge 45 commits into apache:master from andrewor14:task-metrics-use-accums

Conversation

andrewor14
Contributor

The high level idea is that instead of having the executors send both accumulator updates and TaskMetrics, we should have them send only accumulator updates. This eliminates the need to maintain both code paths since one can be implemented in terms of the other. This effort is split into two parts:

SPARK-12895: Implement TaskMetrics using accumulators. TaskMetrics is basically just a bunch of accumulable fields. This patch makes TaskMetrics a syntactic wrapper around a collection of accumulators so we don't need to send TaskMetrics from the executors to the driver.

SPARK-12896: Send only accumulator updates to the driver. Now that TaskMetrics are expressed in terms of accumulators, we can capture all TaskMetrics values if we just send accumulator updates from the executors to the driver. This completes the parent issue SPARK-10620.

While an effort has been made to preserve as much of the public API as possible, there are a few known breaking @DeveloperApi changes that would be very awkward to maintain. I will gather the full list shortly and post it here.

Note: This was once part of #10717. It has been split out into its own patch from there to make it easier for others to review. Other smaller pieces have already been merged into master.
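To make the idea concrete, here is a minimal, self-contained sketch of the wrapper pattern from SPARK-12895. The names (`LongAccum`, `TaskMetricsSketch`, `accumulatorUpdates`) are illustrative stand-ins, not the actual Spark internals:

```scala
// Sketch only: each metric field is backed by a named accumulator, so the
// executor ships just (name, value) accumulator updates and the driver
// reconstructs the metrics from them.
class LongAccum(val name: String) {
  private var _value: Long = 0L
  def add(v: Long): Unit = { _value += v }
  def value: Long = _value
}

class TaskMetricsSketch {
  private val _executorRunTime = new LongAccum("internal.metrics.executorRunTime")
  private val _resultSize = new LongAccum("internal.metrics.resultSize")

  // TaskMetrics becomes a syntactic wrapper around its accumulators.
  def setExecutorRunTime(ms: Long): Unit = _executorRunTime.add(ms)
  def executorRunTime: Long = _executorRunTime.value

  // The only thing that needs to cross the wire from executor to driver.
  def accumulatorUpdates(): Seq[(String, Long)] =
    Seq(_executorRunTime, _resultSize).map(a => (a.name, a.value))
}
```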

Andrew Or added 13 commits January 18, 2016 14:55
commit 269031f
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 16:33:12 2016 -0800

    Remove unused method

commit c04b5df
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 16:13:08 2016 -0800

    Review comments

commit d2e4e23
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 14:42:19 2016 -0800

    One more

commit 202d48e
Merge: e99b9af 4f11e3f
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 14:27:47 2016 -0800

    Merge branch 'master' of github.com:apache/spark into get-or-create-metrics

commit e99b9af
Merge: 34c7ce5 b8cb548
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 13:56:41 2016 -0800

    Merge branch 'master' of github.com:apache/spark into get-or-create-metrics

    Conflicts:
    	core/src/main/scala/org/apache/spark/CacheManager.scala
    	core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
    	core/src/test/scala/org/apache/spark/CacheManagerSuite.scala

commit 34c7ce5
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 12:46:42 2016 -0800

    Hide updatedBlocks

commit ad094f0
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 12:30:59 2016 -0800

    Clean up JsonProtocol

    This commit collapsed 10 methods into 2. The 8 that were inlined
    were only used in 1 place each, and the body of each was quite
    small. The additional level of abstraction did not add much value
    and made the code verbose.

commit 0785984
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 12:20:28 2016 -0800

    Replace set with register

    JsonProtocol remains the only place where we still call set
    on each of the *Metrics classes.

commit b9d7fbf
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 12:10:17 2016 -0800

    Clean up places where we set OutputMetrics

    Note: there's one remaining place, which is JsonProtocol.

commit 62c96e1
Author: Andrew Or <andrew@databricks.com>
Date:   Mon Jan 18 11:50:04 2016 -0800

    Add register* methods (get or create)
    Previously we would always zero out an accumulator when we
    deserialize it. Certainly we don't want to do that on the driver.
    The changes in this commit are temporary and will be reverted
    in SPARK-12896.

    Tests don't pass yet, obviously...

+ fix tests, which are still failing
... by setting it to zero on the executors, always.

Merge branch 'master' of github.com:apache/spark into task-metrics-use-accums

    Conflicts:
    	core/src/main/scala/org/apache/spark/CacheManager.scala
    	core/src/main/scala/org/apache/spark/executor/Executor.scala
    	core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala
    	core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
    	core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
    	core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
    	core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
    	core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Also add some useful error printing if things don't match.
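As an aside, a hedged sketch of the get-or-create registration the commit above describes (`Accum` and `AccumRegistry` are hypothetical names, not the real API): rather than always zeroing out a freshly deserialized accumulator, look up an existing one by name and reuse it.

```scala
import scala.collection.mutable

// Hypothetical sketch of "register (get or create)": repeated registrations
// under the same name return the same accumulator instance instead of
// resetting driver-side values to zero.
class Accum(val name: String) { var value: Long = 0L }

object AccumRegistry {
  private val accums = mutable.Map.empty[String, Accum]

  def registerAccum(name: String): Accum =
    accums.getOrElseUpdate(name, new Accum(name))
}
```

A second call to `AccumRegistry.registerAccum("internal.metrics.resultSize")` returns the instance created by the first call, preserving any value already accumulated.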
@SparkQA

SparkQA commented Jan 19, 2016

Test build #49702 has finished for PR 10835 at commit 7b5d840.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 19, 2016

Test build #49718 has finished for PR 10835 at commit 2069a78.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 19, 2016

Test build #49724 has finished for PR 10835 at commit 40fd853.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 20, 2016

Test build #49729 has finished for PR 10835 at commit 2a3cd27.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 20, 2016

Test build #49732 has finished for PR 10835 at commit 2a3cd27.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -212,6 +212,8 @@ class HadoopRDD[K, V](
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

// TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD
Contributor

Yep, SQLNewHadoopRDD started as a copy of NewHadoopRDD and then diverged some more. It would be great to try to clean this up at some point.

@JoshRosen
Contributor

According to Reviewable I'm all caught up with the latest changes. This looks like it's in really good shape to me; I only had a few very minor comments upthread, plus one or two clarifying questions (I think everything is fine, but just wanted to double-check my understanding before a final sign-off). I think we're on track to get this in by end-of-day tomorrow, if not earlier.

@andrewor14
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 26, 2016

Test build #50110 has finished for PR 10835 at commit 6893419.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public final class BitArray
    • public abstract class BloomFilter
    • public class BloomFilterImpl extends BloomFilter
    • \"Cannot merge bloom filter of class \" + other.getClass().getName()
    • class QuantileDiscretizer(JavaEstimator, HasInputCol, HasOutputCol):


if (!stageIdToStage.contains(task.stageId)) {
  // Skip all the actions if the stage has been cancelled.
  return
}

listenerBus.post(SparkListenerTaskEnd(
Contributor

Actually, I guess the old logic was kinda convoluted in the sense that it would report failures of cancelled stages' tasks to the listener bus, but not their successful completions. The new behavior of skipping the listener event for all tasks from cancelled stages, not just successful ones, makes more sense.

Contributor Author

actually, it's failing tests now and I don't know why. For simplicity I'm just going to revert this change, since it's largely orthogonal, and add a note that the old comment was wrong. We can fix it later if we want to.

@@ -1125,24 +1130,31 @@ class DAGScheduler(
      event.taskInfo.attemptNumber, // this is a task attempt number
      event.reason)

    // The success case is dealt with separately below, since we need to compute accumulator
    // updates before posting.
Contributor Author

by the way this comment isn't even true. I think this whole "post only failed tasks of cancelled stages" behavior is not intentional to begin with. I tried to look into this but there are 30 layers of indirection so I just gave up. Let's tackle this separately (if it actually matters).

Andrew Or added 4 commits January 26, 2016 12:39
Looks like previous changes accidentally broke an unspoken
contract in posting task end events. There's no documentation
anywhere so it's unclear what the right behavior is. For
simplicity I will revert this part of the patch and maybe
fix it later.
... by ignoring the failing tests. These tests were
non-deterministically failing only because exceptions in job end
callbacks weren't propagated properly due to a race condition.

In terms of the failing tests themselves, they don't actually
correctly test what they intend to. Fixing this should be done
in a follow-up patch.
@andrewor14
Contributor Author

retest this please

@andrewor14
Contributor Author

@JoshRosen I believe I addressed all of your comments. The latest commit should pass tests. There are a few things I would like to do on my end that shouldn't block on the merging of this patch:

  • Do a final pass of self-review over the outstanding changes
  • Compile a list of changed public APIs
  • File follow-up JIRAs based on the few things we discussed in the review
  • Test HistoryServer reading Spark 1.6 logs

If this patch looks good to you and it passes tests, feel free to merge it. I'll keep working on these things in the meantime.

@SparkQA

SparkQA commented Jan 27, 2016

Test build #50163 has finished for PR 10835 at commit 9f964f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister
    • class ChiSqSelector(JavaEstimator, HasFeaturesCol, HasOutputCol, HasLabelCol):
    • class ChiSqSelectorModel(JavaModel):
    • public static final class Array extends ArrayData
    • public static final class Struct extends InternalRow
    • public class ColumnVectorUtils
    • public static final class Row extends InternalRow

@JoshRosen
Contributor

I'm going to merge this now and we'll deal with those other tasks in followup patches. Thanks!

asfgit closed this in 87abcf7 Jan 27, 2016
asfgit pushed a commit that referenced this pull request Jan 27, 2016
by explicitly marking annotated parameters as vals (SI-8813).

Caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #10955 from andrewor14/fix-scala211.
andrewor14 deleted the task-metrics-use-accums branch January 27, 2016 22:23
@andrewor14
Contributor Author

I have compiled a list of breaking @DeveloperApi changes:

ExceptionFailure:

  • changed: apply, unapply, copy
  • removed: old constructor
  • deprecated: metrics

InputMetrics:

  • removed: old constructor, all case class methods, updateBytesRead, setBytesReadCallback, var bytesReadCallback
  • deprecated: apply, unapply, incBytesRead, incRecordsRead

OutputMetrics:

  • removed: old constructor, all case class methods
  • deprecated: apply, unapply

ShuffleReadMetrics:

  • removed: old constructor

ShuffleWriteMetrics:

  • removed: old constructor

TaskMetrics:

  • changed: accumulatorUpdates return type (Map[Long, Any] -> Seq[AccumulableInfo])
  • removed: hostname
  • deprecated: var updatedBlocks, setters for var outputMetrics and var shuffleWriteMetrics

AccumulableInfo:

  • changed: update type (Option[String] -> Option[Any]), value type (String -> Option[Any]), name type (String -> Option[String])
  • removed: internal
  • deprecated: all existing apply methods

SparkListenerTaskEnd:

  • changed: taskMetrics is now @Nullable (see the listener sketch after this list)

SparkListenerExecutorMetricsUpdate:

  • changed: apply, unapply, copy
  • removed: old constructor, taskMetrics
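For listener authors, a minimal sketch of what the @Nullable change above implies, assuming a plain SparkListener subclass (illustrative, not code from this patch):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Since SparkListenerTaskEnd.taskMetrics is now @Nullable, guard against
// null before reading any metric fields.
class TaskMetricsListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      println(s"executorRunTime = ${metrics.executorRunTime} ms")
    }
  }
}
```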

@andrewor14
Contributor Author

I was able to verify that the changes in Accumulable and Accumulator do not break compatibility. Looks like MiMa was confused there.

asfgit pushed a commit that referenced this pull request Jan 29, 2016
This is an existing issue uncovered recently by #10835. The reason for the exception was because the `SQLHistoryListener` gets all sorts of accumulators, not just the ones that represent SQL metrics. For example, the listener gets the `internal.metrics.shuffleRead.remoteBlocksFetched`, which is an Int, then it proceeds to cast the Int to a Long, which fails.

The fix is to mark accumulators representing SQL metrics using some internal metadata. Then we can identify which ones are SQL metrics and only process those in the `SQLHistoryListener`.

Author: Andrew Or <andrew@databricks.com>

Closes #10971 from andrewor14/fix-sql-history.
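A minimal standalone repro (not Spark code) of the failure mode described above — a boxed Int typed as Any cannot be cast to Long:

```scala
object CastRepro extends App {
  // java.lang.Integer cannot be cast to java.lang.Long at runtime.
  val update: Any = 42                // e.g. remoteBlocksFetched, an Int
  val bad = update.asInstanceOf[Long] // throws ClassCastException
}
```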
asfgit pushed a commit that referenced this pull request Jan 30, 2016
This issue is causing tests to fail consistently in master with Hadoop 2.6 / 2.7. This is because for Hadoop 2.5+ we overwrite existing values of `InputMetrics#bytesRead` in each call to `HadoopRDD#compute`. In the case of coalesce, e.g.
```
sc.textFile(..., 4).coalesce(2).count()
```
we will call `compute` multiple times in the same task, overwriting `bytesRead` values from previous calls to `compute`.

For a regression test, see `InputOutputMetricsSuite.input metrics for old hadoop with coalesce`. I did not add a new regression test because it's impossible without significant refactoring; there's a lot of existing duplicate code in this corner of Spark.

This was caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #10973 from andrewor14/fix-input-metrics-coalesce.
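A hedged sketch of the accumulate-rather-than-overwrite pattern behind this fix; `InputMetricsSketch` and `compute` are illustrative stand-ins for the Spark internals:

```scala
object BytesReadSketch {
  // Simplified stand-in for InputMetrics.
  class InputMetricsSketch {
    private var _bytesRead: Long = 0L
    def bytesRead: Long = _bytesRead
    def setBytesRead(v: Long): Unit = { _bytesRead = v }
  }

  // One compute() call per partition; after coalesce, several partitions
  // run in the same task, so each call must add to the running total
  // rather than overwrite it with its own callback value.
  def compute(metrics: InputMetricsSketch, bytesReadCallback: () => Long): Unit = {
    val existingBytesRead = metrics.bytesRead // what earlier calls recorded
    metrics.setBytesRead(existingBytesRead + bytesReadCallback())
  }
}
```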
asfgit pushed a commit that referenced this pull request Feb 9, 2016
Additional changes to #10835, mainly related to style and visibility. This patch also adds back a few deprecated methods for backward compatibility.

Author: Andrew Or <andrew@databricks.com>

Closes #10958 from andrewor14/task-metrics-to-accums-followups.
asfgit pushed a commit that referenced this pull request Feb 18, 2016
See [JIRA](https://issues.apache.org/jira/browse/SPARK-13344) for more detail. This was caused by #10835.

Author: Andrew Or <andrew@databricks.com>

Closes #11222 from andrewor14/fix-test-accum-exceptions.