Skip to content

Commit

Permalink
Introduce config to skip non-recoverable data-ledger (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia committed Jan 24, 2018
1 parent da4bd4a commit f5a2ec7
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 32 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Expand Up @@ -300,6 +300,9 @@ managedLedgerMaxUnackedRangesToPersist=10000
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

### --- Load balancer --- ###

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Expand Up @@ -266,6 +266,9 @@ managedLedgerMaxUnackedRangesToPersist=10000
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

### --- Load balancer --- ###

Expand Down
Expand Up @@ -51,6 +51,7 @@ public class ManagedLedgerConfig {
private double throttleMarkDelete = 0;
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;

private DigestType digestType = DigestType.MAC;
private byte[] password = "".getBytes(Charsets.UTF_8);
Expand Down Expand Up @@ -353,6 +354,20 @@ public long getRetentionSizeInMB() {
return retentionSizeInMB;
}

/**
* Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets
* corrupted at bookkeeper and managed-cursor is stuck at that ledger.
*
* @param autoSkipNonRecoverableData
*/
public boolean isAutoSkipNonRecoverableData() {
return autoSkipNonRecoverableData;
}

public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
Expand Down
Expand Up @@ -31,6 +31,13 @@ public ManagedLedgerException(Throwable e) {
super(e);
}

public static ManagedLedgerException getManagedLedgerException(Throwable e) {
if (e instanceof ManagedLedgerException) {
return (ManagedLedgerException) e;
}
return new ManagedLedgerException(e);
}

public static class MetaStoreException extends ManagedLedgerException {
public MetaStoreException(Exception e) {
super(e);
Expand Down Expand Up @@ -89,6 +96,12 @@ public TooManyRequestsException(String msg) {
}
}

public static class NonRecoverableLedgerException extends ManagedLedgerException {
public NonRecoverableLedgerException(String msg) {
super(msg);
}
}

public static class InvalidReplayPositionException extends ManagedLedgerException {
public InvalidReplayPositionException(String msg) {
super(msg);
Expand Down
Expand Up @@ -25,14 +25,13 @@
import java.util.Collection;
import java.util.List;

import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.bookkeeper.mledger.util.RangeCache.Weighter;
Expand Down Expand Up @@ -184,7 +183,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final ReadEnt
lh.asyncReadEntries(position.getEntryId(), position.getEntryId(), (rc, ledgerHandle, sequence, obj) -> {
if (rc != BKException.Code.OK) {
ml.invalidateLedgerHandle(ledgerHandle, rc);
callback.readEntryFailed(new ManagedLedgerException(BKException.create(rc)), obj);
callback.readEntryFailed(createManagedLedgerException(rc), obj);
return;
}

Expand Down Expand Up @@ -253,11 +252,11 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo

if (rc != BKException.Code.OK) {
if (rc == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(new TooManyRequestsException("Too many request error from bookies"),
ctx);
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
} else {
ml.invalidateLedgerHandle(lh1, rc);
callback.readEntriesFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
ManagedLedgerException mlException = createManagedLedgerException(rc);
callback.readEntriesFailed(mlException, ctx);
}
return;
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import java.util.Enumeration;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo
lh.asyncReadEntries(firstEntry, lastEntry, new ReadCallback() {
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object bkctx) {
if (rc != BKException.Code.OK) {
callback.readEntriesFailed(new ManagedLedgerException(BKException.create(rc)), ctx);
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
return;
}

Expand Down
Expand Up @@ -82,6 +82,8 @@
import com.google.protobuf.InvalidProtocolBufferException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;

public class ManagedCursorImpl implements ManagedCursor {

Expand Down Expand Up @@ -281,7 +283,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));

callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc1)));
callback.operationFailed(createManagedLedgerException(rc1));
return;
}

Expand Down Expand Up @@ -330,8 +332,12 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
if (!ledger.ledgerExists(position.getLedgerId())) {
long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
position = PositionImpl.get(nextExistingLedger, -1);
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}-{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
position);
}
position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position;
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

Expand Down Expand Up @@ -1311,7 +1317,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
try {
newPosition = setAcknowledgedPosition(newPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(new ManagedLedgerException(e), ctx);
callback.markDeleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -1555,7 +1561,7 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
callback.deleteFailed(new ManagedLedgerException(e), ctx);
callback.deleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -2040,7 +2046,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
callback.operationFailed(createManagedLedgerException(rc));
}
}, null);
}
Expand Down Expand Up @@ -2127,7 +2133,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
if (rc == BKException.Code.OK) {
callback.closeComplete(ctx);
} else {
callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.closeFailed(createManagedLedgerException(rc), ctx);
}
}
}, ctx);
Expand Down Expand Up @@ -2301,6 +2307,11 @@ public PositionImpl getNextAvailablePosition(PositionImpl position) {
return position.getNext();
}

public Position getNextLedgerPosition(long currentLedgerId) {
Long nextExistingLedger = ledger.getNextValidLedger(currentLedgerId);
return nextExistingLedger!=null ? PositionImpl.get(nextExistingLedger, 0) : null;
}

public boolean isIndividuallyDeletedEntriesEmpty() {
lock.readLock().lock();
try {
Expand Down
Expand Up @@ -71,6 +71,7 @@
import com.google.common.collect.Maps;

import io.netty.util.concurrent.DefaultThreadFactory;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;

public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetaStore store;
Expand Down Expand Up @@ -432,7 +433,7 @@ public void operationFailed(MetaStoreException e) {
// Completed all the cursors info
callback.getInfoComplete(info, ctx);
}).exceptionally((ex) -> {
callback.getInfoFailed(new ManagedLedgerException(ex), ctx);
callback.getInfoFailed(getManagedLedgerException(ex.getCause()), ctx);
return null;
});
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import java.util.Iterator;
Expand Down Expand Up @@ -61,6 +62,8 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
Expand Down Expand Up @@ -89,6 +92,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;

public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;
Expand Down Expand Up @@ -263,7 +267,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
initializeBookKeeper(callback);
} else {
log.error("[{}] Failed to open ledger {}: {}", name, id, BKException.getMessage(rc));
callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
callback.initializeFailed(createManagedLedgerException(rc));
return;
}
}));
Expand Down Expand Up @@ -337,7 +341,7 @@ public void operationFailed(MetaStoreException e) {
executor.submitOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
callback.initializeFailed(createManagedLedgerException(rc));
return;
}

Expand Down Expand Up @@ -908,7 +912,7 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx)
}
mbean.endDataLedgerCloseOp();
if (rc != BKException.Code.OK) {
callback.terminateFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.terminateFailed(createManagedLedgerException(rc), ctx);
} else {
lastConfirmedEntry = new PositionImpl(lh.getId(), lh.getLastAddConfirmed());
// Store the new state in metadata
Expand Down Expand Up @@ -1042,7 +1046,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
}
mbean.endDataLedgerCloseOp();
if (rc != BKException.Code.OK) {
callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.closeFailed(createManagedLedgerException(rc), ctx);
return;
}

Expand All @@ -1062,7 +1066,7 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {
Futures.waitForAll(futures).thenRun(() -> {
callback.closeComplete(ctx);
}).exceptionally(exception -> {
callback.closeFailed(new ManagedLedgerException(exception), ctx);
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
return null;
});
}
Expand All @@ -1078,7 +1082,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
ManagedLedgerException status = new ManagedLedgerException(BKException.getMessage(rc));
ManagedLedgerException status = createManagedLedgerException(rc);

// Empty the list of pending requests and make all of them fail
clearPendingAddEntries(status);
Expand Down Expand Up @@ -1278,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
ex.getMessage());
opReadEntry.readEntriesFailed(new ManagedLedgerException(ex), opReadEntry.ctx);
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
return null;
});
}
Expand Down Expand Up @@ -1306,7 +1310,7 @@ CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) {
if (rc != BKException.Code.OK) {
// Remove the ledger future from cache to give chance to reopen it later
ledgerCache.remove(ledgerId, future);
future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
future.completeExceptionally(createManagedLedgerException(rc));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Successfully opened ledger {} for reading", name, lh.getId());
Expand Down Expand Up @@ -1347,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct
entryCache.asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(new ManagedLedgerException(ex), ctx);
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
return null;
});
}
Expand Down Expand Up @@ -1744,7 +1748,7 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
int toDelete = ledgersToDelete.get();
if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) {
// Trigger callback only once
callback.deleteLedgerFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.deleteLedgerFailed(createManagedLedgerException(rc), ctx);
}
}
}, null);
Expand Down Expand Up @@ -1956,7 +1960,7 @@ boolean ledgerExists(long ledgerId) {
return ledgers.get(ledgerId) != null;
}

long getNextValidLedger(long ledgerId) {
Long getNextValidLedger(long ledgerId) {
return ledgers.ceilingKey(ledgerId + 1);
}

Expand Down Expand Up @@ -2172,6 +2176,30 @@ public long getCacheSize() {
return entryCache.getSize();
}

/**
* return BK error codes that are considered not likely to be recoverable
*/
private static boolean isBkErrorNotRecoverable(int rc) {
switch (rc) {
case BKException.Code.NoSuchLedgerExistsException:
case BKException.Code.NoSuchEntryException:
return true;

default:
return false;
}
}

public static ManagedLedgerException createManagedLedgerException(int bkErrorCode) {
if (bkErrorCode == BKException.Code.TooManyRequestsException) {
return new TooManyRequestsException("Too many request error from bookies");
} else if (isBkErrorNotRecoverable(bkErrorCode)) {
return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
} else {
return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}

0 comments on commit f5a2ec7

Please sign in to comment.