Skip to content

Commit

Permalink
Resetting a client stream should make the stream has ended
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 30, 2016
1 parent b508969 commit 2eea704
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 12 deletions.
Expand Up @@ -353,7 +353,14 @@ public void endRequest() {


@Override @Override
public void reset(long code) { public void reset(long code) {
writeReset(code); if (!(requestEnded && responseEnded)) {
requestEnded = true;
responseEnded = true;
writeReset(code);
if (conn.metrics.isEnabled()) {
conn.metrics.requestReset(request.metric());
}
}
} }


@Override @Override
Expand Down
Expand Up @@ -350,6 +350,7 @@ public void reset(long code) {
if (stream == null) { if (stream == null) {
throw new IllegalStateException("Cannot reset the request that is not yet connected"); throw new IllegalStateException("Cannot reset the request that is not yet connected");
} }
completed = true;
stream.reset(code); stream.reset(code);
} }
} }
Expand Down
Expand Up @@ -33,33 +33,33 @@
*/ */
class HttpClientRequestPushPromise extends HttpClientRequestBase { class HttpClientRequestPushPromise extends HttpClientRequestBase {


private final Http2ClientConnection handler; private final Http2ClientConnection conn;
private final Http2ClientConnection.Http2ClientStream clientStream; private final Http2ClientConnection.Http2ClientStream stream;
private final HttpMethod method; private final HttpMethod method;
private final String uri; private final String uri;
private final String host; private final String host;
private final MultiMap headers; private final MultiMap headers;
private Handler<HttpClientResponse> respHandler; private Handler<HttpClientResponse> respHandler;


public HttpClientRequestPushPromise( public HttpClientRequestPushPromise(
Http2ClientConnection handler, Http2ClientConnection conn,
Http2Stream clientStream, Http2Stream stream,
HttpClientImpl client, HttpClientImpl client,
HttpMethod method, HttpMethod method,
String uri, String uri,
String host, String host,
MultiMap headers) throws Http2Exception { MultiMap headers) throws Http2Exception {
super(client, method, host, uri); super(client, method, host, uri);
this.handler = handler; this.conn = conn;
this.clientStream = new Http2ClientConnection.Http2ClientStream(handler, this, clientStream); this.stream = new Http2ClientConnection.Http2ClientStream(conn, this, stream);
this.method = method; this.method = method;
this.uri = uri; this.uri = uri;
this.host = host; this.host = host;
this.headers = headers; this.headers = headers;
} }


Http2ClientConnection.Http2ClientStream getStream() { Http2ClientConnection.Http2ClientStream getStream() {
return clientStream; return stream;
} }


@Override @Override
Expand Down Expand Up @@ -90,7 +90,7 @@ public HttpClientRequest handler(Handler<HttpClientResponse> handler) {


@Override @Override
public HttpConnection connection() { public HttpConnection connection() {
return handler; return conn;
} }


@Override @Override
Expand All @@ -100,7 +100,9 @@ public HttpClientRequest connectionHandler(@Nullable Handler<HttpConnection> han


@Override @Override
public void reset(long code) { public void reset(long code) {
clientStream.reset(code); synchronized (conn) {
stream.reset(code);
}
} }


@Override @Override
Expand Down
29 changes: 27 additions & 2 deletions src/test/java/io/vertx/test/core/Http2ClientTest.java
Expand Up @@ -17,7 +17,6 @@
package io.vertx.test.core; package io.vertx.test.core;


import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -78,6 +77,8 @@
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;


import static io.vertx.test.core.TestUtils.assertIllegalStateException;

/** /**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a> * @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/ */
Expand Down Expand Up @@ -652,7 +653,7 @@ public void testServerResetClientStreamDuringResponse() throws Exception {
} }


@Test @Test
public void testClientResetServerStream() throws Exception { public void testClientResetServerStreamDuringRequest() throws Exception {
Future<Void> bufReceived = Future.future(); Future<Void> bufReceived = Future.future();
server.requestHandler(req -> { server.requestHandler(req -> {
req.handler(buf -> { req.handler(buf -> {
Expand All @@ -677,6 +678,30 @@ public void testClientResetServerStream() throws Exception {
await(); await();
} }


@Test
public void testClientResetServerStreamDuringResponse() throws Exception {
server.requestHandler(req -> {
req.exceptionHandler(err -> {
assertTrue(err instanceof StreamResetException);
});
req.response().exceptionHandler(err -> {
assertTrue(err instanceof StreamResetException);
assertEquals(10L, ((StreamResetException) err).getCode());
testComplete();
});
req.response().setChunked(true).write(Buffer.buffer("some-data"));
});
startServer();
HttpClientRequest req = client.get(DEFAULT_HTTPS_PORT, DEFAULT_HTTPS_HOST, "/somepath");
req.handler(resp -> {
resp.exceptionHandler(this::fail);
req.reset(10);
assertIllegalStateException(() -> req.write(Buffer.buffer()));
assertIllegalStateException(req::end);
}).setChunked(true).write(Buffer.buffer("hello"));
await();
}

@Test @Test
public void testPushPromise() throws Exception { public void testPushPromise() throws Exception {
waitFor(2); waitFor(2);
Expand Down

0 comments on commit 2eea704

Please sign in to comment.