Fixed simultaneous reads on the same ledger/entry with v2 protocol #720

Closed
wants to merge 5 commits
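Below is a minimal sketch of the pattern this change introduces (an illustration only, not code from the PR; the generic key/value types and the class name DuplicateReadTracker are placeholders for BookKeeper's CompletionKey/CompletionValue bookkeeping): the first pending v2 read for a given ledger/entry stays in the existing ConcurrentHashMap, while any concurrent duplicates are parked in a synchronized Guava LinkedListMultimap, so each response or error drains exactly one pending completion instead of failing the second read.

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified sketch of the duplicate-read tracking pattern used in this PR.
class DuplicateReadTracker<K, V> {
    private final ConcurrentHashMap<K, V> primary = new ConcurrentHashMap<>();
    private final ListMultimap<K, V> conflicts = LinkedListMultimap.create();

    // Register a pending read; concurrent duplicates for the same key go to the multimap.
    void add(K key, V completion) {
        if (primary.putIfAbsent(key, completion) != null) {
            synchronized (conflicts) {
                conflicts.put(key, completion);
            }
        }
    }

    // Remove and return one pending completion for this key, or null if none is left.
    V poll(K key) {
        V completion = primary.remove(key);
        if (completion == null) {
            synchronized (conflicts) {
                if (conflicts.containsKey(key)) {
                    completion = conflicts.get(key).get(0);
                    conflicts.remove(key, completion);
                }
            }
        }
        return completion;
    }
}

The diff below applies the same idea inside PerChannelBookieClient, falling back to the conflicts multimap whenever the primary completion map has no entry for a completion key.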
@@ -20,46 +20,6 @@

import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.Certificate;
@@ -121,6 +81,48 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.ExtensionRegistry;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
* This class manages all details of connection to a particular bookie. It also
* has reconnect logic if a connection to a bookie fails.
@@ -155,6 +157,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
private final ConcurrentHashMap<CompletionKey, CompletionValue> completionObjects =
new ConcurrentHashMap<CompletionKey, CompletionValue>();

// Map that holds duplicated read requests. The idea is to only use this map (synchronized) when there is a duplicate
// read request for the same ledgerId/entryId
private final ListMultimap<CompletionKey, CompletionValue> completionObjectsV2Conflicts =
LinkedListMultimap.create();

private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
private final OpStatsLogger readTimeoutOpLogger;
@@ -604,14 +611,13 @@ public void readEntryAndFenceLedger(final long ledgerId, byte[] masterKey,
.build();
}

CompletionValue completion = new ReadCompletion(completionKey,
cb, ctx,
ledgerId, entryId);
if (completionObjects.putIfAbsent(
completionKey, completion) != null) {
// We cannot have more than 1 pending read on the same ledger/entry in the v2 protocol
completion.errorOut(BKException.Code.BookieHandleNotAvailableException);
return;
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
// There's a pending read request on the same ledger/entry. Use the multimap to track all of them
synchronized (completionObjectsV2Conflicts) {
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}
}

writeAndFlush(channel, completionKey, request);
@@ -726,15 +732,13 @@ private void readEntryInternal(final long ledgerId,
.build();
}

CompletionValue completion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(
completionKey, completion);
ReadCompletion readCompletion = new ReadCompletion(completionKey, cb, ctx, ledgerId, entryId);
CompletionValue existingValue = completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
// There's a pending read request on same ledger/entry. This is not supported in V2 protocol
LOG.warn("Failing concurrent request to read at ledger: {} entry: {}", ledgerId, entryId);
completion.errorOut(BKException.Code.UnexpectedConditionException);
return;
// There's a pending read request on the same ledger/entry. Use the multimap to track all of them
synchronized (completionObjectsV2Conflicts) {
completionObjectsV2Conflicts.put(completionKey, readCompletion);
}
}

writeAndFlush(channel, completionKey, request);
@@ -868,6 +872,16 @@ void errorOut(final CompletionKey key, final int rc) {
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut(rc);
} else {
// If there's no completion object here, try in the multimap
synchronized (completionObjectsV2Conflicts) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completion = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completion);

completion.errorOut(rc);
}
}
}
}

@@ -879,13 +893,17 @@ void errorOut(final CompletionKey key, final int rc) {
*/

void errorOutOutstandingEntries(int rc) {

// DO NOT rewrite these using Map.Entry iterations. We want to iterate
// on keys and see if we are successfully able to remove the key from
// the map. Because the add and the read methods also do the same thing
// in case they get a write failure on the socket. The one who
// successfully removes the key from the map is the one responsible for
// calling the application callback.
for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
while (completionObjectsV2Conflicts.get(key).size() > 0) {
errorOut(key, rc);
}
}
for (CompletionKey key : completionObjects.keySet()) {
errorOut(key, rc);
}
@@ -990,8 +1008,17 @@ private void readV2Response(final BookieProtocol.Response response) {
final StatusCode status = getStatusCodeFromErrorCode(response.errorCode);

final CompletionKey key = acquireV2Key(ledgerId, entryId, operationType);
final CompletionValue completionValue = completionObjects.remove(key);
CompletionValue completionValue = completionObjects.remove(key);
key.release();
if (completionValue == null) {
// If there's no completion object here, try in the multimap
synchronized (this) {
if (completionObjectsV2Conflicts.containsKey(key)) {
completionValue = completionObjectsV2Conflicts.get(key).get(0);
completionObjectsV2Conflicts.remove(key, completionValue);
}
}
}

if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been present.
@@ -1001,14 +1028,11 @@
}
} else {
long orderingKey = completionValue.ledgerId;
final CompletionValue finalCompletionValue = completionValue;

executor.submitOrdered(orderingKey, new SafeRunnable() {
@Override
public void safeRun() {
completionValue.handleV2Response(ledgerId, entryId,
status, response);
response.recycle();
}
executor.submitOrdered(orderingKey, () -> {
finalCompletionValue.handleV2Response(ledgerId, entryId, status, response);
response.recycle();
});
}
}
@@ -400,11 +400,11 @@ public void testReadHandleWithExplicitLAC() throws Exception {
Assert.assertTrue(
"Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual LAC of wlh: " + wlh.getLastAddConfirmed(),
(wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
// readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
// readhandle's lastaddconfirmed wont be updated until readExplicitLastConfirmed call is made
Assert.assertTrue(
"Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (numOfEntries - 2)));

long explicitlac = rlh.readExplicitLastConfirmed();
Assert.assertTrue(
"Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1) + " actual ExplicitLAC of rlh: " + explicitlac,
@@ -413,7 +413,7 @@
Assert.assertTrue(
"Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual LAC of rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));

Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 * numOfEntries - 1);
int entryId = numOfEntries;
while (entries.hasMoreElements()) {
@@ -818,4 +818,68 @@ public void testReadEntryReleaseByteBufs() throws Exception {
}
}
}

/**
* Tests that issuing multiple reads for the same entry at the same time works as expected.
*
* @throws Exception
*/
@Test
public void testDoubleRead() throws Exception {
LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());

lh.addEntry("test".getBytes());

// Read the same entry multiple times asynchronously
final int N = 10;
final CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
lh.asyncReadEntries(0, 0, new ReadCallback() {
public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
if (rc == BKException.Code.OK) {
latch.countDown();
} else {
fail("Read fail");
}
}
}, null);
}

latch.await();
}

/**
* Tests that issuing multiple reads for the same entry at the same time works as expected with the v2 wire protocol.
*
* @throws Exception
*/
@Test
public void testDoubleReadWithV2Protocol() throws Exception {
ClientConfiguration conf = new ClientConfiguration(baseClientConf);
conf.setUseV2WireProtocol(true);
BookKeeperTestClient bkc = new BookKeeperTestClient(conf);
LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());

lh.addEntry("test".getBytes());

// Read the same entry multiple times asynchronously
final int N = 10;
final CountDownLatch latch = new CountDownLatch(N);
for (int i = 0; i < N; i++) {
lh.asyncReadEntries(0, 0, new ReadCallback() {
public void readComplete(int rc, LedgerHandle lh,
Enumeration<LedgerEntry> seq, Object ctx) {
if (rc == BKException.Code.OK) {
latch.countDown();
} else {
fail("Read fail");
}
}
}, null);
}

latch.await();
bkc.close();
}
}