Skip to content

Commit

Permalink
fix: Bookkeeper client throttling logic is based upon entryId instead…
Browse files Browse the repository at this point in the history
… of ledgerId

Descriptions of the changes in this PR:

Fixes: #2660 

### Changes

isWriteSetWritable() to use ledgerId for the client selection

Master Issue: #2660 

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Ivan Kelly <ivank@apache.org>

This closes #2664 from dlg99/fix/2660
  • Loading branch information
dlg99 committed May 3, 2021
1 parent e5e7666 commit 0f22d23
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
Expand Up @@ -880,7 +880,7 @@ CompletableFuture<LedgerEntries> readEntriesInternalAsync(long firstEntry,
// unresponsive thus helpful enough.
DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry);
try {
if (!waitForWritable(ws, firstEntry, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
Expand Down Expand Up @@ -1213,7 +1213,7 @@ void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length
}

private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
long key, int allowedNonWritableCount) {
int allowedNonWritableCount) {
if (allowedNonWritableCount < 0) {
allowedNonWritableCount = 0;
}
Expand All @@ -1224,7 +1224,7 @@ private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
int nonWritableCount = 0;
List<BookieId> currentEnsemble = getCurrentEnsemble();
for (int i = 0; i < sz; i++) {
if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) {
if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) {
nonWritableCount++;
if (nonWritableCount >= allowedNonWritableCount) {
return false;
Expand All @@ -1239,21 +1239,21 @@ private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet,
return true;
}

protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key,
protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet,
int allowedNonWritableCount, long durationMs) {
if (durationMs < 0) {
return true;
}

final long startTime = MathUtils.nowInNano();
boolean success = isWriteSetWritable(writeSet, key, allowedNonWritableCount);
boolean success = isWriteSetWritable(writeSet, allowedNonWritableCount);

if (!success && durationMs > 0) {
int backoff = 1;
final int maxBackoff = 4;
final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs);

while (!isWriteSetWritable(writeSet, key, allowedNonWritableCount)) {
while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) {
if (MathUtils.nowInNano() < deadline) {
long maxSleep = MathUtils.elapsedMSec(startTime);
if (maxSleep < 0) {
Expand All @@ -1265,7 +1265,7 @@ protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long k
TimeUnit.MILLISECONDS.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
success = isWriteSetWritable(writeSet, key, allowedNonWritableCount);
success = isWriteSetWritable(writeSet, allowedNonWritableCount);
break;
}
if (backoff <= maxBackoff) {
Expand Down Expand Up @@ -1340,7 +1340,7 @@ public String toString() {

DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
try {
if (!waitForWritable(ws, op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}
} finally {
Expand Down
Expand Up @@ -264,7 +264,7 @@ public String toString() {
}

if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()),
op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) {
0, clientCtx.getConf().waitForWriteSetMs)) {
op.allowFailFastOnUnwritableChannel();
}

Expand Down

0 comments on commit 0f22d23

Please sign in to comment.