diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index 445ab6093a6de..5584ebe69d633 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -505,6 +505,20 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private int httpNumThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors()); + @FieldContext( + category = CATEGORY_SERVER, + doc = "Number of threads to use for Netty IO." + + " Default is set to `2 * Runtime.getRuntime().availableProcessors()`" + ) + private int numIOThreads = 2 * Runtime.getRuntime().availableProcessors(); + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Number of threads to use for Netty Acceptor." + + " Default is set to `1`" + ) + private int numAcceptorThreads = 1; + @Deprecated @FieldContext( category = CATEGORY_PLUGIN, diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index c0e8e761bb853..800d33a625186 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -108,8 +108,6 @@ public class ProxyService implements Closeable { private final ScheduledExecutorService statsExecutor; - private static final int numThreads = Runtime.getRuntime().availableProcessors(); - static final Gauge activeConnections = Gauge .build("pulsar_proxy_active_connections", "Number of connections currently active in the proxy").create() .register(); @@ -150,8 +148,10 @@ public ProxyService(ProxyConfiguration proxyConfig, } else { proxyLogLevel = 0; } - this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, false, acceptorThreadFactory); - this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, false, workersThreadFactory); + this.acceptorGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumAcceptorThreads(), + false, acceptorThreadFactory); + this.workerGroup = EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), + false, workersThreadFactory); this.authenticationService = authenticationService; // Initialize the message protocol handlers @@ -280,7 +280,7 @@ private void startProxyExtension(String extensionName, EventLoopUtil.enableTriggeredMode(bootstrap); DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ext-" + extensionName); EventLoopGroup dedicatedWorkerGroup = - EventLoopUtil.newEventLoopGroup(numThreads, false, defaultThreadFactory); + EventLoopUtil.newEventLoopGroup(proxyConfig.getNumIOThreads(), false, defaultThreadFactory); extensionsWorkerGroups.add(dedicatedWorkerGroup); bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup)); bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup);