From 6034b7420b7be93025cebac691a114c50f05f9b5 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 22 May 2018 17:22:44 +0200 Subject: [PATCH] BP-14 force() API --- .../apache/bookkeeper/client/BookKeeper.java | 5 + .../client/BookKeeperClientStats.java | 3 + .../client/DistributionSchedule.java | 12 ++ .../bookkeeper/client/ForceLedgerOp.java | 121 +++++++++++ .../bookkeeper/client/LedgerHandle.java | 65 ++++++ .../RoundRobinDistributionSchedule.java | 10 + .../client/api/ForceableHandle.java | 50 +++++ .../bookkeeper/client/api/WriteAdvHandle.java | 2 +- .../bookkeeper/client/api/WriteFlag.java | 2 + .../bookkeeper/client/api/WriteHandle.java | 2 +- .../apache/bookkeeper/proto/BookieClient.java | 25 +++ .../proto/BookkeeperInternalCallbacks.java | 7 + .../proto/PerChannelBookieClient.java | 80 +++++++ .../bookie/BookieDeferredSyncTest.java | 197 ++++++++++++++++++ .../bookkeeper/client/DeferredSyncTest.java | 109 +++++++++- .../client/MockBookKeeperTestCase.java | 120 ++++++++--- 16 files changed, 775 insertions(+), 35 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 98d36519252..307a740a290 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -117,6 +117,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { private OpStatsLogger readLacAndEntryOpLogger; private OpStatsLogger readLacAndEntryRespLogger; private OpStatsLogger addOpLogger; + private OpStatsLogger forceOpLogger; private OpStatsLogger writeLacOpLogger; private OpStatsLogger readLacOpLogger; private OpStatsLogger recoverAddEntriesStats; @@ -1501,6 +1502,7 @@ private void initOpLoggers(StatsLogger stats) { readLacAndEntryRespLogger = stats.getOpStatsLogger( BookKeeperClientStats.READ_LAST_CONFIRMED_AND_ENTRY_RESPONSE); addOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.ADD_OP); + forceOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.FORCE_OP); addOpUrCounter = stats.getCounter(BookKeeperClientStats.ADD_OP_UR); writeLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.WRITE_LAC_OP); readLacOpLogger = stats.getOpStatsLogger(BookKeeperClientStats.READ_LAC_OP); @@ -1534,6 +1536,9 @@ OpStatsLogger getReadLacAndEntryRespLogger() { OpStatsLogger getAddOpLogger() { return addOpLogger; } + OpStatsLogger getForceOpLogger() { + return forceOpLogger; + } OpStatsLogger getWriteLacOpLogger() { return writeLacOpLogger; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java index 83e6421863c..6e24bcff023 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java @@ -41,6 +41,7 @@ public interface BookKeeperClientStats { String ADD_OP = "ADD_ENTRY"; String ADD_OP_UR = "ADD_ENTRY_UR"; // Under Replicated during AddEntry. + String FORCE_OP = "FORCE"; String READ_OP = "READ_ENTRY"; // Corrupted entry (Digest Mismatch/ Under Replication) detected during ReadEntry String READ_OP_DM = "READ_ENTRY_DM"; @@ -63,7 +64,9 @@ public interface BookKeeperClientStats { String CHANNEL_ADD_OP = "ADD_ENTRY"; String CHANNEL_TIMEOUT_ADD = "TIMEOUT_ADD_ENTRY"; String CHANNEL_WRITE_LAC_OP = "WRITE_LAC"; + String CHANNEL_FORCE_OP = "FORCE"; String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC"; + String CHANNEL_TIMEOUT_FORCE = "TIMEOUT_FORCE"; String CHANNEL_READ_LAC_OP = "READ_LAC"; String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC"; String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO"; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java index 2bd2a996296..849498ce9f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DistributionSchedule.java @@ -149,6 +149,13 @@ public WriteSet copy() { */ WriteSet getWriteSetForLongPoll(long entryId); + /** + * Return the set of bookies indices to send the messages to for force ledger. + * + * @return the set of bookies indices to send force request to. + */ + WriteSet getWriteSetForForceLedger(); + /** * An ack set represents the set of bookies from which * a response must be received so that an entry can be @@ -197,6 +204,11 @@ interface AckSet { */ AckSet getAckSet(); + /** + * Returns an ackset object useful to wait for all bookies in the ensemble, + * responses should be checked against this. + */ + AckSet getAckSetForForceLedger(); /** * Interface to keep track of which bookies in an ensemble, an action diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java new file mode 100644 index 00000000000..98ca04b3eb5 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ForceLedgerOp.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.common.concurrent.FutureUtils; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback; +import org.apache.bookkeeper.util.SafeRunnable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This represents a request to sync the ledger on every bookie. + */ +class ForceLedgerOp extends SafeRunnable implements ForceLedgerCallback { + + private static final Logger LOG = LoggerFactory.getLogger(ForceLedgerOp.class); + CompletableFuture cb; + BitSet receivedResponseSet; + + DistributionSchedule.AckSet ackSet; + boolean completed = false; + int lastSeenError = BKException.Code.WriteException; + ArrayList currentEnsemble; + + long currentNonDurableLastAddConfirmed = LedgerHandle.INVALID_ENTRY_ID; + + LedgerHandle lh; + + ForceLedgerOp(LedgerHandle lh, CompletableFuture cb) { + this.lh = lh; + this.cb = cb; + } + + void sendForceLedgerRequest(int bookieIndex) { + lh.bk.getBookieClient().forceLedger(currentEnsemble.get(bookieIndex), lh.ledgerId, this, bookieIndex); + } + + @Override + public void safeRun() { + initiate(); + } + + void initiate() { + + // capture currentNonDurableLastAddConfirmed, we are inside OrderedExecutor + this.currentNonDurableLastAddConfirmed = lh.pendingAddsSequenceHead; + LOG.debug("start force() on ledger {} capturing {} ", lh.ledgerId, currentNonDurableLastAddConfirmed); + + // we need to send the request to every bookie in the ensamble + this.currentEnsemble = lh.metadata.currentEnsemble; + this.ackSet = lh.distributionSchedule.getAckSetForForceLedger(); + this.receivedResponseSet = new BitSet( + lh.getLedgerMetadata().getEnsembleSize()); + this.receivedResponseSet.set(0, + lh.getLedgerMetadata().getEnsembleSize()); + + DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSetForForceLedger(); + try { + for (int i = 0; i < writeSet.size(); i++) { + sendForceLedgerRequest(writeSet.get(i)); + } + } finally { + writeSet.recycle(); + } + } + + @Override + public void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx) { + int bookieIndex = (Integer) ctx; + + if (completed) { + return; + } + + if (BKException.Code.OK != rc) { + lastSeenError = rc; + } + + // We got response. + receivedResponseSet.clear(bookieIndex); + + if (rc == BKException.Code.OK) { + if (ackSet.completeBookieAndCheck(bookieIndex) && !completed) { + completed = true; + // we are able to say that every bookie sync'd its own journal + // for every ackknowledged entry before issuing the force() call + LOG.info("After force on ledger {} updating LastAddConfirmed to {} ", + ledgerId, currentNonDurableLastAddConfirmed); + lh.lastAddConfirmed = currentNonDurableLastAddConfirmed; + FutureUtils.complete(cb, null); + return; + } + } else { + LOG.warn("ForceLedger did not succeed: Ledger {} on {}", new Object[]{ledgerId, addr}); + } + + if (receivedResponseSet.isEmpty()) { + completed = true; + FutureUtils.completeExceptionally(cb, BKException.create(lastSeenError)); + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index b3fcbf7ee5a..660f1d12157 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -1099,6 +1099,67 @@ public void asyncAddEntry(final long entryId, final byte[] data, final int offse cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx); } + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture force() { + CompletableFuture result = new CompletableFuture<>(); + ForceLedgerOp op = new ForceLedgerOp(this, result); + boolean wasClosed = false; + synchronized (this) { + // synchronized on this to ensure that + // the ledger isn't closed between checking and + // updating lastAddPushed + if (metadata.isClosed()) { + wasClosed = true; + } + } + + if (wasClosed) { + // make sure the callback is triggered in main worker pool + try { + bk.getMainWorkerPool().submit(new SafeRunnable() { + @Override + public void safeRun() { + LOG.warn("Attempt to use a closed ledger: {}", ledgerId); + result.completeExceptionally(new BKException.BKLedgerClosedException()); + } + + @Override + public String toString() { + return String.format("force(lid=%d)", ledgerId); + } + }); + } catch (RejectedExecutionException e) { + result.completeExceptionally(new BKException.BKInterruptedException()); + } + return result; + } + + if (pendingAddsSequenceHead == INVALID_ENTRY_ID) { + bk.getMainWorkerPool().submit(new SafeRunnable() { + @Override + public void safeRun() { + FutureUtils.complete(result, null); + } + + @Override + public String toString() { + return String.format("force(lid=%d)", ledgerId); + } + }); + return result; + } + + try { + bk.getMainWorkerPool().executeOrdered(ledgerId, op); + } catch (RejectedExecutionException e) { + result.completeExceptionally(new BKException.BKInterruptedException()); + } + return result; + } + /** * Make a recovery add entry request. Recovery adds can add to a ledger even * if it has been fenced. @@ -1685,6 +1746,10 @@ void errorOutPendingAdds(int rc, List ops) { } } + void forceCompleted(long entryId) { + + } + void sendAddSuccessCallbacks() { // Start from the head of the queue and proceed while there are // entries that have had all their responses come back diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java index e399b0191ef..a7066702db2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RoundRobinDistributionSchedule.java @@ -60,6 +60,11 @@ public WriteSet getWriteSetForLongPoll(long entryId) { return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId); } + @Override + public WriteSet getWriteSetForForceLedger() { + return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, 0); + } + @VisibleForTesting static WriteSet writeSetFromValues(Integer... values) { WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0); @@ -252,6 +257,11 @@ public AckSet getAckSet() { return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize); } + @Override + public AckSet getAckSetForForceLedger() { + return AckSetImpl.create(ensembleSize, ensembleSize, ensembleSize); + } + private static class AckSetImpl implements AckSet { private int writeQuorumSize; private int ackQuorumSize; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java new file mode 100644 index 00000000000..48d7acb9eb8 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ForceableHandle.java @@ -0,0 +1,50 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client.api; + +import java.util.concurrent.CompletableFuture; +import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public; +import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; + +/** + * Provide the ability to enforce durability guarantees to the writer. + * + * @see WriteAdvHandle + * @see WriteHandle + * + * @since 4.8 + */ +@Public +@Unstable +public interface ForceableHandle { + + /** + * Enforce durability to the entries written by this handle. + *

This API is useful with {@link WriteFlag#DEFERRED_SYNC}, because with + * that flag writes are acknowledged by the bookie without waiting for a + * durable write + *

+ * + * @return an handle to the result + */ + CompletableFuture force(); + +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java index 37f45b9b770..c24c6d0279a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteAdvHandle.java @@ -39,7 +39,7 @@ */ @Public @Unstable -public interface WriteAdvHandle extends ReadHandle { +public interface WriteAdvHandle extends ReadHandle, ForceableHandle { /** * Add entry asynchronously to an open ledger. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java index 30199c26899..6914abeb256 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteFlag.java @@ -32,6 +32,8 @@ public enum WriteFlag { /** * Writes will be acknowledged after writing to the filesystem * but not yet been persisted to disks. + * + * @see ForceableHandle#force() */ DEFERRED_SYNC(0x1 << 0); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java index b2c04597f7a..edad5f46022 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/WriteHandle.java @@ -38,7 +38,7 @@ */ @Public @Unstable -public interface WriteHandle extends ReadHandle { +public interface WriteHandle extends ReadHandle, ForceableHandle { /** * Add entry asynchronously to an open ledger. diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 3dd837cd6d5..142301ac25d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -55,6 +55,7 @@ import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; @@ -193,6 +194,30 @@ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) { return clientPool; } + public void forceLedger(final BookieSocketAddress addr, final long ledgerId, + final ForceLedgerCallback cb, final Object ctx) { + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.forceLedgerComplete(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, addr, null); + return; + } + + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.executeOrdered(ledgerId, safeRun(() -> { + cb.forceLedgerComplete(rc, ledgerId, addr, ctx); + })); + } catch (RejectedExecutionException re) { + cb.forceLedgerComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); + } + } else { + pcbc.forceLedger(ledgerId, cb, ctx); + } + }, ledgerId); + } + public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) { final PerChannelBookieClientPool client = lookupClient(addr); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java index 04a45465324..cd87ff1a05c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java @@ -90,6 +90,13 @@ public interface WriteLacCallback { void writeLacComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx); } + /** + * Force callback interface. + */ + public interface ForceLedgerCallback { + void forceLedgerComplete(int rc, long ledgerId, BookieSocketAddress addr, Object ctx); + } + /** * A callback interface for a STARTTLS command. */ diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 4d69acf850c..48a06ea6b7b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -92,6 +92,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; @@ -103,6 +104,8 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.AddRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.AddResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerRequest; +import org.apache.bookkeeper.proto.BookkeeperProtocol.ForceLedgerResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest; import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoResponse; import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType; @@ -173,9 +176,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { private final OpStatsLogger readTimeoutOpLogger; private final OpStatsLogger addEntryOpLogger; private final OpStatsLogger writeLacOpLogger; + private final OpStatsLogger forceLedgerOpLogger; private final OpStatsLogger readLacOpLogger; private final OpStatsLogger addTimeoutOpLogger; private final OpStatsLogger writeLacTimeoutOpLogger; + private final OpStatsLogger forceLedgerTimeoutOpLogger; private final OpStatsLogger readLacTimeoutOpLogger; private final OpStatsLogger getBookieInfoOpLogger; private final OpStatsLogger getBookieInfoTimeoutOpLogger; @@ -280,11 +285,13 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor readEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_OP); addEntryOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_ADD_OP); writeLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_WRITE_LAC_OP); + forceLedgerOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_FORCE_OP); readLacOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_READ_LAC_OP); getBookieInfoOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.GET_BOOKIE_INFO_OP); readTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ); addTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_ADD); writeLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_WRITE_LAC); + forceLedgerTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_FORCE); readLacTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_TIMEOUT_READ_LAC); getBookieInfoTimeoutOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.TIMEOUT_GET_BOOKIE_INFO); startTLSOpLogger = statsLogger.getOpStatsLogger(BookKeeperClientStats.CHANNEL_START_TLS_OP); @@ -565,6 +572,30 @@ void writeLac(final long ledgerId, final byte[] masterKey, final long lac, ByteB writeAndFlush(channel, completionKey, writeLacRequest); } + void forceLedger(final long ledgerId, ForceLedgerCallback cb, Object ctx) { + final long txnId = getTxnId(); + final CompletionKey completionKey = new V3CompletionKey(txnId, + OperationType.FORCE_LEDGER); + // force is mostly like addEntry hence uses addEntryTimeout + completionObjects.put(completionKey, + new ForceLedgerCompletion(completionKey, cb, + ctx, ledgerId)); + + // Build the request + BKPacketHeader.Builder headerBuilder = BKPacketHeader.newBuilder() + .setVersion(ProtocolVersion.VERSION_THREE) + .setOperation(OperationType.FORCE_LEDGER) + .setTxnId(txnId); + ForceLedgerRequest.Builder writeLacBuilder = ForceLedgerRequest.newBuilder() + .setLedgerId(ledgerId); + + final Request forceLedgerRequest = Request.newBuilder() + .setHeader(headerBuilder) + .setForceLedgerRequest(writeLacBuilder) + .build(); + writeAndFlush(channel, completionKey, forceLedgerRequest); + } + /** * This method should be called only after connection has been checked for * {@link #connectIfNeededAndDoOp(GenericCallback)}. @@ -1563,6 +1594,55 @@ public void handleV3Response(BookkeeperProtocol.Response response) { } } + class ForceLedgerCompletion extends CompletionValue { + final ForceLedgerCallback cb; + + public ForceLedgerCompletion(final CompletionKey key, + final ForceLedgerCallback originalCallback, + final Object originalCtx, + final long ledgerId) { + super("ForceLedger", + originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, + forceLedgerOpLogger, forceLedgerTimeoutOpLogger); + this.cb = new ForceLedgerCallback() { + @Override + public void forceLedgerComplete(int rc, long ledgerId, + BookieSocketAddress addr, + Object ctx) { + logOpResult(rc); + originalCallback.forceLedgerComplete(rc, ledgerId, + addr, originalCtx); + key.release(); + } + }; + } + + @Override + public void errorOut() { + errorOut(BKException.Code.BookieHandleNotAvailableException); + } + + @Override + public void errorOut(final int rc) { + errorOutAndRunCallback( + () -> cb.forceLedgerComplete(rc, ledgerId, addr, ctx)); + } + + @Override + public void handleV3Response(BookkeeperProtocol.Response response) { + ForceLedgerResponse forceLedgerResponse = response.getForceLedgerResponse(); + StatusCode status = response.getStatus() == StatusCode.EOK + ? forceLedgerResponse.getStatus() : response.getStatus(); + long ledgerId = forceLedgerResponse.getLedgerId(); + + if (LOG.isDebugEnabled()) { + logResponse(status, "ledger", ledgerId); + } + int rc = convertStatus(status, BKException.Code.WriteException); + cb.forceLedgerComplete(rc, ledgerId, addr, ctx); + } + } + // visible for testing class ReadLacCompletion extends CompletionValue { final ReadLacCallback cb; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java new file mode 100644 index 00000000000..d473edac6dc --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieDeferredSyncTest.java @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.bookie; + +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.junit.Assert.assertEquals; + +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.client.api.WriteFlag; +import org.apache.bookkeeper.client.api.WriteHandle; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; + +/** + * Test the bookie journal without sync, driven by client with {@link WriteFlag#DEFERRED_SYNC} write flag. + */ +public class BookieDeferredSyncTest extends BookKeeperClusterTestCase { + + public BookieDeferredSyncTest() { + super(1); + } + + @Test + public void testWriteAndRecovery() throws Exception { + WriteHandle lh = result(bkc.newCreateLedgerOp() + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withAckQuorumSize(1) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32C) + .withPassword(new byte[0]) + .execute()); + + int n = 10; + + long ledgerId = lh.getId(); + + for (int i = 0; i < n; i++) { + lh.append(("entry-" + i).getBytes()); + } + + restartBookies(); + + try (ReadHandle readLh = result(bkc.newOpenLedgerOp() + .withLedgerId(ledgerId) + .withRecovery(true) + .withPassword(new byte[0]) + .execute());) { + + try (LedgerEntries entries = readLh.read(0, n - 1)) { + for (int i = 0; i < n; i++) { + org.apache.bookkeeper.client.api.LedgerEntry entry = entries.getEntry(i); + assertEquals("entry-" + i, new String(entry.getEntryBytes())); + } + } + } + } + + @Test + public void testCloseNoForce() throws Exception { + WriteHandle lh = result(bkc.newCreateLedgerOp() + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withAckQuorumSize(1) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32C) + .withPassword(new byte[0]) + .execute()); + + int n = 10; + + long ledgerId = lh.getId(); + + for (int i = 0; i < n; i++) { + lh.append(("entry-" + i).getBytes()); + } + + // this will close metadata, writing LastAddConfirmed = -1 + assertEquals(-1, lh.getLastAddConfirmed()); + lh.close(); + + restartBookies(); + + try (ReadHandle readLh = result(bkc.newOpenLedgerOp() + .withLedgerId(ledgerId) + .withRecovery(true) + .withPassword(new byte[0]) + .execute());) { + assertEquals(-1, readLh.getLastAddConfirmed()); + } + + } + + @Test + public void testCloseWithForce() throws Exception { + WriteHandle lh = result(bkc.newCreateLedgerOp() + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withAckQuorumSize(1) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32C) + .withPassword(new byte[0]) + .execute()); + + int n = 10; + + long ledgerId = lh.getId(); + + for (int i = 0; i < n; i++) { + lh.append(("entry-" + i).getBytes()); + } + + result(lh.force()); + assertEquals(n - 1, lh.getLastAddConfirmed()); + + lh.close(); + + restartBookies(); + + try (ReadHandle readLh = result(bkc.newOpenLedgerOp() + .withLedgerId(ledgerId) + .withRecovery(true) + .withPassword(new byte[0]) + .execute());) { + + try (LedgerEntries entries = readLh.read(0, n - 1)) { + for (int i = 0; i < n; i++) { + org.apache.bookkeeper.client.api.LedgerEntry entry = entries.getEntry(i); + assertEquals("entry-" + i, new String(entry.getEntryBytes())); + } + } + } + } + + @Test + public void testForceRoundTripWithDeferredSync() throws Exception { + try (WriteHandle lh = result(bkc.newCreateLedgerOp() + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withAckQuorumSize(1) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32C) + .withPassword(new byte[0]) + .execute());) { + int n = 10; + for (int i = 0; i < n; i++) { + lh.append(("entry-" + i).getBytes()); + } + result(lh.force()); + assertEquals(n - 1, lh.getLastAddConfirmed()); + + lh.close(); + } + } + + @Test + public void testForceRoundTripWithoutDeferredSync() throws Exception { + try (WriteHandle lh = result(bkc.newCreateLedgerOp() + .withEnsembleSize(1) + .withWriteQuorumSize(1) + .withAckQuorumSize(1) + .withWriteFlags(WriteFlag.NONE) + .withDigestType(org.apache.bookkeeper.client.api.DigestType.CRC32C) + .withPassword(new byte[0]) + .execute());) { + int n = 10; + for (int i = 0; i < n; i++) { + lh.append(("entry-" + i).getBytes()); + } + // this should work even with non-DEFERRED_SYNC writers + result(lh.force()); + assertEquals(n - 1, lh.getLastAddConfirmed()); + + lh.close(); + } + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java index 4bd5a8cb807..f638a2679c8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java @@ -22,8 +22,10 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.api.WriteFlag; import org.apache.bookkeeper.client.api.WriteHandle; +import org.apache.bookkeeper.net.BookieSocketAddress; import org.junit.Test; /** @@ -49,10 +51,111 @@ public void testAddEntryLastAddConfirmedDoesNotAdvance() throws Exception { } long lastEntryID = result(wh.appendAsync(DATA)); assertEquals(NUM_ENTRIES - 1, lastEntryID); + assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed()); + assertEquals(-1, wh.getLastAddConfirmed()); + } + } + + @Test + public void testAddEntryLastAddConfirmedAdvanceWithForce() throws Exception { + try (WriteHandle wh = result(newCreateLedgerOp() + .withEnsembleSize(3) + .withWriteQuorumSize(3) + .withAckQuorumSize(2) + .withPassword(PASSWORD) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .execute())) { + for (int i = 0; i < NUM_ENTRIES - 1; i++) { + result(wh.appendAsync(DATA)); + } + long lastEntryID = result(wh.appendAsync(DATA)); + assertEquals(NUM_ENTRIES - 1, lastEntryID); + assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed()); + assertEquals(-1, wh.getLastAddConfirmed()); + result(wh.force()); + assertEquals(NUM_ENTRIES - 1, wh.getLastAddConfirmed()); + } + } + + @Test + public void testForceRequiresFullEnsemble() throws Exception { + try (WriteHandle wh = result(newCreateLedgerOp() + .withEnsembleSize(3) + .withWriteQuorumSize(2) + .withAckQuorumSize(2) + .withPassword(PASSWORD) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .execute())) { + for (int i = 0; i < NUM_ENTRIES - 1; i++) { + result(wh.appendAsync(DATA)); + } + long lastEntryID = result(wh.appendAsync(DATA)); + assertEquals(NUM_ENTRIES - 1, lastEntryID); + assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed()); + assertEquals(-1, wh.getLastAddConfirmed()); + + BookieSocketAddress bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0); + killBookie(bookieAddress); + + // write should succeed (we still have 2 bookies out of 3) + result(wh.appendAsync(DATA)); + + // force cannot go, it must be acknowledged by all of the bookies in the ensamble + try { + result(wh.force()); + } catch (BKException.BKBookieException failed) { + } + // bookie comes up again, force must succeed + startKilledBookie(bookieAddress); + result(wh.force()); + } + } + + @Test + public void testForceWillAdvanceLacOnlyUpToLastAcknoledgedWrite() throws Exception { + try (WriteHandle wh = result(newCreateLedgerOp() + .withEnsembleSize(3) + .withWriteQuorumSize(3) + .withAckQuorumSize(3) + .withPassword(PASSWORD) + .withWriteFlags(WriteFlag.DEFERRED_SYNC) + .execute())) { + for (int i = 0; i < NUM_ENTRIES - 1; i++) { + result(wh.appendAsync(DATA)); + } + long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA)); + assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend); + assertEquals(-1, wh.getLastAddConfirmed()); + + // one bookie will stop sending acks + BookieSocketAddress bookieAddress = wh.getLedgerMetadata().getEnsembleAt(wh.getLastAddPushed()).get(0); + suspendBookieWriteAcks(bookieAddress); + + // start a write + CompletableFuture suspendedWrite = wh.appendAsync(DATA); + long lastAddPushedAfterSuspendedWrite = wh.getLastAddPushed(); LedgerHandle lh = (LedgerHandle) wh; - assertEquals(NUM_ENTRIES - 1, lh.getLastAddPushed()); - assertEquals(-1, lh.getLastAddConfirmed()); + // we are waiting for the ack for that entry + assertEquals(lastEntryIdBeforeSuspend, lh.pendingAddsSequenceHead); + assertEquals(lastAddPushedAfterSuspendedWrite - 1, lh.pendingAddsSequenceHead); + + // start and complete a force, lastAddConfirmed cannot be "lastAddPushedAfterSuspendedWrite" + // because the write has not yet been acknowledged by AckQuorumSize Bookies + result(wh.force()); + assertEquals(lastEntryIdBeforeSuspend, wh.getLastAddConfirmed()); + + // receive the ack + resumeBookieWriteAcks(bookieAddress); + long suspendedWriteEntryId = result(suspendedWrite); + assertEquals(lastAddPushedAfterSuspendedWrite, suspendedWriteEntryId); + + assertEquals(suspendedWriteEntryId, wh.getLastAddPushed()); + + assertEquals(lastEntryIdBeforeSuspend, wh.getLastAddConfirmed()); + + result(wh.force()); + assertEquals(suspendedWriteEntryId, wh.getLastAddConfirmed()); } } -} \ No newline at end of file +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index b8536142dd2..b66ddbabdb0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -17,6 +17,7 @@ */ package org.apache.bookkeeper.client; +import static com.google.common.base.Preconditions.checkState; import static org.apache.bookkeeper.client.api.BKException.Code.NoBookieAvailableException; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -36,6 +37,7 @@ import java.security.GeneralSecurityException; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; import java.util.List; @@ -44,7 +46,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -95,6 +97,9 @@ public abstract class MockBookKeeperTestCase { protected ConcurrentSkipListSet fencedLedgers; protected ConcurrentMap>> mockLedgerData; + private Map> slowBookieResponses; + private Set suspendedBookiesForWrites; + List failedBookies; Set availableBookies; private int lastIndexForBK; @@ -129,6 +134,8 @@ public MockEntry(byte[] payload, long lastAddConfirmed) { @Before public void setup() throws Exception { + slowBookieResponses = new ConcurrentHashMap<>(); + suspendedBookiesForWrites = Collections.synchronizedSet(new HashSet<>()); mockLedgerMetadataRegistry = new ConcurrentHashMap<>(); mockLedgerData = new ConcurrentHashMap<>(); mockNextLedgerId = new AtomicLong(1); @@ -174,6 +181,7 @@ public void setup() throws Exception { setupBookieWatcherForEnsembleChange(); setupBookieClientReadEntry(); setupBookieClientAddEntry(); + setupBookieClientForceLedger(); } protected void mockBookKeeperGetConf(ClientConfiguration conf) { @@ -235,6 +243,25 @@ protected void killBookie(BookieSocketAddress killedBookieSocketAddress) { availableBookies.remove(killedBookieSocketAddress); } + protected void startKilledBookie(BookieSocketAddress killedBookieSocketAddress) { + checkState(failedBookies.contains(killedBookieSocketAddress)); + checkState(!availableBookies.contains(killedBookieSocketAddress)); + failedBookies.remove(killedBookieSocketAddress); + availableBookies.add(killedBookieSocketAddress); + } + + protected void suspendBookieWriteAcks(BookieSocketAddress address) { + suspendedBookiesForWrites.add(address); + } + + protected void resumeBookieWriteAcks(BookieSocketAddress address) { + suspendedBookiesForWrites.remove(address); + List pendingResponses = slowBookieResponses.remove(address); + if (pendingResponses != null) { + pendingResponses.forEach(Runnable::run); + } + } + protected BookieSocketAddress startNewBookie() { BookieSocketAddress address = generateBookieSocketAddress(lastIndexForBK++); availableBookies.add(address); @@ -287,13 +314,6 @@ public BookieSocketAddress answer(InvocationOnMock invocation) throws Throwable } }); } - private void submit(Runnable operation) { - try { - scheduler.submit(operation); - } catch (RejectedExecutionException rejected) { - operation.run(); - } - } protected void registerMockEntryForRead(long ledgerId, long entryId, BookieSocketAddress bookieSocketAddress, byte[] entryData, long lastAddConfirmed) { @@ -491,31 +511,44 @@ protected void setupBookieClientAddEntry() { boolean isRecoveryAdd = ((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD; - executor.executeOrdered(ledgerId, () -> { - byte[] entry; - try { - entry = extractEntryPayload(ledgerId, entryId, toSend); - } catch (BKDigestMatchException e) { - callback.writeComplete(Code.DigestMatchException, ledgerId, entryId, bookieSocketAddress, ctx); - return; - } - boolean fenced = fencedLedgers.contains(ledgerId); - if (fenced && !isRecoveryAdd) { - callback.writeComplete(BKException.Code.LedgerFencedException, - ledgerId, entryId, bookieSocketAddress, ctx); - } else { - if (failedBookies.contains(bookieSocketAddress)) { - callback.writeComplete(NoBookieAvailableException, ledgerId, entryId, bookieSocketAddress, ctx); + Runnable activity = () -> { + executor.executeOrdered(ledgerId, () -> { + byte[] entry; + try { + entry = extractEntryPayload(ledgerId, entryId, toSend); + } catch (BKDigestMatchException e) { + callback.writeComplete(Code.DigestMatchException, + ledgerId, entryId, bookieSocketAddress, ctx); return; } - if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) { - registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, bookieSocketAddress, - new byte[0], BookieProtocol.INVALID_ENTRY_ID); + boolean fenced = fencedLedgers.contains(ledgerId); + if (fenced && !isRecoveryAdd) { + callback.writeComplete(BKException.Code.LedgerFencedException, + ledgerId, entryId, bookieSocketAddress, ctx); + } else { + if (failedBookies.contains(bookieSocketAddress)) { + callback.writeComplete(NoBookieAvailableException, + ledgerId, entryId, bookieSocketAddress, ctx); + return; + } + if (getMockLedgerContentsInBookie(ledgerId, bookieSocketAddress).isEmpty()) { + registerMockEntryForRead(ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, + bookieSocketAddress, new byte[0], BookieProtocol.INVALID_ENTRY_ID); + } + registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId); + callback.writeComplete(BKException.Code.OK, + ledgerId, entryId, bookieSocketAddress, ctx); } - registerMockEntryForRead(ledgerId, entryId, bookieSocketAddress, entry, ledgerId); - callback.writeComplete(BKException.Code.OK, ledgerId, entryId, bookieSocketAddress, ctx); - } - }); + }); + }; + if (suspendedBookiesForWrites.contains(bookieSocketAddress)) { + List suspendedCallbacks = + slowBookieResponses.computeIfAbsent(bookieSocketAddress, (k)->new CopyOnWriteArrayList<>()); + suspendedCallbacks.add(activity); + } else { + activity.run(); + } + return null; }); @@ -526,4 +559,31 @@ protected void setupBookieClientAddEntry() { any(), anyInt(), anyBoolean(), any(EnumSet.class)); } + @SuppressWarnings("unchecked") + protected void setupBookieClientForceLedger() { + final Stubber stub = doAnswer(invokation -> { + Object[] args = invokation.getArguments(); + BookieSocketAddress bookieSocketAddress = (BookieSocketAddress) args[0]; + long ledgerId = (Long) args[1]; + BookkeeperInternalCallbacks.ForceLedgerCallback callback = + (BookkeeperInternalCallbacks.ForceLedgerCallback) args[2]; + Object ctx = args[3]; + + executor.executeOrdered(ledgerId, () -> { + if (failedBookies.contains(bookieSocketAddress)) { + callback.forceLedgerComplete(NoBookieAvailableException, ledgerId, bookieSocketAddress, ctx); + return; + } + callback.forceLedgerComplete(BKException.Code.OK, ledgerId, bookieSocketAddress, ctx); + + }); + return null; + }); + + stub.when(bookieClient).forceLedger(any(BookieSocketAddress.class), + anyLong(), + any(BookkeeperInternalCallbacks.ForceLedgerCallback.class), + any()); + } + }