[#1579][part-1] fix(spark): Adjust reassigned time to ensure that all previous data is cleared for stage retry #1584
base: master
Conversation
cc @dingshun3016 @yl09099 PTAL
After rethinking this, I think the
Force-pushed from b192095 to 4d3a892 (compare)
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
##             master    #1584      +/-   ##
============================================
- Coverage     54.86%   53.42%     -1.45%
- Complexity     2358     2943       +585
============================================
  Files           368      435        +67
  Lines         16379    23768      +7389
  Branches       1504     2208       +704
============================================
+ Hits           8986    12697      +3711
- Misses         6862    10290      +3428
- Partials        531      781       +250

☔ View full report in Codecov by Sentry.
It's dangerous to delete the failed stage's data when we retry. It's hard to reach the condition under which the data can be deleted safely. We should rely on data skipping to avoid reading the failed data.
Could you describe this in more detail?
Some tasks may still write legacy data to the shuffle server after you delete the shuffle data. Although we resubmit the stage, some tasks from the last attempt may still write data; Spark doesn't guarantee that all tasks from the last attempt have ended once the newest attempt has started.
@EnricoMi If the stage is retried, the taskId may not be unique, because we don't have a stage attemptId to distinguish task 1 attempt 0 in stage attempt 0 from task 1 attempt 0 in stage attempt 1. This may cause us to read the wrong data.
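A tiny hypothetical example of the ambiguity described above; the key format below is invented purely for illustration and is not how Uniffle encodes task or block ids:

```java
// Hypothetical sketch, not Uniffle code: it only shows why taskId + taskAttempt alone
// cannot tell apart data written by different stage attempts.
public class TaskIdAmbiguity {

  // Key without the stage attempt: collides across stage attempts.
  static String key(int shuffleId, int taskId, int taskAttempt) {
    return shuffleId + "-" + taskId + "-" + taskAttempt;
  }

  // Key with the stage attempt: the reader can tell old data from new data.
  static String keyWithStageAttempt(int shuffleId, int stageAttempt, int taskId, int taskAttempt) {
    return shuffleId + "-" + stageAttempt + "-" + taskId + "-" + taskAttempt;
  }

  public static void main(String[] args) {
    // Task 1, attempt 0: once in stage attempt 0 and once in stage attempt 1.
    System.out.println(key(0, 1, 0).equals(key(0, 1, 0)));               // true  -> ambiguous
    System.out.println(keyWithStageAttempt(0, 0, 1, 0)
        .equals(keyWithStageAttempt(0, 1, 1, 0)));                       // false -> distinguishable
  }
}
```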
If so, we'd better reject the shuffle data of the older version. This could be implemented by maintaining the latest stage
OK, maybe rejecting the legacy data will be the better choice.
Ignore this. Maybe rejecting legacy data will be a better choice.
@@ -158,6 +158,30 @@ public void registerShuffle(
    String remoteStoragePath = req.getRemoteStorage().getPath();
    String user = req.getUser();

    if (req.getIsStageRetry()) {
If removeShuffleDataSync is always being called, we can avoid adding the isStageRetry plumbing in here. When isStageRetry == false, this is a NOOP. Method removeShuffleDataSync might return true if it found data to delete, so we can conditionally log the message below.
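A minimal sketch of the suggested pattern, using illustrative names rather than the real Uniffle server API:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the suggestion above; method and class names are illustrative only.
public class UnconditionalCleanupSketch {
  private static final Logger LOG = LoggerFactory.getLogger(UnconditionalCleanupSketch.class);

  // Stand-in for the real synchronous cleanup: returns true only if data was found and deleted.
  boolean removeShuffleDataSync(String appId, int shuffleId) {
    return false; // NOOP when there is nothing left over from a previous attempt
  }

  void registerShuffle(String appId, int shuffleId) {
    // Always call the cleanup: no isStageRetry plumbing is needed, because the call
    // is a NOOP when no previous-attempt data exists, and the return value drives the log.
    if (removeShuffleDataSync(appId, shuffleId)) {
      LOG.info("Removed previous stage attempt data for app {} shuffle {}", appId, shuffleId);
    }
    // ... continue with normal registration ...
  }
}
```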
I prefer reserving the isStageRetry param (or replacing it with the stage attempt number) for 2 reasons:
- It is more explicit for stage retry, especially when something goes wrong, like the previous data having been purged due to an expired heartbeat. With this flag, the log will indicate that the abnormal situation happened.
- For the next PR, I will introduce the stage's latest attempt to discard the older attempt's data.
- all this plumbing for logging is peculiar
- maybe there are better mechanisms to discard older data
Using the latest attempt id on the server side to check whether a send request from an older attempt is still valid; this will be finished in the next PR.
This has been included in this PR.
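A rough sketch of that server-side check; the class and method names are assumptions, not the actual Uniffle implementation planned for the follow-up PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Rough sketch of the rejection idea discussed above; names are assumptions only.
public class LatestAttemptGuard {
  // shuffleId -> latest registered stage attempt number
  private final Map<Integer, Integer> latestAttempt = new ConcurrentHashMap<>();

  // Called when a shuffle is (re-)registered for a new stage attempt.
  public void onRegister(int shuffleId, int stageAttemptNumber) {
    latestAttempt.merge(shuffleId, stageAttemptNumber, Math::max);
  }

  // Called for every send-data request; returns false for data from an older attempt.
  public boolean accept(int shuffleId, int stageAttemptNumber) {
    Integer latest = latestAttempt.get(shuffleId);
    return latest == null || stageAttemptNumber >= latest;
  }
}
```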
Can we register a shuffle as the tuple
I think deletion of earlier shuffle data should not be synchronous in the first place! That is flawed by design. Think of TBs of shuffle data. It should be removed quickly, in constant time (e.g. an HDFS move), and cleaned up asynchronously (e.g. an HDFS delete).
I agree with you. I'm concerned about the cost of the refactor.
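A sketch of the move-quickly-then-delete-asynchronously idea; java.nio is used here purely for illustration, while a real implementation would go through the server's storage layer (local disk, HDFS):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

// Sketch of "constant-time move, asynchronous delete"; not the Uniffle storage code.
public class AsyncShuffleCleaner {
  private final ExecutorService deleter = Executors.newSingleThreadExecutor();

  public void purge(Path shuffleDir, Path trashDir) throws IOException {
    // 1. Cheap, near-constant-time step: move the whole directory out of the way so a
    //    retried stage can immediately re-register and write fresh data. ATOMIC_MOVE
    //    assumes trashDir is on the same filesystem.
    Files.createDirectories(trashDir);
    Path target = trashDir.resolve(shuffleDir.getFileName() + "." + System.nanoTime());
    Files.move(shuffleDir, target, StandardCopyOption.ATOMIC_MOVE);

    // 2. The expensive recursive delete happens in the background.
    deleter.submit(() -> {
      try (Stream<Path> paths = Files.walk(target)) {
        paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
      } catch (IOException e) {
        // Best effort: anything left over stays in the trash dir for a later sweep.
      }
    });
  }
}
```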
Force-pushed from d922716 to ae02409 (compare)
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.proto.RssProtos;

public class ChainShuffleHandleInfo extends ShuffleHandleInfoBase {
How about renaming it to StageAttemptShuffleHandleInfo?
        .build();
    replicaServersProto.put(replicaServerEntry.getKey(), item);
  }
  Map<Integer, RssProtos.PartitionReplicaServers> partitionToServers = new HashMap<>();
Why remove the synchronized?
  @Override
  public boolean reassignOnStageResubmit(
      int stageId, int stageAttemptNumber, int shuffleId, int numPartitions) {
    ReentrantReadWriteLock.WriteLock shuffleWriteLock = getShuffleWriteLock(shuffleId);
I think this could also be added into the StageResubmitManager.
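One possible shape of that refactor, purely as a sketch; StageResubmitManager's real fields and methods may differ:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Sketch only: one way for StageResubmitManager to own the per-shuffle write lock,
// as suggested above. Not the actual Uniffle class.
public class StageResubmitManagerSketch {
  private final Map<Integer, ReentrantReadWriteLock> shuffleLocks = new ConcurrentHashMap<>();

  private ReentrantReadWriteLock.WriteLock getShuffleWriteLock(int shuffleId) {
    return shuffleLocks.computeIfAbsent(shuffleId, id -> new ReentrantReadWriteLock()).writeLock();
  }

  // Callers hand in the reassignment work; the manager owns the locking.
  public <T> T withShuffleWriteLock(int shuffleId, Supplier<T> action) {
    ReentrantReadWriteLock.WriteLock lock = getShuffleWriteLock(shuffleId);
    lock.lock();
    try {
      return action.get();
    } finally {
      lock.unlock();
    }
  }
}
```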
-  MutableShuffleHandleInfo reassignOnBlockSendFailure(
+  ChainShuffleHandleInfo reassignOnBlockSendFailure(
This should still be MutableShuffleHandleInfo.
@@ -184,6 +184,7 @@ message ShuffleRegisterRequest {
  string user = 5;
  DataDistribution shuffleDataDistribution = 6;
  int32 maxConcurrencyPerPartitionToWrite = 7;
  int32 stageAttemptNumber = 8;
This should be solved.
Force-pushed from 5c9d9e3 to 3dd6b34 (compare)
Force-pushed from a8b70cc to 10bc42c (compare)
…at all previous data is cleared for stage retry
  private LinkedList<ShuffleHandleInfo> historyHandles;

  public StageAttemptShuffleHandleInfo(ShuffleHandleInfo shuffleServerInfo) {
    super(0, null);
Why does this have to be a ShuffleHandleInfo when this is 0 / null?
> Why does this have to be a ShuffleHandleInfo when this is 0 / null?

This has been modified; the whole PR has been migrated to #1762.
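For reference, one way to avoid the 0 / null placeholders would be to delegate to the wrapped handle. This is only a sketch under the assumptions noted in the comments; the code that actually landed in #1762 may differ:

```java
import java.util.LinkedList;

// Sketch only: it assumes ShuffleHandleInfoBase has a (shuffleId, remoteStorage)
// constructor (as implied by super(0, null)) and that ShuffleHandleInfo exposes
// getShuffleId() / getRemoteStorage(). These assumptions may not match the real API.
public class StageAttemptShuffleHandleInfo extends ShuffleHandleInfoBase {
  private ShuffleHandleInfo current;
  private final LinkedList<ShuffleHandleInfo> historyHandles = new LinkedList<>();

  public StageAttemptShuffleHandleInfo(ShuffleHandleInfo handle) {
    // Forward the wrapped handle's identity instead of the 0 / null placeholders.
    super(handle.getShuffleId(), handle.getRemoteStorage());
    this.current = handle;
  }
}
```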
What changes were proposed in this pull request?
Clear out previous stage attempt data synchronously when registering the reassigned shuffleIds.
Why are the changes needed?
Fix: #1579
If the previous stage attempt's data is still in the purge queue on the shuffle-server side, writes from the retried stage will cause unknown exceptions, so we'd better clear out all previous stage attempt data before re-registering.
This PR synchronously removes the previous stage data when the first attempt writer is initialized.
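A condensed sketch of that flow; the class and method names here are illustrative, not the exact Uniffle server code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Condensed sketch of the intended behavior; names do not match the real server code.
public class StageRetryRegistration {
  // shuffleId -> stage attempt number seen at registration time
  private final Map<Integer, Integer> registeredAttempt = new ConcurrentHashMap<>();

  public void registerShuffle(String appId, int shuffleId, int stageAttemptNumber) {
    Integer previous = registeredAttempt.put(shuffleId, stageAttemptNumber);
    if (previous != null && stageAttemptNumber > previous) {
      // Stage retry: synchronously clear the previous attempt's data (including data
      // still sitting in the purge queue) before accepting new writes, so the retried
      // stage never mixes with stale blocks.
      removeShuffleDataSync(appId, shuffleId);
    }
    // ... normal registration continues ...
  }

  private void removeShuffleDataSync(String appId, int shuffleId) {
    // Stand-in for the real synchronous deletion of the previous attempt's data.
  }
}
```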
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests.