diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/EventHandler.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/EventHandler.java index 2971f0a59c..fe956e4d79 100644 --- a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/EventHandler.java +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/EventHandler.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Sonatype, Inc. All rights reserved. + * Copyright (c) 2013-2014 Sonatype, Inc. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -163,8 +163,9 @@ public void onInitialLineParsed(HttpHeader httpHeader, FilterChainContext ctx) { } } } - final GrizzlyResponseStatus responseStatus = new GrizzlyResponseStatus((HttpResponsePacket) httpHeader, context.getRequest() - .getURI(), config); + final GrizzlyResponseStatus responseStatus = + new GrizzlyResponseStatus((HttpResponsePacket) httpHeader, + context.getRequest().getURI(), config); context.setResponseStatus(responseStatus); if (context.getStatusHandler() != null) { return; diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyAsyncHttpProvider.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyAsyncHttpProvider.java index 722a1b7e88..6949ca9be3 100644 --- a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyAsyncHttpProvider.java +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyAsyncHttpProvider.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012 Sonatype, Inc. All rights reserved. + * Copyright (c) 2012-2014 Sonatype, Inc. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -73,6 +73,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.glassfish.grizzly.spdy.SpdyVersion; /** * A Grizzly 2.0-based implementation of {@link AsyncHttpProvider}. @@ -441,8 +442,9 @@ void timeout(final Connection c) { // ---------------------------------------------------------- Nested Classes private static final class ProtocolNegotiator implements ClientSideNegotiator { + private static final SpdyVersion[] SUPPORTED_SPDY_VERSIONS = + {SpdyVersion.SPDY_3_1, SpdyVersion.SPDY_3}; - private static final String SPDY = "spdy/3"; private static final String HTTP = "HTTP/1.1"; private final FilterChain spdyFilterChain; @@ -465,23 +467,31 @@ public boolean wantNegotiate(SSLEngine engine) { } @Override - public String selectProtocol(SSLEngine engine, LinkedHashSet strings) { - GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selectProtocol: " + strings); + public String selectProtocol(SSLEngine engine, LinkedHashSet protocols) { + GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selectProtocol: " + protocols); final Connection connection = NextProtoNegSupport.getConnection(engine); - // Give preference to SPDY/3. If not available, check for HTTP as a - // fallback - if (strings.contains(SPDY)) { - GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selecting: " + SPDY); - SSLConnectionContext sslCtx = SSLUtils.getSslConnectionContext(connection); - sslCtx.setNewConnectionFilterChain(spdyFilterChain); - final SpdySession spdySession = new SpdySession(connection, false, spdyHandlerFilter); - spdySession.setLocalInitialWindowSize(spdyHandlerFilter.getInitialWindowSize()); - spdySession.setLocalMaxConcurrentStreams(spdyHandlerFilter.getMaxConcurrentStreams()); - Utils.setSpdyConnection(connection); - SpdySession.bind(connection, spdySession); - return SPDY; - } else if (strings.contains(HTTP)) { + // Give preference to SPDY/3.1 or SPDY/3. If not available, check for HTTP as a + // fallback + for (SpdyVersion version : SUPPORTED_SPDY_VERSIONS) { + final String versionDef = version.toString(); + if (protocols.contains(versionDef)) { + GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selecting: " + versionDef); + SSLConnectionContext sslCtx = SSLUtils.getSslConnectionContext(connection); + sslCtx.setNewConnectionFilterChain(spdyFilterChain); + final SpdySession spdySession = + version.newSession(connection, false, spdyHandlerFilter); + + spdySession.setLocalStreamWindowSize(spdyHandlerFilter.getInitialWindowSize()); + spdySession.setLocalMaxConcurrentStreams(spdyHandlerFilter.getMaxConcurrentStreams()); + Utils.setSpdyConnection(connection); + SpdySession.bind(connection, spdySession); + + return versionDef; + } + } + + if (protocols.contains(HTTP)) { GrizzlyAsyncHttpProvider.LOGGER.info("ProtocolSelector::selecting: " + HTTP); // Use the default HTTP FilterChain. return HTTP; diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyResponseStatus.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyResponseStatus.java index 65eb7dd1ad..8aa2e02cb4 100644 --- a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyResponseStatus.java +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/GrizzlyResponseStatus.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2012 Sonatype, Inc. All rights reserved. + * Copyright (c) 2012-2014 Sonatype, Inc. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -38,7 +38,8 @@ public class GrizzlyResponseStatus extends HttpResponseStatus { private final int majorVersion; private final int minorVersion; private final String protocolText; - + private final HttpResponsePacket response; + // ------------------------------------------------------------ Constructors public GrizzlyResponseStatus(final HttpResponsePacket response, final URI uri, AsyncHttpClientConfig config) { @@ -49,6 +50,8 @@ public GrizzlyResponseStatus(final HttpResponsePacket response, final URI uri, A majorVersion = response.getProtocol().getMajorVersion(); minorVersion = response.getProtocol().getMinorVersion(); protocolText = response.getProtocolString(); + + this.response = response; } // ----------------------------------------- Methods from HttpResponseStatus @@ -105,4 +108,11 @@ public int getProtocolMinorVersion() { public String getProtocolText() { return protocolText; } + + /** + * @return internal Grizzly {@link HttpResponsePacket} + */ + public HttpResponsePacket getResponse() { + return response; + } } diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/HttpTxContext.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/HttpTxContext.java index 8ca71b1ce3..72755014c7 100644 --- a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/HttpTxContext.java +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/HttpTxContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Sonatype, Inc. All rights reserved. + * Copyright (c) 2013-2014 Sonatype, Inc. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -33,6 +33,10 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.asynchttpclient.providers.grizzly.filters.events.GracefulCloseEvent; +import org.glassfish.grizzly.Connection; +import org.glassfish.grizzly.filterchain.FilterChain; +import org.glassfish.grizzly.http.HttpResponsePacket; public final class HttpTxContext { @@ -62,10 +66,21 @@ public final class HttpTxContext { private HandShake handshake; private ProtocolHandler protocolHandler; private WebSocket webSocket; - private CloseListener listener = new CloseListener() { + private final CloseListener listener = new CloseListener() { @Override public void onClosed(Closeable closeable, CloseType type) throws IOException { - if (CloseType.REMOTELY.equals(type)) { + if (isGracefullyFinishResponseOnClose()) { + // Connection was closed. + // This event is fired only for responses, which don't have + // associated transfer-encoding or content-length. + // We have to complete such a request-response processing gracefully. + final Connection c = responseStatus.getResponse() + .getRequest().getConnection(); + final FilterChain fc = (FilterChain) c.getProcessor(); + + fc.fireEventUpstream(c, + new GracefulCloseEvent(HttpTxContext.this), null); + } else if (CloseType.REMOTELY.equals(type)) { abort(AsyncHttpProviderUtils.REMOTELY_CLOSED_EXCEPTION); } } @@ -252,6 +267,12 @@ public void setWebSocket(WebSocket webSocket) { this.webSocket = webSocket; } + private boolean isGracefullyFinishResponseOnClose() { + final HttpResponsePacket response = responseStatus.getResponse(); + return !response.getProcessingState().isKeepAlive() && + !response.isChunked() && response.getContentLength() == -1; + } + // ------------------------------------------------- Package Private Methods public HttpTxContext copy() { diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientEventFilter.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientEventFilter.java index 4de61df7d7..077437dd7b 100644 --- a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientEventFilter.java +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientEventFilter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Sonatype, Inc. All rights reserved. + * Copyright (c) 2013-2014 Sonatype, Inc. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -21,6 +21,10 @@ import org.glassfish.grizzly.http.HttpHeader; import java.io.IOException; +import org.asynchttpclient.providers.grizzly.filters.events.GracefulCloseEvent; +import org.glassfish.grizzly.filterchain.FilterChainEvent; +import org.glassfish.grizzly.filterchain.NextAction; +import org.glassfish.grizzly.http.HttpResponsePacket; /** * Extension of the {@link HttpClientFilter} that is responsible for handling @@ -45,6 +49,28 @@ public AsyncHttpClientEventFilter(final EventHandler eventHandler, final int max this.eventHandler = eventHandler; } + @Override + public NextAction handleEvent(final FilterChainContext ctx, + final FilterChainEvent event) throws IOException { + if (event.type() == GracefulCloseEvent.class) { + // Connection was closed. + // This event is fired only for responses, which don't have + // associated transfer-encoding or content-length. + // We have to complete such a request-response processing gracefully. + final GracefulCloseEvent closeEvent = (GracefulCloseEvent) event; + final HttpResponsePacket response = closeEvent.getHttpTxContext() + .getResponseStatus().getResponse(); + response.getProcessingState().getHttpContext().attach(ctx); + + onHttpPacketParsed(response, ctx); + + return ctx.getStopAction(); + } + + return ctx.getInvokeAction(); + } + + @Override public void exceptionOccurred(FilterChainContext ctx, Throwable error) { eventHandler.exceptionOccurred(ctx, error); diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientFilter.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientFilter.java index 490e38776d..951a206951 100644 --- a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientFilter.java +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/AsyncHttpClientFilter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 Sonatype, Inc. All rights reserved. + * Copyright (c) 2013-2014 Sonatype, Inc. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -281,8 +281,9 @@ private boolean sendAsGrizzlyRequest(final RequestInfoHolder requestInfoHolder, sendingCtx = checkAndHandleFilterChainUpdate(ctx, sendingCtx); } final Connection c = ctx.getConnection(); + final HttpContext httpCtx; if (!Utils.isSpdyConnection(c)) { - HttpContext.newInstance(ctx, c, c, c); + httpCtx = HttpContext.newInstance(c, c, c, requestPacketLocal); } else { SpdySession session = SpdySession.get(c); final Lock lock = session.getNewClientStreamLock(); @@ -290,12 +291,16 @@ private boolean sendAsGrizzlyRequest(final RequestInfoHolder requestInfoHolder, lock.lock(); SpdyStream stream = session.openStream(requestPacketLocal, session.getNextLocalStreamId(), 0, 0, 0, false, !requestPacketLocal.isExpectContent()); - HttpContext.newInstance(ctx, stream, stream, stream); + httpCtx = HttpContext.newInstance(stream, stream, stream, requestPacketLocal); } finally { lock.unlock(); } } + httpCtx.attach(ctx); HttpTxContext.set(ctx, httpTxContext); + requestPacketLocal.getProcessingState().setHttpContext(httpCtx); + requestPacketLocal.setConnection(c); + return sendRequest(sendingCtx, request, requestPacketLocal); } diff --git a/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/events/GracefulCloseEvent.java b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/events/GracefulCloseEvent.java new file mode 100644 index 0000000000..35252b610b --- /dev/null +++ b/providers/grizzly/src/main/java/org/asynchttpclient/providers/grizzly/filters/events/GracefulCloseEvent.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2014 Sonatype, Inc. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +package org.asynchttpclient.providers.grizzly.filters.events; + +import org.asynchttpclient.providers.grizzly.HttpTxContext; +import org.glassfish.grizzly.filterchain.FilterChainEvent; + +/** + * {@link FilterChainEvent} to gracefully complete the request-response processing + * when {@link Connection} is getting closed by the remote host. + * + * @since 1.8.7 + * @author The Grizzly Team + */ +public class GracefulCloseEvent implements FilterChainEvent { + private final HttpTxContext httpTxContext; + + public GracefulCloseEvent(HttpTxContext httpTxContext) { + this.httpTxContext = httpTxContext; + } + + public HttpTxContext getHttpTxContext() { + return httpTxContext; + } + + @Override + public Object type() { + return GracefulCloseEvent.class; + } +} diff --git a/providers/grizzly/src/test/java/org/asynchttpclient/providers/grizzly/GrizzlyFeedableBodyGeneratorTest.java b/providers/grizzly/src/test/java/org/asynchttpclient/providers/grizzly/GrizzlyFeedableBodyGeneratorTest.java new file mode 100644 index 0000000000..b80e6b724e --- /dev/null +++ b/providers/grizzly/src/test/java/org/asynchttpclient/providers/grizzly/GrizzlyFeedableBodyGeneratorTest.java @@ -0,0 +1,292 @@ +/* + * Copyright (c) 2013 Sonatype, Inc. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +package org.asynchttpclient.providers.grizzly; + +import org.asynchttpclient.AsyncCompletionHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.RequestBuilder; +import org.glassfish.grizzly.Buffer; +import org.glassfish.grizzly.http.server.HttpHandler; +import org.glassfish.grizzly.http.server.HttpServer; +import org.glassfish.grizzly.http.server.NetworkListener; +import org.glassfish.grizzly.http.server.Request; +import org.glassfish.grizzly.http.server.Response; +import org.glassfish.grizzly.memory.Buffers; +import org.glassfish.grizzly.ssl.SSLContextConfigurator; +import org.glassfish.grizzly.ssl.SSLEngineConfigurator; +import org.glassfish.grizzly.utils.Charsets; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.asynchttpclient.DefaultAsyncHttpClient; + +import static org.glassfish.grizzly.http.server.NetworkListener.DEFAULT_NETWORK_HOST; +import static org.glassfish.grizzly.memory.MemoryManager.DEFAULT_MEMORY_MANAGER; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; +import static org.testng.AssertJUnit.assertEquals; + +public class GrizzlyFeedableBodyGeneratorTest { + + private static final byte[] DATA = + "aAbBcCdDeEfFgGhHiIjJkKlLmMnNoOpPqQrRsStTuUvVwWxXyYzZ".getBytes(Charsets.ASCII_CHARSET); + private static final int TEMP_FILE_SIZE = 2 * 1024 * 1024; + private static final int NON_SECURE_PORT = 9991; + private static final int SECURE_PORT = 9992; + + + private HttpServer server; + private File tempFile; + + + // ------------------------------------------------------------------- Setup + + + @BeforeTest + public void setup() throws Exception { + generateTempFile(); + server = new HttpServer(); + NetworkListener nonSecure = + new NetworkListener("nonsecure", + DEFAULT_NETWORK_HOST, + NON_SECURE_PORT); + NetworkListener secure = + new NetworkListener("secure", + DEFAULT_NETWORK_HOST, + SECURE_PORT); + secure.setSecure(true); + secure.setSSLEngineConfig(createSSLConfig()); + server.addListener(nonSecure); + server.addListener(secure); + server.getServerConfiguration().addHttpHandler(new ConsumingHandler(), "/test"); + server.start(); + } + + + // --------------------------------------------------------------- Tear Down + + + @AfterTest + public void tearDown() { + if (!tempFile.delete()) { + tempFile.deleteOnExit(); + } + tempFile = null; + server.shutdownNow(); + server = null; + } + + + // ------------------------------------------------------------ Test Methods + + + @Test + public void testSimpleFeederMultipleThreads() throws Exception { + doSimpleFeeder(false); + } + + @Test + public void testSimpleFeederOverSSLMultipleThreads() throws Exception { + doSimpleFeeder(true); + } + + + // --------------------------------------------------------- Private Methods + + + private void doSimpleFeeder(final boolean secure) { + final int threadCount = 10; + final CountDownLatch latch = new CountDownLatch(threadCount); + final int port = (secure ? SECURE_PORT : NON_SECURE_PORT); + final String scheme = (secure ? "https" : "http"); + ExecutorService service = Executors.newFixedThreadPool(threadCount); + + AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder() + .setMaximumConnectionsPerHost(60) + .setMaximumConnectionsTotal(60) + .build(); + final AsyncHttpClient client = + new DefaultAsyncHttpClient(new GrizzlyAsyncHttpProvider(config), config); + final int[] statusCodes = new int[threadCount]; + final int[] totalsReceived = new int[threadCount]; + final Throwable[] errors = new Throwable[threadCount]; + for (int i = 0; i < threadCount; i++) { + final int idx = i; + service.execute(new Runnable() { + @Override + public void run() { + FeedableBodyGenerator generator = + new FeedableBodyGenerator(); + FeedableBodyGenerator.SimpleFeeder simpleFeeder = + new FeedableBodyGenerator.SimpleFeeder(generator) { + @Override + public void flush() throws IOException { + FileInputStream in = null; + try { + final byte[] bytesIn = new byte[2048]; + in = new FileInputStream(tempFile); + int read; + while ((read = in.read(bytesIn)) != -1) { + final Buffer b = + Buffers.wrap( + DEFAULT_MEMORY_MANAGER, + bytesIn, + 0, + read); + feed(b, false); + } + feed(Buffers.EMPTY_BUFFER, true); + } finally { + if (in != null) { + try { + in.close(); + } catch (IOException ignored) { + } + } + } + } + }; + generator.setFeeder(simpleFeeder); + generator.setMaxPendingBytes(10000); + + RequestBuilder builder = new RequestBuilder("POST"); + builder.setUrl(scheme + "://localhost:" + port + "/test"); + builder.setBody(generator); + try { + client.executeRequest(builder.build(), + new AsyncCompletionHandler() { + @Override + public org.asynchttpclient.Response onCompleted(org.asynchttpclient.Response response) + throws Exception { + try { + totalsReceived[idx] = Integer.parseInt(response.getHeader("x-total")); + } catch (Exception e) { + errors[idx] = e; + } + statusCodes[idx] = response.getStatusCode(); + latch.countDown(); + return response; + } + + @Override + public void onThrowable(Throwable t) { + errors[idx] = t; + t.printStackTrace(); + latch.countDown(); + } + }); + } catch (IOException e) { + errors[idx] = e; + latch.countDown(); + } + } + }); + } + + try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + fail("Latch interrupted"); + } + + for (int i = 0; i < threadCount; i++) { + assertEquals(200, statusCodes[i]); + assertNull(errors[i]); + assertEquals(tempFile.length(), totalsReceived[i]); + } + } + + + private static SSLEngineConfigurator createSSLConfig() + throws Exception { + final SSLContextConfigurator sslContextConfigurator = + new SSLContextConfigurator(); + final ClassLoader cl = GrizzlyFeedableBodyGeneratorTest.class.getClassLoader(); + // override system properties + final URL cacertsUrl = cl.getResource("ssltest-cacerts.jks"); + if (cacertsUrl != null) { + sslContextConfigurator.setTrustStoreFile(cacertsUrl.getFile()); + sslContextConfigurator.setTrustStorePass("changeit"); + } + + // override system properties + final URL keystoreUrl = cl.getResource("ssltest-keystore.jks"); + if (keystoreUrl != null) { + sslContextConfigurator.setKeyStoreFile(keystoreUrl.getFile()); + sslContextConfigurator.setKeyStorePass("changeit"); + } + + return new SSLEngineConfigurator( + sslContextConfigurator.createSSLContext(), + false, false, false); + } + + + private void generateTempFile() throws IOException { + tempFile = File.createTempFile("feedable", null); + int total = 0; + byte[] chunk = new byte[1024]; + Random r = new Random(System.currentTimeMillis()); + FileOutputStream out = new FileOutputStream(tempFile); + while (total < TEMP_FILE_SIZE) { + for (int i = 0; i < chunk.length; i++) { + chunk[i] = DATA[r.nextInt(DATA.length)]; + } + out.write(chunk); + total += chunk.length; + } + out.flush(); + out.close(); + } + + + // ---------------------------------------------------------- Nested Classes + + + private static final class ConsumingHandler extends HttpHandler { + + + // -------------------------------------------- Methods from HttpHandler + + + @Override + public void service(Request request, Response response) + throws Exception { + int total = 0; + byte[] bytesIn = new byte[2048]; + InputStream in = request.getInputStream(); + int read; + while ((read = in.read(bytesIn)) != -1) { + total += read; + Thread.sleep(5); + } + response.addHeader("X-Total", Integer.toString(total)); + } + + } // END ConsumingHandler + +}