From 0baad03abf2e999e7063924c239f7db6a4f74560 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 29 Apr 2016 14:30:25 -0700 Subject: [PATCH] BOOKKEEPER-926: Compacted entries are not properly synced before updating index --- .../bookie/GarbageCollectorThread.java | 96 ++++++------------- .../bookkeeper/bookie/CompactionTest.java | 63 +++++++++++- 2 files changed, 91 insertions(+), 68 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java index 2821ec889b2..e5ee8d753ae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java @@ -126,7 +126,7 @@ void acquire(int permits) { /** * A scanner wrapper to check whether a ledger is alive in an entry log file */ - class CompactionScannerFactory implements EntryLogger.EntryLogListener { + class CompactionScannerFactory { List offsets = new ArrayList(); EntryLogScanner newScanner(final EntryLogMetadata meta) { @@ -141,66 +141,38 @@ public boolean accept(long ledgerId) { } @Override - public void process(final long ledgerId, long offset, ByteBuffer entry) - throws IOException { + public void process(final long ledgerId, long offset, ByteBuffer entry) throws IOException { throttler.acquire(entry.remaining()); - synchronized (CompactionScannerFactory.this) { - if (offsets.size() > maxOutstandingRequests) { - waitEntrylogFlushed(); - } - entry.getLong(); // discard ledger id, we already have it - long entryId = entry.getLong(); - entry.rewind(); - - long newoffset = entryLogger.addEntry(ledgerId, entry); - offsets.add(new EntryLocation(ledgerId, entryId, newoffset)); + + if (offsets.size() > maxOutstandingRequests) { + flush(); } + entry.getLong(); // discard ledger id, we already have it + long entryId = entry.getLong(); + entry.rewind(); + + long newoffset = entryLogger.addEntry(ledgerId, entry); + offsets.add(new EntryLocation(ledgerId, entryId, newoffset)); + } }; } - final Object flushLock = new Object(); - - @Override - public void onRotateEntryLog() { - synchronized (flushLock) { - flushLock.notifyAll(); + void flush() throws IOException { + if (offsets.isEmpty()) { + LOG.debug("Skipping entry log flushing, as there are no offset!"); + return; } - } - synchronized private void waitEntrylogFlushed() throws IOException { + // Before updating the index, we want to wait until all the compacted entries are flushed into the + // entryLog try { - if (offsets.size() <= 0) { - LOG.debug("Skipping entry log flushing, as there is no offset!"); - return; - } - - EntryLocation lastOffset = offsets.get(offsets.size()-1); - long lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location); - while (lastOffsetLogId < entryLogger.getLeastUnflushedLogId() && running) { - synchronized (flushLock) { - flushLock.wait(1000); - } + entryLogger.flush(); - lastOffset = offsets.get(offsets.size()-1); - lastOffsetLogId = EntryLogger.logIdForOffset(lastOffset.location); - } - if (lastOffsetLogId >= entryLogger.getLeastUnflushedLogId() && !running) { - throw new IOException("Shutdown before flushed"); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted waiting for flush", ie); + ledgerStorage.updateEntriesLocations(offsets); + } finally { + offsets.clear(); } - - ledgerStorage.updateEntriesLocations(offsets); - offsets.clear(); - } - - synchronized void flush() throws IOException { - waitEntrylogFlushed(); - - ledgerStorage.flushEntriesLocationsIndex(); } } @@ -227,7 +199,6 @@ public GarbageCollectorThread(ServerConfiguration conf, this.compactionRateByEntries = conf.getCompactionRateByEntries(); this.compactionRateByBytes = conf.getCompactionRateByBytes(); this.scannerFactory = new CompactionScannerFactory(); - entryLogger.addListener(this.scannerFactory); this.garbageCleaner = new GarbageCollector.GarbageCleaner() { @Override @@ -456,7 +427,6 @@ public int compare(EntryLogMetadata m1, EntryLogMetadata m2) { List logsToCompact = new ArrayList(); logsToCompact.addAll(entryLogMetaMap.values()); Collections.sort(logsToCompact, sizeComparator); - List toRemove = new ArrayList(); for (EntryLogMetadata meta : logsToCompact) { if (meta.getUsage() >= threshold) { @@ -464,11 +434,15 @@ public int compare(EntryLogMetadata m1, EntryLogMetadata m2) { } if (LOG.isDebugEnabled()) { - LOG.debug("Compacting entry log {} below threshold {}.", meta.getEntryLogId(), threshold); + LOG.debug("Compacting entry log {} below threshold {}", meta.getEntryLogId(), threshold); } try { compactEntryLog(scannerFactory, meta); - toRemove.add(meta.getEntryLogId()); + scannerFactory.flush(); + + LOG.info("Removing entry log {} after compaction", meta.getEntryLogId()); + removeEntryLog(meta.getEntryLogId()); + } catch (LedgerDirsManager.NoWritableLedgerDirException nwlde) { LOG.warn("No writable ledger directory available, aborting compaction", nwlde); break; @@ -483,18 +457,6 @@ public int compare(EntryLogMetadata m1, EntryLogMetadata m2) { return; } } - try { - // compaction finished, flush any outstanding offsets - scannerFactory.flush(); - } catch (IOException ioe) { - LOG.error("Cannot flush compacted entries, skip removal", ioe); - return; - } - - // offsets have been flushed, its now safe to remove the old entrylogs - for (Long l : toRemove) { - removeEntryLog(l); - } } /** @@ -545,7 +507,7 @@ protected void compactEntryLog(CompactionScannerFactory scannerFactory, return; } - LOG.info("Compacting entry log : {}", entryLogMeta.getEntryLogId()); + LOG.info("Compacting entry log : {} - Usage: {} %", entryLogMeta.getEntryLogId(), entryLogMeta.getUsage()); try { entryLogger.scanEntryLog(entryLogMeta.getEntryLogId(), diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java index 4f3bb870dfa..5d384baf26d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java @@ -33,6 +33,8 @@ import java.util.Collection; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner; +import org.apache.bookkeeper.bookie.GarbageCollectorThread.CompactionScannerFactory; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadata; @@ -49,7 +51,7 @@ import org.apache.bookkeeper.util.TestUtils; import org.apache.bookkeeper.versioning.Version; import org.apache.zookeeper.AsyncCallback; - +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -114,6 +116,7 @@ public void setUp() throws Exception { baseConf.setEntryLogSizeLimit(numEntries * ENTRY_SIZE); // Disable skip list for compaction baseConf.setGcWaitTime(gcWaitTime); + baseConf.setFlushInterval(100); baseConf.setMinorCompactionThreshold(minorCompactionThreshold); baseConf.setMajorCompactionThreshold(majorCompactionThreshold); baseConf.setMinorCompactionInterval(minorCompactionInterval); @@ -631,4 +634,62 @@ public void checkpointComplete(Checkpoint checkPoint, boolean compact) storage.gcThread.resumeMinorGC(); storage.gcThread.resumeMajorGC(); } + + @Test(timeout = 60000) + public void testCompactionWithEntryLogRollover() throws Exception { + // Disable bookie gc during this test + baseConf.setGcWaitTime(60000); + baseConf.setMinorCompactionInterval(0); + baseConf.setMajorCompactionInterval(0); + restartBookies(); + + // prepare data + LedgerHandle[] lhs = prepareData(3, false); + + for (LedgerHandle lh : lhs) { + lh.close(); + } + + // remove ledger2 and ledger3 + bkc.deleteLedger(lhs[1].getId()); + bkc.deleteLedger(lhs[2].getId()); + LOG.info("Finished deleting the ledgers contains most entries."); + + InterleavedLedgerStorage ledgerStorage = (InterleavedLedgerStorage) bs.get(0).getBookie().ledgerStorage; + GarbageCollectorThread garbageCollectorThread = ledgerStorage.gcThread; + CompactionScannerFactory compactionScannerFactory = garbageCollectorThread.scannerFactory; + long entryLogId = 0; + EntryLogger entryLogger = ledgerStorage.entryLogger; + + LOG.info("Before compaction -- Least unflushed log id: {}", entryLogger.getLeastUnflushedLogId()); + + // Compact entryLog 0 + EntryLogScanner scanner = compactionScannerFactory.newScanner(entryLogger.getEntryLogMetadata(entryLogId)); + + entryLogger.scanEntryLog(entryLogId, scanner); + + long entryLogIdAfterCompaction = entryLogger.getLeastUnflushedLogId(); + LOG.info("After compaction -- Least unflushed log id: {}", entryLogIdAfterCompaction); + + // Add more entries to trigger entrylog roll over + LedgerHandle[] lhs2 = prepareData(3, false); + + for (LedgerHandle lh : lhs2) { + lh.close(); + } + + // Wait for entry logger to move forward + while (entryLogger.getLeastUnflushedLogId() <= entryLogIdAfterCompaction) { + Thread.sleep(100); + } + + long entryLogIdBeforeFlushing = entryLogger.getLeastUnflushedLogId(); + LOG.info("Added more data -- Least unflushed log id: {}", entryLogIdBeforeFlushing); + + Assert.assertTrue(entryLogIdAfterCompaction < entryLogIdBeforeFlushing); + + // Wait for entries to be flushed on entry logs and update index + // This operation should succeed even if the entry log rolls over after the last entry was compacted + compactionScannerFactory.flush(); + } }