[SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching

### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf, and pass it to the BlockStoreClient through SparkEnv instead.
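In short, the attempt id now flows from the scheduler to the shuffle client as sketched below (a condensed, illustrative view of the SparkContext and CoarseGrainedExecutorBackend hunks in this diff, not additional code):

```scala
// Driver side: once the TaskScheduler reports an attempt id, record it in
// the conf and hand it directly to the BlockStoreClient.
_applicationAttemptId.foreach { attemptId =>
  _conf.set(APP_ATTEMPT_ID, attemptId)
  _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
}

// Executor side: the driver's conf is shipped to executors, so the same id
// can be read back out of the SparkEnv conf and set on the executor's client.
env.conf.get(APP_ATTEMPT_ID).foreach { attemptId =>
  env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
}
```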

### Why are the changes needed?
Push based shuffle will fail if an attemptId is set in the SparkConf, because the attemptId is not set correctly in the Driver.
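A sketch of the failure mode, inferred from the title and the removed TransportConf code below (the exact server-side exception is not shown on this page):

```scala
// Illustrative timeline only, assuming YARN attempt "1":
// 1. The driver builds SparkEnv and its BlockStoreClient before the
//    TaskScheduler has reported an attempt id, so the driver-side lookup
//      conf.getInt("spark.app.attempt.id", -1)
//    returns the default -1.
// 2. Executors are created after APP_ATTEMPT_ID lands in the conf, so they
//    push blocks tagged with attempt 1.
// 3. The driver then sends FinalizeShuffleMerge carrying attempt -1, and the
//    shuffle server rejects it because the attempt ids do not match.
```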

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested within our Yarn cluster. Without this PR, the Driver fails to finalize the shuffle merge on all the mergers. With the patch, the Driver successfully finalizes the shuffle merge and push based shuffle works fine.
Also added a unit test to verify that the attemptId is set in the BlockStoreClient in the Driver.
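As a rough guide to reproducing the setup (not part of this PR): the driver-side flags below are the standard Spark 3.2 settings that exercise this code path; the app name is a placeholder, and a YARN deployment with a push-merge-capable external shuffle service is assumed.

```scala
import org.apache.spark.SparkConf

// Illustrative configuration only; cluster specifics are assumptions.
val conf = new SparkConf()
  .setAppName("push-based-shuffle-demo")         // hypothetical app name
  .set("spark.shuffle.push.enabled", "true")     // enable push based shuffle
  .set("spark.shuffle.service.enabled", "true")  // external shuffle service required
```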

Closes #34018 from zhouyejoe/SPARK-36772.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit cabc36b)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
zhouyejoe authored and gengliangwang committed Sep 18, 2021
1 parent 36ce9cc commit d4d8a63
Showing 7 changed files with 63 additions and 13 deletions.
@@ -427,11 +427,4 @@ public long mergedIndexCacheSize() {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The application attemptID assigned from Hadoop YARN.
-   */
-  public int appAttemptId() {
-    return conf.getInt("spark.app.attempt.id", -1);
-  }
 }
@@ -46,6 +46,8 @@ public abstract class BlockStoreClient implements Closeable {
 
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  // Store the application attemptId
+  private String appAttemptId;
   protected TransportConf transportConf;
 
   /**
@@ -124,6 +126,16 @@ protected void checkInit() {
     assert appId != null : "Called before init()";
   }
 
+  // Set the application attemptId
+  public void setAppAttemptId(String appAttemptId) {
+    this.appAttemptId = appAttemptId;
+  }
+
+  // Get the application attemptId
+  public String getAppAttemptId() {
+    return this.appAttemptId;
+  }
+
   /**
    * Request the local disk directories for executors which are located at the same host with
    * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
@@ -52,6 +52,10 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
+  // Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple
+  // application attempts. This variable is derived from the String typed appAttemptId. If no
+  // appAttemptId is set, the default comparableAppAttemptId is -1.
+  private int comparableAppAttemptId = -1;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -83,6 +87,26 @@ public void init(String appId) {
     clientFactory = context.createClientFactory(bootstraps);
   }
 
+  @Override
+  public void setAppAttemptId(String appAttemptId) {
+    super.setAppAttemptId(appAttemptId);
+    setComparableAppAttemptId(appAttemptId);
+  }
+
+  private void setComparableAppAttemptId(String appAttemptId) {
+    // For now, push based shuffle only supports running in YARN.
+    // The application attemptId in YARN is an integer, so it can be safely
+    // parsed to an integer here. For a non-numeric application attemptId from
+    // another cluster setup, this comparableAppAttemptId needs to be generated
+    // from the String typed appAttemptId through some other customized logic.
+    try {
+      this.comparableAppAttemptId = Integer.parseInt(appAttemptId);
+    } catch (NumberFormatException e) {
+      logger.warn("Push based shuffle requires comparable application attemptId, " +
+        "but the appAttemptId {} cannot be parsed to Integer", appAttemptId, e);
+    }
+  }
+
   @Override
   public void fetchBlocks(
       String host,
@@ -146,7 +170,7 @@ public void pushBlocks(
         assert inputListener instanceof BlockPushingListener :
           "Expecting a BlockPushingListener, but got " + inputListener.getClass();
         TransportClient client = clientFactory.createClient(host, port);
-        new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
+        new OneForOneBlockPusher(client, appId, comparableAppAttemptId, inputBlockId,
           (BlockPushingListener) inputListener, buffersWithId).start();
       } else {
         logger.info("This clientFactory was closed. Skipping further block push retries.");
@@ -178,8 +202,8 @@ public void finalizeShuffleMerge(
     try {
       TransportClient client = clientFactory.createClient(host, port);
       ByteBuffer finalizeShuffleMerge =
-        new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
-          shuffleMergeId).toByteBuffer();
+        new FinalizeShuffleMerge(
+          appId, comparableAppAttemptId, shuffleId, shuffleMergeId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,7 +583,10 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
+    _applicationAttemptId.foreach { attemptId =>
+      _conf.set(APP_ATTEMPT_ID, attemptId)
+      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+    }
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
@@ -466,7 +466,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       driverConf.set(EXECUTOR_ID, arguments.executorId)
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
-
+      // Set the application attemptId in the BlockStoreClient if available.
+      val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
+      appAttemptId.foreach(attemptId =>
+        env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+      )
       val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
       env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
12 changes: 12 additions & 0 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1317,6 +1317,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
       }
     }
   }
+
+  test("SPARK-36772: Store application attemptId in BlockStoreClient for push based shuffle") {
+    val conf = new SparkConf().setAppName("testAppAttemptId")
+      .setMaster("pushbasedshuffleclustermanager")
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED.key, "true")
+    conf.set(IS_TESTING.key, "true")
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    sc = new SparkContext(conf)
+    val env = SparkEnv.get
+    assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
+  }
+
 }
 
 object SparkContextSuite {
@@ -3961,7 +3961,9 @@ private class PushBasedClusterManager extends ExternalClusterManager {
 
   override def createTaskScheduler(
       sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true) {
+    override def applicationAttemptId(): Option[String] = Some("1")
+  }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
