diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServer.java b/src/main/java/org/littleshoot/proxy/HttpProxyServer.java
index 7014ce924..5f2a57832 100644
--- a/src/main/java/org/littleshoot/proxy/HttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/HttpProxyServer.java
@@ -14,13 +14,14 @@ public interface HttpProxyServer {
/**
*
* Clone the existing server, with a port 1 higher and everything else the
- * same.
+ * same. If the proxy was started with port 0 (JVM-assigned port), the cloned proxy will also use a JVM-assigned
+ * port.
*
*
*
* The new server will share event loops with the original server. The event
* loops will use whatever name was given to the first server in the clone
- * group.
+ * group. The server group will not terminate until the original server and all clones terminate.
*
*
* @return a bootstrap that allows customizing and starting the cloned
diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
index c4241e7ad..cad274a84 100644
--- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
+++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java
@@ -1,5 +1,7 @@
package org.littleshoot.proxy;
+import org.littleshoot.proxy.impl.ThreadPoolConfiguration;
+
import java.net.InetSocketAddress;
/**
@@ -309,4 +311,11 @@ HttpProxyServerBootstrap withConnectTimeout(
*/
HttpProxyServer start();
+ /**
+ * Set the configuration parameters for the proxy's thread pools.
+ *
+ * @param configuration thread pool configuration
+ * @return proxy server bootstrap for chaining
+ */
+ HttpProxyServerBootstrap withThreadPoolConfiguration(ThreadPoolConfiguration configuration);
}
\ No newline at end of file
diff --git a/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java b/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java
deleted file mode 100644
index 9efe836b9..000000000
--- a/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package org.littleshoot.proxy;
-
-/**
- * This error indicates that the system was asked to use a TransportProtocol
- * that it didn't know how to handle.
- */
-public class UnknownTransportProtocolError extends Error {
- private static final long serialVersionUID = 1L;
-
- public UnknownTransportProtocolError(TransportProtocol transportProtocol) {
- super(String.format("Unknown TransportProtocol: %1$s",
- transportProtocol));
- }
-}
diff --git a/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java b/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java
new file mode 100644
index 000000000..b264b818b
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java
@@ -0,0 +1,12 @@
+package org.littleshoot.proxy;
+
+/**
+ * This exception indicates that the system was asked to use a TransportProtocol that it didn't know how to handle.
+ */
+public class UnknownTransportProtocolException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public UnknownTransportProtocolException(TransportProtocol transportProtocol) {
+ super(String.format("Unknown TransportProtocol: %1$s", transportProtocol));
+ }
+}
diff --git a/src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java b/src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java
new file mode 100644
index 000000000..8ddbc39e1
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java
@@ -0,0 +1,52 @@
+package org.littleshoot.proxy.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A ThreadFactory that adds LittleProxy-specific information to the threads' names.
+ */
+public class CategorizedThreadFactory implements ThreadFactory {
+ private static final Logger log = LoggerFactory.getLogger(CategorizedThreadFactory.class);
+
+ private final String name;
+ private final String category;
+ private final int uniqueServerGroupId;
+
+ private AtomicInteger threadCount = new AtomicInteger(0);
+
+ /**
+ * Exception handler for proxy threads. Logs the name of the thread and the exception that was caught.
+ */
+ private static final Thread.UncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = new Thread.UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ log.error("Uncaught throwable in thread: {}", t.getName(), e);
+ }
+ };
+
+
+ /**
+ * @param name the user-supplied name of this proxy
+ * @param category the type of threads this factory is creating (acceptor, client-to-proxy worker, proxy-to-server worker)
+ * @param uniqueServerGroupId a unique number for the server group creating this thread factory, to differentiate multiple proxy instances with the same name
+ */
+ public CategorizedThreadFactory(String name, String category, int uniqueServerGroupId) {
+ this.category = category;
+ this.name = name;
+ this.uniqueServerGroupId = uniqueServerGroupId;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, name + "-" + uniqueServerGroupId + "-" + category + "-" + threadCount.getAndIncrement());
+
+ t.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER);
+
+ return t;
+ }
+
+}
diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
index 05e5d2658..3c8afe663 100644
--- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
+++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
@@ -1,6 +1,5 @@
package org.littleshoot.proxy.impl;
-import static org.littleshoot.proxy.TransportProtocol.*;
import io.netty.bootstrap.ChannelFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
@@ -13,33 +12,10 @@
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.udt.nio.NioUdtProvider;
import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.util.concurrent.GlobalEventExecutor;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLEngine;
-
import org.apache.commons.io.IOUtils;
import org.littleshoot.proxy.ActivityTracker;
import org.littleshoot.proxy.ChainedProxyManager;
@@ -55,21 +31,34 @@
import org.littleshoot.proxy.ProxyAuthenticator;
import org.littleshoot.proxy.SslEngineSource;
import org.littleshoot.proxy.TransportProtocol;
-import org.littleshoot.proxy.UnknownTransportProtocolError;
+import org.littleshoot.proxy.UnknownTransportProtocolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLEngine;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
*
* Primary implementation of an {@link HttpProxyServer}.
*
- *
+ *
*
* {@link DefaultHttpProxyServer} is bootstrapped by calling
* {@link #bootstrap()} or {@link #bootstrapFromFile(String)}, and then calling
* {@link DefaultHttpProxyServerBootstrap#start()}. For example:
*
- *
+ *
*
* DefaultHttpProxyServer server =
* DefaultHttpProxyServer
@@ -77,7 +66,7 @@
* .withPort(8090)
* .start();
*
- *
+ *
*/
public class DefaultHttpProxyServer implements HttpProxyServer {
@@ -87,8 +76,7 @@ public class DefaultHttpProxyServer implements HttpProxyServer {
*/
private static final long TRAFFIC_SHAPING_CHECK_INTERVAL_MS = 250L;
- private static final Logger LOG = LoggerFactory
- .getLogger(DefaultHttpProxyServer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpProxyServer.class);
/**
* Our {@link ServerGroup}. Multiple proxy servers can share the same
@@ -119,14 +107,35 @@ public class DefaultHttpProxyServer implements HttpProxyServer {
private final HostResolver serverResolver;
private volatile GlobalTrafficShapingHandler globalTrafficShapingHandler;
+ /**
+ * True when the proxy has already been stopped by calling {@link #stop()} or {@link #abort()}.
+ */
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+
/**
* Track all ActivityTrackers for tracking proxying activity.
*/
private final Collection activityTrackers = new ConcurrentLinkedQueue();
+ /**
+ * Keep track of all channels created by this proxy server for later shutdown when the proxy is stopped.
+ */
+ private final ChannelGroup allChannels = new DefaultChannelGroup("HTTP-Proxy-Server", GlobalEventExecutor.INSTANCE);
+
+ /**
+ * JVM shutdown hook to shutdown this proxy server. Declared as a class-level variable to allow removing the shutdown hook when the
+ * proxy server is stopped normally.
+ */
+ private final Thread jvmShutdownHook = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ abort();
+ }
+ }, "LittleProxy-JVM-shutdown-hook");
+
/**
* Bootstrap a new {@link DefaultHttpProxyServer} starting from scratch.
- *
+ *
* @return
*/
public static HttpProxyServerBootstrap bootstrap() {
@@ -136,7 +145,7 @@ public static HttpProxyServerBootstrap bootstrap() {
/**
* Bootstrap a new {@link DefaultHttpProxyServer} using defaults from the
* given file.
- *
+ *
* @param path
* @return
*/
@@ -161,7 +170,7 @@ public static HttpProxyServerBootstrap bootstrapFromFile(String path) {
/**
* Creates a new proxy server.
- *
+ *
* @param serverGroup
* our ServerGroup for shared thread pools and such
* @param transportProtocol
@@ -204,22 +213,22 @@ public static HttpProxyServerBootstrap bootstrapFromFile(String path) {
* write throttle bandwidth
*/
private DefaultHttpProxyServer(ServerGroup serverGroup,
- TransportProtocol transportProtocol,
- InetSocketAddress requestedAddress,
- SslEngineSource sslEngineSource,
- boolean authenticateSslClients,
- ProxyAuthenticator proxyAuthenticator,
- ChainedProxyManager chainProxyManager,
- MitmManager mitmManager,
- HttpFiltersSource filtersSource,
- boolean transparent,
- int idleConnectionTimeout,
- Collection activityTrackers,
- int connectTimeout,
- HostResolver serverResolver,
- long readThrottleBytesPerSecond,
- long writeThrottleBytesPerSecond,
- InetSocketAddress localAddress) {
+ TransportProtocol transportProtocol,
+ InetSocketAddress requestedAddress,
+ SslEngineSource sslEngineSource,
+ boolean authenticateSslClients,
+ ProxyAuthenticator proxyAuthenticator,
+ ChainedProxyManager chainProxyManager,
+ MitmManager mitmManager,
+ HttpFiltersSource filtersSource,
+ boolean transparent,
+ int idleConnectionTimeout,
+ Collection activityTrackers,
+ int connectTimeout,
+ HostResolver serverResolver,
+ long readThrottleBytesPerSecond,
+ long writeThrottleBytesPerSecond,
+ InetSocketAddress localAddress) {
this.serverGroup = serverGroup;
this.transportProtocol = transportProtocol;
this.requestedAddress = requestedAddress;
@@ -314,54 +323,123 @@ public long getWriteThrottle() {
@Override
public HttpProxyServerBootstrap clone() {
- return new DefaultHttpProxyServerBootstrap(this, transportProtocol,
+ return new DefaultHttpProxyServerBootstrap(serverGroup,
+ transportProtocol,
new InetSocketAddress(requestedAddress.getAddress(),
requestedAddress.getPort() == 0 ? 0 : requestedAddress.getPort() + 1),
- sslEngineSource,
- authenticateSslClients,
- proxyAuthenticator,
- chainProxyManager,
- mitmManager,
- filtersSource,
- transparent,
- idleConnectionTimeout,
- activityTrackers,
- connectTimeout,
- serverResolver,
- globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getReadLimit() : 0,
- globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getWriteLimit() : 0,
- localAddress);
+ sslEngineSource,
+ authenticateSslClients,
+ proxyAuthenticator,
+ chainProxyManager,
+ mitmManager,
+ filtersSource,
+ transparent,
+ idleConnectionTimeout,
+ activityTrackers,
+ connectTimeout,
+ serverResolver,
+ globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getReadLimit() : 0,
+ globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getWriteLimit() : 0,
+ localAddress);
}
@Override
public void stop() {
- serverGroup.stop(true);
+ doStop(true);
}
@Override
public void abort() {
- serverGroup.stop(false);
+ doStop(false);
}
- private HttpProxyServer start() {
- LOG.info("Starting proxy at address: " + this.requestedAddress);
-
- synchronized (serverGroup) {
- if (!serverGroup.stopped) {
- doStart();
+ /**
+ * Performs cleanup necessary to stop the server. Closes all channels opened by the server and unregisters this
+ * server from the server group.
+ *
+ * @param graceful when true, waits for requests to terminate before stopping the server
+ */
+ protected void doStop(boolean graceful) {
+ // only stop the server if it hasn't already been stopped
+ if (stopped.compareAndSet(false, true)) {
+ if (graceful) {
+ LOG.info("Shutting down proxy server gracefully");
} else {
- throw new Error("Already stopped");
+ LOG.info("Shutting down proxy server immediately (non-graceful)");
}
+
+ closeAllChannels(graceful);
+
+ serverGroup.unregisterProxyServer(this, graceful);
+
+ // remove the shutdown hook that was added when the proxy was started, since it has now been stopped
+ try {
+ Runtime.getRuntime().removeShutdownHook(jvmShutdownHook);
+ } catch (IllegalStateException e) {
+ // ignore -- IllegalStateException means the VM is already shutting down
+ }
+
+ LOG.info("Done shutting down proxy server");
+ }
+ }
+
+ /**
+ * Register a new {@link Channel} with this server, for later closing.
+ *
+ * @param channel
+ */
+ protected void registerChannel(Channel channel) {
+ allChannels.add(channel);
+ }
+
+ /**
+ * Closes all channels opened by this proxy server.
+ *
+ * @param graceful when false, attempts to shutdown all channels immediately and ignores any channel-closing exceptions
+ */
+ protected void closeAllChannels(boolean graceful) {
+ LOG.info("Closing all channels " + (graceful ? "(graceful)" : "(non-graceful)"));
+
+ ChannelGroupFuture future = allChannels.close();
+
+ // if this is a graceful shutdown, log any channel closing failures. if this isn't a graceful shutdown, ignore them.
+ if (graceful) {
+ try {
+ future.await(10, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ LOG.warn("Interrupted while waiting for channels to shut down gracefully.");
+ }
+
+ if (!future.isSuccess()) {
+ for (ChannelFuture cf : future) {
+ if (!cf.isSuccess()) {
+ LOG.info("Unable to close channel. Cause of failure for {} is {}", cf.channel(), cf.cause());
+ }
+ }
+ }
+ }
+ }
+
+ private HttpProxyServer start() {
+ if (!serverGroup.isStopped()) {
+ LOG.info("Starting proxy at address: " + this.requestedAddress);
+
+ serverGroup.registerProxyServer(this);
+
+ doStart();
+ } else {
+ throw new IllegalStateException("Attempted to start proxy, but proxy's server group is already stopped");
}
return this;
}
private void doStart() {
- serverGroup.ensureProtocol(transportProtocol);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(
- serverGroup.clientToProxyBossPools.get(transportProtocol),
- serverGroup.clientToProxyWorkerPools.get(transportProtocol));
+ serverGroup.getClientToProxyAcceptorPoolForTransport(transportProtocol),
+ serverGroup.getClientToProxyWorkerPoolForTransport(transportProtocol));
ChannelInitializer initializer = new ChannelInitializer() {
protected void initChannel(Channel ch) throws Exception {
@@ -374,23 +452,23 @@ protected void initChannel(Channel ch) throws Exception {
};
};
switch (transportProtocol) {
- case TCP:
- LOG.info("Proxy listening with TCP transport");
- serverBootstrap.channelFactory(new ChannelFactory() {
- @Override
- public ServerChannel newChannel() {
- return new NioServerSocketChannel();
- }
- });
- break;
- case UDT:
- LOG.info("Proxy listening with UDT transport");
- serverBootstrap.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
- .option(ChannelOption.SO_BACKLOG, 10)
- .option(ChannelOption.SO_REUSEADDR, true);
- break;
- default:
- throw new UnknownTransportProtocolError(transportProtocol);
+ case TCP:
+ LOG.info("Proxy listening with TCP transport");
+ serverBootstrap.channelFactory(new ChannelFactory() {
+ @Override
+ public ServerChannel newChannel() {
+ return new NioServerSocketChannel();
+ }
+ });
+ break;
+ case UDT:
+ LOG.info("Proxy listening with UDT transport");
+ serverBootstrap.channelFactory(NioUdtProvider.BYTE_ACCEPTOR)
+ .option(ChannelOption.SO_BACKLOG, 10)
+ .option(ChannelOption.SO_REUSEADDR, true);
+ break;
+ default:
+ throw new UnknownTransportProtocolException(transportProtocol);
}
serverBootstrap.childHandler(initializer);
ChannelFuture future = serverBootstrap.bind(requestedAddress)
@@ -411,15 +489,8 @@ public void operationComplete(ChannelFuture future)
this.boundAddress = ((InetSocketAddress) future.channel().localAddress());
LOG.info("Proxy started at address: " + this.boundAddress);
- }
- /**
- * Register a new {@link Channel} with this server, for later closing.
- *
- * @param channel
- */
- protected void registerChannel(Channel channel) {
- this.serverGroup.allChannels.add(channel);
+ Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
}
protected ChainedProxyManager getChainProxyManager() {
@@ -446,216 +517,15 @@ protected Collection getActivityTrackers() {
return activityTrackers;
}
- protected EventLoopGroup getProxyToServerWorkerFor(
- TransportProtocol transportProtocol) {
- synchronized (serverGroup) {
- serverGroup.ensureProtocol(transportProtocol);
- return serverGroup.proxyToServerWorkerPools.get(transportProtocol);
- }
- }
-
- /**
- * Represents a group of servers that share thread pools.
- */
- private static class ServerGroup {
- private static final int INCOMING_ACCEPTOR_THREADS = 2;
- private static final int INCOMING_WORKER_THREADS = 8;
- private static final int OUTGOING_WORKER_THREADS = 8;
-
- /**
- * A name for this ServerGroup to use in naming threads.
- */
- private final String name;
-
- /**
- * Keep track of all channels for later shutdown.
- */
- private final ChannelGroup allChannels = new DefaultChannelGroup(
- "HTTP-Proxy-Server", GlobalEventExecutor.INSTANCE);
-
- /**
- * These {@link EventLoopGroup}s accept incoming connections to the
- * proxies. A different EventLoopGroup is used for each
- * TransportProtocol, since these have to be configured differently.
- *
- * Thread safety: Only accessed while synchronized on the server group.
- */
- private final Map clientToProxyBossPools = new HashMap();
-
- /**
- * These {@link EventLoopGroup}s process incoming requests to the
- * proxies. A different EventLoopGroup is used for each
- * TransportProtocol, since these have to be configured differently.
- *
- * Thread safety: Only accessed while synchronized on the server group.
- * *
- */
- private final Map clientToProxyWorkerPools = new HashMap();
-
- /**
- * These {@link EventLoopGroup}s are used for making outgoing
- * connections to servers. A different EventLoopGroup is used for each
- * TransportProtocol, since these have to be configured differently.
- */
- private final Map proxyToServerWorkerPools = new HashMap();
-
- private volatile boolean stopped = false;
-
- /**
- * JVM shutdown hook to stop this server group. Declared as a class-level variable to allow removing the shutdown hook when the
- * server is stopped normally.
- */
- private final Thread serverGroupShutdownHook = new Thread(new Runnable() {
- @Override
- public void run() {
- stop(false);
- }
- });
-
- private ServerGroup(String name) {
- this.name = name;
-
- Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- public void uncaughtException(final Thread t, final Throwable e) {
- LOG.error("Uncaught throwable", e);
- }
- });
-
- Runtime.getRuntime().addShutdownHook(serverGroupShutdownHook);
- }
-
- public synchronized void ensureProtocol(
- TransportProtocol transportProtocol) {
- if (!clientToProxyWorkerPools.containsKey(transportProtocol)) {
- initializeTransport(transportProtocol);
- }
- }
-
- private void initializeTransport(TransportProtocol transportProtocol) {
- SelectorProvider selectorProvider = null;
- switch (transportProtocol) {
- case TCP:
- selectorProvider = SelectorProvider.provider();
- break;
- case UDT:
- selectorProvider = NioUdtProvider.BYTE_PROVIDER;
- break;
- default:
- throw new UnknownTransportProtocolError(transportProtocol);
- }
-
- NioEventLoopGroup inboundAcceptorGroup = new NioEventLoopGroup(
- INCOMING_ACCEPTOR_THREADS,
- new CategorizedThreadFactory("ClientToProxyAcceptor"),
- selectorProvider);
- NioEventLoopGroup inboundWorkerGroup = new NioEventLoopGroup(
- INCOMING_WORKER_THREADS,
- new CategorizedThreadFactory("ClientToProxyWorker"),
- selectorProvider);
- inboundWorkerGroup.setIoRatio(90);
- NioEventLoopGroup outboundWorkerGroup = new NioEventLoopGroup(
- OUTGOING_WORKER_THREADS,
- new CategorizedThreadFactory("ProxyToServerWorker"),
- selectorProvider);
- outboundWorkerGroup.setIoRatio(90);
- this.clientToProxyBossPools.put(transportProtocol,
- inboundAcceptorGroup);
- this.clientToProxyWorkerPools.put(transportProtocol,
- inboundWorkerGroup);
- this.proxyToServerWorkerPools.put(transportProtocol,
- outboundWorkerGroup);
- }
-
- synchronized private void stop(boolean graceful) {
- if (graceful) {
- LOG.info("Shutting down proxy gracefully");
- } else {
- LOG.info("Shutting down proxy immediately (non-graceful)");
- }
-
- if (stopped) {
- LOG.info("Already stopped");
- return;
- }
-
- LOG.info("Closing all channels...");
-
- final ChannelGroupFuture future = allChannels.close();
-
- // if this is a graceful shutdown, log any channel closing failures. if this isn't a graceful shutdown, ignore them.
- if (graceful) {
- future.awaitUninterruptibly(10 * 1000);
-
- if (!future.isSuccess()) {
- final Iterator iter = future.iterator();
- while (iter.hasNext()) {
- final ChannelFuture cf = iter.next();
- if (!cf.isSuccess()) {
- LOG.info(
- "Unable to close channel. Cause of failure for {} is {}",
- cf.channel(),
- cf.cause());
- }
- }
- }
- }
-
- LOG.info("Shutting down event loops");
- List allEventLoopGroups = new ArrayList();
- allEventLoopGroups.addAll(clientToProxyBossPools.values());
- allEventLoopGroups.addAll(clientToProxyWorkerPools.values());
- allEventLoopGroups.addAll(proxyToServerWorkerPools.values());
- for (EventLoopGroup group : allEventLoopGroups) {
- if (graceful) {
- group.shutdownGracefully();
- } else {
- group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
- }
- }
-
- if (graceful) {
- for (EventLoopGroup group : allEventLoopGroups) {
- try {
- group.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- LOG.warn("Interrupted while shutting down event loop");
- }
- }
- }
-
- // remove the shutdown hook that was added when the server group was started, since it has now been stopped
- try {
- Runtime.getRuntime().removeShutdownHook(serverGroupShutdownHook);
- } catch (IllegalStateException e) {
- // ignore -- IllegalStateException means the VM is already shutting down
- }
-
- stopped = true;
-
- LOG.info("Done shutting down proxy");
- }
-
- private class CategorizedThreadFactory implements ThreadFactory {
- private String category;
- private int num = 0;
-
- public CategorizedThreadFactory(String category) {
- super();
- this.category = category;
- }
-
- public Thread newThread(final Runnable r) {
- final Thread t = new Thread(r,
- name + "-" + category + "-" + num++);
- return t;
- }
- }
+ protected EventLoopGroup getProxyToServerWorkerFor(TransportProtocol transportProtocol) {
+ return serverGroup.getProxyToServerWorkerPoolForTransport(transportProtocol);
}
- private static class DefaultHttpProxyServerBootstrap implements
- HttpProxyServerBootstrap {
+ // TODO: refactor bootstrap into a separate class
+ private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerBootstrap {
private String name = "LittleProxy";
- private TransportProtocol transportProtocol = TCP;
+ private ServerGroup serverGroup = null;
+ private TransportProtocol transportProtocol = TransportProtocol.TCP;
private InetSocketAddress requestedAddress;
private int port = 8080;
private boolean allowLocalOnly = true;
@@ -668,19 +538,21 @@ private static class DefaultHttpProxyServerBootstrap implements
private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter();
private boolean transparent = false;
private int idleConnectionTimeout = 70;
- private DefaultHttpProxyServer original;
private Collection activityTrackers = new ConcurrentLinkedQueue();
private int connectTimeout = 40000;
private HostResolver serverResolver = new DefaultHostResolver();
private long readThrottleBytesPerSecond;
private long writeThrottleBytesPerSecond;
private InetSocketAddress localAddress;
+ private int clientToProxyAcceptorThreads = ServerGroup.DEFAULT_INCOMING_ACCEPTOR_THREADS;
+ private int clientToProxyWorkerThreads = ServerGroup.DEFAULT_INCOMING_WORKER_THREADS;
+ private int proxyToServerWorkerThreads = ServerGroup.DEFAULT_OUTGOING_WORKER_THREADS;
private DefaultHttpProxyServerBootstrap() {
}
private DefaultHttpProxyServerBootstrap(
- DefaultHttpProxyServer original,
+ ServerGroup serverGroup,
TransportProtocol transportProtocol,
InetSocketAddress requestedAddress,
SslEngineSource sslEngineSource,
@@ -695,7 +567,7 @@ private DefaultHttpProxyServerBootstrap(
long readThrottleBytesPerSecond,
long writeThrottleBytesPerSecond,
InetSocketAddress localAddress) {
- this.original = original;
+ this.serverGroup = serverGroup;
this.transportProtocol = transportProtocol;
this.requestedAddress = requestedAddress;
this.port = requestedAddress.getPort();
@@ -881,14 +753,22 @@ public HttpProxyServer start() {
return build().start();
}
+ @Override
+ public HttpProxyServerBootstrap withThreadPoolConfiguration(ThreadPoolConfiguration configuration) {
+ this.clientToProxyAcceptorThreads = configuration.getAcceptorThreads();
+ this.clientToProxyWorkerThreads = configuration.getClientToProxyWorkerThreads();
+ this.proxyToServerWorkerThreads = configuration.getProxyToServerWorkerThreads();
+ return this;
+ }
+
private DefaultHttpProxyServer build() {
final ServerGroup serverGroup;
- if (original != null) {
- serverGroup = original.serverGroup;
+ if (this.serverGroup != null) {
+ serverGroup = this.serverGroup;
}
else {
- serverGroup = new ServerGroup(name);
+ serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads);
}
return new DefaultHttpProxyServer(serverGroup,
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java b/src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java
new file mode 100644
index 000000000..2ca4f0fd9
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java
@@ -0,0 +1,64 @@
+package org.littleshoot.proxy.impl;
+
+import com.google.common.collect.ImmutableList;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.nio.channels.spi.SelectorProvider;
+import java.util.List;
+
+/**
+ * Encapsulates the thread pools used by the proxy. Contains the acceptor thread pool as well as the client-to-proxy and
+ * proxy-to-server thread pools.
+ */
+public class ProxyThreadPools {
+ /**
+ * These {@link EventLoopGroup}s accept incoming connections to the
+ * proxies. A different EventLoopGroup is used for each
+ * TransportProtocol, since these have to be configured differently.
+ */
+ private final NioEventLoopGroup clientToProxyAcceptorPool;
+
+ /**
+ * These {@link EventLoopGroup}s process incoming requests to the
+ * proxies. A different EventLoopGroup is used for each
+ * TransportProtocol, since these have to be configured differently.
+ */
+ private final NioEventLoopGroup clientToProxyWorkerPool;
+
+ /**
+ * These {@link EventLoopGroup}s are used for making outgoing
+ * connections to servers. A different EventLoopGroup is used for each
+ * TransportProtocol, since these have to be configured differently.
+ */
+ private final NioEventLoopGroup proxyToServerWorkerPool;
+
+ public ProxyThreadPools(SelectorProvider selectorProvider, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads, String serverGroupName, int serverGroupId) {
+ clientToProxyAcceptorPool = new NioEventLoopGroup(incomingAcceptorThreads, new CategorizedThreadFactory(serverGroupName, "ClientToProxyAcceptor", serverGroupId), selectorProvider);
+
+ clientToProxyWorkerPool = new NioEventLoopGroup(incomingWorkerThreads, new CategorizedThreadFactory(serverGroupName, "ClientToProxyWorker", serverGroupId), selectorProvider);
+ clientToProxyWorkerPool.setIoRatio(90);
+
+ proxyToServerWorkerPool = new NioEventLoopGroup(outgoingWorkerThreads, new CategorizedThreadFactory(serverGroupName, "ProxyToServerWorker", serverGroupId), selectorProvider);
+ proxyToServerWorkerPool.setIoRatio(90);
+ }
+
+ /**
+ * Returns all event loops (acceptor and worker thread pools) in this pool.
+ */
+ public List getAllEventLoops() {
+ return ImmutableList.of(clientToProxyAcceptorPool, clientToProxyWorkerPool, proxyToServerWorkerPool);
+ }
+
+ public NioEventLoopGroup getClientToProxyAcceptorPool() {
+ return clientToProxyAcceptorPool;
+ }
+
+ public NioEventLoopGroup getClientToProxyWorkerPool() {
+ return clientToProxyWorkerPool;
+ }
+
+ public NioEventLoopGroup getProxyToServerWorkerPool() {
+ return proxyToServerWorkerPool;
+ }
+}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
index 7d7fa5c4d..b4e23823b 100644
--- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
+++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java
@@ -33,7 +33,7 @@
import org.littleshoot.proxy.HttpFilters;
import org.littleshoot.proxy.MitmManager;
import org.littleshoot.proxy.TransportProtocol;
-import org.littleshoot.proxy.UnknownTransportProtocolError;
+import org.littleshoot.proxy.UnknownTransportProtocolException;
import org.slf4j.spi.LocationAwareLogger;
import javax.net.ssl.SSLSession;
@@ -584,8 +584,7 @@ boolean shouldExecuteOnEventLoop() {
@Override
protected Future> execute() {
- Bootstrap cb = new Bootstrap().group(proxyServer
- .getProxyToServerWorkerFor(transportProtocol));
+ Bootstrap cb = new Bootstrap().group(proxyServer.getProxyToServerWorkerFor(transportProtocol));
switch (transportProtocol) {
case TCP:
@@ -603,7 +602,7 @@ public Channel newChannel() {
.option(ChannelOption.SO_REUSEADDR, true);
break;
default:
- throw new UnknownTransportProtocolError(transportProtocol);
+ throw new UnknownTransportProtocolException(transportProtocol);
}
cb.handler(new ChannelInitializer() {
diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
new file mode 100644
index 000000000..ee10e85c5
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java
@@ -0,0 +1,284 @@
+package org.littleshoot.proxy.impl;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.udt.nio.NioUdtProvider;
+import org.littleshoot.proxy.HttpProxyServer;
+import org.littleshoot.proxy.TransportProtocol;
+import org.littleshoot.proxy.UnknownTransportProtocolException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Manages thread pools for one or more proxy server instances. When servers are created, they must register with the
+ * ServerGroup using {@link #registerProxyServer(HttpProxyServer)}, and when they shut down, must unregister with the
+ * ServerGroup using {@link #unregisterProxyServer(HttpProxyServer, boolean)}.
+ */
+public class ServerGroup {
+ private static final Logger log = LoggerFactory.getLogger(ServerGroup.class);
+
+ /**
+ * The default number of threads to accept incoming requests from clients. (Requests are serviced by worker threads,
+ * not acceptor threads.)
+ */
+ public static final int DEFAULT_INCOMING_ACCEPTOR_THREADS = 2;
+
+ /**
+ * The default number of threads to service incoming requests from clients.
+ */
+ public static final int DEFAULT_INCOMING_WORKER_THREADS = 8;
+
+ /**
+ * The default number of threads to service outgoing requests to servers.
+ */
+ public static final int DEFAULT_OUTGOING_WORKER_THREADS = 8;
+
+ /**
+ * Global counter for the {@link #serverGroupId}.
+ */
+ private static final AtomicInteger serverGroupCount = new AtomicInteger(0);
+
+ /**
+ * A name for this ServerGroup to use in naming threads.
+ */
+ private final String name;
+
+ /**
+ * The ID of this server group. Forms part of the name of each thread created for this server group. Useful for
+ * differentiating threads when multiple proxy instances are running.
+ */
+ private final int serverGroupId;
+
+ private final int incomingAcceptorThreads;
+ private final int incomingWorkerThreads;
+ private final int outgoingWorkerThreads;
+
+ /**
+ * List of all servers registered to use this ServerGroup. Any access to this list should be synchronized using the
+ * {@link #SERVER_REGISTRATION_LOCK}.
+ */
+ public final List registeredServers = new ArrayList(1);
+
+ /**
+ * A mapping of {@link TransportProtocol}s to their initialized {@link ProxyThreadPools}. Each transport uses a
+ * different thread pool, since the initialization parameters are different.
+ */
+ private final EnumMap protocolThreadPools = new EnumMap(TransportProtocol.class);
+
+ /**
+ * A mapping of selector providers to transport protocols. Avoids special-casing each transport protocol during
+ * transport protocol initialization.
+ */
+ private static final EnumMap TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS = new EnumMap(TransportProtocol.class);
+ static {
+ TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS.put(TransportProtocol.TCP, SelectorProvider.provider());
+ TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS.put(TransportProtocol.UDT, NioUdtProvider.BYTE_PROVIDER);
+ }
+
+ /**
+ * True when this ServerGroup is stopped.
+ */
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+ /**
+ * Creates a new ServerGroup instance for a proxy. Threads created for this ServerGroup will have the specified
+ * ServerGroup name in the Thread name. This constructor does not actually initialize any thread pools; instead,
+ * thread pools for specific transport protocols are lazily initialized as needed.
+ *
+ * @param name ServerGroup name to include in thread names
+ * @param incomingAcceptorThreads number of acceptor threads per protocol
+ * @param incomingWorkerThreads number of client-to-proxy worker threads per protocol
+ * @param outgoingWorkerThreads number of proxy-to-server worker threads per protocol
+ */
+ public ServerGroup(String name, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads) {
+ this.name = name;
+ this.serverGroupId = serverGroupCount.getAndIncrement();
+ this.incomingAcceptorThreads = incomingAcceptorThreads;
+ this.incomingWorkerThreads = incomingWorkerThreads;
+ this.outgoingWorkerThreads = outgoingWorkerThreads;
+ }
+
+ /**
+ * Lock for initializing any transport protocols.
+ */
+ private final Object THREAD_POOL_INIT_LOCK = new Object();
+
+ /**
+ * Retrieves the {@link ProxyThreadPools} for the specified transport protocol. Lazily initializes the thread pools
+ * for the transport protocol if they have not yet been initialized. If the protocol has already been initialized,
+ * this method returns immediately, without synchronization. If initialization is necessary, the initialization
+ * process creates the acceptor and worker threads necessary to service requests to/from the proxy.
+ *
+ * This method is thread-safe; no external locking is necessary.
+ *
+ * @param protocol transport protocol to retrieve thread pools for
+ * @return thread pools for the specified transport protocol
+ */
+ private ProxyThreadPools getThreadPoolsForProtocol(TransportProtocol protocol) {
+ // if the thread pools have not been initialized for this protocol, initialize them
+ if (protocolThreadPools.get(protocol) == null) {
+ synchronized (THREAD_POOL_INIT_LOCK) {
+ if (protocolThreadPools.get(protocol) == null) {
+ log.debug("Initializing thread pools for {} with {} acceptor threads, {} incoming worker threads, and {} outgoing worker threads",
+ protocol, incomingAcceptorThreads, incomingWorkerThreads, outgoingWorkerThreads);
+
+ SelectorProvider selectorProvider = TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS.get(protocol);
+ if (selectorProvider == null) {
+ throw new UnknownTransportProtocolException(protocol);
+ }
+
+ ProxyThreadPools threadPools = new ProxyThreadPools(selectorProvider,
+ incomingAcceptorThreads,
+ incomingWorkerThreads,
+ outgoingWorkerThreads,
+ name,
+ serverGroupId);
+ protocolThreadPools.put(protocol, threadPools);
+ }
+ }
+ }
+
+ return protocolThreadPools.get(protocol);
+ }
+
+ /**
+ * Lock controlling access to the {@link #registerProxyServer(HttpProxyServer)} and {@link #unregisterProxyServer(HttpProxyServer, boolean)}
+ * methods.
+ */
+ private final Object SERVER_REGISTRATION_LOCK = new Object();
+
+ /**
+ * Registers the specified proxy server as a consumer of this server group. The server group will not be shut down
+ * until the proxy unregisters itself.
+ *
+ * @param proxyServer proxy server instance to register
+ */
+ public void registerProxyServer(HttpProxyServer proxyServer) {
+ synchronized (SERVER_REGISTRATION_LOCK) {
+ registeredServers.add(proxyServer);
+ }
+ }
+
+ /**
+ * Unregisters the specified proxy server from this server group. If this was the last registered proxy server, the
+ * server group will be shut down.
+ *
+ * @param proxyServer proxy server instance to unregister
+ * @param graceful when true, the server group shutdown (if necessary) will be graceful
+ */
+ public void unregisterProxyServer(HttpProxyServer proxyServer, boolean graceful) {
+ synchronized (SERVER_REGISTRATION_LOCK) {
+ boolean wasRegistered = registeredServers.remove(proxyServer);
+ if (!wasRegistered) {
+ log.warn("Attempted to unregister proxy server from ServerGroup that it was not registered with. Was the proxy unregistered twice?");
+ }
+
+ if (registeredServers.isEmpty()) {
+ log.debug("Proxy server unregistered from ServerGroup. No proxy servers remain registered, so shutting down ServerGroup.");
+
+ shutdown(graceful);
+ } else {
+ log.debug("Proxy server unregistered from ServerGroup. Not shutting down ServerGroup ({} proxy servers remain registered).", registeredServers.size());
+ }
+ }
+ }
+
+ /**
+ * Shuts down all event loops owned by this server group.
+ *
+ * @param graceful when true, event loops will "gracefully" terminate, waiting for submitted tasks to finish
+ */
+ private void shutdown(boolean graceful) {
+ if (!stopped.compareAndSet(false, true)) {
+ log.info("Shutdown requested, but ServerGroup is already stopped. Doing nothing.");
+
+ return;
+ }
+
+ log.info("Shutting down server group event loops " + (graceful ? "(graceful)" : "(non-graceful)"));
+
+ // loop through all event loops managed by this server group. this includes acceptor and worker event loops
+ // for both TCP and UDP transport protocols.
+ List allEventLoopGroups = new ArrayList();
+
+ for (ProxyThreadPools threadPools : protocolThreadPools.values()) {
+ allEventLoopGroups.addAll(threadPools.getAllEventLoops());
+ }
+
+ for (EventLoopGroup group : allEventLoopGroups) {
+ if (graceful) {
+ group.shutdownGracefully();
+ } else {
+ group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
+ }
+ }
+
+ if (graceful) {
+ for (EventLoopGroup group : allEventLoopGroups) {
+ try {
+ group.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ log.warn("Interrupted while shutting down event loop");
+ }
+ }
+ }
+
+ log.debug("Done shutting down server group");
+ }
+
+ /**
+ * Retrieves the client-to-proxy acceptor thread pool for the specified protocol. Initializes the pool if it has not
+ * yet been initialized.
+ *
+ * This method is thread-safe; no external locking is necessary.
+ *
+ * @param protocol transport protocol to retrieve the thread pool for
+ * @return the client-to-proxy acceptor thread pool
+ */
+ public EventLoopGroup getClientToProxyAcceptorPoolForTransport(TransportProtocol protocol) {
+ return getThreadPoolsForProtocol(protocol).getClientToProxyAcceptorPool();
+ }
+
+ /**
+ * Retrieves the client-to-proxy acceptor worker pool for the specified protocol. Initializes the pool if it has not
+ * yet been initialized.
+ *
+ * This method is thread-safe; no external locking is necessary.
+ *
+ * @param protocol transport protocol to retrieve the thread pool for
+ * @return the client-to-proxy worker thread pool
+ */
+ public EventLoopGroup getClientToProxyWorkerPoolForTransport(TransportProtocol protocol) {
+ return getThreadPoolsForProtocol(protocol).getClientToProxyWorkerPool();
+ }
+
+ /**
+ * Retrieves the proxy-to-server worker thread pool for the specified protocol. Initializes the pool if it has not
+ * yet been initialized.
+ *
+ * This method is thread-safe; no external locking is necessary.
+ *
+ * @param protocol transport protocol to retrieve the thread pool for
+ * @return the proxy-to-server worker thread pool
+ */
+ public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol protocol) {
+ return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool();
+ }
+
+ /**
+ * @return true if this ServerGroup has already been stopped
+ */
+ public boolean isStopped() {
+ return stopped.get();
+ }
+
+}
diff --git a/src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java b/src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java
new file mode 100644
index 000000000..b8e7b0582
--- /dev/null
+++ b/src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java
@@ -0,0 +1,62 @@
+package org.littleshoot.proxy.impl;
+
+/**
+ * Configuration object for the proxy's thread pools. Controls the number of acceptor and worker threads in the Netty
+ * {@link io.netty.channel.EventLoopGroup} used by the proxy.
+ */
+public class ThreadPoolConfiguration {
+ private int acceptorThreads = ServerGroup.DEFAULT_INCOMING_ACCEPTOR_THREADS;
+ private int clientToProxyWorkerThreads = ServerGroup.DEFAULT_INCOMING_WORKER_THREADS;
+ private int proxyToServerWorkerThreads = ServerGroup.DEFAULT_OUTGOING_WORKER_THREADS;
+
+ public int getClientToProxyWorkerThreads() {
+ return clientToProxyWorkerThreads;
+ }
+
+ /**
+ * Set the number of client-to-proxy worker threads to create. Worker threads perform the actual processing of
+ * client requests. The default value is {@link ServerGroup#DEFAULT_INCOMING_WORKER_THREADS}.
+ *
+ * @param clientToProxyWorkerThreads number of client-to-proxy worker threads to create
+ * @return this thread pool configuration instance, for chaining
+ */
+ public ThreadPoolConfiguration withClientToProxyWorkerThreads(int clientToProxyWorkerThreads) {
+ this.clientToProxyWorkerThreads = clientToProxyWorkerThreads;
+ return this;
+ }
+
+ public int getAcceptorThreads() {
+ return acceptorThreads;
+ }
+
+ /**
+ * Set the number of acceptor threads to create. Acceptor threads accept HTTP connections from the client and queue
+ * them for processing by client-to-proxy worker threads. The default value is
+ * {@link ServerGroup#DEFAULT_INCOMING_ACCEPTOR_THREADS}.
+ *
+ * @param acceptorThreads number of acceptor threads to create
+ * @return this thread pool configuration instance, for chaining
+ */
+ public ThreadPoolConfiguration withAcceptorThreads(int acceptorThreads) {
+ this.acceptorThreads = acceptorThreads;
+ return this;
+ }
+
+ public int getProxyToServerWorkerThreads() {
+ return proxyToServerWorkerThreads;
+ }
+
+ /**
+ * Set the number of proxy-to-server worker threads to create. Proxy-to-server worker threads make requests to
+ * upstream servers and process responses from the server. The default value is
+ * {@link ServerGroup#DEFAULT_OUTGOING_WORKER_THREADS}.
+ *
+ * @param proxyToServerWorkerThreads number of proxy-to-server worker threads to create
+ * @return this thread pool configuration instance, for chaining
+ */
+ public ThreadPoolConfiguration withProxyToServerWorkerThreads(int proxyToServerWorkerThreads) {
+ this.proxyToServerWorkerThreads = proxyToServerWorkerThreads;
+ return this;
+ }
+
+}
diff --git a/src/test/java/org/littleshoot/proxy/ClonedProxyTest.java b/src/test/java/org/littleshoot/proxy/ClonedProxyTest.java
new file mode 100644
index 000000000..3abbc939e
--- /dev/null
+++ b/src/test/java/org/littleshoot/proxy/ClonedProxyTest.java
@@ -0,0 +1,120 @@
+package org.littleshoot.proxy;
+
+import org.apache.http.HttpResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
+import org.littleshoot.proxy.test.HttpClientUtil;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class ClonedProxyTest {
+ private ClientAndServer mockServer;
+ private int mockServerPort;
+
+ private HttpProxyServer originalProxy;
+ private HttpProxyServer clonedProxy;
+
+ @Before
+ public void setUp() {
+ mockServer = new ClientAndServer(0);
+ mockServerPort = mockServer.getPort();
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ } finally {
+ try {
+ if (originalProxy != null) {
+ originalProxy.abort();
+ }
+ } finally {
+ if (clonedProxy != null) {
+ clonedProxy.abort();
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testClonedProxyHandlesRequests() {
+ originalProxy = DefaultHttpProxyServer.bootstrap()
+ .withPort(0)
+ .withName("original")
+ .start();
+ clonedProxy = originalProxy.clone()
+ .withName("clone")
+ .start();
+
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath("/testClonedProxyHandlesRequests"),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("success")
+ );
+
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + "/testClonedProxyHandlesRequests", clonedProxy);
+ assertEquals("Expected to receive a 200 when making a request using the cloned proxy server", 200, response.getStatusLine().getStatusCode());
+ }
+
+ @Test
+ public void testStopClonedProxyDoesNotStopOriginalServer() {
+ originalProxy = DefaultHttpProxyServer.bootstrap()
+ .withPort(0)
+ .withName("original")
+ .start();
+ clonedProxy = originalProxy.clone()
+ .withName("clone")
+ .start();
+
+ clonedProxy.abort();
+
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath("/testClonedProxyHandlesRequests"),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("success")
+ );
+
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + "/testClonedProxyHandlesRequests", originalProxy);
+ assertEquals("Expected to receive a 200 when making a request using the cloned proxy server", 200, response.getStatusLine().getStatusCode());
+ }
+
+ @Test
+ public void testStopOriginalServerDoesNotStopClonedServer() {
+ originalProxy = DefaultHttpProxyServer.bootstrap()
+ .withPort(0)
+ .withName("original")
+ .start();
+ clonedProxy = originalProxy.clone()
+ .withName("clone")
+ .start();
+
+ originalProxy.abort();
+
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath("/testClonedProxyHandlesRequests"),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("success")
+ );
+
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + "/testClonedProxyHandlesRequests", clonedProxy);
+ assertEquals("Expected to receive a 200 when making a request using the cloned proxy server", 200, response.getStatusLine().getStatusCode());
+ }
+}
diff --git a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
new file mode 100644
index 000000000..629a47c2f
--- /dev/null
+++ b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java
@@ -0,0 +1,153 @@
+package org.littleshoot.proxy;
+
+import io.netty.handler.codec.http.HttpObject;
+import io.netty.handler.codec.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.littleshoot.proxy.impl.DefaultHttpProxyServer;
+import org.littleshoot.proxy.impl.ThreadPoolConfiguration;
+import org.littleshoot.proxy.test.HttpClientUtil;
+import org.mockserver.integration.ClientAndServer;
+import org.mockserver.matchers.Times;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockserver.model.HttpRequest.request;
+import static org.mockserver.model.HttpResponse.response;
+
+public class ServerGroupTest {
+ private ClientAndServer mockServer;
+ private int mockServerPort;
+
+ private HttpProxyServer proxyServer;
+
+ @Before
+ public void setUp() {
+ mockServer = new ClientAndServer(0);
+ mockServerPort = mockServer.getPort();
+ }
+
+ @After
+ public void tearDown() {
+ try {
+ if (mockServer != null) {
+ mockServer.stop();
+ }
+ } finally {
+ if (proxyServer != null) {
+ proxyServer.abort();
+ }
+ }
+ }
+
+ @Test
+ public void testSingleWorkerThreadPoolConfiguration() throws ExecutionException, InterruptedException {
+ final String firstRequestPath = "/testSingleThreadFirstRequest";
+ final String secondRequestPath = "/testSingleThreadSecondRequest";
+
+ // set up two server responses that will execute more or less simultaneously. the first request has a small
+ // delay, to reduce the chance that the first request will finish entirely before the second request is finished
+ // (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is
+ // not behaving properly).
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath(firstRequestPath),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("first")
+ .withDelay(TimeUnit.MILLISECONDS, 500)
+ );
+
+ mockServer.when(request()
+ .withMethod("GET")
+ .withPath(secondRequestPath),
+ Times.exactly(1))
+ .respond(response()
+ .withStatusCode(200)
+ .withBody("second")
+ );
+
+ // save the names of the threads that execute the filter methods. filter methods are executed by the worker thread
+ // handling the request/response, so if there is only one worker thread, the filter methods should be executed
+ // by the same thread.
+ final AtomicReference firstClientThreadName = new AtomicReference();
+ final AtomicReference secondClientThreadName = new AtomicReference();
+
+ final AtomicReference firstProxyThreadName = new AtomicReference();
+ final AtomicReference secondProxyThreadName = new AtomicReference();
+
+ proxyServer = DefaultHttpProxyServer.bootstrap()
+ .withPort(0)
+ .withFiltersSource(new HttpFiltersSourceAdapter() {
+ @Override
+ public HttpFilters filterRequest(HttpRequest originalRequest) {
+ return new HttpFiltersAdapter(originalRequest) {
+ @Override
+ public io.netty.handler.codec.http.HttpResponse clientToProxyRequest(HttpObject httpObject) {
+ if (originalRequest.getUri().endsWith(firstRequestPath)) {
+ firstClientThreadName.set(Thread.currentThread().getName());
+ } else if (originalRequest.getUri().endsWith(secondRequestPath)) {
+ secondClientThreadName.set(Thread.currentThread().getName());
+ }
+
+ return super.clientToProxyRequest(httpObject);
+ }
+
+ @Override
+ public void serverToProxyResponseReceived() {
+ if (originalRequest.getUri().endsWith(firstRequestPath)) {
+ firstProxyThreadName.set(Thread.currentThread().getName());
+ } else if (originalRequest.getUri().endsWith(secondRequestPath)) {
+ secondProxyThreadName.set(Thread.currentThread().getName());
+ }
+ }
+ };
+ }
+ })
+ .withThreadPoolConfiguration(new ThreadPoolConfiguration()
+ .withAcceptorThreads(1)
+ .withClientToProxyWorkerThreads(1)
+ .withProxyToServerWorkerThreads(1))
+ .start();
+
+ // execute both requests in parallel, to increase the chance of blocking due to the single-threaded ThreadPoolConfiguration
+
+ Runnable firstRequest = new Runnable() {
+ @Override
+ public void run() {
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer);
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+ };
+
+ Runnable secondRequest = new Runnable () {
+ @Override
+ public void run() {
+ HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer);
+ assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+ };
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ Future> firstFuture = executor.submit(firstRequest);
+ Future> secondFuture = executor.submit(secondRequest);
+
+ firstFuture.get();
+ secondFuture.get();
+
+ Thread.sleep(500);
+
+ assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get());
+ assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get());
+ }
+
+}