Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1787] feat(spark): Fine grained stage retry switch for fetch/write failure #1788

Merged
merged 2 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,29 @@

public class RssSparkConfig {

public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_ENABLED =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see all the configs start with RSS_RESUBMIT_STAGE_, but the keys of them all start with rss.stageRetry.

Can we unify these two terms, "stage resubmit" and "stage retry"? We should use only one of them in our project.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emm... Yes, but this could be optimized in the final PR. I prefer "stage retry".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

ConfigOptions.key("rss.stageRetry.enabled")
.booleanType()
.defaultValue(false)
.withDeprecatedKeys(RssClientConfig.RSS_RESUBMIT_STAGE)
.withDescription("Whether to enable the resubmit stage for fetch/write failure");

/**
 * Fine-grained switch: enable stage retry when a shuffle data fetch failure occurs.
 * Defaults to false. Falls back to the coarse-grained "rss.stageRetry.enabled" option
 * and the legacy RssClientConfig.RSS_RESUBMIT_STAGE key when this key is not set.
 */
public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED =
ConfigOptions.key("rss.stageRetry.fetchFailureEnabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
.withDescription(
"If set to true, the stage retry mechanism will be enabled when a fetch failure occurs.");

/**
 * Fine-grained switch: enable stage retry when a shuffle data write failure occurs.
 * Defaults to false. Falls back to the coarse-grained "rss.stageRetry.enabled" option
 * and the legacy RssClientConfig.RSS_RESUBMIT_STAGE key when this key is not set.
 */
public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED =
ConfigOptions.key("rss.stageRetry.writeFailureEnabled")
.booleanType()
.defaultValue(false)
.withFallbackKeys(RSS_RESUBMIT_STAGE_ENABLED.key(), RssClientConfig.RSS_RESUBMIT_STAGE)
.withDescription(
"If set to true, the stage retry mechanism will be enabled when a write failure occurs.");

public static final ConfigOption<Boolean> RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED =
ConfigOptions.key("rss.blockId.selfManagementEnabled")
.booleanType()
Expand Down Expand Up @@ -404,13 +427,6 @@ public class RssSparkConfig {
.doc(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT.description()))
.createWithDefault(-1);

/**
 * Legacy coarse-grained stage-resubmit switch (key built from SPARK_RSS_CONFIG_PREFIX +
 * RssClientConfig.RSS_RESUBMIT_STAGE). Superseded by the fine-grained
 * "rss.stageRetry.*" options, which declare this key as a deprecated/fallback key.
 */
public static final ConfigEntry<Boolean> RSS_RESUBMIT_STAGE =
createBooleanBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_RESUBMIT_STAGE)
.internal()
.doc("Whether to enable the resubmit stage."))
.createWithDefault(false);

public static final ConfigEntry<Integer> RSS_MAX_PARTITIONS =
createIntegerBuilder(
new ConfigBuilder("spark.rss.blockId.maxPartitions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.uniffle.client.request.RssReportShuffleFetchFailureRequest;
import org.apache.uniffle.client.response.RssReportShuffleFetchFailureResponse;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
Expand All @@ -57,6 +56,7 @@
import org.apache.uniffle.common.exception.RssFetchFailedException;
import org.apache.uniffle.common.util.Constants;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static org.apache.uniffle.common.util.Constants.DRIVER_HOST;

public class RssSparkShuffleUtils {
Expand Down Expand Up @@ -353,7 +353,7 @@ public static RssException reportRssFetchFailedException(
int stageAttemptId,
Set<Integer> failedPartitions) {
RssConf rssConf = RssSparkConfig.toRssConf(sparkConf);
if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
&& RssSparkShuffleUtils.isStageResubmitSupported()) {
String driver = rssConf.getString(DRIVER_HOST, "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ public abstract class RssShuffleManagerBase implements RssShuffleManagerInterfac

protected SparkConf sparkConf;
protected ShuffleManagerClient shuffleManagerClient;
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */
protected boolean rssResubmitStage;
protected boolean rssStageRetryEnabled;
protected boolean rssStageRetryForWriteFailureEnabled;
protected boolean rssStageRetryForFetchFailureEnabled;
/**
* Mapping between ShuffleId and ShuffleServer list. ShuffleServer list is dynamically allocated.
* ShuffleServer is not obtained from RssShuffleHandle, but from this mapping.
Expand Down Expand Up @@ -1046,7 +1047,15 @@ protected RemoteStorageInfo getRemoteStorageInfo() {
appId, defaultRemoteStorage, dynamicConfEnabled, storageType, shuffleWriteClient);
}

public boolean isRssResubmitStage() {
return rssResubmitStage;
/**
 * @return true if the stage retry mechanism is active, i.e. at least one of the
 *     write-failure or fetch-failure retry switches is enabled.
 */
public boolean isRssStageRetryEnabled() {
return rssStageRetryEnabled;
}

/** @return true if stage retry is enabled for shuffle write failures. */
public boolean isRssStageRetryForWriteFailureEnabled() {
return rssStageRetryForWriteFailureEnabled;
}

/** @return true if stage retry is enabled for shuffle fetch failures. */
public boolean isRssStageRetryForFetchFailureEnabled() {
return rssStageRetryForFetchFailureEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.shuffle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -33,6 +34,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
Expand All @@ -56,7 +58,6 @@

import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
Expand All @@ -74,6 +75,8 @@
import org.apache.uniffle.shuffle.manager.ShuffleManagerServerFactory;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED;
import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

Expand Down Expand Up @@ -165,13 +168,29 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
// shuffle cluster, we don't need shuffle data locality
sparkConf.set("spark.shuffle.reduceLocality.enabled", "false");
LOG.info("Disable shuffle data locality in RssShuffleManager.");
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();

// Stage retry for write/fetch failure: either fine-grained switch activates the
// overall stage-retry mechanism.
// NOTE(review): unlike the removed rssResubmitStage logic, this no longer gates on
// RssSparkShuffleUtils.isStageResubmitSupported() — confirm that is intentional.
rssStageRetryForFetchFailureEnabled =
    rssConf.get(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
rssStageRetryForWriteFailureEnabled =
    rssConf.get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
if (rssStageRetryForFetchFailureEnabled || rssStageRetryForWriteFailureEnabled) {
  rssStageRetryEnabled = true;
  List<String> logTips = new ArrayList<>();
  if (rssStageRetryForWriteFailureEnabled) {
    logTips.add("write");
  }
  // Bug fix: this condition previously re-checked the write-failure flag, so the
  // "fetch" tip was logged iff the write switch was on, regardless of the fetch switch.
  if (rssStageRetryForFetchFailureEnabled) {
    logTips.add("fetch");
  }
  LOG.info(
      "Activate the stage retry mechanism that will resubmit stage on {} failure",
      StringUtils.join(logTips, "/"));
}
this.partitionReassignEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);
this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
partitionReassignEnabled || rssResubmitStage || blockIdSelfManagedEnabled;
partitionReassignEnabled || rssStageRetryEnabled || blockIdSelfManagedEnabled;
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
if (isDriver) {
heartBeatScheduledExecutorService =
Expand Down Expand Up @@ -334,7 +353,7 @@ public <K, V, C> ShuffleHandle registerShuffle(

shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
ShuffleHandleInfo handleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
StageAttemptShuffleHandleInfo stageAttemptShuffleHandleInfo =
Expand Down Expand Up @@ -406,7 +425,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
int shuffleId = rssHandle.getShuffleId();
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleHandleInfo shuffleHandleInfo;
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
Expand Down Expand Up @@ -479,7 +498,7 @@ public <K, C> ShuffleReader<K, C> getReader(
+ "]");
start = System.currentTimeMillis();
ShuffleHandleInfo shuffleHandleInfo;
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;

public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {

private static final Logger LOG = LoggerFactory.getLogger(RssShuffleReader.class);
Expand Down Expand Up @@ -231,7 +233,7 @@ public BoxedUnit apply() {
}

// stage re-compute and shuffle manager server port are both set
if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
&& rssConf.getInteger(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT, 0) > 0) {
String driver = rssConf.getString("driver.host", "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import org.apache.uniffle.common.exception.RssWaitFailedException;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED;

public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {

private static final Logger LOG = LoggerFactory.getLogger(RssShuffleWriter.class);
Expand Down Expand Up @@ -238,7 +240,7 @@ public void write(Iterator<Product2<K, V>> records) {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
if (shuffleManager.isRssResubmitStage()) {
if (RssSparkConfig.toRssConf(sparkConf).get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED)) {
throwFetchFailedIfNecessary(e);
} else {
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.shuffle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -37,6 +38,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.MapOutputTracker;
import org.apache.spark.ShuffleDependency;
Expand Down Expand Up @@ -64,7 +66,6 @@
import org.apache.uniffle.client.PartitionDataReplicaRequirementTracking;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
Expand All @@ -84,6 +85,8 @@

import static org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED;
import static org.apache.uniffle.common.config.RssBaseConf.RPC_SERVER_PORT;
import static org.apache.uniffle.common.config.RssClientConf.MAX_CONCURRENCY_PER_PARTITION_TO_WRITE;

Expand Down Expand Up @@ -179,10 +182,28 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
LOG.info("Disable shuffle data locality in RssShuffleManager.");
taskToSuccessBlockIds = JavaUtils.newConcurrentMap();
taskToFailedBlockSendTracker = JavaUtils.newConcurrentMap();
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.partitionReassignEnabled = rssConf.getBoolean(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);

this.partitionReassignEnabled = rssConf.get(RssClientConf.RSS_CLIENT_REASSIGN_ENABLED);

// Stage retry for write/fetch failure: either fine-grained switch activates the
// overall stage-retry mechanism.
// Bug fix: rssStageRetryEnabled was previously initialized from
// RssClientConf.RSS_CLIENT_REASSIGN_ENABLED (copy-paste slip), which would have
// activated stage retry (and the shuffle-manager RPC service) whenever partition
// reassignment was on. It must be derived only from the two switches below.
rssStageRetryForFetchFailureEnabled =
    rssConf.get(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED);
rssStageRetryForWriteFailureEnabled =
    rssConf.get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED);
if (rssStageRetryForFetchFailureEnabled || rssStageRetryForWriteFailureEnabled) {
  rssStageRetryEnabled = true;
  List<String> logTips = new ArrayList<>();
  if (rssStageRetryForWriteFailureEnabled) {
    logTips.add("write");
  }
  // Bug fix: this condition previously re-checked the write-failure flag, so the
  // "fetch" tip was logged iff the write switch was on, regardless of the fetch switch.
  if (rssStageRetryForFetchFailureEnabled) {
    logTips.add("fetch");
  }
  LOG.info(
      "Activate the stage retry mechanism that will resubmit stage on {} failure",
      StringUtils.join(logTips, "/"));
}

// The feature of partition reassign is exclusive with multiple replicas and stage retry.
if (partitionReassignEnabled) {
Expand All @@ -194,7 +215,7 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {

this.blockIdSelfManagedEnabled = rssConf.getBoolean(RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED);
this.shuffleManagerRpcServiceEnabled =
partitionReassignEnabled || rssResubmitStage || blockIdSelfManagedEnabled;
partitionReassignEnabled || rssStageRetryEnabled || blockIdSelfManagedEnabled;
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
Expand Down Expand Up @@ -448,7 +469,7 @@ public <K, V, C> ShuffleHandle registerShuffle(
startHeartbeat();
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
StageAttemptShuffleHandleInfo handleInfo =
Expand Down Expand Up @@ -492,7 +513,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
ShuffleHandleInfo shuffleHandleInfo;
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
Expand Down Expand Up @@ -636,7 +657,7 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
int shuffleId = rssShuffleHandle.getShuffleId();
ShuffleHandleInfo shuffleHandleInfo;
if (shuffleManagerRpcServiceEnabled && rssResubmitStage) {
if (shuffleManagerRpcServiceEnabled && rssStageRetryEnabled) {
// In Stage Retry mode, Get the ShuffleServer list from the Driver based on the shuffleId.
shuffleHandleInfo = getRemoteShuffleHandleInfoWithStageRetry(shuffleId);
} else if (shuffleManagerRpcServiceEnabled && partitionReassignEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
import static org.apache.uniffle.common.util.Constants.DRIVER_HOST;

public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
Expand Down Expand Up @@ -189,7 +190,7 @@ public Void apply(TaskContext context) {
resultIter = new InterruptibleIterator<>(context, resultIter);
}
// resubmit stage and shuffle manager server port are both set
if (rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
if (rssConf.getBoolean(RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED)
&& rssConf.getInteger(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT, 0) > 0) {
String driver = rssConf.getString(DRIVER_HOST, "");
int port = rssConf.get(RssClientConf.SHUFFLE_MANAGER_GRPC_PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED;

public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {

Expand Down Expand Up @@ -277,7 +278,7 @@ public void write(Iterator<Product2<K, V>> records) {
writeImpl(records);
} catch (Exception e) {
taskFailureCallback.apply(taskId);
if (shuffleManager.isRssResubmitStage()) {
if (RssSparkConfig.toRssConf(sparkConf).get(RSS_RESUBMIT_STAGE_WITH_WRITE_FAILURE_ENABLED)) {
throwFetchFailedIfNecessary(e);
} else {
throw e;
Expand Down
Loading
Loading