From 9df8dac295c875ee3395d78a2bab3cda0c3c9ddd Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 29 Mar 2018 16:04:37 -0700 Subject: [PATCH] Avoid acquiring closeLock.readLock() on every add/read operation In the `BookieClient`, we are always acquiring a readlock when grabbing a connection to use for sending a write/read request. The lock is the `closeLock` and it's only acquired in "write" mode when the `BookKeeper` instance is closed. The problem with the read-lock is that it introduces contention between the threads that are acquiring it (even if all of them in read mode). Multiple threads can be be in read mode in the critical section, though they have contention when they're entering/exiting the section. Additionally, the Java implementation of read/write lock is creating and destroying a lot of objects when that contention happens. My understanding of the code is that we don't need to acquire the read lock in that point. The reason is that, we are already acquiring the lock in the `lookupClient()` method, although only if the pool is null. Additionally, when `Bookkeeper.close()` is invoked all PCBC will be set to closed as well, so it will not be possibile to create a new connection. All the line changes in the patch are just removing the readLock acquire and try/finally, and reducing the indentation level. Author: Matteo Merli Reviewers: Ivan Kelly , Enrico Olivelli , Jia Zhai , Sijie Guo This closes #1292 from merlimat/bookie-client-rw-lock and squashes the following commits: 2104a3aa7 [Matteo Merli] Converted anonymous classes into lambdas cabad14e5 [Matteo Merli] Avoid acquiring closeLock.readLock() on every add/read operation --- .../apache/bookkeeper/proto/BookieClient.java | 265 +++++++----------- 1 file changed, 104 insertions(+), 161 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 0bd9fe8e1fb..285a06aa745 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto; import static com.google.common.base.Charsets.UTF_8; +import static org.apache.bookkeeper.util.SafeRunnable.safeRun; import com.google.common.collect.Lists; import com.google.protobuf.ExtensionRegistry; @@ -48,6 +49,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -62,7 +64,6 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory; import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.apache.bookkeeper.util.SafeRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,12 +114,9 @@ public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, this.scheduler = scheduler; if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) { - SafeRunnable monitor = new SafeRunnable() { - @Override - public void safeRun() { - monitorPendingOperations(); - } - }; + SafeRunnable monitor = safeRun(() -> { + monitorPendingOperations(); + }); this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor, conf.getTimeoutMonitorIntervalSec(), conf.getTimeoutMonitorIntervalSec(), @@ -189,40 +187,29 @@ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) { public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, addr, ctx); - return; - } - - toSend.retain(); - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - try { - executor.submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - cb.writeLacComplete(rc, ledgerId, addr, ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); - } - } else { - pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); - } + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, addr, ctx); + return; + } - toSend.release(); + toSend.retain(); + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.submitOrdered(ledgerId, safeRun(() -> { + cb.writeLacComplete(rc, ledgerId, addr, ctx); + })); + } catch (RejectedExecutionException re) { + cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); - } + } else { + pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); + } + + toSend.release(); + }, ledgerId); } private void completeAdd(final int rc, @@ -255,26 +242,21 @@ public void addEntry(final BookieSocketAddress addr, final WriteCallback cb, final Object ctx, final int options) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, entryId, addr, cb, ctx); - return; - } + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, entryId, addr, cb, ctx); + return; + } - // Retain the buffer, since the connection could be obtained after - // the PendingApp might have already failed - toSend.retain(); + // Retain the buffer, since the connection could be obtained after + // the PendingApp might have already failed + toSend.retain(); - client.obtain(ChannelReadyForAddEntryCallback.create( - this, toSend, ledgerId, entryId, addr, - ctx, cb, options, masterKey), - ledgerId); - } finally { - closeLock.readLock().unlock(); - } + client.obtain(ChannelReadyForAddEntryCallback.create( + this, toSend, ledgerId, entryId, addr, + ctx, cb, options, masterKey), + ledgerId); } private void completeRead(final int rc, @@ -371,37 +353,26 @@ public void recycle() { public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, - ctx); - return; - } - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - try { - executor.submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - cb.readLacComplete(rc, ledgerId, null, null, ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.readLacComplete(getRc(BKException.Code.InterruptedException), - ledgerId, null, null, ctx); - } - return; - } - pcbc.readLac(ledgerId, cb, ctx); - } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, + ctx); + return; } + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.submitOrdered(ledgerId, safeRun(() -> { + cb.readLacComplete(rc, ledgerId, null, null, ctx); + })); + } catch (RejectedExecutionException re) { + cb.readLacComplete(getRc(BKException.Code.InterruptedException), + ledgerId, null, null, ctx); + } + } else { + pcbc.readLac(ledgerId, cb, ctx); + } + }, ledgerId); } public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId, @@ -411,28 +382,20 @@ public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId, public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId, final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, entryId, null, ctx); - return; - } - - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - completeRead(rc, ledgerId, entryId, null, cb, ctx); - return; - } - pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey); - } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, entryId, null, ctx); + return; } + + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + completeRead(rc, ledgerId, entryId, null, cb, ctx); + } else { + pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey); + } + }, ledgerId); } @@ -444,65 +407,45 @@ public void readEntryWaitForLACUpdate(final BookieSocketAddress addr, final boolean piggyBackEntry, final ReadEntryCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - completeRead(BKException.Code.BookieHandleNotAvailableException, - ledgerId, entryId, null, cb, ctx); - return; - } - - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - - if (rc != BKException.Code.OK) { - completeRead(rc, ledgerId, entryId, null, cb, ctx); - return; - } - pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb, - ctx); - } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + completeRead(BKException.Code.BookieHandleNotAvailableException, + ledgerId, entryId, null, cb, ctx); + return; } + + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + completeRead(rc, ledgerId, entryId, null, cb, ctx); + } else { + pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb, + ctx); + } + }, ledgerId); } public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), - ctx); - return; - } - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - try { - executor.submit(new SafeRunnable() { - @Override - public void safeRun() { - cb.getBookieInfoComplete(rc, new BookieInfo(), ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException), - new BookieInfo(), ctx); - } - return; - } - pcbc.getBookieInfo(requested, cb, ctx); - } - }, requested); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), + ctx); + return; } + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.submit(safeRun(() -> { + cb.getBookieInfoComplete(rc, new BookieInfo(), ctx); + })); + } catch (RejectedExecutionException re) { + cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException), + new BookieInfo(), ctx); + } + } else { + pcbc.getBookieInfo(requested, cb, ctx); + } + }, requested); } private void monitorPendingOperations() {