Skip to content

Commit

Permalink
Detach PushBody from QueueBasedFeedableBodyGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
slandelle committed Dec 7, 2015
1 parent 18500c7 commit 1a5985c
Showing 1 changed file with 22 additions and 16 deletions.
Expand Up @@ -28,12 +28,13 @@ public abstract class QueueBasedFeedableBodyGenerator<T extends Queue<BodyChunk>


@Override @Override
public Body createBody() { public Body createBody() {
return new PushBody(); return new PushBody(queue());
} }


protected abstract boolean offer(BodyChunk chunk) throws Exception; protected abstract boolean offer(BodyChunk chunk) throws Exception;

protected abstract Queue<BodyChunk> queue(); protected abstract Queue<BodyChunk> queue();

@Override @Override
public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception { public boolean feed(final ByteBuffer buffer, final boolean isLast) throws Exception {
boolean offered = offer(new BodyChunk(buffer, isLast)); boolean offered = offer(new BodyChunk(buffer, isLast));
Expand All @@ -48,10 +49,15 @@ public void setListener(FeedListener listener) {
this.listener = listener; this.listener = listener;
} }


public final class PushBody implements Body { public static final class PushBody implements Body {


private final Queue<BodyChunk> queue;
private BodyState state = BodyState.CONTINUE; private BodyState state = BodyState.CONTINUE;


public PushBody(Queue<BodyChunk> queue) {
this.queue = queue;
}

@Override @Override
public long getContentLength() { public long getContentLength() {
return -1; return -1;
Expand All @@ -72,13 +78,13 @@ public BodyState transferTo(final ByteBuf target) throws IOException {
private BodyState readNextChunk(ByteBuf target) throws IOException { private BodyState readNextChunk(ByteBuf target) throws IOException {
BodyState res = BodyState.SUSPEND; BodyState res = BodyState.SUSPEND;
while (target.isWritable() && state != BodyState.STOP) { while (target.isWritable() && state != BodyState.STOP) {
BodyChunk nextChunk = queue().peek(); BodyChunk nextChunk = queue.peek();
if (nextChunk == null) { if (nextChunk == null) {
// Nothing in the queue. suspend stream if nothing was read. (reads == 0) // Nothing in the queue. suspend stream if nothing was read. (reads == 0)
return res; return res;
} else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) { } else if (!nextChunk.buffer.hasRemaining() && !nextChunk.isLast) {
// skip empty buffers // skip empty buffers
queue().remove(); queue.remove();
} else { } else {
res = BodyState.CONTINUE; res = BodyState.CONTINUE;
readChunk(target, nextChunk); readChunk(target, nextChunk);
Expand All @@ -94,22 +100,22 @@ private void readChunk(ByteBuf target, BodyChunk part) {
if (part.isLast) { if (part.isLast) {
state = BodyState.STOP; state = BodyState.STOP;
} }
queue().remove(); queue.remove();
} }
} }


@Override private void move(ByteBuf target, ByteBuffer source) {
public void close() { int size = Math.min(target.writableBytes(), source.remaining());
if (size > 0) {
ByteBuffer slice = source.slice();
slice.limit(size);
target.writeBytes(slice);
source.position(source.position() + size);
}
} }
}


private void move(ByteBuf target, ByteBuffer source) { @Override
int size = Math.min(target.writableBytes(), source.remaining()); public void close() {
if (size > 0) {
ByteBuffer slice = source.slice();
slice.limit(size);
target.writeBytes(slice);
source.position(source.position() + size);
} }
} }


Expand Down

0 comments on commit 1a5985c

Please sign in to comment.