Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inbound read queue work + usage in Http1xServerRequest #5164

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 33 additions & 20 deletions src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.streams.impl.InboundReadQueue;
import io.vertx.core.tracing.TracingPolicy;

import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -157,23 +158,42 @@ public HttpServerMetrics metrics() {
return metrics;
}

public void handleMessage(Object msg) {
private final InboundReadQueue<Object> inbound = new InboundReadQueue<>(this::handleMessage2);

@Override
protected void handleMessage(Object msg) {
if (!inbound.add(msg)) {
doPause();
}
}

void ack(Object msg) {
if (inbound.ack(msg)) {
chctx.executor().execute(() -> {
if (inbound.drain()) {
doResume();
}
});
}
}

public void handleMessage2(Object msg) {
assert msg != null;
if (requestInProgress == null && wantClose && webSocket == null) {
// Discard message
return;
}
// fast-path first
if (msg == LastHttpContent.EMPTY_LAST_CONTENT) {
onEnd();
onContent(msg);
} else if (msg instanceof DefaultHttpRequest) {
// fast path type check vs concrete class
DefaultHttpRequest request = (DefaultHttpRequest) msg;
ContextInternal requestCtx = streamContextSupplier.get();
Http1xServerRequest req = new Http1xServerRequest(this, request, requestCtx);
assert requestInProgress == null;
requestInProgress = req;
if (responseInProgress != null) {
enqueueRequest(req);
return;
}
boolean keepAlive = HttpUtils.isKeepAlive(request);
Expand All @@ -182,48 +202,42 @@ public void handleMessage(Object msg) {
req.handleBegin(keepAlive);
Handler<HttpServerRequest> handler = request.decoderResult().isSuccess() ? requestHandler : invalidRequestHandler;
req.context.emit(req, handler);
ack(msg);
} else {
handleOther(msg);
}
}

private void enqueueRequest(Http1xServerRequest req) {
// Deferred until the current response completion
responseInProgress.enqueue(req);
req.pause();
}

private void handleOther(Object msg) {
// concrete type check first
if (msg instanceof DefaultHttpContent || msg instanceof HttpContent) {
onContent(msg);
} else if (msg instanceof WebSocketFrame) {
ack(msg);
handleWsFrame((WebSocketFrame) msg);
}
}

private void onContent(Object msg) {
HttpContent content = (HttpContent) msg;
if (!content.decoderResult().isSuccess()) {
ack(msg); // ??
handleError(content);
return;
}
Buffer buffer = BufferInternal.buffer(VertxHandler.safeBuffer(content.content()));
Http1xServerRequest request = requestInProgress;
request.context.execute(buffer, request::handleContent);
request.context.execute(content, request::handleContent);
//TODO chunk trailers
if (content instanceof LastHttpContent) {
onEnd();
}
}

private void onEnd() {
boolean tryClose;
Http1xServerRequest request = requestInProgress;
requestInProgress = null;
tryClose = wantClose && responseInProgress == null;
request.context.execute(request, Http1xServerRequest::handleEnd);
if (tryClose) {
boolean doClose;
doClose = wantClose && responseInProgress == null && webSocket == null;
if (doClose) {
if (shutdownTimerID != -1L) {
if (!vertx.cancelTimer(shutdownTimerID)) {
return;
Expand Down Expand Up @@ -273,10 +287,9 @@ void responseComplete() {
// No keep-alive
flushAndClose();
} else {
Http1xServerRequest next = request.next();
Http1xServerRequest next = requestInProgress;
if (next != null) {
// Handle pipelined request
handleNext(next);
handleNext(requestInProgress);
} else if (wantClose && shutdownTimerID != -1L && vertx.cancelTimer(shutdownTimerID)) {
shutdownTimerID = -1L;
flushAndClose();
Expand All @@ -299,10 +312,10 @@ private void handleNext(Http1xServerRequest next) {
wantClose |= !keepAlive;
next.handleBegin(keepAlive);
next.context.emit(next, next_ -> {
next_.resume();
Handler<HttpServerRequest> handler = next_.nettyRequest().decoderResult().isSuccess() ? requestHandler : invalidRequestHandler;
handler.handle(next_);
});
ack(next.nettyRequest());
}

@Override
Expand Down
149 changes: 86 additions & 63 deletions src/main/java/io/vertx/core/http/impl/Http1xServerRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package io.vertx.core.http.impl;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.*;
Expand All @@ -35,11 +36,11 @@
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.streams.impl.InboundBuffer;

import javax.net.ssl.SSLPeerUnverifiedException;
import javax.security.cert.X509Certificate;
Expand Down Expand Up @@ -77,7 +78,6 @@ public class Http1xServerRequest extends HttpServerRequestInternal implements io
private String query;

// Accessed on event loop
Http1xServerRequest next;
Object metric;
Object trace;
boolean reportMetricsFailed;
Expand All @@ -95,14 +95,17 @@ public class Http1xServerRequest extends HttpServerRequestInternal implements io
private MultiMap attributes;
private boolean expectMultipart;
private HttpPostRequestDecoder decoder;
private boolean ending;
private boolean ended;
private long bytesRead;
private InboundBuffer<Object> pending;
private HttpContent pending;
private long demand;

Http1xServerRequest(Http1xServerConnection conn, HttpRequest request, ContextInternal context) {
this.conn = conn;
this.context = context;
this.request = request;
this.demand = Long.MAX_VALUE;
}

private HttpEventHandler eventHandler(boolean create) {
Expand All @@ -118,35 +121,34 @@ HttpRequest nettyRequest() {
}
}

private InboundBuffer<Object> pendingQueue() {
if (pending == null) {
pending = new InboundBuffer<>(context, 8);
pending.drainHandler(v -> conn.doResume());
pending.handler(buffer -> {
if (buffer == InboundBuffer.END_SENTINEL) {
onEnd();
} else {
onData((Buffer) buffer);
}
});
}
return pending;
}

void handleContent(Buffer buffer) {
InboundBuffer<Object> queue;
boolean handleContent(HttpContent content) {
boolean last = content instanceof LastHttpContent;
vietj marked this conversation as resolved.
Show resolved Hide resolved
synchronized (conn) {
queue = pending;
}
if (queue != null) {
// We queue requests if paused or a request is in progress to prevent responses being written in the wrong order
if (!queue.write(buffer)) {
// We only pause when we are actively called by the connection
conn.doPause();
assert pending == null;
if (demand == 0L) {
pending = content;
return false;
} else if (demand != Long.MAX_VALUE) {
demand--;
}
ending |= last;
}
ByteBuf buf = content.content();
if (buf.readableBytes() > 0) {
Buffer data = BufferInternal.buffer(VertxHandler.safeBuffer(buf));
vietj marked this conversation as resolved.
Show resolved Hide resolved
onData(data);
}
conn.ack(content);
if (last) {
synchronized (conn) {
if (demand == 0L) {
return false;
}
ended = true;
}
} else {
onData(buffer);
onEnd();
}
return true;
}

void handleBegin(boolean keepAlive) {
Expand All @@ -159,26 +161,6 @@ void handleBegin(boolean keepAlive) {
}
}

/**
* Enqueue a pipelined request.
*
* @param request the enqueued request
*/
void enqueue(Http1xServerRequest request) {
Http1xServerRequest current = this;
while (current.next != null) {
current = current.next;
}
current.next = request;
}

/**
* @return the next request following this one
*/
Http1xServerRequest next() {
return next;
}

private void check100() {
if (HttpUtil.is100ContinueExpected(request)) {
conn.write100Continue(null);
Expand Down Expand Up @@ -336,17 +318,64 @@ public HttpServerRequest exceptionHandler(Handler<Throwable> handler) {
@Override
public HttpServerRequest pause() {
synchronized (conn) {
pendingQueue().pause();
demand = 0L;
return this;
}
}

@Override
public HttpServerRequest fetch(long amount) {
synchronized (conn) {
pendingQueue().fetch(amount);
if (amount < 0L) {
throw new IllegalArgumentException();
}
if (amount == 0L) {
return this;
}
synchronized (conn) {
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
// if (pending == null) {
// return this;
// }
}
// SHOULD EXECUTE INSTEAD ??? MAYBE BUT THEN DEAL WITH REENTRANT
context.runOnContext(v -> {
checkPending();
});
return this;
}

private void checkPending() {
while (true) {
HttpContent content;
synchronized (conn) {
if (demand == 0L) {
return;
}
content = pending;
if (content != null) {
boolean last = content instanceof LastHttpContent;
ending |= last;
pending = null;
} else if (ending && !ended) {
ended = true;
} else {
return;
}
}
if (content == null) {
onEnd();
} else {
ByteBuf buf = content.content();
if (buf.readableBytes() > 0) {
Buffer data = BufferInternal.buffer(VertxHandler.safeBuffer(buf));
onData(data);
}
conn.ack(content);
}
}
}

@Override
Expand Down Expand Up @@ -509,7 +538,7 @@ public synchronized boolean isExpectMultipart() {
@Override
public boolean isEnded() {
synchronized (conn) {
return ended && (pending == null || (!pending.isPaused() && pending.isEmpty()));
return ended;
}
}

Expand All @@ -535,6 +564,7 @@ public synchronized Future<Void> end() {
return eventHandler(true).end();
}

// TODO : should pass content and ACK
private void onData(Buffer data) {
HttpEventHandler handler;
synchronized (conn) {
Expand All @@ -557,16 +587,9 @@ private void onData(Buffer data) {
}
}

void handleEnd() {
InboundBuffer<Object> queue;
synchronized (conn) {
ended = true;
queue = pending;
}
if (queue != null) {
queue.write(InboundBuffer.END_SENTINEL);
} else {
onEnd();
private void consume1() {
if (demand != Long.MAX_VALUE) {
demand--;
}
}

Expand Down