Skip to content

Commit

Permalink
[SPARK-35548][CORE][SHUFFLE] Handling new attempt has started error m…
Browse files Browse the repository at this point in the history
…essage in BlockPushErrorHandler in client

### What changes were proposed in this pull request?
Add a new type of error message in BlockPushErrorHandler which indicates the PushblockStream message is received after a new application attempt has started. This error message should be correctly handled in client without retrying the block push.

### Why are the changes needed?
When we get a block push failure because of the too old attempt, we will not retry pushing the block nor log the exception on the client side.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add the corresponding unit test.

Closes #33617 from zhuqi-lucas/master.

Authored-by: zhuqi-lucas <821684824@qq.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
zhuqi-lucas authored and Mridul Muralidharan committed Aug 16, 2021
1 parent 4dcd746 commit 05cd5f9
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 32 deletions.
Expand Up @@ -39,6 +39,14 @@ public class BlockPushNonFatalFailure extends RuntimeException {
public static final String TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX =
" is received after merged shuffle is finalized";

/**
* String constant used for generating exception messages indicating the application attempt is
* not the latest attempt on the server side. When we get a block push failure because of the too
* old attempt, we will not retry pushing the block nor log the exception on the client side.
*/
public static final String TOO_OLD_ATTEMPT_SUFFIX =
" is from an older app attempt";

/**
* String constant used for generating exception messages indicating a block to be merged
* is a stale block push in the case of indeterminate stage retries on the server side.
Expand Down Expand Up @@ -124,7 +132,12 @@ public enum ReturnCode {
* indeterminate stage retries. When the client receives this code, it will not retry
* pushing the block.
*/
STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX),
/**
* Indicate the application attempt is not the latest attempt on the server side.
* When the client gets this code, it will not retry pushing the block.
*/
TOO_OLD_ATTEMPT_PUSH(4, TOO_OLD_ATTEMPT_SUFFIX);

private final byte id;
// Error message suffix used to generate an error message for a given ReturnCode and
Expand All @@ -146,10 +159,17 @@ public static ReturnCode getReturnCode(byte id) {
case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
case 3: return ReturnCode.STALE_BLOCK_PUSH;
case 4: return ReturnCode.TOO_OLD_ATTEMPT_PUSH;
default: throw new IllegalArgumentException("Unknown block push return code: " + id);
}
}

public static boolean shouldNotRetryErrorCode(ReturnCode returnCode) {
return returnCode == ReturnCode.TOO_LATE_BLOCK_PUSH ||
returnCode == ReturnCode.STALE_BLOCK_PUSH ||
returnCode == ReturnCode.TOO_OLD_ATTEMPT_PUSH;
}

public static String getErrorMsg(String blockId, ReturnCode errorCode) {
Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
return "Block " + blockId + errorCode.errorMsgSuffix;
Expand Down
Expand Up @@ -249,8 +249,17 @@ public String getID() {
wrappedCallback.onComplete(wrappedCallback.getID());
}
} catch (Exception e) {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
if (e instanceof BlockPushNonFatalFailure) {
// Thrown by rpcHandler.receiveStream(reverseClient, meta, callback), the same as
// onComplete method. Respond an RPC message with the error code to client instead of
// using exceptions encoded in the RPCFailure. Using a proper RPCResponse is more
// efficient, and now only include the too old attempt case here.
respond(new RpcResponse(req.requestId,
new NioManagedBuffer(((BlockPushNonFatalFailure) e).getResponse())));
} else {
logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
}
// We choose to totally fail the channel, rather than trying to recover as we do in other
// cases. We don't know how many bytes of the stream the client has already sent for the
// stream, it's not worth trying to recover.
Expand Down
Expand Up @@ -87,10 +87,11 @@ public boolean shouldRetryError(Throwable t) {
return false;
}

// If the block is too late or stale block push, there is no need to retry it
// If the block is too late or the invalid block push or the attempt is not the latest one,
// there is no need to retry it
return !(t instanceof BlockPushNonFatalFailure &&
(((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH ||
((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
BlockPushNonFatalFailure
.shouldNotRetryErrorCode(((BlockPushNonFatalFailure) t).getReturnCode()));
}

@Override
Expand Down
Expand Up @@ -395,19 +395,17 @@ void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
@Override
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
// If this Block belongs to a former application attempt, it is considered late,
// as only the blocks from the current application attempt will be merged
// TODO: [SPARK-35548] Client should be updated to handle this error.
throw new IllegalArgumentException(
String.format("The attempt id %s in this PushBlockStream message does not match "
+ "with the current attempt id %s stored in shuffle service for application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
// Use string concatenation here to avoid the overhead with String.format on every
// pushed block.
final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + "_"
+ msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + msg.reduceId;
if (appShuffleInfo.attemptId != msg.appAttemptId) {
// If this Block belongs to a former application attempt, it is considered late,
// as only the blocks from the current application attempt will be merged
throw new BlockPushNonFatalFailure(new BlockPushReturnCode(ReturnCode
.TOO_OLD_ATTEMPT_PUSH.id(), streamId).toByteBuffer(),
BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_OLD_ATTEMPT_PUSH));
}
// Retrieve merged shuffle file metadata
AppShufflePartitionInfo partitionInfoBeforeCheck;
BlockPushNonFatalFailure failure = null;
Expand Down Expand Up @@ -513,12 +511,18 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
msg.shuffleId, msg.shuffleMergeId, msg.appId, msg.appAttemptId);
AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
if (appShuffleInfo.attemptId != msg.appAttemptId) {
// If this Block belongs to a former application attempt, it is considered late,
// as only the blocks from the current application attempt will be merged
// TODO: [SPARK-35548] Client should be updated to handle this error.
// If finalizeShuffleMerge from a former application attempt, it is considered late,
// as only the finalizeShuffleMerge request from the current application attempt
// will be merged. Too old app attempt only being seen by an already failed
// app attempt, and no need use callback to return to client now, because
// the finalizeShuffleMerge in DAGScheduler has no retry policy, and don't
// use the BlockPushNonFatalFailure because it's the finalizeShuffleMerge
// related case, not the block push case, just throw it in server side now.
// TODO we may use a new exception class to include the finalizeShuffleMerge
// related case just as the BlockPushNonFatalFailure contains the block push cases.
throw new IllegalArgumentException(
String.format("The attempt id %s in this FinalizeShuffleMerge message does not match "
+ "with the current attempt id %s stored in shuffle service for application %s",
+ "with the current attempt id %s stored in shuffle service for application %s",
msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
}
AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
Expand Down
Expand Up @@ -36,6 +36,8 @@ public void testErrorRetry() {
ErrorHandler.BlockPushErrorHandler pushHandler = new ErrorHandler.BlockPushErrorHandler();
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldRetryError(new RuntimeException(new ConnectException())));
Expand All @@ -53,6 +55,8 @@ public void testErrorLogging() {
ErrorHandler.BlockPushErrorHandler pushHandler = new ErrorHandler.BlockPushErrorHandler();
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
Expand Down
Expand Up @@ -111,6 +111,8 @@ public void testErrorLogging() {
ErrorHandler.BlockPushErrorHandler errorHandler = RemoteBlockPushResolver.createErrorHandler();
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
Expand Down Expand Up @@ -939,7 +941,7 @@ public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
}
}

@Test(expected = IllegalArgumentException.class)
@Test(expected = BlockPushNonFatalFailure.class)
public void testPushBlockFromPreviousAttemptIsRejected()
throws IOException, InterruptedException {
Semaphore closed = new Semaphore(0);
Expand Down Expand Up @@ -998,11 +1000,12 @@ void closeAndDeletePartitionFilesIfNeeded(
try {
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(testApp, 1, 0, 0, 1, 0, 0));
} catch (IllegalArgumentException re) {
assertEquals(
"The attempt id 1 in this PushBlockStream message does not match " +
"with the current attempt id 2 stored in shuffle service for application " +
testApp, re.getMessage());
} catch (BlockPushNonFatalFailure re) {
BlockPushReturnCode errorCode =
(BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(re.getResponse());
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_OLD_ATTEMPT_PUSH.id(),
errorCode.returnCode);
assertEquals(errorCode.failureBlockId, stream2.getID());
throw re;
}
}
Expand Down Expand Up @@ -1034,7 +1037,7 @@ public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
} catch (IllegalArgumentException e) {
assertEquals(e.getMessage(),
String.format("The attempt id %s in this FinalizeShuffleMerge message does not " +
"match with the current attempt id %s stored in shuffle service for application %s",
"match with the current attempt id %s stored in shuffle service for application %s",
ATTEMPT_ID_1, ATTEMPT_ID_2, testApp));
throw e;
}
Expand Down
Expand Up @@ -32,7 +32,6 @@ import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.server.BlockPushNonFatalFailure
import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
import org.apache.spark.network.shuffle.BlockPushingListener
import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
import org.apache.spark.network.util.TransportConf
Expand Down Expand Up @@ -78,12 +77,11 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
return false
}
// If the block is too late or the invalid block push, there is no need to retry it
// If the block is too late or the invalid block push or the attempt is not the latest one,
// there is no need to retry it
!(t.isInstanceOf[BlockPushNonFatalFailure] &&
(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
== ReturnCode.TOO_LATE_BLOCK_PUSH ||
t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
== ReturnCode.STALE_BLOCK_PUSH))
BlockPushNonFatalFailure.
shouldNotRetryErrorCode(t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode));
}
}
}
Expand Down
Expand Up @@ -222,6 +222,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(
!errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
assert(
!errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
assert(
!errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")))
Expand All @@ -238,6 +241,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(
!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
assert(
!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.TOO_OLD_ATTEMPT_PUSH, "")))
assert(
!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
ReturnCode.STALE_BLOCK_PUSH, "")))
Expand Down

0 comments on commit 05cd5f9

Please sign in to comment.