Skip to content

Commit

Permalink
Remove potential deadlock in offloading (#1961)
Browse files Browse the repository at this point in the history
Managed ledger has a lock, ledgersListMutex, to control write access
to the list of ledgers that form the managed ledger.

Normally this lock is locked in a non-blocking way (if it fails it
backs off and tries again). Offloading added some operations that
locked in a blocking manner, which can cause some deadlocking, if
operations are running on the orderedexecutor.

This patch changes the offloading operations to only use tryLock on
this lock.

Master Issue: #1511
  • Loading branch information
ivankelly authored and sijie committed Jun 13, 2018
1 parent dbfe9b7 commit 830eef1
Showing 1 changed file with 49 additions and 33 deletions.
Expand Up @@ -2053,11 +2053,10 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
}
Optional<Throwable> errorToReport = firstError;
synchronized (ManagedLedgerImpl.this) {
ledgersListMutex.lock();
// if the ledger doesn't exist anymore, ignore the error
if (ledgers.containsKey(ledgerId)) {
errorToReport = Optional.of(firstError.orElse(exception));
}
ledgersListMutex.unlock();
}

offloadLoop(promise, ledgersToOffload,
Expand Down Expand Up @@ -2086,43 +2085,60 @@ static class OffloadConflict extends ManagedLedgerException {
}

private CompletableFuture<Void> transformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation) {
CompletableFuture<Void> promise = new CompletableFuture<>();
synchronized (this) {
ledgersListMutex.lock();
CompletableFuture<Void> promise = new CompletableFuture<Void>();

promise.whenComplete((ignore, exception) -> {
ledgersListMutex.unlock();
});
tryTransformLedgerInfo(ledgerId, transformation, promise);

LedgerInfo oldInfo = ledgers.get(ledgerId);
if (oldInfo == null) {
promise.completeExceptionally(
new OffloadConflict(
"Ledger " + ledgerId + " no longer exists in ManagedLedger, likely trimmed"));
} else {
try {
LedgerInfo newInfo = transformation.transform(oldInfo);
ledgers.put(ledgerId, newInfo);
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat,
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
ledgersStat = stat;
promise.complete(null);
}
return promise;
}

@Override
public void operationFailed(MetaStoreException e) {
promise.completeExceptionally(e);
}
});
} catch (ManagedLedgerException mle) {
promise.completeExceptionally(mle);
private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation,
CompletableFuture<Void> finalPromise) {
synchronized (this) {
if (!ledgersListMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation,
finalPromise)),
100, TimeUnit.MILLISECONDS);
} else { // lock acquired
CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
unlockingPromise.whenComplete((res, ex) -> {
ledgersListMutex.unlock();
if (ex != null) {
finalPromise.completeExceptionally(ex);
} else {
finalPromise.complete(res);
}
});

LedgerInfo oldInfo = ledgers.get(ledgerId);
if (oldInfo == null) {
unlockingPromise.completeExceptionally(
new OffloadConflict(
"Ledger " + ledgerId + " no longer exists in ManagedLedger, likely trimmed"));
} else {
try {
LedgerInfo newInfo = transformation.transform(oldInfo);
ledgers.put(ledgerId, newInfo);
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat,
new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
ledgersStat = stat;
unlockingPromise.complete(null);
}

@Override
public void operationFailed(MetaStoreException e) {
unlockingPromise.completeExceptionally(e);
}
});
} catch (ManagedLedgerException mle) {
unlockingPromise.completeExceptionally(mle);
}
}
}

}
return promise;
}

private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid) {
Expand Down

0 comments on commit 830eef1

Please sign in to comment.