Permalink
Browse files

Added JMX debugging properties, better RFC compliance in HTTP respons…

…es, etc
  • Loading branch information...
Adam Fisk
Adam Fisk committed May 22, 2011
1 parent 65d705d commit 4f8ab7c2741670fce0ca139d4157c1569669fa0e
View
@@ -1,3 +1,2 @@
-readThrottle=100000
-writeThrottle=100000
-throttle=false
+# Exposes proxy connection properties via JMX.
+jmx=true
@@ -0,0 +1,9 @@
+package org.littleshoot.proxy;
+
+import javax.management.MXBean;
+
+@MXBean(true)
+public interface AllConnectionData {
+
+ int getNumRequestHandlers();
+}
@@ -0,0 +1,22 @@
+package org.littleshoot.proxy;
+
+import javax.management.MXBean;
+
+@MXBean(true)
+public interface ConnectionData {
+
+ int getClientConnections();
+
+ int getTotalClientConnections();
+
+ int getOutgoingConnections();
+
+ int getRequestsSent();
+
+ int getResponsesReceived();
+
+ String getUnansweredRequests();
+
+ String getAnsweredReqeusts();
+
+}
@@ -128,17 +128,11 @@ public void messageReceived(final ChannelHandlerContext ctx,
else {
flush = true;
}
- ProxyUtils.stripHopByHopHeaders(response);
- ProxyUtils.addVia(response);
final HttpResponse filtered =
this.httpFilter.filterResponse(response);
messageToWrite = filtered;
- log.info("Response status: {}", filtered.getStatus());
- log.info("Headers sent to browser: ");
- ProxyUtils.printHeaders(filtered);
-
// An HTTP response is associated with a single request, so we
// can pop the correct request off the queue.
//
@@ -167,9 +161,9 @@ public void messageReceived(final ChannelHandlerContext ctx,
messageToWrite = chunk;
}
- if (browserToProxyChannel.isOpen()) {
+ if (browserToProxyChannel.isConnected()) {
final ChannelFuture future =
- browserToProxyChannel.write(
+ this.browserToProxyChannel.write(
new ProxyHttpResponse(this.currentHttpRequest, httpResponse,
messageToWrite));
@@ -183,6 +177,12 @@ public void messageReceived(final ChannelHandlerContext ctx,
} else {
//future.addListener(cfl);
}
+
+ if (wroteFullResponse(httpResponse)) {
+ log.info("Notifying relay");
+ this.relayListener.onRelayHttpResponse(browserToProxyChannel,
+ this.hostAndPort, this.currentHttpRequest);
+ }
if (shouldCloseRemoteConnection(this.currentHttpRequest,
httpResponse, messageToWrite)) {
log.info("Closing remote connection after writing to browser");
@@ -226,6 +226,13 @@ public void operationComplete(final ChannelFuture cf)
}
}
+ private boolean wroteFullResponse(final HttpResponse res) {
+ if (res.isChunked()) {
+ return ProxyUtils.isLastChunk(res);
+ }
+ return true;
+ }
+
private boolean shouldCloseBrowserConnection(final HttpRequest req,
final HttpResponse res, final Object msg) {
if (res.isChunked()) {
@@ -2,13 +2,23 @@
import static org.jboss.netty.channel.Channels.pipeline;
+import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.MBeanRegistrationException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+
import org.apache.commons.lang.StringUtils;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
@@ -47,7 +57,7 @@
* to host A has completed.
*/
public class HttpRequestHandler extends SimpleChannelUpstreamHandler
- implements RelayListener {
+ implements RelayListener, ConnectionData {
private final static Logger log =
LoggerFactory.getLogger(HttpRequestHandler.class);
@@ -63,8 +73,17 @@
private volatile int messagesReceived = 0;
private volatile int numWebConnections = 0;
+ private volatile int unansweredRequestCount = 0;
+
+ private volatile int requestsSent = 0;
+
+ private volatile int responsesReceived = 0;
+
private final ProxyAuthorizationManager authorizationManager;
+ private final Set<String> answeredRequests = new HashSet<String>();
+ private final Set<String> unansweredRequests = new HashSet<String>();
+
/**
* Note, we *can* receive requests for multiple different sites from the
* same connection from the browser, so the host and port most certainly
@@ -86,6 +105,8 @@
private final HttpRequestFilter requestFilter;
private final AtomicBoolean browserChannelClosed = new AtomicBoolean(false);
+ private volatile boolean receivedChannelClosed = false;
+ private final boolean useJmx;
/**
* Creates a new class for handling HTTP requests with the specified
@@ -98,26 +119,16 @@
* channels we've opened.
* @param filters HTTP filtering rules.
* @param clientChannelFactory The common channel factory for clients.
- * @param chainProxyHostAndPort upstream proxy server host and port or null
- * if none used.
- * @param requestFilter An optional filter for HTTP requests.
*/
public HttpRequestHandler(final ProxyCacheManager cacheManager,
final ProxyAuthorizationManager authorizationManager,
final ChannelGroup channelGroup,
final Map<String, HttpFilter> filters,
- final ClientSocketChannelFactory clientChannelFactory,
- final String chainProxyHostAndPort,
- final HttpRequestFilter requestFilter) {
- this.cacheManager = cacheManager;
- this.authorizationManager = authorizationManager;
- this.channelGroup = channelGroup;
- this.filters = filters;
- this.clientChannelFactory = clientChannelFactory;
- this.chainProxyHostAndPort = chainProxyHostAndPort;
- this.requestFilter = requestFilter;
+ final ClientSocketChannelFactory clientChannelFactory) {
+ this(cacheManager, authorizationManager, channelGroup, filters,
+ clientChannelFactory, null, null, false);
}
-
+
/**
* Creates a new class for handling HTTP requests with the specified
* authentication manager.
@@ -129,14 +140,55 @@ public HttpRequestHandler(final ProxyCacheManager cacheManager,
* channels we've opened.
* @param filters HTTP filtering rules.
* @param clientChannelFactory The common channel factory for clients.
+ * @param chainProxyHostAndPort upstream proxy server host and port or null
+ * if none used.
+ * @param requestFilter An optional filter for HTTP requests.
+ * @param useJmx Whether or not to expose debugging properties via JMX.
*/
public HttpRequestHandler(final ProxyCacheManager cacheManager,
final ProxyAuthorizationManager authorizationManager,
final ChannelGroup channelGroup,
final Map<String, HttpFilter> filters,
- final ClientSocketChannelFactory clientChannelFactory) {
- this(cacheManager, authorizationManager, channelGroup, filters,
- clientChannelFactory, null, null);
+ final ClientSocketChannelFactory clientChannelFactory,
+ final String chainProxyHostAndPort,
+ final HttpRequestFilter requestFilter, final boolean useJmx) {
+ this.cacheManager = cacheManager;
+ this.authorizationManager = authorizationManager;
+ this.channelGroup = channelGroup;
+ this.filters = filters;
+ this.clientChannelFactory = clientChannelFactory;
+ this.chainProxyHostAndPort = chainProxyHostAndPort;
+ this.requestFilter = requestFilter;
+ this.useJmx = useJmx;
+ if (useJmx) {
+ setupJmx();
+ }
+ }
+
+
+ private void setupJmx() {
+ final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ try {
+ final Class<? extends SimpleChannelUpstreamHandler> clazz =
+ getClass();
+ final String pack = clazz.getPackage().getName();
+ final String oName =
+ pack+":type="+clazz.getSimpleName()+"-"+clazz.getSimpleName() +
+ hashCode();
+ log.info("Registering MBean with name: {}", oName);
+ final ObjectName mxBeanName = new ObjectName(oName);
+ if(!mbs.isRegistered(mxBeanName)) {
+ mbs.registerMBean(this, mxBeanName);
+ }
+ } catch (final MalformedObjectNameException e) {
+ log.error("Could not set up JMX", e);
+ } catch (final InstanceAlreadyExistsException e) {
+ log.error("Could not set up JMX", e);
+ } catch (final MBeanRegistrationException e) {
+ log.error("Could not set up JMX", e);
+ } catch (final NotCompliantMBeanException e) {
+ log.error("Could not set up JMX", e);
+ }
}
@Override
@@ -211,7 +263,19 @@ private void processMessage(final ChannelHandlerContext ctx,
final class OnConnect {
public ChannelFuture onConnect(final ChannelFuture cf) {
if (request.getMethod() != HttpMethod.CONNECT) {
- return cf.getChannel().write(request);
+ final ChannelFuture writeFuture = cf.getChannel().write(request);
+ writeFuture.addListener(new ChannelFutureListener() {
+
+ public void operationComplete(final ChannelFuture future)
+ throws Exception {
+ if (useJmx) {
+ unansweredRequests.add(request.toString());
+ }
+ unansweredRequestCount++;
+ requestsSent++;
+ }
+ });
+ return writeFuture;
}
else {
writeConnectResponse(ctx, request, cf.getChannel());
@@ -473,7 +537,8 @@ public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
log.info("Got reader idle -- closing");
e.getChannel().close();
} else if (e.getState() == IdleState.WRITER_IDLE) {
- log.info("Got writer idle");
+ log.info("Got writer idle -- closing connection");
+ e.getChannel().close();
}
}
}
@@ -523,16 +588,18 @@ public void channelClosed(final ChannelHandlerContext ctx,
public void onRelayChannelClose(final Channel browserToProxyChannel,
final String key) {
+ this.receivedChannelClosed = true;
this.numWebConnections--;
- if (this.numWebConnections == 0) {
+ if (this.numWebConnections == 0 || this.unansweredRequestCount == 0) {
if (!browserChannelClosed.getAndSet(true)) {
log.info("Closing browser to proxy channel");
ProxyUtils.closeOnFlush(browserToProxyChannel);
}
}
else {
log.info("Not closing browser to proxy channel. Still "+
- this.numWebConnections+" connections...");
+ this.numWebConnections+" connections and awaiting "+
+ this.unansweredRequestCount + " responses");
}
this.endpointsToChannelFutures.remove(key);
@@ -545,6 +612,31 @@ public void onRelayChannelClose(final Channel browserToProxyChannel,
this.numWebConnections);
}
}
+
+
+ public void onRelayHttpResponse(final Channel browserToProxyChannel,
+ final String key, final HttpRequest httpRequest) {
+ if (this.useJmx) {
+ this.answeredRequests.add(httpRequest.toString());
+ this.unansweredRequests.remove(httpRequest.toString());
+ }
+ this.unansweredRequestCount--;
+ this.responsesReceived++;
+ // If we've received responses to all outstanding requests and one
+ // of those outgoing channels has been closed, we should close the
+ // connection to the browser.
+ if (this.unansweredRequestCount == 0 && this.receivedChannelClosed) {
+ if (!browserChannelClosed.getAndSet(true)) {
+ log.info("Closing browser to proxy channel on HTTP response");
+ ProxyUtils.closeOnFlush(browserToProxyChannel);
+ }
+ }
+ else {
+ log.info("Not closing browser to proxy channel. Still "+
+ "awaiting " + this.unansweredRequestCount+" responses..." +
+ "receivedChannelClosed="+this.receivedChannelClosed);
+ }
+ }
@Override
public void exceptionCaught(final ChannelHandlerContext ctx,
@@ -561,4 +653,32 @@ public void exceptionCaught(final ChannelHandlerContext ctx,
}
ProxyUtils.closeOnFlush(channel);
}
+
+ public int getClientConnections() {
+ return this.browserToProxyConnections;
+ }
+
+ public int getTotalClientConnections() {
+ return totalBrowserToProxyConnections;
+ }
+
+ public int getOutgoingConnections() {
+ return numWebConnections;
+ }
+
+ public int getRequestsSent() {
+ return this.requestsSent;
+ }
+
+ public int getResponsesReceived() {
+ return this.responsesReceived;
+ }
+
+ public String getUnansweredRequests() {
+ return this.unansweredRequests.toString();
+ }
+
+ public String getAnsweredReqeusts() {
+ return this.answeredRequests.toString();
+ }
}
Oops, something went wrong.

0 comments on commit 4f8ab7c

Please sign in to comment.