[SPARK-10620][WIP] Migrate TaskMetrics to accumulators #10717
Conversation
There are a bunch of `decrementX` methods that were never used. Also, a few `setX` methods could easily have been `incrementX` instead. The latter change is more in line with accumulators.
This commit uses the existing PEAK_EXECUTION_MEMORY mechanism to bring a few other fields in TaskMetrics to use accumulators.
This commit ports ShuffleReadMetrics to use accumulators, preserving as much of the existing semantics as possible. It also introduces a nicer way to organize all the internal accumulators by namespacing them.
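The namespacing of internal accumulators mentioned here can be sketched roughly as follows. This is an illustrative sketch only; the constant names and the exact `internal.metrics.` prefix are assumptions, not necessarily what the patch uses:

```scala
// Sketch: grouping internal accumulator names under a common namespace
// so that, e.g., all shuffle-read metrics share one prefix.
// (Names are illustrative, not the patch's actual constants.)
object InternalAccumulator {
  val METRICS_PREFIX = "internal.metrics."
  val SHUFFLE_READ_PREFIX = METRICS_PREFIX + "shuffleRead."

  object shuffleRead {
    val REMOTE_BLOCKS_FETCHED = SHUFFLE_READ_PREFIX + "remoteBlocksFetched"
    val LOCAL_BLOCKS_FETCHED  = SHUFFLE_READ_PREFIX + "localBlocksFetched"
    val REMOTE_BYTES_READ     = SHUFFLE_READ_PREFIX + "remoteBytesRead"
  }
}
```

Grouping names this way lets the driver filter or aggregate a whole family of metrics by prefix instead of special-casing each field.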
This commit was a little tricky because it removed the bytes-read callback from TaskMetrics and related classes. It does change behavior: we now update the number of bytes read periodically (every 1000 records) instead of on every executor heartbeat. The advantage here is code simplicity.
Tests are still failing as of this commit. E.g. SortShuffleSuite.
Tests were previously failing because we end up double counting metrics in local mode. This is because each TaskContext shares the same list of accumulators, so they end up updating the metrics on top of each other. The fix is to ensure TaskContext clears any existing values on the accumulators before passing them on.
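The double-counting problem and its fix can be illustrated with a toy accumulator (all names here are hypothetical; this is not Spark's actual accumulator API):

```scala
// Toy accumulator illustrating why accumulators shared across tasks in
// local mode must be cleared before each task reuses them.
class Accum(var value: Long = 0L) {
  def add(v: Long): Unit = value += v
  def reset(): Unit = value = 0L
}

// All "tasks" in local mode share the same accumulator instance.
val shared = new Accum()

def runTask(recordsRead: Long): Long = {
  shared.reset()          // the fix: clear any existing value first
  shared.add(recordsRead)
  shared.value            // without reset(), task 2 would report task 1's
}                         // count on top of its own
```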
The exception was harmless because it didn't actually fail the test. However, the test harness was actually badly written. We used to always assume that the first job will have an ID of 0, but there could very well be other tests sharing the same SparkContext. This is now fixed and we no longer see the exception. As of this commit, all known test failures have been fixed. I'm sure there will be more...
Instead of passing in a callback, TaskMetrics can simply return the accumulator values directly, since it already holds them.
…-accums

Conflicts:
  core/src/main/scala/org/apache/spark/TaskContextImpl.scala
  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
Instead, send only accumulator updates. As of this commit TaskMetrics is only used as a syntactic sugar on the executor side to modify accumulator values by names. Now we no longer send the same thing in two different codepaths. Now that we never send TaskMetrics from executors to the driver, we also never send accumulators that way. Then we can revert some of the accumulator changes.
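The "syntactic sugar" role described here can be sketched very loosely as follows. The class name, metric names, and methods are made up for illustration; this is not the patch's actual API:

```scala
// Minimal sketch: TaskMetrics as a thin wrapper that increments
// accumulators looked up by name, and exposes their values as the only
// payload sent to the driver. (All names are hypothetical.)
class LongAccum { var value = 0L; def add(v: Long): Unit = value += v }

class TaskMetricsSketch(private val accums: Map[String, LongAccum]) {
  private def accum(name: String): LongAccum =
    accums.getOrElse(name, sys.error(s"unknown metric: $name"))

  def incRecordsRead(v: Long): Unit = accum("input.recordsRead").add(v)

  // Executors send only these updates; there is no separate
  // TaskMetrics payload on the wire.
  def accumulatorUpdates(): Map[String, Long] =
    accums.map { case (name, a) => name -> a.value }
}
```

With this shape, the driver reconstructs metrics from accumulator updates alone, which is what makes the second code path unnecessary.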
In the previous commit, we made accumulator communication one-way again, which is the same as before this patch, so we restored all the semantics involved in serializing accumulators as before. Note: tests are still failing because of a duplicate accumulator name in some SQL things. Run `DataFrameCallbackSuite` for more detail.
Currently we still get values for tasks that fail. We should keep this semantics in the new accumulator updates as well.
There are a few places where we passed in empty internal accumulators to TaskContextImpl, so the TaskMetrics creation would fail. These are now fixed.
Before this commit the SQL UI would not display any accumulators. This is because it is powered by the SQLListener, which reads accumulators from TaskMetrics. However, we did not update the accumulator values before posting the TaskMetrics, so the UI never saw the updates from the tasks. This commit also fixes a few related test failures.
Now internal accumulators no longer need to have unique names. This was an unnecessary hack for the SQL accumulators that can be reverted through some clean ups.
for readability.
A few bugs:
1. In `Executor.scala`, we updated TaskMetrics after collecting the accumulator values. We should do it in the other order.
2. The test utility method for verifying whether peak execution memory is set imposed this requirement on every single job run in the test body. This does not hold for SQL's external sort, because one of the jobs does a sample and so does not update peak execution memory.
3. We were getting accumulators from executors that were not registered on the driver. Not exactly sure what the cause is, but it could very well have to do with GC on the driver, since we use weak references there. We shouldn't crash the scheduler if this happens.
Such that downstream listeners can access their values. This commit also generalizes the internal accumulator type from Long to anything, since we need to store the read and write methods of InputMetrics and OutputMetrics respectively.
This fixes a bug where when we reconstruct TaskMetrics we just pass in mutable accumulators, such that when new tasks come in they change the values of the old tasks. A more subtle bug here is that we were passing in the accumulated values instead of the local task values. Both are now fixed. TODO: write a test for all of these please.
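The distinction between accumulated values and per-task local values can be illustrated with a toy accumulator. All names here are hypothetical and not Spark's actual `Accumulable` API:

```scala
// Toy accumulator separating the driver-side accumulated total from the
// current task's local contribution. Reconstructing a task's metrics
// should use localValue, not the running total across tasks.
class Accum2 {
  private var total = 0L   // accumulated across all tasks (driver side)
  private var local = 0L   // this task's contribution only
  def add(v: Long): Unit = { local += v; total += v }
  def localValue: Long = local
  def value: Long = total
  def resetLocal(): Unit = local = 0L  // called between tasks
}
```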
The fake accumulator values should no longer all be Longs. Ugh.
Test build #49482 has started for PR 10717 at commit
Test build #49480 has started for PR 10717 at commit
…-accums

Conflicts:
  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Force-pushed from 23af334 to b58f2e6 (compare)
retest this please
This patch is really difficult to review since it combines simple refactoring with more subtle logic. Is it possible to break it up? If there were patches that did simple cleanup, those could get reviewed and merged very quickly.
Yeah, it's still WIP, but once it's in a more ready state I'll try to break it up.
Test build #49485 has finished for PR 10717 at commit
Test build #49509 has finished for PR 10717 at commit
Test build #49601 has finished for PR 10717 at commit
OK, I broke this down into smaller PRs:
- #10810 - move classes to their own files for readability (MERGED)
This is a small step in implementing SPARK-10620, which migrates `TaskMetrics` to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just moves classes to their own files to avoid having single monolithic ones that contain 10 different classes. Parent PR: #10717 Author: Andrew Or <andrew@databricks.com> Closes #10810 from andrewor14/move-things.
By the end of all the smaller patches there will be too much conflict to resolve, so I'm closing this.
Note to self: DO NOT DELETE THIS BRANCH!!
This is a small step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. This patch is strictly a cleanup patch and introduces no change in functionality. It literally just renames 3 fields for consistency. Today we have:

```
inputMetrics.recordsRead
outputMetrics.bytesWritten
shuffleReadMetrics.localBlocksFetched
...
shuffleWriteMetrics.shuffleRecordsWritten
shuffleWriteMetrics.shuffleBytesWritten
shuffleWriteMetrics.shuffleWriteTime
```

The shuffle write ones are kind of redundant. We can drop the `shuffle` part in the method names. I added backward compatible (but deprecated) methods with the old names.

Parent PR: #10717

Author: Andrew Or <andrew@databricks.com>

Closes #10811 from andrewor14/rename-things.
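The backward-compatible rename pattern described above can be sketched like this. The class name, field, and `since` version are illustrative assumptions, not the patch's exact code:

```scala
// Sketch: rename a metric while keeping a deprecated alias with the old
// name, so existing callers keep compiling. (Names are illustrative.)
class ShuffleWriteMetricsSketch {
  private var _bytesWritten = 0L

  // New, shorter name: the enclosing class already says "shuffle write".
  def bytesWritten: Long = _bytesWritten
  def incBytesWritten(v: Long): Unit = _bytesWritten += v

  // Old name preserved as a deprecated forwarder.
  @deprecated("use bytesWritten instead", "2.0.0")
  def shuffleBytesWritten: Long = bytesWritten
}
```

Callers using the old name get a deprecation warning but identical behavior, which lets the rename ship without a breaking change.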
This is a step in implementing SPARK-10620, which migrates TaskMetrics to accumulators. TaskMetrics has a bunch of var's, some are fully public, some are `private[spark]`. This is bad coding style that makes it easy to accidentally overwrite previously set metrics. This has happened a few times in the past and caused bugs that were difficult to debug. Instead, we should have get-or-create semantics, which are more readily understandable. This makes sense in the case of TaskMetrics because these are just aggregated metrics that we want to collect throughout the task, so it doesn't matter who's incrementing them. Parent PR: apache#10717 Author: Andrew Or <andrew@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Author: andrewor14 <andrew@databricks.com> Closes apache#10815 from andrewor14/get-or-create-metrics.
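The get-or-create semantics described here can be sketched as follows; class and method names are hypothetical, not the patch's actual API:

```scala
// Sketch: a metrics group is created lazily exactly once and then
// reused, so a later caller cannot accidentally overwrite values that
// an earlier caller already set. (All names are illustrative.)
class Metrics { var recordsRead = 0L }

class TaskMetricsLike {
  private var _inputMetrics: Option[Metrics] = None

  def registerInputMetrics(): Metrics = _inputMetrics match {
    case Some(m) => m               // reuse: never replace an existing group
    case None =>
      val m = new Metrics
      _inputMetrics = Some(m)
      m
  }
}
```

Because every code path goes through the same get-or-create method, it no longer matters who increments the metrics first; the aggregated values cannot be silently reset.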
The high level idea is that instead of having the executors send both accumulator updates and TaskMetrics, we should have them send only accumulator updates. This eliminates the need to maintain both code paths since one can be implemented in terms of the other. This effort is split into two parts:

**SPARK-12895: Implement TaskMetrics using accumulators.** TaskMetrics is basically just a bunch of accumulable fields. This patch makes TaskMetrics a syntactic wrapper around a collection of accumulators so we don't need to send TaskMetrics from the executors to the driver.

**SPARK-12896: Send only accumulator updates to the driver.** Now that TaskMetrics are expressed in terms of accumulators, we can capture all TaskMetrics values if we just send accumulator updates from the executors to the driver. This completes the parent issue SPARK-10620.

While an effort has been made to preserve as much of the public API as possible, there were a few known breaking DeveloperApi changes that would be very awkward to maintain. I will gather the full list shortly and post it here.

Note: This was once part of #10717. This patch is split out into its own patch from there to make it easier for others to review. Other smaller pieces have already been merged into master.

Author: Andrew Or <andrew@databricks.com>

Closes #10835 from andrewor14/task-metrics-use-accums.
There exist two mechanisms to pass metrics from executors to drivers: accumulators and `TaskMetrics`. Currently we send both to the driver using two separate code paths. This is an unnecessary maintenance burden and makes the code more difficult to follow.

This patch proposes that we send only accumulator updates to the driver. Additionally, it reimplements `TaskMetrics` using accumulators such that the new `TaskMetrics` serves mainly as syntactic sugar to increment and access the values of the underlying accumulators. It migrates the rest of the metrics to adopt the code path already used by the existing `PEAK_EXECUTION_MEMORY`.

While an effort has been made to preserve as much of the public API as possible, there were a few known breaking `@DeveloperApi` changes that would be very awkward to maintain. These are:

- `TaskMetrics#hostname` field was removed; the event log was the only consumer
- `ExceptionFailure#taskMetrics` field was replaced with `accumUpdates`
- `SparkListenerExecutorMetricsUpdate#taskMetrics` field was replaced with `accumUpdates`
- `AccumulableInfo#update` changed from `Option[String]` to `Option[Any]`
- `AccumulableInfo#value` changed from `String` to `Option[Any]`

The following event log elements are changed:

- `AccumulableInfo` "Update" field changed from `Option[String]` to `Option[Any]`
- `AccumulableInfo` "Value" field changed from `String` to `Option[Any]`
This is WIP because I would like to add tests for some of the intricate cases that I ran into while implementing this.
I broke this down into several smaller PRs:
- `ShuffleWriteMetrics`