
[SPARK-3015] Block on cleaning tasks to prevent Akka timeouts #1931

Closed · wants to merge 7 commits · 6 participants
@andrewor14 (Contributor) commented Aug 13, 2014

More detail on the issue is described in SPARK-3015, but the TL;DR is that if we send too many blocking Akka messages that are dependent on each other in quick succession, then we end up causing a few of these messages to time out and ultimately kill the executors. As of #1498, we broadcast each RDD whether or not it is persisted. This means that if we create many RDDs (each of which becomes a broadcast) and the driver performs a GC that cleans up all of these broadcast blocks, then we end up sending many RemoveBroadcast messages in parallel and triggering the chain of blocking messages at high frequencies.

We do not yet know the Akka-level root cause, so this is intended to be a temporary solution until we identify the real issue. I have done some preliminary testing with blocking enabled and observed that the queue length remains quite low (< 1000) even under very intensive workloads.

In the long run, we should do something more sophisticated to allow a limited degree of parallelism by batching cleanup tasks or processing them in a sliding window. In the longer run, we should clean up the whole BlockManager* message-passing interface to avoid unnecessarily awaiting on futures created from Akka asks.
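For reference, a minimal sketch of how this behaviour can be toggled from user code, assuming only the spark.cleaner.referenceTracking.blocking property that appears in the diff below; the app name and the rest of the configuration are placeholders, not part of this PR:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative configuration: explicitly enable blocking cleanup.
// Only the property key comes from this PR; everything else is a placeholder.
val conf = new SparkConf()
  .setAppName("cleaner-blocking-example") // placeholder app name
  .set("spark.cleaner.referenceTracking.blocking", "true")
val sc = new SparkContext(conf)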

@tdas @pwendell @mengxr

@SparkQA commented Aug 13, 2014

QA tests have started for PR 1931. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18489/consoleFull

@SparkQA commented Aug 13, 2014

QA results for PR 1931:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18489/consoleFull

@SparkQA commented Aug 14, 2014

QA tests have started for PR 1931. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18496/consoleFull

@SparkQA commented Aug 14, 2014

QA results for PR 1931:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18496/consoleFull

andrewor14 added some commits Aug 14, 2014

Use the actual reference queue length
The previous code used the length of the referenceBuffer, which is
the number of elements registered for clean-up, rather than the
number of elements registered AND de-referenced.

What we want is the length of the referenceQueue. However, Java
does not expose this, so we must access it through reflection.
Since this is potentially expensive, we need to limit the number
of times we access the queue length this way.
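
For illustration, a minimal sketch of the reflection described above, assuming OpenJDK's private queueLength field; this is not the PR's exact code, and on JDK 9+ it may also require an --add-opens flag for java.lang.ref:

import java.lang.ref.ReferenceQueue
import java.lang.reflect.Field

object ReferenceQueueLength {
  // Look up the private field once; a failure simply disables length reporting.
  private val queueLengthField: Option[Field] =
    try {
      val f = classOf[ReferenceQueue[_]].getDeclaredField("queueLength")
      f.setAccessible(true)
      Some(f)
    } catch {
      case _: Exception => None
    }

  /** Best-effort length of the given queue, or None if reflection is unavailable. */
  def length(queue: ReferenceQueue[_]): Option[Long] =
    queueLengthField.flatMap { f =>
      try Some(f.getLong(queue)) catch { case _: Exception => None }
    }
}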
@SparkQA commented Aug 14, 2014

QA tests have started for PR 1931. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18507/consoleFull

@SparkQA commented Aug 14, 2014

QA results for PR 1931:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18507/consoleFull

  }
} catch {
  case e: Exception =>
    logDebug("Failed to access reference queue's length through reflection: " + e)

@tdas (Contributor) commented Aug 14, 2014

Add a note on why this is logDebug and not logWarning/logError.

  Some(f)
} catch {
  case e: Exception =>
    logDebug("Failed to expose java.lang.ref.ReferenceQueue's queueLength field: " + e)

@tdas (Contributor) commented Aug 14, 2014

Similar to the comment below, add a note.

private def logQueueFullErrorMessage(): Unit = {
  if (!queueFullErrorMessageLogged) {
    queueFullErrorMessageLogged = true
    logError(s"Reference queue size in ContextCleaner has exceeded $queueCapacity! " +

@tdas (Contributor) commented Aug 14, 2014

I am not sure whether this should be logError. It's not like the system is immediately tipping over because it reached this capacity. I think it should be a logWarning.

 * so we have to do this through reflection. This is expensive, however, so we should access
 * this field only once in a while.
 */
private val queueCapacity = 10000

@tdas (Contributor) commented Aug 14, 2014

Well, this is not the capacity. It is just a warning threshold; it should be named accordingly.

@tdas (Contributor) commented Aug 14, 2014

It's a little ugly that the ContextCleaner class is being polluted with so many parameters and all the temporary queue-length code. Wouldn't it be much cleaner if we made a custom ReferenceQueue with a length() method that does this reflection on itself to find the queue length? All the iteration counter, queue-length checking, and error-message printing code could go inside that ReferenceQueue implementation, cleanly separated from the main context cleaner logic.

@andrewor14 (Contributor, Author) commented Aug 15, 2014

Yeah, sounds good. I guess we'll use a ReferenceQueueWithSize or something instead.
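
A rough sketch of what such a wrapper might look like; the name ReferenceQueueWithSize comes from the comment above, while the threshold, check interval, and logging below are purely illustrative and not the PR's final code:

import java.lang.ref.{Reference, ReferenceQueue}

class ReferenceQueueWithSize[T] extends ReferenceQueue[T] {
  private val warnThreshold = 10000 // illustrative warning threshold, not a hard capacity
  private val checkInterval = 100   // only pay the reflection cost every N removals
  private var removeCount = 0L
  private var warned = false

  // Same reflection trick as in the earlier sketch, kept inside the queue itself
  // so the ContextCleaner never sees it.
  private val queueLengthField =
    try {
      val f = classOf[ReferenceQueue[_]].getDeclaredField("queueLength")
      f.setAccessible(true)
      Some(f)
    } catch {
      case _: Exception => None
    }

  /** Best-effort length of this queue, or None if reflection is unavailable. */
  def length: Option[Long] =
    queueLengthField.flatMap { f =>
      try Some(f.getLong(this)) catch { case _: Exception => None }
    }

  /** Remove the next reference, occasionally warning if the queue grows too long. */
  def removeAndCheck(timeoutMillis: Long): Reference[_ <: T] = {
    removeCount += 1
    if (!warned && removeCount % checkInterval == 0) {
      length.filter(_ > warnThreshold).foreach { len =>
        warned = true
        System.err.println(s"Reference queue length $len exceeds $warnThreshold") // logWarning in Spark
      }
    }
    remove(timeoutMillis)
  }
}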

@andrewor14 (Contributor, Author) commented Aug 16, 2014

I have removed the logic for logging the queue length as a warning. This significantly simplifies the PR and keeps it focused on its original purpose as a bug fix. We can add back some notion of a warning later if there is interest.

@SparkQA commented Aug 16, 2014

QA tests have started for PR 1931 at commit d0f7195.

  • This patch merges cleanly.
@SparkQA commented Aug 16, 2014

QA tests have finished for PR 1931 at commit d0f7195.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@pwendell (Contributor) commented Aug 16, 2014

Great - I like this version better!

asfgit closed this in c9da466 on Aug 16, 2014

asfgit pushed a commit that referenced this pull request Aug 16, 2014

[SPARK-3015] Block on cleaning tasks to prevent Akka timeouts
More detail on the issue is described in [SPARK-3015](https://issues.apache.org/jira/browse/SPARK-3015), but the TL;DR is that if we send too many blocking Akka messages that are dependent on each other in quick succession, then we end up causing a few of these messages to time out and ultimately kill the executors. As of #1498, we broadcast each RDD whether or not it is persisted. This means that if we create many RDDs (each of which becomes a broadcast) and the driver performs a GC that cleans up all of these broadcast blocks, then we end up sending many `RemoveBroadcast` messages in parallel and triggering the chain of blocking messages at high frequencies.

We do not yet know the Akka-level root cause, so this is intended to be a temporary solution until we identify the real issue. I have done some preliminary testing with blocking enabled and observed that the queue length remains quite low (< 1000) even under very intensive workloads.

In the long run, we should do something more sophisticated to allow a limited degree of parallelism by batching cleanup tasks or processing them in a sliding window. In the longer run, we should clean up the whole `BlockManager*` message-passing interface to avoid unnecessarily awaiting on futures created from Akka asks.

tdas pwendell mengxr

Author: Andrew Or <andrewor14@gmail.com>

Closes #1931 from andrewor14/reference-blocking and squashes the following commits:

d0f7195 [Andrew Or] Merge branch 'master' of github.com:apache/spark into reference-blocking
ce9daf5 [Andrew Or] Remove logic for logging queue length
111192a [Andrew Or] Add missing space in log message (minor)
a183b83 [Andrew Or] Switch order of code blocks (minor)
9fd1fe6 [Andrew Or] Remove outdated log
104b366 [Andrew Or] Use the actual reference queue length
0b7e768 [Andrew Or] Block on cleaning tasks by default + log error on queue full
(cherry picked from commit c9da466)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
 */
private val blockOnCleanupTasks = sc.conf.getBoolean(
  "spark.cleaner.referenceTracking.blocking", false)

@witgo (Contributor) commented Aug 20, 2014

The changes will not solve the problem here. See BlockManagerMasterActor.scala#L165:

  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
    // Nothing to do in the BlockManagerMasterActor data structures
    import context.dispatcher
    val removeMsg = RemoveShuffle(shuffleId)
    Future.sequence(
      blockManagerInfo.values.map { bm =>
        // akkaTimeout is already applied to this ask
        bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Boolean]
      }.toSeq
    )
  }
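
For context, a sketch of what blocking on these cleanup futures looks like from the cleaner's side; the helper name, parameter shapes, and 30-second timeout are illustrative, not the PR's exact code:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Illustrative only: await the future returned by the remove call instead of
// firing and forgetting, so that ask timeouts surface on the cleaning thread.
def doCleanupShuffle(
    shuffleId: Int,
    blocking: Boolean,
    removeShuffle: Int => Future[Seq[Boolean]]): Unit = {
  val future = removeShuffle(shuffleId)
  if (blocking) {
    Await.result(future, 30.seconds)
  }
}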

andrewor14 deleted the andrewor14:reference-blocking branch on Aug 27, 2014

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014

[SPARK-3015] Block on cleaning tasks to prevent Akka timeouts
@igreenfield commented Apr 15, 2019

@andrewor14
I see in the code:

/**
 * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
 * is controlled by the spark.cleaner.referenceTracking.blocking.shuffle parameter).
 * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
 * workaround for the issue, which is ultimately caused by the way the BlockManager endpoints
 * issue inter-dependent blocking RPC messages to each other at high frequencies. This happens,
 * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
 * longer in scope.
 */

Is that still needed?
