diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 806dd8b20d0..48818bd2f5b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -880,7 +880,7 @@ CompletableFuture readEntriesInternalAsync(long firstEntry, // unresponsive thus helpful enough. DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(firstEntry); try { - if (!waitForWritable(ws, firstEntry, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) { + if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) { op.allowFailFastOnUnwritableChannel(); } } finally { @@ -1213,7 +1213,7 @@ void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length } private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet, - long key, int allowedNonWritableCount) { + int allowedNonWritableCount) { if (allowedNonWritableCount < 0) { allowedNonWritableCount = 0; } @@ -1224,7 +1224,7 @@ private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet, int nonWritableCount = 0; List currentEnsemble = getCurrentEnsemble(); for (int i = 0; i < sz; i++) { - if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), key)) { + if (!clientCtx.getBookieClient().isWritable(currentEnsemble.get(i), ledgerId)) { nonWritableCount++; if (nonWritableCount >= allowedNonWritableCount) { return false; @@ -1239,21 +1239,21 @@ private boolean isWriteSetWritable(DistributionSchedule.WriteSet writeSet, return true; } - protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long key, + protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, int allowedNonWritableCount, long durationMs) { if (durationMs < 0) { return true; } final long startTime = MathUtils.nowInNano(); - boolean success = isWriteSetWritable(writeSet, key, allowedNonWritableCount); + boolean success = isWriteSetWritable(writeSet, allowedNonWritableCount); if (!success && durationMs > 0) { int backoff = 1; final int maxBackoff = 4; final long deadline = startTime + TimeUnit.MILLISECONDS.toNanos(durationMs); - while (!isWriteSetWritable(writeSet, key, allowedNonWritableCount)) { + while (!isWriteSetWritable(writeSet, allowedNonWritableCount)) { if (MathUtils.nowInNano() < deadline) { long maxSleep = MathUtils.elapsedMSec(startTime); if (maxSleep < 0) { @@ -1265,7 +1265,7 @@ protected boolean waitForWritable(DistributionSchedule.WriteSet writeSet, long k TimeUnit.MILLISECONDS.sleep(sleepMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - success = isWriteSetWritable(writeSet, key, allowedNonWritableCount); + success = isWriteSetWritable(writeSet, allowedNonWritableCount); break; } if (backoff <= maxBackoff) { @@ -1340,7 +1340,7 @@ public String toString() { DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId()); try { - if (!waitForWritable(ws, op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) { + if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) { op.allowFailFastOnUnwritableChannel(); } } finally { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index 2ea0e0a408f..1bdc653650f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -264,7 +264,7 @@ public String toString() { } if (!waitForWritable(distributionSchedule.getWriteSet(op.getEntryId()), - op.getEntryId(), 0, clientCtx.getConf().waitForWriteSetMs)) { + 0, clientCtx.getConf().waitForWriteSetMs)) { op.allowFailFastOnUnwritableChannel(); }