diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java index a92959596b..f602c638ba 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ConcurrentUtils.java @@ -20,6 +20,7 @@ import org.apache.ratis.util.function.CheckedFunction; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -83,4 +84,34 @@ static ExecutorService newCachedThreadPool(int maximumPoolSize, ThreadFactory th return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory); } + + /** + * Create a new {@link ExecutorService} with a maximum pool size. + * If it is cached, this method is similar to {@link #newCachedThreadPool(int, ThreadFactory)}. + * Otherwise, this method is similar to {@link java.util.concurrent.Executors#newFixedThreadPool(int)}. + * + * @param cached Use cached thread pool? If not, use a fixed thread pool. + * @param maximumPoolSize the maximum number of threads to allow in the pool. + * @param namePrefix the prefix used in the name of the threads created. + * @return a new {@link ExecutorService}. + */ + static ExecutorService newThreadPoolWithMax(boolean cached, int maximumPoolSize, String namePrefix) { + final ThreadFactory f = newThreadFactory(namePrefix); + return cached ? newCachedThreadPool(maximumPoolSize, f) + : Executors.newFixedThreadPool(maximumPoolSize, f); + } + + /** + * Shutdown the given executor and wait for its termination. + * + * @param executor The executor to be shut down. + */ + static void shutdownAndWait(ExecutorService executor) { + try { + executor.shutdown(); + Preconditions.assertTrue(executor.awaitTermination(1, TimeUnit.DAYS)); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } } diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java index eca51ad6b5..a9dddbb0a9 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java @@ -157,6 +157,27 @@ static void setPort(RaftProperties properties, int port) { setInt(properties::setInt, PORT_KEY, port); } + String ASYNC_REQUEST_THREAD_POOL_CACHED_KEY = PREFIX + ".async.request.thread.pool.cached"; + boolean ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT = true; + static boolean asyncRequestThreadPoolCached(RaftProperties properties) { + return getBoolean(properties::getBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY, + ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT, getDefaultLog()); + } + static void setAsyncRequestThreadPoolCached(RaftProperties properties, boolean useCached) { + setBoolean(properties::setBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY, useCached); + } + + String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size"; + int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32; + static int asyncRequestThreadPoolSize(RaftProperties properties) { + return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, + ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(), + requireMin(0), requireMax(65536)); + } + static void setAsyncRequestThreadPoolSize(RaftProperties properties, int port) { + setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port); + } + String TLS_CONF_PARAMETER = PREFIX + ".tls.conf"; Class TLS_CONF_CLASS = TLS.CONF_CLASS; static GrpcTlsConfig tlsConf(Parameters parameters) { diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java similarity index 95% rename from ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java rename to ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java index 7cd8c08898..9c19684677 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.ratis.grpc.client; +package org.apache.ratis.grpc.server; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.grpc.GrpcUtil; @@ -41,14 +41,15 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; -public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase { - public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class); +class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase { + private static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class); private static class PendingOrderedRequest implements SlidingWindow.ServerSideRequest { private final RaftClientRequest request; @@ -131,12 +132,15 @@ void closeAllExisting(RaftGroupId groupId) { private final Supplier idSupplier; private final RaftClientAsynchronousProtocol protocol; + private final ExecutorService executor; private final OrderedStreamObservers orderedStreamObservers = new OrderedStreamObservers(); - public GrpcClientProtocolService(Supplier idSupplier, RaftClientAsynchronousProtocol protocol) { + GrpcClientProtocolService(Supplier idSupplier, RaftClientAsynchronousProtocol protocol, + ExecutorService executor) { this.idSupplier = idSupplier; this.protocol = protocol; + this.executor = executor; } RaftPeerId getId() { @@ -150,7 +154,7 @@ public StreamObserver ordered(StreamObserver processClientRequest(RaftClientRequest request, Consumer replyHandler) { try { - String errMsg = LOG.isDebugEnabled() ? "processClientRequest for " + request.toString() : ""; + final String errMsg = LOG.isDebugEnabled() ? "processClientRequest for " + request : ""; return protocol.submitClientRequestAsync(request - ).thenAcceptAsync(replyHandler + ).thenAcceptAsync(replyHandler, executor ).exceptionally(exception -> { // TODO: the exception may be from either raft or state machine. // Currently we skip all the following responses when getting an diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java index 0fbdbfc00b..3ce4fc51f1 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java @@ -17,9 +17,9 @@ */ package org.apache.ratis.grpc.server; +import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcTlsConfig; -import org.apache.ratis.grpc.client.GrpcClientProtocolService; import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeerId; @@ -44,6 +44,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL; @@ -107,6 +108,7 @@ public static Builder newBuilder() { private final Supplier clientServerAddressSupplier; private final Supplier adminServerAddressSupplier; + private final ExecutorService executor; private final GrpcClientProtocolService clientProtocolService; private final MetricServerInterceptor serverInterceptor; @@ -143,7 +145,12 @@ private GrpcService(RaftServer raftServer, Supplier idSupplier, + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax); } - this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer); + final RaftProperties properties = raftServer.getProperties(); + this.executor = ConcurrentUtils.newThreadPoolWithMax( + GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties), + GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties), + getId() + "-request-"); + this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor); this.serverInterceptor = new MetricServerInterceptor( idSupplier, @@ -272,6 +279,7 @@ public void closeImpl() throws IOException { } serverInterceptor.close(); + ConcurrentUtils.shutdownAndWait(executor); } @Override diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 31634bbcc3..37cf90e541 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -68,7 +68,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -219,21 +218,13 @@ StreamInfo remove(ClientInvocationId key) { this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass()); final RaftProperties properties = server.getProperties(); - Boolean useCachedThreadPool = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties); - if(useCachedThreadPool) { - this.requestExecutor = ConcurrentUtils.newCachedThreadPool( + final boolean useCachedThreadPool = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties); + this.requestExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool, RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties), - ConcurrentUtils.newThreadFactory(name + "-request-")); - this.writeExecutor = ConcurrentUtils.newCachedThreadPool( + name + "-request-"); + this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool, RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties), - ConcurrentUtils.newThreadFactory(name + "-write-")); - } else { - this.requestExecutor = Executors.newFixedThreadPool( - RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties)); - this.writeExecutor = Executors.newFixedThreadPool( - RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties)); - } - + name + "-write-"); } private CompletableFuture computeDataStreamIfAbsent(RaftClientRequest request) throws IOException { diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java index aa69fa4b5c..8861e2aced 100644 --- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java @@ -108,7 +108,6 @@ private void checkLog(RaftLog raftLog, long expectedCommittedIndex, final String message = "log " + entry + " " + log.getLogEntryBodyCase() + " " + StringUtils.bytes2HexString(logData) + ", expected=" + StringUtils.bytes2HexString(expected); - LOG.info(message); Assert.assertArrayEquals(message, expected, logData); count++; } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java index 77c311c7f1..87cf758647 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java @@ -34,8 +34,11 @@ public class TestGrpcOutputStream extends OutputStreamBaseTest implements MiniRaftClusterWithGrpc.FactoryGet { - static { - Log4jUtils.setLogLevel(GrpcClientStreamer.LOG, Level.ALL); + + { + final RaftProperties p = getProperties(); + GrpcConfigKeys.Server.setAsyncRequestThreadPoolCached(p, false); + GrpcConfigKeys.Server.setAsyncRequestThreadPoolSize(p, 8); } @Override diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java index 4cc82eefaf..fc5c91d757 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftOutputStreamWithGrpc.java @@ -17,18 +17,11 @@ */ package org.apache.ratis.grpc; -import org.apache.log4j.Level; import org.apache.ratis.OutputStreamBaseTest; -import org.apache.ratis.grpc.client.GrpcClientProtocolService; -import org.apache.ratis.util.Log4jUtils; public class TestRaftOutputStreamWithGrpc extends OutputStreamBaseTest implements MiniRaftClusterWithGrpc.FactoryGet { - { - Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.TRACE); - } - @Override public int getGlobalTimeoutSeconds() { return 30; diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java index 230c48872b..8dcff758b4 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java @@ -39,7 +39,6 @@ import org.apache.ratis.client.impl.RaftClientTestUtil; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.grpc.client.GrpcClientProtocolClient; -import org.apache.ratis.grpc.client.GrpcClientProtocolService; import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; @@ -78,7 +77,6 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet { { - Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL); Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL); } diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java index 2861102a1b..65c6fabe24 100644 --- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java +++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java @@ -21,7 +21,6 @@ import org.apache.ratis.WatchRequestTests; import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.impl.UnorderedAsync; -import org.apache.ratis.grpc.client.GrpcClientProtocolService; import org.apache.ratis.grpc.client.GrpcClientRpc; import org.apache.ratis.util.Log4jUtils; @@ -29,7 +28,6 @@ public class TestWatchRequestWithGrpc extends WatchRequestTests implements MiniRaftClusterWithGrpc.FactoryGet { { - Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL); Log4jUtils.setLogLevel(GrpcClientRpc.LOG, Level.ALL); Log4jUtils.setLogLevel(UnorderedAsync.LOG, Level.ALL); Log4jUtils.setLogLevel(RaftClient.LOG, Level.ALL);