-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1373][FOLLOWUP] fix(spark): shuffle manager rpc service invalid when partition data reassign is enabled #1583
Changes from 4 commits
81e129c
c0a09ba
d795246
f28ca7d
c79ef03
f488c59
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,6 +137,10 @@ public class RssShuffleManager extends RssShuffleManagerBase { | |
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo; | ||
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */ | ||
private boolean rssResubmitStage; | ||
|
||
private boolean taskBlockSendFailureRetry; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Suggested name: `taskBlockSendFailureRetryEnabled` |
||
|
||
private boolean shuffleManagerRpcServiceEnabled; | ||
/** A list of shuffleServer for Write failures */ | ||
private Set<String> failuresShuffleServerIds; | ||
/** | ||
|
@@ -239,10 +243,16 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { | |
this.rssResubmitStage = | ||
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) | ||
&& RssSparkShuffleUtils.isStageResubmitSupported(); | ||
this.taskBlockSendFailureRetry = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `this.taskBlockSendFailureRetryEnabled = rssConf.getBoolean(RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED);` |
||
sparkConf.getBoolean( | ||
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX | ||
+ RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.key(), | ||
RssClientConf.RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED.defaultValue()); | ||
this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage; | ||
if (isDriver) { | ||
heartBeatScheduledExecutorService = | ||
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat"); | ||
if (rssResubmitStage) { | ||
if (shuffleManagerRpcServiceEnabled) { | ||
LOG.info("stage resubmit is supported and enabled"); | ||
// start shuffle manager server | ||
rssConf.set(RPC_SERVER_PORT, 0); | ||
|
@@ -471,7 +481,7 @@ public <K, V, C> ShuffleHandle registerShuffle( | |
|
||
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions()); | ||
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length); | ||
if (rssResubmitStage) { | ||
if (shuffleManagerRpcServiceEnabled) { | ||
ShuffleHandleInfo handleInfo = | ||
new ShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage); | ||
shuffleIdToShuffleHandleInfo.put(shuffleId, handleInfo); | ||
|
@@ -509,7 +519,7 @@ public <K, V> ShuffleWriter<K, V> getWriter( | |
writeMetrics = context.taskMetrics().shuffleWriteMetrics(); | ||
} | ||
ShuffleHandleInfo shuffleHandleInfo; | ||
if (rssResubmitStage) { | ||
if (shuffleManagerRpcServiceEnabled) { | ||
// Get the ShuffleServer list from the Driver based on the shuffleId | ||
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId); | ||
} else { | ||
|
@@ -651,7 +661,7 @@ public <K, C> ShuffleReader<K, C> getReaderImpl( | |
final int partitionNum = rssShuffleHandle.getDependency().partitioner().numPartitions(); | ||
int shuffleId = rssShuffleHandle.getShuffleId(); | ||
ShuffleHandleInfo shuffleHandleInfo; | ||
if (rssResubmitStage) { | ||
if (shuffleManagerRpcServiceEnabled) { | ||
// Get the ShuffleServer list from the Driver based on the shuffleId | ||
shuffleHandleInfo = getRemoteShuffleHandleInfo(shuffleId); | ||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -203,10 +203,10 @@ public class RssClientConf { | |
"This option is only valid when the remote storage path is specified. If ture, " | ||
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath."); | ||
|
||
public static final ConfigOption<Boolean> RSS_TASK_FAILED_RETRY_ENABLED = | ||
ConfigOptions.key("rss.task.failed.retry.enabled") | ||
public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `rss.client.blockSendFailureRetry.enabled` There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. `rss.client.blockSendFailureRetry.enabled` -> `rss.client.blockSendFailureRetryEnabled`. |
||
ConfigOptions.key("rss.client.block.send.failure.retry.enabled") | ||
.booleanType() | ||
.defaultValue(false) | ||
.withDescription( | ||
"Whether to support task write failed retry internal, default value is false."); | ||
"Whether to support rss client block send failure retry, default value is false."); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED);`