Issue/544 reactive streams support #963

Merged
merged 1 commit into from Sep 7, 2015

Projects

None yet

4 participants

@dotta
Contributor
dotta commented Sep 1, 2015

Here is an initial proposal to fix #544.

I'm entirely new to both Netty and AHC, so if you notice anything weird it's very likely that I haven't understood something :-)

Other than that, something is puzzling me with the tests I wrote. Basically, if I change the CHUNK_SIZE in https://github.com/AsyncHttpClient/async-http-client/compare/AsyncHttpClient:master...dotta:issue/544-reactive-streams-support?expand=1#diff-5133f4e3fed98f7a99081a1ae643c4c0R104 to a value around 20000 (or lower), the tests start to fail because there is a mismatch between the received bytes and the expected ones. I honestly have no good explanation for this, but I thought it may have something to do with the internals of Netty (or maybe a config flag?). I'd really appreciate it if someone more experienced with Netty could have a look.
EDIT: This issue is now fixed after using RxJava instead of providing my own implementation of Publisher in the test :-)

This PR also includes @jroper commit to add support for handling response bodies.

@slandelle slandelle and 1 other commented on an outdated diff Sep 1, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
+ public long getContentLength() {
+ return -1L;
+ }
+
+ @Override
+ public State read(ByteBuffer buffer) throws IOException {
+ boolean isCompleted = subscriber.readChunk(buffer);
+ return (isCompleted) ? State.Stop : State.Continue;
+ }
+ }
+
+ private static class QueueBackedSubscriber implements Subscriber<ByteBuffer> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueueBackedSubscriber.class);
+
+ private final BlockingQueue<ByteBuffer> queue;
@slandelle
slandelle Sep 1, 2015 Contributor

Blocking, really?

@dotta
dotta Sep 1, 2015 Contributor

The alternative is queuing up elements in the subscriber, but the risk with that is overflowing the queue (dropping elements is even worse) when the consumer is much slower than the producer. Also, note this implementation is only used when the provider doesn't support reactive streams.

@slandelle
slandelle Sep 1, 2015 Contributor

Also, note this implementation is only used when the provider doesn't support reactive streams.

I'd rather throw an UnsupportedOperationException then. :)

@dotta
dotta Sep 1, 2015 Contributor

Mmm, I'm looking at the implementation of FeedableBodyGenerator and realise I probably don't need blocking. Forgive me :-)

@slandelle slandelle and 1 other commented on an outdated diff Sep 1, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
+ publisher.subscribe(subscriber);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // noop
+ }
+
+ @Override
+ public long getContentLength() {
+ return -1L;
+ }
+
+ @Override
+ public State read(ByteBuffer buffer) throws IOException {
+ boolean isCompleted = subscriber.readChunk(buffer);
@slandelle
slandelle Sep 1, 2015 Contributor

Correct me if I'm wrong, but your subscriber is going to block if there's nothing in the queue, causing the reading thread to block.

@dotta
dotta Sep 1, 2015 Contributor

That is avoided by adding EMPTY in the queue on stream completion (see https://github.com/AsyncHttpClient/async-http-client/pull/963/files#diff-2fcc9d1538924b58a722dbc03fe9c2b6R116). I agree this is a bit convoluted, I'll think if there is a better way but I'm open to suggestions.

@slandelle
Contributor

Hi Mirko,

I think your ReactiveStreamBodyGenerator is broken. Are you sure it cannot be implemented on top of FeedableBodyGenerator?

@slandelle slandelle and 2 others commented on an outdated diff Sep 1, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
+ lastElement = queue.take();
+
+ // we request another element only if previous one was fully consumed
+ if(completed.get()) subscription.cancel();
+ else subscription.request(1);
+ }
+
+ // If we are at the end of the stream, then this condition must be true.
+ // In fact, if we have received an empty chunk, but the stream is not completed, then we handle it
+ // as any other chunk.
+ if(lastElement == EMPTY && completed.get()) return true;
+
+ // don't overflow the passed buffer
+ if(buffer.remaining() <= lastElement.remaining()) {
+ int bufferSize = buffer.remaining();
+ buffer.put(lastElement.array(), lastElement.position(), bufferSize);
@slandelle
slandelle Sep 1, 2015 Contributor

What if the buffer doesn't have an array?

@dotta
dotta Sep 1, 2015 Contributor

Oh, is that possible? Any idea how I should handle that? (should I create the array myself, or fail?)

@slandelle
slandelle Sep 1, 2015 Contributor

Oh, is that possible?

Of course: DirectBuffers, composites, etc. That's what hasArray() is for.

See FeedableBodyGenerator, you can put a ByteBuffer into another.

@hepin1989
hepin1989 Sep 1, 2015

@dotta the netty in action cover this:).composites could have array if it have only one element and the element have array or it was empty.

@slandelle
slandelle Sep 1, 2015 Contributor

@hepin1989 Don't confuse Netty's ByteBuf and Java's ByteBuffer :)

@hepin1989
hepin1989 Sep 1, 2015

@slandelle haha,sorry,I don't see the whole context:).

@slandelle slandelle commented on an outdated diff Sep 1, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
+ int newPos = lastElement.position() + bufferSize;
+ lastElement.position(newPos); // mark next position from where to start reading
+ }
+ else { // the buffer has enough space to contain the popped element
+ buffer.put(lastElement);
+ lastElement = null;
+ }
+ return false;
+ } catch (InterruptedException ie) {
+ LOGGER.debug("Queue was interrupted while waiting for next element.", ie);
+ subscription.cancel();
+ return true; // returning true will provoke the channel handler to stop
+ }
+ }
+ }
+}
@slandelle
slandelle Sep 1, 2015 Contributor

empty line

@dotta
Contributor
dotta commented Sep 1, 2015

Hi @slandelle, thanks a lot for the feedback!

I think your ReactiveStreamBodyGenerator is broken. Are you sure it cannot be implemented on top of FeedableBodyGenerator?

Do you mean the implementation of ReactiveStreamBodyGenerator.StreamedBody? After looking at the FeedableBodyGenerator.PushBody, I feel like I should be able to reuse it. I'll have a deeper look now.

@slandelle
Contributor

Do you mean the implementation of ReactiveStreamBodyGenerator.StreamedBody? After looking at the FeedableBodyGenerator.PushBody, I feel like I should be able to reuse it. I'll have a deeper look now.

Yeah, they serve the same purpose, so I guess existing code could be reuse/extended/wrapped. FeedableBodyGenerator.PushBody currently has a known issue that the reading buffer has to be larger than the payload, but that's something that can/should be fixed.

@dotta dotta and 1 other commented on an outdated diff Sep 1, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
+ }
+ else {
+ subscription = s;
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onNext(ByteBuffer t) {
+ if (t == null) throw null;
+ try {
+ feeder.feed(t, false);
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ subscription.request(1);
@dotta
dotta Sep 1, 2015 Contributor

@jroper I'm not so convinced about requesting only one element at a time. Any thought?

@jroper
jroper Sep 2, 2015 Contributor

Considering this is a fallback implementation, I think it's fine. Another option, seeing as the provider is going to end up buffering infinitely anyway, is you can invoke subscription.request(Long.MAX_VALUE) in onSubscribe, this is allowed according to reactive streams spec 3.17.

@dotta
Contributor
dotta commented Sep 1, 2015

@slandelle I think you should be more pleased now (I for sure am :-)).

@slandelle slandelle and 1 other commented on an outdated diff Sep 1, 2015
...nt/request/body/generator/IFeedableBodyGenerator.java
@@ -0,0 +1,11 @@
+package org.asynchttpclient.request.body.generator;
+
+public interface IFeedableBodyGenerator {
@slandelle
slandelle Sep 1, 2015 Contributor

For consistency, I'd rather remove the I prefix and prefix the default implementation with Simple (or better if you can think of something).

@dotta
dotta Sep 1, 2015 Contributor

👍

@slandelle slandelle commented on the diff Sep 1, 2015
...a/org/asynchttpclient/netty/handler/HttpProtocol.java
@@ -400,6 +401,20 @@ private boolean exitAfterHandlingHeaders(Channel channel, NettyResponseFuture<?>
return false;
}
+ private boolean exitAfterHandlingReactiveStreams(Channel channel, NettyResponseFuture<?> future, HttpResponse response, AsyncHandler<?> handler) throws IOException {
+ if (handler instanceof StreamedAsyncHandler) {
@slandelle
slandelle Sep 1, 2015 Contributor

Anyone missing pattern matching, raise a hand!

@jroper
jroper Sep 2, 2015 Contributor

@slandelle slandelle commented on the diff Sep 1, 2015
...a/org/asynchttpclient/netty/handler/HttpProtocol.java
@@ -425,7 +440,8 @@ private boolean handleHttpResponse(final HttpResponse response, final Channel ch
exitAfterHandlingRedirect(channel, future, response, request, statusCode, realm) || //
exitAfterHandlingConnect(channel, future, request, proxyServer, statusCode, httpRequest) || //
exitAfterHandlingStatus(channel, future, response, handler, status) || //
- exitAfterHandlingHeaders(channel, future, response, handler, responseHeaders);
+ exitAfterHandlingHeaders(channel, future, response, handler, responseHeaders) ||
+ exitAfterHandlingReactiveStreams(channel, future, response, handler);
@slandelle
slandelle Sep 1, 2015 Contributor

Anyone missing monadic composition, raise a hand...

@jroper
jroper Sep 2, 2015 Contributor

@slandelle slandelle commented on the diff Sep 1, 2015
...java/org/asynchttpclient/netty/handler/Processor.java
} else if (attribute instanceof NettyResponseFuture) {
NettyResponseFuture<?> future = (NettyResponseFuture<?>) attribute;
protocol.handle(channel, future, msg);
+ } else if (attribute instanceof StreamedResponsePublisher) {
@slandelle
slandelle Sep 1, 2015 Contributor

I missed this one. I suspect not having a NettyResponseFuture attribute would lead to tons of issues with retry and exception handling.

@slandelle
slandelle Sep 1, 2015 Contributor

I think the StreamedResponsePublisher could be stored in the NettyResponseFuture.
Yes, I know it's a huge pile of junk, better design suggestions mostly welcome.

@jroper
jroper Sep 2, 2015 Contributor

If you look in the exception handling below on line 190 (the diff doesn't make it obvious that the code I added is part of the exception handling), I have handled it - the exception gets sent to the stream as an error, and then the future is sent to requestSender.abort.

As to whether it should be replayed or not - in Play for example, if it fails during the stream, we never want it to be replayed. Why? Because our API essentially looks something like this:

Future<ResponseHeader, Publisher<ByteBuffer>> executeStream();

This future will be redeemed when onStream is invoked. All subsequent calls to onStream will be ignored - since there is literally nothing we can do with them. And, to imagine the most common use case for this, where you're streaming a response body out to another response body (acting essentially as a proxy), if the stream fails halfway through, what are you going to do? You can only forward the failure to the client. You can't retry, because your opportunity to send a response has gone. Basically, if you're handling the response as a stream, then this very likely means that consuming the body is side effecting, in which case, it's very unlikely that you want to retry, in fact it could often be dangerous - this is a very different thing to, for example, buffering the body in memory, where there are no negative consequences to retrying.

@slandelle
slandelle Sep 2, 2015 Contributor

Actually, there's 2 cases where retry makes sense. Sadly, AHC doesn't currently tell them appart, as it was originally intended to deal with file downloading.

  1. The request was sent, and the connection crashed while the response body chunks were being received.
  2. The request was not sent, because the connection crashed first.

The second case typically happens when reusing a pooled connection, and the remote peer closes it (keep-alive idle timeout, server being overloaded). We do check that the connection is alive when pooling it, and just before writing, but there's always a time window where this could happen. Under heavy load, this happens all the time, as soon as the server implements idle timeout. Browsers do implement a retry to deal with such use case. AHC has a maxRetry default value of 4.

Note that you can be notified of a retry when you implement AsyncHandlerExtensions, such you can clean/reset chunks that you've received so far.

What's the expected behavior of reactive-streams when the connection crashes? Is it up to the user to implement retry logic?

@jroper
jroper Sep 3, 2015 Contributor

In this case the stream will simply be notified with the exception via Subscriber.onError.

If we did attach the publisher to the future, I think we'd have to make a mutable field in NettyResponseFuture. It appeared to me that everything in NettyResponseFuture is immutable, and I assumed there was a reason for that - by keeping the only mutable thing the channel attribute, that makes it easier to reason about the state of the system.

On the two crash conditions - the second one is not impacted here. It's only once we start receiving a body that we change the attribute, before that, all the handling is the same. For the first crash condition, I think it would be quite straight forward to modify the exception handling such that connections are still retried. Note that unlike other AsyncHandlers, the StreamedResponsePublisher changes the state of the pipeline (inserts a handler), and requires events, such as the exception itself, to be forwarded down the channel so that the subscriber that's handling the body receives them. This is why I think the handling for this should (or must) be in the Processor, it requires custom handling at that level.

I'll have a closer look at fixing the exception handling to implement retry - I've realised a StreamedAsyncHandler can always return ABORT from onStream if it's invoked a second time, and that's what we'll do in Play, so I don't think retrying is that big an issue.

@slandelle
slandelle Sep 3, 2015 Contributor

The second one is the real important one. It would be great if AHC could tell them appart, maybe some day...

Please let me know when you guys are done so I can merge.

Thanks a bunch!

@jroper
jroper Sep 4, 2015 Contributor

@dotta I believe https://github.com/dotta/async-http-client/blob/issue/544-reactive-streams-support/providers/netty4/src/main/java/org/asynchttpclient/netty/handler/Processor.java#L177 is where the retry is initiated, and that code is only executed if the attribute is a NettyResponseFuture, so that's where we need to change things.

@dotta
dotta Sep 4, 2015 Contributor

@jroper I've tried to write a test for it, but haven't succeeded. Is the following what you had in mind to fix it? dotta@eab7a33 \cc @slandelle

@dotta
dotta Sep 5, 2015 Contributor

Is there a way to emulate a connection crash? As said, I've tried to write a test for this, but I can't make the connection crash (so that I can confirm the retry logic is run). My attempt was to call channel.close().await(), but that blocks indefinitely.

@dotta
dotta Sep 5, 2015 Contributor

By the way, I was calling await() to force Processor.exceptionCaught to be called. If I don't call await, then of course execution continues, but the Processor.exceptionCaught method is still never called.

@dotta
dotta Sep 7, 2015 Contributor

Alright, I've managed to write a test, see dotta@a373a21 (not really a straightforward and easy-to-read one. I'm all hears if there is a better way of writing it).

@slandelle
slandelle Sep 7, 2015 Contributor

Yeah, those are not easy to test... What I did in the past was to do a kind of load test to try to reproduce.

@jroper jroper and 2 others commented on an outdated diff Sep 2, 2015
...ain/java/org/asynchttpclient/BoundRequestBuilder.java
@@ -93,6 +97,16 @@ public BoundRequestBuilder setBody(String data) {
}
@Override
+ public BoundRequestBuilder setBody(Publisher<ByteBuffer> publisher) {
+ return super.setBody(new ReactiveStreamBodyGenerator(client.getConfig().getExecutorService(), publisher));
+ }
+
+ @Override
+ public BoundRequestBuilder setBody(BodyGenerator bodyGenerator) {
+ return super.setBody(bodyGenerator);
+ }
@jroper
jroper Sep 2, 2015 Contributor

This isn't needed to be overridden since it just delegates straight to super is it?

@dotta
dotta Sep 2, 2015 Contributor

I've done so for consistency with regard to other methods overridden in this same source. See this comment https://github.com/dotta/async-http-client/blob/issue/544-reactive-streams-support/api/src/main/java/org/asynchttpclient/BoundRequestBuilder.java#L49-L52 for context.

@slandelle
slandelle Sep 2, 2015 Contributor

I'm going to remove all of those from master:

  • CLJ-126 seems to be related to Java 5, that we don't support.
  • CLJ-259 has been open in 2010. Come on, 5 years to come up with a fix!
  • https://github.com/cch1/http.async.client has just been upgraded to AHC 1.8. So they won't upgrade to AHC 2 any time soon.
  • I just hate it with my guts when people push their own mess to others.
@slandelle
slandelle Sep 2, 2015 Contributor

@dotta Please remove those too.

@dotta
dotta Sep 2, 2015 Contributor

👍

@jroper jroper and 1 other commented on an outdated diff Sep 2, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
+ // If someone has made a mistake and added this Subscriber multiple times, let's handle it gracefully
+ if (this.subscription != null) {
+ s.cancel(); // Cancel the additional subscription
+ }
+ else {
+ subscription = s;
+ subscription.request(1);
+ }
+ }
+
+ @Override
+ public void onNext(ByteBuffer t) {
+ if (t == null) throw null;
+ try {
+ feeder.feed(t, false);
+ executor.execute(new Runnable() {
@jroper
jroper Sep 2, 2015 Contributor

This is not necessary, reactive streams spec 3.3 says it's the responsibility of Subscription.request to put a bound on the recursion, so it is allowed to call request from onNext as much as you like.

@dotta
dotta Sep 2, 2015 Contributor

Got it, thanks.

@jroper jroper and 1 other commented on an outdated diff Sep 2, 2015
...lient/netty/request/body/NettyReactiveStreamBody.java
+ return null;
+ }
+
+ @Override
+ public void write(Channel channel, NettyResponseFuture<?> future) throws IOException {
+ if (future.isStreamWasAlreadyConsumed()) {
+ LOGGER.warn("Stream has already been consumed and cannot be reset");
+ } else {
+ future.setStreamWasAlreadyConsumed(true);
+ NettySubscriber subscriber = new NettySubscriber(channel);
+ channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber);
+ publisher.subscribe(new SubscriberAdapter(subscriber));
+ }
+ }
+
+ private static class SubscriberAdapter implements Subscriber<ByteBuffer> {
@jroper
jroper Sep 2, 2015 Contributor

A better "Netty" way to implement this would be to insert another handler in the chain, just before the HandlerSubscriber that intercepts calls to write, and converts it to a DefaultHttpContent.

@dotta
dotta Sep 2, 2015 Contributor

But I have a publisher of type Publisher<ByteBuffer> that wants to subscribe to the NettySubscriber. See https://github.com/AsyncHttpClient/async-http-client/pull/963/files#diff-201ec22bdb5bc3211518d720cfce85d3R67

@jroper jroper and 1 other commented on an outdated diff Sep 2, 2015
...lient/netty/request/body/NettyReactiveStreamBody.java
+ subscriber.onComplete();
+ }
+ }
+
+ private static class NettySubscriber extends HandlerSubscriber<HttpContent> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class);
+
+ private final Channel channel;
+
+ public NettySubscriber(Channel channel) {
+ super(channel.eventLoop());
+ this.channel = channel;
+ }
+
+ @Override
+ public void onComplete() {
@jroper
jroper Sep 2, 2015 Contributor

HandlerSubscriber actually provides protected complete() for the purposes of overriding. If you override that, then lets say in future we find a bug that needs to be fixed by doing some internal processing in onComplete, overriding complete won't interfere with that. A good example of this is that currently we let netty buffer all writes, but maybe in future we might find that we need to do our own buffering, in which case, our onComplete handling will need to flush before invoking complete, but if we change that this will break, since it will bypass the flush.

@dotta
dotta Sep 2, 2015 Contributor

I did not notice complete and error where protected. Thanks!

@jroper jroper commented on an outdated diff Sep 2, 2015
...lient/netty/request/body/NettyReactiveStreamBody.java
+ EventExecutor executor = channel.eventLoop();
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ removeFromPipeline();
+ }
+ });
+ }
+ });
+ }
+
+ @Override
+ public void onError(Throwable error) {
@jroper
jroper Sep 2, 2015 Contributor

Same with onComplete, HandlerSubscriber provides an error method for the purposes of overriding.

@jroper jroper and 1 other commented on an outdated diff Sep 2, 2015
...lient/netty/request/body/NettyReactiveStreamBody.java
+ @Override
+ public String getContentType() {
+ return null;
+ }
+
+ @Override
+ public void write(Channel channel, NettyResponseFuture<?> future) throws IOException {
+ if (future.isStreamWasAlreadyConsumed()) {
+ LOGGER.warn("Stream has already been consumed and cannot be reset");
+ } else {
+ future.setStreamWasAlreadyConsumed(true);
+ NettySubscriber subscriber = new NettySubscriber(channel);
+ channel.pipeline().addLast(NAME_IN_CHANNEL_PIPELINE, subscriber);
+ publisher.subscribe(new SubscriberAdapter(subscriber));
+ }
+ }
@jroper
jroper Sep 2, 2015 Contributor

Shouldn't this method be capturing the NettyResponseFuture, and at very least invoking future.abort in the error callback?

@dotta
dotta Sep 2, 2015 Contributor

Absolutely. I'll add a test and fix it.

@jroper jroper and 2 others commented on an outdated diff Sep 2, 2015
...src/test/java/org/asynchttpclient/test/TestUtils.java
+ }
+
+ public SyncByteBufferPublisher(byte[] bytes, int chunkSize) {
+ this.bytes = Arrays.copyOf(bytes, bytes.length);
+ this.chunkSize = chunkSize;
+ }
+
+ @Override
+ public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
+ subscriber.onSubscribe(new Subscription() {
+ // This index is used to keep track of bytes that have been sent.
+ private volatile int currentIndex = 0;
+ // This flag will track whether this `Subscription` is to be considered cancelled or not
+ private volatile boolean cancelled = false;
+
+ public void request(long n) {
@jroper
jroper Sep 2, 2015 Contributor

This will need to be modified to avoid blowing the stack in case request is invoked by onNext - the reactive stream spec requires that this be handled here, not in Subscriber.onNext. A better option here might be to pull in a reactive streams implementation for the tests, eg rxjava or akka streams. @slandelle do you have a preference as to which implementation should be used?

@slandelle
slandelle Sep 2, 2015 Contributor

If we were to go with Akka, I guess we would have to turn the maven project into a Scala/Java mixed one so we can use TestKit. I'm afraid this would make the project more difficult to build for most people. So, even thought my heart would love to go with Akka, I think it's more reasonable to stay with a simple Java project and go with rx. WDYT?

@dotta
dotta Sep 2, 2015 Contributor

If we were to go with Akka, I guess we would have to turn the maven project into a Scala/Java mixed one so we can use TestKit.

We don't need the TestKit. A dependency to akka-streams would be enough. But, even if we had to use TesKit, I don't see why we would then need to turn this project into a Scala/Java mixed one.

That said, I think we should go with rxjava because with akka-streams turning a Source into a Publisher requires materialization (meaning we need to create an ActorMaterializer and hence also an ActorSystem). Consider that with rxjava it's straightforward to convert an Observer into a Publisher.

@jroper jroper and 2 others commented on an outdated diff Sep 2, 2015
api/src/main/java/org/asynchttpclient/Request.java
@@ -112,6 +113,13 @@
InputStream getStreamData();
/**
+ * Return the current request's body as a Reactive Stream Publisher.
+ *
+ * @return a Reactive Stream Publisher representation of the current request's body.
@jroper
jroper Sep 2, 2015 Contributor

This will return null if no publisher was set, right? That should probably be documented, we definitely don't want to expose any arbitrarily set body as a publisher as implementing that will require either depending on a reactive streams implementation from the async-http-client API, or implementing reactive streams ourselves.

@dotta
dotta Sep 2, 2015 Contributor

This will return null if no publisher was set, right?

I'm not sure what is the contract of Request. Though, by looking at RequestBuilder, it seems null is an acceptable value to return in all the getters in Request.

we definitely don't want to expose any arbitrarily set body as a publisher as implementing that will require either depending on a reactive streams implementation from the async-http-client API, or implementing reactive streams ourselves.

Here I'm not following. The AHC client core api already depends on reactive-stream, and the setBody(Publisher<T>) would be only a conveniency method so that you can write client.preparePut(getTargetUrl()).setBody(LARGE_IMAGE_PUBLISHER) instead of client.preparePut(getTargetUrl()).setBody(new ReactiveStreamBodyGenerator(LARGE_IMAGE_PUBLISHER)). That said, I can see the argument of not polluting the Request interface with yet another method.

@slandelle WDYT?

@slandelle
slandelle Sep 2, 2015 Contributor

Sadly, I don't see any convenient solution for not having core API depend on reactive-streams.

@dotta
dotta Sep 2, 2015 Contributor

Any thought on having getPublisherData() in Request?

@dotta
dotta Sep 2, 2015 Contributor

Alright, forget this, turns out I don't need it. See the upcoming commit for details.

@slandelle slandelle commented on the diff Sep 2, 2015
...org/asynchttpclient/handler/StreamedAsyncHandler.java
@@ -0,0 +1,19 @@
+package org.asynchttpclient.handler;
@slandelle
slandelle Sep 2, 2015 Contributor

Missing license header. Please grant copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on an outdated diff Sep 2, 2015
...nt/request/body/generator/IFeedableBodyGenerator.java
@@ -0,0 +1,11 @@
+package org.asynchttpclient.request.body.generator;
@slandelle
slandelle Sep 2, 2015 Contributor

Missing license header. Please grant copyright to "the AsyncHttpClient Project".

@slandelle slandelle and 1 other commented on an outdated diff Sep 2, 2015
...quest/body/generator/ReactiveStreamBodyGenerator.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
@slandelle
slandelle Sep 2, 2015 Contributor

Please change copyright to "the AsyncHttpClient Project".

@dotta
dotta Sep 2, 2015 Contributor

Ah, I've copied the license note from another source. Grepping on 2010-2012 Sonatype reveals quite a few hits, just letting you know in case this is something you want to update.

@slandelle
slandelle Sep 2, 2015 Contributor

Yeah, Ning and Sonatype have successively employed @jfarcand to develop this project. I can't change copyrights without the owner donating.

@slandelle slandelle commented on an outdated diff Sep 2, 2015
...nchttpclient/reactivestreams/ReactiveStreamsTest.java
@@ -0,0 +1,251 @@
+/*
+ * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
@slandelle
slandelle Sep 2, 2015 Contributor

Please change copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on an outdated diff Sep 2, 2015
...asynchttpclient/request/body/ReactiveStreamsTest.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
@slandelle
slandelle Sep 2, 2015 Contributor

Please change copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on the diff Sep 2, 2015
...ient/netty/request/body/NettyReactiveStreamsTest.java
@@ -0,0 +1,13 @@
+package org.asynchttpclient.netty.request.body;
@slandelle
slandelle Sep 2, 2015 Contributor

Missing license header. Please grant copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on the diff Sep 2, 2015
...tpclient/netty/handler/StreamedResponsePublisher.java
@@ -0,0 +1,46 @@
+package org.asynchttpclient.netty.handler;
@slandelle
slandelle Sep 2, 2015 Contributor

Missing license header. Please grant copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on an outdated diff Sep 2, 2015
...lient/netty/request/body/NettyReactiveStreamBody.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
@slandelle
slandelle Sep 2, 2015 Contributor

Please change copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on an outdated diff Sep 2, 2015
...t/netty/reactivestreams/NettyReactiveStreamsTest.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
@slandelle
slandelle Sep 2, 2015 Contributor

Please change copyright to "the AsyncHttpClient Project".

@slandelle slandelle commented on an outdated diff Sep 2, 2015
...ient/netty/request/body/NettyReactiveStreamsTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2010-2012 Sonatype, Inc. All rights reserved.
@slandelle
slandelle Sep 2, 2015 Contributor

Please change copyright to "the AsyncHttpClient Project".

@dotta
Contributor
dotta commented Sep 2, 2015

@slandelle I believe I've addressed all comments. The only pending one is #963 (comment), which you are discussing with @jroper.

Let me know if you have other comments, or if I should rebase and squash commits.

@slandelle
Contributor

@dotta @jroper Sorry, busy day. Will have a look this week-end.

@dotta
Contributor
dotta commented Sep 7, 2015

@slandelle I'm happy to squash commits and rebase if you give me your bless :-) In the meanwhile, I'll update Play master to take advantage of the changes in this PR. Whohoo!

@slandelle
Contributor

@dotta Sorry, had some unexpected things to deal with first... Yes, could you please squash and rebase? Let's merge this so everyone can move forward and get everything stable in the next weeks so we can release 2.0.0.

@dotta
Contributor
dotta commented Sep 7, 2015

Whohooo! I'm on it, thanks! :-)

@slandelle slandelle added this to the 2.0.0 milestone Sep 7, 2015
@slandelle slandelle merged commit fc870d8 into AsyncHttpClient:master Sep 7, 2015
@dotta dotta deleted the dotta:issue/544-reactive-streams-support branch Sep 7, 2015
@slandelle
Contributor

@jroper @dotta Thanks guys!

@dotta
Contributor
dotta commented Sep 7, 2015

@slandelle Thank you for merging!

I'm updating the Play codebase to use AHC master, will let you know if I hit any issue.

@slandelle
Contributor

Great!
What's your timeline for Play 2.5?

@dotta
Contributor
dotta commented Sep 7, 2015

I believe it's November 2015, but I'll let @jroper confirm.

Once I've verified that things work properly on the Play side, I'd really appreciate if you could release a 2.0.0 milestone (in case you would like to wait a bit more before cutting a 2.0.0 final).

@slandelle
Contributor

I can easily cut an alpha.

I'm still considering some refactorings (maybe Proxies) + I might want to break a few things to improve DNS handling in Gatling.

@dotta
Contributor
dotta commented Sep 7, 2015

Sounds good. I'll ping you soon :-)

@dotta dotta referenced this pull request in playframework/playframework Sep 8, 2015
Closed

Reimplement `NingWS.executeStream` using Akka Stream #4952

@dotta
Contributor
dotta commented Sep 8, 2015

@slandelle I've a branch in Play that is now using the new StreamedAsyncHandler, and it seems to be working just fine (and I'm really pleased with the result, the code is much, much, much easier to read now!). Could you cut a release so that I can push that work in Play?

@slandelle
Contributor

I just released 2.0.0-alpha9.
Cheers!

@jroper
Contributor
jroper commented Sep 8, 2015

Thanks @slandelle!

@dotta
Contributor
dotta commented Sep 9, 2015

@slandelle Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment