-
Notifications
You must be signed in to change notification settings - Fork 141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#757] feat(server): separate flush thread pools for different storage types #775
Conversation
Codecov Report
@@ Coverage Diff @@
## master #775 +/- ##
============================================
+ Coverage 53.57% 54.78% +1.21%
- Complexity 2511 2518 +7
============================================
Files 378 359 -19
Lines 20528 18170 -2358
Branches 1773 1776 +3
============================================
- Hits 10997 9955 -1042
+ Misses 8835 7585 -1250
+ Partials 696 630 -66
... and 23 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@@ -118,14 +123,24 @@ private void startEventProcessor() { | |||
processEventThread.start(); | |||
} | |||
|
|||
protected Executor createFlushEventExecutor() { | |||
protected void initFlushEventExecutor() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't expose the concept of storageType in the FlushManager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
@@ -145,7 +160,14 @@ protected void eventLoop() { | |||
protected void processNextEvent() { | |||
try { | |||
ShuffleDataFlushEvent event = flushQueue.take(); | |||
threadPoolExecutor.execute(() -> processEvent(event)); | |||
Storage storage = storageManager.selectStorage(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks really strange for selecting storage here, which will select again before doing flush.
And the selected storage will be changed if writing failed. I think we'd better refactor flushing part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks really strange for selecting storage here, which will select again before doing flush.
And the selected storage will be changed if writing failed. I think we'd better refactor flushing part.
You're right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that the storageType should not be switched in the flushToFile method. When the writing fails or the storage is corrupted, the event should be put into pendingEvents, and flushToFile should be performed again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1.
It seems to be caused by concurrency issues, which cannot be reproduced in the development environment. |
Maybe you can read the logs of ut. Do you know how to find the logs of ut? |
@jerqi @smallzhongfeng PTAL. |
Yes, i know, already fixed. |
server/src/main/java/org/apache/uniffle/server/StorageTypeFlushEventHandler.java
Outdated
Show resolved
Hide resolved
@@ -587,6 +587,7 @@ public void shuffleFlushThreshold() throws Exception { | |||
shuffleBufferManager.cacheShuffleData(appId, smallShuffleId, false, createData(0, 31)); | |||
assertEquals(96 + 63, shuffleBufferManager.getUsedMemory()); | |||
shuffleFlushManager.flush(); | |||
Thread.sleep(100); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to sleep here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java#createFlushEventExecutor method make here sync, so we do not sleep here before.
shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.toString()); | ||
shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath())); | ||
shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L); | ||
shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_HDFS_THREAD_POOL_SIZE, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can change it to default value now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
import org.apache.uniffle.storage.common.Storage; | ||
import org.apache.uniffle.storage.util.StorageType; | ||
|
||
public class StorageTypeFlushEventHandler implements FlushEventHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename to DefaultFlushEventHandler
?
LOG.error("Storage selected is null and this should not happen. event: {}", event); | ||
break; | ||
} | ||
Storage storage = storageManager.selectStorage(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still not satisfied with this selectStorage
again.
event.increaseRetryTimes(); | ||
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); | ||
event.markPended(); | ||
addPendingEvents(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about re-pushing to flushEventHandler
queue? I think the pending
queue could be removed, which is strange after this PR refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense.
public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions | ||
.key("rss.server.flush.threadPool.size") | ||
public static final ConfigOption<Integer> SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE = ConfigOptions | ||
.key("rss.server.flush.local-file.threadPool.size") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rss.server.flush.local-file.threadPool.size
-> rss.server.flush.localfile.threadPool.size
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
.key("rss.server.flush.hdfs.threadPool.size") | ||
.intType() | ||
.defaultValue(10) | ||
.withDescription("thread pool for flush data to hdfs"); | ||
|
||
public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE = ConfigOptions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not removing this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is useful to flush the waiting queue of the thread pool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's eliminate this pending queue and unify it with the event handler.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand.
private final ShuffleServerConf shuffleServerConf; | ||
private final ShuffleFlushManager shuffleFlushManager; | ||
private final StorageManager storageManager; | ||
private Executor localFileThreadPoolExecutor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we design a unified mechanism to avoid hard code ? That means we don't need to change this part when introducing a new storage type like object store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we design a unified mechanism to avoid hard code ? That means we don't need to change this part when introducing a new storage type like object store.
Do you have a better suggestion? We need to determine which thread pool to allocate according to the type of Storage. MultiStorageManager
also uses hard coding.
private final StorageManager warmStorageManager;
private final StorageManager coldStorageManager;
warmStorageManager = new LocalStorageManager(conf);
coldStorageManager = new HdfsStorageManager(conf);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not yet. OK. let's reserve current implementation.
|
||
protected Executor createFlushEventExecutor(int poolSize, String threadFactoryName) { | ||
int waitQueueSize = shuffleServerConf.getInteger( | ||
ShuffleServerConf.SERVER_FLUSH_THREAD_POOL_QUEUE_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See here @zuston
@zuston @smallzhongfeng PTAL. |
@jerqi PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -66,6 +66,7 @@ public void closeClient() { | |||
|
|||
@Test | |||
public void fallbackTest() throws Exception { | |||
REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to local variable maybe better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to local variable maybe better.
Other method use also.
We should add this change to uniffle-migration-guide.md. |
Ok. |
Done. |
LOG.error("Storage selected is null and this should not happen. event: {}", event); | ||
break; | ||
} | ||
Storage storage = event.getUnderStorage(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When to set this storage
of event ?
// todo: Could we add an interface supportPending for storageManager | ||
// to unify following logic of multiple different storage managers | ||
if (event.getRetryTimes() <= retryMax) { | ||
if (event.isPended()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about removing this pending queue? WDYT @jerqi
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok.
} else if (event.getRetryTimes() <= retryMax) { | ||
event.increaseRetryTimes(); | ||
ShuffleServerMetrics.incStorageRetryCounter(storage.getStorageHost()); | ||
event.getUnderEventHandler().handle(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is wrong. That means once the storage is selected for one event, it will always try this, which will not give chance to change storage for event
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this re-pushing by itself looks strange.
The design looks not clear, the dependencies are complex. I think it should refactor like following steps
Plz let me know If I'm wrong. cc @jerqi I remember you want to remove the pending queue. How about doing this in this PR? |
break; | ||
} else { | ||
if (!storage.canWrite()) { | ||
// todo: Could we add an interface supportPending for storageManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we remove the todo
?
@zuston Could you help this pr again? |
} | ||
|
||
protected void eventLoop() { | ||
while (true) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we have a variable to control whether stop the eventloop? Could we add the method stop
?
Yes, let me take a look in next few days |
@zuston PTAL. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm so sorry for this later review
LGTM. The refactor looks good to me . Please fix the conflict.
event.addCleanupCallback(() -> this.clearInFlushBuffer(event.getEventId())); | ||
event.addCleanupCallback(() -> { | ||
this.clearInFlushBuffer(event.getEventId()); | ||
spBlocks.forEach(spb -> spb.getData().release()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
spBlocks.forEach(spb -> spb.getData().release());
This is for netty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My mistake.
@@ -181,6 +181,7 @@ public Storage selectStorage(ShuffleDataFlushEvent event) { | |||
storage.getBasePath(), event); | |||
} | |||
} else { | |||
event.setUnderStorage(storage); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to check the storage is hdfs or localfile to put its corresponding thread pool ? Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
#881 already add.
if (event.getUnderStorage() == null) {
event.setUnderStorage(storage);
}
I will fix later. |
I renamed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. let @zuston take another look
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for your effort @leixm
What changes were proposed in this pull request?
Separate flush thread pools for different storage type.
Why are the changes needed?
Writing local files requires less concurrency, while writing hdfs requires more concurrency, it is best to separate thread pools.
Does this PR introduce any user-facing change?
Yes.
How was this patch tested?
existing UTs.