Skip to content

Commit

Permalink
Client response pause/resume
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 14d839f commit a4c16fd
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
35 changes: 29 additions & 6 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -106,6 +106,8 @@ class Http2ClientStream implements HttpClientStream {
private final Http2Stream stream;
private final Http2ConnectionEncoder encoder;
private HttpClientResponseImpl resp;
private boolean paused;
private int numBytes;

public Http2ClientStream(HttpClientRequestImpl req,
ContextImpl context,
Expand Down Expand Up @@ -135,14 +137,21 @@ void handleHeaders(Http2Headers headers, boolean end) {
}
}

void handleData(ByteBuf chunk, boolean end) {
int handleData(ByteBuf chunk, boolean end) {
int consumed = 0;
if (chunk.isReadable()) {
Buffer buff = Buffer.buffer(chunk.slice());
resp.handleChunk(buff);
if (paused) {
numBytes += chunk.readableBytes();
} else {
consumed = chunk.readableBytes();
}
}
if (end) {
handleEnd();
}
return consumed;
}

private void handleEnd() {
Expand Down Expand Up @@ -189,7 +198,6 @@ public Context getContext() {
}
@Override
public void doSetWriteQueueMaxSize(int size) {
throw new UnsupportedOperationException();
}
@Override
public boolean isNotWritable() {
Expand All @@ -204,11 +212,26 @@ public void endRequest() {
}
@Override
public void doPause() {
throw new UnsupportedOperationException();
paused = true;
}
@Override
public void doResume() {
throw new UnsupportedOperationException();
paused = false;
if (numBytes > 0) {
int pending = numBytes;
context.runOnContext(v -> {
// DefaultHttp2LocalFlowController requires to do this from the event loop
try {
boolean windowUpdateSent = conn.local().flowController().consumeBytes(stream, pending);
if (windowUpdateSent) {
handlerCtx.flush();
}
} catch (Http2Exception e) {
e.printStackTrace();
}
});
numBytes = 0;
}
}
@Override
public void reportBytesWritten(long numberOfBytes) {
Expand Down Expand Up @@ -265,8 +288,8 @@ Http2ClientStream createStream(HttpClientRequestImpl req) {
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = streams.get(streamId);
stream.handleData(data, endOfStream);
return data.readableBytes() + padding;
int consumed = stream.handleData(data, endOfStream);
return consumed + padding;
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientStream.java
Expand Up @@ -35,6 +35,8 @@ interface HttpClientStream {
void doSetWriteQueueMaxSize(int size);
boolean isNotWritable();
void handleInterestedOpsChanged();

// Perhaps it's possible to remove this with writeBuffer(buf, true) instead
void endRequest();

void doPause();
Expand Down
48 changes: 47 additions & 1 deletion src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -17,10 +17,12 @@
package io.vertx.test.core;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.NetServerOptions;
Expand Down Expand Up @@ -96,7 +98,7 @@ public void testPost() throws Exception {
}

@Test
public void testRequestWriteability() throws Exception {
public void testClientRequestWriteability() throws Exception {
Buffer content = Buffer.buffer();
Buffer expected = Buffer.buffer();
String chunk = TestUtils.randomAlphaString(100);
Expand Down Expand Up @@ -146,6 +148,50 @@ public void testRequestWriteability() throws Exception {
await();
}

@Test
public void testClientResponsePauseResume() throws Exception {
String content = TestUtils.randomAlphaString(1024);
Buffer expected = Buffer.buffer();
Future<Void> whenFull = Future.future();
AtomicBoolean drain = new AtomicBoolean();
server.requestHandler(req -> {
HttpServerResponse resp = req.response();
resp.putHeader("content-type", "text/plain");
resp.setChunked(true);
vertx.setPeriodic(1, timerID -> {
if (resp.writeQueueFull()) {
resp.drainHandler(v -> {
Buffer last = Buffer.buffer("last");
expected.appendBuffer(last);
resp.end(last);
assertEquals(expected.toString().getBytes().length, resp.bytesWritten());
});
vertx.cancelTimer(timerID);
drain.set(true);
whenFull.complete();
} else {
Buffer chunk = Buffer.buffer(content);
expected.appendBuffer(chunk);
resp.write(chunk);
}
});
});
startServer();
client.getNow(4043, "localhost", "/somepath", resp -> {
Buffer received = Buffer.buffer();
resp.pause();
resp.handler(received::appendBuffer);
resp.endHandler(v -> {
assertEquals(expected.toString(), received.toString());
testComplete();
});
whenFull.setHandler(v -> {
resp.resume();
});
});
await();
}

@Test
public void testQueueRequests() throws Exception {
int numReq = 100;
Expand Down

0 comments on commit a4c16fd

Please sign in to comment.