From 1a5985c8f87b734e44f80249ed7b3ae45802a99a Mon Sep 17 00:00:00 2001 From: Stephane Landelle Date: Mon, 7 Dec 2015 17:42:37 +0100 Subject: [PATCH] Detach PushBody from QueueBasedFeedableBodyGenerator --- .../QueueBasedFeedableBodyGenerator.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java b/client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java index a722e601f4..1537a8808d 100644 --- a/client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java +++ b/client/src/main/java/org/asynchttpclient/request/body/generator/QueueBasedFeedableBodyGenerator.java @@ -28,12 +28,13 @@ public abstract class QueueBasedFeedableBodyGenerator @Override public Body createBody() { - return new PushBody(); + return new PushBody(queue()); } protected abstract boolean offer(BodyChunk chunk) throws Exception; + protected abstract Queue queue(); - + @Override public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception { boolean offered = offer(new BodyChunk(buffer, isLast)); @@ -48,10 +49,15 @@ public void setListener(FeedListener listener) { this.listener = listener; } - public final class PushBody implements Body { + public static final class PushBody implements Body { + private final Queue queue; private BodyState state = BodyState.CONTINUE; + public PushBody(Queue queue) { + this.queue = queue; + } + @Override public long getContentLength() { return -1; @@ -72,13 +78,13 @@ public BodyState transferTo(final ByteBuf target) throws IOException { private BodyState readNextChunk(ByteBuf target) throws IOException { BodyState res = BodyState.SUSPEND; while (target.isWritable() && state != BodyState.STOP) { - BodyChunk nextChunk = queue().peek(); + BodyChunk nextChunk = queue.peek(); if (nextChunk == null) { // Nothing in the queue. suspend stream if nothing was read. (reads == 0) return res; } else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) { // skip empty buffers - queue().remove(); + queue.remove(); } else { res = BodyState.CONTINUE; readChunk(target, nextChunk); @@ -94,22 +100,22 @@ private void readChunk(ByteBuf target, BodyChunk part) { if (part.isLast) { state = BodyState.STOP; } - queue().remove(); + queue.remove(); } } - @Override - public void close() { + private void move(ByteBuf target, ByteBuffer source) { + int size = Math.min(target.writableBytes(), source.remaining()); + if (size > 0) { + ByteBuffer slice = source.slice(); + slice.limit(size); + target.writeBytes(slice); + source.position(source.position() + size); + } } - } - private void move(ByteBuf target, ByteBuffer source) { - int size = Math.min(target.writableBytes(), source.remaining()); - if (size > 0) { - ByteBuffer slice = source.slice(); - slice.limit(size); - target.writeBytes(slice); - source.position(source.position() + size); + @Override + public void close() { } }