diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 0bd9fe8e1fb..285a06aa745 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -21,6 +21,7 @@ package org.apache.bookkeeper.proto; import static com.google.common.base.Charsets.UTF_8; +import static org.apache.bookkeeper.util.SafeRunnable.safeRun; import com.google.common.collect.Lists; import com.google.protobuf.ExtensionRegistry; @@ -48,6 +49,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -62,7 +64,6 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory; import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.apache.bookkeeper.util.SafeRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,12 +114,9 @@ public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, this.scheduler = scheduler; if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) { - SafeRunnable monitor = new SafeRunnable() { - @Override - public void safeRun() { - monitorPendingOperations(); - } - }; + SafeRunnable monitor = safeRun(() -> { + monitorPendingOperations(); + }); this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor, conf.getTimeoutMonitorIntervalSec(), conf.getTimeoutMonitorIntervalSec(), @@ -189,40 +187,29 @@ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) { public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey, final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, addr, ctx); - return; - } - - toSend.retain(); - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - try { - executor.submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - cb.writeLacComplete(rc, ledgerId, addr, ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); - } - } else { - pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); - } + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, addr, ctx); + return; + } - toSend.release(); + toSend.retain(); + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.submitOrdered(ledgerId, safeRun(() -> { + cb.writeLacComplete(rc, ledgerId, addr, ctx); + })); + } catch (RejectedExecutionException re) { + cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx); } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); - } + } else { + pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx); + } + + toSend.release(); + }, ledgerId); } private void completeAdd(final int rc, @@ -255,26 +242,21 @@ public void addEntry(final BookieSocketAddress addr, final WriteCallback cb, final Object ctx, final int options) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, entryId, addr, cb, ctx); - return; - } + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, entryId, addr, cb, ctx); + return; + } - // Retain the buffer, since the connection could be obtained after - // the PendingApp might have already failed - toSend.retain(); + // Retain the buffer, since the connection could be obtained after + // the PendingApp might have already failed + toSend.retain(); - client.obtain(ChannelReadyForAddEntryCallback.create( - this, toSend, ledgerId, entryId, addr, - ctx, cb, options, masterKey), - ledgerId); - } finally { - closeLock.readLock().unlock(); - } + client.obtain(ChannelReadyForAddEntryCallback.create( + this, toSend, ledgerId, entryId, addr, + ctx, cb, options, masterKey), + ledgerId); } private void completeRead(final int rc, @@ -371,37 +353,26 @@ public void recycle() { public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, - ctx); - return; - } - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - try { - executor.submitOrdered(ledgerId, new SafeRunnable() { - @Override - public void safeRun() { - cb.readLacComplete(rc, ledgerId, null, null, ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.readLacComplete(getRc(BKException.Code.InterruptedException), - ledgerId, null, null, ctx); - } - return; - } - pcbc.readLac(ledgerId, cb, ctx); - } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null, + ctx); + return; } + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.submitOrdered(ledgerId, safeRun(() -> { + cb.readLacComplete(rc, ledgerId, null, null, ctx); + })); + } catch (RejectedExecutionException re) { + cb.readLacComplete(getRc(BKException.Code.InterruptedException), + ledgerId, null, null, ctx); + } + } else { + pcbc.readLac(ledgerId, cb, ctx); + } + }, ledgerId); } public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId, @@ -411,28 +382,20 @@ public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId, public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId, final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException), - ledgerId, entryId, null, ctx); - return; - } - - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - completeRead(rc, ledgerId, entryId, null, cb, ctx); - return; - } - pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey); - } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException), + ledgerId, entryId, null, ctx); + return; } + + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + completeRead(rc, ledgerId, entryId, null, cb, ctx); + } else { + pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey); + } + }, ledgerId); } @@ -444,65 +407,45 @@ public void readEntryWaitForLACUpdate(final BookieSocketAddress addr, final boolean piggyBackEntry, final ReadEntryCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - completeRead(BKException.Code.BookieHandleNotAvailableException, - ledgerId, entryId, null, cb, ctx); - return; - } - - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - - if (rc != BKException.Code.OK) { - completeRead(rc, ledgerId, entryId, null, cb, ctx); - return; - } - pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb, - ctx); - } - }, ledgerId); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + completeRead(BKException.Code.BookieHandleNotAvailableException, + ledgerId, entryId, null, cb, ctx); + return; } + + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + completeRead(rc, ledgerId, entryId, null, cb, ctx); + } else { + pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb, + ctx); + } + }, ledgerId); } public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb, final Object ctx) { - closeLock.readLock().lock(); - try { - final PerChannelBookieClientPool client = lookupClient(addr); - if (client == null) { - cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), - ctx); - return; - } - client.obtain(new GenericCallback() { - @Override - public void operationComplete(final int rc, PerChannelBookieClient pcbc) { - if (rc != BKException.Code.OK) { - try { - executor.submit(new SafeRunnable() { - @Override - public void safeRun() { - cb.getBookieInfoComplete(rc, new BookieInfo(), ctx); - } - }); - } catch (RejectedExecutionException re) { - cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException), - new BookieInfo(), ctx); - } - return; - } - pcbc.getBookieInfo(requested, cb, ctx); - } - }, requested); - } finally { - closeLock.readLock().unlock(); + final PerChannelBookieClientPool client = lookupClient(addr); + if (client == null) { + cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), + ctx); + return; } + client.obtain((rc, pcbc) -> { + if (rc != BKException.Code.OK) { + try { + executor.submit(safeRun(() -> { + cb.getBookieInfoComplete(rc, new BookieInfo(), ctx); + })); + } catch (RejectedExecutionException re) { + cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException), + new BookieInfo(), ctx); + } + } else { + pcbc.getBookieInfo(requested, cb, ctx); + } + }, requested); } private void monitorPendingOperations() {