Skip to content
Permalink
Browse files
HBASE-26210 HBase Write should be doomed to hang when cell size excee…
…ds InmemoryFlushSize for CompactingMemStore (#3604)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
comnetwork committed Sep 1, 2021
1 parent 3c64f86 commit 36884c3dff96140062277e65fbc7538b31b1a647
Showing 4 changed files with 304 additions and 65 deletions.
@@ -208,7 +208,7 @@ public MemStoreSnapshot snapshot() {
stopCompaction();
// region level lock ensures pushing active to pipeline is done in isolation
// no concurrent update operations trying to flush the active segment
pushActiveToPipeline(getActive());
pushActiveToPipeline(getActive(), true);
resetTimeOfOldestEdit();
snapshotId = EnvironmentEdgeManager.currentTime();
// in both cases whatever is pushed to snapshot is cleared from the pipeline
@@ -413,34 +413,62 @@ protected List<KeyValueScanner> createList(int capacity) {
}

/**
* Check whether anything need to be done based on the current active set size.
* The method is invoked upon every addition to the active set.
* For CompactingMemStore, flush the active set to the read-only memory if it's
* size is above threshold
* Check whether anything need to be done based on the current active set size. The method is
* invoked upon every addition to the active set. For CompactingMemStore, flush the active set to
* the read-only memory if it's size is above threshold
* @param currActive intended segment to update
* @param cellToAdd cell to be added to the segment
* @param memstoreSizing object to accumulate changed size
* @return true if the cell can be added to the
* @return true if the cell can be added to the currActive
*/
private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
protected boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
if (shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
if (currActive.setInMemoryFlushed()) {
flushInMemory(currActive);
if (setInMemoryCompactionFlag()) {
// The thread is dispatched to do in-memory compaction in the background
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace("Dispatching the MemStore in-memory flush for store " + store
.getColumnFamilyName());
}
getPool().execute(runnable);
long cellSize = MutableSegment.getCellLength(cellToAdd);
boolean successAdd = false;
while (true) {
long segmentDataSize = currActive.getDataSize();
if (!inWalReplay && segmentDataSize > inmemoryFlushSize) {
// when replaying edits from WAL there is no need in in-memory flush regardless the size
// otherwise size below flush threshold try to update atomically
break;
}
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
}
successAdd = true;
break;
}
return false;
}
return true;
}

if (!inWalReplay && currActive.getDataSize() > inmemoryFlushSize) {
// size above flush threshold so we flush in memory
this.tryFlushInMemoryAndCompactingAsync(currActive);
}
return successAdd;
}

/**
* Try to flush the currActive in memory and submit the background
* {@link InMemoryCompactionRunnable} to
* {@link RegionServicesForStores#getInMemoryCompactionPool()}. Just one thread can do the actual
* flushing in memory.
* @param currActive current Active Segment to be flush in memory.
*/
private void tryFlushInMemoryAndCompactingAsync(MutableSegment currActive) {
if (currActive.setInMemoryFlushed()) {
flushInMemory(currActive);
if (setInMemoryCompactionFlag()) {
// The thread is dispatched to do in-memory compaction in the background
InMemoryCompactionRunnable runnable = new InMemoryCompactionRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace(
"Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
}
getPool().execute(runnable);
}
}
}

// externally visible only for tests
// when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
@@ -497,26 +525,6 @@ private ThreadPoolExecutor getPool() {
return getRegionServices().getInMemoryCompactionPool();
}

protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
MemStoreSizing memstoreSizing) {
long cellSize = MutableSegment.getCellLength(cellToAdd);
long segmentDataSize = currActive.getDataSize();
while (segmentDataSize + cellSize < inmemoryFlushSize || inWalReplay) {
// when replaying edits from WAL there is no need in in-memory flush regardless the size
// otherwise size below flush threshold try to update atomically
if (currActive.compareAndSetDataSize(segmentDataSize, segmentDataSize + cellSize)) {
if (memstoreSizing != null) {
memstoreSizing.incMemStoreSize(cellSize, 0, 0, 0);
}
// enough space for cell - no need to flush
return false;
}
segmentDataSize = currActive.getDataSize();
}
// size above flush threshold
return true;
}

/**
* The request to cancel the compaction asynchronous task (caused by in-memory flush)
* The compaction may still happen if the request was sent too late
@@ -528,10 +536,6 @@ private void stopCompaction() {
}
}

protected void pushActiveToPipeline(MutableSegment currActive) {
pushActiveToPipeline(currActive, true);
}

/**
* NOTE: When {@link CompactingMemStore#flushInMemory(MutableSegment)} calls this method, due to
* concurrent writes and because we first add cell size to currActive.getDataSize and then
@@ -229,7 +229,7 @@ public boolean flattenOneSegment(long requesterVersion,
if ( s.canBeFlattened() ) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
if (s.isEmpty()) {
// after s.waitForUpdates() is called, there is no updates preceding,if no cells in s,
// after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
// we can skip it.
continue;
}
@@ -821,11 +821,11 @@ public void testFlatteningToJumboCellChunkMap() throws IOException {

// The in-memory flush size is bigger than the size of a single cell,
// but smaller than the size of two cells.
// Therefore, the two created cells are flattened together.
// Therefore, the two created cells are flushed together as a single CSLMImmutableSegment and
// flattened.
totalHeapSize = MutableSegment.DEEP_OVERHEAD
+ CellChunkImmutableSegment.DEEP_OVERHEAD_CCM
+ 1 * oneCellOnCSLMHeapSize
+ 1 * oneCellOnCCMHeapSize;
+ 2 * oneCellOnCCMHeapSize;
assertEquals(totalHeapSize, ((CompactingMemStore) memstore).heapSize());
}

0 comments on commit 36884c3

Please sign in to comment.