Skip to content

Commit

Permalink
[SPARK-36378][SHUFFLE] Switch to using RPCResponse to communicate com…
Browse files Browse the repository at this point in the history
…mon block push failures to the client

### What changes were proposed in this pull request?

We have run performance evaluations on the version of push-based shuffle committed to upstream so far, and have identified a few places for further improvements:
1. On the server side, we have noticed that the usage of `String.format`, especially when receiving a block push request, has a much higher overhead compared with string concatenation.
2. On the server side, the usage of `Throwables.getStackTraceAsString` in the `ErrorHandler.shouldRetryError` and `ErrorHandler.shouldLogError` has generated quite some overhead.

These 2 issues are related to how we are currently handling certain common block push failures.
We are communicating such failures via `RPCFailure` by transmitting the exception stack trace.
This generates the overhead on both server and client side for creating these exceptions and makes checking the type of failures fragile and inefficient with string matching of exception stack trace.
To address these, this PR also proposes to encode the common block push failure as an error code and send that back to the client with a proper RPC message.

### Why are the changes needed?

Improve shuffle service efficiency for push-based shuffle.
Improve code robustness for handling block push failures.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests.

Closes #33613 from Victsm/SPARK-36378.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Min Shen <victor.nju@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
2 people authored and Mridul Muralidharan committed Aug 10, 2021
1 parent c888bad commit 3f09093
Show file tree
Hide file tree
Showing 14 changed files with 547 additions and 187 deletions.
Expand Up @@ -17,6 +17,16 @@

package org.apache.spark.network.client;

import java.nio.ByteBuffer;

public interface StreamCallbackWithID extends StreamCallback {
String getID();

/**
* Response to return to client upon the completion of a stream. Currently only invoked in
* {@link org.apache.spark.network.server.TransportRequestHandler#processStreamUpload}
*/
default ByteBuffer getCompletionResponse() {
return ByteBuffer.allocate(0);
}
}
@@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.server;

import java.nio.ByteBuffer;

import com.google.common.base.Preconditions;

/**
* A special RuntimeException thrown when shuffle service experiences a non-fatal failure
* with handling block push requests with push-based shuffle. Due to the best-effort nature
* of push-based shuffle, there are cases where the exceptions gets thrown under certain
* relatively common cases such as when a pushed block is received after the corresponding
* shuffle is merge finalized or when a pushed block experiences merge collision. Under these
* scenarios, we throw this special RuntimeException.
*/
public class BlockPushNonFatalFailure extends RuntimeException {
/**
* String constant used for generating exception messages indicating a block to be merged
* arrives too late on the server side. When we get a block push failure because of the
* block arrives too late, we will not retry pushing the block nor log the exception on
* the client side.
*/
public static final String TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX =
" is received after merged shuffle is finalized";

/**
* String constant used for generating exception messages indicating a block to be merged
* is a stale block push in the case of indeterminate stage retries on the server side.
* When we get a block push failure because of the block push being stale, we will not
* retry pushing the block nor log the exception on the client side.
*/
public static final String STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
" is a stale block push from an indeterminate stage retry";

/**
* String constant used for generating exception messages indicating the server couldn't
* append a block after all available attempts due to collision with other blocks belonging
* to the same shuffle partition. When we get a block push failure because of the block
* couldn't be written due to this reason, we will not log the exception on the client side.
*/
public static final String BLOCK_APPEND_COLLISION_MSG_SUFFIX =
" experienced merge collision on the server side";

/**
* The error code of the failure, encoded as a ByteBuffer to be responded back to the client.
* Instead of responding a RPCFailure with the exception stack trace as the payload,
* which makes checking the content of the exception very tedious on the client side,
* we can respond a proper RPCResponse to make it more robust and efficient. This
* field is only set on the shuffle server side when the exception is originally generated.
*/
private ByteBuffer response;

/**
* The error code of the failure. This field is only set on the client side when a
* BlockPushNonFatalFailure is recreated from the error code received from the server.
*/
private ReturnCode returnCode;

public BlockPushNonFatalFailure(ByteBuffer response, String msg) {
super(msg);
this.response = response;
}

public BlockPushNonFatalFailure(ReturnCode returnCode, String msg) {
super(msg);
this.returnCode = returnCode;
}

/**
* Since this type of exception is used to only convey the error code, we reduce the
* exception initialization overhead by skipping filling the stack trace.
*/
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}

public ByteBuffer getResponse() {
// Ensure we do not invoke this method if response is not set
Preconditions.checkNotNull(response);
return response;
}

public ReturnCode getReturnCode() {
// Ensure we do not invoke this method if returnCode is not set
Preconditions.checkNotNull(returnCode);
return returnCode;
}

public enum ReturnCode {
/**
* Indicate the case of a successful merge of a pushed block.
*/
SUCCESS(0, ""),
/**
* Indicate a block to be merged arrives too late on the server side, i.e. after the
* corresponding shuffle has been merge finalized. When the client gets this code, it
* will not retry pushing the block.
*/
TOO_LATE_BLOCK_PUSH(1, TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX),
/**
* Indicating the server couldn't append a block after all available attempts due to
* collision with other blocks belonging to the same shuffle partition.
*/
BLOCK_APPEND_COLLISION_DETECTED(2, BLOCK_APPEND_COLLISION_MSG_SUFFIX),
/**
* Indicate a block received on the server side is a stale block push in the case of
* indeterminate stage retries. When the client receives this code, it will not retry
* pushing the block.
*/
STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);

private final byte id;
// Error message suffix used to generate an error message for a given ReturnCode and
// a given block ID
private final String errorMsgSuffix;

ReturnCode(int id, String errorMsgSuffix) {
assert id < 128 : "Cannot have more than 128 block push return code";
this.id = (byte) id;
this.errorMsgSuffix = errorMsgSuffix;
}

public byte id() { return id; }
}

public static ReturnCode getReturnCode(byte id) {
switch (id) {
case 0: return ReturnCode.SUCCESS;
case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
case 3: return ReturnCode.STALE_BLOCK_PUSH;
default: throw new IllegalArgumentException("Unknown block push return code: " + id);
}
}

public static String getErrorMsg(String blockId, ReturnCode errorCode) {
Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
return "Block " + blockId + errorCode.errorMsgSuffix;
}
}
Expand Up @@ -213,7 +213,15 @@ public void onData(String streamId, ByteBuffer buf) throws IOException {
public void onComplete(String streamId) throws IOException {
try {
streamHandler.onComplete(streamId);
callback.onSuccess(ByteBuffer.allocate(0));
callback.onSuccess(streamHandler.getCompletionResponse());
} catch (BlockPushNonFatalFailure ex) {
// Respond an RPC message with the error code to client instead of using exceptions
// encoded in the RPCFailure. This type of exceptions gets thrown more frequently
// than a regular exception on the shuffle server side due to the best-effort nature
// of push-based shuffle and requires special handling on the client side. Using a
// proper RPCResponse is more efficient.
callback.onSuccess(ex.getResponse());
streamHandler.onFailure(streamId, ex);
} catch (Exception ex) {
IOException ioExc = new IOException("Failure post-processing complete stream;" +
" failing this rpc and leaving channel active", ex);
Expand Down
Expand Up @@ -23,6 +23,9 @@
import com.google.common.base.Throwables;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.network.server.BlockPushNonFatalFailure;

import static org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode.*;

/**
* Plugs into {@link RetryingBlockTransferor} to further control when an exception should be retried
Expand Down Expand Up @@ -53,27 +56,6 @@ default boolean shouldLogError(Throwable t) {
* @since 3.1.0
*/
class BlockPushErrorHandler implements ErrorHandler {
/**
* String constant used for generating exception messages indicating a block to be merged
* arrives too late or stale block push in the case of indeterminate stage retries on the
* server side, and also for later checking such exceptions on the client side. When we get
* a block push failure because of the block push being stale or arrives too late, we will
* not retry pushing the block nor log the exception on the client side.
*/
public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
"received after merged shuffle is finalized or stale block push as shuffle blocks of a"
+ " higher shuffleMergeId for the shuffle is being pushed";

/**
* String constant used for generating exception messages indicating the server couldn't
* append a block after all available attempts due to collision with other blocks belonging
* to the same shuffle partition, and also for later checking such exceptions on the client
* side. When we get a block push failure because of the block couldn't be written due to
* this reason, we will not log the exception on the client side.
*/
public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
"Couldn't find an opportunity to write block";

/**
* String constant used for generating exception messages indicating the server encountered
* IOExceptions multiple times, greater than the configured threshold, while trying to merged
Expand Down Expand Up @@ -105,16 +87,15 @@ public boolean shouldRetryError(Throwable t) {
return false;
}

String errorStackTrace = Throwables.getStackTraceAsString(t);
// If the block is too late or stale block push, there is no need to retry it
return !errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
return !(t instanceof BlockPushNonFatalFailure &&
(((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH ||
((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
}

@Override
public boolean shouldLogError(Throwable t) {
String errorStackTrace = Throwables.getStackTraceAsString(t);
return !(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
return !(t instanceof BlockPushNonFatalFailure);
}
}

Expand Down
Expand Up @@ -218,7 +218,7 @@ public void getMergedBlockMeta(
public void onSuccess(int numChunks, ManagedBuffer buffer) {
logger.trace("Successfully got merged block meta for shuffleId {} shuffleMergeId {}"
+ " reduceId {}", shuffleId, shuffleMergeId, reduceId);
listener.onSuccess(shuffleId, reduceId, shuffleMergeId,
listener.onSuccess(shuffleId, shuffleMergeId, reduceId,
new MergedBlockMeta(numChunks, buffer));
}

Expand Down
Expand Up @@ -21,13 +21,18 @@
import java.util.Arrays;
import java.util.Map;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.server.BlockPushNonFatalFailure;
import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;

/**
Expand Down Expand Up @@ -77,42 +82,58 @@ private class BlockPushCallback implements RpcResponseCallback {

@Override
public void onSuccess(ByteBuffer response) {
// On receipt of a successful block push
listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
BlockPushReturnCode pushResponse =
(BlockPushReturnCode) BlockTransferMessage.Decoder.fromByteBuffer(response);
// If the return code is not SUCCESS, the server has responded some error code. Handle
// the error accordingly.
ReturnCode returnCode = BlockPushNonFatalFailure.getReturnCode(pushResponse.returnCode);
if (returnCode != ReturnCode.SUCCESS) {
String blockId = pushResponse.failureBlockId;
Preconditions.checkArgument(!blockId.isEmpty());
checkAndFailRemainingBlocks(index, new BlockPushNonFatalFailure(returnCode,
BlockPushNonFatalFailure.getErrorMsg(blockId, returnCode)));
} else {
// On receipt of a successful block push
listener.onBlockPushSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
}
}

@Override
public void onFailure(Throwable e) {
// Since block push is best effort, i.e., if we encounter a block push failure that's still
// retriable according to ErrorHandler (not a connection exception and the block is not too
// late), we should not fail all remaining block pushes even though
// RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
// count etc). The best effort nature makes block push tolerable of a partial completion.
// Thus, we only fail the block that's actually failed in this case. Note that, on the
// RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
// previous active retry listener, and retry pushing all outstanding blocks. However, since
// the blocks to be pushed are preloaded into memory and the first attempt of pushing these
// blocks might have already succeeded, retry pushing all the outstanding blocks should be
// very cheap (on the client side, the block data is in memory; on the server side, the block
// will be recognized as a duplicate which triggers noop handling). Here, by failing only the
// one block that's actually failed, we are essentially preventing forwarding unnecessary
// block push failures to the parent listener of the retry listener.
//
// Take the following as an example. For the common exception during block push handling,
// i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
// by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
// one block encountering this issue not the remaining blocks in the same batch. On the
// RetryingBlockTransferor side, since this exception is considered as not retriable, it
// would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
// blocks in the same batch would remain current and active and they won't be impacted by
// this exception.
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
failRemainingBlocks(targetBlockId, e);
} else {
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
failRemainingBlocks(targetBlockId, e);
}
checkAndFailRemainingBlocks(index, e);
}
}

private void checkAndFailRemainingBlocks(int index, Throwable e) {
// Since block push is best effort, i.e., if we encounter a block push failure that's still
// retriable according to ErrorHandler (not a connection exception and the block is not too
// late), we should not fail all remaining block pushes even though
// RetryingBlockTransferor might consider this failure not retriable (exceeding max retry
// count etc). The best effort nature makes block push tolerable of a partial completion.
// Thus, we only fail the block that's actually failed in this case. Note that, on the
// RetryingBlockTransferor side, if retry is initiated, it would still invalidate the
// previous active retry listener, and retry pushing all outstanding blocks. However, since
// the blocks to be pushed are preloaded into memory and the first attempt of pushing these
// blocks might have already succeeded, retry pushing all the outstanding blocks should be
// very cheap (on the client side, the block data is in memory; on the server side, the block
// will be recognized as a duplicate which triggers noop handling). Here, by failing only the
// one block that's actually failed, we are essentially preventing forwarding unnecessary
// block push failures to the parent listener of the retry listener.
//
// Take the following as an example. For the common exception during block push handling,
// i.e. block collision, it is considered as retriable by ErrorHandler but not retriable
// by RetryingBlockTransferor. When we encounter a failure of this type, we only fail the
// one block encountering this issue not the remaining blocks in the same batch. On the
// RetryingBlockTransferor side, since this exception is considered as not retriable, it
// would immediately invoke parent listener's onBlockTransferFailure. However, the remaining
// blocks in the same batch would remain current and active and they won't be impacted by
// this exception.
if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
failRemainingBlocks(targetBlockId, e);
} else {
String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
failRemainingBlocks(targetBlockId, e);
}
}

Expand Down

0 comments on commit 3f09093

Please sign in to comment.