From 570faa1ed1b1657dcf59217504969c573d06d311 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 3 Nov 2016 05:32:40 +0900 Subject: [PATCH] BOOKKEEPER-961 - Assing read/write requests for same ledger to a single thread --- .../proto/BookieRequestProcessor.java | 33 +++++++------------ .../bookkeeper/proto/PacketProcessorBase.java | 6 ++-- .../proto/PacketProcessorBaseV3.java | 3 +- .../proto/ReadEntryProcessorV3.java | 4 +-- .../proto/WriteEntryProcessorV3.java | 4 +-- 5 files changed, 21 insertions(+), 29 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 1608328ccc6..4dec39ac40a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -20,15 +20,12 @@ */ package org.apache.bookkeeper.proto; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,12 +52,12 @@ public class BookieRequestProcessor implements RequestProcessor { /** * The threadpool used to execute all read entry requests issued to this server. */ - private final ExecutorService readThreadPool; + private final OrderedSafeExecutor readThreadPool; /** * The threadpool used to execute all add entry requests issued to this server. */ - private final ExecutorService writeThreadPool; + private final OrderedSafeExecutor writeThreadPool; // Expose Stats private final BKStats bkStats = BKStats.getInstance(); @@ -74,12 +71,8 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, StatsLogger statsLogger) { this.serverCfg = serverCfg; this.bookie = bookie; - this.readThreadPool = - createExecutor(this.serverCfg.getNumReadWorkerThreads(), - "BookieReadThread-" + serverCfg.getBookiePort() + "-%d"); - this.writeThreadPool = - createExecutor(this.serverCfg.getNumAddWorkerThreads(), - "BookieWriteThread-" + serverCfg.getBookiePort() + "-%d"); + this.readThreadPool = createExecutor(this.serverCfg.getNumReadWorkerThreads(), "BookieReadThread-" + serverCfg.getBookiePort()); + this.writeThreadPool = createExecutor(this.serverCfg.getNumAddWorkerThreads(), "BookieWriteThread-" + serverCfg.getBookiePort()); // Expose Stats this.statsEnabled = serverCfg.isStatisticsEnabled(); this.addEntryStats = statsLogger.getOpStatsLogger(ADD_ENTRY); @@ -94,16 +87,15 @@ public void close() { shutdownExecutor(readThreadPool); } - private ExecutorService createExecutor(int numThreads, String nameFormat) { + private OrderedSafeExecutor createExecutor(int numThreads, String nameFormat) { if (numThreads <= 0) { return null; } else { - return Executors.newFixedThreadPool(numThreads, - new ThreadFactoryBuilder().setNameFormat(nameFormat).build()); + return OrderedSafeExecutor.newBuilder().numThreads(numThreads).name(nameFormat).build(); } } - private void shutdownExecutor(ExecutorService service) { + private void shutdownExecutor(OrderedSafeExecutor service) { if (null != service) { service.shutdown(); } @@ -160,7 +152,7 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann if (null == writeThreadPool) { write.run(); } else { - writeThreadPool.submit(write); + writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), write); } } @@ -169,7 +161,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan if (null == readThreadPool) { read.run(); } else { - readThreadPool.submit(read); + readThreadPool.submitOrdered(r.getReadRequest().getLedgerId(), read); } } @@ -178,7 +170,7 @@ private void processAddRequest(final BookieProtocol.Request r, final Channel c) if (null == writeThreadPool) { write.run(); } else { - writeThreadPool.submit(write); + writeThreadPool.submitOrdered(r.getLedgerId(), write); } } @@ -187,8 +179,7 @@ private void processReadRequest(final BookieProtocol.Request r, final Channel c) if (null == readThreadPool) { read.run(); } else { - readThreadPool.submit(read); + readThreadPool.submitOrdered(r.getLedgerId(), read); } } - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java index a2dc4d8b161..681f6c667aa 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java @@ -19,15 +19,15 @@ import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.proto.BookieProtocol.Request; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; import org.jboss.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -abstract class PacketProcessorBase implements Runnable { +abstract class PacketProcessorBase extends SafeRunnable { private final static Logger logger = LoggerFactory.getLogger(PacketProcessorBase.class); final Request request; final Channel channel; @@ -64,7 +64,7 @@ protected void sendResponse(int rc, Object response, OpStatsLogger statsLogger) } @Override - public void run() { + public void safeRun() { if (!isVersionCompatible()) { sendResponse(BookieProtocol.EBADVERSION, ResponseBuilder.buildErrorResponse(BookieProtocol.EBADVERSION, request), diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java index 9ffca5370b1..85ec6cb87c3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java @@ -28,9 +28,10 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; +import org.apache.bookkeeper.util.SafeRunnable; import org.jboss.netty.channel.Channel; -public abstract class PacketProcessorBaseV3 { +public abstract class PacketProcessorBaseV3 extends SafeRunnable { final Request request; final Channel channel; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index 4073d410055..b9037c11e38 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -38,7 +38,7 @@ import com.google.protobuf.ByteString; -class ReadEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable { +class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { private final static Logger LOG = LoggerFactory.getLogger(ReadEntryProcessorV3.class); @@ -148,7 +148,7 @@ private ReadResponse getReadResponse() { } @Override - public void run() { + public void safeRun() { ReadResponse readResponse = getReadResponse(); sendResponse(readResponse); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java index 6517d8f0a3f..242ed814934 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3.java @@ -36,7 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class WriteEntryProcessorV3 extends PacketProcessorBaseV3 implements Runnable { +class WriteEntryProcessorV3 extends PacketProcessorBaseV3 { private final static Logger logger = LoggerFactory.getLogger(WriteEntryProcessorV3.class); public WriteEntryProcessorV3(Request request, Channel channel, @@ -138,7 +138,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, } @Override - public void run() { + public void safeRun() { AddResponse addResponse = getAddResponse(); if (null != addResponse) { // This means there was an error and we should send this back.