Permalink
Browse files

BOOKKEEPER-463: Refactor garbage collection code for ease to plugin d…

…ifferent GC algorithm. (Fangmin, ivank, fpj via sijie)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/bookkeeper/trunk@1425585 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent a3da8c7 commit fa13426970bd4b6d7ffc086e776559f87349ef71 @sijie sijie committed Dec 24, 2012
Showing with 735 additions and 635 deletions.
  1. +2 −0 CHANGES.txt
  2. +9 −12 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
  3. +51 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollector.java
  4. +31 −19 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
  5. +13 −6 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
  6. +8 −7 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
  7. +108 −0 bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ScanAndCompareGarbageCollector.java
  8. +2 −1 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java
  9. +56 −150 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/AbstractZkLedgerManager.java
  10. +0 −84 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/ActiveLedgerManager.java
  11. +25 −34 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java
  12. +0 −5 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManagerFactory.java
  13. +102 −113 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManager.java
  14. +0 −5 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/HierarchicalLedgerManagerFactory.java
  15. +96 −6 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManager.java
  16. +1 −9 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
  17. +60 −150 bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/MSLedgerManagerFactory.java
  18. +2 −3 bookkeeper-server/src/main/java/org/apache/bookkeeper/{meta → util}/SnapshotMap.java
  19. +60 −2 bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java
  20. +90 −2 bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ZkUtils.java
  21. +4 −5 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
  22. +12 −8 bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
  23. +3 −14 bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
View
@@ -270,6 +270,8 @@ Trunk (unreleased changes)
BOOKKEEPER-490: add documentation for MetaStore interface (sijie, ivank via sijie)
+ BOOKKEEPER-463: Refactor garbage collection code for ease to plugin different GC algorithm. (Fangmin, ivank, fpj via sijie)
+
hedwig-server:
BOOKKEEPER-250: Need a ledger manager like interface to manage metadata operations in Hedwig (sijie via ivank)
@@ -39,7 +39,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
@@ -82,8 +82,8 @@
final ServerConfiguration conf;
final SyncThread syncThread;
- final LedgerManagerFactory activeLedgerManagerFactory;
- final ActiveLedgerManager activeLedgerManager;
+ final LedgerManagerFactory ledgerManagerFactory;
+ final LedgerManager ledgerManager;
final LedgerStorage ledgerStorage;
final Journal journal;
@@ -478,17 +478,14 @@ public Bookie(ServerConfiguration conf)
this.conf = conf;
this.journalDirectory = getCurrentDirectory(conf.getJournalDir());
this.ledgerDirsManager = new LedgerDirsManager(conf);
-
// instantiate zookeeper client to initialize ledger manager
this.zk = instantiateZookeeperClient(conf);
checkEnvironment(this.zk);
-
- activeLedgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
- activeLedgerManager = activeLedgerManagerFactory.newActiveLedgerManager();
-
+ ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
+ LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
+ ledgerManager = ledgerManagerFactory.newLedgerManager();
syncThread = new SyncThread(conf);
- ledgerStorage = new InterleavedLedgerStorage(conf, activeLedgerManager,
- ledgerDirsManager);
+ ledgerStorage = new InterleavedLedgerStorage(conf, ledgerManager, ledgerDirsManager);
handles = new HandleFactoryImpl(ledgerStorage);
// instantiate the journal
journal = new Journal(conf, ledgerDirsManager);
@@ -910,8 +907,8 @@ synchronized int shutdown(int exitCode) {
// close Ledger Manager
try {
- activeLedgerManager.close();
- activeLedgerManagerFactory.uninitialize();
+ ledgerManager.close();
+ ledgerManagerFactory.uninitialize();
} catch (IOException ie) {
LOG.error("Failed to close active ledger manager : ", ie);
}
@@ -0,0 +1,51 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+/**
+ * This is the garbage collector interface, garbage collector implementers
+ * need to extends this class to remove the deleted ledgers.
+ */
+public interface GarbageCollector {
+ /**
+ * Do the garbage collector work
+ *
+ * @param garbageCleaner
+ * cleaner used to clean selected garbages
+ */
+ public abstract void gc(GarbageCleaner garbageCleaner);
+
+ /**
+ * A interface used to define customised garbage cleaner
+ */
+ public interface GarbageCleaner {
+
+ /**
+ * Clean a specific ledger
+ *
+ * @param ledgerId
+ * Ledger ID to be cleaned
+ */
+ public void clean(final long ledgerId) ;
+ }
+
+}
@@ -32,9 +32,11 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.bookie.GarbageCollector.GarbageCleaner;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,8 +73,7 @@
// Ledger Cache Handle
final LedgerCache ledgerCache;
-
- final ActiveLedgerManager activeLedgerManager;
+ final SnapshotMap<Long, Boolean> activeLedgers;
// flag to ensure gc thread will not be interrupted during compaction
// to reduce the risk getting entry log corrupted
@@ -83,6 +84,9 @@
// track the last scanned successfully log id
long scannedLogId = 0;
+ final GarbageCollector garbageCollector;
+ final GarbageCleaner garbageCleaner;
+
/**
* A scanner wrapper to check whether a ledger is alive in an entry log file
*/
@@ -114,19 +118,37 @@ public void process(long ledgerId, long offset, ByteBuffer entry)
* @throws IOException
*/
public GarbageCollectorThread(ServerConfiguration conf,
- LedgerCache ledgerCache,
+ final LedgerCache ledgerCache,
EntryLogger entryLogger,
- ActiveLedgerManager activeLedgerManager,
- EntryLogScanner scanner)
+ SnapshotMap<Long, Boolean> activeLedgers,
+ EntryLogScanner scanner,
+ LedgerManager ledgerManager)
throws IOException {
super("GarbageCollectorThread");
this.ledgerCache = ledgerCache;
this.entryLogger = entryLogger;
- this.activeLedgerManager = activeLedgerManager;
+ this.activeLedgers = activeLedgers;
this.scanner = scanner;
this.gcWaitTime = conf.getGcWaitTime();
+
+ this.garbageCleaner = new GarbageCollector.GarbageCleaner() {
+ @Override
+ public void clean(long ledgerId) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("delete ledger : " + ledgerId);
+ }
+ ledgerCache.deleteLedger(ledgerId);
+ } catch (IOException e) {
+ LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
+ }
+ }
+ };
+
+ this.garbageCollector = new ScanAndCompareGarbageCollector(ledgerManager, activeLedgers);
+
// compaction parameters
minorCompactionThreshold = conf.getMinorCompactionThreshold();
minorCompactionInterval = conf.getMinorCompactionInterval() * SECOND;
@@ -223,17 +245,7 @@ public void run() {
* Do garbage collection ledger index files
*/
private void doGcLedgers() {
- activeLedgerManager.garbageCollectLedgers(
- new ActiveLedgerManager.GarbageCollector() {
- @Override
- public void gc(long ledgerId) {
- try {
- ledgerCache.deleteLedger(ledgerId);
- } catch (IOException e) {
- LOG.error("Exception when deleting the ledger index file on the Bookie: ", e);
- }
- }
- });
+ garbageCollector.gc(garbageCleaner);
}
/**
@@ -245,7 +257,7 @@ private void doGcEntryLogs() {
EntryLogMetadata meta = entryLogMetaMap.get(entryLogId);
for (Long entryLogLedger : meta.ledgersMap.keySet()) {
// Remove the entry log ledger from the set if it isn't active.
- if (!activeLedgerManager.containsActiveLedger(entryLogLedger)) {
+ if (!activeLedgers.containsKey(entryLogLedger)) {
meta.removeLedger(entryLogLedger);
}
}
@@ -26,8 +26,10 @@
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
+import org.apache.bookkeeper.util.SnapshotMap;
+import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +44,10 @@
EntryLogger entryLogger;
LedgerCache ledgerCache;
+
+ // A sorted map to stored all active ledger ids
+ protected final SnapshotMap<Long, Boolean> activeLedgers;
+
// This is the thread that garbage collects the entry logs that do not
// contain any active ledgers in them; and compacts the entry logs that
// has lower remaining percentage to reclaim disk space.
@@ -51,15 +57,16 @@
private volatile boolean somethingWritten = false;
InterleavedLedgerStorage(ServerConfiguration conf,
- ActiveLedgerManager activeLedgerManager,
- LedgerDirsManager ledgerDirsManager) throws IOException {
+ LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager)
+ throws IOException {
+ activeLedgers = new SnapshotMap<Long, Boolean>();
entryLogger = new EntryLogger(conf, ledgerDirsManager);
- ledgerCache = new LedgerCacheImpl(conf, activeLedgerManager, ledgerDirsManager);
+ ledgerCache = new LedgerCacheImpl(conf, activeLedgers, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerCache, entryLogger,
- activeLedgerManager, new EntryLogCompactionScanner());
+ activeLedgers, new EntryLogCompactionScanner(), ledgerManager);
}
- @Override
+ @Override
public void start() {
gcThread.start();
}
@@ -35,7 +35,7 @@
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.bookkeeper.meta.ActiveLedgerManager;
+import org.apache.bookkeeper.util.SnapshotMap;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -54,7 +54,8 @@
private LedgerDirsManager ledgerDirsManager;
final private AtomicBoolean shouldRelocateIndexFile = new AtomicBoolean(false);
- public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, LedgerDirsManager ledgerDirsManager)
+ public LedgerCacheImpl(ServerConfiguration conf, SnapshotMap<Long, Boolean> activeLedgers,
+ LedgerDirsManager ledgerDirsManager)
throws IOException {
this.ledgerDirsManager = ledgerDirsManager;
this.openFileLimit = conf.getOpenFileLimit();
@@ -69,7 +70,7 @@ public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, Ledger
}
LOG.info("maxMemory = " + Runtime.getRuntime().maxMemory());
LOG.info("openFileLimit is " + openFileLimit + ", pageSize is " + pageSize + ", pageLimit is " + pageLimit);
- activeLedgerManager = alm;
+ this.activeLedgers = activeLedgers;
// Retrieve all of the active ledgers.
getActiveLedgers();
ledgerDirsManager.addLedgerDirsListener(getLedgerDirsListener());
@@ -90,7 +91,7 @@ public LedgerCacheImpl(ServerConfiguration conf, ActiveLedgerManager alm, Ledger
// Manage all active ledgers in LedgerManager
// so LedgerManager has knowledge to garbage collect inactive/deleted ledgers
- final ActiveLedgerManager activeLedgerManager;
+ final SnapshotMap<Long, Boolean> activeLedgers;
final int openFileLimit;
final int pageSize;
@@ -257,7 +258,7 @@ FileInfo getFileInfo(Long ledger, byte masterKey[]) throws IOException {
// A new ledger index file has been created for this Bookie.
// Add this new ledger to the set of active ledgers.
LOG.debug("New ledger index file created for ledgerId: {}", ledger);
- activeLedgerManager.addActiveLedger(ledger, true);
+ activeLedgers.put(ledger, true);
}
evictFileInfoIfNecessary();
fi = new FileInfo(lf, masterKey);
@@ -697,7 +698,7 @@ private void getActiveLedgers() throws IOException {
}
}
}
- activeLedgerManager.addActiveLedger(Long.parseLong(ledgerIdInHex, 16), true);
+ activeLedgers.put(Long.parseLong(ledgerIdInHex, 16), true);
}
}
}
@@ -739,7 +740,7 @@ public void deleteLedger(long ledgerId) throws IOException {
}
// Remove it from the active ledger manager
- activeLedgerManager.removeActiveLedger(ledgerId);
+ activeLedgers.remove(ledgerId);
// Now remove it from all the other lists and maps.
// These data structures need to be synchronized first before removing entries.
Oops, something went wrong. Retry.

0 comments on commit fa13426

Please sign in to comment.