Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid acquiring closeLock.readLock() on every add/read operation #1292

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.apache.bookkeeper.proto;

import static com.google.common.base.Charsets.UTF_8;
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;

import com.google.common.collect.Lists;
import com.google.protobuf.ExtensionRegistry;
Expand Down Expand Up @@ -48,6 +49,7 @@
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
Expand All @@ -62,7 +64,6 @@
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -113,12 +114,9 @@ public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup,

this.scheduler = scheduler;
if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) {
SafeRunnable monitor = new SafeRunnable() {
@Override
public void safeRun() {
monitorPendingOperations();
}
};
SafeRunnable monitor = safeRun(() -> {
monitorPendingOperations();
});
this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor,
conf.getTimeoutMonitorIntervalSec(),
conf.getTimeoutMonitorIntervalSec(),
Expand Down Expand Up @@ -189,40 +187,29 @@ public PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) {

public void writeLac(final BookieSocketAddress addr, final long ledgerId, final byte[] masterKey,
final long lac, final ByteBufList toSend, final WriteLacCallback cb, final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, addr, ctx);
return;
}

toSend.retain();
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
cb.writeLacComplete(rc, ledgerId, addr, ctx);
}
});
} catch (RejectedExecutionException re) {
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
}
} else {
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
}
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.writeLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, addr, ctx);
return;
}

toSend.release();
toSend.retain();
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, safeRun(() -> {
cb.writeLacComplete(rc, ledgerId, addr, ctx);
}));
} catch (RejectedExecutionException re) {
cb.writeLacComplete(getRc(BKException.Code.InterruptedException), ledgerId, addr, ctx);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
}
} else {
pcbc.writeLac(ledgerId, masterKey, lac, toSend, cb, ctx);
}

toSend.release();
}, ledgerId);
}

private void completeAdd(final int rc,
Expand Down Expand Up @@ -255,26 +242,21 @@ public void addEntry(final BookieSocketAddress addr,
final WriteCallback cb,
final Object ctx,
final int options) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, addr, cb, ctx);
return;
}
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeAdd(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, addr, cb, ctx);
return;
}

// Retain the buffer, since the connection could be obtained after
// the PendingApp might have already failed
toSend.retain();
// Retain the buffer, since the connection could be obtained after
// the PendingApp might have already failed
toSend.retain();

client.obtain(ChannelReadyForAddEntryCallback.create(
this, toSend, ledgerId, entryId, addr,
ctx, cb, options, masterKey),
ledgerId);
} finally {
closeLock.readLock().unlock();
}
client.obtain(ChannelReadyForAddEntryCallback.create(
this, toSend, ledgerId, entryId, addr,
ctx, cb, options, masterKey),
ledgerId);
}

private void completeRead(final int rc,
Expand Down Expand Up @@ -371,37 +353,26 @@ public void recycle() {

public void readLac(final BookieSocketAddress addr, final long ledgerId, final ReadLacCallback cb,
final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
ctx);
return;
}
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, new SafeRunnable() {
@Override
public void safeRun() {
cb.readLacComplete(rc, ledgerId, null, null, ctx);
}
});
} catch (RejectedExecutionException re) {
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
ledgerId, null, null, ctx);
}
return;
}
pcbc.readLac(ledgerId, cb, ctx);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readLacComplete(getRc(BKException.Code.BookieHandleNotAvailableException), ledgerId, null, null,
ctx);
return;
}
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.submitOrdered(ledgerId, safeRun(() -> {
cb.readLacComplete(rc, ledgerId, null, null, ctx);
}));
} catch (RejectedExecutionException re) {
cb.readLacComplete(getRc(BKException.Code.InterruptedException),
ledgerId, null, null, ctx);
}
} else {
pcbc.readLac(ledgerId, cb, ctx);
}
}, ledgerId);
}

public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,
Expand All @@ -411,28 +382,20 @@ public void readEntry(BookieSocketAddress addr, long ledgerId, long entryId,

public void readEntry(final BookieSocketAddress addr, final long ledgerId, final long entryId,
final ReadEntryCallback cb, final Object ctx, int flags, byte[] masterKey) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, null, ctx);
return;
}

client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.readEntryComplete(getRc(BKException.Code.BookieHandleNotAvailableException),
ledgerId, entryId, null, ctx);
return;
}

client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
} else {
pcbc.readEntry(ledgerId, entryId, cb, ctx, flags, masterKey);
}
}, ledgerId);
}


Expand All @@ -444,65 +407,45 @@ public void readEntryWaitForLACUpdate(final BookieSocketAddress addr,
final boolean piggyBackEntry,
final ReadEntryCallback cb,
final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeRead(BKException.Code.BookieHandleNotAvailableException,
ledgerId, entryId, null, cb, ctx);
return;
}

client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {

if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
return;
}
pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
ctx);
}
}, ledgerId);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
completeRead(BKException.Code.BookieHandleNotAvailableException,
ledgerId, entryId, null, cb, ctx);
return;
}

client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
completeRead(rc, ledgerId, entryId, null, cb, ctx);
} else {
pcbc.readEntryWaitForLACUpdate(ledgerId, entryId, previousLAC, timeOutInMillis, piggyBackEntry, cb,
ctx);
}
}, ledgerId);
}

public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb,
final Object ctx) {
closeLock.readLock().lock();
try {
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
ctx);
return;
}
client.obtain(new GenericCallback<PerChannelBookieClient>() {
@Override
public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
if (rc != BKException.Code.OK) {
try {
executor.submit(new SafeRunnable() {
@Override
public void safeRun() {
cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
}
});
} catch (RejectedExecutionException re) {
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
new BookieInfo(), ctx);
}
return;
}
pcbc.getBookieInfo(requested, cb, ctx);
}
}, requested);
} finally {
closeLock.readLock().unlock();
final PerChannelBookieClientPool client = lookupClient(addr);
if (client == null) {
cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(),
ctx);
return;
}
client.obtain((rc, pcbc) -> {
if (rc != BKException.Code.OK) {
try {
executor.submit(safeRun(() -> {
cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
}));
} catch (RejectedExecutionException re) {
cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
new BookieInfo(), ctx);
}
} else {
pcbc.getBookieInfo(requested, cb, ctx);
}
}, requested);
}

private void monitorPendingOperations() {
Expand Down