Skip to content

Commit

Permalink
Support LacPiggyback, LongPoll and ExplicitLac in db ledger storage
Browse files Browse the repository at this point in the history
Descriptions of the changes in this PR:

*Motivation*

DbLedgerStorage was developed in yahoo and just merged recently to master. This implementation only works for v2 protocol.
It doesn't work with v3 protocol. When using v3 protocol clients (e.g. dlog library) to talk to bookies use DbLedgerStorage,
the clients hang on waiting responses. because a few methods in DbLedgerStorage throw UnsupportedException

*Solution*

This change focuses on adding the implementations for those methods, in order to support LacPiggyback, LongPoll and ExplicitLac
for v3 protocol. The implementation follows what we had in FileInfoCache but did it in a much simpler way, since all the persistence
information has been managed by db ledger storage itself. All the information required by `LacPiggyback`, `LongPoll` and `ExplicitLac`
are transient.

This change doesn't introduce any new test cases. It reuses existing test cases to test those features using DbLedgerStorage. These
tests are:

- ExplicitLacTest: for testing explicit lac
- TestPiggybackLAC: for testing piggback lac
- TestReadLastConfirmedLongPoll: for testing long polling lac
- TestReadLastConfirmedAndEntry: for testing entry piggyback

Author: Sijie Guo <sijie@apache.org>

Reviewers: Ivan Kelly <ivank@apache.org>, Matteo Merli <mmerli@apache.org>

This closes #1218 from sijie/support_long_polls
  • Loading branch information
sijie committed Feb 28, 2018
1 parent 70ff9fc commit 8b8c551
Show file tree
Hide file tree
Showing 11 changed files with 494 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public ByteBuf getExplicitLac() {
}

public void setExplicitLac(ByteBuf lac) {
long explicitLacValue;
synchronized (this) {
if (explicitLac == null) {
explicitLac = ByteBuffer.allocate(lac.capacity());
Expand All @@ -176,13 +177,13 @@ public void setExplicitLac(ByteBuf lac) {

// skip the ledger id
explicitLac.getLong();
long explicitLacValue = explicitLac.getLong();
setLastAddConfirmed(explicitLacValue);
explicitLacValue = explicitLac.getLong();
explicitLac.rewind();
if (LOG.isDebugEnabled()) {
LOG.debug("fileInfo:SetLac: {}", explicitLac);
}
}
setLastAddConfirmed(explicitLacValue);
}

public synchronized void readHeader() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,13 @@ public long getLastAddConfirmed(long ledgerId) throws IOException {
if (null == bb) {
return BookieProtocol.INVALID_ENTRY_ID;
} else {
bb.readLong(); // ledger id
bb.readLong(); // entry id
lac = bb.readLong();
lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
try {
bb.skipBytes(2 * Long.BYTES); // skip ledger & entry id
lac = bb.readLong();
lac = ledgerCache.updateLastAddConfirmed(ledgerId, lac);
} finally {
bb.release();
}
}
}
return lac;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,21 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.SortedMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
Expand All @@ -55,6 +63,7 @@
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats.LedgerData;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage.Batch;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory.DbConfigType;
import org.apache.bookkeeper.common.util.Watchable;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
Expand All @@ -74,10 +83,121 @@
*/
public class DbLedgerStorage implements CompactableLedgerStorage {

/**
* This class borrows the logic from FileInfo.
*
* <p>This class is used for holding all the transient states for a given ledger.
*/
private static class TransientLedgerInfo
extends Watchable<LastAddConfirmedUpdateNotification>
implements AutoCloseable {

// lac
private Long lac = null;
// request from explicit lac requests
private ByteBuffer explicitLac = null;
// is the ledger info closed?
private boolean isClosed;

private final long ledgerId;
// reference to LedgerMetadataIndex
private final LedgerMetadataIndex ledgerIndex;

/**
* Construct an Watchable with zero watchers.
*/
public TransientLedgerInfo(long ledgerId, LedgerMetadataIndex ledgerIndex) {
super(WATCHER_RECYCLER);
this.ledgerId = ledgerId;
this.ledgerIndex = ledgerIndex;
}

synchronized Long getLastAddConfirmed() {
return lac;
}

Long setLastAddConfirmed(long lac) {
long lacToReturn;
boolean changed = false;
synchronized (this) {
if (null == this.lac || this.lac < lac) {
this.lac = lac;
changed = true;
}
lacToReturn = this.lac;
}
if (changed) {
notifyWatchers(lacToReturn);
}
return lacToReturn;
}

synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
if ((null != lac && lac > previousLAC)
|| isClosed || ledgerIndex.get(ledgerId).getFenced()) {
return false;
}

addWatcher(watcher);
return true;
}

public ByteBuf getExplicitLac() {
ByteBuf retLac = null;
synchronized (this) {
if (explicitLac != null) {
retLac = Unpooled.buffer(explicitLac.capacity());
explicitLac.rewind(); //copy from the beginning
retLac.writeBytes(explicitLac);
explicitLac.rewind();
return retLac;
}
}
return retLac;
}

public void setExplicitLac(ByteBuf lac) {
long explicitLacValue;
synchronized (this) {
if (explicitLac == null) {
explicitLac = ByteBuffer.allocate(lac.capacity());
}
lac.readBytes(explicitLac);
explicitLac.rewind();

// skip the ledger id
explicitLac.getLong();
explicitLacValue = explicitLac.getLong();
explicitLac.rewind();
}
setLastAddConfirmed(explicitLacValue);
}

void notifyWatchers(long lastAddConfirmed) {
notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lastAddConfirmed);
}

@Override
public void close() {
synchronized (this) {
if (isClosed) {
return;
}
isClosed = true;
}
// notify watchers
notifyWatchers(Long.MAX_VALUE);
}

}

private EntryLogger entryLogger;

private LedgerMetadataIndex ledgerIndex;
private EntryLocationIndex entryLocationIndex;
private LoadingCache<Long, TransientLedgerInfo> transientLedgerInfoCache;

private GarbageCollectorThread gcThread;

Expand Down Expand Up @@ -167,12 +287,41 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
ledgerIndex = new LedgerMetadataIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);
entryLocationIndex = new EntryLocationIndex(conf, KeyValueStorageRocksDB.factory, baseDir, stats);

// build the ledger info cache
int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
RemovalListener<Long, TransientLedgerInfo> ledgerInfoRemovalListener = this::handleLedgerEviction;
CacheBuilder<Long, TransientLedgerInfo> builder = CacheBuilder.newBuilder()
.initialCapacity(conf.getFileInfoCacheInitialCapacity())
.maximumSize(conf.getOpenFileLimit())
.concurrencyLevel(concurrencyLevel)
.removalListener(ledgerInfoRemovalListener);
if (conf.getFileInfoMaxIdleTime() > 0) {
builder.expireAfterAccess(conf.getFileInfoMaxIdleTime(), TimeUnit.SECONDS);
}
transientLedgerInfoCache = builder.build(new CacheLoader<Long, TransientLedgerInfo>() {
@Override
public TransientLedgerInfo load(Long key) throws Exception {
return new TransientLedgerInfo(key, ledgerIndex);
}
});

entryLogger = new EntryLogger(conf, ledgerDirsManager);
gcThread = new GarbageCollectorThread(conf, ledgerManager, this, statsLogger);

registerStats();
}

/**
* When a ledger is evicted from transient ledger info cache, we can just simply discard the object.
*/
private void handleLedgerEviction(RemovalNotification<Long, TransientLedgerInfo> notification) {
TransientLedgerInfo ledgerInfo = notification.getValue();
if (null == ledgerInfo || null == notification.getKey()) {
return;
}
ledgerInfo.close();
}

public void registerStats() {
stats.registerGauge("write-cache-size", new Gauge<Long>() {
@Override
Expand Down Expand Up @@ -285,7 +434,15 @@ public boolean setFenced(long ledgerId) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Set fenced. ledger: {}", ledgerId);
}
return ledgerIndex.setFenced(ledgerId);
boolean changed = ledgerIndex.setFenced(ledgerId);
if (changed) {
// notify all the watchers if a ledger is fenced
TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
if (null != ledgerInfo) {
ledgerInfo.notifyWatchers(Long.MAX_VALUE);
}
}
return changed;
}

@Override
Expand All @@ -310,9 +467,10 @@ public long addEntry(ByteBuf entry) throws IOException {

long ledgerId = entry.getLong(entry.readerIndex());
long entryId = entry.getLong(entry.readerIndex() + 8);
long lac = entry.getLong(entry.readerIndex() + 16);

if (log.isDebugEnabled()) {
log.debug("Add entry. {}@{}", ledgerId, entryId);
log.debug("Add entry. {}@{}, lac = {}", ledgerId, entryId, lac);
}

// Waits if the write cache is being switched for a flush
Expand All @@ -328,6 +486,10 @@ public long addEntry(ByteBuf entry) throws IOException {
triggerFlushAndAddEntry(ledgerId, entryId, entry);
}

// after successfully insert the entry, update LAC and notify the watchers
transientLedgerInfoCache.getUnchecked(ledgerId)
.setLastAddConfirmed(lac);

recordSuccessfulEvent(addEntryStats, startTime);
return entryId;
}
Expand Down Expand Up @@ -712,23 +874,42 @@ public EntryLogger getEntryLogger() {

@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
throw new UnsupportedOperationException();
TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
Long lac = null != ledgerInfo ? ledgerInfo.getLastAddConfirmed() : null;
if (null == lac) {
ByteBuf bb = getEntry(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED);
try {
bb.skipBytes(2 * Long.BYTES); // skip ledger id and entry id
lac = bb.readLong();
lac = transientLedgerInfoCache.getUnchecked(ledgerId).setLastAddConfirmed(lac);
} finally {
bb.release();
}
}
return lac;
}

@Override
public boolean waitForLastAddConfirmedUpdate(long ledgerId, long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
throw new UnsupportedOperationException();
return transientLedgerInfoCache.getUnchecked(ledgerId)
.waitForLastAddConfirmedUpdate(previousLAC, watcher);
}

@Override
public void setExplicitlac(long ledgerId, ByteBuf lac) throws IOException {
throw new UnsupportedOperationException();
transientLedgerInfoCache.getUnchecked(ledgerId)
.setExplicitLac(lac);
}

@Override
public ByteBuf getExplicitLac(long ledgerId) {
throw new UnsupportedOperationException();
TransientLedgerInfo ledgerInfo = transientLedgerInfoCache.getIfPresent(ledgerId);
if (null == ledgerInfo) {
return null;
} else {
return ledgerInfo.getExplicitLac();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void simple() throws Exception {
ByteBuf entry = Unpooled.buffer(1024);
entry.writeLong(4); // ledger id
entry.writeLong(1); // entry id
entry.writeLong(0); // lac
entry.writeBytes("entry-1".getBytes());

assertEquals(false, ((DbLedgerStorage) storage).isFlushRequired());
Expand Down Expand Up @@ -152,6 +153,7 @@ public void simple() throws Exception {
ByteBuf entry2 = Unpooled.buffer(1024);
entry2.writeLong(4); // ledger id
entry2.writeLong(2); // entry id
entry2.writeLong(1); // lac
entry2.writeBytes("entry-2".getBytes());

storage.addEntry(entry2);
Expand All @@ -160,32 +162,41 @@ public void simple() throws Exception {
res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
assertEquals(entry2, res);

// Read last add confirmed in ledger
assertEquals(1L, storage.getLastAddConfirmed(4));

ByteBuf entry3 = Unpooled.buffer(1024);
entry3.writeLong(4); // ledger id
entry3.writeLong(3); // entry id
entry3.writeLong(2); // lac
entry3.writeBytes("entry-3".getBytes());
storage.addEntry(entry3);

ByteBuf entry4 = Unpooled.buffer(1024);
entry4.writeLong(4); // ledger id
entry4.writeLong(4); // entry id
entry4.writeLong(3); // lac
entry4.writeBytes("entry-4".getBytes());
storage.addEntry(entry4);

res = storage.getEntry(4, 4);
assertEquals(entry4, res);

assertEquals(3, storage.getLastAddConfirmed(4));

// Delete
assertEquals(true, storage.ledgerExists(4));
storage.deleteLedger(4);
assertEquals(false, storage.ledgerExists(4));

// Should not throw exception event if the ledger was deleted
storage.getEntry(4, 4);
assertEquals(3, storage.getLastAddConfirmed(4));

storage.addEntry(Unpooled.wrappedBuffer(entry2));
res = storage.getEntry(4, BookieProtocol.LAST_ADD_CONFIRMED);
assertEquals(entry4, res);
assertEquals(3, storage.getLastAddConfirmed(4));

// Get last entry from storage
storage.flush();
Expand Down

0 comments on commit 8b8c551

Please sign in to comment.