
[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target #2366

Closed
tdas wants to merge 16 commits into master from tdas:replication-fix

Conversation

tdas
Contributor

tdas commented Sep 12, 2014

If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, it continues to fail indefinitely: even after the driver learns about the demise of B, A keeps trying to replicate to B and failing miserably.

The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active) but never updates it after B dies. This affects Spark Streaming, as its receiver uses block replication.

The solution in this patch adds the following.

  • Changed BlockManagerMaster to return all the peers of a block manager, rather than just the requested number. It also filters out the driver BlockManager.
  • Refactored BlockManager's replication code to handle peer caching correctly.
    • The peer for replication is randomly selected. This is different from the past behavior, where for a node A, a node B was deterministically chosen for the lifetime of the application.
    • If replication to a node fails, the peer list is refetched.
    • The peer cache has a TTL of 1 second to enable discovery of new peers and their use for replication.
  • Refactored the use of `<driver>` in BlockManager into a new method `BlockManagerId.isDriver`.
  • Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference to the performance of Spark workloads that do not use replication.
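To make the caching behavior described above concrete, here is a minimal sketch of a TTL-based peer cache with random peer selection and a forced refetch after a failed replication attempt. This is illustrative only; `PeerCache`, `PeerId`, and `fetchPeersFromMaster` are stand-in names, not the actual BlockManager code.

```scala
import scala.util.Random

// Illustrative stand-in for BlockManagerId.
case class PeerId(host: String, port: Int)

// A minimal TTL-based peer cache, sketching the behavior described above.
class PeerCache(ttlMs: Long, fetchPeersFromMaster: () => Seq[PeerId]) {
  private var cachedPeers: Seq[PeerId] = Seq.empty
  private var lastFetchTimeMs = 0L

  /** Return cached peers, refetching from the driver if forced or if the TTL has expired. */
  def getPeers(forceFetch: Boolean): Seq[PeerId] = synchronized {
    val expired = System.currentTimeMillis() - lastFetchTimeMs > ttlMs
    if (forceFetch || expired || cachedPeers.isEmpty) {
      cachedPeers = fetchPeersFromMaster()
      lastFetchTimeMs = System.currentTimeMillis()
    }
    cachedPeers
  }

  /** Pick a random peer for one replication attempt; on failure, retry with forceFetch = true. */
  def randomPeer(forceFetch: Boolean = false): Option[PeerId] = {
    val peers = getPeers(forceFetch)
    if (peers.isEmpty) None else Some(peers(Random.nextInt(peers.size)))
  }
}
```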

@andrewor14 @JoshRosen

@andrewor14
Contributor

test this please

@SparkQA

SparkQA commented Sep 12, 2014

QA tests have started for PR 2366 at commit 7598f91.

  • This patch merges cleanly.

tdas changed the title from "[SPARK-3495] Block replication fails continuously when the replication target node is dead" to "[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake choose driver as target" on Sep 12, 2014
@SparkQA

SparkQA commented Sep 12, 2014

QA tests have started for PR 2366 at commit 4a20531.

  • This patch merges cleanly.

  /**
   * Get peer block managers in the system.
   */
  private def getPeers(forceFetch: Boolean): HashSet[BlockManagerId] = {
    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
Contributor Author

Is 1000 ms good enough to limit traffic? My reasoning is that if there are 100 nodes in the cluster, 100 messages per second is cheap enough for the driver to handle. Also, this will occur only in streaming, which actively uses replication; if replication is not being used, this path is inactive.
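As a side note on this setting: since the TTL is read from the configuration in the diff above, it could presumably be raised by an application if 1000 ms turns out to be too chatty. A hedged sketch (the 10-second value is arbitrary):

```scala
import org.apache.spark.SparkConf

// Raise the peer-cache TTL to 10 seconds (the value is in milliseconds).
val conf = new SparkConf()
  .setAppName("replication-demo")
  .set("spark.storage.cachedPeersTtl", "10000")
```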

tdas changed the title from "[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake choose driver as target" to "[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target" on Sep 12, 2014
@SparkQA

SparkQA commented Sep 12, 2014

QA tests have finished for PR 2366 at commit 7598f91.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

@tdas
Contributor Author

tdas commented Sep 12, 2014

The GetPeers class signature has been changed from `case class GetPeers(blockManagerId: BlockManagerId, numPeers: Int)` to `case class GetPeers(blockManagerId: BlockManagerId)`. This is probably okay, as it is a developer API.

@SparkQA

SparkQA commented Sep 12, 2014

QA tests have finished for PR 2366 at commit 4a20531.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

@rxin
Contributor

rxin commented Sep 12, 2014

Isn't a 1 s cache span too low? How often will we get a cache hit if entries expire in 1 second?

@@ -59,6 +59,8 @@ class BlockManagerId private (

  def port: Int = port_

  def isDriver = (executorId == "<driver>")
Contributor

+1 on this. I added this TODO a while back and I think this also affects some UI code.

Contributor

write out the return type Boolean

Contributor Author

Right!
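For reference, the change being asked for would look roughly like this (a one-line sketch, not necessarily the exact committed code):

```scala
// Same check as above, with the return type written out explicitly.
def isDriver: Boolean = executorId == "<driver>"
```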

@tdas
Contributor Author

tdas commented Sep 12, 2014

Yeah, 1 second is probably too low (though there would be 4 or 5 cache hits for every miss in streaming). Better to make it a minute. It's a tradeoff against how fast we want streaming to find new nodes. I will update it. Any other thoughts?

  /** Get the list of the peers of the given block manager */
  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
    val blockManagerIds = blockManagerInfo.keySet.filterNot { _.isDriver }.toArray
    val selfIndex = blockManagerIds.indexOf(blockManagerId)
Contributor

Why not just use `contains`? Then we don't need to convert this to an array and use `indexOf`.

Contributor Author

Yeah, I had just kept the existing code here as is. Will change; that's obviously better.
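A sketch of the `contains`-based version being suggested, assuming the `blockManagerInfo` map from the excerpt above (not the exact committed code):

```scala
/** Get the peers of the given block manager, excluding the driver and the block manager itself. */
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
  val blockManagerIds = blockManagerInfo.keySet.filterNot(_.isDriver)
  if (blockManagerIds.contains(blockManagerId)) {
    // Every other non-driver block manager is a candidate peer.
    (blockManagerIds - blockManagerId).toSeq
  } else {
    Seq.empty
  }
}
```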

@andrewor14
Contributor

I looked at the logic in BlockManager in detail and it looks reasonable. I left a few minor comments all over the place, but in general this LGTM.

@mridulm
Contributor

mridulm commented Sep 23, 2014

@tdas In case I did not mention it before :-) this is definitely a great improvement over what existed earlier!
I would love it if we could (sometime soon, I hope) add support for re-replication of blocks when executors are lost, which currently seems to be outside the scope of this PR.

Other than the MT bug I mentioned above, this looks good to me!

@rxin
Contributor

rxin commented Sep 23, 2014

If this is getting more complicated, we should consider standardizing the internal API and then building a separate service that properly handles all these issues. That service can also handle serving shuffle blocks.

@SparkQA

SparkQA commented Sep 26, 2014

QA tests have started for PR 2366 at commit 012afa3.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 27, 2014

QA tests have finished for PR 2366 at commit 012afa3.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

@tdas
Contributor Author

tdas commented Sep 27, 2014

@rxin I don't think it is getting more complicated than the status quo. The complexity of fetching and caching peers is contained in this one method, getPeers, so it should be reasonably self-contained and easy to replace with a different implementation.

@rxin
Contributor

rxin commented Sep 27, 2014

No, I was responding to Mridul's rebalancing comment.

   */
  private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
    peerFetchLock.synchronized {
      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
Contributor

This is a constant, so why not just put it outside of this function?

Contributor

actually probably no big deal to leave this here

Contributor Author

I didn't want to pollute the namespace inside the BlockManager class any more than absolutely necessary. :)
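For completeness, the alternative being discussed would amount to hoisting the lookup into a class-level constant, roughly as follows (a sketch, not the committed code):

```scala
// Read the TTL once per BlockManager instance instead of on every getPeers call.
private val cachedPeersTtl: Int =
  conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
```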

@SparkQA

SparkQA commented Sep 30, 2014

QA tests have started for PR 2366 at commit 0661773.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Sep 30, 2014

QA tests have finished for PR 2366 at commit 0661773.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Oct 1, 2014

@rxin This is good to go!

@@ -1228,4 +1244,314 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
    assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
    assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
  }

  test("get peers with addition and removal of block managers") {
Contributor

Should we create a new test suite, `BlockManagerReplicationSuite`? This file is getting long.
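A rough skeleton of what such a suite might look like; `makeBlockManager` is a hypothetical helper, and this is only a sketch, not the BlockManagerReplicationSuite that was eventually committed:

```scala
import org.apache.spark.storage.BlockManagerId
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}

class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAndAfter {

  // Hypothetical helper: start a BlockManager and register it with the master.
  private def makeBlockManager(executorId: String): BlockManagerId = ???

  test("get peers with addition and removal of block managers") {
    val a = makeBlockManager("exec-a")
    val b = makeBlockManager("exec-b")
    // Expected behavior: each block manager sees the other (and never the driver)
    // as a peer, and removing one eventually drops it from the other's peer list.
  }
}
```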

@tdas
Contributor Author

tdas commented Oct 2, 2014

Jenkins, test this please.

@SparkQA

SparkQA commented Oct 2, 2014

QA tests have started for PR 2366 at commit 9690f57.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 2, 2014

QA tests have finished for PR 2366 at commit 9690f57.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

@rxin
Contributor

rxin commented Oct 2, 2014

Thanks. Merging in master.

asfgit closed this in 5db78e6 on Oct 2, 2014
tdas added a commit to tdas/spark that referenced this pull request Nov 10, 2014
…n target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target

If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, this continues to fail indefinitely. Even if the driver learns about the demise of B, A continues to try replicating to B and failing miserably.

The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active), but never updates it after B is dead. This affects Spark Streaming as its receiver uses block replication.

The solution in this patch adds the following.
- Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. It also filters out driver BlockManager.
- Refactored BlockManager's replication code to handle peer caching correctly.
    + The peer for replication is randomly selected. This is different from past behavior where for a node A, a node B was deterministically chosen for the lifetime of the application.
    + If replication fails to one node, the peers are refetched.
    + The peer cache has a TTL of 1 second to enable discovery of new peers and using them for replication.
- Refactored use of \<driver\> in BlockManager into a new method `BlockManagerId.isDriver`
- Added replication unit tests (replication was not tested till now, duh!)

This should not make a difference in performance of Spark workloads where replication is not used.

@andrewor14 @JoshRosen

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#2366 from tdas/replication-fix and squashes the following commits:

9690f57 [Tathagata Das] Moved replication tests to a new BlockManagerReplicationSuite.
0661773 [Tathagata Das] Minor changes based on PR comments.
a55a65c [Tathagata Das] Added a unit test to test replication behavior.
012afa3 [Tathagata Das] Bug fix
89f91a0 [Tathagata Das] Minor change.
68e2c72 [Tathagata Das] Made replication peer selection logic more efficient.
08afaa9 [Tathagata Das] Made peer selection for replication deterministic to block id
3821ab9 [Tathagata Das] Fixes based on PR comments.
08e5646 [Tathagata Das] More minor changes.
d402506 [Tathagata Das] Fixed imports.
4a20531 [Tathagata Das] Filtered driver block manager from peer list, and also consolidated the use of <driver> in BlockManager.
7598f91 [Tathagata Das] Minor changes.
03de02d [Tathagata Das] Change replication logic to correctly refetch peers from master on failure and on new worker addition.
d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn.
9f0ac9f [Tathagata Das] Modified replication tests to fail on replication bug.
af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite

Conflicts:
	core/src/main/scala/org/apache/spark/storage/BlockManager.scala
	core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
	core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
asfgit pushed a commit that referenced this pull request Nov 11, 2014
…master to branch 1.1

The original PR was #2366

This backport was non-trivial because Spark 1.1 uses ConnectionManager instead of NioBlockTransferService, which required slight modifications to the unit tests. Other than that, the code is exactly the same as in the original PR. Please refer to the discussion in the original PR if you have any thoughts.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #3191 from tdas/replication-fix-branch-1.1-backport and squashes the following commits:

593214a [Tathagata Das] Merge remote-tracking branch 'apache-github/branch-1.1' into branch-1.1
2ed927f [Tathagata Das] Fixed error in unit test.
de4ff73 [Tathagata Das] [SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target
@@ -787,19 +789,41 @@ private[spark] class BlockManager(
}

  /**
   * Get peer block managers in the system.
   */
  private def getPeers(numPeers: Int): Seq[BlockManagerId] = cachedPeers.synchronized {

I cannot make sense of what getPeers means. (It's just my personal confusion... sorry; looking forward to your reply, thanks.) @tdas
