Skip to content

Commit

Permalink
Have FeedableBodyGenerator use ByteBuf instead of ByteBuffer, close #…
Browse files Browse the repository at this point in the history
…1424

So one can use Netty's pooled buffers
  • Loading branch information
slandelle committed Jun 12, 2017
1 parent 27b2e7a commit 94f8ef2
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 94 deletions.
Expand Up @@ -18,6 +18,7 @@
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static org.asynchttpclient.util.HttpUtils.*; import static org.asynchttpclient.util.HttpUtils.*;
import static org.asynchttpclient.util.MiscUtils.isNonEmpty; import static org.asynchttpclient.util.MiscUtils.isNonEmpty;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
Expand Down Expand Up @@ -398,11 +399,11 @@ public T setBody(InputStream stream) {
return asDerivedType(); return asDerivedType();
} }


public T setBody(Publisher<ByteBuffer> publisher) { public T setBody(Publisher<ByteBuf> publisher) {
return setBody(publisher, -1L); return setBody(publisher, -1L);
} }


public T setBody(Publisher<ByteBuffer> publisher, long contentLength) { public T setBody(Publisher<ByteBuf> publisher, long contentLength) {
return setBody(new ReactiveStreamsBodyGenerator(publisher, contentLength)); return setBody(new ReactiveStreamsBodyGenerator(publisher, contentLength));
} }


Expand Down
Expand Up @@ -12,8 +12,13 @@
*/ */
package org.asynchttpclient.netty.request.body; package org.asynchttpclient.netty.request.body;


import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;

import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;


import org.asynchttpclient.netty.NettyResponseFuture; import org.asynchttpclient.netty.NettyResponseFuture;
Expand All @@ -25,24 +30,16 @@


import com.typesafe.netty.HandlerSubscriber; import com.typesafe.netty.HandlerSubscriber;


import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.EventExecutor;

public class NettyReactiveStreamsBody implements NettyBody { public class NettyReactiveStreamsBody implements NettyBody {


private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class); private static final Logger LOGGER = LoggerFactory.getLogger(NettyReactiveStreamsBody.class);
private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer"; private static final String NAME_IN_CHANNEL_PIPELINE = "request-body-streamer";


private final Publisher<ByteBuffer> publisher; private final Publisher<ByteBuf> publisher;


private final long contentLength; private final long contentLength;


public NettyReactiveStreamsBody(Publisher<ByteBuffer> publisher, long contentLength) { public NettyReactiveStreamsBody(Publisher<ByteBuf> publisher, long contentLength) {
this.publisher = publisher; this.publisher = publisher;
this.contentLength = contentLength; this.contentLength = contentLength;
} }
Expand All @@ -69,32 +66,35 @@ public void write(Channel channel, NettyResponseFuture<?> future) throws IOExcep
} }
} }


private static class SubscriberAdapter implements Subscriber<ByteBuffer> { private static class SubscriberAdapter implements Subscriber<ByteBuf> {
private volatile Subscriber<HttpContent> subscriber; private volatile Subscriber<HttpContent> subscriber;

public SubscriberAdapter(Subscriber<HttpContent> subscriber) { public SubscriberAdapter(Subscriber<HttpContent> subscriber) {
this.subscriber = subscriber; this.subscriber = subscriber;
} }

@Override @Override
public void onSubscribe(Subscription s) { public void onSubscribe(Subscription s) {
subscriber.onSubscribe(s); subscriber.onSubscribe(s);
} }

@Override @Override
public void onNext(ByteBuffer t) { public void onNext(ByteBuf buffer) {
ByteBuf buffer = Unpooled.wrappedBuffer(t);
HttpContent content = new DefaultHttpContent(buffer); HttpContent content = new DefaultHttpContent(buffer);
subscriber.onNext(content); subscriber.onNext(content);
} }

@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
subscriber.onError(t); subscriber.onError(t);
} }

@Override @Override
public void onComplete() { public void onComplete() {
subscriber.onComplete(); subscriber.onComplete();
} }
} }

private static class NettySubscriber extends HandlerSubscriber<HttpContent> { private static class NettySubscriber extends HandlerSubscriber<HttpContent> {
private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class); private static final Logger LOGGER = LoggerFactory.getLogger(NettySubscriber.class);


Expand All @@ -109,8 +109,7 @@ public NettySubscriber(Channel channel, NettyResponseFuture<?> future) {


@Override @Override
protected void complete() { protected void complete() {
EventExecutor executor = channel.eventLoop(); channel.eventLoop().execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> removeFromPipeline()));
executor.execute(() -> channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(future -> removeFromPipeline()));
} }


@Override @Override
Expand Down
Expand Up @@ -13,13 +13,13 @@
*/ */
package org.asynchttpclient.request.body.generator; package org.asynchttpclient.request.body.generator;


import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf;


public final class BodyChunk { public final class BodyChunk {
public final boolean last; public final boolean last;
public final ByteBuffer buffer; public final ByteBuf buffer;


public BodyChunk(final ByteBuffer buffer, final boolean last) { public BodyChunk(ByteBuf buffer, boolean last) {
this.buffer = buffer; this.buffer = buffer;
this.last = last; this.last = last;
} }
Expand Down
Expand Up @@ -13,15 +13,15 @@
*/ */
package org.asynchttpclient.request.body.generator; package org.asynchttpclient.request.body.generator;


import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf;


/** /**
* {@link BodyGenerator} which may return just part of the payload at the time handler is requesting it. * {@link BodyGenerator} which may return just part of the payload at the time handler is requesting it.
* If it happens, client becomes responsible for providing the rest of the chunks. * If it happens, client becomes responsible for providing the rest of the chunks.
*/ */
public interface FeedableBodyGenerator extends BodyGenerator { public interface FeedableBodyGenerator extends BodyGenerator {


boolean feed(ByteBuffer buffer, boolean isLast) throws Exception; boolean feed(ByteBuf buffer, boolean isLast) throws Exception;


void setListener(FeedListener listener); void setListener(FeedListener listener);
} }
Expand Up @@ -16,7 +16,6 @@
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue; import java.util.Queue;


import org.asynchttpclient.request.body.Body; import org.asynchttpclient.request.body.Body;
Expand Down Expand Up @@ -54,7 +53,7 @@ private BodyState readNextChunk(ByteBuf target) throws IOException {
if (nextChunk == null) { if (nextChunk == null) {
// Nothing in the queue. suspend stream if nothing was read. (reads == 0) // Nothing in the queue. suspend stream if nothing was read. (reads == 0)
return res; return res;
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.last) { } else if (!nextChunk.buffer.isReadable() && !nextChunk.last) {
// skip empty buffers // skip empty buffers
queue.remove(); queue.remove();
} else { } else {
Expand All @@ -66,26 +65,15 @@ private BodyState readNextChunk(ByteBuf target) throws IOException {
} }


private void readChunk(ByteBuf target, BodyChunk part) { private void readChunk(ByteBuf target, BodyChunk part) {
move(target, part.buffer); target.writeBytes(part.buffer);

if (!part.buffer.isReadable()) {
if (!part.buffer.hasRemaining()) {
if (part.last) { if (part.last) {
state = BodyState.STOP; state = BodyState.STOP;
} }
queue.remove(); queue.remove();
} }
} }


private void move(ByteBuf target, ByteBuffer source) {
int size = Math.min(target.writableBytes(), source.remaining());
if (size > 0) {
ByteBuffer slice = source.slice();
slice.limit(size);
target.writeBytes(slice);
source.position(source.position() + size);
}
}

@Override @Override
public void close() { public void close() {
} }
Expand Down
Expand Up @@ -13,7 +13,8 @@
*/ */
package org.asynchttpclient.request.body.generator; package org.asynchttpclient.request.body.generator;


import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf;

import java.util.Queue; import java.util.Queue;


import org.asynchttpclient.request.body.Body; import org.asynchttpclient.request.body.Body;
Expand All @@ -35,7 +36,7 @@ public Body createBody() {
protected abstract boolean offer(BodyChunk chunk) throws Exception; protected abstract boolean offer(BodyChunk chunk) throws Exception;


@Override @Override
public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception { public boolean feed(final ByteBuf buffer, final boolean isLast) throws Exception {
boolean offered = offer(new BodyChunk(buffer, isLast)); boolean offered = offer(new BodyChunk(buffer, isLast));
if (offered && listener != null) { if (offered && listener != null) {
listener.onContentAdded(); listener.onContentAdded();
Expand Down
Expand Up @@ -13,9 +13,9 @@
package org.asynchttpclient.request.body.generator; package org.asynchttpclient.request.body.generator;


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;


import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;


import org.asynchttpclient.request.body.Body; import org.asynchttpclient.request.body.Body;
Expand All @@ -26,9 +26,8 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator { public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
private static final ByteBuffer EMPTY = ByteBuffer.wrap("".getBytes());


private final Publisher<ByteBuffer> publisher; private final Publisher<ByteBuf> publisher;
private final FeedableBodyGenerator feedableBodyGenerator; private final FeedableBodyGenerator feedableBodyGenerator;
private volatile FeedListener feedListener; private volatile FeedListener feedListener;
private final long contentLength; private final long contentLength;
Expand All @@ -41,18 +40,18 @@ public class ReactiveStreamsBodyGenerator implements FeedableBodyGenerator {
* @param publisher Body as a Publisher * @param publisher Body as a Publisher
* @param contentLength Content-Length of the Body * @param contentLength Content-Length of the Body
*/ */
public ReactiveStreamsBodyGenerator(Publisher<ByteBuffer> publisher, long contentLength) { public ReactiveStreamsBodyGenerator(Publisher<ByteBuf> publisher, long contentLength) {
this.publisher = publisher; this.publisher = publisher;
this.feedableBodyGenerator = new UnboundedQueueFeedableBodyGenerator(); this.feedableBodyGenerator = new UnboundedQueueFeedableBodyGenerator();
this.contentLength = contentLength; this.contentLength = contentLength;
} }


public Publisher<ByteBuffer> getPublisher() { public Publisher<ByteBuf> getPublisher() {
return this.publisher; return this.publisher;
} }


@Override @Override
public boolean feed(ByteBuffer buffer, boolean isLast) throws Exception { public boolean feed(ByteBuf buffer, boolean isLast) throws Exception {
return feedableBodyGenerator.feed(buffer, isLast); return feedableBodyGenerator.feed(buffer, isLast);
} }


Expand All @@ -79,7 +78,7 @@ private class StreamedBody implements Body {


private final long contentLength; private final long contentLength;


public StreamedBody(Publisher<ByteBuffer> publisher, FeedableBodyGenerator bodyGenerator, long contentLength) { public StreamedBody(Publisher<ByteBuf> publisher, FeedableBodyGenerator bodyGenerator, long contentLength) {
this.body = bodyGenerator.createBody(); this.body = bodyGenerator.createBody();
this.subscriber = new SimpleSubscriber(bodyGenerator); this.subscriber = new SimpleSubscriber(bodyGenerator);
this.contentLength = contentLength; this.contentLength = contentLength;
Expand All @@ -97,14 +96,15 @@ public long getContentLength() {


@Override @Override
public BodyState transferTo(ByteBuf target) throws IOException { public BodyState transferTo(ByteBuf target) throws IOException {
if (initialized.compareAndSet(false, true)) if (initialized.compareAndSet(false, true)) {
publisher.subscribe(subscriber); publisher.subscribe(subscriber);
}


return body.transferTo(target); return body.transferTo(target);
} }
} }


private class SimpleSubscriber implements Subscriber<ByteBuffer> { private class SimpleSubscriber implements Subscriber<ByteBuf> {


private final Logger LOGGER = LoggerFactory.getLogger(SimpleSubscriber.class); private final Logger LOGGER = LoggerFactory.getLogger(SimpleSubscriber.class);


Expand All @@ -130,7 +130,7 @@ public void onSubscribe(Subscription s) {
} }


@Override @Override
public void onNext(ByteBuffer t) { public void onNext(ByteBuf t) {
if (t == null) if (t == null)
throw null; throw null;
try { try {
Expand All @@ -147,14 +147,15 @@ public void onError(Throwable t) {
throw null; throw null;
LOGGER.debug("Error occurred while consuming body stream.", t); LOGGER.debug("Error occurred while consuming body stream.", t);
FeedListener listener = feedListener; FeedListener listener = feedListener;
if (listener != null) if (listener != null) {
listener.onError(t); listener.onError(t);
}
} }


@Override @Override
public void onComplete() { public void onComplete() {
try { try {
feeder.feed(EMPTY, true); feeder.feed(Unpooled.EMPTY_BUFFER, true);
} catch (Exception e) { } catch (Exception e) {
LOGGER.info("Ignoring exception occurred while completing stream processing.", e); LOGGER.info("Ignoring exception occurred while completing stream processing.", e);
this.subscription.cancel(); this.subscription.cancel();
Expand Down
Expand Up @@ -16,11 +16,11 @@
import static org.asynchttpclient.test.TestUtils.*; import static org.asynchttpclient.test.TestUtils.*;
import static org.testng.Assert.*; import static org.testng.Assert.*;
import static org.testng.FileAssert.fail; import static org.testng.FileAssert.fail;
import io.netty.buffer.Unpooled;


import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files; import java.nio.file.Files;


import org.asynchttpclient.AbstractBasicTest; import org.asynchttpclient.AbstractBasicTest;
Expand Down Expand Up @@ -89,10 +89,10 @@ private void feed(FeedableBodyGenerator feedableBodyGenerator, InputStream is) t
for (int i = 0; (i = inputStream.read(buffer)) > -1;) { for (int i = 0; (i = inputStream.read(buffer)) > -1;) {
byte[] chunk = new byte[i]; byte[] chunk = new byte[i];
System.arraycopy(buffer, 0, chunk, 0, i); System.arraycopy(buffer, 0, chunk, 0, i);
feedableBodyGenerator.feed(ByteBuffer.wrap(chunk), false); feedableBodyGenerator.feed(Unpooled.wrappedBuffer(chunk), false);
} }
} }
feedableBodyGenerator.feed(ByteBuffer.allocate(0), true); feedableBodyGenerator.feed(Unpooled.EMPTY_BUFFER, true);


} }


Expand Down

0 comments on commit 94f8ef2

Please sign in to comment.