-
Notifications
You must be signed in to change notification settings - Fork 134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#1373][FOLLOWUP] fix(spark): shuffle manager rpc service invalid when partition data reassign is enabled #1583
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #1583 +/- ##
============================================
+ Coverage 53.99% 54.85% +0.85%
- Complexity 2862 2864 +2
============================================
Files 438 418 -20
Lines 24819 22551 -2268
Branches 2111 2123 +12
============================================
- Hits 13402 12371 -1031
+ Misses 10578 9408 -1170
+ Partials 839 772 -67 ☔ View full report in Codecov by Sentry. |
Test Results 2 109 files - 231 2 109 suites - 231 3h 16m 19s ⏱️ - 1h 12m 4s Results for commit f28ca7d. ± Comparison against base commit cbf4f6f. This pull request removes 15 tests.
♻️ This comment has been updated with latest results. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Task write fail retry is not related with stage recomputing, right?
Yes, they can run independently |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also fix the spark2
@@ -137,6 +137,8 @@ public class RssShuffleManager extends RssShuffleManagerBase { | |||
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo; | |||
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */ | |||
private boolean rssResubmitStage; | |||
|
|||
private boolean taskFailRetry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskFailRetry -> taskReattemptOnFailures
Is is better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is only valid for task write failure, in the future, it could be applied on the not-enough-capacity disk servers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskFailureRetryEnabled
Could you add a ut for this case? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall. But after reviewing this part code again, I think we could extra this logic into a general method and put this into RssShuffleManagerBase
, which could be shared by spark2 + 3.
Could you help optimize this?
I will do it |
client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
Outdated
Show resolved
Hide resolved
public static final ConfigOption<Boolean> RSS_TASK_FAILED_RETRY_ENABLED = | ||
ConfigOptions.key("rss.task.failed.retry.enabled") | ||
public static final ConfigOption<Boolean> RSS_TASK_FAILURE_RETRY_ENABLED = | ||
ConfigOptions.key("rss.task.failure.retry.enabled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rss.task.failure.retry.enabled
-> rss.client.blockSendFailureRetry.enabled
@@ -112,7 +112,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { | |||
private final Set<Long> blockIds = Sets.newConcurrentHashSet(); | |||
private TaskContext taskContext; | |||
private SparkConf sparkConf; | |||
private boolean taskFailRetry; | |||
private boolean taskFailureRetryEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskFailureRetryEnabled
-> blockSendFailureRetryEnabled
@@ -137,6 +137,8 @@ public class RssShuffleManager extends RssShuffleManagerBase { | |||
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo; | |||
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */ | |||
private boolean rssResubmitStage; | |||
|
|||
private boolean taskFailureRetryEnabled; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskFailureRetryEnabled
-> taskBlockSendFailureRetry
@@ -239,10 +241,15 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { | |||
this.rssResubmitStage = | |||
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) | |||
&& RssSparkShuffleUtils.isStageResubmitSupported(); | |||
this.taskFailureRetryEnabled = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED);
sparkConf.getBoolean( | ||
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX | ||
+ RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.key(), | ||
RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED.defaultValue()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And here you can introduce the new var to indicate whether we need to start shuffle manager rpc service.Like this:
this.shuffleManagerRpcServiceEnabled = taskBlockSendFailureRetry || rssResubmitStage
if (isDriver) { | ||
heartBeatScheduledExecutorService = | ||
ThreadUtils.getDaemonSingleThreadScheduledExecutor("rss-heartbeat"); | ||
if (rssResubmitStage) { | ||
if (rssResubmitStage || taskFailureRetryEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
replace with if (shuffleManagerRpcServiceEnabled)
@@ -471,7 +478,7 @@ public <K, V, C> ShuffleHandle registerShuffle( | |||
|
|||
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions()); | |||
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length); | |||
if (rssResubmitStage) { | |||
if (rssResubmitStage || taskFailureRetryEnabled) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
new ShuffleHandleInfo( | ||
shuffleId, rssHandle.getPartitionToServers(), rssHandle.getRemoteStorage()); | ||
} | ||
ShuffleHandleInfo shuffleHandleInfo = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert this part
@@ -488,4 +490,20 @@ protected static RemoteStorageInfo getDefaultRemoteStorageInfo(SparkConf sparkCo | |||
return new RemoteStorageInfo( | |||
sparkConf.get(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(), ""), confItems); | |||
} | |||
|
|||
protected static ShuffleHandleInfo getShuffleHandleInfo( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert this
fa3ca15
to
f28ca7d
Compare
@@ -222,11 +226,17 @@ public RssShuffleManager(SparkConf sparkConf, boolean isDriver) { | |||
this.rssResubmitStage = | |||
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) | |||
&& RssSparkShuffleUtils.isStageResubmitSupported(); | |||
this.taskBlockFailureRetryEnabled = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`this.taskBlockSendFailureRetry = rssConf.getBoolean(RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED);
`
@@ -137,6 +137,10 @@ public class RssShuffleManager extends RssShuffleManagerBase { | |||
private Map<Integer, ShuffleHandleInfo> shuffleIdToShuffleHandleInfo; | |||
/** Whether to enable the dynamic shuffleServer function rewrite and reread functions */ | |||
private boolean rssResubmitStage; | |||
|
|||
private boolean taskBlockSendFailureRetry; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskBlockSendFailureRetryEnabled
@@ -239,10 +243,16 @@ public RssShuffleManager(SparkConf conf, boolean isDriver) { | |||
this.rssResubmitStage = | |||
rssConf.getBoolean(RssClientConfig.RSS_RESUBMIT_STAGE, false) | |||
&& RssSparkShuffleUtils.isStageResubmitSupported(); | |||
this.taskBlockSendFailureRetry = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.taskBlockSendFailureRetryEnabled = rssConf.getBoolean(RssClientConf.RSS_TASK_FAILURE_RETRY_ENABLED);
@@ -203,10 +203,10 @@ public class RssClientConf { | |||
"This option is only valid when the remote storage path is specified. If ture, " | |||
+ "the remote storage conf will use the client side hadoop configuration loaded from the classpath."); | |||
|
|||
public static final ConfigOption<Boolean> RSS_TASK_FAILED_RETRY_ENABLED = | |||
ConfigOptions.key("rss.task.failed.retry.enabled") | |||
public static final ConfigOption<Boolean> RSS_CLIENT_BLOCK_SEND_FAILURE_RETRY_ENABLED = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rss.client.blockSendFailureRetry.enabled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rss.client.blockSendFailureRetry.enabled -> rss.client.blockSendFailureRetryEnabled.
And the spotless failure could be solved by the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for your contribution @dingshun3016
What changes were proposed in this pull request?
Fix task fail retry parameter not work and add task fail retry parameter in other locations
Why are the changes needed?
Fix: (#1373)
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Not necessary.