Skip to content

Commit

Permalink
refact code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
shun01.ding committed Apr 1, 2024
1 parent d795246 commit fa3ca15
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.uniffle.client.response.RssFetchClientConfResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleHandleInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
Expand Down Expand Up @@ -488,4 +490,20 @@ protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf sparkCo
return new RemoteStorageInfo(
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), confItems);
}

protected static ShuffleHandleInfo getShuffleHandleInfo(
boolean rssResubmitStage,
boolean taskFailureRetryEnabled,
int shuffleId,
Map<Integer, List<ShuffleServerInfo>> partitionToServers,
RemoteStorageInfo storageInfo) {
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
shuffleHandleInfo = new ShuffleHandleInfo(shuffleId, partitionToServers, storageInfo);
}
return shuffleHandleInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -479,15 +479,8 @@ public <K, V> ShuffleWriter<K, V> getWriter(

int shuffleId = rssHandle.getShuffleId();
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
shuffleHandleInfo =
new ShuffleHandleInfo(
shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
}
ShuffleHandleInfo shuffleHandleInfo = getShuffleHandleInfo(shuffleId,
rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
ShuffleWriteMetrics writeMetrics = context.taskMetrics().shuffleWriteMetrics();
return new RssShuffleWriter<>(
rssHandle.getAppId(),
Expand Down Expand Up @@ -549,17 +542,8 @@ public <K, C> ShuffleReader<K, C> getReader(
+ startPartition
+ "]");
start = System.currentTimeMillis();
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
shuffleHandleInfo =
new ShuffleHandleInfo(
shuffleId,
rssShuffleHandle.getPartitionToServers(),
rssShuffleHandle.getRemoteStorage());
}
ShuffleHandleInfo shuffleHandleInfo = getShuffleHandleInfo(shuffleId,
rssShuffleHandle.getPartitionToServers(), rssShuffleHandle.getRemoteStorage());
Map<Integer, List<ShuffleServerInfo>> partitionToServers =
shuffleHandleInfo.getPartitionToServers();
Roaring64NavigableMap blockIdBitmap =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,15 +515,9 @@ public <K, V> ShuffleWriter<K, V> getWriter(
} else {
writeMetrics = context.taskMetrics().shuffleWriteMetrics();
}
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
shuffleHandleInfo =
new ShuffleHandleInfo(
shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
}
ShuffleHandleInfo shuffleHandleInfo =
getShuffleHandleInfo(
shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage());
String taskId = "" + context.taskAttemptId() + "_" + context.attemptNumber();
LOG.info("RssHandle appId {} shuffleId {} ", rssHandle.getAppId(), rssHandle.getShuffleId());
return new RssShuffleWriter<>(
Expand Down Expand Up @@ -657,17 +651,11 @@ public <K, C> ShuffleReader<K, C> getReaderImpl(
RssShuffleHandle<K, ?, C> rssShuffleHandle = (RssShuffleHandle<K, ?, C>) handle;
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions();
int shuffleId = rssShuffleHandle.getShuffleId();
ShuffleHandleInfo shuffleHandleInfo;
if (rssResubmitStage || taskFailureRetryEnabled) {
// Get the ShuffleServer list from the Driver based on the shuffleId
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId);
} else {
shuffleHandleInfo =
new ShuffleHandleInfo(
shuffleId,
rssShuffleHandle.getPartitionToServers(),
rssShuffleHandle.getRemoteStorage());
}
ShuffleHandleInfo shuffleHandleInfo =
getShuffleHandleInfo(
shuffleId,
rssShuffleHandle.getPartitionToServers(),
rssShuffleHandle.getRemoteStorage());
Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
shuffleHandleInfo.getPartitionToServers();
Map<Integer, List<ShuffleServerInfo>> requirePartitionToServers =
Expand Down

0 comments on commit fa3ca15

Please sign in to comment.