Skip to content
Permalink
Browse files
Fix force GC doesn't work under forceAllowCompaction when disk is full (
#3205)

## Motivation
When I set `forceAllowCompaction=true` and one ledger disk reaches max usage threshold and transfer bookie to readOnly mode, I expire some pulsar topics or delete some topics to free up disk space. I found that ledger compression cannot be triggered when using `curl -XPUT http://localhost:8000/api/v1/bookie/gc` command.

The root cause is that when one ledger disk reaches max usage threshold, it will suspend minor and major compaction
https://github.com/apache/bookkeeper/blob/f7579fd13d62ce630ea26638e73f5884da505ec8/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java#L1041-L1058

When we use `curl -XPUT http://localhost:8000/api/v1/bookie/gc` command to trigger compaction, it will be filtered by `suspendMajor` and `suspendMinor` flag.  

https://github.com/apache/bookkeeper/blob/f7579fd13d62ce630ea26638e73f5884da505ec8/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java#L416-L444

It will lead to 
- The bookie won't clean up deleted ledgers 
- Ledger disk can't free up disk usage
- Bookie can't recover from readOnly state into Writeable state.

And then we can only trigger compaction by the following steps.
- Increase max disk usage threshold
- Restart the bookie
- Use command `curl -XPUT http://localhost:8000/api/v1/bookie/gc` to trigger compaction

### Changes
1. Don't take the `suspendMajor` and `suspendMinor` flag into consideration when setting `forceAllowCompaction=true` and triggered by force GC.
  • Loading branch information
hangc0276 committed Apr 20, 2022
1 parent 16c9766 commit f0ff353d02ec56715d671040717870ec5f1a7630
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 1 deletion.
@@ -294,6 +294,15 @@ public void enableForceGC() {
}
}

public void enableForceGC(Boolean forceMajor, Boolean forceMinor) {
if (forceGarbageCollection.compareAndSet(false, true)) {
LOG.info("Forced garbage collection triggered by thread: {}, forceMajor: {}, forceMinor: {}",
Thread.currentThread().getName(), forceMajor, forceMinor);
triggerGC(true, forceMajor == null ? suspendMajorCompaction.get() : !forceMajor,
forceMinor == null ? suspendMinorCompaction.get() : !forceMinor);
}
}

public void disableForceGC() {
if (forceGarbageCollection.compareAndSet(true, false)) {
LOG.info("{} disabled force garbage collection since bookie has enough space now.", Thread
@@ -268,6 +268,11 @@ public void forceGC() {
gcThread.enableForceGC();
}

@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
gcThread.enableForceGC(forceMajor, forceMinor);
}

@Override
public boolean isInForceGC() {
return gcThread.isInForceGC();
@@ -230,6 +230,13 @@ default void forceGC() {
return;
}

/**
* Force trigger Garbage Collection with forceMajor or forceMinor parameter.
*/
default void forceGC(Boolean forceMajor, Boolean forceMinor) {
return;
}

/**
* Class for describing location of a generic inconsistency. Implementations should
* ensure that detail is populated with an exception which adequately describes the
@@ -370,6 +370,11 @@ public void forceGC() {
interleavedLedgerStorage.forceGC();
}

@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
interleavedLedgerStorage.forceGC(forceMajor, forceMinor);
}

@Override
public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
return interleavedLedgerStorage.localConsistencyCheck(rateLimiter);
@@ -403,6 +403,11 @@ public void forceGC() {
ledgerStorageList.stream().forEach(SingleDirectoryDbLedgerStorage::forceGC);
}

@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
ledgerStorageList.stream().forEach(s -> s.forceGC(forceMajor, forceMinor));
}

@Override
public boolean isInForceGC() {
return ledgerStorageList.stream().anyMatch(SingleDirectoryDbLedgerStorage::isInForceGC);
@@ -249,6 +249,11 @@ public void forceGC() {
gcThread.enableForceGC();
}

@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
gcThread.enableForceGC(forceMajor, forceMinor);
}

@Override
public boolean isInForceGC() {
return gcThread.isInForceGC();
@@ -20,6 +20,8 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.common.util.JsonUtil;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.http.HttpServer;
@@ -61,7 +63,16 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
HttpServiceResponse response = new HttpServiceResponse();

if (HttpServer.Method.PUT == request.getMethod()) {
bookieServer.getBookie().getLedgerStorage().forceGC();
String requestBody = request.getBody();
if (null == requestBody) {
bookieServer.getBookie().getLedgerStorage().forceGC();
} else {
@SuppressWarnings("unchecked")
Map<String, Object> configMap = JsonUtil.fromJson(requestBody, HashMap.class);
Boolean forceMajor = (Boolean) configMap.getOrDefault("forceMajor", null);
Boolean forceMinor = (Boolean) configMap.getOrDefault("forceMinor", null);
bookieServer.getBookie().getLedgerStorage().forceGC(forceMajor, forceMinor);
}

String output = "Triggered GC on BookieServer: " + bookieServer.toString();
String jsonResponse = JsonUtil.toJson(output);
@@ -332,6 +332,74 @@ public void checkpointComplete(Checkpoint checkPoint, boolean compact)
});
}

@Test
public void testForceGarbageCollectionWhenDiskIsFull() throws Exception {
testForceGarbageCollectionWhenDiskIsFull(true);
testForceGarbageCollectionWhenDiskIsFull(false);
}

public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAllowWhenDisableCompaction)
throws Exception {

restartBookies(conf -> {
if (isForceCompactionAllowWhenDisableCompaction) {
conf.setMinorCompactionInterval(0);
conf.setMajorCompactionInterval(0);
conf.setForceAllowCompaction(true);
conf.setMajorCompactionThreshold(0.5f);
conf.setMinorCompactionThreshold(0.2f);
} else {
conf.setMinorCompactionInterval(120000);
conf.setMajorCompactionInterval(240000);
}
return conf;
});

getGCThread().suspendMajorGC();
getGCThread().suspendMinorGC();
long majorCompactionCntBeforeGC = 0;
long minorCompactionCntBeforeGC = 0;
long majorCompactionCntAfterGC = 0;
long minorCompactionCntAfterGC = 0;

// disable forceMajor and forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, true, true).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC);
assertEquals(minorCompactionCntBeforeGC, minorCompactionCntAfterGC);

// enable forceMajor and forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, false, false).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC);
assertEquals(minorCompactionCntBeforeGC, minorCompactionCntAfterGC);

// enable forceMajor and disable forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, false, true).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC);
assertEquals(minorCompactionCntBeforeGC, minorCompactionCntAfterGC);

// disable forceMajor and enable forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, true, false).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC);
assertEquals(minorCompactionCntBeforeGC + 1, minorCompactionCntAfterGC);

}

@Test
public void testMinorCompaction() throws Exception {
// prepare data
@@ -270,6 +270,11 @@ public void forceGC() {
LedgerStorage.super.forceGC();
}

@Override
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
LedgerStorage.super.forceGC(forceMajor, forceMinor);
}

@Override
public List<DetectedInconsistency> localConsistencyCheck(Optional<RateLimiter> rateLimiter) throws IOException {
return LedgerStorage.super.localConsistencyCheck(rateLimiter);

0 comments on commit f0ff353

Please sign in to comment.