[SPARK-36206][CORE] Support shuffle data corruption diagnosis via shuffle checksum

### What changes were proposed in this pull request?

This PR adds support for diagnosing shuffle data corruption. The diagnosis mechanism works as follows:
The shuffle reader calculates the checksum (c1) of the corrupted shuffle block and sends it to the server where the block is stored. The server reads back the checksum (c2) stored in the checksum file and recalculates the checksum (c3) of the corresponding shuffle block. If c2 != c3, we suspect the corruption is caused by a disk issue. Otherwise, if c1 != c3, we suspect it is caused by a network issue. Otherwise, the checksum verification passes. If any error occurs during the diagnosis, the cause remains unknown.
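
The decision rule above can be sketched in a few lines of Java. This is a minimal illustration, not the code from this PR: the class `DiagnosisSketch`, the method `diagnose`, and the two supplier parameters are hypothetical names, and only the `Cause` values mirror the enum added in this change. The sketch assumes that any runtime failure while obtaining c2 or c3 maps to `UNKNOWN_ISSUE`.

```java
import java.util.function.LongSupplier;

class DiagnosisSketch {
  // Mirrors the Cause values that the decision rule can produce.
  enum Cause { DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS }

  /**
   * c1 is the checksum computed by the reader. The suppliers (hypothetical) stand in for
   * reading c2 from the checksum file and recomputing c3 from the stored block.
   */
  static Cause diagnose(long c1, LongSupplier storedChecksum, LongSupplier recomputedChecksum) {
    try {
      long c2 = storedChecksum.getAsLong();
      long c3 = recomputedChecksum.getAsLong();
      if (c2 != c3) {
        // The block on disk no longer matches what was written.
        return Cause.DISK_ISSUE;
      }
      if (c1 != c3) {
        // The disk copy is intact, so the bytes changed in transit.
        return Cause.NETWORK_ISSUE;
      }
      return Cause.CHECKSUM_VERIFY_PASS;
    } catch (RuntimeException e) {
      // Assumption: any failure during the diagnosis leaves the cause unknown.
      return Cause.UNKNOWN_ISSUE;
    }
  }

  public static void main(String[] args) {
    System.out.println(diagnose(7L, () -> 7L, () -> 9L));
    System.out.println(diagnose(5L, () -> 7L, () -> 7L));
    System.out.println(diagnose(7L, () -> 7L, () -> 7L));
  }
}
```

The disk check comes first on purpose: when c2 != c3 the stored block is already damaged, so comparing it with the reader's checksum would say nothing about the network.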

After the shuffle reader receives the diagnosis response, it takes action based on the type of cause. Only in the case of a network issue does it retry the fetch; otherwise, it throws the fetch failure directly. Note that if the corruption happens inside BufferReleasingInputStream, the reducer throws the fetch failure immediately, whatever the cause, because the data has already been partially consumed by downstream RDDs. If corruption happens again after the retry, the reducer throws the fetch failure directly, this time without the diagnosis.
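
The reader-side policy described above might look like the following sketch. It is an illustration under stated assumptions rather than the actual reader code: `ReaderActionSketch`, `Action`, `onCorruption`, and its parameters are hypothetical names. The description does not say whether diagnosis still runs when the data was partially consumed; this sketch assumes it is skipped in that case.

```java
import java.util.function.Supplier;

class ReaderActionSketch {
  // Same values as the Cause enum added in this change.
  enum Cause {
    DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS, UNSUPPORTED_CHECKSUM_ALGORITHM
  }

  // Hypothetical names for the two outcomes on the reader side.
  enum Action { RETRY_FETCH, THROW_FETCH_FAILURE }

  /**
   * @param partiallyConsumed true if the corruption surfaced after downstream RDDs
   *                          had already consumed part of the block
   * @param alreadyRetried    true if this block was already re-fetched once
   * @param diagnosis         stands in for the diagnosis RPC; invoked at most once
   */
  static Action onCorruption(
      boolean partiallyConsumed,
      boolean alreadyRetried,
      Supplier<Cause> diagnosis) {
    if (partiallyConsumed || alreadyRetried) {
      // A retry cannot help (or was already spent), so fail without diagnosing.
      return Action.THROW_FETCH_FAILURE;
    }
    // Only a suspected network issue is worth a second fetch.
    return diagnosis.get() == Cause.NETWORK_ISSUE
      ? Action.RETRY_FETCH
      : Action.THROW_FETCH_FAILURE;
  }

  public static void main(String[] args) {
    System.out.println(onCorruption(false, false, () -> Cause.NETWORK_ISSUE));
    System.out.println(onCorruption(false, false, () -> Cause.DISK_ISSUE));
    System.out.println(onCorruption(false, true, () -> Cause.NETWORK_ISSUE));
  }
}
```

Passing the diagnosis as a `Supplier` makes the "without the diagnosis" case explicit: on the second corruption the supplier is never invoked.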

Please check out apache#32385 to see the complete proposal of the shuffle checksum project.

### Why are the changes needed?

Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually report corruption issues. However, data corruption is difficult to reproduce in most cases, and its root cause is even harder to determine. We don't know whether it's a Spark issue or not. With diagnosis support for shuffle corruption, Spark itself can at least distinguish between disk and network causes, which is very important for users.

### Does this PR introduce _any_ user-facing change?

Yes. After this change, users may learn the cause of a shuffle corruption.

### How was this patch tested?

Added tests.

Closes apache#33451 from Ngone51/SPARK-36206.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit a98d919)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Ngone51 authored and JQ-Cao committed Sep 8, 2021
1 parent a760b8d commit 686d27a
Showing 31 changed files with 995 additions and 190 deletions.

@@ -33,9 +33,9 @@
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientFactory;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.util.TransportConf;

/**
* Provides an interface for reading both shuffle files and RDD blocks, either from an Executor
@@ -46,6 +46,45 @@ public abstract class BlockStoreClient implements Closeable {

protected volatile TransportClientFactory clientFactory;
protected String appId;
protected TransportConf transportConf;

  /**
   * Send the diagnosis request for the corrupted shuffle block to the server.
   *
   * @param host the host of the remote node.
   * @param port the port of the remote node.
   * @param execId the executor id.
   * @param shuffleId the shuffleId of the corrupted shuffle block
   * @param mapId the mapId of the corrupted shuffle block
   * @param reduceId the reduceId of the corrupted shuffle block
   * @param checksum the shuffle checksum calculated at the client side for the corrupted
   *                 shuffle block
   * @param algorithm the name of the checksum algorithm
   * @return The cause of the shuffle block corruption
   */
  public Cause diagnoseCorruption(
      String host,
      int port,
      String execId,
      int shuffleId,
      long mapId,
      int reduceId,
      long checksum,
      String algorithm) {
    try {
      TransportClient client = clientFactory.createClient(host, port);
      ByteBuffer response = client.sendRpcSync(
        new DiagnoseCorruption(appId, execId, shuffleId, mapId, reduceId, checksum, algorithm)
          .toByteBuffer(),
        transportConf.connectionTimeoutMs()
      );
      CorruptionCause cause =
        (CorruptionCause) BlockTransferMessage.Decoder.fromByteBuffer(response);
      return cause.cause;
    } catch (Exception e) {
      logger.warn("Failed to get the corruption cause.");
      return Cause.UNKNOWN_ISSUE;
    }
  }

/**
* Fetch a sequence of blocks from a remote node asynchronously,

@@ -17,7 +17,6 @@

package org.apache.spark.network.shuffle;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -36,9 +35,9 @@
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Counter;
import com.google.common.collect.Sets;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.network.client.StreamCallbackWithID;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -53,6 +52,7 @@
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.protocol.*;
import org.apache.spark.network.util.TimerWithCustomTimeUnit;
import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
@@ -243,6 +243,14 @@ protected void handleMessage(
      } finally {
        responseDelayContext.stop();
      }
    } else if (msgObj instanceof DiagnoseCorruption) {
      DiagnoseCorruption msg = (DiagnoseCorruption) msgObj;
      checkAuth(client, msg.appId);
      Cause cause = blockManager.diagnoseShuffleBlockCorruption(
        msg.appId, msg.execId, msg.shuffleId, msg.mapId, msg.reduceId, msg.checksum, msg.algorithm);
      // If any error occurs, diagnoseShuffleBlockCorruption should return UNKNOWN_ISSUE,
      // so it should always reply as success.
      callback.onSuccess(new CorruptionCause(cause).toByteBuffer());
    } else {
      throw new UnsupportedOperationException("Unexpected message: " + msgObj);
    }

@@ -49,7 +49,6 @@
public class ExternalBlockStoreClient extends BlockStoreClient {
private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();

private final TransportConf conf;
private final boolean authEnabled;
private final SecretKeyHolder secretKeyHolder;
private final long registrationTimeoutMs;
@@ -63,7 +62,7 @@ public ExternalBlockStoreClient(
SecretKeyHolder secretKeyHolder,
boolean authEnabled,
long registrationTimeoutMs) {
this.conf = conf;
this.transportConf = conf;
this.secretKeyHolder = secretKeyHolder;
this.authEnabled = authEnabled;
this.registrationTimeoutMs = registrationTimeoutMs;
@@ -75,10 +74,11 @@ public ExternalBlockStoreClient(
*/
public void init(String appId) {
this.appId = appId;
TransportContext context = new TransportContext(conf, new NoOpRpcHandler(), true, true);
TransportContext context = new TransportContext(
transportConf, new NoOpRpcHandler(), true, true);
List<TransportClientBootstrap> bootstraps = Lists.newArrayList();
if (authEnabled) {
bootstraps.add(new AuthClientBootstrap(conf, appId, secretKeyHolder));
bootstraps.add(new AuthClientBootstrap(transportConf, appId, secretKeyHolder));
}
clientFactory = context.createClientFactory(bootstraps);
}
@@ -94,7 +94,7 @@ public void fetchBlocks(
checkInit();
logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
try {
int maxRetries = conf.maxIORetries();
int maxRetries = transportConf.maxIORetries();
RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
(inputBlockId, inputListener) -> {
// Unless this client is closed.
@@ -103,7 +103,7 @@ public void fetchBlocks(
"Expecting a BlockFetchingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
(BlockFetchingListener) inputListener, conf, downloadFileManager).start();
(BlockFetchingListener) inputListener, transportConf, downloadFileManager).start();
} else {
logger.info("This clientFactory was closed. Skipping further block fetch retries.");
}
@@ -112,7 +112,7 @@ public void fetchBlocks(
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, listener).start();
new RetryingBlockTransferor(transportConf, blockFetchStarter, blockIds, listener).start();
} else {
blockFetchStarter.createAndStart(blockIds, listener);
}
@@ -146,16 +146,16 @@ public void pushBlocks(
assert inputListener instanceof BlockPushingListener :
"Expecting a BlockPushingListener, but got " + inputListener.getClass();
TransportClient client = clientFactory.createClient(host, port);
new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
(BlockPushingListener) inputListener, buffersWithId).start();
} else {
logger.info("This clientFactory was closed. Skipping further block push retries.");
}
};
int maxRetries = conf.maxIORetries();
int maxRetries = transportConf.maxIORetries();
if (maxRetries > 0) {
new RetryingBlockTransferor(
conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
transportConf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
} else {
blockPushStarter.createAndStart(blockIds, listener);
}
@@ -178,7 +178,7 @@ public void finalizeShuffleMerge(
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer finalizeShuffleMerge =
new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId,
new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
shuffleMergeId).toByteBuffer();
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
@Override

@@ -45,6 +45,8 @@

import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.checksum.Cause;
import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.LevelDBProvider.StoreVersion;
@@ -378,6 +380,29 @@ public Map<String, String[]> getLocalDirs(String appId, Set<String> execIds) {
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

  /**
   * Diagnose the possible cause of the shuffle data corruption by verifying the shuffle checksums
   */
  public Cause diagnoseShuffleBlockCorruption(
      String appId,
      String execId,
      int shuffleId,
      long mapId,
      int reduceId,
      long checksumByReader,
      String algorithm) {
    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
    // This should be in sync with IndexShuffleBlockResolver.getChecksumFile
    String fileName = "shuffle_" + shuffleId + "_" + mapId + "_0.checksum." + algorithm;
    File checksumFile = ExecutorDiskUtils.getFile(
      executor.localDirs,
      executor.subDirsPerLocalDir,
      fileName);
    ManagedBuffer data = getBlockData(appId, execId, shuffleId, mapId, reduceId);
    return ShuffleChecksumHelper.diagnoseCorruption(
      algorithm, checksumFile, reduceId, data, checksumByReader);
  }

/** Simply encodes an executor's full ID, which is appId + execId. */
public static class AppExecId {
public final String appId;
@@ -0,0 +1,25 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle.checksum;

/**
 * The cause of shuffle data corruption.
 */
public enum Cause {
  DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE, CHECKSUM_VERIFY_PASS, UNSUPPORTED_CHECKSUM_ALGORITHM
}