Skip to content

Commit

Permalink
Improve HttpClientRequest reset support - fixes #1642
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Sep 28, 2016
1 parent baf997d commit e3affd7
Show file tree
Hide file tree
Showing 6 changed files with 352 additions and 14 deletions.
23 changes: 17 additions & 6 deletions src/main/java/io/vertx/core/http/HttpClientRequest.java
Expand Up @@ -137,13 +137,13 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC
/** /**
* Set the request host.<p/> * Set the request host.<p/>
* *
* For HTTP2 it sets the {@literal :authority} pseudo header otherwise it sets the {@literal Host} header * For HTTP/2 it sets the {@literal :authority} pseudo header otherwise it sets the {@literal Host} header
*/ */
@Fluent @Fluent
HttpClientRequest setHost(String host); HttpClientRequest setHost(String host);


/** /**
* @return the request host. For HTTP2 it returns the {@literal :authority} pseudo header otherwise it returns the {@literal Host} header * @return the request host. For HTTP/2 it returns the {@literal :authority} pseudo header otherwise it returns the {@literal Host} header
*/ */
String getHost(); String getHost();


Expand Down Expand Up @@ -308,17 +308,28 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC


/** /**
* Reset this stream with the error code {@code 0}. * Reset this stream with the error code {@code 0}.
*
* @see #reset(long)
*/ */
default void reset() { default boolean reset() {
reset(0L); return reset(0L);
} }


/** /**
* Reset this stream with the error {@code code}. * Reset this request:
* <p/>
* <ul>
* <li>for HTTP/2, this performs send an HTTP/2 reset frame with the specified error {@code code}</li>
* <li>for HTTP/1.x, this closes the connection after the current in-flight requests are ended</li>
* </ul>
* <p/>
* When the request has not yet been sent, the request will be aborted and false is returned as indicator.
* <p/>
* *
* @param code the error code * @param code the error code
* @return true when reset has been performed
*/ */
void reset(long code); boolean reset(long code);


/** /**
* @return the {@link HttpConnection} associated with this request * @return the {@link HttpConnection} associated with this request
Expand Down
29 changes: 26 additions & 3 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -44,6 +44,7 @@


import java.net.URI; import java.net.URI;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
Expand Down Expand Up @@ -76,7 +77,7 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H
private final Http1xPool pool; private final Http1xPool pool;
private final Object endpointMetric; private final Object endpointMetric;
// Requests can be pipelined so we need a queue to keep track of requests // Requests can be pipelined so we need a queue to keep track of requests
private final Queue<HttpClientRequestImpl> requests = new ArrayDeque<>(); private final Deque<HttpClientRequestImpl> requests = new ArrayDeque<>();
private final Object metric; private final Object metric;
private final HttpClientMetrics metrics; private final HttpClientMetrics metrics;
private final HttpVersion version; private final HttpVersion version;
Expand All @@ -87,6 +88,7 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H
private HttpClientRequestImpl requestForResponse; private HttpClientRequestImpl requestForResponse;
private WebSocketImpl ws; private WebSocketImpl ws;


private boolean reset;
private boolean paused; private boolean paused;
private Buffer pausedChunk; private Buffer pausedChunk;


Expand Down Expand Up @@ -377,7 +379,18 @@ void handleResponseEnd(LastHttpContent trailer) {
// currently Vertx forces the Connection header if keepalive is enabled for 1.0 // currently Vertx forces the Connection header if keepalive is enabled for 1.0
close = true; close = true;
} }
pool.responseEnded(this, close);
if (close) {
pool.responseEnded(this, true);
} else {
if (reset) {
if (requests.isEmpty()) {
pool.responseEnded(this, true);
}
} else {
pool.responseEnded(this, false);
}
}
} }
currentResponse = null; currentResponse = null;
} }
Expand Down Expand Up @@ -426,7 +439,17 @@ public ContextImpl getContext() {
} }


public void reset(long code) { public void reset(long code) {
throw new UnsupportedOperationException("HTTP/1.x request cannot be reset"); if (currentRequest == null) {
throw new IllegalStateException();
}
if (!reset) {
currentRequest = null;
reset = true;
requests.removeLast();
if (requests.size() == 0) {
pool.responseEnded(this, true);
}
}
} }


private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) { private HttpRequest createRequest(HttpVersion version, HttpMethod method, String rawMethod, String uri, MultiMap headers) {
Expand Down
14 changes: 10 additions & 4 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -362,13 +362,19 @@ public HttpClientRequest pushHandler(Handler<HttpClientRequest> handler) {
} }


@Override @Override
public void reset(long code) { public boolean reset(long code) {
synchronized (getLock()) { synchronized (getLock()) {
if (stream == null) { if (completed) {
throw new IllegalStateException("Cannot reset the request that is not yet connected"); throw new IllegalStateException("Request already completed");
} }
exceptionOccurred = true;
completed = true; completed = true;
stream.reset(code); if (stream != null) {
stream.reset(code);
return true;
} else {
return false;
}
} }
} }


Expand Down
Expand Up @@ -103,9 +103,10 @@ public HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> han
} }


@Override @Override
public void reset(long code) { public boolean reset(long code) {
synchronized (conn) { synchronized (conn) {
stream.reset(code); stream.reset(code);
return true;
} }
} }


Expand Down

0 comments on commit e3affd7

Please sign in to comment.