Skip to content

Commit

Permalink
[SPARK-32923][CORE][SHUFFLE] Handle indeterminate stage retries for p…
Browse files Browse the repository at this point in the history
…ush-based shuffle

### What changes were proposed in this pull request?
[[SPARK-23243](https://issues.apache.org/jira/browse/SPARK-23243)] and [[SPARK-25341](https://issues.apache.org/jira/browse/SPARK-25341)] addressed cases of stage retries for indeterminate stage involving operations like repartition. This PR addresses the same issues in the context of push-based shuffle. Currently there is no way to distinguish the current execution of a stage for a shuffle ID. Therefore the changes explained below are necessary.

Core changes are summarized as follows:

1. Introduce a new variable `shuffleMergeId` in `ShuffleDependency` which is monotonically increasing value tracking the temporal ordering of execution of <stage-id, stage-attempt-id> for a shuffle ID.
2. Correspondingly make changes in the push-based shuffle protocol layer in `MergedShuffleFileManager`, `BlockStoreClient` passing the `shuffleMergeId` in order to keep track of the shuffle output in separate files on the shuffle service side.
3. `DAGScheduler` increments the `shuffleMergeId` tracked in `ShuffleDependency` in the cases of a indeterministic stage execution
4. Deterministic stage will have `shuffleMergeId` set to 0 as no special handling is needed in this case and indeterminate stage will have `shuffleMergeId` starting from 1.

### Why are the changes needed?

New protocol changes are needed due to the reasons explained above.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?
Added new unit tests in `RemoteBlockPushResolverSuite, DAGSchedulerSuite, BlockIdSuite, ErrorHandlerSuite`

Closes apache#33034 from venkata91/SPARK-32923.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
venkata91 authored and Mridul Muralidharan committed Aug 2, 2021
1 parent 2a18f82 commit c039d99
Show file tree
Hide file tree
Showing 39 changed files with 1,501 additions and 722 deletions.
Expand Up @@ -206,12 +206,15 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
*
* @param appId applicationId.
* @param shuffleId shuffle id.
* @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
* of shuffle by an indeterminate stage attempt.
* @param reduceId reduce id.
* @param callback callback the handle the reply.
*/
public void sendMergedBlockMetaReq(
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId,
MergedBlockMetaResponseCallback callback) {
long requestId = requestId();
Expand All @@ -222,7 +225,8 @@ public void sendMergedBlockMetaReq(
handler.addRpcRequest(requestId, callback);
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
channel.writeAndFlush(
new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId)).addListener(listener);
new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleMergeId,
reduceId)).addListener(listener);
}

/**
Expand Down
Expand Up @@ -32,13 +32,20 @@ public class MergedBlockMetaRequest extends AbstractMessage implements RequestMe
public final long requestId;
public final String appId;
public final int shuffleId;
public final int shuffleMergeId;
public final int reduceId;

public MergedBlockMetaRequest(long requestId, String appId, int shuffleId, int reduceId) {
public MergedBlockMetaRequest(
long requestId,
String appId,
int shuffleId,
int shuffleMergeId,
int reduceId) {
super(null, false);
this.requestId = requestId;
this.appId = appId;
this.shuffleId = shuffleId;
this.shuffleMergeId = shuffleMergeId;
this.reduceId = reduceId;
}

Expand All @@ -49,36 +56,39 @@ public Type type() {

@Override
public int encodedLength() {
return 8 + Encoders.Strings.encodedLength(appId) + 4 + 4;
return 8 + Encoders.Strings.encodedLength(appId) + 4 + 4 + 4;
}

@Override
public void encode(ByteBuf buf) {
buf.writeLong(requestId);
Encoders.Strings.encode(buf, appId);
buf.writeInt(shuffleId);
buf.writeInt(shuffleMergeId);
buf.writeInt(reduceId);
}

public static MergedBlockMetaRequest decode(ByteBuf buf) {
long requestId = buf.readLong();
String appId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int shuffleMergeId = buf.readInt();
int reduceId = buf.readInt();
return new MergedBlockMetaRequest(requestId, appId, shuffleId, reduceId);
return new MergedBlockMetaRequest(requestId, appId, shuffleId, shuffleMergeId, reduceId);
}

@Override
public int hashCode() {
return Objects.hashCode(requestId, appId, shuffleId, reduceId);
return Objects.hashCode(requestId, appId, shuffleId, shuffleMergeId, reduceId);
}

@Override
public boolean equals(Object other) {
if (other instanceof MergedBlockMetaRequest) {
MergedBlockMetaRequest o = (MergedBlockMetaRequest) other;
return requestId == o.requestId && shuffleId == o.shuffleId && reduceId == o.reduceId
&& Objects.equal(appId, o.appId);
return requestId == o.requestId && shuffleId == o.shuffleId &&
shuffleMergeId == o.shuffleMergeId && reduceId == o.reduceId &&
Objects.equal(appId, o.appId);
}
return false;
}
Expand All @@ -89,6 +99,7 @@ public String toString() {
.append("requestId", requestId)
.append("appId", appId)
.append("shuffleId", shuffleId)
.append("shuffleMergeId", shuffleMergeId)
.append("reduceId", reduceId)
.toString();
}
Expand Down
Expand Up @@ -152,14 +152,14 @@ public MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
TransportClient reverseClient = mock(TransportClient.class);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, reverseClient,
rpcHandler, 2L, null);
MergedBlockMetaRequest validMetaReq = new MergedBlockMetaRequest(19, "app1", 0, 0);
MergedBlockMetaRequest validMetaReq = new MergedBlockMetaRequest(19, "app1", 0, 0, 0);
requestHandler.handle(validMetaReq);
assertEquals(1, responseAndPromisePairs.size());
assertTrue(responseAndPromisePairs.get(0).getLeft() instanceof MergedBlockMetaSuccess);
assertEquals(2,
((MergedBlockMetaSuccess) (responseAndPromisePairs.get(0).getLeft())).getNumChunks());

MergedBlockMetaRequest invalidMetaReq = new MergedBlockMetaRequest(21, "app1", -1, 1);
MergedBlockMetaRequest invalidMetaReq = new MergedBlockMetaRequest(21, "app1", -1, 0, 1);
requestHandler.handle(invalidMetaReq);
assertEquals(2, responseAndPromisePairs.size());
assertTrue(responseAndPromisePairs.get(1).getLeft() instanceof RpcFailure);
Expand Down
Expand Up @@ -167,6 +167,8 @@ public void pushBlocks(
* @param host host of shuffle server
* @param port port of shuffle server.
* @param shuffleId shuffle ID of the shuffle to be finalized
* @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
* of shuffle by an indeterminate stage attempt.
* @param listener the listener to receive MergeStatuses
*
* @since 3.1.0
Expand All @@ -175,6 +177,7 @@ public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
int shuffleMergeId,
MergeFinalizerListener listener) {
throw new UnsupportedOperationException();
}
Expand All @@ -185,6 +188,8 @@ public void finalizeShuffleMerge(
* @param host the host of the remote node.
* @param port the port of the remote node.
* @param shuffleId shuffle id.
* @param shuffleMergeId shuffleMergeId is used to uniquely identify merging process
* of shuffle by an indeterminate stage attempt.
* @param reduceId reduce id.
* @param listener the listener to receive chunk counts.
*
Expand All @@ -194,6 +199,7 @@ public void getMergedBlockMeta(
String host,
int port,
int shuffleId,
int shuffleMergeId,
int reduceId,
MergedBlocksMetaListener listener) {
throw new UnsupportedOperationException();
Expand Down
Expand Up @@ -55,12 +55,14 @@ default boolean shouldLogError(Throwable t) {
class BlockPushErrorHandler implements ErrorHandler {
/**
* String constant used for generating exception messages indicating a block to be merged
* arrives too late on the server side, and also for later checking such exceptions on the
* client side. When we get a block push failure because of the block arrives too late, we
* will not retry pushing the block nor log the exception on the client side.
* arrives too late or stale block push in the case of indeterminate stage retries on the
* server side, and also for later checking such exceptions on the client side. When we get
* a block push failure because of the block push being stale or arrives too late, we will
* not retry pushing the block nor log the exception on the client side.
*/
public static final String TOO_LATE_MESSAGE_SUFFIX =
"received after merged shuffle is finalized";
public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
"received after merged shuffle is finalized or stale block push as shuffle blocks of a"
+ " higher shuffleMergeId for the shuffle is being pushed";

/**
* String constant used for generating exception messages indicating the server couldn't
Expand All @@ -81,25 +83,54 @@ class BlockPushErrorHandler implements ErrorHandler {
public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
"IOExceptions exceeded the threshold";

/**
* String constant used for generating exception messages indicating the server rejecting a
* shuffle finalize request since shuffle blocks of a higher shuffleMergeId for a shuffle is
* already being pushed. This typically happens in the case of indeterminate stage retries
* where if a stage attempt fails then the entirety of the shuffle output needs to be rolled
* back. For more details refer SPARK-23243, SPARK-25341 and SPARK-32923.
*/
public static final String STALE_SHUFFLE_FINALIZE_SUFFIX =
"stale shuffle finalize request as shuffle blocks of a higher shuffleMergeId for the"
+ " shuffle is already being pushed";

@Override
public boolean shouldRetryError(Throwable t) {
// If it is a connection time-out or a connection closed exception, no need to retry.
// If it is a FileNotFoundException originating from the client while pushing the shuffle
// blocks to the server, even then there is no need to retry. We will still log this exception
// once which helps with debugging.
// blocks to the server, even then there is no need to retry. We will still log this
// exception once which helps with debugging.
if (t.getCause() != null && (t.getCause() instanceof ConnectException ||
t.getCause() instanceof FileNotFoundException)) {
return false;
}
// If the block is too late, there is no need to retry it
return !Throwables.getStackTraceAsString(t).contains(TOO_LATE_MESSAGE_SUFFIX);

String errorStackTrace = Throwables.getStackTraceAsString(t);
// If the block is too late or stale block push, there is no need to retry it
return !errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
}

@Override
public boolean shouldLogError(Throwable t) {
String errorStackTrace = Throwables.getStackTraceAsString(t);
return !errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) &&
!errorStackTrace.contains(TOO_LATE_MESSAGE_SUFFIX);
return !(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
}
}

class BlockFetchErrorHandler implements ErrorHandler {
public static final String STALE_SHUFFLE_BLOCK_FETCH =
"stale shuffle block fetch request as shuffle blocks of a higher shuffleMergeId for the"
+ " shuffle is available";

@Override
public boolean shouldRetryError(Throwable t) {
return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
}

@Override
public boolean shouldLogError(Throwable t) {
return !Throwables.getStackTraceAsString(t).contains(STALE_SHUFFLE_BLOCK_FETCH);
}
}
}

0 comments on commit c039d99

Please sign in to comment.