Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-1497. Avoid using ForkJoinPool.commonPool() in GrpcClientProtocolService #587

Merged
merged 4 commits into from Jan 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -20,6 +20,7 @@
import org.apache.ratis.util.function.CheckedFunction;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -83,4 +84,20 @@ static ExecutorService newCachedThreadPool(int maximumPoolSize, ThreadFactory th
return new ThreadPoolExecutor(0, maximumPoolSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), threadFactory);
}

/**
* Create a new {@link ExecutorService} with a maximum pool size.
* If it is cached, this method is similar to {@link #newCachedThreadPool(int, ThreadFactory)}.
* Otherwise, this method is similar to {@link java.util.concurrent.Executors#newFixedThreadPool(int)}.
*
* @param cached Use cached thread pool? If not, use a fixed thread pool.
* @param maximumPoolSize the maximum number of threads to allow in the pool.
* @param namePrefix the prefix used in the name of the threads created.
* @return a new {@link ExecutorService}.
*/
static ExecutorService newThreadPoolWithMax(boolean cached, int maximumPoolSize, String namePrefix) {
final ThreadFactory f = newThreadFactory(namePrefix);
return cached ? newCachedThreadPool(maximumPoolSize, f)
: Executors.newFixedThreadPool(maximumPoolSize, f);
}
}
21 changes: 21 additions & 0 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
Expand Up @@ -157,6 +157,27 @@ static void setPort(RaftProperties properties, int port) {
setInt(properties::setInt, PORT_KEY, port);
}

String ASYNC_REQUEST_THREAD_POOL_CACHED_KEY = PREFIX + ".async.request.thread.pool.cached";
boolean ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT = true;
static boolean asyncRequestThreadPoolCached(RaftProperties properties) {
return getBoolean(properties::getBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY,
ASYNC_REQUEST_THREAD_POOL_CACHED_DEFAULT, getDefaultLog());
}
static void setAsyncRequestThreadPoolCached(RaftProperties properties, boolean useCached) {
setBoolean(properties::setBoolean, ASYNC_REQUEST_THREAD_POOL_CACHED_KEY, useCached);
}

String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size";
int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;
static int asyncRequestThreadPoolSize(RaftProperties properties) {
return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,
ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
requireMin(0), requireMax(65536));
}
static void setAsyncRequestThreadPoolSize(RaftProperties properties, int port) {
setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port);
}

String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
static GrpcTlsConfig tlsConf(Parameters parameters) {
Expand Down
Expand Up @@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.grpc.client;
package org.apache.ratis.grpc.server;

import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.grpc.GrpcUtil;
Expand All @@ -41,14 +41,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
private static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);

private static class PendingOrderedRequest implements SlidingWindow.ServerSideRequest<RaftClientReply> {
private final RaftClientRequest request;
Expand Down Expand Up @@ -131,12 +132,15 @@ void closeAllExisting(RaftGroupId groupId) {

private final Supplier<RaftPeerId> idSupplier;
private final RaftClientAsynchronousProtocol protocol;
private final ExecutorService executor;

private final OrderedStreamObservers orderedStreamObservers = new OrderedStreamObservers();

public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol,
ExecutorService executor) {
this.idSupplier = idSupplier;
this.protocol = protocol;
this.executor = executor;
}

RaftPeerId getId() {
Expand All @@ -150,7 +154,7 @@ public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientR
return so;
}

public void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
void closeAllOrderedRequestStreamObservers(RaftGroupId groupId) {
LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
orderedStreamObservers.closeAllExisting(groupId);
}
Expand Down Expand Up @@ -220,7 +224,7 @@ CompletableFuture<Void> processClientRequest(RaftClientRequest request, Consumer
try {
String errMsg = LOG.isDebugEnabled() ? "processClientRequest for " + request.toString() : "";
return protocol.submitClientRequestAsync(request
).thenAcceptAsync(replyHandler
).thenAcceptAsync(replyHandler, executor
).exceptionally(exception -> {
// TODO: the exception may be from either raft or state machine.
// Currently we skip all the following responses when getting an
Expand Down
Expand Up @@ -17,9 +17,9 @@
*/
package org.apache.ratis.grpc.server;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
Expand All @@ -44,6 +44,7 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;
Expand Down Expand Up @@ -107,6 +108,7 @@ public static Builder newBuilder() {
private final Supplier<InetSocketAddress> clientServerAddressSupplier;
private final Supplier<InetSocketAddress> adminServerAddressSupplier;

private final ExecutorService executor;
private final GrpcClientProtocolService clientProtocolService;

private final MetricServerInterceptor serverInterceptor;
Expand Down Expand Up @@ -143,7 +145,12 @@ private GrpcService(RaftServer raftServer, Supplier<RaftPeerId> idSupplier,
+ " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
}

this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer);
final RaftProperties properties = raftServer.getProperties();
this.executor = ConcurrentUtils.newThreadPoolWithMax(
GrpcConfigKeys.Server.asyncRequestThreadPoolCached(properties),
GrpcConfigKeys.Server.asyncRequestThreadPoolSize(properties),
getId() + "-request-");
this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer, executor);

this.serverInterceptor = new MetricServerInterceptor(
idSupplier,
Expand Down Expand Up @@ -257,6 +264,7 @@ public void startImpl() {

@Override
public void closeImpl() throws IOException {
executor.shutdown();
for (Server server : servers) {
final String name = getId() + ": shutdown server with port " + server.getPort();
LOG.info("{} now", name);
Expand Down
Expand Up @@ -68,7 +68,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -219,21 +218,13 @@ StreamInfo remove(ClientInvocationId key) {
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());

final RaftProperties properties = server.getProperties();
Boolean useCachedThreadPool = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
if(useCachedThreadPool) {
this.requestExecutor = ConcurrentUtils.newCachedThreadPool(
final boolean useCachedThreadPool = RaftServerConfigKeys.DataStream.asyncRequestThreadPoolCached(properties);
this.requestExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties),
ConcurrentUtils.newThreadFactory(name + "-request-"));
this.writeExecutor = ConcurrentUtils.newCachedThreadPool(
name + "-request-");
this.writeExecutor = ConcurrentUtils.newThreadPoolWithMax(useCachedThreadPool,
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties),
ConcurrentUtils.newThreadFactory(name + "-write-"));
} else {
this.requestExecutor = Executors.newFixedThreadPool(
RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
this.writeExecutor = Executors.newFixedThreadPool(
RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
}

name + "-write-");
}

private CompletableFuture<DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
Expand Down
Expand Up @@ -17,18 +17,11 @@
*/
package org.apache.ratis.grpc;

import org.apache.log4j.Level;
import org.apache.ratis.OutputStreamBaseTest;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.util.Log4jUtils;

public class TestRaftOutputStreamWithGrpc
extends OutputStreamBaseTest<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
{
Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.TRACE);
}

@Override
public int getGlobalTimeoutSeconds() {
return 30;
Expand Down
Expand Up @@ -39,7 +39,6 @@
import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
Expand Down Expand Up @@ -78,7 +77,6 @@

public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
{
Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
Log4jUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
}

Expand Down
Expand Up @@ -21,15 +21,13 @@
import org.apache.ratis.WatchRequestTests;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.UnorderedAsync;
import org.apache.ratis.grpc.client.GrpcClientProtocolService;
import org.apache.ratis.grpc.client.GrpcClientRpc;
import org.apache.ratis.util.Log4jUtils;

public class TestWatchRequestWithGrpc
extends WatchRequestTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
{
Log4jUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
Log4jUtils.setLogLevel(GrpcClientRpc.LOG, Level.ALL);
Log4jUtils.setLogLevel(UnorderedAsync.LOG, Level.ALL);
Log4jUtils.setLogLevel(RaftClient.LOG, Level.ALL);
Expand Down