
[SPARK-28305][YARN] Request GetExecutorLossReason to use a smaller timeout parameter #25078

Closed

cxzl25 wants to merge 1 commit into apache:master from cxzl25:fix_GetExecutorLossReason

Conversation

@cxzl25 (Contributor) commented Jul 8, 2019

What changes were proposed in this pull request?

Request GetExecutorLossReason to use a smaller timeout parameter.

In some cases, such as when the NM machine crashes or shuts down, the driver asks for GetExecutorLossReason, but the AM's getCompletedContainersStatuses cannot get the failure information for the container.

The YARN NM detection timeout is 10 minutes, controlled by the parameter yarn.resourcemanager.rm.container-allocation.expiry-interval-ms, so the AM has to wait up to 10 minutes to learn the cause of the container failure.

Although the driver's ask fails and would call recover, the 2-minute timeout (spark.network.timeout) configured on IdleStateHandler closes the connection between the driver and the AM first. The AM then exits, the application finishes, and the driver exits, causing the job to fail.
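For illustration only, here is a minimal sketch of the kind of change being proposed (the 0.95 factor is taken from the review discussion below; the exact location and names are illustrative, not the actual diff). It assumes the driver-side ask in YarnSchedulerBackend, where am, sparkConf, and executorId are in scope:

import scala.concurrent.duration._

import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.util.RpcUtils

// Hypothetical sketch (not the actual patch): ask for the loss reason with a
// timeout slightly smaller than spark.rpc.askTimeout / spark.network.timeout,
// so the ask fails and recover runs before IdleStateHandler declares the
// driver<->AM connection dead.
val askTimeout = RpcUtils.askRpcTimeout(sparkConf)
val lossReasonTimeout = new RpcTimeout(
  (askTimeout.duration.toMillis * 0.95).toLong.millis, askTimeout.timeoutProp)

am.ask[ExecutorLossReason](GetExecutorLossReason(executorId), lossReasonTimeout)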

How was this patch tested?

@cxzl25 (Contributor, Author) commented Jul 8, 2019

AM LOG:

19/07/08 16:56:48 [dispatcher-event-loop-0] INFO YarnAllocator: add executor 951 to pendingLossReasonRequests for get the loss reason
19/07/08 16:58:48 [dispatcher-event-loop-26] INFO ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down.
19/07/08 16:58:48 [dispatcher-event-loop-26] INFO ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0

Driver LOG:

19/07/08 16:58:48,476 [rpc-server-3-3] ERROR TransportChannelHandler: Connection to /xx.xx.xx.xx:19398 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
19/07/08 16:58:48,476 [rpc-server-3-3] ERROR TransportResponseHandler: Still have 1 requests outstanding when connection from /xx.xx.xx.xx:19398 is closed
19/07/08 16:58:48,510 [rpc-server-3-3] WARN NettyRpcEnv: Ignored failure: java.io.IOException: Connection from /xx.xx.xx.xx:19398 closed
19/07/08 16:58:48,516 [netty-rpc-env-timeout] WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 951 at RPC address xx.xx.xx.xx:49175, but got no response. Marking as slave lost.
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply from null in 120 seconds. This timeout is controlled by spark.rpc.askTimeout

@tgravescs (Contributor) commented:

Interesting, this makes sense, but I'm wondering whether the same happens in other places as well. I assume that if you configure spark.rpc.askTimeout to be less than the network timeout, that also works around the issue? Or if you set io.connectionTimeout to be larger?

@cxzl25 (Contributor, Author) commented Jul 11, 2019

In yarn-client mode, the driver closing the AM connection causes the entire job to exit, which is an unnecessary failure.
I searched the code, and the ask+recover pattern is used in very few places.

Adjusting the parameters should work:

spark.rpc.askTimeout 120s
spark.rpc.io.connectionTimeout 130s

Parameter priority (first choice, then fallback):

Module                     First                             Fallback
RpcEndpointRef#ask         spark.rpc.askTimeout              spark.network.timeout
TransportChannelHandler    spark.rpc.io.connectionTimeout    spark.network.timeout
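
For reference, the ask-side fallback is wired up roughly like this in org.apache.spark.util.RpcUtils (paraphrased). The TransportChannelHandler side comes from TransportConf.connectionTimeoutMs(), which similarly prefers spark.rpc.io.connectionTimeout and falls back to spark.network.timeout:

import org.apache.spark.SparkConf
import org.apache.spark.rpc.RpcTimeout

// Roughly what RpcUtils.askRpcTimeout does: read spark.rpc.askTimeout first,
// fall back to spark.network.timeout, and default to 120s if neither is set.
def askRpcTimeout(conf: SparkConf): RpcTimeout = {
  RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
}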

I previously found and fixed a problem with abnormal exits in yarn-client mode (#23989).
With that patch applied in our production environment, there is still some probability of an abnormal exit.
Later, I looked carefully at the logic here and found this problem.

Where the connection is closed (ApplicationMaster's AMEndpoint):

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
  // This avoids potentially reporting incorrect exit codes if the driver fails
  if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
    logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
    finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  }
}

An example of the ask+recover pattern (BlockManagerMasterEndpoint):

val removeRddFromExecutorsFutures = blockManagerInfo.values.map { bmInfo =>
  bmInfo.slaveEndpoint.ask[Int](removeMsg).recover {
    case e: IOException =>
      logWarning(s"Error trying to remove RDD ${removeMsg.rddId} " +
        s"from block manager ${bmInfo.blockManagerId}", e)
      0 // zero blocks were removed
  }
}.toSeq
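
For context, the driver-side ask+recover that produced the "Marking as slave lost" warning in the log above lives in YarnSchedulerBackend.YarnSchedulerEndpoint and looks roughly like this (paraphrased from the Spark 2.x sources; details vary by version):

// Paraphrased sketch: askTimeout here is the usual spark.rpc.askTimeout /
// spark.network.timeout, which is why this ask races with IdleStateHandler.
val lossReasonRequest = GetExecutorLossReason(executorId)
am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
  .map { reason => RemoveExecutor(executorId, reason) }(ThreadUtils.sameThread)
  .recover {
    case NonFatal(e) =>
      logWarning(s"Attempted to get executor loss reason for executor id $executorId" +
        s" at RPC address $executorRpcAddress, but got no response. Marking as slave lost.", e)
      RemoveExecutor(executorId, SlaveLost())
  }(ThreadUtils.sameThread)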

@tgravescs (Contributor) commented:

Right, so to clarify: I think you are saying that yes, if you set askTimeout separately, it also fixes the problem, correct?

So my concern is that you are fixing this in a single location; I think there are other places that could have the same issue, although the question is whether those are recoverable or not. In this case it is recoverable, because it is just trying to get the loss reason, and if it doesn't it can go on just fine and the AM will restart and connect back to the driver.

My other concern is that we are breaking what the configs actually do. This is no longer just the askTimeout or network timeout, it's 0.95 times one of those, which is inconsistent with what is documented and with everywhere else. Let's say I set the ask timeout different from the network timeout; now you are decreasing the ask timeout even further than what I set it to.

So I'm wondering if you can just set the ask timeout globally to a different value, or whether that causes other issues?

@cxzl25 (Contributor, Author) commented Jul 12, 2019

Yes, I tested successfully with the following configuration in the test environment, but not on a large scale in the production environment.

spark.rpc.askTimeout=120s
spark.rpc.io.connectionTimeout=130s

After the driver closes the AM connection, the AM does not get a chance to reconnect, and the SparkContext also stops:

finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)

Or should I take the minimum of the three configuration items?

Later I discovered that this may also be a race condition. RpcOutboxMessage#onTimeout calls removeRpcRequest:

public void addRpcRequest(long requestId, RpcResponseCallback callback) {
  updateTimeOfLastRequest();
  outstandingRpcs.put(requestId, callback);
}

public void removeRpcRequest(long requestId) {
  outstandingRpcs.remove(requestId);
}

When IdleStateHandler triggers an IdleStateEvent, numOutstandingRequests() > 0 may still count an RPC request that another thread (the timeout handler) is about to remove, while isActuallyOverdue is already true:

synchronized (this) {
  boolean hasInFlightRequests = responseHandler.numOutstandingRequests() > 0;
  boolean isActuallyOverdue =
    System.nanoTime() - responseHandler.getTimeOfLastRequestNs() > requestTimeoutNs;
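
To make the interleaving concrete, here is a small standalone sketch (illustrative only, not Spark code) of the race the excerpt above is exposed to: one thread removes a timed-out request from the shared map while another thread is counting outstanding requests for the idle check.

import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch only: outstandingRpcs stands in for
// TransportResponseHandler.outstandingRpcs; thread names mirror the logs above.
object IdleCheckRaceSketch {
  private val outstandingRpcs = new ConcurrentHashMap[Long, String]()

  def main(args: Array[String]): Unit = {
    outstandingRpcs.put(1L, "GetExecutorLossReason") // the pending ask

    val timeoutThread = new Thread(new Runnable {
      // RpcOutboxMessage#onTimeout -> removeRpcRequest(requestId)
      override def run(): Unit = outstandingRpcs.remove(1L)
    }, "netty-rpc-env-timeout")

    val idleCheckThread = new Thread(new Runnable {
      // IdleStateHandler fires; the handler checks the outstanding count.
      // Depending on scheduling, it can still see the request that the
      // timeout thread is about to remove.
      override def run(): Unit = {
        if (outstandingRpcs.size() > 0) {
          println("Idle check still sees an outstanding request; " +
            "the connection would be marked dead and closed.")
        }
      }
    }, "idle-check")

    idleCheckThread.start()
    timeoutThread.start()
    idleCheckThread.join()
    timeoutThread.join()
  }
}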

I'm not sure if my example is too extreme, but it does happen in our environment.

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

github-actions bot added the Stale label on Dec 27, 2019
github-actions bot closed this on Dec 28, 2019