Fixed simultaneous reads on same ledger/entry with v2 protocol
This change comes from the yahoo branch, at yahoo/bookkeeper@ce7b0f7

With the v2 protocol there is no way to disambiguate concurrent read requests, since the completion key is made only from (ledgerId, entryId).
This PR introduces an additional multimap to handle the conflicts (multiple simultaneous read requests for the same entry).
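
For context, a minimal, self-contained sketch of the pattern the patch applies inside PerChannelBookieClient (the PendingReadTracker class and its register/complete methods below are illustrative, not part of the patch): the first pending read for a key stays in the regular completion map, and any concurrent duplicates overflow into a synchronized Guava multimap that is drained one entry at a time as responses or errors arrive.

```java
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;

// Illustrative sketch only; the real client keys on CompletionKey and stores CompletionValue.
public class PendingReadTracker<K, V> {
    // Primary map: at most one pending completion per key, lock-free in the common case.
    private final ConcurrentHashMap<K, V> completions = new ConcurrentHashMap<>();
    // Overflow for duplicate reads on the same key; only touched (under its own lock) on conflicts.
    private final ListMultimap<K, V> conflicts = LinkedListMultimap.create();

    // Register a pending read; a duplicate for the same key goes into the multimap.
    public void register(K key, V completion) {
        if (completions.putIfAbsent(key, completion) != null) {
            synchronized (conflicts) {
                conflicts.put(key, completion);
            }
        }
    }

    // Remove and return the next completion to notify for this key, or null if none is pending.
    public V complete(K key) {
        V value = completions.remove(key);
        if (value == null) {
            synchronized (conflicts) {
                if (conflicts.containsKey(key)) {
                    value = conflicts.get(key).get(0);
                    conflicts.remove(key, value);
                }
            }
        }
        return value;
    }
}
```

Keeping the conflict multimap as a separate, rarely-touched structure means the common single-reader path still goes through the lock-free ConcurrentHashMap, as the comment in the diff below notes.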

Author: Ivan Kelly <ivank@apache.org>
Author: Matteo Merli <mmerli@apache.org>

Reviewers: Jia Zhai <None>, Sijie Guo <sijie@apache.org>

This closes #720 from merlimat/concurrent-read-v2
ivankelly authored and sijie committed Dec 4, 2017
1 parent 7087fda commit a32a7f0
Showing 2 changed files with 156 additions and 68 deletions.
@@ -20,46 +20,6 @@

import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.Certificate;
@@ -121,6 +81,48 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
* This class manages all details of connection to a particular bookie. It also
* has reconnect logic if a connection to a bookie fails.
@@ -155,6 +157,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects =
new ConcurrentHashMap<CompletionKey, CompletionValue>();

// Map that holds duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
// read request for the same ledgerId/entryId
private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
LinkedListMultimap.create();

private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
private final OpStatsLogger readTimeoutOpLogger;
@@ -604,14 +611,13 @@ public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
.build();
}

CompletionValue completion = new ReadCompletion(completionKey,
cb, ctx,
ledgerId, entryId);
if (completionObjects.putIfAbsent(
completionKey, completion) != null) {
// We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
completion.errorOut(BKException.Code.BookieHandleNotAvailableException);
return;
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
// There's a pending read request on same ledger/entry. Use the multimap to track all of them
synchronized (completionObjectsV2Conflicts) {
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}
}

writeAndFlush(channel, completionKey, request);
@@ -726,15 +732,13 @@ private void readEntryInternal(final long ledgerId,
.build();
}

CompletionValue completion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(
completionKey, completion);
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
// There's a pending read request on same ledger/entry. This is not supported in V2 protocol
LOG.warn("Failing concurrent request to read at ledger: {} entry: {}", ledgerId, entryId);
completion.errorOut(BKException.Code.UnexpectedConditionException);
return;
// There's a pending read request on same ledger/entry. Use the multimap to track all of them
synchronized (completionObjectsV2Conflicts) {
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}
}

writeAndFlush(channel, completionKey, request);
@@ -868,6 +872,16 @@ void errorOut(final CompletionKey key, final int rc) {
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut(rc);
} else {
// If there's no completion object here, try in the multimap
synchronized (completionObjectsV2Conflicts) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completion = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completion);

completion.errorOut(rc);
}
}
}
}

@@ -879,13 +893,17 @@ void errorOut(final CompletionKey key, final int rc) {
*/

void errorOutOutstandingEntries(int rc) {

// DO NOT rewrite these using Map.Entry iterations. We want to iterate
// on keys and see if we are successfully able to remove the key from
// the map. Because the add and the read methods also do the same thing
// in case they get a write failure on the socket. The one who
// successfully removes the key from the map is the one responsible for
// calling the application callback.
for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
while (completionObjectsV2Conflicts.get(key).size() > 0) {
errorOut(key, rc);
}
}
for (CompletionKey key : completionObjects.keySet()) {
errorOut(key, rc);
}
@@ -990,8 +1008,17 @@ private void readV2Response(final BookieProtocol.Response response) {
final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);

final CompletionKey key = acquireV2Key(ledgerId, entryId, operationType);
final CompletionValue completionValue = completionObjects.remove(key);
CompletionValue completionValue = completionObjects.remove(key);
key.release();
if (completionValue == null) {
// If there's no completion object here, try in the multimap
synchronized (this) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completionValue = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completionValue);
}
}
}

if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been present.
Expand All @@ -1001,14 +1028,11 @@ private void readV2Response(final BookieProtocol.Response response) {
}
} else {
long orderingKey = completionValue.ledgerId;
final CompletionValue finalCompletionValue = completionValue;

executor.submitOrdered(orderingKey, new SafeRunnable() {
@Override
public void safeRun() {
completionValue.handleV2Response(ledgerId, entryId,
status, response);
response.recycle();
}
executor.submitOrdered(orderingKey, () -> {
finalCompletionValue.handleV2Response(ledgerId, entryId, status, response);
response.recycle();
});
}
}
@@ -400,11 +400,11 @@ public void testReadHandleWithExplicitLAC() throws Exception {
Assert.assertTrue(
"Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
(wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
// readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
// readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
Assert.assertTrue(
"Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (numOfEntries - 2)));

long explicitlac = rlh.readExplicitLastConfirmed();
Assert.assertTrue(
"Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1) + " actual ExplicitLAC of rlh: " + explicitlac,
@@ -413,7 +413,7 @@ public void testReadHandleWithExplicitLAC() throws Exception {
Assert.assertTrue(
"Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));

Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
int entryId = numOfEntries;
while (entries.hasMoreElements()) {
@@ -818,4 +818,68 @@ public void testReadEntryReleaseByteBufs() throws Exception {
}
}
}

/**
* Tests that issuing multiple reads for the same entry at the same time works as expected.
*
* @throws Exception
*/
@Test
public void testDoubleRead() throws Exception {
LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());

lh.addEntry("test".getBytes());

// Read the same entry more times asynchronously
final int N = 10;
final CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
lh.asyncReadEntries(0, 0, new ReadCallback() {
public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
if (rc == BKException.Code.OK) {
latch.countDown();
} else {
fail("Read fail");
}
}
}, null);
}

latch.await();
}

/**
* Tests that issuing multiple reads for the same entry at the same time works as expected.
*
* @throws Exception
*/
@Test
public void testDoubleReadWithV2Protocol() throws Exception {
ClientConfiguration conf = new ClientConfiguration(baseClientConf);
conf.setUseV2WireProtocol(true);
BookKeeperTestClient bkc = new BookKeeperTestClient(conf);
LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());

lh.addEntry("test".getBytes());

// Read the same entry more times asynchronously
final int N = 10;
final CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
lh.asyncReadEntries(0, 0, new ReadCallback() {
public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
if (rc == BKException.Code.OK) {
latch.countDown();
} else {
fail("Read fail");
}
}
}, null);
}

latch.await();
bkc.close();
}
}
