
[Improvement] Avoid selecting storage which has reached the high watermark #424

Merged: 20 commits into apache:master on Dec 21, 2022

Conversation

@zuston (Member) commented Dec 15, 2022

What changes were proposed in this pull request?

  1. Replace selecting a storage on every access with a selection cache, to avoid selection being non-idempotent in some cases
  2. Avoid selecting a storage which has reached the high watermark, building on the above optimization
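A minimal sketch of the cached selection from point 1 combined with the high-watermark filter from point 2. All names here (`StorageSelector`, `reachedHighWatermark`, the flat string key) are hypothetical illustrations, not Uniffle's actual LocalStorageManager API:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: cache the storage chosen for each partition so repeated selections
// are idempotent, and skip storages that have reached the high watermark.
class StorageSelector {
    static class Storage {
        final String basePath;
        final long capacity;
        volatile long used;
        Storage(String basePath, long capacity) { this.basePath = basePath; this.capacity = capacity; }
        boolean reachedHighWatermark(double ratio) { return used >= capacity * ratio; }
    }

    private final List<Storage> storages;
    private final double highWatermarkRatio;
    // Flat key "appId/shuffleId/partitionId" -> chosen storage; going through
    // the cache returns the same storage for the same partition every time.
    private final Map<String, Storage> selectionCache = new ConcurrentHashMap<>();

    StorageSelector(List<Storage> storages, double highWatermarkRatio) {
        this.storages = storages;
        this.highWatermarkRatio = highWatermarkRatio;
    }

    Storage select(String appId, int shuffleId, int partitionId) {
        String key = appId + "/" + shuffleId + "/" + partitionId;
        return selectionCache.computeIfAbsent(key, k ->
            storages.stream()
                .filter(s -> !s.reachedHighWatermark(highWatermarkRatio))
                // pick the least-loaded remaining candidate
                .min((a, b) -> Long.compare(a.used, b.used))
                .orElse(null));
    }
}
```

With this shape, a storage past the watermark is never handed out for new partitions, and an already-assigned partition keeps its storage instead of triggering a fallback to HDFS.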

Why are the changes needed?

In the current codebase, LocalStorageManager may select a local storage that has already reached the high watermark.

This strategy is unreasonable, and it makes many apps fall back to HDFS because they are assigned a storage at the high watermark.

Does this PR introduce any user-facing change?

No

How was this patch tested?

  1. UTs

@zuston zuston changed the title [Improvement] Avoid selecting storage when it reaches the high watermark [Improvement] Avoid selecting storage which has reached the high watermark Dec 15, 2022
try {
  LocalStorage storage = partitionsOfStorage.get(appId).get(shuffleId).get(partitionId);
  if (storage.isCorrupted()) {
    throw new RuntimeException("LocalStorage: " + storage.getBasePath() + " is corrupted.");
Member:
We removed this exception in #281; why put it back?

Member Author:
I don't see the point of removing this exception. Once a storage is corrupted, there is no need to move on to the next storage, because Uniffle doesn't support dynamic storage switching, especially for a corrupted storage.

Member:
In the current codebase, if one storage is corrupted, data will be written to another storage. In this case we lose some data of this replica, but the client can still read the rest. If the exception is thrown here, all data will be dropped. With multiple replicas, the old behavior is useful.

Member Author:
Makes sense for multiple replicas.

But for a single replica, it's not necessary. Moreover, with a single replica and the fallback strategy enabled, switching to another storage will cause data loss. cc @jerqi

I prefer fast fail.

Member Author:
Gentle ping @jerqi, WDYT? Besides, after this PR I plan to introduce dynamic switching of the local disk for a single partition, which will solve your multiple-replica problem.

@zuston (Member Author), Dec 16, 2022:
@advancedxy Due to this change, diskErrorTest fails, so I have removed it temporarily.

Contributor:
For this case, I'd actually prefer to keep the old logic in this PR. If we want to optimize with dynamic switching, we can introduce this change in that PR.

However, I think we should reconsider the corrupted case. Once one replica of a local storage is corrupted, the whole partition can no longer be trusted; ShuffleReadClient should switch to another replica as soon as possible. Maybe there's some logic I didn't follow?

Member Author:
Sorry, I missed this thread.

> Once one replica of a local storage is corrupted, the whole partition can no longer be trusted; ShuffleReadClient should switch to another replica as soon as possible. Maybe there's some logic I didn't follow?

Let me explain in more detail. First, to be clear, disk corruption affects the writing process in the current case; it will of course also affect later reads when there are multiple replicas.

So with a single replica and the MEMORY_LOCALFILE or LOCALFILE type, once a disk is corrupted the data is lost, and we should make the relevant job fail fast. In the current codebase this failure is ignored, and the job retries and then fails. Of course, this does not cause big problems.

But with MEMORY_LOCALFILE_HDFS and a single replica, the remaining events are flushed to the fallback storage, e.g. from local disk to HDFS. This is unnecessary, since partial data is already lost.

This behavior can be useful for multiple replicas, but I think we can support that mechanism after dynamic selection, which will be better.

> ShuffleReadClient should switch to another replica as soon as possible.

Yes, I think this is reasonable.

@codecov-commenter commented Dec 16, 2022

Codecov Report

Merging #424 (6673783) into master (877b4ed) will increase coverage by 0.74%.
The diff coverage is 62.36%.

@@             Coverage Diff              @@
##             master     #424      +/-   ##
============================================
+ Coverage     58.43%   59.18%   +0.74%     
+ Complexity     1613     1493     -120     
============================================
  Files           195      183      -12     
  Lines         11063     9805    -1258     
  Branches        976      853     -123     
============================================
- Hits           6465     5803     -662     
+ Misses         4220     3651     -569     
+ Partials        378      351      -27     
Impacted Files Coverage Δ
...pache/uniffle/server/ShuffleServerGrpcService.java 0.80% <0.00%> (-0.01%) ⬇️
...he/uniffle/server/buffer/ShuffleBufferManager.java 82.74% <ø> (ø)
...rg/apache/uniffle/storage/common/LocalStorage.java 45.89% <ø> (+2.66%) ⬆️
...pache/uniffle/storage/common/LocalStorageMeta.java 74.28% <0.00%> (-1.45%) ⬇️
.../org/apache/uniffle/server/ShuffleTaskManager.java 74.66% <31.57%> (-2.57%) ⬇️
...rg/apache/uniffle/server/ShuffleDataReadEvent.java 90.00% <75.00%> (-10.00%) ⬇️
...he/uniffle/server/storage/LocalStorageManager.java 88.35% <85.71%> (-2.89%) ⬇️
.../main/java/org/apache/uniffle/common/UnionKey.java 87.50% <87.50%> (ø)
...org/apache/uniffle/storage/common/HdfsStorage.java 0.00% <0.00%> (ø)
...pache/hadoop/mapreduce/task/reduce/RssShuffle.java
... and 19 more


private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();

private final Map<PartitionUnionKey, LocalStorage> partitionsOfStorage;
Contributor:
Maybe it will occupy too much memory.

Member Author:
From our dashboard, there are only a few thousand partitions running at the same time.

Member Author:
And if we use a nested concurrent hash map, I'm worried about thread safety.
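The thread-safety trade-off being discussed can be illustrated with a small sketch. With nested maps, every level needs its own atomic `computeIfAbsent`, while a single flat union key needs only one atomic operation per lookup; all names below are illustrative, not the actual Uniffle code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: flat union key vs nested concurrent maps for per-partition state.
// The nested form must create and populate inner maps in multiple steps;
// the flat form keyed by "appId/shuffleId/partitionId" is one atomic call.
class FlatVsNested {
    // Nested: appId -> shuffleId -> partitionId -> storage path
    static final Map<String, Map<Integer, Map<Integer, String>>> nested = new ConcurrentHashMap<>();
    // Flat: a single map with a union key
    static final Map<String, String> flat = new ConcurrentHashMap<>();

    static void putNested(String app, int shuffle, int partition, String path) {
        nested.computeIfAbsent(app, a -> new ConcurrentHashMap<>())
              .computeIfAbsent(shuffle, s -> new ConcurrentHashMap<>())
              .put(partition, path);
    }

    static void putFlat(String app, int shuffle, int partition, String path) {
        flat.putIfAbsent(app + "/" + shuffle + "/" + partition, path);
    }
}
```

The flat form also makes whole-partition removal and iteration simpler, at the cost of one map entry per partition, which is the memory concern raised above.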

Contributor:
> From our dashboard, there are only a few thousand partitions running at the same time

For large Spark apps, it's common to have ~10K shuffle partitions, and that's just one app.

However, maybe we haven't reached that kind of scale yet.

Member Author:
So far, yes.

@zuston zuston requested a review from jerqi December 16, 2022 06:53
@zuston (Member Author) commented Dec 16, 2022

Could you help review again? @advancedxy @jerqi

And I'm working on multiple-disk selection for one partition based on this. I think I can push this forward quickly.

@zuston (Member Author) commented Dec 18, 2022

Fixed a potential race condition. @advancedxy

(key, localStorage) -> {
  // If this is the first time to select storage or existing storage is corrupted,
  // we should refresh the cache.
  if (localStorage == null || localStorage.isCorrupted()) {
Contributor:
On second thought, in which case would localStorage be corrupted here? In L152-L155 we would already have thrown an exception.

@zuston (Member Author), Dec 21, 2022:
For an event, if the storage is selected but the event hasn't written any data to it yet (maybe the event is still in the pending queue), we can replace it with a new storage without causing data loss.

Contributor:
> For an event, if the storage is selected but the event hasn't written any data to it yet (maybe the event is still in the pending queue), we can replace it with a new storage without causing data loss.

This only makes sense when L154-L156 doesn't throw an exception, right?

@zuston (Member Author), Dec 21, 2022:
Emm... I have removed the throw-exception logic, so this part is now consistent with the original logic.

Contributor:
Yeah, I know. What bothered me is:

  • Previously, L154-L156 threw an exception, and I couldn't imagine a case where localStorage.isCorrupted() holds true
  • In the latest code, this makes sense

Did I miss something?

@zuston (Member Author), Dec 21, 2022:
> Previously, L154-L156 threw an exception, and I couldn't imagine a case where localStorage.isCorrupted() holds true

In the previous commit, when localStorage.isCorrupted() == true and storage.containsWriteHandler(appId, shuffleId, partitionId) == false, the code would enter the part you mentioned.

Did I catch your thought?

Contributor:
Ah, I see. Thanks for clarifying.
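The refresh-on-corruption pattern settled in this thread can be sketched with `Map.compute`: the selection cache atomically replaces an entry when it is missing or when the cached storage has become corrupted. This is a simplified illustration; `LocalStorage` here is a minimal stand-in interface, not Uniffle's actual class:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Sketch: atomically refresh a cached storage selection when the cached
// entry is absent or has become corrupted, as discussed above.
class SelectionCacheRefresh {
    interface LocalStorage { boolean isCorrupted(); }

    private final Map<String, LocalStorage> cache = new ConcurrentHashMap<>();

    LocalStorage get(String key, Supplier<LocalStorage> selectNewStorage) {
        return cache.compute(key, (k, cached) -> {
            // First selection, or the cached storage went bad: refresh.
            if (cached == null || cached.isCorrupted()) {
                return selectNewStorage.get();
            }
            return cached;
        });
    }
}
```

Because `compute` runs atomically per key on ConcurrentHashMap, two writers racing on the same partition cannot end up with different storages, which matches the race-condition fix mentioned earlier in the thread.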

@advancedxy (Contributor):
Left a minor comment; otherwise I'm OK with this PR.

@jerqi please take another look if you have time.

int[] range = ShuffleStorageUtils.getPartitionRange(partitionId, partitionNumPerRange, partitionNum);
Storage storage = storageManager.selectStorage(new ShuffleDataReadEvent(appId, shuffleId, partitionId, range[0]));
if (storage == null) {
  throw new FileNotFoundException("No such data in current storage manager.");
Contributor:
Why do we throw a FileNotFoundException?

Member Author:
FileNotFoundException can be handled by the getLocalShuffleIndex gRPC layer, because there may be no local storage when flushing directly to HDFS.

@zuston zuston requested a review from jerqi December 21, 2022 07:18
@zuston (Member Author) commented Dec 21, 2022

Updated @jerqi

@jerqi (Contributor) left a comment:
LGTM, thanks @zuston @advancedxy. Let @advancedxy merge this PR.

@advancedxy advancedxy merged commit 5321292 into apache:master Dec 21, 2022
@advancedxy (Contributor):
Merged, thanks @zuston

5 participants