Skip to content
Permalink
Browse files
[BK-SERVER] Clean up over-replicated ledgers owned by different bookies
### Motivation
As described at: apache/pulsar#4632
- Sometimes due to overreplication, bookie contains ledgers which are not owned by that bookie anymore and that bookie is not part of the ensemble-list of those ledgers. In this case, GC finds out those overreplicated ledgers and 
- deletes their index from dbStorage (rocksDB) and 
- tries to delete them from entrylog files.

However, bookie doesn't delete them from entry-log files due to change made in [#870](#870) where bookie avoids deleting ledger if znode of that ledger exists.

Because of that bookie ends up storing large number entrylog files with ledgers which are owned by different bookies. It also cause OOM when GC tries to deal with large number of entry log files.

### Modification

Delete the ledgers if bookie is not part of ensemble list of over-replicated ledgers

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

This closes #2119 from rdhabalia/overRepl
  • Loading branch information
rdhabalia authored and eolivelli committed Jul 12, 2019
1 parent 762a613 commit 6b892a45f02a6d8d0b1d33235d4134d91e3ea111
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 4 deletions.
@@ -34,8 +34,10 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
@@ -45,6 +47,7 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -149,6 +152,8 @@ public void gc(GarbageCleaner garbageCleaner) {
long start;
long end = -1;
boolean done = false;
AtomicBoolean isBookieInEnsembles = new AtomicBoolean(false);
Versioned<LedgerMetadata> metadata = null;
while (!done) {
start = end + 1;
if (ledgerRangeIterator.hasNext()) {
@@ -169,9 +174,12 @@ public void gc(GarbageCleaner garbageCleaner) {
for (Long bkLid : subBkActiveLedgers) {
if (!ledgersInMetadata.contains(bkLid)) {
if (verifyMetadataOnGc) {
isBookieInEnsembles.set(false);
metadata = null;
int rc = BKException.Code.OK;
try {
result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs, TimeUnit.MILLISECONDS);
metadata = result(ledgerManager.readLedgerMetadata(bkLid), zkOpTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (BKException | TimeoutException e) {
if (e instanceof BKException) {
rc = ((BKException) e).getCode();
@@ -182,7 +190,19 @@ public void gc(GarbageCleaner garbageCleaner) {
continue;
}
}
if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
// check bookie should be part of ensembles in one
// of the segment else ledger should be deleted from
// local storage
if (metadata != null && metadata.getValue() != null) {
metadata.getValue().getAllEnsembles().forEach((entryId, ensembles) -> {
if (ensembles != null && ensembles.contains(selfBookieAddress)) {
isBookieInEnsembles.set(true);
}
});
if (isBookieInEnsembles.get()) {
continue;
}
} else if (rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException) {
LOG.warn("Ledger {} Missing in metadata list, but ledgerManager returned rc: {}.",
bkLid, rc);
continue;
@@ -52,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -93,12 +94,18 @@ public GcLedgersTest(Class<? extends LedgerManagerFactory> lmFactoryCls) {
super(lmFactoryCls);
}

private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
BookieSocketAddress selfBookie = Bookie.getBookieAddress(baseConf);
createLedgers(numLedgers, createdLedgers, selfBookie);
}

/**
* Create ledgers.
*/
private void createLedgers(int numLedgers, final Set<Long> createdLedgers) throws IOException {
private void createLedgers(int numLedgers, final Set<Long> createdLedgers, BookieSocketAddress selfBookie)
throws IOException {
final AtomicInteger expected = new AtomicInteger(numLedgers);
List<BookieSocketAddress> ensemble = Lists.newArrayList(new BookieSocketAddress("192.0.2.1", 1234));
List<BookieSocketAddress> ensemble = Lists.newArrayList(selfBookie);

for (int i = 0; i < numLedgers; i++) {
getLedgerIdGenerator().generateLedgerId(new GenericCallback<Long>() {
@@ -692,4 +699,55 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException {
return null;
}
}

/**
* Verifies that gc should cleaned up overreplicatd ledgers which is not
* owned by the bookie anymore.
*
* @throws Exception
*/
@Test
public void testGcLedgersForOverreplicated() throws Exception {
baseConf.setVerifyMetadataOnGc(true);
final SortedSet<Long> createdLedgers = Collections.synchronizedSortedSet(new TreeSet<Long>());
final SortedSet<Long> cleaned = Collections.synchronizedSortedSet(new TreeSet<Long>());

// Create few ledgers
final int numLedgers = 5;

BookieSocketAddress bookieAddress = new BookieSocketAddress("192.0.0.1", 1234);
createLedgers(numLedgers, createdLedgers, bookieAddress);

LedgerManager mockLedgerManager = new CleanupLedgerManager(getLedgerManager()) {
@Override
public LedgerRangeIterator getLedgerRanges(long zkOpTimeout) {
return new LedgerRangeIterator() {
@Override
public LedgerRange next() throws IOException {
return null;
}

@Override
public boolean hasNext() throws IOException {
return false;
}
};
}
};

final GarbageCollector garbageCollector = new ScanAndCompareGarbageCollector(mockLedgerManager,
new MockLedgerStorage(), baseConf, NullStatsLogger.INSTANCE);
GarbageCollector.GarbageCleaner cleaner = new GarbageCollector.GarbageCleaner() {
@Override
public void clean(long ledgerId) {
LOG.info("Cleaned {}", ledgerId);
cleaned.add(ledgerId);
}
};

validateLedgerRangeIterator(createdLedgers);

garbageCollector.gc(cleaner);
assertEquals("Should have cleaned all ledgers", cleaned.size(), numLedgers);
}
}

0 comments on commit 6b892a4

Please sign in to comment.