Skip to content

Commit

Permalink
Add ledger op timeout to avoid topics stuck on ledger-creation (#2535)
Browse files Browse the repository at this point in the history
* Add ledger op timeout to avoid topics stuck on ledger-creation

* rename to metadataOperationsTimeoutSeconds

* add service config for managedLedgerMetadataOperationsTimeoutSeconds
  • Loading branch information
rdhabalia committed Sep 17, 2018
1 parent 858648e commit d5e88c1
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 103 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -378,6 +378,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

# Operation timeout, in seconds, while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60

### --- Load balancer --- ###

# Enable load balancer
Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Expand Up @@ -318,6 +318,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

# Operation timeout, in seconds, while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60

### --- Load balancer --- ###

loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
Expand Down
Expand Up @@ -56,6 +56,7 @@ public class ManagedLedgerConfig {
private boolean autoSkipNonRecoverableData;
private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
private long offloadAutoTriggerSizeThresholdBytes = -1;
private long metadataOperationsTimeoutSeconds = 60;

private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);
Expand Down Expand Up @@ -511,4 +512,24 @@ public ManagedLedgerConfig setClock(Clock clock) {
this.clock = clock;
return this;
}

/**
 * Returns the timeout applied to ledger operations (create/delete).
 *
 * @return the configured ledger-operation timeout, in seconds
 */
public long getMetadataOperationsTimeoutSeconds() {
    return this.metadataOperationsTimeoutSeconds;
}

/**
 * Sets the timeout applied to ledger operations (create/delete). Once it elapses, the pending
 * operation's callback is completed with a failure.
 *
 * @param metadataOperationsTimeoutSeconds
 *            the ledger-operation timeout, in seconds
 * @return this configuration instance, so that setters can be chained
 */
public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOperationsTimeoutSeconds) {
    this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds;
    return this;
}
}
Expand Up @@ -1974,68 +1974,73 @@ void internalFlushPendingMarkDeletes() {
void createNewMetadataLedger(final VoidCallback callback) {
ledger.mbean.startCursorLedgerCreateOp();

bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(),
config.getMetadataAckQuorumSize(), digestType, config.getPassword(), (rc, lh, ctx) -> {
ledger.getExecutor().execute(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
BKException.getMessage(rc));
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
return;
}
ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> {

if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

ledger.getExecutor().execute(safeRun(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
BKException.getMessage(rc));
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
return;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
}
// Created the ledger, now write the last position
// content
MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
persistPositionToLedger(lh, mdEntry, new VoidCallback() {
@Override
public void operationComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);
}
// Created the ledger, now write the last position
// content
MarkDeleteEntry mdEntry = lastMarkDeleteEntry;
persistPositionToLedger(lh, mdEntry, new VoidCallback() {
switchToNewLedger(lh, new VoidCallback() {
@Override
public void operationComplete() {
if (log.isDebugEnabled()) {
log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);
}
switchToNewLedger(lh, new VoidCallback() {
@Override
public void operationComplete() {
callback.operationComplete();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
lh.getId());
}
}, null);
callback.operationFailed(exception);
}
});
callback.operationComplete();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);

ledger.mbean.startCursorLedgerDeleteOp();
bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
ledger.mbean.endCursorLedgerDeleteOp();
// it means it failed to switch the newly created ledger so, it should be
// deleted to prevent leak
bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> {
if (rc != BKException.Code.OK) {
log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(),
lh.getId());
}
}, null);
callback.operationFailed(exception);
}
});
}));
}, null, Collections.emptyMap());
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(),
mdEntry.newPosition, name);

ledger.mbean.startCursorLedgerDeleteOp();
bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
ledger.mbean.endCursorLedgerDeleteOp();
}
}, null);
callback.operationFailed(exception);
}
});
}));
}, Collections.emptyMap());

}

private List<LongProperty> buildPropertiesMap(Map<String, Long> properties) {
Expand Down
Expand Up @@ -22,17 +22,6 @@
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -51,6 +40,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -62,6 +52,7 @@
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.Backoff;
Expand Down Expand Up @@ -111,6 +102,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;

Expand Down Expand Up @@ -185,7 +187,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {

protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;

enum State {
None, // Uninitialized
LedgerOpened, // A ledger is ready to write into
Expand Down Expand Up @@ -364,39 +366,44 @@ public void operationFailed(MetaStoreException e) {
// Create a new ledger to start writing
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
digestType, config.getPassword(), (rc, lh, ctx) -> {
executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
return;
}

asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if possible.
while (lastConfirmedEntry.getEntryId() == -1) {
Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
if (formerLedger != null) {
LedgerInfo ledgerInfo = formerLedger.getValue();
lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
} else {
break;
}
}
if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

executor.executeOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
return;
}

LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
ledgers.put(lh.getId(), info);
log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
// bypass empty ledgers, find last ledger with Message if possible.
while (lastConfirmedEntry.getEntryId() == -1) {
Map.Entry<Long, LedgerInfo> formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId());
if (formerLedger != null) {
LedgerInfo ledgerInfo = formerLedger.getValue();
lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1);
} else {
break;
}
}

LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build();
ledgers.put(lh.getId(), info);

// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
}));
}, null, Collections.emptyMap());
// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
}));
}, Collections.emptyMap());
}

private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -564,9 +571,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), digestType, config.getPassword(), this, null,
Collections.emptyMap());
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
} else {
checkArgument(state == State.LedgerOpened, "ledger=%s is not opened", state);
Expand Down Expand Up @@ -1155,6 +1160,11 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
}

if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}

mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
Expand Down Expand Up @@ -1320,9 +1330,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) {
STATE_UPDATER.set(this, State.CreatingLedger);
this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
mbean.startDataLedgerCreateOp();
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), digestType, config.getPassword(), this, null,
Collections.emptyMap());
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}
}

Expand Down Expand Up @@ -2796,6 +2804,52 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
}
}

/**
 * Creates a ledger asynchronously and schedules a timeout task that fails the callback with
 * {@code BKException.Code.TimeoutException} if ledger creation has not completed within
 * {@link ManagedLedgerConfig#getMetadataOperationsTimeoutSeconds()} seconds.
 *
 * <p>
 * An {@link AtomicBoolean} flag is passed to BookKeeper as the callback context. Whichever side (the
 * BookKeeper callback or the timeout task) flips it from {@code false} to {@code true} first owns the
 * completion of the operation. The timeout task claims the flag with a single atomic
 * {@code compareAndSet}, so it can never fire after the create callback has already claimed it.
 *
 * <p>
 * When the timeout wins, {@code cb} is invoked with a {@code null} ledger handle and a {@code null}
 * context.
 *
 * @param bookKeeper
 *            client used to create the ledger
 * @param config
 *            supplies ensemble size, write/ack quorum sizes, password and the operation timeout
 * @param digestType
 *            digest type used for the new ledger
 * @param cb
 *            callback completed either by BookKeeper or by the timeout task
 * @param emptyMap
 *            currently ignored: an empty custom-metadata map is always passed to BookKeeper
 */
protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
        CreateCallback cb, Map<Object, Object> emptyMap) {
    AtomicBoolean ledgerCreated = new AtomicBoolean(false);
    bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(),
            digestType, config.getPassword(), cb, ledgerCreated, Collections.emptyMap());
    scheduledExecutor.schedule(() -> {
        // Atomically claim the operation; a separate get() followed by set(true) would leave a window in
        // which the create callback could also claim it, completing the callback twice.
        if (ledgerCreated.compareAndSet(false, true)) {
            cb.createComplete(BKException.Code.TimeoutException, null, null);
        }
    }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
}

/**
 * Checks whether a ledger-creation operation was already completed by the timeout task scheduled in
 * {@code asyncCreateLedger}, and claims the operation for the caller if it was not.
 *
 * <p>
 * The context is expected to be the {@link AtomicBoolean} flag that {@code asyncCreateLedger} hands to
 * BookKeeper. The flag is claimed with a single atomic {@code compareAndSet}, so exactly one of the
 * create callback and the timeout task gets to complete the operation. If the flag was already set,
 * the operation has timed out and its callback has been failed; a ledger that was nevertheless
 * created successfully is deleted so it does not leak.
 *
 * @param rc
 *            BookKeeper return code of the create operation
 * @param lh
 *            handle of the created ledger; only dereferenced when {@code rc} is
 *            {@code BKException.Code.OK}
 * @param ctx
 *            callback context; any value that is not an {@link AtomicBoolean} (including
 *            {@code null}) is treated as "not timed out"
 * @return {@code true} if the operation was already completed by the timeout task and the caller must
 *         stop processing, {@code false} if the caller should continue handling the result
 */
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
    if (!(ctx instanceof AtomicBoolean)) {
        return false;
    }
    // Atomically claim the operation: succeeds only if the timeout task has not claimed it already.
    if (((AtomicBoolean) ctx).compareAndSet(false, true)) {
        return false;
    }
    // ledger-creation already timed out and the callback was already completed, so delete this ledger
    // (if one was actually created) and tell the caller to stop.
    if (rc == BKException.Code.OK) {
        log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId());
        asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
    }
    return true;
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}

0 comments on commit d5e88c1

Please sign in to comment.