diff --git a/conf/broker.conf b/conf/broker.conf index f5cb1ade39616..4b481a39ec666 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -105,9 +105,6 @@ numCacheExecutorThreadPoolSize=10 # reduce the number of IO threads and BK client threads to only have few CPU cores busy. enableBusyWait=false -# Max concurrent web requests -maxConcurrentHttpRequests=1024 - # Flag to control features that are meant to be used when running in standalone mode isRunningStandalone= @@ -783,6 +780,18 @@ httpRequestsLimitEnabled=false # Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests) httpRequestsMaxPerSecond=100.0 +# Capacity for thread pool queue in the HTTP server +httpServerThreadPoolQueueSize=8192 + +# Capacity for accept queue in the HTTP server +httpServerAcceptQueueSize=8192 + +# Maximum number of inbound http connections. (0 to disable limiting) +maxHttpServerConnections=2048 + +# Max concurrent web requests +maxConcurrentHttpRequests=1024 + ### --- BookKeeper Client --- ### # Metadata service uri that bookkeeper is used for loading corresponding metadata driver diff --git a/conf/proxy.conf b/conf/proxy.conf index 19d9e69e1950e..470ce8155cd55 100644 --- a/conf/proxy.conf +++ b/conf/proxy.conf @@ -212,6 +212,17 @@ httpRequestsLimitEnabled=false # Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests) httpRequestsMaxPerSecond=100.0 +# Capacity for thread pool queue in the HTTP server +httpServerThreadPoolQueueSize=8192 + +# Capacity for accept queue in the HTTP server +httpServerAcceptQueueSize=8192 + +# Maximum number of inbound http connections. (0 to disable limiting) +maxHttpServerConnections=2048 + +# Max concurrent web requests +maxConcurrentHttpRequests=1024 ### --- Token Authentication Provider --- ### diff --git a/conf/websocket.conf b/conf/websocket.conf index eff639838fe8a..c871b202021ef 100644 --- a/conf/websocket.conf +++ b/conf/websocket.conf @@ -67,6 +67,18 @@ webSocketSessionIdleTimeoutMillis=300000 # The maximum size of a text message during parsing in WebSocket proxy webSocketMaxTextFrameSize=1048576 +# Capacity for thread pool queue in the HTTP server +httpServerThreadPoolQueueSize=8192 + +# Capacity for accept queue in the HTTP server +httpServerAcceptQueueSize=8192 + +# Maximum number of inbound http connections. (0 to disable limiting) +maxHttpServerConnections=2048 + +# Max concurrent web requests +maxConcurrentHttpRequests=1024 + ### --- Authentication --- ### # Enable authentication diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index eb3f86ac02e45..f28e086a9e9a1 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -296,6 +296,24 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests") private int maxConcurrentHttpRequests = 1024; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Capacity for thread pool queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerThreadPoolQueueSize = 8192; + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Capacity for accept queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerAcceptQueueSize = 8192; + + @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound http connections. " + + "(0 to disable limiting)") + private int maxHttpServerConnections = 2048; + @FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.") private boolean delayedDeliveryEnabled = true; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java new file mode 100644 index 0000000000000..3b6bb721bcc4d --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/Filters.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.web; + +import java.util.EnumSet; +import java.util.Map; +import javax.servlet.DispatcherType; +import javax.servlet.Filter; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.ServletContextHandler; + +public class Filters { + private static final String MATCH_ALL = "/*"; + + /** + * Adds a filter instance to the servlet context handler. + * The filter will be used for all requests. + * + * @param context servlet context handler instance + * @param filter filter instance + */ + public static void addFilter(ServletContextHandler context, Filter filter) { + addFilterHolder(context, new FilterHolder(filter)); + } + + private static void addFilterHolder(ServletContextHandler context, FilterHolder filter) { + context.addFilter(filter, + MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + } + + /** + * Adds a filter to the servlet context handler which gets instantiated and configured when the server starts. + * + * @param context servlet context handler instance + * @param filter filter class + * @param initParams initialization parameters used for configuring the filter instance + */ + public static void addFilterClass(ServletContextHandler context, Class filter, + Map initParams) { + FilterHolder holder = new FilterHolder(filter); + holder.setInitParameters(initParams); + addFilterHolder(context, holder); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java index 2a8c9e8e15c4e..c3cbe7a0ab6eb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/WebExecutorThreadPool.java @@ -20,18 +20,19 @@ import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.ThreadFactory; +import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.thread.ExecutorThreadPool; public class WebExecutorThreadPool extends ExecutorThreadPool { private final ThreadFactory threadFactory; - public WebExecutorThreadPool(String namePrefix) { - this(Runtime.getRuntime().availableProcessors(), namePrefix); + public WebExecutorThreadPool(int maxThreads, String namePrefix) { + this(maxThreads, namePrefix, 8192); } - public WebExecutorThreadPool(int maxThreads, String namePrefix) { - super(maxThreads); + public WebExecutorThreadPool(int maxThreads, String namePrefix, int queueCapacity) { + super(maxThreads, Math.min(8, maxThreads), new BlockingArrayQueue<>(queueCapacity, queueCapacity)); this.threadFactory = new DefaultThreadFactory(namePrefix); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java deleted file mode 100644 index e868f9245c343..0000000000000 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarServerConnector.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.broker.web; - -import java.io.IOException; -import java.util.concurrent.Semaphore; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.util.ssl.SslContextFactory; - -public class PulsarServerConnector extends ServerConnector { - - // Throttle down the accept rate to limit the number of active TCP connections - private final Semaphore semaphore = new Semaphore(10000); - - /** - * @param server - * @param acceptors - * @param selectors - */ - public PulsarServerConnector(Server server, int acceptors, int selectors) { - super(server, acceptors, selectors); - } - - /** - * @param server - * @param acceptors - * @param selectors - * @param sslContextFactory - */ - public PulsarServerConnector(Server server, int acceptors, int selectors, SslContextFactory sslContextFactory) { - super(server, acceptors, selectors, sslContextFactory); - } - - @Override - public void accept(int acceptorID) throws IOException { - try { - semaphore.acquire(); - super.accept(acceptorID); - } catch (InterruptedException e) { - throw new IOException(e); - } - } - - @Override - protected void onEndPointClosed(EndPoint endp) { - semaphore.release(); - super.onEndPointClosed(endp); - } -} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java index 7e6b1636a5c44..bffd82f7b264a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/WebService.java @@ -18,20 +18,22 @@ */ package org.apache.pulsar.broker.web; +import static org.apache.pulsar.broker.web.Filters.addFilter; +import static org.apache.pulsar.broker.web.Filters.addFilterClass; import com.google.common.collect.Lists; import io.prometheus.client.CollectorRegistry; import io.prometheus.client.jetty.JettyStatisticsCollector; import java.util.ArrayList; -import java.util.EnumSet; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import javax.servlet.DispatcherType; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext; +import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -42,9 +44,9 @@ import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.ResourceHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.QoSFilter; import org.eclipse.jetty.util.resource.Resource; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.glassfish.jersey.media.multipart.MultiPartFeature; @@ -67,7 +69,6 @@ public class WebService implements AutoCloseable { private final Server server; private final List handlers; private final WebExecutorThreadPool webServiceExecutor; - public final int maxConcurrentRequests; private final ServerConnector httpConnector; private final ServerConnector httpsConnector; @@ -76,16 +77,20 @@ public class WebService implements AutoCloseable { public WebService(PulsarService pulsar) throws PulsarServerException { this.handlers = Lists.newArrayList(); this.pulsar = pulsar; + ServiceConfiguration config = pulsar.getConfiguration(); this.webServiceExecutor = new WebExecutorThreadPool( - pulsar.getConfiguration().getNumHttpServerThreads(), - "pulsar-web"); + config.getNumHttpServerThreads(), + "pulsar-web", + config.getHttpServerThreadPoolQueueSize()); this.server = new Server(webServiceExecutor); - this.maxConcurrentRequests = pulsar.getConfiguration().getMaxConcurrentHttpRequests(); + if (config.getMaxHttpServerConnections() > 0) { + server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); + } List connectors = new ArrayList<>(); - Optional port = pulsar.getConfiguration().getWebServicePort(); + Optional port = config.getWebServicePort(); if (port.isPresent()) { - httpConnector = new PulsarServerConnector(server, 1, 1); + httpConnector = new ServerConnector(server); httpConnector.setPort(port.get()); httpConnector.setHost(pulsar.getBindAddress()); connectors.add(httpConnector); @@ -93,11 +98,10 @@ public WebService(PulsarService pulsar) throws PulsarServerException { httpConnector = null; } - Optional tlsPort = pulsar.getConfiguration().getWebServicePortTls(); + Optional tlsPort = config.getWebServicePortTls(); if (tlsPort.isPresent()) { try { SslContextFactory sslCtxFactory; - ServiceConfiguration config = pulsar.getConfiguration(); if (config.isTlsEnabledWithKeyStore()) { sslCtxFactory = KeyStoreSSLContext.createSslContextFactory( config.getTlsProvider(), @@ -122,7 +126,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException { config.isTlsRequireTrustedClientCertOnConnect(), true, config.getTlsCertRefreshCheckDurationSec()); } - httpsConnector = new PulsarServerConnector(server, 1, 1, sslCtxFactory); + httpsConnector = new ServerConnector(server, sslCtxFactory); httpsConnector.setPort(tlsPort.get()); httpsConnector.setHost(pulsar.getBindAddress()); connectors.add(httpsConnector); @@ -134,7 +138,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException { } // Limit number of concurrent HTTP connections to avoid getting out of file descriptors - connectors.forEach(c -> c.setAcceptQueueSize(maxConcurrentRequests / connectors.size())); + connectors.forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } @@ -160,42 +164,42 @@ public void addServlet(String path, ServletHolder servletHolder, boolean require }); } - if (!pulsar.getConfig().getBrokerInterceptors().isEmpty() - || !pulsar.getConfig().isDisableBrokerInterceptors()) { + ServiceConfiguration config = pulsar.getConfig(); + + if (config.getMaxConcurrentHttpRequests() > 0) { + addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests", + String.valueOf(config.getMaxConcurrentHttpRequests()))); + } + + if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) { + addFilter(context, + new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond())); + } + + if (!config.getBrokerInterceptors().isEmpty() + || !config.isDisableBrokerInterceptors()) { ExceptionHandler handler = new ExceptionHandler(); // Enable PreInterceptFilter only when interceptors are enabled - context.addFilter(new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)), - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); - context.addFilter(new FilterHolder(new ProcessHandlerFilter(pulsar)), - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + addFilter(context, new PreInterceptFilter(pulsar.getBrokerInterceptor(), handler)); + addFilter(context, new ProcessHandlerFilter(pulsar)); } if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) { - FilterHolder filter = new FilterHolder(new AuthenticationFilter( + addFilter(context, new AuthenticationFilter( pulsar.getBrokerService().getAuthenticationService())); - context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); } - if (pulsar.getConfig().isDisableHttpDebugMethods()) { - FilterHolder filter = new FilterHolder(new DisableDebugHttpMethodFilter(pulsar.getConfig())); - context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); - } - - if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) { - context.addFilter( - new FilterHolder(new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond())), - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + if (config.isDisableHttpDebugMethods()) { + addFilter(context, new DisableDebugHttpMethodFilter(config)); } - if (pulsar.getConfig().getHttpMaxRequestSize() > 0) { - context.addFilter(new FilterHolder( + if (config.getHttpMaxRequestSize() > 0) { + addFilter(context, new MaxRequestSizeFilter( - pulsar.getConfig().getHttpMaxRequestSize())), - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + config.getHttpMaxRequestSize())); } - FilterHolder responseFilter = new FilterHolder(new ResponseHandlerFilter(pulsar)); - context.addFilter(responseFilter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + addFilter(context, new ResponseHandlerFilter(pulsar)); handlers.add(context); } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 8a247c9cfa59e..73ba951177710 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -138,6 +138,27 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { ) private double httpRequestsMaxPerSecond = 100.0; + @FieldContext(category = CATEGORY_WORKER, doc = "Max concurrent web requests") + private int maxConcurrentHttpRequests = 1024; + + @FieldContext( + category = CATEGORY_WORKER, + doc = "Capacity for thread pool queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerThreadPoolQueueSize = 8192; + + @FieldContext( + category = CATEGORY_WORKER, + doc = "Capacity for accept queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerAcceptQueueSize = 8192; + + @FieldContext(category = CATEGORY_WORKER, doc = "Maximum number of inbound http connections. " + + "(0 to disable limiting)") + private int maxHttpServerConnections = 2048; + @FieldContext( category = CATEGORY_WORKER, required = false, diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index d1eda2acc4983..cef4624a91e32 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -103,6 +103,11 @@ jetty-servlet + + org.eclipse.jetty + jetty-servlets + + org.apache.distributedlog distributedlog-core diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index 0c6821fd9e1e9..feec5e3d4e508 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -18,15 +18,16 @@ */ package org.apache.pulsar.functions.worker.rest; +import static org.apache.pulsar.broker.web.Filters.addFilter; import io.prometheus.client.jetty.JettyStatisticsCollector; import java.util.ArrayList; -import java.util.EnumSet; +import java.util.Collections; import java.util.List; import java.util.Optional; -import javax.servlet.DispatcherType; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.web.AuthenticationFilter; +import org.apache.pulsar.broker.web.Filters; import org.apache.pulsar.broker.web.JettyRequestLogFactory; import org.apache.pulsar.broker.web.RateLimitingFilter; import org.apache.pulsar.broker.web.WebExecutorThreadPool; @@ -35,6 +36,7 @@ import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource; import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStatsApiV2Resource; +import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -43,9 +45,9 @@ import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.QoSFilter; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; @@ -57,7 +59,6 @@ public class WorkerServer { private final WorkerService workerService; private final AuthenticationService authenticationService; private static final String MATCH_ALL = "/*"; - private static final int MAX_CONCURRENT_REQUESTS = 1024; private final WebExecutorThreadPool webServerExecutor; private Server server; @@ -68,7 +69,8 @@ public WorkerServer(WorkerService workerService, AuthenticationService authentic this.workerConfig = workerService.getWorkerConfig(); this.workerService = workerService; this.authenticationService = authenticationService; - this.webServerExecutor = new WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), "function-web"); + this.webServerExecutor = new WebExecutorThreadPool(this.workerConfig.getNumHttpServerThreads(), "function-web", + this.workerConfig.getHttpServerThreadPoolQueueSize()); init(); } @@ -79,9 +81,12 @@ public void start() throws Exception { private void init() { server = new Server(webServerExecutor); + if (workerConfig.getMaxHttpServerConnections() > 0) { + server.addBean(new ConnectionLimit(workerConfig.getMaxHttpServerConnections(), server)); + } List connectors = new ArrayList<>(); - httpConnector = new ServerConnector(server, 1, 1); + httpConnector = new ServerConnector(server); httpConnector.setPort(this.workerConfig.getWorkerPort()); connectors.add(httpConnector); @@ -126,7 +131,7 @@ private void init() { this.workerConfig.isTlsRequireTrustedClientCertOnConnect(), true, this.workerConfig.getTlsCertRefreshCheckDurationSec()); - httpsConnector = new ServerConnector(server, 1, 1, sslCtxFactory); + httpsConnector = new ServerConnector(server, sslCtxFactory); httpsConnector.setPort(this.workerConfig.getWorkerPortTls()); connectors.add(httpsConnector); } catch (Exception e) { @@ -135,7 +140,7 @@ private void init() { } // Limit number of concurrent HTTP connections to avoid getting out of file descriptors - connectors.forEach(c -> c.setAcceptQueueSize(MAX_CONCURRENT_REQUESTS / connectors.size())); + connectors.forEach(c -> c.setAcceptQueueSize(workerConfig.getHttpServerAcceptQueueSize())); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } @@ -161,22 +166,29 @@ public static ServletContextHandler newServletContextHandler(String contextPath, final ServletHolder apiServlet = new ServletHolder(new ServletContainer(config)); - contextHandler.addServlet(apiServlet, "/*"); - if (workerService.getWorkerConfig().isAuthenticationEnabled() && requireAuthentication) { - FilterHolder filter = new FilterHolder(new AuthenticationFilter(authenticationService)); - contextHandler.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); - } + contextHandler.addServlet(apiServlet, MATCH_ALL); + + addQosFilterIfNeeded(contextHandler, workerService.getWorkerConfig()); if (workerService.getWorkerConfig().isHttpRequestsLimitEnabled()) { - contextHandler.addFilter( - new FilterHolder( - new RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond())), - MATCH_ALL, EnumSet.allOf(DispatcherType.class)); + addFilter(contextHandler, + new RateLimitingFilter(workerService.getWorkerConfig().getHttpRequestsMaxPerSecond())); + } + + if (workerService.getWorkerConfig().isAuthenticationEnabled() && requireAuthentication) { + addFilter(contextHandler, new AuthenticationFilter(authenticationService)); } return contextHandler; } + private static void addQosFilterIfNeeded(ServletContextHandler context, WorkerConfig workerConfig) { + if (workerConfig.getMaxConcurrentHttpRequests() > 0) { + Filters.addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests", + String.valueOf(workerConfig.getMaxConcurrentHttpRequests()))); + } + } + public void stop() { if (this.server != null) { try { diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java index e387401dc612a..e64d51c161abb 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java @@ -607,6 +607,27 @@ public class ProxyConfiguration implements PulsarConfiguration { ) private int httpNumThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors()); + @FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests") + private int maxConcurrentHttpRequests = 1024; + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Capacity for thread pool queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerThreadPoolQueueSize = 8192; + + @FieldContext( + category = CATEGORY_SERVER, + doc = "Capacity for accept queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerAcceptQueueSize = 8192; + + @FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound http connections. " + + "(0 to disable limiting)") + private int maxHttpServerConnections = 2048; + @FieldContext( category = CATEGORY_SERVER, doc = "Number of threads to use for Netty IO." diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java index 84b1153b38629..35f3c9a709d39 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/WebServer.java @@ -18,16 +18,16 @@ */ package org.apache.pulsar.proxy.server; +import static org.apache.pulsar.broker.web.Filters.addFilter; +import static org.apache.pulsar.broker.web.Filters.addFilterClass; import io.prometheus.client.jetty.JettyStatisticsCollector; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.EnumSet; import java.util.List; import java.util.Optional; -import javax.servlet.DispatcherType; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.web.AuthenticationFilter; @@ -37,6 +37,7 @@ import org.apache.pulsar.broker.web.WebExecutorThreadPool; import org.apache.pulsar.common.util.SecurityUtility; import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext; +import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; @@ -48,9 +49,9 @@ import org.eclipse.jetty.server.handler.HandlerCollection; import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.QoSFilter; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; @@ -77,8 +78,12 @@ public class WebServer { private ServerConnector connectorTls; public WebServer(ProxyConfiguration config, AuthenticationService authenticationService) { - this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web"); + this.webServiceExecutor = new WebExecutorThreadPool(config.getHttpNumThreads(), "pulsar-external-web", + config.getHttpServerThreadPoolQueueSize()); this.server = new Server(webServiceExecutor); + if (config.getMaxHttpServerConnections() > 0) { + server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); + } this.authenticationService = authenticationService; this.config = config; @@ -89,7 +94,7 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication if (config.getWebServicePort().isPresent()) { this.externalServicePort = config.getWebServicePort().get(); - connector = new ServerConnector(server, 1, 1, new HttpConnectionFactory(httpConfig)); + connector = new ServerConnector(server, new HttpConnectionFactory(httpConfig)); connector.setHost(config.getBindAddress()); connector.setPort(externalServicePort); connectors.add(connector); @@ -122,7 +127,7 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication true, config.getTlsCertRefreshCheckDurationSec()); } - connectorTls = new ServerConnector(server, 1, 1, sslCtxFactory); + connectorTls = new ServerConnector(server, sslCtxFactory); connectorTls.setPort(config.getWebServicePortTls().get()); connectorTls.setHost(config.getBindAddress()); connectors.add(connectorTls); @@ -132,7 +137,7 @@ public WebServer(ProxyConfiguration config, AuthenticationService authentication } // Limit number of concurrent HTTP connections to avoid getting out of file descriptors - connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size())); + connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } @@ -159,23 +164,31 @@ public void addServlet(String basePath, ServletHolder servletHolder, ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(basePath); - context.addServlet(servletHolder, "/*"); + context.addServlet(servletHolder, MATCH_ALL); for (Pair attribute : attributes) { context.setAttribute(attribute.getLeft(), attribute.getRight()); } - if (config.isAuthenticationEnabled() && requireAuthentication) { - FilterHolder filter = new FilterHolder(new AuthenticationFilter(authenticationService)); - context.addFilter(filter, MATCH_ALL, EnumSet.allOf(DispatcherType.class)); - } + + addQosFilterIfNeeded(context); if (config.isHttpRequestsLimitEnabled()) { - context.addFilter(new FilterHolder(new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())), MATCH_ALL, - EnumSet.allOf(DispatcherType.class)); + addFilter(context, new RateLimitingFilter(config.getHttpRequestsMaxPerSecond())); + } + + if (config.isAuthenticationEnabled() && requireAuthentication) { + addFilter(context, new AuthenticationFilter(authenticationService)); } handlers.add(context); } + private void addQosFilterIfNeeded(ServletContextHandler context) { + if (config.getMaxConcurrentHttpRequests() > 0) { + addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests", + String.valueOf(config.getMaxConcurrentHttpRequests()))); + } + } + public void addRestResources(String basePath, String javaPackages, String attribute, Object attributeValue) { ResourceConfig config = new ResourceConfig(); config.packages("jersey.config.server.provider.packages", javaPackages); @@ -184,7 +197,7 @@ public void addRestResources(String basePath, String javaPackages, String attrib servletHolder.setAsyncSupported(true); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(basePath); - context.addServlet(servletHolder, "/*"); + context.addServlet(servletHolder, MATCH_ALL); context.setAttribute(attribute, attributeValue); handlers.add(context); } diff --git a/pulsar-websocket/pom.xml b/pulsar-websocket/pom.xml index bf61e3be1854f..4024530084404 100644 --- a/pulsar-websocket/pom.xml +++ b/pulsar-websocket/pom.xml @@ -117,6 +117,11 @@ javax-websocket-client-impl ${jetty.version} + + org.eclipse.jetty + jetty-servlets + ${jetty.version} + org.hdrhistogram HdrHistogram diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java index 6a22e98482757..924f8226f56d3 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/ProxyServer.java @@ -18,9 +18,11 @@ */ package org.apache.pulsar.websocket.service; +import static org.apache.pulsar.broker.web.Filters.addFilterClass; import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -33,6 +35,7 @@ import org.apache.pulsar.broker.web.WebExecutorThreadPool; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.SecurityUtility; +import org.eclipse.jetty.server.ConnectionLimit; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; @@ -42,6 +45,7 @@ import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.QoSFilter; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; @@ -49,6 +53,7 @@ import org.slf4j.LoggerFactory; public class ProxyServer { + private static final String MATCH_ALL = "/*"; private final Server server; private final List handlers = new ArrayList<>(); private final WebSocketProxyConfiguration conf; @@ -60,8 +65,12 @@ public class ProxyServer { public ProxyServer(WebSocketProxyConfiguration config) throws PulsarClientException, MalformedURLException, PulsarServerException { this.conf = config; - executorService = new WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web"); + executorService = new WebExecutorThreadPool(config.getNumHttpServerThreads(), "pulsar-websocket-web", + config.getHttpServerThreadPoolQueueSize()); this.server = new Server(executorService); + if (config.getMaxHttpServerConnections() > 0) { + server.addBean(new ConnectionLimit(config.getMaxHttpServerConnections(), server)); + } List connectors = new ArrayList<>(); if (config.getWebServicePort().isPresent()) { @@ -80,7 +89,7 @@ public ProxyServer(WebSocketProxyConfiguration config) config.isTlsRequireTrustedClientCertOnConnect(), true, config.getTlsCertRefreshCheckDurationSec()); - connectorTls = new ServerConnector(server, -1, -1, sslCtxFactory); + connectorTls = new ServerConnector(server, sslCtxFactory); connectorTls.setPort(config.getWebServicePortTls().get()); connectors.add(connectorTls); } catch (Exception e) { @@ -90,7 +99,7 @@ public ProxyServer(WebSocketProxyConfiguration config) // Limit number of concurrent HTTP connections to avoid getting out of // file descriptors - connectors.stream().forEach(c -> c.setAcceptQueueSize(1024 / connectors.size())); + connectors.stream().forEach(c -> c.setAcceptQueueSize(config.getHttpServerAcceptQueueSize())); server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()])); } @@ -99,7 +108,8 @@ public void addWebSocketServlet(String basePath, Servlet socketServlet) ServletHolder servletHolder = new ServletHolder("ws-events", socketServlet); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(basePath); - context.addServlet(servletHolder, "/*"); + context.addServlet(servletHolder, MATCH_ALL); + addQosFilterIfNeeded(context); handlers.add(context); } @@ -111,11 +121,19 @@ public void addRestResources(String basePath, String javaPackages, String attrib servletHolder.setAsyncSupported(true); ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); context.setContextPath(basePath); - context.addServlet(servletHolder, "/*"); + context.addServlet(servletHolder, MATCH_ALL); context.setAttribute(attribute, attributeValue); + addQosFilterIfNeeded(context); handlers.add(context); } + private void addQosFilterIfNeeded(ServletContextHandler context) { + if (conf.getMaxConcurrentHttpRequests() > 0) { + addFilterClass(context, QoSFilter.class, Collections.singletonMap("maxRequests", + String.valueOf(conf.getMaxConcurrentHttpRequests()))); + } + } + public void start() throws PulsarServerException { log.info("Starting web socket proxy at port {}", Arrays.stream(server.getConnectors()) .map(ServerConnector.class::cast).map(ServerConnector::getPort).map(Object::toString) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java index e69400ef04abb..0b29dd15e0772 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketProxyConfiguration.java @@ -135,6 +135,23 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration { @FieldContext(doc = "Number of threads used by Websocket service") private int webSocketNumServiceThreads = 20; + @FieldContext(doc = "Max concurrent web requests") + private int maxConcurrentHttpRequests = 1024; + + @FieldContext(doc = "Capacity for thread pool queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerThreadPoolQueueSize = 8192; + + @FieldContext(doc = "Capacity for accept queue in the HTTP server" + + " Default is set to 8192." + ) + private int httpServerAcceptQueueSize = 8192; + + @FieldContext(doc = "Maximum number of inbound http connections. " + + "(0 to disable limiting)") + private int maxHttpServerConnections = 2048; + @FieldContext(doc = "Number of connections per broker in Pulsar client used in WebSocket proxy") private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();