
[#2111] improvement(client): cleanup the useless shuffle server clients when unregister shuffle #2112

Closed
xianjingfeng wants to merge 4 commits into apache:master from xianjingfeng:issue_2111

Conversation

@xianjingfeng (Member) commented Sep 11, 2024

What changes were proposed in this pull request?

Clean up the useless shuffle server clients when unregistering a shuffle.

Why are the changes needed?

Fix: #2111

Does this PR introduce any user-facing change?

No.

How was this patch tested?

UT

github-actions bot commented Sep 11, 2024

Test Results

 3 034 files  +  152   3 034 suites  +152   6h 43m 35s ⏱️ + 46m 53s
 1 179 tests +  160   1 178 ✅ +  160   1 💤 ±0  0 ❌ ±0 
14 937 runs  +2 069  14 922 ✅ +2 069  15 💤 ±0  0 ❌ ±0 

Results for commit 8729ecd. ± Comparison against base commit d170004.

This pull request removes 30 and adds 190 tests. Note that renamed tests count towards both.
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile1{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile1{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile2{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile2{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile3{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile3{String, File}[2]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile4{String, File}[1]
org.apache.uniffle.common.records.RecordsReaderWriterTest ‑ testWriteAndReadRecordFile4{String, File}[2]
org.apache.uniffle.common.serializer.PartialInputStreamTest ‑ testReadFileInputStream
org.apache.uniffle.common.serializer.PartialInputStreamTest ‑ testReadMemroyInputStream
…
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testWriteNormalWithRemoteMerge
org.apache.hadoop.mapred.SortWriteBufferManagerTest ‑ testWriteNormalWithRemoteMergeAndCombine
org.apache.hadoop.mapreduce.task.reduce.RMRssShuffleTest ‑ testReadShuffleWithCombine
org.apache.hadoop.mapreduce.task.reduce.RMRssShuffleTest ‑ testReadShuffleWithoutCombine
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testDefaultIncludeExcludeProperties
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testExcludeProperties
org.apache.spark.shuffle.DelegationRssShuffleManagerTest ‑ testIncludeProperties
org.apache.spark.shuffle.handle.MutableShuffleHandleInfoTest ‑ testUpdateAssignmentOnPartitionSplit
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RMRssShuffleTest ‑ testReadMultiPartitionShuffleData
org.apache.tez.runtime.library.common.shuffle.orderedgrouped.RMRssShuffleTest ‑ testReadShuffleData
…

♻️ This comment has been updated with latest results.

@xianjingfeng xianjingfeng requested a review from roryqi September 13, 2024 10:16
@roryqi (Contributor) commented Sep 13, 2024

What will happen if we don't have this patch?

@xianjingfeng (Member, Author) commented Sep 14, 2024

What will happen if we don't have this patch?

If there are many stages in an application, the client will keep clients for all the shuffle servers, which wastes memory.

@roryqi (Contributor) commented Sep 14, 2024

What will happen if we don't have this patch?

If there are many stages in an application, the client will keep clients for all the shuffle servers, which wastes memory.

One server may be used by multiple stages. We can't remove it if only one stage is finished.

@xianjingfeng (Member, Author)

What will happen if we don't have this patch?

If there are many stages in an application, the client will keep clients for all the shuffle servers, which wastes memory.

One server may be used by multiple stages. We can't remove it if only one stage is finished.

Right. This PR does not delete all the shuffle server clients, only the ones that are no longer needed.
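The exchange above can be sketched as follows. This is a hypothetical illustration of the cleanup idea, not the PR's actual code: all class, field, and method names are assumptions, and a `String` stands in for a real client handle. On unregister, only clients whose servers are no longer referenced by any remaining registered shuffle are dropped.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: track which servers each live shuffle still uses,
// and on unregister close only the clients whose servers no longer appear
// under any remaining shuffle.
class ShuffleClientTracker {
  // shuffleId -> servers assigned to that shuffle
  private final Map<Integer, Set<String>> shuffleToServers = new HashMap<>();
  // server -> open client handle (String stands in for a real client)
  final Map<String, String> serverToClients = new HashMap<>();

  synchronized void registerShuffle(int shuffleId, Set<String> servers) {
    shuffleToServers.put(shuffleId, new HashSet<>(servers));
    for (String s : servers) {
      serverToClients.putIfAbsent(s, "client-" + s);
    }
  }

  synchronized Set<String> unregisterShuffle(int shuffleId) {
    Set<String> removed = shuffleToServers.remove(shuffleId);
    if (removed == null) {
      return new HashSet<>();
    }
    // Servers still referenced by any other registered shuffle must be kept.
    Set<String> stillUsed = new HashSet<>();
    shuffleToServers.values().forEach(stillUsed::addAll);
    Set<String> closable = new HashSet<>(removed);
    closable.removeAll(stillUsed);
    serverToClients.keySet().removeAll(closable); // real code would also close() each
    return closable;
  }
}
```

This captures roryqi's concern: a server shared by several stages survives until the last shuffle referencing it is unregistered.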

@xianjingfeng xianjingfeng changed the title [#2111] improvement(client): cleanup shuffle server client when unregister shuffle [#2111] improvement(client): cleanup useless shuffle server client when unregister shuffle Sep 14, 2024
@xianjingfeng xianjingfeng changed the title [#2111] improvement(client): cleanup useless shuffle server client when unregister shuffle [#2111] improvement(client): cleanup useless the shuffle server clients when unregister shuffle Sep 14, 2024
@xianjingfeng xianjingfeng changed the title [#2111] improvement(client): cleanup useless the shuffle server clients when unregister shuffle [#2111] improvement(client): cleanup the useless shuffle server clients when unregister shuffle Sep 14, 2024
@zuston (Member) left a comment


Emm. I think we should have a more graceful way to close the shuffle-server clients.

In my view, the closing behavior should be optional, which could be implemented via pluggable close policies.
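zuston's "pluggable close policy" suggestion might look roughly like this. The interface and every name in it are assumptions for illustration, not Uniffle's actual API: the factory would ask a policy which cached clients may be closed, and different policies could be plugged in.

```java
import java.util.Set;

// Hypothetical sketch of a pluggable close policy (names are assumed):
// given the servers still in use, a policy decides which cached clients to close.
interface ClientClosePolicy {
  Set<String> selectClosable(Set<String> cachedServers, Set<String> serversInUse);
}

// One possible policy: close every cached client whose server is unused.
class CloseUnusedPolicy implements ClientClosePolicy {
  @Override
  public Set<String> selectClosable(Set<String> cachedServers, Set<String> serversInUse) {
    Set<String> closable = new java.util.HashSet<>(cachedServers);
    closable.removeAll(serversInUse);
    return closable;
  }
}

// Another possible policy: never close anything (the pre-PR behavior).
class KeepAllPolicy implements ClientClosePolicy {
  @Override
  public Set<String> selectClosable(Set<String> cachedServers, Set<String> serversInUse) {
    return java.util.Collections.emptySet();
  }
}
```

Making the policy optional then reduces to choosing which implementation the factory is configured with.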

return serverToClients.get(shuffleServerInfo);
}

public synchronized void cleanUselessShuffleServerClients(
Member

Is it possible that another thread holds the client being removed?

@xianjingfeng (Member, Author) Sep 19, 2024

I think that is impossible: if another thread holds the client being removed, that client's shuffleServerInfo will still be present in shuffleServerInfoMap, so it will not be removed.

@xianjingfeng (Member, Author)

I hope the closing behavior should be optional,

+1

which could be implemented via pluggable close policies.

What are the possible policies?

@roryqi (Contributor) commented Dec 16, 2024

@zuston gently ping.

1 similar comment
@xianjingfeng (Member, Author)

@zuston gently ping.

return serverToClients.get(shuffleServerInfo);
}

public synchronized void cleanUselessShuffleServerClients(
Member

This method's implementation is not straightforward. I'd prefer an implementation like the following:

public synchronized void closeClients(List<ShuffleServerInfo> candidates) {
    // ...
}

@zuston (Member) commented Apr 9, 2025

Sorry for the late review; I think I missed this thread. @xianjingfeng

return serverToClients.get(shuffleServerInfo);
}

public synchronized void closeClients(Set<ShuffleServerInfo> candidates) {
Member

I think you misunderstood my intent. The candidates should be the clients to delete, not the clients to exclude.

public synchronized void closeClients(Set<ShuffleServerInfo> candidates) {
  // Drop every cached client whose server is a candidate
  // (real code would also close each removed client).
  serverToClients.keySet().removeAll(candidates);
}

Member Author

We would need to synchronize externally if we compute the candidates outside of this method, as follows:

ShuffleServerClientFactory shuffleServerClientFactory = ShuffleServerClientFactory.getInstance();
synchronized (shuffleServerClientFactory) {
  Set<ShuffleServerInfo> candidates = shuffleServerClientFactory.findClosableClients(getAllShuffleServers());
  shuffleServerClientFactory.closeClients(candidates);
}

It feels worse than the original implementation.

Member

In my view, since all clients are managed by the shuffleServerClientFactory, we could maintain a structure in that class to record each client's reference count. Once a client's reference count drops to 0, we could release it.

If so, the +1 operation would happen in getShuffleServerClient, and the -1 would happen on removal in shuffleWriteClientImpl.

How about this way?
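The reference-counting proposal above can be sketched as follows. This is a minimal illustration with assumed names (a `String` stands in for a real client): `acquire` increments a server's count on every get, and `release` decrements it, dropping the client once nobody references it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of reference-counted client management (names assumed).
class RefCountedClients {
  private final Map<String, String> serverToClients = new HashMap<>();
  private final Map<String, Integer> refCounts = new HashMap<>();

  synchronized String acquire(String server) {
    refCounts.merge(server, 1, Integer::sum);  // +1 on every get
    return serverToClients.computeIfAbsent(server, s -> "client-" + s);
  }

  synchronized void release(String server) {
    Integer count = refCounts.get(server);
    if (count == null) {
      return;
    }
    if (count <= 1) {                          // last reference gone
      refCounts.remove(server);
      serverToClients.remove(server);          // real code would also close()
    } else {
      refCounts.put(server, count - 1);        // -1 on removal
    }
  }

  synchronized boolean isCached(String server) {
    return serverToClients.containsKey(server);
  }
}
```

As the reply below notes, the hard part is ensuring every code path pairs an acquire with a release around each RPC call.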

Member Author

If so, the +1 operation would happen in getShuffleServerClient, and the -1 would happen on removal in shuffleWriteClientImpl.

I don't think this is achievable. To do it, we would have to invoke getShuffleServerClient before every call to the RPC interface and decrement the count after each call.

This reverts commit f6485f7.
@summaryzb (Contributor) left a comment

How about getting the client through a Guava cache with an expiry setting? That might be more general, since we wouldn't need to care whether the corresponding server is useless or available.

@xianjingfeng (Member, Author)

How about getting the client through a Guava cache with an expiry setting? That might be more general, since we wouldn't need to care whether the corresponding server is useless or available.

This solution will have problems if a caller retrieves the client from the cache only once and keeps using the held reference later.
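The objection can be illustrated with a toy expiring cache. This is a hypothetical sketch in the spirit of Guava's `expireAfterAccess` (built here on a plain map with an explicit clock, so it is self-contained; none of these names are Uniffle's): a caller who fetches a client once and keeps the reference can see the cache evict, and in a real implementation close, the entry while it is still in use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch mimicking expire-after-access semantics: entries not
// accessed within ttlMillis are evicted on the next cache operation. The
// caller passes the current time explicitly to keep the example deterministic.
class ExpiringClientCache<K, V> {
  private static class Entry<V> {
    V value;
    long lastAccess;
    Entry(V value, long now) { this.value = value; this.lastAccess = now; }
  }

  private final Map<K, Entry<V>> entries = new HashMap<>();
  private final long ttlMillis;

  ExpiringClientCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

  synchronized V get(K key, java.util.function.Function<K, V> loader, long now) {
    // Evict anything idle longer than the TTL, then load or refresh the entry.
    entries.values().removeIf(e -> now - e.lastAccess > ttlMillis);
    Entry<V> e = entries.computeIfAbsent(key, k -> new Entry<>(loader.apply(k), now));
    e.lastAccess = now;
    return e.value;
  }

  synchronized boolean contains(K key, long now) {
    entries.values().removeIf(e -> now - e.lastAccess > ttlMillis);
    return entries.containsKey(key);
  }
}
```

A caller who fetched the client at time 0 and never touched the cache again would find the entry gone after the TTL, even though the held reference is still in use, which is the problem xianjingfeng points out.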

@xianjingfeng (Member, Author)

I think I should close this PR, because I find that ShuffleServerClientFactory#getShuffleServerClient needs broader optimization than just this change. #2441



Development

Successfully merging this pull request may close these issues.

[Improvement] cleanup shuffle server client when unregister shuffle

4 participants