From ce621d38d2a2245b9ac43adeef46afbfaa00d4fd Mon Sep 17 00:00:00 2001 From: Govind Menon Date: Mon, 27 Mar 2017 13:53:31 -0500 Subject: [PATCH] BOOKKEEPER-1015: Release LedgerDescriptor and master-key objects when not used anymore --- .../org/apache/bookkeeper/bookie/Bookie.java | 28 ++++++------ .../bookkeeper/bookie/HandleFactoryImpl.java | 45 ++++++++++--------- .../bookie/InterleavedLedgerStorage.java | 13 ++++++ .../bookkeeper/bookie/LedgerStorage.java | 12 +++++ .../bookkeeper/bookie/TestSyncThread.java | 4 ++ .../meta/LedgerManagerTestCase.java | 12 ++--- 6 files changed, 74 insertions(+), 40 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index b3e0ed3b46f..067f844fd2a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -65,6 +65,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; @@ -141,7 +142,7 @@ public class Bookie extends BookieCriticalThread { BookieBean jmxBookieBean; BKMBeanInfo jmxLedgerStorageBean; - final ConcurrentMap masterKeyCache = new ConcurrentHashMap(); + final ConcurrentLongHashMap masterKeyCache = new ConcurrentLongHashMap(); final protected String zkBookieRegPath; final protected String zkBookieReadOnlyPath; @@ -1337,20 +1338,21 @@ synchronized int shutdown(int exitCode) { * * @throws BookieException if masterKey does not match the master key of the ledger */ - private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, byte[] masterKey) + private LedgerDescriptor getLedgerForEntry(ByteBuffer entry, final byte[] masterKey) throws IOException, BookieException { - long ledgerId = entry.getLong(); + final long ledgerId = entry.getLong(); LedgerDescriptor l = handles.getHandle(ledgerId, masterKey); - if (!masterKeyCache.containsKey(ledgerId)) { - // new handle, we should add the key to journal ensure we can rebuild - ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length); - bb.putLong(ledgerId); - bb.putLong(METAENTRY_ID_LEDGER_KEY); - bb.putInt(masterKey.length); - bb.put(masterKey); - bb.flip(); - - if (null == masterKeyCache.putIfAbsent(ledgerId, masterKey)) { + if (null == masterKeyCache.get(ledgerId)) { + // Force the load into masterKey cache + byte[] oldValue = masterKeyCache.putIfAbsent(ledgerId, masterKey); + if (oldValue == null) { + // new handle, we should add the key to journal ensure we can rebuild + ByteBuffer bb = ByteBuffer.allocate(8 + 8 + 4 + masterKey.length); + bb.putLong(ledgerId); + bb.putLong(METAENTRY_ID_LEDGER_KEY); + bb.putInt(masterKey.length); + bb.put(masterKey); + bb.flip(); getJournal(ledgerId).logAddEntry(bb, new NopWriteCallback(), null); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java index 45be76397a0..92d8f9b9f0a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/HandleFactoryImpl.java @@ -21,33 +21,31 @@ package org.apache.bookkeeper.bookie; +import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; + import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -class HandleFactoryImpl implements HandleFactory { - ConcurrentMap ledgers = new ConcurrentHashMap(); - ConcurrentMap readOnlyLedgers - = new ConcurrentHashMap(); +class HandleFactoryImpl implements HandleFactory, LedgerStorage.LedgerDeletionListener { + private final ConcurrentLongHashMap ledgers; + private final ConcurrentLongHashMap readOnlyLedgers; final LedgerStorage ledgerStorage; HandleFactoryImpl(LedgerStorage ledgerStorage) { this.ledgerStorage = ledgerStorage; + this.ledgers = new ConcurrentLongHashMap<>(); + this.readOnlyLedgers = new ConcurrentLongHashMap<>(); + + ledgerStorage.registerLedgerDeletionListener(this); } @Override public LedgerDescriptor getHandle(long ledgerId, byte[] masterKey) throws IOException, BookieException { - LedgerDescriptor handle = null; - if (null == (handle = ledgers.get(ledgerId))) { - // LedgerDescriptor#create sets the master key in the ledger storage, calling it - // twice on the same ledgerId is safe because it eventually puts a value in the ledger cache - // that guarantees synchronized access across all cached entries. - handle = ledgers.putIfAbsent(ledgerId, LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage)); - if (null == handle) { - handle = ledgers.get(ledgerId); - } + LedgerDescriptor handle = ledgers.get(ledgerId); + if (null == handle) { + handle = LedgerDescriptor.create(masterKey, ledgerId, ledgerStorage); + ledgers.putIfAbsent(ledgerId, handle); } handle.checkAccess(masterKey); return handle; @@ -56,13 +54,18 @@ public LedgerDescriptor getHandle(long ledgerId, byte[] masterKey) @Override public LedgerDescriptor getReadOnlyHandle(long ledgerId) throws IOException, Bookie.NoLedgerException { - LedgerDescriptor handle = null; - if (null == (handle = readOnlyLedgers.get(ledgerId))) { - handle = readOnlyLedgers.putIfAbsent(ledgerId, LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage)); - if (null == handle) { - handle = readOnlyLedgers.get(ledgerId); - } + LedgerDescriptor handle = readOnlyLedgers.get(ledgerId); + if (null == handle) { + handle = LedgerDescriptor.createReadOnly(ledgerId, ledgerStorage); + readOnlyLedgers.putIfAbsent(ledgerId, handle); + } return handle; } + + @Override + public void ledgerDeleted(long ledgerId) { + ledgers.remove(ledgerId); + readOnlyLedgers.remove(ledgerId); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java index 308110bcc93..a2207097496 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java @@ -25,11 +25,14 @@ import java.io.IOException; import java.nio.ByteBuffer; +import com.google.common.collect.Lists; import org.apache.bookkeeper.bookie.Bookie.NoLedgerException; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeUnit; import java.util.Map; @@ -82,6 +85,7 @@ protected synchronized Checkpoint getLastCheckpoint() { LedgerCache ledgerCache; private CheckpointSource checkpointSource; protected final CheckpointHolder checkpointHolder = new CheckpointHolder(); + private final CopyOnWriteArrayList ledgerDeletionListeners = Lists.newCopyOnWriteArrayList(); // A sorted map to stored all active ledger ids protected final SnapshotMap activeLedgers; @@ -374,6 +378,10 @@ synchronized public void flush() throws IOException { public void deleteLedger(long ledgerId) throws IOException { activeLedgers.remove(ledgerId); ledgerCache.deleteLedger(ledgerId); + + for(LedgerDeletionListener ledgerDeletionListener : ledgerDeletionListeners) { + ledgerDeletionListener.ledgerDeleted(ledgerId); + } } @Override @@ -418,6 +426,11 @@ protected void processEntry(long ledgerId, long entryId, ByteBuffer entry) throw processEntry(ledgerId, entryId, entry, true); } + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + ledgerDeletionListeners.add(listener); + } + synchronized protected void processEntry(long ledgerId, long entryId, ByteBuffer entry, boolean rollLog) throws IOException { /* diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java index 84a309f819d..b92f7c3f917 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java @@ -145,6 +145,18 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, */ void deleteLedger(long ledgerId) throws IOException; + public static interface LedgerDeletionListener { + void ledgerDeleted(long ledgerId); + } + + /** + * Register a listener for ledgers deletion notification + * + * @param listener + * object that will be notified every time a ledger is deleted + */ + void registerLedgerDeletionListener(LedgerDeletionListener listener); + /** * Get the JMX management bean for this LedgerStorage */ diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java index 1ce30e99028..6895ca19e8c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java @@ -279,6 +279,10 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, public void deleteLedger(long ledgerId) throws IOException { } + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + } + @Override public void start() { } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java index 4c2ddaa15a3..323b20268d3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java @@ -28,13 +28,8 @@ import java.util.Map; import java.util.NavigableMap; -import org.apache.bookkeeper.bookie.BookieException; -import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.*; import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint; -import org.apache.bookkeeper.bookie.CompactableLedgerStorage; -import org.apache.bookkeeper.bookie.EntryLocation; -import org.apache.bookkeeper.bookie.EntryLogger; -import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.jmx.BKMBeanInfo; import org.apache.bookkeeper.stats.StatsLogger; @@ -180,6 +175,11 @@ public void deleteLedger(long ledgerId) throws IOException { activeLedgers.remove(ledgerId); } + @Override + public void registerLedgerDeletionListener(LedgerDeletionListener listener) { + + } + @Override public Iterable getActiveLedgersInRange(long firstLedgerId, long lastLedgerId) { NavigableMap bkActiveLedgersSnapshot = activeLedgers.snapshot();