Skip to content

Commit

Permalink
Refactor code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
shun01.ding committed Apr 1, 2024
1 parent d795246 commit f28ca7d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public class RssShuffleManager extends RssShuffleManagerBase {
/** Whether to enable the dynamic shuffle server rewrite and reread functions. */
private boolean rssResubmitStage;

private boolean taskFailureRetryEnabled;
private boolean taskBlockFailureRetryEnabled;

private boolean shuffleManagerRpcServiceEnabled;
/** The IDs of the shuffle servers on which writes have failed. */
private Set<String> failuresShuffleServerIds = Sets.newHashSet();
/**
Expand Down Expand Up @@ -224,16 +226,17 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskFailureRetryEnabled =
this.taskBlockFailureRetryEnabled =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.defaultValue());
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
this.shuffleManagerRpcServiceEnabled = taskBlockFailureRetryEnabled || rssResubmitStage;
if (!sparkConf.getBoolean(RssSparkConfig.RSS_TEST_FLAG.key(), false)) {
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
rssConf.set(RPC_SERVER_PORT, 0);
Expand Down Expand Up @@ -383,7 +386,7 @@ public <K, V, C> ShuffleHandle registerShuffle(

shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
ShuffleHandleInfo handleInfo =
new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
Expand Down Expand Up @@ -480,7 +483,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
int shuffleId = rssHandle.getShuffleId();
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
Expand Down Expand Up @@ -550,7 +553,7 @@ public <K, C> ShuffleReader<K, C> getReader(
+ "]");
start = System.currentTimeMillis();
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ public class RssShuffleManager extends RssShuffleManagerBase {
/** Whether to enable the dynamic shuffle server rewrite and reread functions. */
private boolean rssResubmitStage;

private boolean taskFailureRetryEnabled;
private boolean taskBlockSendFailureRetry;

private boolean shuffleManagerRpcServiceEnabled;
/** The IDs of the shuffle servers on which writes have failed. */
private Set<String> failuresShuffleServerIds;
/**
Expand Down Expand Up @@ -241,15 +243,16 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) {
this.rssResubmitStage =
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false)
&& RssSparkShuffleUtils.isStageResubmitSupported();
this.taskFailureRetryEnabled =
this.taskBlockSendFailureRetry =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.defaultValue());
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(),
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage;
if (isDriver) {
heartBeatScheduledExecutorService =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat");
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
LOG.info("stage resubmit is supported and enabled");
// start shuffle manager server
rssConf.set(RPC_SERVER_PORT, 0);
Expand Down Expand Up @@ -478,7 +481,7 @@ public <K, V, C> ShuffleHandle registerShuffle(

shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
ShuffleHandleInfo handleInfo =
new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);
shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo);
Expand Down Expand Up @@ -516,7 +519,7 @@ public <K, V> ShuffleWriter<K, V> getWriter(
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
Expand Down Expand Up @@ -658,7 +661,7 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
int shuffleId = rssShuffleHandle.getShuffleId();
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
if (shuffleManagerRpcServiceEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private final Set<Long> blockIds = Sets.newConcurrentHashSet();
private TaskContext taskContext;
private SparkConf sparkConf;
private boolean taskFailureRetryEnabled;
private boolean blockSendFailureRetryEnabled;

/** used by columnar rss shuffle writer implementation */
protected final long taskAttemptId;
Expand Down Expand Up @@ -189,7 +189,7 @@ private RssShuffleWriter(
this.taskFailureCallback = taskFailureCallback;
this.taskContext = context;
this.sparkConf = sparkConf;
this.taskFailureRetryEnabled =
this.blockSendFailureRetryEnabled =
sparkConf.getBoolean(
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX
+ RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.key(),
Expand Down Expand Up @@ -421,7 +421,7 @@ protected void checkBlockSendResult(Set<Long> blockIds) {

private void checkIfBlocksFailed() {
Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId);
if (taskFailureRetryEnabled && !failedBlockIds.isEmpty()) {
if (blockSendFailureRetryEnabled && !failedBlockIds.isEmpty()) {
Set<TrackingBlockStatus> shouldResendBlockSet = shouldResendBlockStatusSet(failedBlockIds);
try {
reSendFailedBlockIds(shouldResendBlockSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ public class RssClientConf {
"This option is only valid when the remote storage path is specified. If ture, "
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath.");

public static final ConfigOption<Boolean> RSS_TASK_FAILURE_RETRY_ENABLED =
ConfigOptions.key("rss.task.failure.retry.enabled")
public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED =
ConfigOptions.key("rss.client.block.send.failure.retry.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to support task write failed retry internal, default value is false.");
"Whether to support rss client block send failure retry, default value is false.");
}

0 comments on commit f28ca7d

Please sign in to comment.