Skip to content

Commit

Permalink
Noop implementation of HttpConnection for client
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 8ebf204 commit f9e34f7
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 19 deletions.
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/http/HttpClientRequest.java
Expand Up @@ -282,4 +282,9 @@ public interface HttpClientRequest extends WriteStream<Buffer>, ReadStream<HttpC
*/
void reset(long code);

/**
* @return the http connection associated with this request
*/
@CacheReturn
HttpConnection connection();
}
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -498,4 +498,9 @@ public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exceptio
});
return socket;
}

@Override
public HttpConnection connection() {
return null;
}
}
93 changes: 79 additions & 14 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -36,18 +36,19 @@
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.net.NetSocket;

import java.util.ArrayDeque;
import java.util.Map;

/**
Expand Down Expand Up @@ -105,6 +106,7 @@ void closeAllConnections() {

static class Http2ClientStream implements HttpClientStream {

private final VertxClientHandler handler;
private final HttpClientRequestBase req;
private final ContextImpl context;
private final ChannelHandlerContext handlerCtx;
Expand All @@ -115,18 +117,16 @@ static class Http2ClientStream implements HttpClientStream {
private boolean paused;
private int numBytes;

public Http2ClientStream(HttpClientRequestBase req,
Http2Stream stream,
ContextImpl context,
ChannelHandlerContext handlerCtx,
Http2Connection conn,
Http2ConnectionEncoder encoder) throws Http2Exception {
this.context = context;
public Http2ClientStream(VertxClientHandler handler,
HttpClientRequestBase req,
Http2Stream stream) throws Http2Exception {
this.handler = handler;
this.context = handler.context;
this.req = req;
this.handlerCtx = handlerCtx;
this.conn = conn;
this.handlerCtx = handler.handlerCtx;
this.conn = handler.connection();
this.stream = stream;
this.encoder = encoder;
this.encoder = handler.encoder();
}

void handleHeaders(Http2Headers headers, boolean end) {
Expand Down Expand Up @@ -279,9 +279,14 @@ public void reportBytesRead(long s) {
public NetSocket createNetSocket() {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection connection() {
return handler;
}
}

class VertxClientHandler extends Http2ConnectionHandler implements Http2FrameListener {
class VertxClientHandler extends Http2ConnectionHandler implements Http2FrameListener, HttpConnection {

private final ChannelHandlerContext handlerCtx;
private final ContextImpl context;
Expand All @@ -306,6 +311,66 @@ public VertxClientHandler(
this.context = context;
}

@Override
public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData, Handler<Void> completionHandler) {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection shutdown() {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection shutdown(long timeout) {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection closeHandler(Handler<Void> handler) {
throw new UnsupportedOperationException();
}

@Override
public io.vertx.core.http.Http2Settings settings() {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection updateSettings(io.vertx.core.http.Http2Settings settings) {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection updateSettings(io.vertx.core.http.Http2Settings settings, Handler<AsyncResult<Void>> completionHandler) {
throw new UnsupportedOperationException();
}

@Override
public io.vertx.core.http.Http2Settings remoteSettings() {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection remoteSettingsHandler(Handler<io.vertx.core.http.Http2Settings> handler) {
throw new UnsupportedOperationException();
}

@Override
public Handler<io.vertx.core.http.Http2Settings> remoteSettingsHandler() {
throw new UnsupportedOperationException();
}

@Override
public HttpConnection exceptionHandler(Handler<Throwable> handler) {
throw new UnsupportedOperationException();
}

@Override
public Handler<Throwable> exceptionHandler() {
throw new UnsupportedOperationException();
}

void handle(Handler<HttpClientStream> handler, HttpClientRequestImpl req) {
Http2ClientStream stream = createStream(req);
handler.handle(stream);
Expand All @@ -315,7 +380,7 @@ Http2ClientStream createStream(HttpClientRequestBase req) {
try {
Http2Connection conn = connection();
Http2Stream stream = conn.local().createStream(conn.local().incrementAndGetNextStreamId(), false);
Http2ClientStream clientStream = new Http2ClientStream(req, stream, context, handlerCtx, conn, encoder());
Http2ClientStream clientStream = new Http2ClientStream(this, req, stream);
streams.put(clientStream.stream.id(), clientStream);
return clientStream;
} catch (Http2Exception e) {
Expand Down Expand Up @@ -374,7 +439,7 @@ public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promi
String host = headers.authority() != null ? headers.authority().toString() : null;
MultiMap headersMap = new Http2HeadersAdaptor(headers);
Http2Stream promisedStream = connection().stream(promisedStreamId);
HttpClientRequestPushPromise promisedReq = new HttpClientRequestPushPromise(promisedStream, context, handlerCtx, connection(), encoder(), client, method, uri, host, headersMap);
HttpClientRequestPushPromise promisedReq = new HttpClientRequestPushPromise(this, promisedStream, client, method, uri, host, headersMap);
streams.put(promisedStreamId, promisedReq.getStream());
stream.handlePushPromise(promisedReq);
}
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientRequestImpl.java
Expand Up @@ -25,6 +25,7 @@
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.NetSocket;
Expand Down Expand Up @@ -352,6 +353,16 @@ public void reset(long code) {
}
}

@Override
public HttpConnection connection() {
synchronized (getLock()) {
if (conn == null) {
throw new IllegalStateException("Not yet connected");
}
return conn.connection();
}
}

void handleDrained() {
synchronized (getLock()) {
if (!completed && drainHandler != null) {
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.impl.ContextImpl;

Expand All @@ -35,6 +36,7 @@
*/
class HttpClientRequestPushPromise extends HttpClientRequestBase {

private final Http2Pool.VertxClientHandler handler;
private final Http2Pool.Http2ClientStream clientStream;
private final HttpMethod method;
private final String uri;
Expand All @@ -43,18 +45,16 @@ class HttpClientRequestPushPromise extends HttpClientRequestBase {
private Handler<HttpClientResponse> respHandler;

public HttpClientRequestPushPromise(
Http2Pool.VertxClientHandler handler,
Http2Stream clientStream,
ContextImpl context,
ChannelHandlerContext handlerCtx,
Http2Connection conn,
Http2ConnectionEncoder encoder,
HttpClientImpl client,
HttpMethod method,
String uri,
String host,
MultiMap headers) throws Http2Exception {
super(client);
this.clientStream = new Http2Pool.Http2ClientStream(this, clientStream, context, handlerCtx, conn, encoder);
this.handler = handler;
this.clientStream = new Http2Pool.Http2ClientStream(handler, this, clientStream);
this.method = method;
this.uri = uri;
this.host = host;
Expand Down Expand Up @@ -91,6 +91,11 @@ public HttpClientRequest handler(Handler<HttpClientResponse> handler) {
}
}

@Override
public HttpConnection connection() {
return handler;
}

@Override
public void reset(long code) {
clientStream.reset(code);
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/vertx/core/http/impl/HttpClientStream.java
Expand Up @@ -19,6 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.net.NetSocket;

Expand Down Expand Up @@ -49,4 +50,5 @@ interface HttpClientStream {
void reportBytesRead(long s);

NetSocket createNetSocket();
HttpConnection connection();
}

0 comments on commit f9e34f7

Please sign in to comment.