Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve group and flush add-responses after journal sync #3848

Merged
2 changes: 2 additions & 0 deletions bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ Apache Software License, Version 2.
- lib/org.xerial.snappy-snappy-java-1.1.7.7.jar [50]
- lib/io.reactivex.rxjava3-rxjava-3.0.1.jar [51]
- lib/org.hdrhistogram-HdrHistogram-2.1.10.jar [52]
- lib/com.carrotsearch-hppc-0.9.1.jar [53]

[1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.13.4
[2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.13.4
Expand Down Expand Up @@ -369,6 +370,7 @@ Apache Software License, Version 2.
[50] Source available at https://github.com/google/snappy/releases/tag/1.1.7.7
[51] Source available at https://github.com/ReactiveX/RxJava/tree/v3.0.1
[52] Source available at https://github.com/HdrHistogram/HdrHistogram/tree/HdrHistogram-2.1.10
[53] Source available at https://github.com/carrotsearch/hppc/tree/0.9.1

------------------------------------------------------------------------------------
lib/io.netty-netty-codec-4.1.89.Final.jar bundles some 3rd party dependencies
Expand Down
2 changes: 2 additions & 0 deletions bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ Apache Software License, Version 2.
- lib/org.conscrypt-conscrypt-openjdk-uber-2.5.1.jar [49]
- lib/org.xerial.snappy-snappy-java-1.1.7.7.jar [50]
- lib/io.reactivex.rxjava3-rxjava-3.0.1.jar [51]
- lib/com.carrotsearch-hppc-0.9.1.jar [52]

[1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.13.4
[2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.13.4
Expand Down Expand Up @@ -331,6 +332,7 @@ Apache Software License, Version 2.
[49] Source available at https://github.com/google/conscrypt/releases/tag/2.5.1
[50] Source available at https://github.com/google/snappy/releases/tag/1.1.7.7
[51] Source available at https://github.com/ReactiveX/RxJava/tree/v3.0.1
[52] Source available at https://github.com/carrotsearch/hppc/tree/0.9.1

------------------------------------------------------------------------------------
lib/io.netty-netty-codec-4.1.89.Final.jar bundles some 3rd party dependencies
Expand Down
2 changes: 2 additions & 0 deletions bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ Apache Software License, Version 2.
- lib/org.conscrypt-conscrypt-openjdk-uber-2.5.1.jar [49]
- lib/org.xerial.snappy-snappy-java-1.1.7.7.jar [50]
- lib/io.reactivex.rxjava3-rxjava-3.0.1.jar [51]
- lib/com.carrotsearch-hppc-0.9.1.jar [52]

[1] Source available at https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.13.4
[2] Source available at https://github.com/FasterXML/jackson-core/tree/jackson-core-2.13.4
Expand Down Expand Up @@ -364,6 +365,7 @@ Apache Software License, Version 2.
[49] Source available at https://github.com/google/conscrypt/releases/tag/2.5.1
[50] Source available at https://github.com/google/snappy/releases/tag/1.1.7.7
[51] Source available at https://github.com/ReactiveX/RxJava/tree/v3.0.1
[52] Source available at https://github.com/carrotsearch/hppc/tree/0.9.1

------------------------------------------------------------------------------------
lib/io.netty-netty-codec-4.1.89.Final.jar bundles some 3rd party dependencies
Expand Down
4 changes: 4 additions & 0 deletions bookkeeper-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add to license file

</dependency>
<!-- testing dependencies -->
<dependency>
<groupId>org.apache.bookkeeper</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;

/**
Expand Down Expand Up @@ -87,8 +86,6 @@ void cancelWaitForLastAddConfirmedUpdate(long ledgerId,
// TODO: Should be constructed and passed in as a parameter
LedgerStorage getLedgerStorage();

void setRequestProcessor(RequestProcessor requestProcessor);

// TODO: Move this exceptions somewhere else
/**
* Exception is thrown when no such a ledger is found in this bookie.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNS;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand Down Expand Up @@ -1282,11 +1281,4 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
}
}
}

@Override
public void setRequestProcessor(RequestProcessor requestProcessor) {
for (Journal journal : journals) {
journal.setRequestProcessor(requestProcessor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

package org.apache.bookkeeper.bookie;

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.procedures.ObjectProcedure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -51,7 +53,7 @@
import org.apache.bookkeeper.common.util.MemoryLimitController;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.proto.BookieRequestHandler;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand Down Expand Up @@ -333,6 +335,10 @@ public void run() {
recycle();
}

private Object getCtx() {
return ctx;
}

private final Handle<QueueEntry> recyclerHandle;

private QueueEntry(Handle<QueueEntry> recyclerHandle) {
Expand Down Expand Up @@ -368,13 +374,17 @@ public static class ForceWriteRequest {
private long logId;
private boolean flushed;

public int process() {
public int process(ObjectHashSet<BookieRequestHandler> writeHandlers) {
closeFileIfNecessary();

// Notify the waiters that the force write succeeded
for (int i = 0; i < forceWriteWaiters.size(); i++) {
QueueEntry qe = forceWriteWaiters.get(i);
if (qe != null) {
if (qe.getCtx() instanceof BookieRequestHandler
&& qe.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
writeHandlers.add((BookieRequestHandler) qe.getCtx());
}
qe.run();
}
}
Expand Down Expand Up @@ -454,7 +464,6 @@ private class ForceWriteThread extends BookieCriticalThread {
// successful force write
Thread threadToNotifyOnEx;

RequestProcessor requestProcessor;
// should we group force writes
private final boolean enableGroupForceWrites;
private final Counter forceWriteThreadTime;
Expand All @@ -481,6 +490,7 @@ public void run() {
}

final List<ForceWriteRequest> localRequests = new ArrayList<>();
final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>();

while (running) {
try {
Expand All @@ -503,17 +513,16 @@ public void run() {
// responses
for (int i = 0; i < requestsCount; i++) {
ForceWriteRequest req = localRequests.get(i);
numReqInLastForceWrite += req.process();
numReqInLastForceWrite += req.process(writeHandlers);
req.recycle();
}

journalStats.getForceWriteGroupingCountStats()
.registerSuccessfulValue(numReqInLastForceWrite);

if (requestProcessor != null) {
requestProcessor.flushPendingResponses();
}

writeHandlers.forEach(
(ObjectProcedure<? super BookieRequestHandler>)
BookieRequestHandler::flushPendingResponse);
writeHandlers.clear();
} catch (IOException ioe) {
LOG.error("I/O exception in ForceWrite thread", ioe);
running = false;
Expand Down Expand Up @@ -905,7 +914,7 @@ public void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
memoryLimitController.reserveMemory(entry.readableBytes());

queue.put(QueueEntry.create(
entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
entry, ackBeforeSync, ledgerId, entryId, cb, ctx, MathUtils.nowInNano(),
journalStats.getJournalAddEntryStats(),
callbackTime));
}
Expand Down Expand Up @@ -983,6 +992,7 @@ public void run() {

long busyStartTime = System.nanoTime();
ArrayDeque<QueueEntry> localQueueEntries = new ArrayDeque<>();
final ObjectHashSet<BookieRequestHandler> writeHandlers = new ObjectHashSet<>();

QueueEntry qe = null;
while (true) {
Expand Down Expand Up @@ -1106,14 +1116,17 @@ journalFormatVersionToWrite, getBufferedChannelBuilder(),
if (entry != null && (!syncData || entry.ackBeforeSync)) {
toFlush.set(i, null);
numEntriesToFlush--;
if (entry.getCtx() instanceof BookieRequestHandler
&& entry.entryId != BookieImpl.METAENTRY_ID_FORCE_LEDGER) {
writeHandlers.add((BookieRequestHandler) entry.getCtx());
}
entry.run();
}

if (forceWriteThread.requestProcessor != null) {
forceWriteThread.requestProcessor.flushPendingResponses();
}
}

writeHandlers.forEach(
(ObjectProcedure<? super BookieRequestHandler>)
BookieRequestHandler::flushPendingResponse);
writeHandlers.clear();
lastFlushPosition = bc.position();
journalStats.getJournalFlushStats().registerSuccessfulEvent(
journalFlushWatcher.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -1230,10 +1243,6 @@ public BufferedChannelBuilder getBufferedChannelBuilder() {
return (FileChannel fc, int capacity) -> new BufferedChannel(allocator, fc, capacity);
}

public void setRequestProcessor(RequestProcessor requestProcessor) {
forceWriteThread.requestProcessor = requestProcessor;
}

/**
* Shuts down the journal.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,4 @@ public interface RequestProcessor extends AutoCloseable {
* channel received the given request <i>r</i>
*/
void processRequest(Object r, BookieRequestHandler channel);

/**
* Flush any pending response staged on all the client connections.
*/
void flushPendingResponses();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
@Slf4j
public class BookieRequestHandler extends ChannelInboundHandlerAdapter {

static final Object EVENT_FLUSH_ALL_PENDING_RESPONSES = new Object();
private static final int DEFAULT_PENDING_RESPONSE_SIZE = 256;

private final RequestProcessor requestProcessor;
private final ChannelGroup allChannels;

private ChannelHandlerContext ctx;

private ByteBuf pendingSendResponses = null;
private int maxPendingResponsesSize;
private int maxPendingResponsesSize = DEFAULT_PENDING_RESPONSE_SIZE;

BookieRequestHandler(ServerConfiguration conf, RequestProcessor processor, ChannelGroup allChannels) {
this.requestProcessor = processor;
Expand All @@ -56,7 +56,7 @@ public ChannelHandlerContext ctx() {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("Channel connected {}", ctx.channel());
log.info("Channel connected {}", ctx.channel());
this.ctx = ctx;
super.channelActive(ctx);
}
Expand Down Expand Up @@ -92,31 +92,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

public synchronized void prepareSendResponseV2(int rc, BookieProtocol.ParsedAddRequest req) {
if (pendingSendResponses == null) {
pendingSendResponses = ctx.alloc().directBuffer(maxPendingResponsesSize != 0
? maxPendingResponsesSize : 256);
pendingSendResponses = ctx().alloc().directBuffer(maxPendingResponsesSize);
}

BookieProtoEncoding.ResponseEnDeCoderPreV3.serializeAddResponseInto(rc, req, pendingSendResponses);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == EVENT_FLUSH_ALL_PENDING_RESPONSES) {
synchronized (this) {
if (pendingSendResponses != null) {
maxPendingResponsesSize = Math.max(maxPendingResponsesSize,
pendingSendResponses.readableBytes());
if (ctx.channel().isActive()) {
ctx.writeAndFlush(pendingSendResponses, ctx.voidPromise());
} else {
pendingSendResponses.release();
}

pendingSendResponses = null;
}
public synchronized void flushPendingResponse() {
if (pendingSendResponses != null) {
maxPendingResponsesSize = (int) Math.max(
maxPendingResponsesSize * 0.5 + 0.5 * pendingSendResponses.readableBytes(),
DEFAULT_PENDING_RESPONSE_SIZE);
if (ctx().channel().isActive()) {
ctx().writeAndFlush(pendingSendResponses, ctx.voidPromise());
} else {
pendingSendResponses.release();
}
} else {
super.userEventTriggered(ctx, evt);
pendingSendResponses = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -699,13 +699,6 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Bookie
}
}

@Override
public void flushPendingResponses() {
for (Channel c : allChannels) {
c.pipeline().fireUserEventTriggered(BookieRequestHandler.EVENT_FLUSH_ALL_PENDING_RESPONSES);
}
}

public long getWaitTimeoutOnBackpressureMillis() {
return waitTimeoutOnBackpressureMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ public BookieServer(ServerConfiguration conf,
this.requestProcessor = new BookieRequestProcessor(conf, bookie,
statsLogger.scope(SERVER_SCOPE), shFactory, allocator, nettyServer.allChannels);
this.nettyServer.setRequestProcessor(this.requestProcessor);
this.bookie.setRequestProcessor(this.requestProcessor);
}

/**
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@
<zookeeper.version>3.8.1</zookeeper.version>
<snappy.version>1.1.7.7</snappy.version>
<jctools.version>2.1.2</jctools.version>
<hppc.version>0.9.1</hppc.version>
<!-- plugin dependencies -->
<apache-rat-plugin.version>0.12</apache-rat-plugin.version>
<cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
Expand Down Expand Up @@ -801,6 +802,12 @@
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>

<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>hppc</artifactId>
<version>${hppc.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down