
[#1608][part-3] feat(spark3): support reading partition data from multiple reassignment servers #1615

Merged
merged 21 commits into from
Apr 17, 2024

Conversation

zuston
Member

@zuston zuston commented Apr 2, 2024

What changes were proposed in this pull request?

Support reading from partition block data reassignment servers.

Why are the changes needed?

For: #1608

The writer has been writing data to the reassignment servers, so the reader must be able to read from them as well.
BlockIds are stored on the servers that own each partition, so this PR reads blockIds from those servers while
still honoring the min-replica requirement.
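To illustrate the min-replica bookkeeping mentioned above, here is a hypothetical, much-simplified sketch of per-partition replica tracking. The class and method names are illustrative only; the actual `PartitionDataReplicaRequirementTracking` class in this PR differs.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch: track how many replicas of a partition have been
// successfully read, and report when the min-replica requirement is met.
public class ReplicaRequirementSketch {
    private final int minReplica;
    // partitionId -> replicaIndex -> count of successful reads for that replica
    private final Map<Integer, Map<Integer, Integer>> succeedList = new HashMap<>();

    public ReplicaRequirementSketch(int minReplica) {
        this.minReplica = minReplica;
    }

    public void markSucceed(int partitionId, int replicaIndex) {
        succeedList
            .computeIfAbsent(partitionId, k -> new HashMap<>())
            .merge(replicaIndex, 1, Integer::sum);
    }

    // The partition is satisfied once at least minReplica distinct replicas succeeded.
    public boolean isSatisfied(int partitionId) {
        Map<Integer, Integer> replicas = succeedList.get(partitionId);
        return replicas != null && replicas.size() >= minReplica;
    }

    public static void main(String[] args) {
        ReplicaRequirementSketch tracking = new ReplicaRequirementSketch(2);
        tracking.markSucceed(0, 0);
        System.out.println(tracking.isSatisfied(0)); // false: only one replica succeeded
        tracking.markSucceed(0, 1);
        System.out.println(tracking.isSatisfied(0)); // true: min-replica of 2 met
    }
}
```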

Does this PR introduce any user-facing change?

No.

How was this patch tested?

PartitionBlockDataReassignTest integration test.

@codecov-commenter

codecov-commenter commented Apr 2, 2024

Codecov Report

Attention: Patch coverage is 66.96429%, with 37 lines in your changes missing coverage. Please review.

Project coverage is 54.97%. Comparing base (1051d26) to head (03f9293).
Report is 3 commits behind head on master.

Files Patch % Lines
...rg/apache/spark/shuffle/KryoSerializerWrapper.java 0.00% 15 Missing ⚠️
...he/uniffle/client/impl/ShuffleWriteClientImpl.java 0.00% 7 Missing ⚠️
...fle/shuffle/manager/ShuffleManagerGrpcService.java 0.00% 4 Missing ⚠️
.../response/RssPartitionToShuffleServerResponse.java 0.00% 4 Missing ⚠️
...va/org/apache/spark/shuffle/ShuffleHandleInfo.java 93.75% 2 Missing and 1 partial ⚠️
...lient/PartitionDataReplicaRequirementTracking.java 93.75% 2 Missing ⚠️
...he/uniffle/server/buffer/ShuffleBufferManager.java 0.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #1615      +/-   ##
============================================
+ Coverage     53.98%   54.97%   +0.98%     
+ Complexity     2872     2750     -122     
============================================
  Files           438      407      -31     
  Lines         24927    21215    -3712     
  Branches       2126     2014     -112     
============================================
- Hits          13456    11662    -1794     
+ Misses        10626     8812    -1814     
+ Partials        845      741     -104     



github-actions bot commented Apr 2, 2024

Test Results

 2 384 files  +21   2 384 suites  +21   4h 33m 18s ⏱️ + 1m 55s
   919 tests + 7     918 ✅ + 7   1 💤 ±0  0 ❌ ±0 
10 666 runs  +81  10 652 ✅ +81  14 💤 ±0  0 ❌ ±0 

Results for commit 297947e. ± Comparison against base commit 3ea3aaa.

♻️ This comment has been updated with latest results.

@zuston zuston changed the title [#1608][part-3] feat(spark3): support reading from partition block data reassignment servers [#1608][part-3] feat(spark3): support reading from partition multiple reassignment servers Apr 3, 2024
@zuston zuston changed the title [#1608][part-3] feat(spark3): support reading from partition multiple reassignment servers [#1608][part-3] feat(spark3): support reading from multiple reassignment servers Apr 3, 2024
@zuston zuston changed the title [#1608][part-3] feat(spark3): support reading from multiple reassignment servers [#1608][part-3] feat(spark3): support reading partition data from multiple reassignment servers Apr 8, 2024
import static org.junit.jupiter.api.Assertions.assertEquals;

/** This class is to test the mechanism of partition block data reassignment. */
public class PartitionBlockDataReassignTest extends SparkSQLTest {
Member Author

This class is the Spark integration test for the partition block data reassignment mechanism.

@zuston
Member Author

zuston commented Apr 8, 2024

PTAL @dingshun3016 @xumanbu @jerqi

}

replacement = faultyServerReplacements.get(faultyServerId);
replacement = faultyServerToReplacements.get(faultyServerId);
for (Integer partitionId : partitionIds) {
List<ShuffleServerInfo> replicaServers = partitionToServers.get(partitionId);
for (int i = 0; i < replicaServers.size(); i++) {
if (replicaServers.get(i).getId().equals(faultyServerId)) {
Contributor

There's a small problem: when a server is reassigned for the second time, faultyServerId is no longer in partitionToServers.

Member Author

Currently, server reassignment only occurs once.

import org.apache.spark.serializer.KryoSerializer;
import org.apache.spark.serializer.SerializerInstance;

public class KryoSerializerWrapper {
Contributor

We shouldn't bind to KryoSerializerWrapper.

Member Author

I want the shuffleHandleInfo to be shared between the driver and executors as serialized bytes.

Contributor

Can other RPCs also be implemented in this manner?

Member Author

Yes, similar implementations could be adopted in the shuffleManager RPC service, because the driver and executors are always bound to the same version, so there are no compatibility problems.

Member Author

@zuston zuston Apr 11, 2024

This has been refactored to avoid de/serialization performance drop @jerqi

RemoteStorageInfo remote_storage_info = 3;
string msg = 4;
string msg = 2;
bytes shuffleHandleInfoSerializableBytes = 3;
Contributor

Why do we need this?

@jerqi
Contributor

jerqi commented Apr 9, 2024

There are some points I care about:

  1. Do we support reassigning multiple times? Do we have tests for this case?
  2. Do we work well with multiple replicas?

@zuston
Member Author

zuston commented Apr 9, 2024

There are some points I care about:

  1. Do we support reassigning multiple times? Do we have tests for this case?

Multiple reassignment servers for one partition will be added in the next PR, covering the following cases:

  1. the reassigned server fails to send blocks, so the partition is reassigned again
  2. a partition is marked as a huge partition and is reassigned to multiple servers up front, with different tasks picking different reassigned servers by hash

  2. Do we work well with multiple replicas?

This needs more tests. But at the current stage, I will make multiple replicas and reassignment mutually exclusive via an extra check, which will also be added in the next PR.

Partition data reassignment is a huge change, so let's do this carefully and slowly.
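The mutual-exclusion check described above might look something like the following. This is a hypothetical sketch only — the class name `ReassignGuard`, the method, and the message are all illustrative, not the actual code planned for the next PR.

```java
// Hypothetical sketch: an extra check making multiple replicas and
// partition reassignment mutually exclusive, as described above.
public class ReassignGuard {
    public static void checkReassignAllowed(int replicaCount) {
        if (replicaCount > 1) {
            throw new IllegalStateException(
                "Partition reassignment is not yet supported with multiple replicas");
        }
    }

    public static void main(String[] args) {
        ReassignGuard.checkReassignAllowed(1); // single replica: reassignment allowed
        try {
            ReassignGuard.checkReassignAllowed(2); // multiple replicas: rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```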

@@ -712,4 +712,8 @@ public boolean limitHugePartition(
}
return false;
}

public void setUsedMemory(long usedMemory) {
Contributor

This function should only be used for testing purposes?

Member Author

Yes.

@dingshun3016
Contributor

In the same task, when the failed blocks are resent successfully and the remaining blocks still need to be sent, partitionToServers needs to be replaced with the latest one in org.apache.spark.shuffle.writer.WriteBufferManager#createShuffleBlock

@zuston
Member Author

zuston commented Apr 12, 2024

In the same task, when the failed blocks are resent successfully and the remaining blocks still need to be sent, partitionToServers needs to be replaced with the latest one in org.apache.spark.shuffle.writer.WriteBufferManager#createShuffleBlock

Yes, this is planned, but I will not reuse the concept of partitionToServers. Please wait for the following PRs.

jerqi
jerqi previously approved these changes Apr 14, 2024
Contributor

@jerqi jerqi left a comment


LGTM, thanks all.

@zuston zuston requested a review from jerqi April 15, 2024 02:00
@zuston
Member Author

zuston commented Apr 15, 2024

CI failure has been fixed. PTAL @jerqi

@zuston
Member Author

zuston commented Apr 16, 2024

Ping @jerqi

Set<ShuffleServerInfo> tempSet = new HashSet<>();
tempSet.addAll(replacements);
tempSet.removeAll(servers);
servers.addAll(tempSet);
Contributor

This will keep the faulty servers in servers, is this expected?

Member Author

Yes.

The name "faulty servers" here is not accurate (but I won't change it in this PR, maybe later), because NO_BUFFER can also be thrown due to tight memory. For such cases, we'd better reassign the partition to another server for writing.

So these servers are kept in order to fetch the partial data they hold when reading.

Contributor

Why don't we read from all servers until all blocks are read? Then we would not need to add replacement server on failure here?

Member Author

Oh, I should point out that the reassignment happens on the writer side. Sorry, I still don't follow your thought.

@zuston zuston merged commit 60fce8e into apache:master Apr 17, 2024
41 checks passed
@zuston
Member Author

zuston commented Apr 17, 2024

Thanks @jerqi @dingshun3016 @xumanbu @EnricoMi for your review. Merged.

Comment on lines +47 to +50
Map<Integer, Integer> succeedReplicas = succeedList.get(partitionId);
if (succeedReplicas == null) {
succeedReplicas = new HashMap<>();
}
Contributor

This can be simplified:

    Map<Integer, Integer> succeedReplicas = succeedList.getOrDefault(partitionId, new HashMap<>());


Map<Integer, List<ShuffleServerInfo>> replicaList = inventory.get(partitionId);
Contributor

What if partitionId does not exist?

zuston added a commit that referenced this pull request May 9, 2024

### What changes were proposed in this pull request?

1. Make the write client always use the latest available assignment for subsequent writes when block reassignment happens.
2. Support multiple retries for partition reassignment.
3. Limit the max number of reassigned servers for one partition.
4. Refactor the reassign RPC.
5. Rename faultyServer -> receivingFailureServer.

#### Reassign whole process
![image](https://github.com/apache/incubator-uniffle/assets/8609142/8afa5386-be39-4ccb-9c10-95ffb3154939)

#### Always using the latest assignment

To achieve always using the latest assignment, I introduce the `TaskAttemptAssignment` to get the latest assignment for the current task. The creation of AddBlockEvent also applies the latest assignment via `TaskAttemptAssignment`.

And it will be updated by the `reassignOnBlockSendFailure` RPC.
That means the original reassign RPC response is refactored and replaced by the whole latest `shuffleHandleInfo`.
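The idea described above can be sketched roughly as follows. This is an illustrative sketch only — the real `TaskAttemptAssignment` in Uniffle differs in names, types, and structure.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: a per-task-attempt cache of the latest
// partition-to-servers assignment, refreshed when a reassign response arrives.
public class TaskAttemptAssignmentSketch {
    private volatile Map<Integer, List<String>> partitionToServers = new HashMap<>();

    // Called with the full latest assignment carried by the
    // reassignOnBlockSendFailure response.
    public void update(Map<Integer, List<String>> latestAssignment) {
        this.partitionToServers = new HashMap<>(latestAssignment);
    }

    // Every new AddBlockEvent looks up its target servers here, so blocks
    // created after a reassignment automatically target the replacement servers.
    public List<String> retrieve(int partitionId) {
        return partitionToServers.get(partitionId);
    }
}
```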

### Why are the changes needed?

This PR is a subtask of #1608.

Leveraging #1615 / #1610 / #1609, we have implemented the server reassignment mechanism for when the write client encounters a failed or unhealthy server. But this is not good enough, because it does not share the faulty server state with unstarted tasks and later `AddBlockEvent`s.

### Does this PR introduce _any_ user-facing change?

Yes. 

### How was this patch tested?

Unit and integration tests.

Integration tests as follows:
1. `PartitionBlockDataReassignBasicTest` validates the basic reassignment mechanism.
2. `PartitionBlockDataReassignMultiTimesTest` tests partition reassignment with multiple retries.

---------

Co-authored-by: Enrico Minack <github@enrico.minack.dev>