From 34bb94843b7eb6c1407a0728e39f38fb90b4f62c Mon Sep 17 00:00:00 2001 From: Jason Eric Klaes Hoetger Date: Fri, 4 Sep 2015 18:25:36 -0700 Subject: [PATCH] Refactored ServerGroup. Made acceptor and worker thread pools configurable. Eliminated synchronization when servicing requests. Extracted static inner classes. --- .../littleshoot/proxy/HttpProxyServer.java | 5 +- .../proxy/HttpProxyServerBootstrap.java | 9 + .../proxy/UnknownTransportProtocolError.java | 14 - .../UnknownTransportProtocolException.java | 12 + .../proxy/impl/CategorizedThreadFactory.java | 52 ++ .../proxy/impl/DefaultHttpProxyServer.java | 508 +++++++----------- .../proxy/impl/ProxyThreadPools.java | 64 +++ .../proxy/impl/ProxyToServerConnection.java | 7 +- .../littleshoot/proxy/impl/ServerGroup.java | 284 ++++++++++ .../proxy/impl/ThreadPoolConfiguration.java | 62 +++ .../littleshoot/proxy/ClonedProxyTest.java | 120 +++++ .../littleshoot/proxy/ServerGroupTest.java | 153 ++++++ 12 files changed, 956 insertions(+), 334 deletions(-) delete mode 100644 src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java create mode 100644 src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java create mode 100644 src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java create mode 100644 src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java create mode 100644 src/main/java/org/littleshoot/proxy/impl/ServerGroup.java create mode 100644 src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java create mode 100644 src/test/java/org/littleshoot/proxy/ClonedProxyTest.java create mode 100644 src/test/java/org/littleshoot/proxy/ServerGroupTest.java diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServer.java b/src/main/java/org/littleshoot/proxy/HttpProxyServer.java index 7014ce924..5f2a57832 100644 --- a/src/main/java/org/littleshoot/proxy/HttpProxyServer.java +++ b/src/main/java/org/littleshoot/proxy/HttpProxyServer.java @@ -14,13 +14,14 @@ public interface HttpProxyServer { /** *

* Clone the existing server, with a port 1 higher and everything else the - * same. + * same. If the proxy was started with port 0 (JVM-assigned port), the cloned proxy will also use a JVM-assigned + * port. *

* *

* The new server will share event loops with the original server. The event * loops will use whatever name was given to the first server in the clone - * group. + * group. The server group will not terminate until the original server and all clones terminate. *

* * @return a bootstrap that allows customizing and starting the cloned diff --git a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java index c4241e7ad..cad274a84 100644 --- a/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java +++ b/src/main/java/org/littleshoot/proxy/HttpProxyServerBootstrap.java @@ -1,5 +1,7 @@ package org.littleshoot.proxy; +import org.littleshoot.proxy.impl.ThreadPoolConfiguration; + import java.net.InetSocketAddress; /** @@ -309,4 +311,11 @@ HttpProxyServerBootstrap withConnectTimeout( */ HttpProxyServer start(); + /** + * Set the configuration parameters for the proxy's thread pools. + * + * @param configuration thread pool configuration + * @return proxy server bootstrap for chaining + */ + HttpProxyServerBootstrap withThreadPoolConfiguration(ThreadPoolConfiguration configuration); } \ No newline at end of file diff --git a/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java b/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java deleted file mode 100644 index 9efe836b9..000000000 --- a/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolError.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.littleshoot.proxy; - -/** - * This error indicates that the system was asked to use a TransportProtocol - * that it didn't know how to handle. - */ -public class UnknownTransportProtocolError extends Error { - private static final long serialVersionUID = 1L; - - public UnknownTransportProtocolError(TransportProtocol transportProtocol) { - super(String.format("Unknown TransportProtocol: %1$s", - transportProtocol)); - } -} diff --git a/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java b/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java new file mode 100644 index 000000000..b264b818b --- /dev/null +++ b/src/main/java/org/littleshoot/proxy/UnknownTransportProtocolException.java @@ -0,0 +1,12 @@ +package org.littleshoot.proxy; + +/** + * This exception indicates that the system was asked to use a TransportProtocol that it didn't know how to handle. + */ +public class UnknownTransportProtocolException extends RuntimeException { + private static final long serialVersionUID = 1L; + + public UnknownTransportProtocolException(TransportProtocol transportProtocol) { + super(String.format("Unknown TransportProtocol: %1$s", transportProtocol)); + } +} diff --git a/src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java b/src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java new file mode 100644 index 000000000..8ddbc39e1 --- /dev/null +++ b/src/main/java/org/littleshoot/proxy/impl/CategorizedThreadFactory.java @@ -0,0 +1,52 @@ +package org.littleshoot.proxy.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * A ThreadFactory that adds LittleProxy-specific information to the threads' names. + */ +public class CategorizedThreadFactory implements ThreadFactory { + private static final Logger log = LoggerFactory.getLogger(CategorizedThreadFactory.class); + + private final String name; + private final String category; + private final int uniqueServerGroupId; + + private AtomicInteger threadCount = new AtomicInteger(0); + + /** + * Exception handler for proxy threads. Logs the name of the thread and the exception that was caught. + */ + private static final Thread.UncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + log.error("Uncaught throwable in thread: {}", t.getName(), e); + } + }; + + + /** + * @param name the user-supplied name of this proxy + * @param category the type of threads this factory is creating (acceptor, client-to-proxy worker, proxy-to-server worker) + * @param uniqueServerGroupId a unique number for the server group creating this thread factory, to differentiate multiple proxy instances with the same name + */ + public CategorizedThreadFactory(String name, String category, int uniqueServerGroupId) { + this.category = category; + this.name = name; + this.uniqueServerGroupId = uniqueServerGroupId; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, name + "-" + uniqueServerGroupId + "-" + category + "-" + threadCount.getAndIncrement()); + + t.setUncaughtExceptionHandler(UNCAUGHT_EXCEPTION_HANDLER); + + return t; + } + +} diff --git a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java index 05e5d2658..3c8afe663 100644 --- a/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java +++ b/src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java @@ -1,6 +1,5 @@ package org.littleshoot.proxy.impl; -import static org.littleshoot.proxy.TransportProtocol.*; import io.netty.bootstrap.ChannelFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; @@ -13,33 +12,10 @@ import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.udt.nio.NioUdtProvider; import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.util.concurrent.GlobalEventExecutor; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.lang.Thread.UncaughtExceptionHandler; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.nio.channels.spi.SelectorProvider; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import javax.net.ssl.SSLEngine; - import org.apache.commons.io.IOUtils; import org.littleshoot.proxy.ActivityTracker; import org.littleshoot.proxy.ChainedProxyManager; @@ -55,21 +31,34 @@ import org.littleshoot.proxy.ProxyAuthenticator; import org.littleshoot.proxy.SslEngineSource; import org.littleshoot.proxy.TransportProtocol; -import org.littleshoot.proxy.UnknownTransportProtocolError; +import org.littleshoot.proxy.UnknownTransportProtocolException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.ssl.SSLEngine; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + /** *

* Primary implementation of an {@link HttpProxyServer}. *

- * + * *

* {@link DefaultHttpProxyServer} is bootstrapped by calling * {@link #bootstrap()} or {@link #bootstrapFromFile(String)}, and then calling * {@link DefaultHttpProxyServerBootstrap#start()}. For example: *

- * + * *
  * DefaultHttpProxyServer server =
  *         DefaultHttpProxyServer
@@ -77,7 +66,7 @@
  *                 .withPort(8090)
  *                 .start();
  * 
- * + * */ public class DefaultHttpProxyServer implements HttpProxyServer { @@ -87,8 +76,7 @@ public class DefaultHttpProxyServer implements HttpProxyServer { */ private static final long TRAFFIC_SHAPING_CHECK_INTERVAL_MS = 250L; - private static final Logger LOG = LoggerFactory - .getLogger(DefaultHttpProxyServer.class); + private static final Logger LOG = LoggerFactory.getLogger(DefaultHttpProxyServer.class); /** * Our {@link ServerGroup}. Multiple proxy servers can share the same @@ -119,14 +107,35 @@ public class DefaultHttpProxyServer implements HttpProxyServer { private final HostResolver serverResolver; private volatile GlobalTrafficShapingHandler globalTrafficShapingHandler; + /** + * True when the proxy has already been stopped by calling {@link #stop()} or {@link #abort()}. + */ + private final AtomicBoolean stopped = new AtomicBoolean(false); + /** * Track all ActivityTrackers for tracking proxying activity. */ private final Collection activityTrackers = new ConcurrentLinkedQueue(); + /** + * Keep track of all channels created by this proxy server for later shutdown when the proxy is stopped. + */ + private final ChannelGroup allChannels = new DefaultChannelGroup("HTTP-Proxy-Server", GlobalEventExecutor.INSTANCE); + + /** + * JVM shutdown hook to shutdown this proxy server. Declared as a class-level variable to allow removing the shutdown hook when the + * proxy server is stopped normally. + */ + private final Thread jvmShutdownHook = new Thread(new Runnable() { + @Override + public void run() { + abort(); + } + }, "LittleProxy-JVM-shutdown-hook"); + /** * Bootstrap a new {@link DefaultHttpProxyServer} starting from scratch. - * + * * @return */ public static HttpProxyServerBootstrap bootstrap() { @@ -136,7 +145,7 @@ public static HttpProxyServerBootstrap bootstrap() { /** * Bootstrap a new {@link DefaultHttpProxyServer} using defaults from the * given file. - * + * * @param path * @return */ @@ -161,7 +170,7 @@ public static HttpProxyServerBootstrap bootstrapFromFile(String path) { /** * Creates a new proxy server. - * + * * @param serverGroup * our ServerGroup for shared thread pools and such * @param transportProtocol @@ -204,22 +213,22 @@ public static HttpProxyServerBootstrap bootstrapFromFile(String path) { * write throttle bandwidth */ private DefaultHttpProxyServer(ServerGroup serverGroup, - TransportProtocol transportProtocol, - InetSocketAddress requestedAddress, - SslEngineSource sslEngineSource, - boolean authenticateSslClients, - ProxyAuthenticator proxyAuthenticator, - ChainedProxyManager chainProxyManager, - MitmManager mitmManager, - HttpFiltersSource filtersSource, - boolean transparent, - int idleConnectionTimeout, - Collection activityTrackers, - int connectTimeout, - HostResolver serverResolver, - long readThrottleBytesPerSecond, - long writeThrottleBytesPerSecond, - InetSocketAddress localAddress) { + TransportProtocol transportProtocol, + InetSocketAddress requestedAddress, + SslEngineSource sslEngineSource, + boolean authenticateSslClients, + ProxyAuthenticator proxyAuthenticator, + ChainedProxyManager chainProxyManager, + MitmManager mitmManager, + HttpFiltersSource filtersSource, + boolean transparent, + int idleConnectionTimeout, + Collection activityTrackers, + int connectTimeout, + HostResolver serverResolver, + long readThrottleBytesPerSecond, + long writeThrottleBytesPerSecond, + InetSocketAddress localAddress) { this.serverGroup = serverGroup; this.transportProtocol = transportProtocol; this.requestedAddress = requestedAddress; @@ -314,54 +323,123 @@ public long getWriteThrottle() { @Override public HttpProxyServerBootstrap clone() { - return new DefaultHttpProxyServerBootstrap(this, transportProtocol, + return new DefaultHttpProxyServerBootstrap(serverGroup, + transportProtocol, new InetSocketAddress(requestedAddress.getAddress(), requestedAddress.getPort() == 0 ? 0 : requestedAddress.getPort() + 1), - sslEngineSource, - authenticateSslClients, - proxyAuthenticator, - chainProxyManager, - mitmManager, - filtersSource, - transparent, - idleConnectionTimeout, - activityTrackers, - connectTimeout, - serverResolver, - globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getReadLimit() : 0, - globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getWriteLimit() : 0, - localAddress); + sslEngineSource, + authenticateSslClients, + proxyAuthenticator, + chainProxyManager, + mitmManager, + filtersSource, + transparent, + idleConnectionTimeout, + activityTrackers, + connectTimeout, + serverResolver, + globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getReadLimit() : 0, + globalTrafficShapingHandler != null ? globalTrafficShapingHandler.getWriteLimit() : 0, + localAddress); } @Override public void stop() { - serverGroup.stop(true); + doStop(true); } @Override public void abort() { - serverGroup.stop(false); + doStop(false); } - private HttpProxyServer start() { - LOG.info("Starting proxy at address: " + this.requestedAddress); - - synchronized (serverGroup) { - if (!serverGroup.stopped) { - doStart(); + /** + * Performs cleanup necessary to stop the server. Closes all channels opened by the server and unregisters this + * server from the server group. + * + * @param graceful when true, waits for requests to terminate before stopping the server + */ + protected void doStop(boolean graceful) { + // only stop the server if it hasn't already been stopped + if (stopped.compareAndSet(false, true)) { + if (graceful) { + LOG.info("Shutting down proxy server gracefully"); } else { - throw new Error("Already stopped"); + LOG.info("Shutting down proxy server immediately (non-graceful)"); } + + closeAllChannels(graceful); + + serverGroup.unregisterProxyServer(this, graceful); + + // remove the shutdown hook that was added when the proxy was started, since it has now been stopped + try { + Runtime.getRuntime().removeShutdownHook(jvmShutdownHook); + } catch (IllegalStateException e) { + // ignore -- IllegalStateException means the VM is already shutting down + } + + LOG.info("Done shutting down proxy server"); + } + } + + /** + * Register a new {@link Channel} with this server, for later closing. + * + * @param channel + */ + protected void registerChannel(Channel channel) { + allChannels.add(channel); + } + + /** + * Closes all channels opened by this proxy server. + * + * @param graceful when false, attempts to shutdown all channels immediately and ignores any channel-closing exceptions + */ + protected void closeAllChannels(boolean graceful) { + LOG.info("Closing all channels " + (graceful ? "(graceful)" : "(non-graceful)")); + + ChannelGroupFuture future = allChannels.close(); + + // if this is a graceful shutdown, log any channel closing failures. if this isn't a graceful shutdown, ignore them. + if (graceful) { + try { + future.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + LOG.warn("Interrupted while waiting for channels to shut down gracefully."); + } + + if (!future.isSuccess()) { + for (ChannelFuture cf : future) { + if (!cf.isSuccess()) { + LOG.info("Unable to close channel. Cause of failure for {} is {}", cf.channel(), cf.cause()); + } + } + } + } + } + + private HttpProxyServer start() { + if (!serverGroup.isStopped()) { + LOG.info("Starting proxy at address: " + this.requestedAddress); + + serverGroup.registerProxyServer(this); + + doStart(); + } else { + throw new IllegalStateException("Attempted to start proxy, but proxy's server group is already stopped"); } return this; } private void doStart() { - serverGroup.ensureProtocol(transportProtocol); ServerBootstrap serverBootstrap = new ServerBootstrap().group( - serverGroup.clientToProxyBossPools.get(transportProtocol), - serverGroup.clientToProxyWorkerPools.get(transportProtocol)); + serverGroup.getClientToProxyAcceptorPoolForTransport(transportProtocol), + serverGroup.getClientToProxyWorkerPoolForTransport(transportProtocol)); ChannelInitializer initializer = new ChannelInitializer() { protected void initChannel(Channel ch) throws Exception { @@ -374,23 +452,23 @@ protected void initChannel(Channel ch) throws Exception { }; }; switch (transportProtocol) { - case TCP: - LOG.info("Proxy listening with TCP transport"); - serverBootstrap.channelFactory(new ChannelFactory() { - @Override - public ServerChannel newChannel() { - return new NioServerSocketChannel(); - } - }); - break; - case UDT: - LOG.info("Proxy listening with UDT transport"); - serverBootstrap.channelFactory(NioUdtProvider.BYTE_ACCEPTOR) - .option(ChannelOption.SO_BACKLOG, 10) - .option(ChannelOption.SO_REUSEADDR, true); - break; - default: - throw new UnknownTransportProtocolError(transportProtocol); + case TCP: + LOG.info("Proxy listening with TCP transport"); + serverBootstrap.channelFactory(new ChannelFactory() { + @Override + public ServerChannel newChannel() { + return new NioServerSocketChannel(); + } + }); + break; + case UDT: + LOG.info("Proxy listening with UDT transport"); + serverBootstrap.channelFactory(NioUdtProvider.BYTE_ACCEPTOR) + .option(ChannelOption.SO_BACKLOG, 10) + .option(ChannelOption.SO_REUSEADDR, true); + break; + default: + throw new UnknownTransportProtocolException(transportProtocol); } serverBootstrap.childHandler(initializer); ChannelFuture future = serverBootstrap.bind(requestedAddress) @@ -411,15 +489,8 @@ public void operationComplete(ChannelFuture future) this.boundAddress = ((InetSocketAddress) future.channel().localAddress()); LOG.info("Proxy started at address: " + this.boundAddress); - } - /** - * Register a new {@link Channel} with this server, for later closing. - * - * @param channel - */ - protected void registerChannel(Channel channel) { - this.serverGroup.allChannels.add(channel); + Runtime.getRuntime().addShutdownHook(jvmShutdownHook); } protected ChainedProxyManager getChainProxyManager() { @@ -446,216 +517,15 @@ protected Collection getActivityTrackers() { return activityTrackers; } - protected EventLoopGroup getProxyToServerWorkerFor( - TransportProtocol transportProtocol) { - synchronized (serverGroup) { - serverGroup.ensureProtocol(transportProtocol); - return serverGroup.proxyToServerWorkerPools.get(transportProtocol); - } - } - - /** - * Represents a group of servers that share thread pools. - */ - private static class ServerGroup { - private static final int INCOMING_ACCEPTOR_THREADS = 2; - private static final int INCOMING_WORKER_THREADS = 8; - private static final int OUTGOING_WORKER_THREADS = 8; - - /** - * A name for this ServerGroup to use in naming threads. - */ - private final String name; - - /** - * Keep track of all channels for later shutdown. - */ - private final ChannelGroup allChannels = new DefaultChannelGroup( - "HTTP-Proxy-Server", GlobalEventExecutor.INSTANCE); - - /** - * These {@link EventLoopGroup}s accept incoming connections to the - * proxies. A different EventLoopGroup is used for each - * TransportProtocol, since these have to be configured differently. - * - * Thread safety: Only accessed while synchronized on the server group. - */ - private final Map clientToProxyBossPools = new HashMap(); - - /** - * These {@link EventLoopGroup}s process incoming requests to the - * proxies. A different EventLoopGroup is used for each - * TransportProtocol, since these have to be configured differently. - * - * Thread safety: Only accessed while synchronized on the server group. - * * - */ - private final Map clientToProxyWorkerPools = new HashMap(); - - /** - * These {@link EventLoopGroup}s are used for making outgoing - * connections to servers. A different EventLoopGroup is used for each - * TransportProtocol, since these have to be configured differently. - */ - private final Map proxyToServerWorkerPools = new HashMap(); - - private volatile boolean stopped = false; - - /** - * JVM shutdown hook to stop this server group. Declared as a class-level variable to allow removing the shutdown hook when the - * server is stopped normally. - */ - private final Thread serverGroupShutdownHook = new Thread(new Runnable() { - @Override - public void run() { - stop(false); - } - }); - - private ServerGroup(String name) { - this.name = name; - - Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { - public void uncaughtException(final Thread t, final Throwable e) { - LOG.error("Uncaught throwable", e); - } - }); - - Runtime.getRuntime().addShutdownHook(serverGroupShutdownHook); - } - - public synchronized void ensureProtocol( - TransportProtocol transportProtocol) { - if (!clientToProxyWorkerPools.containsKey(transportProtocol)) { - initializeTransport(transportProtocol); - } - } - - private void initializeTransport(TransportProtocol transportProtocol) { - SelectorProvider selectorProvider = null; - switch (transportProtocol) { - case TCP: - selectorProvider = SelectorProvider.provider(); - break; - case UDT: - selectorProvider = NioUdtProvider.BYTE_PROVIDER; - break; - default: - throw new UnknownTransportProtocolError(transportProtocol); - } - - NioEventLoopGroup inboundAcceptorGroup = new NioEventLoopGroup( - INCOMING_ACCEPTOR_THREADS, - new CategorizedThreadFactory("ClientToProxyAcceptor"), - selectorProvider); - NioEventLoopGroup inboundWorkerGroup = new NioEventLoopGroup( - INCOMING_WORKER_THREADS, - new CategorizedThreadFactory("ClientToProxyWorker"), - selectorProvider); - inboundWorkerGroup.setIoRatio(90); - NioEventLoopGroup outboundWorkerGroup = new NioEventLoopGroup( - OUTGOING_WORKER_THREADS, - new CategorizedThreadFactory("ProxyToServerWorker"), - selectorProvider); - outboundWorkerGroup.setIoRatio(90); - this.clientToProxyBossPools.put(transportProtocol, - inboundAcceptorGroup); - this.clientToProxyWorkerPools.put(transportProtocol, - inboundWorkerGroup); - this.proxyToServerWorkerPools.put(transportProtocol, - outboundWorkerGroup); - } - - synchronized private void stop(boolean graceful) { - if (graceful) { - LOG.info("Shutting down proxy gracefully"); - } else { - LOG.info("Shutting down proxy immediately (non-graceful)"); - } - - if (stopped) { - LOG.info("Already stopped"); - return; - } - - LOG.info("Closing all channels..."); - - final ChannelGroupFuture future = allChannels.close(); - - // if this is a graceful shutdown, log any channel closing failures. if this isn't a graceful shutdown, ignore them. - if (graceful) { - future.awaitUninterruptibly(10 * 1000); - - if (!future.isSuccess()) { - final Iterator iter = future.iterator(); - while (iter.hasNext()) { - final ChannelFuture cf = iter.next(); - if (!cf.isSuccess()) { - LOG.info( - "Unable to close channel. Cause of failure for {} is {}", - cf.channel(), - cf.cause()); - } - } - } - } - - LOG.info("Shutting down event loops"); - List allEventLoopGroups = new ArrayList(); - allEventLoopGroups.addAll(clientToProxyBossPools.values()); - allEventLoopGroups.addAll(clientToProxyWorkerPools.values()); - allEventLoopGroups.addAll(proxyToServerWorkerPools.values()); - for (EventLoopGroup group : allEventLoopGroups) { - if (graceful) { - group.shutdownGracefully(); - } else { - group.shutdownGracefully(0, 0, TimeUnit.SECONDS); - } - } - - if (graceful) { - for (EventLoopGroup group : allEventLoopGroups) { - try { - group.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - LOG.warn("Interrupted while shutting down event loop"); - } - } - } - - // remove the shutdown hook that was added when the server group was started, since it has now been stopped - try { - Runtime.getRuntime().removeShutdownHook(serverGroupShutdownHook); - } catch (IllegalStateException e) { - // ignore -- IllegalStateException means the VM is already shutting down - } - - stopped = true; - - LOG.info("Done shutting down proxy"); - } - - private class CategorizedThreadFactory implements ThreadFactory { - private String category; - private int num = 0; - - public CategorizedThreadFactory(String category) { - super(); - this.category = category; - } - - public Thread newThread(final Runnable r) { - final Thread t = new Thread(r, - name + "-" + category + "-" + num++); - return t; - } - } + protected EventLoopGroup getProxyToServerWorkerFor(TransportProtocol transportProtocol) { + return serverGroup.getProxyToServerWorkerPoolForTransport(transportProtocol); } - private static class DefaultHttpProxyServerBootstrap implements - HttpProxyServerBootstrap { + // TODO: refactor bootstrap into a separate class + private static class DefaultHttpProxyServerBootstrap implements HttpProxyServerBootstrap { private String name = "LittleProxy"; - private TransportProtocol transportProtocol = TCP; + private ServerGroup serverGroup = null; + private TransportProtocol transportProtocol = TransportProtocol.TCP; private InetSocketAddress requestedAddress; private int port = 8080; private boolean allowLocalOnly = true; @@ -668,19 +538,21 @@ private static class DefaultHttpProxyServerBootstrap implements private HttpFiltersSource filtersSource = new HttpFiltersSourceAdapter(); private boolean transparent = false; private int idleConnectionTimeout = 70; - private DefaultHttpProxyServer original; private Collection activityTrackers = new ConcurrentLinkedQueue(); private int connectTimeout = 40000; private HostResolver serverResolver = new DefaultHostResolver(); private long readThrottleBytesPerSecond; private long writeThrottleBytesPerSecond; private InetSocketAddress localAddress; + private int clientToProxyAcceptorThreads = ServerGroup.DEFAULT_INCOMING_ACCEPTOR_THREADS; + private int clientToProxyWorkerThreads = ServerGroup.DEFAULT_INCOMING_WORKER_THREADS; + private int proxyToServerWorkerThreads = ServerGroup.DEFAULT_OUTGOING_WORKER_THREADS; private DefaultHttpProxyServerBootstrap() { } private DefaultHttpProxyServerBootstrap( - DefaultHttpProxyServer original, + ServerGroup serverGroup, TransportProtocol transportProtocol, InetSocketAddress requestedAddress, SslEngineSource sslEngineSource, @@ -695,7 +567,7 @@ private DefaultHttpProxyServerBootstrap( long readThrottleBytesPerSecond, long writeThrottleBytesPerSecond, InetSocketAddress localAddress) { - this.original = original; + this.serverGroup = serverGroup; this.transportProtocol = transportProtocol; this.requestedAddress = requestedAddress; this.port = requestedAddress.getPort(); @@ -881,14 +753,22 @@ public HttpProxyServer start() { return build().start(); } + @Override + public HttpProxyServerBootstrap withThreadPoolConfiguration(ThreadPoolConfiguration configuration) { + this.clientToProxyAcceptorThreads = configuration.getAcceptorThreads(); + this.clientToProxyWorkerThreads = configuration.getClientToProxyWorkerThreads(); + this.proxyToServerWorkerThreads = configuration.getProxyToServerWorkerThreads(); + return this; + } + private DefaultHttpProxyServer build() { final ServerGroup serverGroup; - if (original != null) { - serverGroup = original.serverGroup; + if (this.serverGroup != null) { + serverGroup = this.serverGroup; } else { - serverGroup = new ServerGroup(name); + serverGroup = new ServerGroup(name, clientToProxyAcceptorThreads, clientToProxyWorkerThreads, proxyToServerWorkerThreads); } return new DefaultHttpProxyServer(serverGroup, diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java b/src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java new file mode 100644 index 000000000..2ca4f0fd9 --- /dev/null +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyThreadPools.java @@ -0,0 +1,64 @@ +package org.littleshoot.proxy.impl; + +import com.google.common.collect.ImmutableList; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import java.nio.channels.spi.SelectorProvider; +import java.util.List; + +/** + * Encapsulates the thread pools used by the proxy. Contains the acceptor thread pool as well as the client-to-proxy and + * proxy-to-server thread pools. + */ +public class ProxyThreadPools { + /** + * These {@link EventLoopGroup}s accept incoming connections to the + * proxies. A different EventLoopGroup is used for each + * TransportProtocol, since these have to be configured differently. + */ + private final NioEventLoopGroup clientToProxyAcceptorPool; + + /** + * These {@link EventLoopGroup}s process incoming requests to the + * proxies. A different EventLoopGroup is used for each + * TransportProtocol, since these have to be configured differently. + */ + private final NioEventLoopGroup clientToProxyWorkerPool; + + /** + * These {@link EventLoopGroup}s are used for making outgoing + * connections to servers. A different EventLoopGroup is used for each + * TransportProtocol, since these have to be configured differently. + */ + private final NioEventLoopGroup proxyToServerWorkerPool; + + public ProxyThreadPools(SelectorProvider selectorProvider, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads, String serverGroupName, int serverGroupId) { + clientToProxyAcceptorPool = new NioEventLoopGroup(incomingAcceptorThreads, new CategorizedThreadFactory(serverGroupName, "ClientToProxyAcceptor", serverGroupId), selectorProvider); + + clientToProxyWorkerPool = new NioEventLoopGroup(incomingWorkerThreads, new CategorizedThreadFactory(serverGroupName, "ClientToProxyWorker", serverGroupId), selectorProvider); + clientToProxyWorkerPool.setIoRatio(90); + + proxyToServerWorkerPool = new NioEventLoopGroup(outgoingWorkerThreads, new CategorizedThreadFactory(serverGroupName, "ProxyToServerWorker", serverGroupId), selectorProvider); + proxyToServerWorkerPool.setIoRatio(90); + } + + /** + * Returns all event loops (acceptor and worker thread pools) in this pool. + */ + public List getAllEventLoops() { + return ImmutableList.of(clientToProxyAcceptorPool, clientToProxyWorkerPool, proxyToServerWorkerPool); + } + + public NioEventLoopGroup getClientToProxyAcceptorPool() { + return clientToProxyAcceptorPool; + } + + public NioEventLoopGroup getClientToProxyWorkerPool() { + return clientToProxyWorkerPool; + } + + public NioEventLoopGroup getProxyToServerWorkerPool() { + return proxyToServerWorkerPool; + } +} diff --git a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java index 7d7fa5c4d..b4e23823b 100644 --- a/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java +++ b/src/main/java/org/littleshoot/proxy/impl/ProxyToServerConnection.java @@ -33,7 +33,7 @@ import org.littleshoot.proxy.HttpFilters; import org.littleshoot.proxy.MitmManager; import org.littleshoot.proxy.TransportProtocol; -import org.littleshoot.proxy.UnknownTransportProtocolError; +import org.littleshoot.proxy.UnknownTransportProtocolException; import org.slf4j.spi.LocationAwareLogger; import javax.net.ssl.SSLSession; @@ -584,8 +584,7 @@ boolean shouldExecuteOnEventLoop() { @Override protected Future execute() { - Bootstrap cb = new Bootstrap().group(proxyServer - .getProxyToServerWorkerFor(transportProtocol)); + Bootstrap cb = new Bootstrap().group(proxyServer.getProxyToServerWorkerFor(transportProtocol)); switch (transportProtocol) { case TCP: @@ -603,7 +602,7 @@ public Channel newChannel() { .option(ChannelOption.SO_REUSEADDR, true); break; default: - throw new UnknownTransportProtocolError(transportProtocol); + throw new UnknownTransportProtocolException(transportProtocol); } cb.handler(new ChannelInitializer() { diff --git a/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java new file mode 100644 index 000000000..ee10e85c5 --- /dev/null +++ b/src/main/java/org/littleshoot/proxy/impl/ServerGroup.java @@ -0,0 +1,284 @@ +package org.littleshoot.proxy.impl; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.udt.nio.NioUdtProvider; +import org.littleshoot.proxy.HttpProxyServer; +import org.littleshoot.proxy.TransportProtocol; +import org.littleshoot.proxy.UnknownTransportProtocolException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.channels.spi.SelectorProvider; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manages thread pools for one or more proxy server instances. When servers are created, they must register with the + * ServerGroup using {@link #registerProxyServer(HttpProxyServer)}, and when they shut down, must unregister with the + * ServerGroup using {@link #unregisterProxyServer(HttpProxyServer, boolean)}. + */ +public class ServerGroup { + private static final Logger log = LoggerFactory.getLogger(ServerGroup.class); + + /** + * The default number of threads to accept incoming requests from clients. (Requests are serviced by worker threads, + * not acceptor threads.) + */ + public static final int DEFAULT_INCOMING_ACCEPTOR_THREADS = 2; + + /** + * The default number of threads to service incoming requests from clients. + */ + public static final int DEFAULT_INCOMING_WORKER_THREADS = 8; + + /** + * The default number of threads to service outgoing requests to servers. + */ + public static final int DEFAULT_OUTGOING_WORKER_THREADS = 8; + + /** + * Global counter for the {@link #serverGroupId}. + */ + private static final AtomicInteger serverGroupCount = new AtomicInteger(0); + + /** + * A name for this ServerGroup to use in naming threads. + */ + private final String name; + + /** + * The ID of this server group. Forms part of the name of each thread created for this server group. Useful for + * differentiating threads when multiple proxy instances are running. + */ + private final int serverGroupId; + + private final int incomingAcceptorThreads; + private final int incomingWorkerThreads; + private final int outgoingWorkerThreads; + + /** + * List of all servers registered to use this ServerGroup. Any access to this list should be synchronized using the + * {@link #SERVER_REGISTRATION_LOCK}. + */ + public final List registeredServers = new ArrayList(1); + + /** + * A mapping of {@link TransportProtocol}s to their initialized {@link ProxyThreadPools}. Each transport uses a + * different thread pool, since the initialization parameters are different. + */ + private final EnumMap protocolThreadPools = new EnumMap(TransportProtocol.class); + + /** + * A mapping of selector providers to transport protocols. Avoids special-casing each transport protocol during + * transport protocol initialization. + */ + private static final EnumMap TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS = new EnumMap(TransportProtocol.class); + static { + TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS.put(TransportProtocol.TCP, SelectorProvider.provider()); + TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS.put(TransportProtocol.UDT, NioUdtProvider.BYTE_PROVIDER); + } + + /** + * True when this ServerGroup is stopped. + */ + private final AtomicBoolean stopped = new AtomicBoolean(false); + + /** + * Creates a new ServerGroup instance for a proxy. Threads created for this ServerGroup will have the specified + * ServerGroup name in the Thread name. This constructor does not actually initialize any thread pools; instead, + * thread pools for specific transport protocols are lazily initialized as needed. + * + * @param name ServerGroup name to include in thread names + * @param incomingAcceptorThreads number of acceptor threads per protocol + * @param incomingWorkerThreads number of client-to-proxy worker threads per protocol + * @param outgoingWorkerThreads number of proxy-to-server worker threads per protocol + */ + public ServerGroup(String name, int incomingAcceptorThreads, int incomingWorkerThreads, int outgoingWorkerThreads) { + this.name = name; + this.serverGroupId = serverGroupCount.getAndIncrement(); + this.incomingAcceptorThreads = incomingAcceptorThreads; + this.incomingWorkerThreads = incomingWorkerThreads; + this.outgoingWorkerThreads = outgoingWorkerThreads; + } + + /** + * Lock for initializing any transport protocols. + */ + private final Object THREAD_POOL_INIT_LOCK = new Object(); + + /** + * Retrieves the {@link ProxyThreadPools} for the specified transport protocol. Lazily initializes the thread pools + * for the transport protocol if they have not yet been initialized. If the protocol has already been initialized, + * this method returns immediately, without synchronization. If initialization is necessary, the initialization + * process creates the acceptor and worker threads necessary to service requests to/from the proxy. + *

+ * This method is thread-safe; no external locking is necessary. + * + * @param protocol transport protocol to retrieve thread pools for + * @return thread pools for the specified transport protocol + */ + private ProxyThreadPools getThreadPoolsForProtocol(TransportProtocol protocol) { + // if the thread pools have not been initialized for this protocol, initialize them + if (protocolThreadPools.get(protocol) == null) { + synchronized (THREAD_POOL_INIT_LOCK) { + if (protocolThreadPools.get(protocol) == null) { + log.debug("Initializing thread pools for {} with {} acceptor threads, {} incoming worker threads, and {} outgoing worker threads", + protocol, incomingAcceptorThreads, incomingWorkerThreads, outgoingWorkerThreads); + + SelectorProvider selectorProvider = TRANSPORT_PROTOCOL_SELECTOR_PROVIDERS.get(protocol); + if (selectorProvider == null) { + throw new UnknownTransportProtocolException(protocol); + } + + ProxyThreadPools threadPools = new ProxyThreadPools(selectorProvider, + incomingAcceptorThreads, + incomingWorkerThreads, + outgoingWorkerThreads, + name, + serverGroupId); + protocolThreadPools.put(protocol, threadPools); + } + } + } + + return protocolThreadPools.get(protocol); + } + + /** + * Lock controlling access to the {@link #registerProxyServer(HttpProxyServer)} and {@link #unregisterProxyServer(HttpProxyServer, boolean)} + * methods. + */ + private final Object SERVER_REGISTRATION_LOCK = new Object(); + + /** + * Registers the specified proxy server as a consumer of this server group. The server group will not be shut down + * until the proxy unregisters itself. + * + * @param proxyServer proxy server instance to register + */ + public void registerProxyServer(HttpProxyServer proxyServer) { + synchronized (SERVER_REGISTRATION_LOCK) { + registeredServers.add(proxyServer); + } + } + + /** + * Unregisters the specified proxy server from this server group. If this was the last registered proxy server, the + * server group will be shut down. + * + * @param proxyServer proxy server instance to unregister + * @param graceful when true, the server group shutdown (if necessary) will be graceful + */ + public void unregisterProxyServer(HttpProxyServer proxyServer, boolean graceful) { + synchronized (SERVER_REGISTRATION_LOCK) { + boolean wasRegistered = registeredServers.remove(proxyServer); + if (!wasRegistered) { + log.warn("Attempted to unregister proxy server from ServerGroup that it was not registered with. Was the proxy unregistered twice?"); + } + + if (registeredServers.isEmpty()) { + log.debug("Proxy server unregistered from ServerGroup. No proxy servers remain registered, so shutting down ServerGroup."); + + shutdown(graceful); + } else { + log.debug("Proxy server unregistered from ServerGroup. Not shutting down ServerGroup ({} proxy servers remain registered).", registeredServers.size()); + } + } + } + + /** + * Shuts down all event loops owned by this server group. + * + * @param graceful when true, event loops will "gracefully" terminate, waiting for submitted tasks to finish + */ + private void shutdown(boolean graceful) { + if (!stopped.compareAndSet(false, true)) { + log.info("Shutdown requested, but ServerGroup is already stopped. Doing nothing."); + + return; + } + + log.info("Shutting down server group event loops " + (graceful ? "(graceful)" : "(non-graceful)")); + + // loop through all event loops managed by this server group. this includes acceptor and worker event loops + // for both TCP and UDP transport protocols. + List allEventLoopGroups = new ArrayList(); + + for (ProxyThreadPools threadPools : protocolThreadPools.values()) { + allEventLoopGroups.addAll(threadPools.getAllEventLoops()); + } + + for (EventLoopGroup group : allEventLoopGroups) { + if (graceful) { + group.shutdownGracefully(); + } else { + group.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } + } + + if (graceful) { + for (EventLoopGroup group : allEventLoopGroups) { + try { + group.awaitTermination(60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + log.warn("Interrupted while shutting down event loop"); + } + } + } + + log.debug("Done shutting down server group"); + } + + /** + * Retrieves the client-to-proxy acceptor thread pool for the specified protocol. Initializes the pool if it has not + * yet been initialized. + *

+ * This method is thread-safe; no external locking is necessary. + * + * @param protocol transport protocol to retrieve the thread pool for + * @return the client-to-proxy acceptor thread pool + */ + public EventLoopGroup getClientToProxyAcceptorPoolForTransport(TransportProtocol protocol) { + return getThreadPoolsForProtocol(protocol).getClientToProxyAcceptorPool(); + } + + /** + * Retrieves the client-to-proxy acceptor worker pool for the specified protocol. Initializes the pool if it has not + * yet been initialized. + *

+ * This method is thread-safe; no external locking is necessary. + * + * @param protocol transport protocol to retrieve the thread pool for + * @return the client-to-proxy worker thread pool + */ + public EventLoopGroup getClientToProxyWorkerPoolForTransport(TransportProtocol protocol) { + return getThreadPoolsForProtocol(protocol).getClientToProxyWorkerPool(); + } + + /** + * Retrieves the proxy-to-server worker thread pool for the specified protocol. Initializes the pool if it has not + * yet been initialized. + *

+ * This method is thread-safe; no external locking is necessary. + * + * @param protocol transport protocol to retrieve the thread pool for + * @return the proxy-to-server worker thread pool + */ + public EventLoopGroup getProxyToServerWorkerPoolForTransport(TransportProtocol protocol) { + return getThreadPoolsForProtocol(protocol).getProxyToServerWorkerPool(); + } + + /** + * @return true if this ServerGroup has already been stopped + */ + public boolean isStopped() { + return stopped.get(); + } + +} diff --git a/src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java b/src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java new file mode 100644 index 000000000..b8e7b0582 --- /dev/null +++ b/src/main/java/org/littleshoot/proxy/impl/ThreadPoolConfiguration.java @@ -0,0 +1,62 @@ +package org.littleshoot.proxy.impl; + +/** + * Configuration object for the proxy's thread pools. Controls the number of acceptor and worker threads in the Netty + * {@link io.netty.channel.EventLoopGroup} used by the proxy. + */ +public class ThreadPoolConfiguration { + private int acceptorThreads = ServerGroup.DEFAULT_INCOMING_ACCEPTOR_THREADS; + private int clientToProxyWorkerThreads = ServerGroup.DEFAULT_INCOMING_WORKER_THREADS; + private int proxyToServerWorkerThreads = ServerGroup.DEFAULT_OUTGOING_WORKER_THREADS; + + public int getClientToProxyWorkerThreads() { + return clientToProxyWorkerThreads; + } + + /** + * Set the number of client-to-proxy worker threads to create. Worker threads perform the actual processing of + * client requests. The default value is {@link ServerGroup#DEFAULT_INCOMING_WORKER_THREADS}. + * + * @param clientToProxyWorkerThreads number of client-to-proxy worker threads to create + * @return this thread pool configuration instance, for chaining + */ + public ThreadPoolConfiguration withClientToProxyWorkerThreads(int clientToProxyWorkerThreads) { + this.clientToProxyWorkerThreads = clientToProxyWorkerThreads; + return this; + } + + public int getAcceptorThreads() { + return acceptorThreads; + } + + /** + * Set the number of acceptor threads to create. Acceptor threads accept HTTP connections from the client and queue + * them for processing by client-to-proxy worker threads. The default value is + * {@link ServerGroup#DEFAULT_INCOMING_ACCEPTOR_THREADS}. + * + * @param acceptorThreads number of acceptor threads to create + * @return this thread pool configuration instance, for chaining + */ + public ThreadPoolConfiguration withAcceptorThreads(int acceptorThreads) { + this.acceptorThreads = acceptorThreads; + return this; + } + + public int getProxyToServerWorkerThreads() { + return proxyToServerWorkerThreads; + } + + /** + * Set the number of proxy-to-server worker threads to create. Proxy-to-server worker threads make requests to + * upstream servers and process responses from the server. The default value is + * {@link ServerGroup#DEFAULT_OUTGOING_WORKER_THREADS}. + * + * @param proxyToServerWorkerThreads number of proxy-to-server worker threads to create + * @return this thread pool configuration instance, for chaining + */ + public ThreadPoolConfiguration withProxyToServerWorkerThreads(int proxyToServerWorkerThreads) { + this.proxyToServerWorkerThreads = proxyToServerWorkerThreads; + return this; + } + +} diff --git a/src/test/java/org/littleshoot/proxy/ClonedProxyTest.java b/src/test/java/org/littleshoot/proxy/ClonedProxyTest.java new file mode 100644 index 000000000..3abbc939e --- /dev/null +++ b/src/test/java/org/littleshoot/proxy/ClonedProxyTest.java @@ -0,0 +1,120 @@ +package org.littleshoot.proxy; + +import org.apache.http.HttpResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.littleshoot.proxy.impl.DefaultHttpProxyServer; +import org.littleshoot.proxy.test.HttpClientUtil; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.matchers.Times; + +import static org.junit.Assert.assertEquals; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class ClonedProxyTest { + private ClientAndServer mockServer; + private int mockServerPort; + + private HttpProxyServer originalProxy; + private HttpProxyServer clonedProxy; + + @Before + public void setUp() { + mockServer = new ClientAndServer(0); + mockServerPort = mockServer.getPort(); + } + + @After + public void tearDown() { + try { + if (mockServer != null) { + mockServer.stop(); + } + } finally { + try { + if (originalProxy != null) { + originalProxy.abort(); + } + } finally { + if (clonedProxy != null) { + clonedProxy.abort(); + } + } + } + } + + @Test + public void testClonedProxyHandlesRequests() { + originalProxy = DefaultHttpProxyServer.bootstrap() + .withPort(0) + .withName("original") + .start(); + clonedProxy = originalProxy.clone() + .withName("clone") + .start(); + + mockServer.when(request() + .withMethod("GET") + .withPath("/testClonedProxyHandlesRequests"), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("success") + ); + + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + "/testClonedProxyHandlesRequests", clonedProxy); + assertEquals("Expected to receive a 200 when making a request using the cloned proxy server", 200, response.getStatusLine().getStatusCode()); + } + + @Test + public void testStopClonedProxyDoesNotStopOriginalServer() { + originalProxy = DefaultHttpProxyServer.bootstrap() + .withPort(0) + .withName("original") + .start(); + clonedProxy = originalProxy.clone() + .withName("clone") + .start(); + + clonedProxy.abort(); + + mockServer.when(request() + .withMethod("GET") + .withPath("/testClonedProxyHandlesRequests"), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("success") + ); + + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + "/testClonedProxyHandlesRequests", originalProxy); + assertEquals("Expected to receive a 200 when making a request using the cloned proxy server", 200, response.getStatusLine().getStatusCode()); + } + + @Test + public void testStopOriginalServerDoesNotStopClonedServer() { + originalProxy = DefaultHttpProxyServer.bootstrap() + .withPort(0) + .withName("original") + .start(); + clonedProxy = originalProxy.clone() + .withName("clone") + .start(); + + originalProxy.abort(); + + mockServer.when(request() + .withMethod("GET") + .withPath("/testClonedProxyHandlesRequests"), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("success") + ); + + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + "/testClonedProxyHandlesRequests", clonedProxy); + assertEquals("Expected to receive a 200 when making a request using the cloned proxy server", 200, response.getStatusLine().getStatusCode()); + } +} diff --git a/src/test/java/org/littleshoot/proxy/ServerGroupTest.java b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java new file mode 100644 index 000000000..629a47c2f --- /dev/null +++ b/src/test/java/org/littleshoot/proxy/ServerGroupTest.java @@ -0,0 +1,153 @@ +package org.littleshoot.proxy; + +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.littleshoot.proxy.impl.DefaultHttpProxyServer; +import org.littleshoot.proxy.impl.ThreadPoolConfiguration; +import org.littleshoot.proxy.test.HttpClientUtil; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.matchers.Times; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.mockserver.model.HttpRequest.request; +import static org.mockserver.model.HttpResponse.response; + +public class ServerGroupTest { + private ClientAndServer mockServer; + private int mockServerPort; + + private HttpProxyServer proxyServer; + + @Before + public void setUp() { + mockServer = new ClientAndServer(0); + mockServerPort = mockServer.getPort(); + } + + @After + public void tearDown() { + try { + if (mockServer != null) { + mockServer.stop(); + } + } finally { + if (proxyServer != null) { + proxyServer.abort(); + } + } + } + + @Test + public void testSingleWorkerThreadPoolConfiguration() throws ExecutionException, InterruptedException { + final String firstRequestPath = "/testSingleThreadFirstRequest"; + final String secondRequestPath = "/testSingleThreadSecondRequest"; + + // set up two server responses that will execute more or less simultaneously. the first request has a small + // delay, to reduce the chance that the first request will finish entirely before the second request is finished + // (and thus be somewhat more likely to be serviced by the same thread, even if the ThreadPoolConfiguration is + // not behaving properly). + mockServer.when(request() + .withMethod("GET") + .withPath(firstRequestPath), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("first") + .withDelay(TimeUnit.MILLISECONDS, 500) + ); + + mockServer.when(request() + .withMethod("GET") + .withPath(secondRequestPath), + Times.exactly(1)) + .respond(response() + .withStatusCode(200) + .withBody("second") + ); + + // save the names of the threads that execute the filter methods. filter methods are executed by the worker thread + // handling the request/response, so if there is only one worker thread, the filter methods should be executed + // by the same thread. + final AtomicReference firstClientThreadName = new AtomicReference(); + final AtomicReference secondClientThreadName = new AtomicReference(); + + final AtomicReference firstProxyThreadName = new AtomicReference(); + final AtomicReference secondProxyThreadName = new AtomicReference(); + + proxyServer = DefaultHttpProxyServer.bootstrap() + .withPort(0) + .withFiltersSource(new HttpFiltersSourceAdapter() { + @Override + public HttpFilters filterRequest(HttpRequest originalRequest) { + return new HttpFiltersAdapter(originalRequest) { + @Override + public io.netty.handler.codec.http.HttpResponse clientToProxyRequest(HttpObject httpObject) { + if (originalRequest.getUri().endsWith(firstRequestPath)) { + firstClientThreadName.set(Thread.currentThread().getName()); + } else if (originalRequest.getUri().endsWith(secondRequestPath)) { + secondClientThreadName.set(Thread.currentThread().getName()); + } + + return super.clientToProxyRequest(httpObject); + } + + @Override + public void serverToProxyResponseReceived() { + if (originalRequest.getUri().endsWith(firstRequestPath)) { + firstProxyThreadName.set(Thread.currentThread().getName()); + } else if (originalRequest.getUri().endsWith(secondRequestPath)) { + secondProxyThreadName.set(Thread.currentThread().getName()); + } + } + }; + } + }) + .withThreadPoolConfiguration(new ThreadPoolConfiguration() + .withAcceptorThreads(1) + .withClientToProxyWorkerThreads(1) + .withProxyToServerWorkerThreads(1)) + .start(); + + // execute both requests in parallel, to increase the chance of blocking due to the single-threaded ThreadPoolConfiguration + + Runnable firstRequest = new Runnable() { + @Override + public void run() { + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + firstRequestPath, proxyServer); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + }; + + Runnable secondRequest = new Runnable () { + @Override + public void run() { + HttpResponse response = HttpClientUtil.performHttpGet("http://localhost:" + mockServerPort + secondRequestPath, proxyServer); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + }; + + ExecutorService executor = Executors.newFixedThreadPool(2); + Future firstFuture = executor.submit(firstRequest); + Future secondFuture = executor.submit(secondRequest); + + firstFuture.get(); + secondFuture.get(); + + Thread.sleep(500); + + assertEquals("Expected clientToProxy filter methods to be executed on the same thread for both requests", firstClientThreadName.get(), secondClientThreadName.get()); + assertEquals("Expected serverToProxy filter methods to be executed on the same thread for both requests", firstProxyThreadName.get(), secondProxyThreadName.get()); + } + +}