Skip to content

Commit

Permalink
Trigger offload when managed ledger reaches a size threshold (#1960)
Browse files Browse the repository at this point in the history
When a managed ledger reaches a certain size, start offloading ledgers
in the background.

Master Issue: #1511
  • Loading branch information
ivankelly authored and sijie committed Jun 13, 2018
1 parent 830eef1 commit 5c2b0aa
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class ManagedLedgerConfig {
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
// Size threshold, in bytes, for automatically triggering offload. Defaults to -1; per the setter's
// Javadoc a negative value disables auto-triggering, so automatic offload is off unless configured.
private long offloadAutoTriggerSizeThresholdBytes = -1;

private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
Expand Down Expand Up @@ -409,6 +410,27 @@ public long getOffloadLedgerDeletionLagMillis() {
return offloadLedgerDeletionLagMs;
}

/**
 * Set the size, in bytes, at which the managed ledger starts to automatically offload ledgers to long-term
 * storage.
 *
 * <p>A negative value disables automatic triggering. Offloading will not occur if no offloader has been set
 * (see {@link #setLedgerOffloader(LedgerOffloader)}). Automatic offloading is evaluated when the ledger is
 * rolled, and applies when the ledgers up to that point exceed the threshold.
 *
 * @param threshold
 *            threshold, in bytes, at which offload is automatically triggered
 * @return this configuration object, so that calls can be chained
 */
public ManagedLedgerConfig setOffloadAutoTriggerSizeThresholdBytes(long threshold) {
    offloadAutoTriggerSizeThresholdBytes = threshold;
    return this;
}

/**
 * Get the size, in bytes, at which offloading is automatically triggered for this managed ledger.
 *
 * @return the configured auto-trigger threshold, in bytes (the field defaults to -1)
 */
public long getOffloadAutoTriggerSizeThresholdBytes() {
    return offloadAutoTriggerSizeThresholdBytes;
}

/**
* Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
* corrupted at bookkeeper and managed-cursor is stuck at that ledger.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public InvalidReplayPositionException(String msg) {
}
}

/**
 * Signals that an offload could not be started because another offload operation
 * is already running on the managed ledger.
 */
public static class OffloadInProgressException extends ManagedLedgerException {
/**
 * @param msg human-readable description of the failure, passed to the parent exception
 */
public OffloadInProgressException(String msg) {
super(msg);
}
}

@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -164,6 +165,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex ledgersListMutex = new CallbackMutex();
private final CallbackMutex trimmerMutex = new CallbackMutex();

// Serializes offload operations on this managed ledger. Both the automatic path (maybeOffload) and the
// manual path (asyncOffloadPrefix) acquire it with tryLock() and release it when their promise completes.
private final CallbackMutex offloadMutex = new CallbackMutex();
// Already-completed placeholder promise passed to maybeOffloadInBackground() from ledgerClosed(),
// where nobody is waiting for the outcome of the automatic offload.
private final static CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE
= CompletableFuture.completedFuture(PositionImpl.latest);
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
Expand Down Expand Up @@ -1305,6 +1309,8 @@ synchronized void ledgerClosed(final LedgerHandle lh) {

trimConsumedLedgersInBackground();

maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);

if (!pendingAddEntries.isEmpty()) {
// Need to create a new ledger to write pending entries
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1575,6 +1581,63 @@ private void scheduleDeferredTrimming(CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(promise)), 100, TimeUnit.MILLISECONDS);
}

/**
 * Schedule an automatic offload check on this managed ledger's ordered executor.
 *
 * <p>Nothing is scheduled (and {@code promise} is left untouched) when the configured auto-trigger
 * threshold is negative, which is the documented way of disabling automatic offloading. A threshold of
 * zero is a valid setting and still schedules the check.
 *
 * @param promise
 *            promise that {@code maybeOffload} completes with the outcome of the offload attempt
 */
private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
    // Only a negative threshold disables auto-triggering (see ManagedLedgerConfig's Javadoc);
    // the previous "> 0" check silently treated a threshold of 0 as disabled as well.
    if (config.getOffloadAutoTriggerSizeThresholdBytes() >= 0) {
        executor.executeOrdered(name, safeRun(() -> maybeOffload(promise)));
    }
}

/**
 * Run one automatic offload pass.
 *
 * <p>If another offload currently holds {@code offloadMutex}, the pass is re-scheduled 100ms later through
 * {@link #maybeOffloadInBackground(CompletableFuture)}. Otherwise the ledger list is walked from newest to
 * oldest while summing ledger sizes; every ledger that is not already completely offloaded and at which the
 * running sum exceeds the configured threshold is queued (oldest first) and handed to {@code offloadLoop}.
 *
 * @param finalPromise
 *            completed with the result (or failure) of the offload loop, after the mutex has been released
 */
private void maybeOffload(CompletableFuture<PositionImpl> finalPromise) {
    if (!offloadMutex.tryLock()) {
        // An offload is already running: try again shortly instead of blocking the executor thread
        scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
                100, TimeUnit.MILLISECONDS);
        return;
    }

    // The mutex is released exactly once, when the offload loop reports its outcome
    CompletableFuture<PositionImpl> unlockingPromise = new CompletableFuture<>();
    unlockingPromise.whenComplete((res, ex) -> {
        offloadMutex.unlock();
        if (ex != null) {
            finalPromise.completeExceptionally(ex);
        } else {
            finalPromise.complete(res);
        }
    });

    long threshold = config.getOffloadAutoTriggerSizeThresholdBytes();
    long sizeSummed = 0;
    long alreadyOffloadedSize = 0;
    long toOffloadSize = 0;

    ConcurrentLinkedDeque<LedgerInfo> toOffload = new ConcurrentLinkedDeque<>();

    // go through ledger list from newest to oldest and build a list to offload in oldest to newest order
    for (LedgerInfo info : ledgers.descendingMap().values()) {
        long size = info.getSize();
        sizeSummed += size;
        boolean alreadyOffloaded = info.hasOffloadContext() && info.getOffloadContext().getComplete();
        if (alreadyOffloaded) {
            alreadyOffloadedSize += size;
        } else if (sizeSummed > threshold) {
            toOffloadSize += size;
            toOffload.addFirst(info);
        }
    }

    // isEmpty() is constant time, whereas size() on a ConcurrentLinkedDeque traverses the whole deque
    if (!toOffload.isEmpty()) {
        log.info("[{}] Going to automatically offload ledgers {}"
                + ", total size = {}, already offloaded = {}, to offload = {}",
                name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
                sizeSummed, alreadyOffloadedSize, toOffloadSize);
    } else if (log.isDebugEnabled()) {
        // offloadLoop will complete immediately with an empty list to offload
        log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
                name, sizeSummed, alreadyOffloadedSize, threshold);
    }

    offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
}

private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
if (config.getRetentionTimeMillis() < 0) {
// Negative retention time equates to infinite retention
Expand Down Expand Up @@ -2002,18 +2065,25 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
return;
}

log.info("[{}] Going to offload ledgers {}", name,
ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()));
if (offloadMutex.tryLock()) {
log.info("[{}] Going to offload ledgers {}", name,
ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()));

CompletableFuture<PositionImpl> promise = new CompletableFuture<>();
offloadLoop(promise, ledgersToOffload, firstUnoffloaded, Optional.empty());
promise.whenComplete((result, exception) -> {
if (exception != null) {
callback.offloadFailed(new ManagedLedgerException(exception), ctx);
} else {
callback.offloadComplete(result, ctx);
}
});
CompletableFuture<PositionImpl> promise = new CompletableFuture<>();
promise.whenComplete((result, exception) -> {
offloadMutex.unlock();
if (exception != null) {
callback.offloadFailed(new ManagedLedgerException(exception), ctx);
} else {
callback.offloadComplete(result, ctx);
}
});
offloadLoop(promise, ledgersToOffload, firstUnoffloaded, Optional.empty());
} else {
callback.offloadFailed(
new ManagedLedgerException.OffloadInProgressException("Offload operation already running"),
ctx);
}
}

private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,195 @@ public void testDontOffloadEmpty() throws Exception {
Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId, thirdLedgerId));
}

/**
 * Build an entry payload of exactly {@code size} bytes by repeating the bytes of {@code pattern}.
 *
 * @param size
 *            length of the returned array, in bytes
 * @param pattern
 *            text whose UTF-8 bytes are repeated (and truncated) to fill the array; must not be empty
 *            when {@code size} is positive
 * @return a new array of {@code size} bytes filled with the repeated pattern
 */
private static byte[] buildEntry(int size, String pattern) {
    byte[] entry = new byte[size];
    // Pin the charset so the generated payload does not depend on the platform default encoding
    byte[] patternBytes = pattern.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    for (int i = 0; i < entry.length; i++) {
        entry[i] = patternBytes[i % patternBytes.length];
    }
    return entry;
}

/**
 * Writes enough entries to roll the ledger twice with a 100 byte auto-trigger threshold and checks that
 * exactly the first ledger ends up being offloaded without any manual request.
 */
@Test
public void testAutoTriggerOffload() throws Exception {
    MockLedgerOffloader offloader = new MockLedgerOffloader();

    ManagedLedgerConfig config = new ManagedLedgerConfig();
    config.setLedgerOffloader(offloader);
    config.setRetentionTime(10, TimeUnit.MINUTES);
    config.setOffloadAutoTriggerSizeThresholdBytes(100);
    config.setMaxEntriesPerLedger(10);

    ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config);

    // 25 entries at 10 entries per ledger: the ledger rolls twice, and the offload runs on the
    // first ledger once the second one has been closed
    int written = 0;
    while (written < 25) {
        ledger.addEntry(buildEntry(10, "entry-" + written));
        written++;
    }

    Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3);

    // the automatic offload runs in the background, so poll until it has happened
    assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);

    long firstLedgerId = ledger.getLedgersInfoAsList().get(0).getLedgerId();
    Assert.assertEquals(offloader.offloadedLedgers(), ImmutableSet.of(firstLedgerId));
}

/**
 * Checks that a manual offloadPrefix() call fails with OffloadInProgressException while an
 * automatically triggered offload is still running, and that a manual offload succeeds once
 * the automatic offloads have finished.
 */
@Test
public void manualTriggerWhileAutoInProgress() throws Exception {
// slowOffload holds every offload back until the test completes it;
// offloadRunning lets the test wait until the first offload call has been made
CompletableFuture<Void> slowOffload = new CompletableFuture<>();
CountDownLatch offloadRunning = new CountDownLatch(1);
MockLedgerOffloader offloader = new MockLedgerOffloader() {
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
// signal that an offload was requested, but only delegate once slowOffload completes
offloadRunning.countDown();
return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata));
}
};

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);

// Ledger will roll twice, offload will run on first ledger after second closed
for (int i = 0; i < 25; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
// wait until the automatic offload has actually called into the offloader
offloadRunning.await();

for (int i = 0; i < 20; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
Position p = ledger.addEntry(buildEntry(10, "last-entry"));

// the automatic offload is still blocked on slowOffload, so a manual request must be rejected
try {
ledger.offloadPrefix(p);
Assert.fail("Shouldn't have succeeded");
} catch (ManagedLedgerException.OffloadInProgressException e) {
// expected
}

// unblock the pending offload
slowOffload.complete(null);

// eventually all over threshold will be offloaded
assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
Assert.assertEquals(offloader.offloadedLedgers(),
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
ledger.getLedgersInfoAsList().get(1).getLedgerId(),
ledger.getLedgersInfoAsList().get(2).getLedgerId()));

// then a manual offload can run and offload the one ledger under the threshold
ledger.offloadPrefix(p);

Assert.assertEquals(offloader.offloadedLedgers().size(), 4);
Assert.assertEquals(offloader.offloadedLedgers(),
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
ledger.getLedgersInfoAsList().get(1).getLedgerId(),
ledger.getLedgersInfoAsList().get(2).getLedgerId(),
ledger.getLedgersInfoAsList().get(3).getLedgerId()));
}

/**
 * Checks that ledger rolls happening while a manual offload is in progress do not get lost:
 * the manual offload completes normally and the automatic trigger then offloads the remaining
 * ledgers that are over the threshold.
 */
@Test
public void autoTriggerWhileManualInProgress() throws Exception {
// slowOffload holds every offload back until the test completes it;
// offloadRunning lets the test wait until the first offload call has been made
CompletableFuture<Void> slowOffload = new CompletableFuture<>();
CountDownLatch offloadRunning = new CountDownLatch(1);
MockLedgerOffloader offloader = new MockLedgerOffloader() {
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
// signal that an offload was requested, but only delegate once slowOffload completes
offloadRunning.countDown();
return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata));
}
};

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);

// Ledger rolls once, threshold not hit so auto shouldn't run
for (int i = 0; i < 14; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
Position p = ledger.addEntry(buildEntry(10, "trigger-entry"));

// start a manual offload and wait until it has called into the (blocked) offloader
OffloadCallbackPromise cbPromise = new OffloadCallbackPromise();
ledger.asyncOffloadPrefix(p, cbPromise, null);
offloadRunning.await();

// add enough entries to roll the ledger a couple of times and trigger some offloads
for (int i = 0; i < 20; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

// allow the manual offload to complete
slowOffload.complete(null);

// the manual offload reports the first entry of the second ledger as its resulting position
Assert.assertEquals(cbPromise.join(),
PositionImpl.get(ledger.getLedgersInfoAsList().get(1).getLedgerId(), 0));

// auto trigger should eventually offload everything else over threshold
assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
Assert.assertEquals(offloader.offloadedLedgers(),
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
ledger.getLedgersInfoAsList().get(1).getLedgerId()));
}

/**
 * Checks that several ledger rolls occurring while the first automatic offload is still blocked
 * eventually result in every ledger over the threshold being offloaded, and nothing more.
 */
@Test
public void multipleAutoTriggers() throws Exception {
// slowOffload holds every offload back until the test completes it;
// offloadRunning lets the test wait until the first offload call has been made
CompletableFuture<Void> slowOffload = new CompletableFuture<>();
CountDownLatch offloadRunning = new CountDownLatch(1);
MockLedgerOffloader offloader = new MockLedgerOffloader() {
@Override
public CompletableFuture<Void> offload(ReadHandle ledger,
UUID uuid,
Map<String, String> extraMetadata) {
// signal that an offload was requested, but only delegate once slowOffload completes
offloadRunning.countDown();
return slowOffload.thenCompose((res) -> super.offload(ledger, uuid, extraMetadata));
}
};

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(10);
config.setOffloadAutoTriggerSizeThresholdBytes(100);
config.setRetentionTime(10, TimeUnit.MINUTES);
config.setLedgerOffloader(offloader);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);

// Ledger will roll twice, offload will run on first ledger after second closed
for (int i = 0; i < 25; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}
// wait until the first automatic offload has called into the (blocked) offloader
offloadRunning.await();

// trigger a bunch more rolls. Eventually there will be 5 ledgers.
// first 3 should be offloaded, 4th is 100bytes, 5th is 0 bytes.
// 4th and 5th sum to 100 bytes so they're just at edge of threshold
for (int i = 0; i < 20; i++) {
ledger.addEntry(buildEntry(10, "entry-" + i));
}

// allow the first offload to continue
slowOffload.complete(null);

// only the three oldest ledgers are expected to be offloaded
assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
Assert.assertEquals(offloader.offloadedLedgers(),
ImmutableSet.of(ledger.getLedgersInfoAsList().get(0).getLedgerId(),
ledger.getLedgersInfoAsList().get(1).getLedgerId(),
ledger.getLedgersInfoAsList().get(2).getLedgerId()));
}

static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
// wait up to 3 seconds
for (int i = 0; i < 30 && !predicate.getAsBoolean(); i++) {
Expand Down

0 comments on commit 5c2b0aa

Please sign in to comment.