Skip to content

Commit

Permalink
Add option to persist unack-msg range to cursor-ledger if range is hi…
Browse files Browse the repository at this point in the history
…gh (#758)

* Add option to persist unack-msg range to cursor-ledger if range is high

* Increase managedLedgerMaxUnackedRangesToPersist=10K
  • Loading branch information
rdhabalia committed Sep 15, 2017
1 parent d4a8c25 commit fed5ae3
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 27 deletions.
6 changes: 5 additions & 1 deletion conf/broker.conf
Expand Up @@ -280,8 +280,12 @@ managedLedgerCursorRolloverTimeInSeconds=14400
# that were acknowledged. After the max number of ranges is reached, the information # that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of # will only be tracked in memory and messages will be redelivered in case of
# crashes. # crashes.
managedLedgerMaxUnackedRangesToPersist=1000 managedLedgerMaxUnackedRangesToPersist=10000


# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000




### --- Load balancer --- ### ### --- Load balancer --- ###
Expand Down
13 changes: 13 additions & 0 deletions conf/standalone.conf
Expand Up @@ -246,6 +246,19 @@ managedLedgerCursorMaxEntriesPerLedger=50000
# Max time before triggering a rollover on a cursor ledger # Max time before triggering a rollover on a cursor ledger
managedLedgerCursorRolloverTimeInSeconds=14400 managedLedgerCursorRolloverTimeInSeconds=14400


# Max number of "acknowledgment holes" that are going to be persistently stored.
# When acknowledging out of order, a consumer will leave holes that are supposed
# to be quickly filled by acking all the messages. The information of which
# messages are acknowledged is persisted by compressing in "ranges" of messages
# that were acknowledged. After the max number of ranges is reached, the information
# will only be tracked in memory and messages will be redelivered in case of
# crashes.
managedLedgerMaxUnackedRangesToPersist=10000

# Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
# than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000




### --- Load balancer --- ### ### --- Load balancer --- ###
Expand Down
Expand Up @@ -34,7 +34,8 @@
@Beta @Beta
public class ManagedLedgerConfig { public class ManagedLedgerConfig {


private int maxUnackedRangesToPersist = 1000; private int maxUnackedRangesToPersist = 10000;
private int maxUnackedRangesToPersistInZk = 1000;
private int maxEntriesPerLedger = 50000; private int maxEntriesPerLedger = 50000;
private int maxSizePerLedgerMb = 100; private int maxSizePerLedgerMb = 100;
private int minimumRolloverTimeMs = 0; private int minimumRolloverTimeMs = 0;
Expand Down Expand Up @@ -368,4 +369,16 @@ public ManagedLedgerConfig setMaxUnackedRangesToPersist(int maxUnackedRangesToPe
this.maxUnackedRangesToPersist = maxUnackedRangesToPersist; this.maxUnackedRangesToPersist = maxUnackedRangesToPersist;
return this; return this;
} }

/**
* @return max unacked message ranges up to which it can store in Zookeeper
*
*/
public int getMaxUnackedRangesToPersistInZk() {
return maxUnackedRangesToPersistInZk;
}

public void setMaxUnackedRangesToPersistInZk(int maxUnackedRangesToPersistInZk) {
this.maxUnackedRangesToPersistInZk = maxUnackedRangesToPersistInZk;
}
} }
Expand Up @@ -1362,7 +1362,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) {


lastMarkDeleteEntry = mdEntry; lastMarkDeleteEntry = mdEntry;


persistPosition(cursorLedger, mdEntry, new VoidCallback() { persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
@Override @Override
public void operationComplete() { public void operationComplete() {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1717,6 +1717,61 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
} }
} }


/**
* Persist given markDelete position to cursor-ledger or zk-metaStore based on max number of allowed unack-range
* that can be persist in zk-metastore. If current unack-range is higher than configured threshold then broker
* persists mark-delete into cursor-ledger else into zk-metastore.
*
* @param cursorsLedgerId
* @param position
* @param properties
* @param callback
*/
private void persistPosition(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
final AsyncCallbacks.CloseCallback callback, final Object ctx) {

if (shouldPersistUnackRangesToLedger()) {
persistPositionToLedger(cursorLedger, new MarkDeleteEntry(position, properties, null, null),
new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}][{}] Updated md-position={} into cursor-ledger {}", ledger.getName(), name,
markDeletePosition, cursorLedger.getId());
callback.closeComplete(ctx);
}

@Override
public void operationFailed(ManagedLedgerException e) {
log.warn("[{}][{}] Failed to persist mark-delete position into cursor-ledger{}: {}",
ledger.getName(), name, cursorLedger.getId(), e.getMessage());
callback.closeFailed(e, ctx);
}
});
} else {
persistPositionMetaStore(cursorsLedgerId, position, properties, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}][{}] Closed cursor at md-position={}", ledger.getName(), name, markDeletePosition);
// At this point the position had already been safely stored in the cursor z-node
callback.closeComplete(ctx);
asyncDeleteLedger(cursorLedger);
}

@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}][{}] Failed to update cursor info when closing: {}", ledger.getName(), name,
e.getMessage());
callback.closeFailed(e, ctx);
}
}, true);
}
}

private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null && config.getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.asRanges().size() > config.getMaxUnackedRangesToPersistInZk();
}

private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties, private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) { MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
// When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger, // When closing we store the last mark-delete position in the z-node itself, so we won't need the cursor ledger,
Expand Down Expand Up @@ -1757,27 +1812,7 @@ public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object
callback.closeComplete(ctx); callback.closeComplete(ctx);
return; return;
} }

persistPosition(-1, lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
persistPositionMetaStore(-1, lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties,
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}][{}] Closed cursor at md-position={}", ledger.getName(), name,
markDeletePosition);

// At this point the position had already been safely stored in the cursor z-node
callback.closeComplete(ctx);

asyncDeleteLedger(cursorLedger);
}

@Override
public void operationFailed(MetaStoreException e) {
log.warn("[{}][{}] Failed to update cursor info when closing: {}", ledger.getName(), name,
e.getMessage());
callback.closeFailed(e, ctx);
}
}, true);
} }


/** /**
Expand Down Expand Up @@ -1870,7 +1905,7 @@ void createNewMetadataLedger(final VoidCallback callback) {
// Created the ledger, now write the last position // Created the ledger, now write the last position
// content // content
MarkDeleteEntry mdEntry = lastMarkDeleteEntry; MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
persistPosition(lh, mdEntry, new VoidCallback() { persistPositionToLedger(lh, mdEntry, new VoidCallback() {
@Override @Override
public void operationComplete() { public void operationComplete() {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1958,7 +1993,7 @@ private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
} }
} }


void persistPosition(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
PositionImpl position = mdEntry.newPosition; PositionImpl position = mdEntry.newPosition;
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId()) PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
.setEntryId(position.getEntryId()) .setEntryId(position.getEntryId())
Expand Down
Expand Up @@ -39,10 +39,14 @@
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;


import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
Expand All @@ -53,10 +57,15 @@
import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -2458,5 +2467,152 @@ public void operationFailed(ManagedLedgerException exception) {
} }
} }


/**
* Verifies cursor persists individually unack range into cursor-ledger if range count is higher than
* MaxUnackedRangesToPersistInZk
*
* @throws Exception
*/
@Test(timeOut = 20000)
public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exception {

final int totalAddEntries = 100;
String ledgerName = "my_test_ledger";
String cursorName = "c1";
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
// metaStore is allowed to store only up to 10 deleted entries range
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(10);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);

List<Position> addedPositions = new ArrayList<>();
for (int i = 0; i < totalAddEntries; i++) {
Position p = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
addedPositions.add(p);
if (i % 2 == 0) {
// Acknowledge alternative message to create totalEntries/2 holes
c1.delete(addedPositions.get(i));
}
}

assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);

// Close ledger to persist individual-deleted positions into cursor-ledger
ledger.close();

// verify cursor-ledgerId is updated properly into cursor-metaStore
CountDownLatch cursorLedgerLatch = new CountDownLatch(1);
AtomicLong cursorLedgerId = new AtomicLong(0);
ledger.getStore().asyncGetCursorInfo(ledger.getName(), cursorName, new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo result, Stat stat) {
cursorLedgerId.set(result.getCursorsLedgerId());
cursorLedgerLatch.countDown();
}

@Override
public void operationFailed(MetaStoreException e) {
cursorLedgerLatch.countDown();
}
});
cursorLedgerLatch.await();
assertEquals(cursorLedgerId.get(), c1.getCursorLedger());

// verify cursor-ledger's last entry has individual-deleted positions
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger individualDeletedMessagesCount = new AtomicInteger(0);
bkc.asyncOpenLedger(c1.getCursorLedger(), DigestType.MAC, "".getBytes(), (rc, lh, ctx) -> {
if (rc == BKException.Code.OK) {
lh.asyncReadLastEntry((rc1, lh1, seq, ctx1) -> {
try {
LedgerEntry entry = seq.nextElement();
PositionInfo positionInfo;
positionInfo = PositionInfo.parseFrom(entry.getEntry());
individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount());
} catch (Exception e) {
}
latch.countDown();
}, null);
} else {
latch.countDown();
}
}, null);

latch.await();
assertEquals(individualDeletedMessagesCount.get(), totalAddEntries / 2 - 1);

// Re-Open
factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);
c1 = (ManagedCursorImpl) ledger.openCursor("c1");
// verify cursor has been recovered
assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);

// try to read entries which should only read non-deleted positions
List<Entry> entries = c1.readEntries(totalAddEntries);
assertEquals(entries.size(), totalAddEntries / 2);
}

/**
* Close Cursor without MaxUnackedRangesToPersistInZK: It should store individually unack range into Zk
*
* @throws Exception
*/
@Test(timeOut = 20000)
public void testOutOfOrderDeletePersistenceIntoZkWithClose() throws Exception {
final int totalAddEntries = 100;
String ledgerName = "my_test_ledger_zk";
String cursorName = "c1";
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);

ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);

List<Position> addedPositions = new ArrayList<>();
for (int i = 0; i < totalAddEntries; i++) {
Position p = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
addedPositions.add(p);
if (i % 2 == 0) {
// Acknowledge alternative message to create totalEntries/2 holes
c1.delete(addedPositions.get(i));
}
}

assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);

// Close ledger to persist individual-deleted positions into cursor-ledger
ledger.close();

// verify cursor-ledgerId is updated as -1 into cursor-metaStore
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger individualDeletedMessagesCount = new AtomicInteger(0);
ledger.getStore().asyncGetCursorInfo(ledger.getName(), cursorName, new MetaStoreCallback<ManagedCursorInfo>() {
@Override
public void operationComplete(ManagedCursorInfo result, Stat stat) {
individualDeletedMessagesCount.set(result.getIndividualDeletedMessagesCount());
latch.countDown();
}

@Override
public void operationFailed(MetaStoreException e) {
latch.countDown();
}
});
latch.await();
assertEquals(individualDeletedMessagesCount.get(), totalAddEntries / 2 - 1);

// Re-Open
factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle());
ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);
c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
// verify cursor has been recovered
assertEquals(c1.getNumberOfEntriesInBacklog(), totalAddEntries / 2);

// try to read entries which should only read non-deleted positions
List<Entry> entries = c1.readEntries(totalAddEntries);
assertEquals(entries.size(), totalAddEntries / 2);
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
} }
Expand Up @@ -255,7 +255,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
// that were acknowledged. After the max number of ranges is reached, the information // that were acknowledged. After the max number of ranges is reached, the information
// will only be tracked in memory and messages will be redelivered in case of // will only be tracked in memory and messages will be redelivered in case of
// crashes. // crashes.
private int managedLedgerMaxUnackedRangesToPersist = 1000; private int managedLedgerMaxUnackedRangesToPersist = 10000;
// Max number of "acknowledgment holes" that can be stored in Zookeeper. If number of unack message range is higher
// than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into
// zookeeper.
private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000;


/*** --- Load balancer --- ****/ /*** --- Load balancer --- ****/
// Enable load balancer // Enable load balancer
Expand Down Expand Up @@ -919,6 +923,15 @@ public void setManagedLedgerMaxUnackedRangesToPersist(int managedLedgerMaxUnacke
this.managedLedgerMaxUnackedRangesToPersist = managedLedgerMaxUnackedRangesToPersist; this.managedLedgerMaxUnackedRangesToPersist = managedLedgerMaxUnackedRangesToPersist;
} }


public int getManagedLedgerMaxUnackedRangesToPersistInZooKeeper() {
return managedLedgerMaxUnackedRangesToPersistInZooKeeper;
}

public void setManagedLedgerMaxUnackedRangesToPersistInZooKeeper(
int managedLedgerMaxUnackedRangesToPersistInZookeeper) {
this.managedLedgerMaxUnackedRangesToPersistInZooKeeper = managedLedgerMaxUnackedRangesToPersistInZookeeper;
}

public boolean isLoadBalancerEnabled() { public boolean isLoadBalancerEnabled() {
return loadBalancerEnabled; return loadBalancerEnabled;
} }
Expand Down
Expand Up @@ -663,6 +663,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(Destination
managedLedgerConfig.setDigestType(DigestType.CRC32); managedLedgerConfig.setDigestType(DigestType.CRC32);


managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
TimeUnit.MINUTES); TimeUnit.MINUTES);
Expand Down

0 comments on commit fed5ae3

Please sign in to comment.