Skip to content

Commit

Permalink
Use ContextInternal schedule/dispatch instead of executeFromIO for HT…
Browse files Browse the repository at this point in the history
…TP layer
  • Loading branch information
vietj committed Mar 2, 2019
1 parent 5e57b77 commit 35a59de
Show file tree
Hide file tree
Showing 32 changed files with 390 additions and 225 deletions.
Expand Up @@ -34,17 +34,7 @@ final class DatagramChannelFutureListener<T> implements ChannelFutureListener {

@Override
public void operationComplete(final ChannelFuture future) throws Exception {

context.executeFromIO(v ->
notifyHandler(future));

}

private void notifyHandler(ChannelFuture future) {
if (future.isSuccess()) {
handler.handle(Future.succeededFuture(result));
} else {
handler.handle(Future.failedFuture(future.cause()));
}
Future<T> res = future.isSuccess() ? Future.succeededFuture(result) : Future.failedFuture(future.cause());
context.executeFromIO(res, handler);
}
}
Expand Up @@ -385,7 +385,7 @@ protected void handleClosed() {
metrics.close();
}
if (handler != null) {
handler.handle(null);
context.dispatch(handler);
}
}

Expand Down Expand Up @@ -416,7 +416,7 @@ void handlePacket(io.vertx.core.datagram.DatagramPacket packet) {
}
}
if (handler != null) {
handler.handle(packet);
context.dispatch(packet, handler);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -147,12 +147,12 @@ void getConnection(ContextInternal ctx, String peerHost, boolean ssl, int port,
metrics.dequeueRequest(endpoint.metric, metric);
}

handler.handle(Future.succeededFuture(conn));
ctx.schedule(Future.succeededFuture(conn), handler);
} else {
if (metrics != null) {
metrics.dequeueRequest(endpoint.metric, metric);
}
handler.handle(Future.failedFuture(ar.cause()));
ctx.schedule(Future.failedFuture(ar.cause()), handler);
}
})) {
break;
Expand Down
Expand Up @@ -246,7 +246,7 @@ public HttpClientConnection connection() {
}

@Override
public Context getContext() {
public ContextInternal getContext() {
return conn.context;
}

Expand Down
38 changes: 16 additions & 22 deletions src/main/java/io/vertx/core/http/impl/Http2ClientConnection.java
Expand Up @@ -146,19 +146,15 @@ public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive);
context.executeFromIO(v -> {
stream.handleHeaders(headers, streamPriority, endOfStream);
});
stream.handleHeaders(headers, streamPriority, endOfStream);
}
}

@Override
public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = (Http2ClientStream) streams.get(streamId);
if (stream != null) {
context.executeFromIO(v -> {
stream.handleHeaders(headers, null, endOfStream);
});
stream.handleHeaders(headers, null, endOfStream);
}
}

Expand All @@ -168,21 +164,19 @@ public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int stream
if (stream != null) {
Handler<HttpClientRequest> pushHandler = stream.pushHandler();
if (pushHandler != null) {
context.executeFromIO(v -> {
String rawMethod = headers.method().toString();
HttpMethod method = HttpUtils.toVertxMethod(rawMethod);
String uri = headers.path().toString();
String host = headers.authority() != null ? headers.authority().toString() : null;
MultiMap headersMap = new Http2HeadersAdaptor(headers);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
int port = remoteAddress().port();
HttpClientRequestPushPromise pushReq = new HttpClientRequestPushPromise(this, promisedStream, client, isSsl(), method, rawMethod, uri, host, port, headersMap);
if (metrics != null) {
pushReq.metric(metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq));
}
streams.put(promisedStreamId, pushReq.getStream());
pushHandler.handle(pushReq);
});
String rawMethod = headers.method().toString();
HttpMethod method = HttpUtils.toVertxMethod(rawMethod);
String uri = headers.path().toString();
String host = headers.authority() != null ? headers.authority().toString() : null;
MultiMap headersMap = new Http2HeadersAdaptor(headers);
Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
int port = remoteAddress().port();
HttpClientRequestPushPromise pushReq = new HttpClientRequestPushPromise(this, promisedStream, client, isSsl(), method, rawMethod, uri, host, port, headersMap);
if (metrics != null) {
pushReq.metric(metrics.responsePushed(queueMetric, metric(), localAddress(), remoteAddress(), pushReq));
}
streams.put(promisedStreamId, pushReq.getStream());
context.dispatch(pushReq, pushHandler);
return;
}
}
Expand Down Expand Up @@ -420,7 +414,7 @@ public void reportBytesRead(long numberOfBytes) {
}

@Override
public Context getContext() {
public ContextInternal getContext() {
return context;
}

Expand Down
40 changes: 18 additions & 22 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -135,7 +135,7 @@ synchronized void onConnectionError(Throwable cause) {
copy = new ArrayList<>(streams.values());
}
for (VertxHttp2Stream stream : copy) {
context.executeFromIO(v -> stream.handleException(cause));
stream.handleException(cause);
}
handleException(cause);
}
Expand All @@ -156,7 +156,7 @@ void onStreamWritabilityChanged(Http2Stream s) {
stream = streams.get(s.id());
}
if (stream != null) {
context.executeFromIO(v -> stream.onWritabilityChanged());
context.schedule(null, v -> stream.onWritabilityChanged());
}
}

Expand All @@ -168,7 +168,7 @@ void onStreamClosed(Http2Stream stream) {
return;
}
}
context.executeFromIO(v -> removed.handleClose());
removed.handleClose();
checkShutdownHandler();
}

Expand All @@ -194,7 +194,7 @@ boolean onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
}
if (handler != null) {
Buffer buffer = Buffer.buffer(debugData);
context.executeFromIO(v -> handler.handle(new GoAway().setErrorCode(errorCode).setLastStreamId(lastStreamId).setDebugData(buffer)));
context.dispatch(new GoAway().setErrorCode(errorCode).setLastStreamId(lastStreamId).setDebugData(buffer), handler);
}
checkShutdownHandler();
return true;
Expand All @@ -213,7 +213,7 @@ public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDe
.setDependency(streamDependency)
.setWeight(weight)
.setExclusive(exclusive);
context.executeFromIO(v -> stream.handlePriorityChange(streamPriority));
stream.handlePriorityChange(streamPriority);
}
}

Expand All @@ -230,7 +230,7 @@ public void onSettingsAckRead(ChannelHandlerContext ctx) {
}
if (handler != null) {
// No need to run on a particular context it shall be done by the handler instead
context.executeFromIO(handler);
context.dispatch(handler);
}
}

Expand All @@ -257,7 +257,7 @@ public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) {
handler = remoteSettingsHandler;
}
if (handler != null) {
context.executeFromIO(HttpUtils.toVertxSettings(settings), handler);
context.dispatch(HttpUtils.toVertxSettings(settings), handler);
}
if (changed) {
concurrencyChanged(maxConcurrentStreams);
Expand All @@ -269,18 +269,16 @@ public void onPingRead(ChannelHandlerContext ctx, long data) throws Http2Excepti
Handler<Buffer> handler = pingHandler;
if (handler != null) {
Buffer buff = Buffer.buffer().appendLong(data);
context.executeFromIO(v -> handler.handle(buff));
context.dispatch(buff, handler);
}
}

@Override
public void onPingAckRead(ChannelHandlerContext ctx, long data) throws Http2Exception {
Handler<AsyncResult<Buffer>> handler = pongHandlers.poll();
if (handler != null) {
context.executeFromIO(v -> {
Buffer buff = Buffer.buffer().appendLong(data);
handler.handle(Future.succeededFuture(buff));
});
Buffer buff = Buffer.buffer().appendLong(data);
context.dispatch(Future.succeededFuture(buff), handler);
}
}

Expand All @@ -306,7 +304,7 @@ public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int stream
}
if (req != null) {
Buffer buff = Buffer.buffer(safeBuffer(payload, ctx.alloc()));
context.executeFromIO(v -> req.handleCustomFrame(frameType, flags.value(), buff));
req.handleCustomFrame(frameType, flags.value(), buff);
}
}

Expand All @@ -319,7 +317,7 @@ public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorC
return;
}
}
context.executeFromIO(v -> req.onResetRead(errorCode));
req.onResetRead(errorCode);
}

@Override
Expand All @@ -332,14 +330,12 @@ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int
if (req != null) {
data = safeBuffer(data, ctx.alloc());
Buffer buff = Buffer.buffer(data);
context.executeFromIO(v -> {
int len = buff.length();
if (req.onDataRead(buff)) {
consumed[0] += len;
}
});
int len = buff.length();
if (req.onDataRead(buff)) {
consumed[0] += len;
}
if (endOfStream) {
context.executeFromIO(v -> req.onEnd());
req.onEnd();
}
}
return consumed[0];
Expand Down Expand Up @@ -516,7 +512,7 @@ private void checkShutdownHandler() {
shutdownHandler = this.shutdownHandler;
}
if (shutdownHandler != null) {
context.executeFromIO(shutdownHandler);
context.dispatch(shutdownHandler);
}
}
}
15 changes: 6 additions & 9 deletions src/main/java/io/vertx/core/http/impl/Http2ServerConnection.java
Expand Up @@ -127,13 +127,12 @@ public synchronized void onHeadersRead(ChannelHandlerContext ctx, int streamId,
req.response().writeContinue();
}
streams.put(streamId, req);
context.executeFromIO(req, requestHandler);
context.dispatch(req, requestHandler);
} else {
// Http server request trailer - not implemented yet (in api)
}
if (endOfStream) {
VertxHttp2Stream finalStream = stream;
context.executeFromIO(v -> finalStream.onEnd());
stream.onEnd();
}
}

Expand Down Expand Up @@ -181,15 +180,13 @@ public void handle(AsyncResult<Integer> ar) {
streams.put(promisedStreamId, push);
if (maxConcurrentStreams == null || concurrentStreams < maxConcurrentStreams) {
concurrentStreams++;
context.executeFromIO(v -> push.complete());
push.complete();
} else {
pendingPushes.add(push);
}
}
} else {
context.executeFromIO(v -> {
completionHandler.handle(Future.failedFuture(ar.cause()));
});
context.dispatch(Future.failedFuture(ar.cause()), completionHandler);
}
}
});
Expand Down Expand Up @@ -249,7 +246,7 @@ void handleReset(long errorCode) {
if (response != null) {
response.handleReset(errorCode);
} else {
completionHandler.fail(new StreamResetException(errorCode));
context.dispatch(Future.failedFuture(new StreamResetException(errorCode)), completionHandler);
}
}

Expand Down Expand Up @@ -283,7 +280,7 @@ void complete() {
if (METRICS_ENABLED && metrics != null) {
response.metric(metrics.responsePushed(conn.metric(), method, uri, response));
}
completionHandler.complete(response);
context.dispatch(Future.succeededFuture(response), completionHandler);
}
}
}
Expand Down
Expand Up @@ -130,7 +130,7 @@ private void notifyException(Throwable failure) {
}
}
if (handler != null) {
handler.handle(failure);
context.dispatch(failure, handler);
}
if (upload instanceof NettyFileUpload) {
((NettyFileUpload)upload).handleException(failure);
Expand All @@ -152,7 +152,7 @@ void handleClose() {
@Override
void handleCustomFrame(int type, int flags, Buffer buff) {
if (customFrameHandler != null) {
customFrameHandler.handle(new HttpFrameImpl(type, flags, buff));
context.dispatch(new HttpFrameImpl(type, flags, buff), customFrameHandler);
}
}

Expand All @@ -166,7 +166,7 @@ void handleData(Buffer data) {
}
}
if (dataHandler != null) {
dataHandler.handle(data);
context.dispatch(data, dataHandler);
}
}

Expand Down Expand Up @@ -198,7 +198,7 @@ void handleEnd(MultiMap trailers) {
}
}
if (endHandler != null) {
endHandler.handle(null);
context.dispatch(endHandler);
}
}

Expand Down Expand Up @@ -557,7 +557,7 @@ void handlePriorityChange(StreamPriority streamPriority) {
}
}
if (handler != null && priorityChanged) {
handler.handle(streamPriority);
context.dispatch(streamPriority, handler);
}
}

Expand Down
Expand Up @@ -109,7 +109,7 @@ void handleException(Throwable cause) {
handler = exceptionHandler;
}
if (handler != null) {
handler.handle(cause);
conn.getContext().dispatch(cause, handler);
}
}

Expand Down Expand Up @@ -417,7 +417,7 @@ private void sanitizeHeaders() {
private boolean checkSendHeaders(boolean end) {
if (!headWritten) {
if (headersEndHandler != null) {
headersEndHandler.handle(null);
conn.getContext().dispatch(headersEndHandler);
}
sanitizeHeaders();
if (Metrics.METRICS_ENABLED && metric != null) {
Expand Down Expand Up @@ -465,10 +465,10 @@ void write(ByteBuf chunk, boolean end) {
}
if (end) {
if (bodyEndHandler != null) {
bodyEndHandler.handle(null);
conn.getContext().dispatch(bodyEndHandler);
}
if (endHandler != null) {
endHandler.handle(null);
conn.getContext().dispatch(null, endHandler);
}
}
}
Expand All @@ -492,7 +492,7 @@ private void checkEnded() {

void writabilityChanged() {
if (!ended && !writeQueueFull() && drainHandler != null) {
drainHandler.handle(null);
conn.getContext().dispatch(drainHandler);
}
}

Expand Down

0 comments on commit 35a59de

Please sign in to comment.