Permalink
Browse files

Adding Executor to enforce serial processing and limiting the number …

…of channel handling threads to 1 to avoid concurrency issues.

This should fix issue 1156 for the HTTP V2 engine.

Logging reveals that multiple threads can be involved in handling the same request.
The PMS code has not been designed to handle one request with multiple threads in parallel.

As a result concurrency issues arise at random locations, several ArrayOutOfBoundsExceptions
and NullPointerExceptions have been reported at places where they should not occur in a
single thread environment.

To avoid concurrency issues this commit limits the number of pool threads to 1.

Performance does not seem to take a hit from this change.
Also, it is still possible to stream to multiple clients at the same time.

See: http://docs.jboss.org/netty/3.2/api/org/jboss/netty/handler/execution/OrderedMemoryAwareThreadPoolExecutor.html
  • Loading branch information...
1 parent b5aac0a commit ae0273230c8e1cc7d9f87c1420ed8705b64f4017 @Raptor399 committed Dec 9, 2011
@@ -38,6 +38,8 @@
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,7 @@
private Channel channel;
private NetworkInterface ni = null;
private ChannelGroup group;
+ private ExecutionHandler executionHandler = null;
public InetAddress getIafinal() {
return iafinal;
@@ -110,8 +113,20 @@ public boolean start() throws IOException {
factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
+
+ // The OrderedMemoryAwareThreadPoolExecutor makes that all requests
+ // are handled sequentially in the correct order. Without it hiccups
+ // and double requests may occur. (See issue 1156)
+ //
+ // Setting corePoolSize to 1 because the PMS classes involved in
+ // streaming are not thread safe. Multiple threads handling the
+ // same request unintentionally cause ArrayOutOfBoundsExceptions
+ // and NullPointerExceptions.
+ executionHandler = new ExecutionHandler(
+ new OrderedMemoryAwareThreadPoolExecutor(1, 1048576, 1048576));
+
ServerBootstrap bootstrap = new ServerBootstrap(factory);
- HttpServerPipelineFactory pipeline = new HttpServerPipelineFactory(group);
+ HttpServerPipelineFactory pipeline = new HttpServerPipelineFactory(group, executionHandler);
bootstrap.setPipelineFactory(pipeline);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
@@ -161,6 +176,9 @@ public void stop() {
if (factory != null) {
factory.releaseExternalResources();
}
+ if (executionHandler != null) {
+ executionHandler.releaseExternalResources();
+ }
}
NetworkConfiguration.forgetConfiguration();
}
@@ -21,14 +21,15 @@
*/
package net.pms.network;
-import static org.jboss.netty.channel.Channels.*;
+import static org.jboss.netty.channel.Channels.pipeline;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.execution.ExecutionHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,9 +41,11 @@
public class HttpServerPipelineFactory implements ChannelPipelineFactory {
private static final Logger logger = LoggerFactory.getLogger(HttpServerPipelineFactory.class);
private ChannelGroup group;
+ private final ExecutionHandler executionHandler;
- public HttpServerPipelineFactory(ChannelGroup group) {
+ public HttpServerPipelineFactory(ChannelGroup group, ExecutionHandler executionHandler) {
this.group = group;
+ this.executionHandler = executionHandler;
}
public ChannelPipeline getPipeline() throws Exception {
@@ -52,6 +55,7 @@ public ChannelPipeline getPipeline() throws Exception {
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); // eliminate the need to decode http chunks from the client
pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("executor", executionHandler); // Must be shared
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
pipeline.addLast("handler", new RequestHandlerV2(group));
return pipeline;

0 comments on commit ae02732

Please sign in to comment.