Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BP-62] Bookkeeper client introduce batch read request api. #4188

Merged
merged 5 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bookkeeper-proto/src/main/proto/BookkeeperProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum OperationType {
START_TLS = 9;
FORCE_LEDGER = 10;
GET_LIST_OF_ENTRIES_OF_LEDGER = 11;
BATCH_READ_ENTRY = 12;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
Expand Down Expand Up @@ -180,6 +181,47 @@ void readEntry(BookieId address, long ledgerId, long entryId,
ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey,
boolean allowFastFail);

/**
* Batch read entries with a null masterkey, disallowing failfast.
* @see #batchReadEntries(BookieId,long,long,int,long,BatchedReadEntryCallback,Object,int,byte[],boolean)
*/
default void batchReadEntries(BookieId address, long ledgerId, long startEntryId,
int maxCount, long maxSize, BatchedReadEntryCallback cb, Object ctx,
int flags) {
batchReadEntries(address, ledgerId, startEntryId, maxCount, maxSize, cb, ctx, flags, null);
}

/**
* Batch read entries, disallowing failfast.
* @see #batchReadEntries(BookieId,long,long,int,long,BatchedReadEntryCallback,Object,int,byte[],boolean)
*/
default void batchReadEntries(BookieId address, long ledgerId, long startEntryId,
int maxCount, long maxSize, BatchedReadEntryCallback cb, Object ctx,
int flags, byte[] masterKey) {
batchReadEntries(address, ledgerId, startEntryId, maxCount, maxSize, cb, ctx, flags, masterKey, false);
}

/**
* Batch read entries from bookie at address {@code address}.
*
* @param address address of the bookie to read from
* @param ledgerId id of the ledger the entry belongs to
* @param startEntryId id of the entry started
* @param maxCount the total entries count in this batch
* @param maxSize the total entries size in this batch
* @param cb the callback notified when the request completes
* @param ctx a context object passed to the callback on completion
* @param flags a bit mask of flags from BookieProtocol.FLAG_*
* {@link org.apache.bookkeeper.proto.BookieProtocol}
* @param masterKey the master key of the ledger being read from. This is only required
* if the FLAG_DO_FENCING is specified.
* @param allowFastFail fail the read immediately if the channel is non-writable
* {@link #isWritable(BookieId,long)}
*/
void batchReadEntries(BookieId address, long ledgerId, long startEntryId,
int maxCount, long maxSize, BatchedReadEntryCallback cb, Object ctx,
int flags, byte[] masterKey, boolean allowFastFail);

/**
* Send a long poll request to bookie, waiting for the last add confirmed
* to be updated. The client can also request that the full entry is returned
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ForceLedgerCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.FutureGetListOfEntriesOfLedger;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
Expand Down Expand Up @@ -353,6 +354,20 @@ private void completeRead(final int rc,
}
}

private void completeBatchRead(final int rc,
final long ledgerId,
final long startEntryId,
final ByteBufList bufList,
final BatchedReadEntryCallback cb,
final Object ctx) {
try {
executor.executeOrdered(ledgerId, () -> cb.readEntriesComplete(rc, ledgerId, startEntryId, bufList, ctx));
} catch (RejectedExecutionException ree) {
cb.readEntriesComplete(getRc(BKException.Code.InterruptedException),
ledgerId, startEntryId, bufList, ctx);
}
}

private static class ChannelReadyForAddEntryCallback
implements GenericCallback<PerChannelBookieClient> {
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
Expand Down Expand Up @@ -489,6 +504,26 @@ public void readEntry(final BookieId addr, final long ledgerId, final long entry
}, ledgerId);
}

@Override
public void batchReadEntries(final BookieId address, final long ledgerId, final long startEntryId,
final int maxCount, final long maxSize, final BatchedReadEntryCallback cb, final Object ctx,
final int flags, final byte[] masterKey, final boolean allowFastFail) {
final PerChannelBookieClientPool client = lookupClient(address);
if (client == null) {
cb.readEntriesComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, startEntryId, null, ctx);
return;
}

client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
completeBatchRead(rc, ledgerId, startEntryId, null, cb, ctx);
} else {
pcbc.batchReadEntries(ledgerId, startEntryId, maxCount, maxSize, cb, ctx, flags, masterKey,
allowFastFail);
}
}, ledgerId);
}

@Override
public void readEntryWaitForLACUpdate(final BookieId addr,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public Object decode(ByteBuf buffer)
}
}
return new BookieProtocol.BatchedReadResponse(version, rc, ledgerId, entryId, requestId, data == null
? null : data.retain());
? ByteBufList.get() : data.retain());
case BookieProtocol.AUTH:
ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,10 @@ class BatchedReadResponse extends Response implements ReferenceCounted {
final long requestId;
final ByteBufList data;

BatchedReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, long requestId) {
this(protocolVersion, errorCode, ledgerId, entryId, requestId, ByteBufList.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reminder: No matter what the error code is, don't forget to release the ByteBufList in the client side.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the error BatchedReadResponse will also be released.

}

BatchedReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, long requestId,
ByteBufList data) {
init(protocolVersion, BATCH_READ_ENTRY, errorCode, ledgerId, entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,10 @@ public void processRequest(Object msg, BookieRequestHandler requestHandler) {
checkArgument(r instanceof BookieProtocol.ReadRequest);
processReadRequest((BookieProtocol.ReadRequest) r, requestHandler);
break;
case BookieProtocol.BATCH_READ_ENTRY:
checkArgument(r instanceof BookieProtocol.BatchedReadRequest);
processReadRequest((BookieProtocol.BatchedReadRequest) r, requestHandler);
break;
case BookieProtocol.AUTH:
LOG.info("Ignoring auth operation from client {}",
requestHandler.ctx().channel().remoteAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
Expand Down Expand Up @@ -221,6 +222,16 @@ public interface ReadEntryCallback {
void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx);
}

/**
* Declaration of a callback implementation for calls from BookieClient objects.
* Such calls are for replies of batched read operations (operations to read multi entries
* from a ledger).
*
*/
public interface BatchedReadEntryCallback {
void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx);
}

/**
* Listener on entries responded.
*/
Expand Down
Loading
Loading