diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 69b44808bf386..a50ac7055d9b7 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -495,6 +495,20 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private int httpNumThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors()); + @FieldContext( + category = CATEGORY_SERVER, + doc = "Number of threads to use for Netty IO." + + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`" + ) + private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors(); + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Number of threads to use for Netty Acceptor." + + " Default is set to `1`" + ) + private int numAcceptorThreads = 1; + @Deprecated @FieldContext( category = CATEGORY_PLUGIN, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index c5e04b60812a2..75f922d1f5e67 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -101,8 +101,6 @@ public class ProxyService implements Closeable { private final ScheduledExecutorService statsExecutor; - private static final int numThreads = Runtime.getRuntime().availableProcessors(); - static final Gauge activeConnections = Gauge .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create() .register(); @@ -143,8 +141,10 @@ public ProxyService(ProxyConfiguration proxyConfig, } else { proxyLogLevel = 0; } - this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, acceptorThreadFactory); - this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workersThreadFactory); + this.acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumAcceptorThreads(), + false, acceptorThreadFactory); + this.workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), + false, workersThreadFactory); this.authenticationService = authenticationService; statsExecutor = Executors