[ISSUE-124] Add fallback mechanism for blocks read inconsistent #276
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##             master     #276    +/-   ##
============================================
+ Coverage     58.45%   58.57%   +0.12%
  Complexity     1570     1570
============================================
  Files           193      192       -1
  Lines         10833    10803      -30
  Branches        951      942       -9
============================================
- Hits           6332     6328       -4
+ Misses         4127     4100      -27
- Partials        374      375       +1
```
What's the relation between this PR and #129?
Resolved review threads (outdated) on:
- client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
- ...age/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryQuorumClientReadHandler.java
- storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java (two threads)
One question: Should we use the concept of
So, what is your opinion?
How about renaming fallback() -> nextRound() and maxFallbackTimes -> maxRounds?
If you have three replicas, and every replica has memory, disk and HDFS storage, is maxRounds = 3 enough to read all the data?
No guarantee. For example, the blocks may be incomplete after the first round, and then we can't read from any shuffle server that stores the missing blocks.
So I feel that
I have another solution:
maxFailureTimes doesn't fit well with the replica logic; this is my biggest concern.
I don't understand.
For the replica logic: if we use 7 replicas, we only need to read 4 replicas successfully. But if maxFailure is 3, the application can still fail even though we have 4 correct replicas, because the app may happen to read the 3 corrupt replicas first.
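To make the replica arithmetic above concrete: with n replicas a majority read quorum is n/2 + 1, and a fixed failure budget can abort a read even when enough healthy replicas remain. A minimal sketch, with illustrative names only (not Uniffle's API):

```java
public class QuorumReadCheck {
    // Majority read quorum for n replicas.
    static int readQuorum(int replicas) {
        return replicas / 2 + 1;
    }

    // Worst case under a fixed failure budget: the client gives up after
    // maxFailure bad reads, even if untried healthy replicas could still
    // satisfy the quorum.
    static boolean failsUnderFixedBudget(int replicas, int healthy, int maxFailure) {
        int corrupt = replicas - healthy;
        // Quorum is reachable, yet an unlucky ordering exhausts the budget first.
        return corrupt >= maxFailure && healthy >= readQuorum(replicas);
    }

    public static void main(String[] args) {
        // 7 replicas, 4 healthy: quorum (4) is reachable, but 3 corrupt
        // reads exhaust a maxFailure of 3 first.
        System.out.println(failsUnderFixedBudget(7, 4, 3)); // prints "true"
    }
}
```

This is exactly the 7-replica example from the comment above: the read is logically satisfiable, but a failure counter that ignores replica placement can fail it anyway.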
I mean
It seems OK.
Resolved merge conflicts in:
- client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
- client-spark/spark2/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
- client-spark/spark2/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
- client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
- client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
- client-spark/spark3/src/test/java/org/apache/spark/shuffle/reader/RssShuffleReaderTest.java
```java
// Only for test
public ShuffleServerInfo(String host, int port) {
  this.id = host + "-" + port;
  this.host = host;
```
It is a little strange to add a constructor just for test, we can just use
`new ShuffleServerInfo(host + "_" + String.valueOf(port), host, port)`
For unification and convenience. If we don't do this, we need to modify many UTs.
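For reference, a minimal sketch of the class shape under discussion — a test-only convenience constructor delegating to the full one so the id format lives in one place. Fields and accessors are inferred from the diff above, not the actual Uniffle source:

```java
public class ShuffleServerInfo {
    private final String id;
    private final String host;
    private final int port;

    public ShuffleServerInfo(String id, String host, int port) {
        this.id = id;
        this.host = host;
        this.port = port;
    }

    // Test-only convenience: derives the id from host and port, so test
    // call sites don't repeat the id-formatting expression everywhere.
    public ShuffleServerInfo(String host, int port) {
        this(host + "-" + port, host, port);
    }

    public String getId() {
        return id;
    }
}
```

The trade-off debated above is exactly this: the delegating constructor saves edits in many unit tests, at the cost of a production class carrying a test-only entry point.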
```java
if (CollectionUtils.isEmpty(request.getShuffleServerInfoList())) {
  throw new RuntimeException("Shuffle servers should not be empty!");
}
if (request.getShuffleServerInfoList().size() > 1) {
```
I agree with @jerqi, the current logic is too complicated.
It is better to use a unified code path (by the way, one server is a special case of multiple servers).
I prefer to add a global data structure in the composed handler, maybe called "progress".
It stores the information of consumed replicas and servers.
We could add the fallback in the composed handler, and each layer of handler can restart from the last position by reading the progress.
I think @xianjingfeng's current implementation is OK. We need a replicaHandler concept as an upper layer above the composite handler.
> I prefer to add a global data structure in the composed handler, maybe called "progress". It stores the information of consumed replicas and servers. We could add the fallback in the composed handler, and each layer of handler can restart from the last position by reading the progress.

This logic has the same problem as the previous version of this PR: if we fail to read from the memory handler but read successfully from the localfile handler, and then the memory data is flushed to localfile before we read from memory again, some data may be lost.
PTAL @jerqi
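For illustration, the "progress" structure proposed above could be as simple as a record of which servers and blocks have already been consumed, queried by each handler layer before re-reading. All names here are hypothetical, not part of Uniffle:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical "progress" record for a composed read handler: tracks
// consumed servers and block ids so a layer can resume after a fallback
// instead of re-reading (and possibly double-counting) data.
public class ReadProgress {
    private final Set<String> consumedServers = new HashSet<>();
    private final Set<Long> consumedBlockIds = new HashSet<>();

    public void markServerConsumed(String serverId) {
        consumedServers.add(serverId);
    }

    public void markBlockConsumed(long blockId) {
        consumedBlockIds.add(blockId);
    }

    public boolean isServerConsumed(String serverId) {
        return consumedServers.contains(serverId);
    }

    public boolean isBlockConsumed(long blockId) {
        return consumedBlockIds.contains(blockId);
    }
}
```

Note this sketch does not by itself solve the flush race described in the reply above: a block flushed from memory to localfile between rounds still needs the storage layers to agree on block identity, which is why the discussion converged on handling replicas in an upper layer.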
Resolved review threads on:
- ...age/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
- storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java (outdated, two threads)
- ...age/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java (outdated)
- storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleReadHandler.java (outdated)
LGTM except for minor issues. cc @Gustfh, do you have another suggestion?
LGTM, let's wait for a moment. If Gus doesn't reply, I'll merge this PR next Tuesday.
Merged. Thanks all.
### What changes were proposed in this pull request?
Skip blocks that are not in the expected blockId range when reading from memory.

### Why are the changes needed?
1. If we use AQE, every task will read data from all partitions.
2. If the data of the first shuffle server is incomplete, we need to read from another server once #276 is merged.

Both of the above situations lead to reading redundant data from the shuffle server.

### Does this PR introduce _any_ user-facing change?
Set `rss.client.read.block.skip.strategy` to `BLOCKID_RANGE`.

### How was this patch tested?
Already added.
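The blockId-range skip described above can be outlined as a membership filter over the blocks returned from a memory read. This is a hypothetical helper for illustration only (the real `BLOCKID_RANGE` strategy lives in the Uniffle client, and these names are not its API):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative only: drop memory-read blocks whose ids fall outside the
// set this reader expects, so AQE-style partial reads (and multi-server
// fallback reads) don't return redundant data to the task.
public class BlockIdRangeFilter {
    static List<Long> filterExpected(List<Long> readBlockIds, Set<Long> expectedBlockIds) {
        return readBlockIds.stream()
                .filter(expectedBlockIds::contains)
                .collect(Collectors.toList());
    }
}
```

The point of the optimization is that the filter runs before redundant block payloads are deserialized and handed to the task, not after.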
### What changes were proposed in this pull request?
Add a fallback mechanism for inconsistent block reads.

### Why are the changes needed?
When the data in the first server is damaged, the application will fail. #124 #129

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Already added.