Skip to content

Commit

Permalink
[apache#1608][part-8] feat(spark3): add a limit to the number of retr…
Browse files Browse the repository at this point in the history
…ies when block access is denied
  • Loading branch information
shun01.ding committed May 15, 2024
1 parent a3461fb commit b683d1b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ public class RssSparkConfig {
.defaultValue(1)
.withDescription("The block retry max times when partition reassign is enabled.");

public static final ConfigOption<Integer>
RSS_PARTITION_REASSIGN_BLOCK_ACCESS_DENIED_RETRY_MAX_TIMES =
ConfigOptions.key("rss.client.reassign.blockAccessDeniedRetryMaxTimes")
.intType()
.defaultValue(1)
.withDescription(
"The block access denied retry max times when partition reassign is enabled.");

public static final String SPARK_RSS_CONFIG_PREFIX = "spark.";

public static final ConfigEntry<Integer> RSS_PARTITION_NUM_PER_RANGE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_ACCESS_DENIED_RETRY_MAX_TIMES;
import static org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES;

public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
Expand Down Expand Up @@ -122,6 +123,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> {
private SparkConf sparkConf;
private boolean blockFailSentRetryEnabled;
private int blockFailSentRetryMaxTimes = 1;
private int blockAccessDeniedRetryMaxTimes = 1;

/** used by columnar rss shuffle writer implementation */
protected final long taskAttemptId;
Expand Down Expand Up @@ -212,6 +214,9 @@ private RssShuffleWriter(
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue());
this.blockFailSentRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf).get(RSS_PARTITION_REASSIGN_BLOCK_RETRY_MAX_TIMES);
this.blockAccessDeniedRetryMaxTimes =
RssSparkConfig.toRssConf(sparkConf)
.get(RSS_PARTITION_REASSIGN_BLOCK_ACCESS_DENIED_RETRY_MAX_TIMES);
}

public RssShuffleWriter(
Expand Down Expand Up @@ -502,6 +507,11 @@ private void collectFailedBlocksToResend() {
.map(x -> x.getShuffleBlockInfo().getRetryCnt())
.max(Comparator.comparing(Integer::valueOf))
.get();
int accessDeniedRetryCnt =
failedBlockStatus.stream()
.map(x -> x.getShuffleBlockInfo().getAccessDeniedRetryCnt())
.max(Comparator.comparing(Integer::valueOf))
.get();
if (retryIndex >= blockFailSentRetryMaxTimes) {
LOG.error(
"Partial blocks for taskId: [{}] retry exceeding the max retry times: [{}]. Fast fail! faulty server list: {}",
Expand All @@ -513,6 +523,17 @@ private void collectFailedBlocksToResend() {
isFastFail = true;
break;
}
if (accessDeniedRetryCnt >= blockAccessDeniedRetryMaxTimes) {
LOG.error(
"Partial blocks for taskId: [{}] access denied retry exceeding the max retry times: [{}]. Fast fail! access denied server list: {}",
taskId,
blockAccessDeniedRetryMaxTimes,
failedBlockStatus.stream()
.map(x -> x.getShuffleServerInfo())
.collect(Collectors.toSet()));
isFastFail = true;
break;
}

for (TrackingBlockStatus status : failedBlockStatus) {
StatusCode code = status.getStatusCode();
Expand Down Expand Up @@ -626,7 +647,11 @@ private void reassignAndResendBlocks(Set<TrackingBlockStatus> blocks) {
// clear the previous retry state of block
clearFailedBlockState(block);
final ShuffleBlockInfo newBlock = block;
newBlock.incrRetryCnt();
if (StatusCode.ACCESS_DENIED.equals(blockStatus.getStatusCode())) {
newBlock.incrAccessDeniedRetryCnt();
} else {
newBlock.incrRetryCnt();
}
newBlock.reassignShuffleServers(Arrays.asList(replacement));
resendCandidates.add(newBlock);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ShuffleBlockInfo {
private int uncompressLength;
private long freeMemory;
private int retryCnt = 0;
private int accessDeniedRetryCnt = 0;

private transient BlockCompletionCallback completionCallback;

Expand Down Expand Up @@ -164,6 +165,14 @@ public int getRetryCnt() {
return retryCnt;
}

public void incrAccessDeniedRetryCnt() {
this.accessDeniedRetryCnt += 1;
}

public int getAccessDeniedRetryCnt() {
return accessDeniedRetryCnt;
}

public void reassignShuffleServers(List<ShuffleServerInfo> replacements) {
this.shuffleServerInfos = replacements;
}
Expand Down

0 comments on commit b683d1b

Please sign in to comment.