diff --git a/commands.py b/commands.py index 55dad56..dafe349 100644 --- a/commands.py +++ b/commands.py @@ -1,4 +1,3 @@ - if play_command == ('netty:run'): check_application() load_modules() @@ -13,4 +12,85 @@ java_cmd.insert(2, '-Dplay.debug=yes') subprocess.call(java_cmd, env=os.environ) print - sys.exit(0) \ No newline at end of file + sys.exit(0) +if play_command == ('netty:test'): + play_id = 'test' + check_application() + load_modules() + do_classpath() + disable_check_jpda = False + if remaining_args.count('-f') == 1: + disable_check_jpda = True + remaining_args.remove('-f') + do_java('play.modules.netty.Server') + print "~ Running in test mode" + print "~ Ctrl+C to stop" + print "~ " + check_jpda() + java_cmd.insert(2, '-Xdebug') + java_cmd.insert(2, '-Xrunjdwp:transport=dt_socket,address=%s,server=y,suspend=n' % jpda_port) + java_cmd.insert(2, '-Dplay.debug=yes') + subprocess.call(java_cmd, env=os.environ) + print + sys.exit(0) +if play_command == 'netty:auto-test': + play_id = 'test' + check_application() + load_modules() + do_classpath() + do_java('play.modules.netty.Server') + print "~ Running in test mode" + print "~ Ctrl+C to stop" + print "~ " + print "~ Deleting %s" % os.path.normpath(os.path.join(application_path, 'tmp')) + if os.path.exists(os.path.join(application_path, 'tmp')): + shutil.rmtree(os.path.join(application_path, 'tmp')) + print "~" + test_result = os.path.join(application_path, 'test-result') + if os.path.exists(test_result): + shutil.rmtree(test_result) + sout = open(os.path.join(log_path, 'system.out'), 'w') + play_process = subprocess.Popen(java_cmd, env=os.environ, stdout=sout) + soutint = open(os.path.join(log_path, 'system.out'), 'r') + while True: + if play_process.poll(): + print "~" + print "~ Oops, application has not started?" + print "~" + sys.exit(-1) + line = soutint.readline().strip() + if line: + print line + if line.find('Listening for HTTP') > -1: + soutint.close() + break + # Launch the browser + print "~" + print "~ Loading the test runner at %s ..." % ('http://localhost:%s/@tests' % http_port) + try: + proxy_handler = urllib2.ProxyHandler({}) + opener = urllib2.build_opener(proxy_handler) + opener.open('http://localhost:%s/@tests' % http_port); + except urllib2.HTTPError, e: + print "~" + print "~ There are compilation errors... (%s)" % (e.code) + print "~" + kill(play_process.pid) + sys.exit(-1) + print "~ Launching a web browser at http://localhost:%s/@tests?select=all&auto=yes ..." % http_port + webbrowser.open('http://localhost:%s/@tests?select=all&auto=yes' % http_port) + while True: + time.sleep(1) + if os.path.exists(os.path.join(application_path, 'test-result/result.passed')): + print "~" + print "~ All tests passed" + print "~" + kill(play_process.pid) + break + if os.path.exists(os.path.join(application_path, 'test-result/result.failed')): + print "~" + print "~ Some tests have failed. See file://%s for results" % test_result + print "~" + kill(play_process.pid) + break + sys.exit(0) diff --git a/documentation/manual/home.textile b/documentation/manual/home.textile index 29d0e80..ac6e066 100644 --- a/documentation/manual/home.textile +++ b/documentation/manual/home.textile @@ -1,10 +1,10 @@ h1. Netty module -p(note). **Warning! This version contains JBoss Netty 3.2.0 alpha 4. Be sure to remove the previous version of Netty in your module directory** +p(note). **Warning! This version contains JBoss Netty 3.1.5 GA. Be sure to remove the previous version of Netty in your module directory** The netty module allows you to use JBoss Netty:http://jboss.org/netty as a play! server. -Version 3.2.0 alpha 4 of Netty. +Version 3.1.5 GA of Netty. h2. Enable the Netty module for the application diff --git a/lib/netty-3.1.5.GA.jar b/lib/netty-3.1.5.GA.jar new file mode 100644 index 0000000..3729ddd Binary files /dev/null and b/lib/netty-3.1.5.GA.jar differ diff --git a/lib/netty-3.2.0.ALPHA4.jar b/lib/netty-3.2.0.ALPHA4.jar deleted file mode 100644 index efe7c4d..0000000 Binary files a/lib/netty-3.2.0.ALPHA4.jar and /dev/null differ diff --git a/src/play/modules/netty/FileChannelBuffer.java b/src/play/modules/netty/FileChannelBuffer.java index 5a9f77f..08908e9 100644 --- a/src/play/modules/netty/FileChannelBuffer.java +++ b/src/play/modules/netty/FileChannelBuffer.java @@ -150,6 +150,10 @@ public int getBytes(int index, GatheringByteChannel out, int length) return out.write(bb); } + public void setByte(int i, int i1) { + throw new RuntimeException(); + } + public void getBytes(int index, OutputStream out, int length) throws IOException { byte[] b = new byte[length]; @@ -315,6 +319,10 @@ public void readBytes(OutputStream out, int length) throws IOException { } public String toString(int q,int a,java.lang.String b) { - return null; + throw new RuntimeException(); } + + public void setShort(int a,int b) { + throw new RuntimeException(); + } } diff --git a/src/play/modules/netty/HttpServerPipelineFactory.java b/src/play/modules/netty/HttpServerPipelineFactory.java index a8d2e22..5e456b9 100644 --- a/src/play/modules/netty/HttpServerPipelineFactory.java +++ b/src/play/modules/netty/HttpServerPipelineFactory.java @@ -15,24 +15,24 @@ public class HttpServerPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { Integer max = Integer.valueOf(Play.configuration.getProperty("play.module.netty.maxContentLength", "-1")); - if (max == -1) { - max = Integer.MAX_VALUE; - } +// if (max == -1) { +// max = Integer.MAX_VALUE; +// } Integer threshold = Integer.valueOf(Play.configuration.getProperty("play.module.netty.threshold", "8192")); - if (max == -1) { - max = Integer.MAX_VALUE; - } + ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("aggregator", new StreamChunkAggregator(max)); - pipeline.addLast("encoder", new HttpResponseEncoder()); //pipeline.addLast("aggregator", new HttpChunkAggregator(max)); + pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + pipeline.addLast("handler", new PlayHandler()); + return pipeline; } } diff --git a/src/play/modules/netty/PlayHandler.java b/src/play/modules/netty/PlayHandler.java index f659972..e119efe 100644 --- a/src/play/modules/netty/PlayHandler.java +++ b/src/play/modules/netty/PlayHandler.java @@ -6,7 +6,7 @@ import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.*; import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.handler.stream.ChunkedNioFile; +import org.jboss.netty.handler.stream.ChunkedFile; import org.jboss.netty.handler.stream.ChunkedStream; import play.Invoker; import play.Logger; @@ -43,8 +43,14 @@ public class PlayHandler extends SimpleChannelUpstreamHandler { private final static String signature = "Play! Framework;" + Play.version + ";" + Play.mode.name().toLowerCase(); + private HttpRequest originalRequest; + private boolean readingChunks; + + @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + Logger.trace("messageReceived: begin"); + final Object msg = e.getMessage(); if (msg instanceof HttpRequest) { final HttpRequest nettyRequest = (HttpRequest) msg; @@ -64,13 +70,14 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex if (raw) { copyResponse(ctx, request, response, nettyRequest); } else { - Invoker.invoke(new NettyInvocation(request, response, ctx, nettyRequest)); + Invoker.invoke(new NettyInvocation(request, response, ctx, nettyRequest, e)); } + } catch (Exception ex) { serve500(ex, ctx, nettyRequest); - return; } } + Logger.trace("messageReceived: end"); } private static Map staticPathsCache = new HashMap(); @@ -82,16 +89,19 @@ public class NettyInvocation extends Invoker.DirectInvocation { private final Request request; private final Response response; private final HttpRequest nettyRequest; + private final MessageEvent e; - public NettyInvocation(Request request, Response response, ChannelHandlerContext ctx, HttpRequest nettyRequest) { + public NettyInvocation(Request request, Response response, ChannelHandlerContext ctx, HttpRequest nettyRequest, MessageEvent e) { this.ctx = ctx; this.request = request; this.response = response; this.nettyRequest = nettyRequest; + this.e = e; } @Override public boolean init() { + Logger.trace("init: begin"); Request.current.set(request); Response.current.set(response); // Patch favicon.ico @@ -103,13 +113,15 @@ public boolean init() { synchronized (staticPathsCache) { rs = staticPathsCache.get(request.path); } - serveStatic(rs, ctx, request, response, nettyRequest); + serveStatic(rs, ctx, request, response, nettyRequest, e); + Logger.trace("init: end false"); return false; } try { Router.routeOnlyStatic(request); } catch (NotFound e) { serve404(e, ctx, request, nettyRequest); + Logger.trace("init: end false"); return false; } catch (RenderStatic e) { if (Play.mode == Play.Mode.PROD) { @@ -117,27 +129,33 @@ public boolean init() { staticPathsCache.put(request.path, e); } } - serveStatic(e, ctx, request, response, nettyRequest); + serveStatic(e, ctx, request, response, nettyRequest, this.e); + Logger.trace("init: end false"); return false; } + Logger.trace("init: end true"); return true; } @Override public void run() { try { + Logger.trace("run: begin"); super.run(); } catch (Exception e) { e.printStackTrace(); serve500(e, ctx, nettyRequest); } + Logger.trace("run: end"); } @Override public void execute() throws Exception { + Logger.trace("execute: begin"); ActionInvoker.invoke(request, response); - saveExceededSizeError(nettyRequest, response); + //saveExceededSizeError(nettyRequest, response); copyResponse(ctx, request, response, nettyRequest); + Logger.trace("execute: end"); } } @@ -173,6 +191,7 @@ protected static void addToResponse(Response response, HttpResponse nettyRespons } } Map cookies = response.cookies; + for (Http.Cookie cookie : cookies.values()) { CookieEncoder encoder = new CookieEncoder(true); Cookie c = new DefaultCookie(cookie.name, cookie.value); @@ -187,23 +206,34 @@ protected static void addToResponse(Response response, HttpResponse nettyRespons encoder.addCookie(c); nettyResponse.addHeader(SET_COOKIE, encoder.encode()); } + if (!response.headers.containsKey(CACHE_CONTROL)) { nettyResponse.setHeader(CACHE_CONTROL, "no-cache"); } } - protected static void writeResponse(ChannelHandlerContext ctx, Response response, HttpResponse nettyResponse) throws IOException { - byte[] content = response.out.toByteArray(); + protected static void writeResponse(ChannelHandlerContext ctx, Response response, HttpResponse nettyResponse, HttpRequest nettyRequest) throws IOException { + final byte[] content = response.out.toByteArray(); + final boolean keepAlive = isKeepAlive(nettyResponse); + + ChannelBuffer buf = ChannelBuffers.copiedBuffer(content); nettyResponse.setContent(buf); - + //if (keepAlive) { + setContentLength(nettyResponse, nettyResponse.getContent().readableBytes()); + //} ChannelFuture f = ctx.getChannel().write(nettyResponse); - f.addListener(ChannelFutureListener.CLOSE); + // Decide whether to close the connection or not. + if (!keepAlive) { + // Close the connection when the whole content is written out. + f.addListener(ChannelFutureListener.CLOSE); + } } public static void copyResponse(ChannelHandlerContext ctx, Request request, Response response, HttpRequest nettyRequest) throws Exception { + Logger.trace("copyResponse: begin"); response.out.flush(); // Decide whether to close the connection or not. @@ -228,13 +258,28 @@ public static void copyResponse(ChannelHandlerContext ctx, Request request, Resp is = (InputStream) obj; } - if ((file != null) && file.isFile()) { + final boolean keepAlive = isKeepAlive(nettyResponse); + if (file != null && file.isFile()) { try { + addEtag(request, nettyResponse, file); - nettyResponse.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(file.length())); - ctx.getChannel().write(nettyResponse); - ChannelFuture writeFuture = ctx.getChannel().write(new ChunkedNioFile(file)); - if (!nettyResponse.isKeepAlive()) { + + nettyResponse.setHeader(CONTENT_TYPE, (MimeTypes.getContentType(file.getName(), "text/plain"))); + RandomAccessFile raf; + raf = new RandomAccessFile(file, "r"); + long fileLength = raf.length(); + + Logger.trace("file length is [" + fileLength + "]"); + setContentLength(nettyResponse, fileLength); + + Channel ch = ctx.getChannel(); + + // Write the initial line and the header. + ch.write(nettyResponse); + + // Write the content. + ChannelFuture writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); + if (!keepAlive) { // Close the connection when the whole content is written out. writeFuture.addListener(ChannelFutureListener.CLOSE); } @@ -244,11 +289,13 @@ public static void copyResponse(ChannelHandlerContext ctx, Request request, Resp } else if (is != null) { ctx.getChannel().write(nettyResponse); ChannelFuture writeFuture = ctx.getChannel().write(new ChunkedStream(is)); - writeFuture.addListener(ChannelFutureListener.CLOSE); + if (!keepAlive) { + writeFuture.addListener(ChannelFutureListener.CLOSE); + } } else { - writeResponse(ctx, response, nettyResponse); + writeResponse(ctx, response, nettyResponse, nettyRequest); } - + Logger.trace("copyResponse: end"); } private static String escapeIllegalCharacters(String uri) throws Exception { @@ -257,7 +304,7 @@ private static String escapeIllegalCharacters(String uri) throws Exception { } public static Request parseRequest(ChannelHandlerContext ctx, HttpRequest nettyRequest) throws Exception { - + Logger.trace("parseRequest: begin"); final URI uri = new URI(escapeIllegalCharacters(nettyRequest.getUri())); final Request request = new Request(); @@ -279,7 +326,6 @@ public static Request parseRequest(ChannelHandlerContext ctx, HttpRequest nettyR // An error occured Integer max = Integer.valueOf(Play.configuration.getProperty("play.module.netty.maxContentLength", "-1")); if (max == -1 || buffer.getInputStream().available() < max) { - request.body = buffer.getInputStream(); } else { request.body = new ByteArrayInputStream(new byte[0]); @@ -324,6 +370,7 @@ public static Request parseRequest(ChannelHandlerContext ctx, HttpRequest nettyR request._init(); + Logger.trace("parseRequest: end"); return request; } @@ -364,6 +411,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) } public static void serve404(NotFound e, ChannelHandlerContext ctx, Request request, HttpRequest nettyRequest) { + Logger.trace("serve404: begin"); HttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); nettyResponse.setHeader(SERVER, signature); @@ -386,6 +434,7 @@ public static void serve404(NotFound e, ChannelHandlerContext ctx, Request reque } catch (UnsupportedEncodingException fex) { Logger.error(fex, "(utf-8 ?)"); } + Logger.trace("serve404: end"); } protected static Map getBindingForErrors(Exception e, boolean isError) { @@ -412,7 +461,7 @@ protected static Map getBindingForErrors(Exception e, boolean is // TODO: add request and response as parameter public static void serve500(Exception e, ChannelHandlerContext ctx, HttpRequest nettyRequest) { - + Logger.trace("serve500: begin"); HttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); nettyResponse.setHeader(SERVER, signature); @@ -441,8 +490,9 @@ public static void serve500(Exception e, ChannelHandlerContext ctx, HttpRequest } encoder.addCookie(c); nettyResponse.addHeader(SET_COOKIE, encoder.encode()); + //nettyResponse.setHeader("Cookie", encoder.encode()); } - //nettyResponse.setHeader("Cookie", encoder.encode()); + } catch (Exception exx) { Logger.error(e, "Trying to flush cookies"); // humm ? @@ -490,9 +540,11 @@ public static void serve500(Exception e, ChannelHandlerContext ctx, HttpRequest } throw new RuntimeException(exxx); } + Logger.trace("serve500: end"); } - public static void serveStatic(RenderStatic renderStatic, ChannelHandlerContext ctx, Request request, Response response, HttpRequest nettyRequest) { + public static void serveStatic(RenderStatic renderStatic, ChannelHandlerContext ctx, Request request, Response response, HttpRequest nettyRequest, MessageEvent e) { + Logger.trace("serveStatic: begin"); HttpResponse nettyResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(response.status)); nettyResponse.setHeader("Server", signature); try { @@ -516,26 +568,67 @@ public static void serveStatic(RenderStatic renderStatic, ChannelHandlerContext if (raw) { copyResponse(ctx, request, response, nettyRequest); } else { - addEtag(request, nettyResponse, file.getRealFile()); - nettyResponse.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(file.length())); - nettyResponse.setHeader(CONTENT_TYPE, (MimeTypes.getContentType(file.getName(), "text/plain"))); - ctx.getChannel().write(nettyResponse); - ChannelFuture writeFuture = ctx.getChannel().write(new ChunkedNioFile(file.getRealFile())); - writeFuture.addListener(ChannelFutureListener.CLOSE); + final File localFile = file.getRealFile(); + final boolean keepAlive = isKeepAlive(nettyRequest); + addEtag(request, nettyResponse, localFile); + + RandomAccessFile raf; + raf = new RandomAccessFile(localFile, "r"); + long fileLength = raf.length(); + + Logger.trace("file length " + fileLength); + Logger.trace("keep alive " + keepAlive); + Logger.trace("content type " + (MimeTypes.getContentType(localFile.getName(), "text/plain"))); + + if (isKeepAlive(nettyRequest)) { + setContentLength(nettyResponse, fileLength); + } + nettyResponse.setHeader(CONTENT_TYPE, (MimeTypes.getContentType(localFile.getName(), "text/plain"))); + + Channel ch = e.getChannel(); + + // Write the initial line and the header. + ch.write(nettyResponse); + + // Write the content. + ChannelFuture writeFuture; + + writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); +// +// // No encryption - use zero-copy. +// final FileRegion region = +// new DefaultFileRegion(raf.getChannel(), 0, fileLength); +// writeFuture = ch.write(region); +// writeFuture.addListener(new ChannelFutureProgressListener() { +// public void operationComplete(ChannelFuture future) { +// region.releaseExternalResources(); +// } +// +// public void operationProgressed( +// ChannelFuture future, long amount, long current, long total) { +// System.out.printf("%s: %d / %d (+%d)%n", localFile.getPath(), current, total, amount); +// +// } +// }); + if (!isKeepAlive(nettyRequest)) { + // Close the connection when the whole content is written out. + writeFuture.addListener(ChannelFutureListener.CLOSE); + } } } - } catch (Exception e) { - Logger.error(e, "serveStatic for request %s", request.method + " " + request.url); + } catch (Exception ez) { + Logger.error(ez, "serveStatic for request %s", request.method + " " + request.url); try { ChannelBuffer buf = ChannelBuffers.copiedBuffer("Internal Error (check logs)".getBytes("utf-8")); nettyResponse.setContent(buf); ChannelFuture future = ctx.getChannel().write(nettyResponse); future.addListener(ChannelFutureListener.CLOSE); } catch (Exception ex) { - Logger.error(e, "serveStatic for request %s", request.method + " " + request.url); + Logger.error(ez, "serveStatic for request %s", request.method + " " + request.url); } } + Logger.trace("serveStatic: end"); } public static boolean isModified(String etag, long last, Request request) { @@ -590,9 +683,19 @@ private static void addEtag(Request request, HttpResponse httpResponse, File fil if (useEtag) { httpResponse.setHeader(ETAG, etag); } - httpResponse.setHeader(CONTENT_TYPE, MimeTypes.getContentType(file.getName())); - httpResponse.setHeader(CONTENT_LENGTH, String.valueOf(file.length())); } } + public static boolean isKeepAlive(HttpMessage message) { + boolean close = + HttpHeaders.Values.CLOSE.equalsIgnoreCase(message.getHeader(HttpHeaders.Names.CONNECTION)) || + message.getProtocolVersion().equals(HttpVersion.HTTP_1_0) && + !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(message.getHeader(HttpHeaders.Names.CONNECTION)); + return !close; + } + + public static void setContentLength(HttpMessage message, long contentLength) { + message.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength)); + } + } diff --git a/src/play/modules/netty/Server.java b/src/play/modules/netty/Server.java index 402c132..d3ddaa2 100644 --- a/src/play/modules/netty/Server.java +++ b/src/play/modules/netty/Server.java @@ -41,6 +41,8 @@ public Server() { bootstrap.setPipelineFactory(new HttpServerPipelineFactory()); bootstrap.bind(new InetSocketAddress(address, httpPort)); + bootstrap.setOption("child.tcpNoDelay", true); + bootstrap.setOption("child.keepAlive", true); try { if (Play.mode == Mode.DEV) {