Skip to content

Commit

Permalink
Client request flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 7323512 commit 14d839f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 18 deletions.
57 changes: 40 additions & 17 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -33,6 +33,7 @@
import io.netty.handler.codec.http2.Http2FrameListener; import io.netty.handler.codec.http2.Http2FrameListener;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.collection.IntObjectHashMap; import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap; import io.netty.util.collection.IntObjectMap;
import io.vertx.core.Context; import io.vertx.core.Context;
Expand Down Expand Up @@ -99,20 +100,23 @@ void closeAllConnections() {
class Http2ClientStream implements HttpClientStream { class Http2ClientStream implements HttpClientStream {


private final HttpClientRequestImpl req; private final HttpClientRequestImpl req;
private final ChannelHandlerContext context; private final ContextImpl context;
private final ChannelHandlerContext handlerCtx;
private final Http2Connection conn; private final Http2Connection conn;
private final int id; private final Http2Stream stream;
private final Http2ConnectionEncoder encoder; private final Http2ConnectionEncoder encoder;
private HttpClientResponseImpl resp; private HttpClientResponseImpl resp;


public Http2ClientStream(HttpClientRequestImpl req, public Http2ClientStream(HttpClientRequestImpl req,
ChannelHandlerContext context, ContextImpl context,
ChannelHandlerContext handlerCtx,
Http2Connection conn, Http2Connection conn,
Http2ConnectionEncoder encoder) { Http2ConnectionEncoder encoder) throws Http2Exception {
this.req = req;
this.context = context; this.context = context;
this.req = req;
this.handlerCtx = handlerCtx;
this.conn = conn; this.conn = conn;
this.id = conn.local().incrementAndGetNextStreamId(); this.stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
this.encoder = encoder; this.encoder = encoder;
} }


Expand Down Expand Up @@ -157,36 +161,43 @@ public void writeHeadWithContent(HttpMethod method, String uri, MultiMap headers
h.method(method.name()); h.method(method.name());
h.path(uri); h.path(uri);
h.scheme("https"); h.scheme("https");
encoder.writeHeaders(context, id, h, 0, end && buf == null, context.newPromise()); encoder.writeHeaders(handlerCtx, stream.id(), h, 0, end && buf == null, handlerCtx.newPromise());
if (buf != null) { if (buf != null) {
encoder.writeData(context, id, buf, 0, end, context.newPromise()); encoder.writeData(handlerCtx, stream.id(), buf, 0, end, handlerCtx.newPromise());
} }
context.flush(); handlerCtx.flush();
} }
@Override @Override
public void writeBuffer(ByteBuf buf, boolean end) { public void writeBuffer(ByteBuf buf, boolean end) {
System.out.println("SHOULD WRITE BUFFER"); encoder.writeData(handlerCtx, stream.id(), buf, 0, end, handlerCtx.newPromise());
throw new UnsupportedOperationException(); if (end) {
try {
encoder.flowController().writePendingBytes();
} catch (Http2Exception e) {
e.printStackTrace();
}
}
handlerCtx.flush();
} }
@Override @Override
public String hostHeader() { public String hostHeader() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public Context getContext() { public Context getContext() {
throw new UnsupportedOperationException(); return context;
} }
@Override @Override
public void doSetWriteQueueMaxSize(int size) { public void doSetWriteQueueMaxSize(int size) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override @Override
public boolean isNotWritable() { public boolean isNotWritable() {
throw new UnsupportedOperationException(); return !conn.remote().flowController().isWritable(stream);
} }
@Override @Override
public void handleInterestedOpsChanged() { public void handleInterestedOpsChanged() {
throw new UnsupportedOperationException(); req.handleDrained();
} }
@Override @Override
public void endRequest() { public void endRequest() {
Expand Down Expand Up @@ -224,6 +235,14 @@ public VertxClientHandler(
Http2ConnectionEncoder encoder, Http2ConnectionEncoder encoder,
Http2Settings initialSettings) { Http2Settings initialSettings) {
super(decoder, encoder, initialSettings); super(decoder, encoder, initialSettings);

encoder.flowController().listener(stream -> {
Http2ClientStream clientStream = streams.get(stream.id());
if (clientStream != null && !clientStream.isNotWritable()) {
clientStream.handleInterestedOpsChanged();
}
});

this.handlerCtx = handlerCtx; this.handlerCtx = handlerCtx;
this.context = context; this.context = context;
} }
Expand All @@ -234,9 +253,13 @@ void handle(Handler<HttpClientStream> handler, HttpClientRequestImpl req) {
} }


Http2ClientStream createStream(HttpClientRequestImpl req) { Http2ClientStream createStream(HttpClientRequestImpl req) {
Http2ClientStream stream = new Http2ClientStream(req, handlerCtx, connection(), encoder()); try {
streams.put(stream.id, stream); Http2ClientStream stream = new Http2ClientStream(req, context, handlerCtx, connection(), encoder());
return stream; streams.put(stream.stream.id(), stream);
return stream;
} catch (Http2Exception e) {
throw new UnsupportedOperationException("handle me gracefully", e);
}
} }


@Override @Override
Expand Down
Expand Up @@ -361,7 +361,7 @@ void dataReceived() {


void handleDrained() { void handleDrained() {
synchronized (getLock()) { synchronized (getLock()) {
if (drainHandler != null) { if (!completed && drainHandler != null) {
try { try {
drainHandler.handle(null); drainHandler.handle(null);
} catch (Throwable t) { } catch (Throwable t) {
Expand Down
55 changes: 55 additions & 0 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -16,15 +16,19 @@


package io.vertx.test.core; package io.vertx.test.core;


import io.vertx.core.Context;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.net.JksOptions; import io.vertx.core.net.JksOptions;
import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.NetServerOptions;
import org.junit.Test; import org.junit.Test;


import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


/** /**
Expand Down Expand Up @@ -91,6 +95,57 @@ public void testPost() throws Exception {
await(); await();
} }


@Test
public void testRequestWriteability() throws Exception {
Buffer content = Buffer.buffer();
Buffer expected = Buffer.buffer();
String chunk = TestUtils.randomAlphaString(100);
CompletableFuture<Void> done = new CompletableFuture<>();
AtomicBoolean paused = new AtomicBoolean();
AtomicInteger numPause = new AtomicInteger();
server.requestHandler(req -> {
Context ctx = vertx.getOrCreateContext();
done.thenAccept(v1 -> {
paused.set(false);
ctx.runOnContext(v2 -> {
req.resume();
});
});
numPause.incrementAndGet();
req.pause();
paused.set(true);
req.handler(content::appendBuffer);
req.endHandler(v -> {
assertEquals(expected, content);
testComplete();
});
});
startServer();
HttpClientRequest req = client.post(4043, "localhost", "/somepath", resp -> {
}).setChunked(true).exceptionHandler(err -> {
fail();
});
AtomicInteger count = new AtomicInteger();
vertx.setPeriodic(1, timerID -> {
if (req.writeQueueFull()) {
assertTrue(paused.get());
assertEquals(1, numPause.get());
req.drainHandler(v -> {
assertEquals(1, numPause.get());
assertFalse(paused.get());
req.end();
});
vertx.cancelTimer(timerID);
done.complete(null);
} else {
count.incrementAndGet();
expected.appendString(chunk);
req.write(chunk);
}
});
await();
}

@Test @Test
public void testQueueRequests() throws Exception { public void testQueueRequests() throws Exception {
int numReq = 100; int numReq = 100;
Expand Down

0 comments on commit 14d839f

Please sign in to comment.