diff --git a/client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java b/client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java
index 7a850e5346..4381df5659 100755
--- a/client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java
+++ b/client/src/main/java/org/asynchttpclient/netty/handler/AsyncHttpClientHandler.java
@@ -84,32 +84,29 @@ public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exce
StreamedResponsePublisher publisher = (StreamedResponsePublisher) attribute;
- if (msg instanceof LastHttpContent) {
- // Remove the handler from the pipeline, this will trigger
- // it to finish
- ctx.pipeline().remove(publisher);
- // Trigger a read, just in case the last read complete
- // triggered no new read
- ctx.read();
- // Send the last content on to the protocol, so that it can
- // conclude the cleanup
- protocol.handle(channel, publisher.future(), msg);
-
- } else if (msg instanceof HttpContent) {
+ if(msg instanceof HttpContent) {
ByteBuf content = ((HttpContent) msg).content();
-
// Republish as a HttpResponseBodyPart
if (content.readableBytes() > 0) {
NettyResponseBodyPart part = config.getResponseBodyPartFactory().newResponseBodyPart(content, false);
ctx.fireChannelRead(part);
}
-
+ if (msg instanceof LastHttpContent) {
+ // Remove the handler from the pipeline, this will trigger
+ // it to finish
+ ctx.pipeline().remove(publisher);
+ // Trigger a read, just in case the last read complete
+ // triggered no new read
+ ctx.read();
+ // Send the last content on to the protocol, so that it can
+ // conclude the cleanup
+ protocol.handle(channel, publisher.future(), msg);
+ }
} else {
LOGGER.info("Received unexpected message while expecting a chunk: " + msg);
ctx.pipeline().remove((StreamedResponsePublisher) attribute);
Channels.setDiscard(channel);
}
-
} else if (attribute != DiscardEvent.INSTANCE) {
// unhandled message
LOGGER.debug("Orphan channel {} with attribute {} received message {}, closing", channel, attribute, msg);
diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServer.java b/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServer.java
new file mode 100644
index 0000000000..b9a981c398
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServer.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.asynchttpclient.reactivestreams;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+
+
+public final class HttpStaticFileServer {
+ static private EventLoopGroup bossGroup;
+ static private EventLoopGroup workerGroup;
+
+ public static void start(int port) throws Exception {
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new HttpStaticFileServerInitializer());
+
+ b.bind(port).sync().channel();
+ System.err.println("Open your web browser and navigate to " +
+ ("http") + "://127.0.0.1:" + port + '/');
+ }
+
+ public static void shutdown() {
+ Future bossFuture = bossGroup.shutdownGracefully();
+ Future workerFuture = workerGroup.shutdownGracefully();
+ try {
+ bossFuture.await();
+ workerFuture.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerHandler.java b/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerHandler.java
new file mode 100644
index 0000000000..ca36eba21d
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerHandler.java
@@ -0,0 +1,393 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.asynchttpclient.reactivestreams;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelProgressiveFuture;
+import io.netty.channel.ChannelProgressiveFutureListener;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpChunkedInput;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedFile;
+import io.netty.util.CharsetUtil;
+import io.netty.util.internal.SystemPropertyUtil;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.regex.Pattern;
+import javax.activation.MimetypesFileTypeMap;
+import org.asynchttpclient.test.TestUtils;
+
+import static io.netty.handler.codec.http.HttpHeaders.Names.*;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.*;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+
+/**
+ * A simple handler that serves incoming HTTP requests to send their respective
+ * HTTP responses. It also implements {@code 'If-Modified-Since'} header to
+ * take advantage of browser cache, as described in
+ * RFC 2616.
+ *
+ *
How Browser Caching Works
+ *
+ * Web browser caching works with HTTP headers as illustrated by the following
+ * sample:
+ *
+ * - Request #1 returns the content of {@code /file1.txt}.
+ * - Contents of {@code /file1.txt} is cached by the browser.
+ * - Request #2 for {@code /file1.txt} does return the contents of the
+ * file again. Rather, a 304 Not Modified is returned. This tells the
+ * browser to use the contents stored in its cache.
+ * - The server knows the file has not been modified because the
+ * {@code If-Modified-Since} date is the same as the file's last
+ * modified date.
+ *
+ *
+ *
+ * Request #1 Headers
+ * ===================
+ * GET /file1.txt HTTP/1.1
+ *
+ * Response #1 Headers
+ * ===================
+ * HTTP/1.1 200 OK
+ * Date: Tue, 01 Mar 2011 22:44:26 GMT
+ * Last-Modified: Wed, 30 Jun 2010 21:36:48 GMT
+ * Expires: Tue, 01 Mar 2012 22:44:26 GMT
+ * Cache-Control: private, max-age=31536000
+ *
+ * Request #2 Headers
+ * ===================
+ * GET /file1.txt HTTP/1.1
+ * If-Modified-Since: Wed, 30 Jun 2010 21:36:48 GMT
+ *
+ * Response #2 Headers
+ * ===================
+ * HTTP/1.1 304 Not Modified
+ * Date: Tue, 01 Mar 2011 22:44:28 GMT
+ *
+ *
+ */
+public class HttpStaticFileServerHandler extends SimpleChannelInboundHandler {
+
+ public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
+ public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";
+ public static final int HTTP_CACHE_SECONDS = 60;
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+ if (!request.getDecoderResult().isSuccess()) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ final String uri = request.getUri();
+ final String path = sanitizeUri(uri);
+ if (path == null) {
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+
+ File file = new File(path);
+ if (file.isHidden() || !file.exists()) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+
+ if (file.isDirectory()) {
+ if (uri.endsWith("/")) {
+ sendListing(ctx, file);
+ } else {
+ sendRedirect(ctx, uri + '/');
+ }
+ return;
+ }
+
+ if (!file.isFile()) {
+ sendError(ctx, FORBIDDEN);
+ return;
+ }
+
+ // Cache Validation
+ String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE);
+ if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+ Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
+
+ // Only compare up to the second because the datetime format we send to the client
+ // does not have milliseconds
+ long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
+ long fileLastModifiedSeconds = file.lastModified() / 1000;
+ if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
+ sendNotModified(ctx);
+ return;
+ }
+ }
+
+ RandomAccessFile raf;
+ try {
+ raf = new RandomAccessFile(file, "r");
+ } catch (FileNotFoundException ignore) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ long fileLength = raf.length();
+
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ HttpHeaders.setContentLength(response, fileLength);
+ setContentTypeHeader(response, file);
+ setDateAndCacheHeaders(response, file);
+ if (HttpHeaders.isKeepAlive(request)) {
+ response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ }
+
+ // Write the initial line and the header.
+ ctx.write(response);
+
+ // Write the content.
+ ChannelFuture sendFileFuture;
+ ChannelFuture lastContentFuture;
+ if (ctx.pipeline().get(SslHandler.class) == null) {
+ sendFileFuture =
+ ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
+ // Write the end marker.
+ lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+ } else {
+ sendFileFuture =
+ ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+ ctx.newProgressivePromise());
+ // HttpChunkedInput will write the end marker (LastHttpContent) for us.
+ lastContentFuture = sendFileFuture;
+ }
+
+ sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
+ @Override
+ public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
+ if (total < 0) { // total unknown
+ System.err.println(future.channel() + " Transfer progress: " + progress);
+ } else {
+ System.err.println(future.channel() + " Transfer progress: " + progress + " / " + total);
+ }
+ }
+
+ @Override
+ public void operationComplete(ChannelProgressiveFuture future) {
+ System.err.println(future.channel() + " Transfer complete.");
+ }
+ });
+
+ // Decide whether to close the connection or not.
+ if (!HttpHeaders.isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ if (ctx.channel().isActive()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
+
+ private static String sanitizeUri(String uri) {
+ // Decode the path.
+ try {
+ uri = URLDecoder.decode(uri, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new Error(e);
+ }
+
+ if (uri.isEmpty() || uri.charAt(0) != '/') {
+ return null;
+ }
+
+ // Convert file separators.
+ uri = uri.replace('/', File.separatorChar);
+
+ // Simplistic dumb security check.
+ // You will have to do something serious in the production environment.
+ if (uri.contains(File.separator + '.') ||
+ uri.contains('.' + File.separator) ||
+ uri.charAt(0) == '.' || uri.charAt(uri.length() - 1) == '.' ||
+ INSECURE_URI.matcher(uri).matches()) {
+ return null;
+ }
+
+ // Convert to absolute path.
+ return TestUtils.TMP_DIR + File.separator + uri;
+ }
+
+ private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
+
+ private static void sendListing(ChannelHandlerContext ctx, File dir) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
+ response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
+
+ String dirPath = dir.getPath();
+ StringBuilder buf = new StringBuilder()
+ .append("\r\n")
+ .append("")
+ .append("Listing of: ")
+ .append(dirPath)
+ .append("\r\n")
+
+ .append("Listing of: ")
+ .append(dirPath)
+ .append("
\r\n")
+
+ .append("")
+ .append("- ..
\r\n");
+
+ for (File f: dir.listFiles()) {
+ if (f.isHidden() || !f.canRead()) {
+ continue;
+ }
+
+ String name = f.getName();
+ if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
+ continue;
+ }
+
+ buf.append("- ")
+ .append(name)
+ .append("
\r\n");
+ }
+
+ buf.append("
\r\n");
+ ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);
+ response.content().writeBytes(buffer);
+ buffer.release();
+
+ // Close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
+ response.headers().set(LOCATION, newUri);
+
+ // Close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ FullHttpResponse response = new DefaultFullHttpResponse(
+ HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
+ response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+
+ // Close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * When file timestamp is the same as what the browser is sending up, send a "304 Not Modified"
+ *
+ * @param ctx
+ * Context
+ */
+ private static void sendNotModified(ChannelHandlerContext ctx) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
+ setDateHeader(response);
+
+ // Close the connection as soon as the error message is sent.
+ ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ }
+
+ /**
+ * Sets the Date header for the HTTP response
+ *
+ * @param response
+ * HTTP response
+ */
+ private static void setDateHeader(FullHttpResponse response) {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+ dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
+
+ Calendar time = new GregorianCalendar();
+ response.headers().set(DATE, dateFormatter.format(time.getTime()));
+ }
+
+ /**
+ * Sets the Date and Cache headers for the HTTP Response
+ *
+ * @param response
+ * HTTP response
+ * @param fileToCache
+ * file to extract content type
+ */
+ private static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
+ SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
+ dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));
+
+ // Date header
+ Calendar time = new GregorianCalendar();
+ response.headers().set(DATE, dateFormatter.format(time.getTime()));
+
+ // Add cache headers
+ time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
+ response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
+ response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
+ response.headers().set(
+ LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
+ }
+
+ /**
+ * Sets the content type header for the HTTP Response
+ *
+ * @param response
+ * HTTP response
+ * @param file
+ * file to extract content type
+ */
+ private static void setContentTypeHeader(HttpResponse response, File file) {
+ MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
+ response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
+ }
+}
diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerInitializer.java b/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerInitializer.java
new file mode 100644
index 0000000000..741a2dbd47
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/reactivestreams/HttpStaticFileServerInitializer.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2012 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.asynchttpclient.reactivestreams;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+
+public class HttpStaticFileServerInitializer extends ChannelInitializer {
+
+ public HttpStaticFileServerInitializer() {
+ }
+
+ @Override
+ public void initChannel(SocketChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new HttpServerCodec());
+ pipeline.addLast(new HttpObjectAggregator(65536));
+ pipeline.addLast(new ChunkedWriteHandler());
+ pipeline.addLast(new HttpStaticFileServerHandler());
+ }
+}
diff --git a/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java
new file mode 100644
index 0000000000..e8e89bf8d7
--- /dev/null
+++ b/client/src/test/java/org/asynchttpclient/reactivestreams/ReactiveStreamsDownLoadTest.java
@@ -0,0 +1,165 @@
+package org.asynchttpclient.reactivestreams;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.HttpResponseBodyPart;
+import org.asynchttpclient.HttpResponseHeaders;
+import org.asynchttpclient.HttpResponseStatus;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.handler.StreamedAsyncHandler;
+import org.asynchttpclient.test.TestUtils;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ReactiveStreamsDownLoadTest {
+ private int serverPort = 8080;
+ private File largeFile;
+ private File smallFile;
+ @BeforeClass(alwaysRun = true)
+ public void setUpBeforeTest() throws Exception {
+ largeFile = TestUtils.createTempFile(15 * 1024);
+ smallFile = TestUtils.createTempFile(20);
+ HttpStaticFileServer.start(serverPort);
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDown() throws Exception {
+ HttpStaticFileServer.shutdown();
+ }
+
+ @Test
+ public void streamedResponseLargeFileTest() throws Throwable {
+ AsyncHttpClient c = new DefaultAsyncHttpClient();
+ String largeFileName = "http://127.0.0.1:" + serverPort + "/" + largeFile.getName();
+ ListenableFuture future = c.prepareGet(largeFileName)
+ .execute(new SimpleStreamedAsyncHandler());
+ byte[] result = future.get().getBytes();
+ System.out.println("Result file size: " + result.length);
+ //assert(result.length == largeFile.length());
+ }
+
+ @Test
+ public void streamedResponseSmallFileTest() throws Throwable {
+ AsyncHttpClient c = new DefaultAsyncHttpClient();
+ String smallFileName = "http://127.0.0.1:" + serverPort + "/" + smallFile.getName();
+ ListenableFuture future = c.prepareGet(smallFileName)
+ .execute(new SimpleStreamedAsyncHandler());
+ byte[] result = future.get().getBytes();
+ System.out.println("Result file size: " + result.length);
+ //assert(result.length == smallFile.length());
+ assert(result.length > 0);
+ }
+
+ static protected class SimpleStreamedAsyncHandler implements StreamedAsyncHandler {
+ private final SimpleSubscriber subscriber;
+
+ public SimpleStreamedAsyncHandler() {
+ this(new SimpleSubscriber());
+ }
+
+ public SimpleStreamedAsyncHandler(SimpleSubscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+ @Override
+ public State onStream(Publisher publisher) {
+ System.out.println("SimpleStreamedAsyncHandleronCompleted onStream");
+ publisher.subscribe(subscriber);
+ return State.CONTINUE;
+ }
+
+ @Override
+ public void onThrowable(Throwable t) {
+ throw new AssertionError(t);
+ }
+
+ @Override
+ public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+ System.out.println("SimpleStreamedAsyncHandleronCompleted onBodyPartReceived");
+ throw new AssertionError("Should not have received body part");
+ }
+
+ @Override
+ public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+ return State.CONTINUE;
+ }
+
+ @Override
+ public State onHeadersReceived(HttpResponseHeaders headers) throws Exception {
+ return State.CONTINUE;
+ }
+
+ @Override
+ public SimpleStreamedAsyncHandler onCompleted() throws Exception {
+ System.out.println("SimpleStreamedAsyncHandleronCompleted onSubscribe");
+ return this;
+ }
+
+ public byte[] getBytes() throws Throwable {
+ List bodyParts = subscriber.getElements();
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ for (HttpResponseBodyPart part : bodyParts) {
+ bytes.write(part.getBodyPartBytes());
+ }
+ return bytes.toByteArray();
+ }
+ }
+
+ /**
+ * Simple subscriber that requests and buffers one element at a time.
+ */
+ static protected class SimpleSubscriber implements Subscriber {
+ private volatile Subscription subscription;
+ private volatile Throwable error;
+ private final List elements = Collections.synchronizedList(new ArrayList());
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ System.out.println("SimpleSubscriber onSubscribe");
+ this.subscription = subscription;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(T t) {
+ System.out.println("SimpleSubscriber onNext");
+ elements.add(t);
+ subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable error) {
+ System.out.println("SimpleSubscriber onError");
+ this.error = error;
+ latch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ System.out.println("SimpleSubscriber onComplete");
+ latch.countDown();
+ }
+
+ public List getElements() throws Throwable {
+ latch.await();
+ if (error != null) {
+ throw error;
+ } else {
+ return elements;
+ }
+ }
+ }
+
+}
diff --git a/client/src/test/java/org/asynchttpclient/test/TestUtils.java b/client/src/test/java/org/asynchttpclient/test/TestUtils.java
index 3c0d95be25..2cd9be945d 100644
--- a/client/src/test/java/org/asynchttpclient/test/TestUtils.java
+++ b/client/src/test/java/org/asynchttpclient/test/TestUtils.java
@@ -66,7 +66,7 @@ public class TestUtils {
public static final String ADMIN = "admin";
public static final String TEXT_HTML_CONTENT_TYPE_WITH_UTF_8_CHARSET = "text/html; charset=UTF-8";
public static final String TEXT_HTML_CONTENT_TYPE_WITH_ISO_8859_1_CHARSET = "text/html; charset=ISO-8859-1";
- private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"), "ahc-tests-" + UUID.randomUUID().toString().substring(0, 8));
+ public static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"), "ahc-tests-" + UUID.randomUUID().toString().substring(0, 8));
public static final byte[] PATTERN_BYTES = "FooBarBazQixFooBarBazQixFooBarBazQixFooBarBazQixFooBarBazQixFooBarBazQix".getBytes(Charset.forName("UTF-16"));
public static final File LARGE_IMAGE_FILE;
public static final byte[] LARGE_IMAGE_BYTES;