
[SPARK-29298][CORE] Separate block manager heartbeat endpoint from driver endpoint #25971

Closed
wants to merge 11 commits into from

Conversation

LantaoJin
Contributor

@LantaoJin LantaoJin commented Sep 30, 2019

What changes were proposed in this pull request?

The executor's heartbeat is sent synchronously to the BlockManagerMaster to let it know that the block manager is still alive. In a heavy cluster, this call can time out and cause the block manager to re-register unexpectedly.
This improvement separates a heartbeat endpoint from the driver endpoint. In our production environment, it was really helpful in preventing executors from repeatedly going up and down.

Why are the changes needed?

BlockManagerMasterEndpoint handles many events from executors, such as RegisterBlockManager, GetLocations, RemoveShuffle, and RemoveExecutor. In a heavy cluster/app it is always busy, and the BlockManagerHeartbeat event was also handled in this endpoint. We found that the heartbeat may time out when the endpoint is busy, so we add a new endpoint, BlockManagerMasterHeartbeatEndpoint, to handle heartbeats separately.
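
Roughly, the idea looks like the following simplified sketch (illustrative only, not the exact code in this diff; the class name here is made up): a tiny endpoint that only answers liveness checks, so heartbeat replies never queue behind heavy block-manager events.

  import scala.collection.mutable

  import org.apache.spark.internal.Logging
  import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
  import org.apache.spark.storage.BlockManagerId
  import org.apache.spark.storage.BlockManagerMessages.{BlockManagerHeartbeat, StopBlockManagerMaster}

  // Sketch of a heartbeat-only endpoint: it keeps a last-seen timestamp per block manager
  // and replies immediately, independently of the busy BlockManagerMasterEndpoint.
  private[spark] class HeartbeatOnlyEndpointSketch(override val rpcEnv: RpcEnv)
    extends ThreadSafeRpcEndpoint with Logging {

    private val blockManagerLastSeen = new mutable.HashMap[BlockManagerId, Long]

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case BlockManagerHeartbeat(blockManagerId) =>
        // Known block managers get their timestamp refreshed; unknown ones must re-register.
        val known = blockManagerLastSeen.contains(blockManagerId)
        if (known) blockManagerLastSeen(blockManagerId) = System.currentTimeMillis()
        context.reply(known)

      case StopBlockManagerMaster =>
        context.reply(true)
        stop()
    }
  }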

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs

@SparkQA

SparkQA commented Sep 30, 2019

Test build #111590 has finished for PR 25971 at commit c348176.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 2, 2019

Test build #111665 has finished for PR 25971 at commit d33da8d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class SecondWithFraction(child: Expression, timeZoneId: Option[String] = None)
  • case class AlterDatabaseSetLocationCommand(databaseName: String, location: String)

@cloud-fan
Contributor

sounds like a good idea. cc @jiangxb1987 @xuanyuanking

@joshrosen-stripe
Contributor

joshrosen-stripe commented Oct 7, 2019

Does this change introduce any potential race-conditions in the case where we detect a legitimate timeout? For example, are there any situations where previously the "mark block manager as dead" was performed serially w.r.t. other block manager operations but now can be performed concurrently with those operations (following this PR's changes)?

I haven't looked closely at this patch yet, so maybe this concern is already addressed / is not an issue.

@LantaoJin
Contributor Author

#25971 (comment) Thanks for the comment. IIUC, a block manager timeout should be an independent event. We have proven this patch in our production Thrift server with ~7000 executors for months, and no issues related to this patch have occurred.

@SparkQA

SparkQA commented Oct 8, 2019

Test build #111859 has finished for PR 25971 at commit e7272aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@JoshRosen I think your concern is valid, but it's an existing problem. The executor has a heartbeat thread; it's possible that this thread finds the heartbeat has timed out and decides to shut down the executor while the block manager of this executor is still handling some requests.

It would be good to know how this problem is resolved, but I think it's very unlikely that we have a long-standing bug here.

@LantaoJin
Contributor Author

Thanks for the explanation @cloud-fan .

@@ -230,6 +238,7 @@ class BlockManagerMaster(
    if (driverEndpoint != null && isDriver) {
      tell(StopBlockManagerMaster)
      driverEndpoint = null
      driverHeartbeatEndPoint = null
Member

You'd better stop driverHeartbeatEndPoint before nulling it out.

Contributor Author

fixed in tell

Comment on lines 41 to 53
    case RegisterBlockManager(blockManagerId, _, _, _, _) =>
      updateLastSeenMs(blockManagerId)
      blockManagerIdByExecutor(blockManagerId.executorId) = blockManagerId
      context.reply(true)

    case UpdateBlockInfo(blockManagerId, _, _, _, _) =>
      updateLastSeenMs(blockManagerId)
      context.reply(true)

    case RemoveExecutor(execId) =>
      blockManagerIdByExecutor.get(execId).foreach(blockManagerLastSeen.remove)
      blockManagerIdByExecutor -= execId
      context.reply(true)
Member

It would be better if you could add a comment explaining why you need to handle RegisterBlockManager, UpdateBlockInfo, and RemoveExecutor, besides BlockManagerHeartbeat, in a heartbeat-related endpoint.

Contributor Author

thanks, I will add some comments

Contributor Author

// Use BlockManagerId -> Long to manage the heartbeat last seen, so the events which to handle

  extends ThreadSafeRpcEndpoint with Logging {

  // Mapping from block manager id to the block manager's information.
  private val blockManagerLastSeen = new mutable.HashMap[BlockManagerId, Long]
Member

I know this is refactored from the original code, but it still feels weird that we never use the lastSeen value elsewhere.

Contributor Author

Yes, def lastSeenMs: Long = _lastSeenMs is never used in the original code.

Member

So, shall we eliminate blockManagerLastSeen, as it may occupy a lot of memory when there are thousands of executors/BlockManagers?

Contributor Author

@LantaoJin LantaoJin Oct 19, 2019

Let me check. Is there a bug in the original code? If def lastSeenMs being unused is intentional, why do we need to call updateLastSeenMs?

Contributor Author

@LantaoJin LantaoJin Oct 19, 2019

No, we cannot eliminate blockManagerLastSeen, at least for now. lastSeenMs is not used, but the blockManagerLastSeen map is used to store all BlockManagerIds for def heartbeatReceived. So the

  private val blockManagerLastSeen = new mutable.HashMap[BlockManagerId, Long]

can be changed to

  private val blockManagers = new mutable.HashSet[BlockManagerId]

But I think the blockManagerLastSeen structure is better and does not need more memory.

Member

Makes sense.

Member

@xuanyuanking xuanyuanking left a comment

Sorry for the late reply; the general idea makes sense to me.

  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterBlockManager(blockManagerId, _, _, _, _) =>
Member

As we move these messages into the new RPC endpoint, I think they can be removed from the original class?
https://github.com/apache/spark/pull/25971/files#diff-186864190089a718680accb51de5f0d4L89-L95

Member

Got it, we need to handle these messages in both endpoints. Do you think we should separate them more thoroughly, i.e. handle only these 4 kinds of heartbeat-related messages in this endpoint?

Contributor Author

@LantaoJin LantaoJin Oct 18, 2019

Not all events need to be handled in the heartbeat endpoint. But you reminded me that we need comments in the original endpoint as a reminder when new events are added in the future.

Contributor Author

// SPARK-29298 separates heartbeat endpoint from driver endpoint.

@SparkQA

SparkQA commented Oct 18, 2019

Test build #112249 has finished for PR 25971 at commit cb6c4f6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 19, 2019

Test build #112306 has finished for PR 25971 at commit b0ecff9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@xuanyuanking xuanyuanking left a comment

Sorry for the late reply, LGTM.
cc @jiangxb1987 and @cloud-fan

@Ngone51
Member

Ngone51 commented Oct 31, 2019

LGTM, too.

@jiangxb1987
Contributor

If the BlockManagerMaster heartbeat times out, does it mean the endpoint is not able to respond to the event within 10 minutes?

BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))

@LantaoJin
Contributor Author

https://github.com/apache/spark/blob/0da667d31436c43e06cb6bb5ac65a17f65edd08b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#issuecomment-548545864 Yes. In a heavy cluster it will time out and cause the block manager to re-register unexpectedly. That is what this PR fixes, rather than just increasing the timeout value. This code still keeps the synchronous ask with a 10-minute timeout; instead, the response from the new heartbeat endpoint is fast even when the driver is heavy, since the logic in the heartbeat endpoint is lightweight.
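
For illustration, the caller side would look roughly like this (a sketch assembled from the fragments quoted in this thread; blockManagerMaster and blockManagerId come from the surrounding context, and this is not necessarily the exact call site):

  import scala.concurrent.duration._

  import org.apache.spark.rpc.RpcTimeout
  import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

  // Same synchronous ask and same 10-minute timeout as before, but it now targets the
  // lightweight heartbeat endpoint, so a busy BlockManagerMasterEndpoint cannot delay
  // the reply and trigger a spurious block manager re-registration.
  val stillKnown: Boolean = blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean](
    BlockManagerHeartbeat(blockManagerId),
    new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))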

@LantaoJin
Contributor Author

Gentle ping @jiangxb1987

@cloud-fan
Contributor

I'd like to confirm what's been fixed by this PR.

If BlockManagerMaster is too busy and the heartbeat times out, then creating a new endpoint for heartbeats can make BlockManagerMaster less busy, but by how much? If BlockManagerMaster is mostly busy with other events and heartbeat is a very small portion, then I'm not sure how useful the new endpoint is.

@LantaoJin
Contributor Author

@cloud-fan the primary target is to fix the second problem: BlockManagerMaster being mostly busy with other events causes heartbeats to time out frequently. With a heavy driver, frequent executor heartbeat timeouts cause executors to be lost, and finally the driver crashes.
We ran a test:
Before: with 40 JDBC clients sending small SQL queries concurrently, the driver crashed within 2 hours.
After: with 80 JDBC clients sending small SQL queries concurrently, the driver kept working for over 3 days.
(The test is based on our concurrency-optimized driver.)

@cloud-fan
Contributor

So other events do not have a timeout? Or will they retry on timeout?

It seems you are indicating that heartbeat timeouts are serious and we should try our best to avoid them. It's still workable to have a busy block manager, as other events can handle timeouts gracefully.

@LantaoJin
Contributor Author

LantaoJin commented Nov 7, 2019

Maybe, but I didn't check the failure handling of all event types. What I can confirm is that frequent executor loss can make the situation worse, and it's fatal in our case. The performance here did improve in practice after the heartbeat event was separated. But I am not sure whether other events, like RemoveExecutor, can handle timeouts gracefully. In our practice, availability increased noticeably with a busy BlockManagerMaster, and jobs finally succeed.

@LantaoJin
Contributor Author

LantaoJin commented Nov 7, 2019

Indeed, the handling of many other events is synchronized, while heartbeats could be handled asynchronously. This fix avoids losing too many executors as collateral damage.

@cloud-fan
Contributor

Now both the driver endpoint and the new heartbeat endpoint track the list of block managers. What can go wrong if these two lists get out of sync?

@LantaoJin
Contributor Author

You are right, it's a risk. This part was changed from our code running in production. In our original code, the list is created before the two endpoints and becomes a class member of both when the endpoints are constructed, so there is only one block manager list. I will change it back.
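
A hedged sketch of that construction order (constructor parameters and endpoint names are simplified and partly assumed; the real signatures in the diff differ):

  import scala.collection.mutable

  import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo}

  // Create the shared map once, before either endpoint, and pass the same instance to both,
  // so there is a single source of truth for the registered block managers.
  val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  val driverEndpoint = rpcEnv.setupEndpoint(
    "BlockManagerMaster",
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus, blockManagerInfo))

  val driverHeartbeatEndPoint = rpcEnv.setupEndpoint(
    "BlockManagerMasterHeartbeat",
    new BlockManagerMasterHeartbeatEndpoint(rpcEnv, isLocal, blockManagerInfo))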

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113443 has finished for PR 25971 at commit 03b9a56.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113445 has finished for PR 25971 at commit f9f7489.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113439 has finished for PR 25971 at commit 280058c.

  • This patch fails due to an unknown error code, -9.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

    case BlockManagerHeartbeat(blockManagerId) =>
      context.reply(heartbeatReceived(blockManagerId))

    case StopBlockManagerMaster =>
Contributor

Shall we add a comment that, for simplicity, we reuse StopBlockManagerMaster to stop the heartbeat endpoint?

Contributor Author

Do we need to emphasize reuse? All endpoints belonging to BlockManagerMaster should stop themselves when they receive the StopBlockManagerMaster event.

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113446 has finished for PR 25971 at commit 7e0d841.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    override val rpcEnv: RpcEnv,
    isLocal: Boolean,
    blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo])
  extends IsolatedRpcEndpoint with Logging {
Member

Actually, I'm wondering whether BlockManagerMasterHeartbeatEndpoint should be an IsolatedRpcEndpoint. IsolatedRpcEndpoint is mainly designed for heavy, busy endpoints, e.g. BlockManagerMasterEndpoint and DriverEndpoint, while BlockManagerMasterHeartbeatEndpoint doesn't seem to be high-load. cc @vanzin @squito

Contributor Author

I just copied it from BlockManagerMasterEndpoint :)
IsolatedRpcEndpoint was merged recently, so I missed the background on it.
Do you suggest changing it back to ThreadSafeRpcEndpoint?

Member

Yes, I think so.

Contributor Author

Changed
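
Putting the quoted fragment together with this change, the declaration would now read roughly as follows (a sketch matching the snippets shown in this thread; imports are my assumption):

  import scala.collection.mutable

  import org.apache.spark.internal.Logging
  import org.apache.spark.rpc.{RpcEnv, ThreadSafeRpcEndpoint}
  import org.apache.spark.storage.{BlockManagerId, BlockManagerInfo}

  private[spark] class BlockManagerMasterHeartbeatEndpoint(
      override val rpcEnv: RpcEnv,
      isLocal: Boolean,
      blockManagerInfo: mutable.Map[BlockManagerId, BlockManagerInfo])
    extends ThreadSafeRpcEndpoint with Logging {
    // heartbeat handling as shown earlier in this thread
  }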

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113449 has finished for PR 25971 at commit 43634cb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 8, 2019

Test build #113463 has finished for PR 25971 at commit 7b8b398.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

So other events do not have a timeout? Or will they retry on timeout?

Please correct me if I'm wrong, but I don't see any approach to retry when GetLocations* requests time out.

It's still workable to have a busy block manager, as other events can handle timeouts gracefully.

If this is the case then the approach proposed by this PR is fine; otherwise, if all the requests towards the BlockManagerMasterEndpoint just time out, we might need to figure out other ways to make things work.

Heartbeats could be handled asynchronously.

I won't call an async message Heartbeat.

@LantaoJin
Contributor Author

LantaoJin commented Nov 9, 2019

Thanks for the comment @jiangxb1987

Please correct me if I'm wrong, but I don't see any approach to retry when GetLocations* requests time out.

The GetLocations event never times out.

driverEndpoint.askSync[Seq[BlockManagerId]](GetLocations(blockId))

The BlockManagerHeartbeat event can time out, and if it does we treat it as an executor lost.
BlockManagerHeartbeat(blockManagerId), new RpcTimeout(10.minutes, "BlockManagerHeartbeat"))

But with a busy block manager, executors are not actually dead, yet they are treated as lost because of this mistaken timeout. That's what this PR fixes.

So other events do not have a timeout? Or will they retry on timeout?

Previously, I wasn't sure about that. But I think yes, they do not time out. I only see BlockManagerHeartbeat with a timeout parameter.

driverEndpoint.askSync[T](BlockManagerHeartbeat, new RpcTimeout(..))

I won't call an async message Heartbeat.

Sorry, I still keep it synchronous.

blockManagerMaster.driverHeartbeatEndPoint.askSync[Boolean](

@cloud-fan
Contributor

By default, Spark RPC asks time out after 120 seconds:

  /** Returns the default Spark timeout to use for RPC ask operations. */
  def askRpcTimeout(conf: SparkConf): RpcTimeout = {
    RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
  }

IIUC, a non-heartbeat timeout just fails the task/job, while a heartbeat timeout causes an executor to be lost, which is more serious.
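
As a small, hedged illustration of the two timeout paths being contrasted here (exact call sites may differ):

  import scala.concurrent.duration._

  import org.apache.spark.SparkConf
  import org.apache.spark.rpc.RpcTimeout
  import org.apache.spark.util.RpcUtils

  val conf = new SparkConf()

  // Default ask timeout used by most endpoint asks (spark.rpc.askTimeout / spark.network.timeout, 120s).
  val defaultAskTimeout: RpcTimeout = RpcUtils.askRpcTimeout(conf)

  // Explicit, much longer timeout used only for the BlockManagerHeartbeat ask, as quoted above.
  val heartbeatTimeout = new RpcTimeout(10.minutes, "BlockManagerHeartbeat")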

@LantaoJin
Contributor Author

By default, Spark RPC asks time out after 120 seconds

Ah, correct.

@Ngone51
Member

Ngone51 commented Nov 11, 2019

@LantaoJin So, have you noticed failed tasks/jobs along with the executor lost issue? I feel they're more likely to show up, since the default RPC timeout (2 min) is much shorter than the heartbeat timeout (10 min).

@LantaoJin
Contributor Author

LantaoJin commented Nov 11, 2019

I paid much more attention to driver death, since we use the Thrift server as a long-running service. Failed tasks/jobs may be retried successfully, or resubmitted by upper-layer scheduling tools/users. But remarkably, the driver (Thrift server) can live longer and more stably with this patch.

@Ngone51 Imagine this: in our production, the driver is busy, but not for its whole lifetime (unlike stress testing); jobs/tasks may fail and executors may be lost sometimes. For a long-running service, we can tolerate jobs/tasks occasionally failing while the driver is busy, as long as the driver is still alive. But we cannot accept the service going into downtime because many executors are lost due to a hot driver.

@LantaoJin
Contributor Author

LantaoJin commented Nov 11, 2019

#25971 (comment) Our testing also illustrated this. Honestly speaking, this patch cannot resolve all the problems of a hot driver (there are too many pieces to consider). But I think it fixes one of them and, in a way, helps a long-running Thrift server reach production quality.

@Ngone51
Member

Ngone51 commented Nov 11, 2019

Actually, what I want to know is whether these two issues (failed tasks/jobs and executor loss) happen over a short duration rather than over a long run. If they happen over a short duration, then I think this could really explain why other messages may also have timed out by the time the heartbeat has already timed out.

Anyway, I think this PR is good enough for this particular executor lost issue.

Contributor

@jiangxb1987 jiangxb1987 left a comment

This PR avoids executor loss on BlockManager heartbeat failure, which is the right direction. LGTM

@jiangxb1987
Contributor

retest this please

@SparkQA

SparkQA commented Nov 12, 2019

Test build #113612 has finished for PR 25971 at commit 7b8b398.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in 5cb05f4 Nov 12, 2019