Skip to content

Commit

Permalink
Ensure executeFromIO is set correctly in various places
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 23, 2016
1 parent 7ca0a51 commit c91b634
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 56 deletions.
39 changes: 19 additions & 20 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -106,33 +106,32 @@ public boolean isValid() {


@Override @Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception { public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
throw new UnsupportedOperationException("todo");
}

@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId); Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
stream.handleHeaders(headers, endOfStream); if (stream != null) {
} context.executeFromIO(() -> {

stream.handleHeaders(headers, endOfStream);
@Override });
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception { }
} }


@Override @Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception { public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId); Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
HttpMethod method = HttpUtils.toVertxMethod(headers.method().toString()); if (stream != null) {
String uri = headers.path().toString(); context.executeFromIO(() -> {
String host = headers.authority() != null ? headers.authority().toString() : null; HttpMethod method = HttpUtils.toVertxMethod(headers.method().toString());
MultiMap headersMap = new Http2HeadersAdaptor(headers); String uri = headers.path().toString();
Http2Stream promisedStream = handler.connection().stream(promisedStreamId); String host = headers.authority() != null ? headers.authority().toString() : null;
HttpClientRequestPushPromise promisedReq = new HttpClientRequestPushPromise(this, promisedStream, http2Pool.client, method, uri, host, headersMap); MultiMap headersMap = new Http2HeadersAdaptor(headers);
if (metrics.isEnabled()) { Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
promisedReq.metric(metrics.responsePushed(metric, localAddress(), remoteAddress(), promisedReq)); HttpClientRequestPushPromise promisedReq = new HttpClientRequestPushPromise(this, promisedStream, http2Pool.client, method, uri, host, headersMap);
if (metrics.isEnabled()) {
promisedReq.metric(metrics.responsePushed(metric, localAddress(), remoteAddress(), promisedReq));
}
streams.put(promisedStreamId, promisedReq.getStream());
stream.handlePushPromise(promisedReq);
});
} }
streams.put(promisedStreamId, promisedReq.getStream());
stream.handlePushPromise(promisedReq);
} }


static class Http2ClientStream extends VertxHttp2Stream<Http2ClientConnection> implements HttpClientStream { static class Http2ClientStream extends VertxHttp2Stream<Http2ClientConnection> implements HttpClientStream {
Expand Down
18 changes: 14 additions & 4 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -70,10 +70,10 @@ public Http2ConnectionBase(Channel channel, ContextImpl context, VertxHttp2Conne


handler.connection().addListener(this); handler.connection().addListener(this);


handler.encoder().flowController().listener(stream -> { handler.encoder().flowController().listener(s -> {
VertxHttp2Stream resp = streams.get(stream.id()); VertxHttp2Stream stream = streams.get(s.id());
if (resp != null) { if (stream != null) {
resp.handleInterestedOpsChanged(); context.executeFromIO(stream::handleInterestedOpsChanged);
} }
}); });


Expand Down Expand Up @@ -189,6 +189,16 @@ public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData


// Http2FrameListener // Http2FrameListener


@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) {
}

@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
}

@Override @Override
public void onSettingsAckRead(ChannelHandlerContext ctx) { public void onSettingsAckRead(ChannelHandlerContext ctx) {
Runnable handler = updateSettingsHandler.poll(); Runnable handler = updateSettingsHandler.poll();
Expand Down
14 changes: 1 addition & 13 deletions src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
Expand Up @@ -108,15 +108,14 @@ private boolean isMalformedRequest(Http2Headers headers) {
@Override @Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
Http2Headers headers, int padding, boolean endOfStream) { Http2Headers headers, int padding, boolean endOfStream) {
Http2Connection conn = handler.connection();
VertxHttp2Stream stream = streams.get(streamId); VertxHttp2Stream stream = streams.get(streamId);
if (stream == null) { if (stream == null) {
if (isMalformedRequest(headers)) { if (isMalformedRequest(headers)) {
handler.encoder().writeRstStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise()); handler.encoder().writeRstStream(ctx, streamId, Http2Error.PROTOCOL_ERROR.code(), ctx.newPromise());
return; return;
} }
String contentEncoding = options.isCompressionSupported() ? HttpUtils.determineContentEncoding(headers) : null; String contentEncoding = options.isCompressionSupported() ? HttpUtils.determineContentEncoding(headers) : null;
Http2ServerRequestImpl req = new Http2ServerRequestImpl(this, conn.stream(streamId), metrics, serverOrigin, headers, contentEncoding); Http2ServerRequestImpl req = new Http2ServerRequestImpl(this, handler.connection().stream(streamId), metrics, serverOrigin, headers, contentEncoding);
stream = req; stream = req;
CharSequence value = headers.get(HttpHeaderNames.EXPECT); CharSequence value = headers.get(HttpHeaderNames.EXPECT);
if (options.isHandle100ContinueAutomatically() && if (options.isHandle100ContinueAutomatically() &&
Expand All @@ -136,17 +135,6 @@ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
} }
} }


@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) {
onHeadersRead(ctx, streamId, headers, padding, endOfStream);
}

@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency,
short weight, boolean exclusive) {
}

@Override @Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) { public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
Long v = settings.maxConcurrentStreams(); Long v = settings.maxConcurrentStreams();
Expand Down
Expand Up @@ -97,9 +97,7 @@ void handleClose() {
void handleInterestedOpsChanged() { void handleInterestedOpsChanged() {
Handler<Void> handler = this.drainHandler; Handler<Void> handler = this.drainHandler;
if (handler != null && !writeQueueFull()) { if (handler != null && !writeQueueFull()) {
vertx.runOnContext(v -> { handler.handle(null);
handler.handle(null);
});
} }
} }


Expand Down
Expand Up @@ -71,7 +71,7 @@ public void doPause() {


public void doResume() { public void doResume() {
paused = false; paused = false;
checkNextTick(null); context.runOnContext(this::checkNextTick);
} }


private void consume(int numBytes) { private void consume(int numBytes) {
Expand Down
60 changes: 55 additions & 5 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -47,6 +47,7 @@
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpClientRequest;
Expand Down Expand Up @@ -103,7 +104,9 @@ public void testClientSettings() throws Exception {
req.response().end(); req.response().end();
}); });
}).connectionHandler(conn -> { }).connectionHandler(conn -> {
Context ctx = Vertx.currentContext();
conn.remoteSettingsHandler(settings -> { conn.remoteSettingsHandler(settings -> {
assertOnIOContext(ctx);
switch (count.getAndIncrement()) { switch (count.getAndIncrement()) {
case 0: case 0:
assertEquals(initialSettings.isPushEnabled(), settings.isPushEnabled()); assertEquals(initialSettings.isPushEnabled(), settings.isPushEnabled());
Expand Down Expand Up @@ -207,6 +210,8 @@ public void testGet() throws Exception {
}); });
startServer(); startServer();
client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
Context ctx = vertx.getOrCreateContext();
assertOnIOContext(ctx);
assertEquals(1, reqCount.get()); assertEquals(1, reqCount.get());
assertEquals(HttpVersion.HTTP_2, resp.version()); assertEquals(HttpVersion.HTTP_2, resp.version());
assertEquals(200, resp.statusCode()); assertEquals(200, resp.statusCode());
Expand All @@ -219,8 +224,12 @@ public void testGet() throws Exception {
assertEquals("juu_value_1", resp.headers().getAll("juu_response").get(0)); assertEquals("juu_value_1", resp.headers().getAll("juu_response").get(0));
assertEquals("juu_value_2", resp.headers().getAll("juu_response").get(1)); assertEquals("juu_value_2", resp.headers().getAll("juu_response").get(1));
Buffer content = Buffer.buffer(); Buffer content = Buffer.buffer();
resp.handler(content::appendBuffer); resp.handler(buff -> {
assertOnIOContext(ctx);
content.appendBuffer(buff);
});
resp.endHandler(v -> { resp.endHandler(v -> {
assertOnIOContext(ctx);
assertEquals(expected, content.toString()); assertEquals(expected, content.toString());
testComplete(); testComplete();
}); });
Expand Down Expand Up @@ -286,7 +295,9 @@ public void testBodyEndHandler() throws Exception {
}); });
startServer(); startServer();
client.getNow(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { client.getNow(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
Context ctx = vertx.getOrCreateContext();
resp.bodyHandler(body -> { resp.bodyHandler(body -> {
assertOnIOContext(ctx);
assertEquals(expected, body); assertEquals(expected, body);
testComplete(); testComplete();
}); });
Expand Down Expand Up @@ -352,10 +363,12 @@ public void testClientRequestWriteability() throws Exception {
AtomicInteger count = new AtomicInteger(); AtomicInteger count = new AtomicInteger();
AtomicInteger drained = new AtomicInteger(); AtomicInteger drained = new AtomicInteger();
vertx.setPeriodic(1, timerID -> { vertx.setPeriodic(1, timerID -> {
Context ctx = vertx.getOrCreateContext();
if (req.writeQueueFull()) { if (req.writeQueueFull()) {
assertTrue(paused.get()); assertTrue(paused.get());
assertEquals(1, numPause.get()); assertEquals(1, numPause.get());
req.drainHandler(v -> { req.drainHandler(v -> {
assertOnIOContext(ctx);
assertEquals(0, drained.getAndIncrement()); assertEquals(0, drained.getAndIncrement());
assertEquals(1, numPause.get()); assertEquals(1, numPause.get());
assertFalse(paused.get()); assertFalse(paused.get());
Expand Down Expand Up @@ -403,9 +416,17 @@ public void testClientResponsePauseResume() throws Exception {
}); });
startServer(); startServer();
client.getNow(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { client.getNow(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
Context ctx = vertx.getOrCreateContext();
Buffer received = Buffer.buffer(); Buffer received = Buffer.buffer();
resp.pause(); resp.pause();
resp.handler(received::appendBuffer); resp.handler(buff -> {
if (whenFull.isComplete()) {
assertSame(ctx, Vertx.currentContext());
} else {
assertOnIOContext(ctx);
}
received.appendBuffer(buff);
});
resp.endHandler(v -> { resp.endHandler(v -> {
assertEquals(expected.toString().length(), received.toString().length()); assertEquals(expected.toString().length(), received.toString().length());
testComplete(); testComplete();
Expand Down Expand Up @@ -499,6 +520,8 @@ public void testReuseConnection() throws Exception {
public void testConnectionFailed() throws Exception { public void testConnectionFailed() throws Exception {
client.get(4044, DEFAULT_HTTPS_HOST, "/somepath", resp -> { client.get(4044, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
}).exceptionHandler(err -> { }).exceptionHandler(err -> {
Context ctx = Vertx.currentContext();
assertOnIOContext(ctx);
assertEquals(err.getClass(), java.net.ConnectException.class); assertEquals(err.getClass(), java.net.ConnectException.class);
testComplete(); testComplete();
}).end(); }).end();
Expand Down Expand Up @@ -534,6 +557,8 @@ public void testServerResetClientStreamDuringRequest() throws Exception {
client.post(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { client.post(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
fail(); fail();
}).exceptionHandler(err -> { }).exceptionHandler(err -> {
Context ctx = Vertx.currentContext();
assertOnIOContext(ctx);
assertTrue(err instanceof StreamResetException); assertTrue(err instanceof StreamResetException);
StreamResetException reset = (StreamResetException) err; StreamResetException reset = (StreamResetException) err;
assertEquals(8, reset.getCode()); assertEquals(8, reset.getCode());
Expand Down Expand Up @@ -610,12 +635,26 @@ public void testPushPromise() throws Exception {
}).end(); }).end();
}); });
startServer(); startServer();
AtomicReference<Context> ctx = new AtomicReference<>();
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
Context current = Vertx.currentContext();
if (ctx.get() == null) {
ctx.set(current);
} else {
assertEquals(ctx.get(), current);
}
resp.endHandler(v -> { resp.endHandler(v -> {
complete(); complete();
}); });
}); });
req.pushPromiseHandler(pushedReq -> { req.pushPromiseHandler(pushedReq -> {
Context current = Vertx.currentContext();
if (ctx.get() == null) {
ctx.set(current);
} else {
assertEquals(ctx.get(), current);
}
assertOnIOContext(current);
assertEquals(HttpMethod.GET, pushedReq.method()); assertEquals(HttpMethod.GET, pushedReq.method());
assertEquals("/wibble", pushedReq.uri()); assertEquals("/wibble", pushedReq.uri());
pushedReq.handler(resp -> { pushedReq.handler(resp -> {
Expand Down Expand Up @@ -692,6 +731,8 @@ public void testConnectionHandler() throws Exception {
AtomicReference<HttpConnection> connection = new AtomicReference<>(); AtomicReference<HttpConnection> connection = new AtomicReference<>();
HttpClientRequest req1 = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath"); HttpClientRequest req1 = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath");
req1.connectionHandler(conn -> { req1.connectionHandler(conn -> {
Context ctx = Vertx.currentContext();
assertOnIOContext(ctx);
assertTrue(connection.compareAndSet(null, conn)); assertTrue(connection.compareAndSet(null, conn));
}); });
req1.handler(resp -> { req1.handler(resp -> {
Expand Down Expand Up @@ -736,7 +777,9 @@ public void testConnectionShutdownInConnectionHandler() throws Exception {
AtomicInteger clientStatus = new AtomicInteger(); AtomicInteger clientStatus = new AtomicInteger();
HttpClientRequest req1 = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath"); HttpClientRequest req1 = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath");
req1.connectionHandler(conn -> { req1.connectionHandler(conn -> {
Context ctx = Vertx.currentContext();
conn.shutdownHandler(v -> { conn.shutdownHandler(v -> {
assertOnIOContext(ctx);
clientStatus.compareAndSet(1, 2); clientStatus.compareAndSet(1, 2);
}); });
if (clientStatus.getAndIncrement() == 0) { if (clientStatus.getAndIncrement() == 0) {
Expand Down Expand Up @@ -764,7 +807,9 @@ public void testServerShutdownConnection() throws Exception {
startServer(); startServer();
HttpClientRequest req1 = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath"); HttpClientRequest req1 = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath");
req1.connectionHandler(conn -> { req1.connectionHandler(conn -> {
Context ctx = Vertx.currentContext();
conn.goAwayHandler(ga -> { conn.goAwayHandler(ga -> {
assertOnIOContext(ctx);
complete(); complete();
}); });
}); });
Expand Down Expand Up @@ -1039,6 +1084,8 @@ public void test100Continue() throws Exception {
}); });
req.putHeader("expect", "100-continue"); req.putHeader("expect", "100-continue");
req.continueHandler(v -> { req.continueHandler(v -> {
Context ctx = Vertx.currentContext();
assertOnIOContext(ctx);
assertEquals(1, status.getAndIncrement()); assertEquals(1, status.getAndIncrement());
req.end(Buffer.buffer("request-body")); req.end(Buffer.buffer("request-body"));
}); });
Expand Down Expand Up @@ -1200,8 +1247,10 @@ public void testUnknownFrame() throws Exception {
startServer(); startServer();
AtomicInteger status = new AtomicInteger(); AtomicInteger status = new AtomicInteger();
HttpClientRequest req = client.request(HttpMethod.GET, DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { HttpClientRequest req = client.request(HttpMethod.GET, DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
Context ctx = Vertx.currentContext();
assertEquals(0, status.getAndIncrement()); assertEquals(0, status.getAndIncrement());
resp.unknownFrameHandler(frame -> { resp.unknownFrameHandler(frame -> {
assertOnIOContext(ctx);
assertEquals(1, status.getAndIncrement()); assertEquals(1, status.getAndIncrement());
assertEquals(12, frame.type()); assertEquals(12, frame.type());
assertEquals(134, frame.flags()); assertEquals(134, frame.flags());
Expand Down Expand Up @@ -1279,18 +1328,19 @@ public void testIdleTimeout() throws Exception {
client.close(); client.close();
client = vertx.createHttpClient(clientOptions.setIdleTimeout(2)); client = vertx.createHttpClient(clientOptions.setIdleTimeout(2));
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> { HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath", resp -> {
Context ctx = Vertx.currentContext();
resp.exceptionHandler(err -> { resp.exceptionHandler(err -> {
System.out.println("got resp timeout"); assertOnIOContext(ctx);
complete(); complete();
}); });
}); });
req.exceptionHandler(err -> { req.exceptionHandler(err -> {
System.out.println("got req timeout");
complete(); complete();
}); });
req.connectionHandler(conn -> { req.connectionHandler(conn -> {
conn.closeHandler(v -> { conn.closeHandler(v -> {
System.out.println("got close"); Context ctx = Vertx.currentContext();
assertOnIOContext(ctx);
complete(); complete();
}); });
}); });
Expand Down

0 comments on commit c91b634

Please sign in to comment.