Skip to content

Commit

Permalink
Always process a message in Http1xServerConnection and don't put it b…
Browse files Browse the repository at this point in the history
…ack on the queue, instead when the HttpRequest ends and the corresponding response is still in progress then we pause the connection to avoid processing pipelined requests concurrently
  • Loading branch information
vietj committed Apr 11, 2018
1 parent fea9580 commit 5c55981
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions src/main/java/io/vertx/core/http/impl/Http1xServerConnection.java
Expand Up @@ -139,8 +139,10 @@ synchronized void resume() {
} }


synchronized void handleMessage(Object msg) { synchronized void handleMessage(Object msg) {
if (queueing || !processMessage(msg)) { if (queueing) {
enqueue(msg); enqueue(msg);
} else {
processMessage(msg);
} }
} }


Expand Down Expand Up @@ -180,7 +182,9 @@ synchronized void responseComplete() {
} }
} }
pendingResponse = null; pendingResponse = null;
checkNextTick(); if (currentRequest == null && paused) {
resume();
}
} }


synchronized void requestHandlers(HttpHandlers handlers) { synchronized void requestHandlers(HttpHandlers handlers) {
Expand Down Expand Up @@ -427,15 +431,12 @@ private void handleError(HttpObject obj) {
} }
} }


private boolean processMessage(Object msg) { private void processMessage(Object msg) {
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
if (pendingResponse != null) {
return false;
}
HttpRequest request = (HttpRequest) msg; HttpRequest request = (HttpRequest) msg;
if (request.decoderResult().isFailure()) { if (request.decoderResult().isFailure()) {
handleError(request); handleError(request);
return false; return;
} }
if (options.isHandle100ContinueAutomatically() && HttpUtil.is100ContinueExpected(request)) { if (options.isHandle100ContinueAutomatically() && HttpUtil.is100ContinueExpected(request)) {
write100Continue(); write100Continue();
Expand All @@ -457,7 +458,6 @@ private boolean processMessage(Object msg) {
} else { } else {
handleOther(msg); handleOther(msg);
} }
return true;
} }


private void handleContent(Object msg) { private void handleContent(Object msg) {
Expand All @@ -484,6 +484,9 @@ private void handleLastHttpContent() {
bytesRead = 0; bytesRead = 0;
} }
currentRequest = null; currentRequest = null;
if (pendingResponse != null) {
pause();
}
} }


private void handleOther(Object msg) { private void handleOther(Object msg) {
Expand All @@ -506,15 +509,14 @@ private void checkNextTick() {
// The only place we poll the pending queue, so we are sure that pending.size() > 0 // The only place we poll the pending queue, so we are sure that pending.size() > 0
// since we got there because queueing was true // since we got there because queueing was true
Object msg = pending.poll(); Object msg = pending.poll();
if (processMessage(msg)) { if (pending.isEmpty()) {
if (pending.isEmpty()) { // paused == false && pending.size() == 0 => queueing = false
unsetQueueing(); unsetQueueing();
} else {
checkNextTick();
}
} else {
pending.addFirst(msg);
} }
// Process message, it might pause the connection
processMessage(msg);
// Check next tick in case we still have pending messages and the connection is not paused
checkNextTick();
} }
} }
}); });
Expand Down

0 comments on commit 5c55981

Please sign in to comment.