Skip to content

Commit

Permalink
[ISSUE-173][FOLLOWUP] The size of single buffer flush should reach rs…
Browse files Browse the repository at this point in the history
…s.server.flush.cold.storage.threshold.size (#180)

### What changes were proposed in this pull request?
follow #173, when single buffer flush is triggered, the buffer size should reach rss.server.flush.cold.storage.threshold.size, we keep the same logic as MultiStorageManager#selectStorageManager

### Why are the changes needed?
Make sure cold storage is valid

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Already added

Co-authored-by: leixianming <leixianming@didiglobal.com>
  • Loading branch information
leixm and leixianming committed Aug 23, 2022
1 parent e9fd373 commit e968c00
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ public ShuffleDataResult getShuffleData(

void flushSingleBufferIfNecessary(ShuffleBuffer buffer, String appId,
int shuffleId, int startPartition, int endPartition) {
if (this.bufferFlushEnabled && buffer.getSize() >= this.bufferFlushThreshold) {
// When we use multistorage and trigger single buffer flush, the buffer size should be bigger
// than rss.server.flush.cold.storage.threshold.size, otherwise cold storage will be useless.
if (this.bufferFlushEnabled && buffer.getSize() > this.bufferFlushThreshold) {
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ public void flushSingleBufferTest() throws Exception {

shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(0, 32));
assertEquals(64, shuffleBufferManager.getUsedMemory());
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(1, 32));
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, createData(1, 48));
waitForFlush(shuffleFlushManager, appId, shuffleId, 4);
assertEquals(0, shuffleBufferManager.getUsedMemory());
assertEquals(0, shuffleBufferManager.getInFlushSize());
Expand Down

0 comments on commit e968c00

Please sign in to comment.