Skip to content

Commit

Permalink
Avoid acquiring closeLock.readLock() on every add/read operation
Browse files Browse the repository at this point in the history
In the `BookieClient`, we are always acquiring a readlock when grabbing a connection to use for sending a write/read request.

The lock is the `closeLock` and it's only acquired in "write" mode when the `BookKeeper` instance is closed.

The problem with the read-lock is that it introduces contention between the threads that are acquiring it (even if all of them in read mode). Multiple threads can be be in read mode in the critical section, though they have contention when they're entering/exiting the section.

Additionally, the Java implementation of read/write lock is creating and destroying a lot of objects when that contention happens.

My understanding of the code is that we don't need to acquire the read lock in that point. The reason is that, we are already acquiring the lock in the `lookupClient()` method, although only if the pool is null. Additionally, when `Bookkeeper.close()` is invoked all PCBC will be set to closed as well, so it will not be possibile to create a new connection.

All the line changes in the patch are just removing the readLock acquire and try/finally, and reducing the indentation level.

Author: Matteo Merli <mmerli@apache.org>

Reviewers: Ivan Kelly <ivank@apache.org>, Enrico Olivelli <eolivelli@gmail.com>, Jia Zhai <None>, Sijie Guo <sijie@apache.org>

This closes #1292 from merlimat/bookie-client-rw-lock and squashes the following commits:

2104a3a [Matteo Merli] Converted anonymous classes into lambdas
cabad14 [Matteo Merli] Avoid acquiring closeLock.readLock() on every add/read operation
  • Loading branch information
merlimat committed Mar 29, 2018
1 parent d8c9588 commit 9df8dac
Showing 1 changed file with 104 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.apache.bookkeeper.proto;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;

import com.google.common.collect.Lists;
import com.google.protobuf.ExtensionRegistry;
Expand Down Expand Up @@ -48,6 +49,7 @@
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
Expand All @@ -62,7 +64,6 @@
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -113,12 +114,9 @@ public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,

this.scheduler = scheduler;
if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
SafeRunnable monitor = new SafeRunnable() {
@Override
public void safeRun() {
monitorPendingOperations();
}
};
SafeRunnable monitor = safeRun(() -> {
monitorPendingOperations();
});
this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
conf.getTimeoutMonitorIntervalSec(),
conf.getTimeoutMonitorIntervalSec(),
Expand Down Expand Up @@ -189,40 +187,29 @@ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {

public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, addr, ctx);
return;
}

toSend.retain();
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
cb.writeLacComplete(rc, ledgerId, addr, ctx);
}
});
} catch (RejectedExecutionException re) {
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
}
} else {
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
}
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, addr, ctx);
return;
}

toSend.release();
toSend.retain();
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, safeRun(() -> {
cb.writeLacComplete(rc, ledgerId, addr, ctx);
}));
} catch (RejectedExecutionException re) {
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
}
} else {
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
}

toSend.release();
}, ledgerId);
}

private void completeAdd(final int rc,
Expand Down Expand Up @@ -255,26 +242,21 @@ public void addEntry(final BookieSocketAddress addr,
final WriteCallback cb,
final Object ctx,
final int options) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, addr, cb, ctx);
return;
}
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, addr, cb, ctx);
return;
}

// Retain the buffer, since the connection could be obtained after
// the PendingApp might have already failed
toSend.retain();
// Retain the buffer, since the connection could be obtained after
// the PendingApp might have already failed
toSend.retain();

client.obtain(ChannelReadyForAddEntryCallback.create(
this, toSend, ledgerId, entryId, addr,
ctx, cb, options, masterKey),
ledgerId);
} finally {
closeLock.readLock().unlock();
}
client.obtain(ChannelReadyForAddEntryCallback.create(
this, toSend, ledgerId, entryId, addr,
ctx, cb, options, masterKey),
ledgerId);
}

private void completeRead(final int rc,
Expand Down Expand Up @@ -371,37 +353,26 @@ public void recycle() {

public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
ctx);
return;
}
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
cb.readLacComplete(rc, ledgerId, null, null, ctx);
}
});
} catch (RejectedExecutionException re) {
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
ledgerId, null, null, ctx);
}
return;
}
pcbc.readLac(ledgerId, cb, ctx);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
ctx);
return;
}
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, safeRun(() -> {
cb.readLacComplete(rc, ledgerId, null, null, ctx);
}));
} catch (RejectedExecutionException re) {
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
ledgerId, null, null, ctx);
}
} else {
pcbc.readLac(ledgerId, cb, ctx);
}
}, ledgerId);
}

public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
Expand All @@ -411,28 +382,20 @@ public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,

public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, null, ctx);
return;
}

client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, null, ctx);
return;
}

client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
} else {
pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
}
}, ledgerId);
}


Expand All @@ -444,65 +407,45 @@ public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
final boolean piggyBackEntry,
final ReadEntryCallback cb,
final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeRead(BKException.Code.BookieHandleNotAvailableException,
ledgerId, entryId, null, cb, ctx);
return;
}

client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {

if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
ctx);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeRead(BKException.Code.BookieHandleNotAvailableException,
ledgerId, entryId, null, cb, ctx);
return;
}

client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
} else {
pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
ctx);
}
}, ledgerId);
}

public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb,
final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
ctx);
return;
}
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submit(new SafeRunnable() {
@Override
public void safeRun() {
cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
}
});
} catch (RejectedExecutionException re) {
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
new BookieInfo(), ctx);
}
return;
}
pcbc.getBookieInfo(requested, cb, ctx);
}
}, requested);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
ctx);
return;
}
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.submit(safeRun(() -> {
cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
}));
} catch (RejectedExecutionException re) {
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
new BookieInfo(), ctx);
}
} else {
pcbc.getBookieInfo(requested, cb, ctx);
}
}, requested);
}

private void monitorPendingOperations() {
Expand Down

0 comments on commit 9df8dac

Please sign in to comment.