Skip to content

Commit

Permalink
Move the http2 pool in a separate class instead of an inner
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Mar 13, 2016
1 parent 728038f commit 08f0fe0
Show file tree
Hide file tree
Showing 2 changed files with 327 additions and 299 deletions.
303 changes: 4 additions & 299 deletions src/main/java/io/vertx/core/http/impl/ConnectionManager.java
Expand Up @@ -17,7 +17,6 @@
package io.vertx.core.http.impl; package io.vertx.core.http.impl;


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
Expand All @@ -31,41 +30,22 @@
import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.AbstractHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionDecoder;
import io.netty.handler.codec.http2.Http2ConnectionEncoder;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2EventAdapter;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Flags;
import io.netty.handler.codec.http2.Http2FrameListener;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.collection.IntObjectHashMap;
import io.netty.util.collection.IntObjectMap;
import io.vertx.core.Context; import io.vertx.core.Context;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.http.ConnectionPoolTooBusyException; import io.vertx.core.http.ConnectionPoolTooBusyException;
import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion; import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.PartialPooledByteBufAllocator; import io.vertx.core.net.impl.PartialPooledByteBufAllocator;
import io.vertx.core.net.impl.SSLHelper; import io.vertx.core.net.impl.SSLHelper;


Expand All @@ -86,7 +66,7 @@
*/ */
public class ConnectionManager { public class ConnectionManager {


private static final Logger log = LoggerFactory.getLogger(ConnectionManager.class); static final Logger log = LoggerFactory.getLogger(ConnectionManager.class);


private final Map<Channel, ClientConnection> connectionMap = new ConcurrentHashMap<>(); private final Map<Channel, ClientConnection> connectionMap = new ConcurrentHashMap<>();
private final VertxInternal vertx; private final VertxInternal vertx;
Expand Down Expand Up @@ -167,7 +147,7 @@ public int hashCode() {
} }
} }


private static class Waiter { static class Waiter {
final HttpClientRequestImpl req; final HttpClientRequestImpl req;
final Handler<HttpClientStream> handler; final Handler<HttpClientStream> handler;
final Handler<Throwable> connectionExceptionHandler; final Handler<Throwable> connectionExceptionHandler;
Expand Down Expand Up @@ -240,7 +220,7 @@ private void createNewConnection(
}, connectionExceptionHandler, context); }, connectionExceptionHandler, context);
} }


private Waiter getNextWaiter() { Waiter getNextWaiter() {
// See if there are any non-canceled waiters in the queue // See if there are any non-canceled waiters in the queue
Waiter waiter = waiters.poll(); Waiter waiter = waiters.poll();
while (waiter != null && waiter.canceled.getAsBoolean()) { while (waiter != null && waiter.canceled.getAsBoolean()) {
Expand Down Expand Up @@ -388,7 +368,7 @@ private void connectionFailed(ContextImpl context, Channel ch, Handler<Throwable
} }
} }


abstract class Pool { static abstract class Pool {


public final int maxSockets; public final int maxSockets;


Expand Down Expand Up @@ -582,281 +562,6 @@ protected void doMessageReceived(ClientConnection conn, ChannelHandlerContext ct
} }
} }


public class Http2Pool extends Pool {

final ConnQueue queue;
private VertxClientHandler clientHandler;

public Http2Pool(ConnQueue queue) {
super(1);
this.queue = queue;
}

public boolean getConnection(HttpClientRequestImpl req, Handler<HttpClientStream> handler, ContextImpl context) {
if (clientHandler != null) {
if (context == null) {
context = clientHandler.context;
} else if (context != clientHandler.context) {
log.warn("Reusing a connection with a different context: an HttpClient is probably shared between different Verticles");
}
context.runOnContext(v -> {
clientHandler.handle(handler, req);
});
return true;
} else {
return false;
}
}

private void createConn(ChannelHandlerContext handlerCtx, ContextImpl context, int port, String host, Channel ch, HttpClientRequestImpl req, Handler<HttpClientStream> connectHandler,
Handler<Throwable> exceptionHandler) {
ChannelPipeline p = ch.pipeline();
Http2Connection connection = new DefaultHttp2Connection(false);
VertxClientHandlerBuilder clientHandlerBuilder = new VertxClientHandlerBuilder(handlerCtx, context);
synchronized (queue) {
VertxClientHandler handler = clientHandlerBuilder.build(connection);
handler.decoder().frameListener(handler);
clientHandler = handler;
p.addLast(handler);
handler.handle(connectHandler, req);
// Todo : limit according to the max concurrency of the stream
Waiter waiter;
while ((waiter = queue.getNextWaiter()) != null) {
handler.handle(waiter.handler, waiter.req);
}
}
}

@Override
void closeAllConnections() {
// todo
}
}

class Http2ClientStream implements HttpClientStream {

private final HttpClientRequestImpl req;
private final ChannelHandlerContext context;
private final Http2Connection conn;
private final int id;
private final Http2ConnectionEncoder encoder;
private HttpClientResponseImpl resp;

public Http2ClientStream(HttpClientRequestImpl req,
ChannelHandlerContext context,
Http2Connection conn,
Http2ConnectionEncoder encoder) {
this.req = req;
this.context = context;
this.conn = conn;
this.id = conn.local().incrementAndGetNextStreamId();
this.encoder = encoder;
}

void handleHeaders(Http2Headers headers, boolean end) {
resp = new HttpClientResponseImpl(
req,
this,
Integer.parseInt(headers.status().toString()),
"todo",
new Http2HeadersAdaptor(headers)
);
req.handleResponse(resp);
if (end) {
handleEnd();
}
}

void handleData(ByteBuf chunk, boolean end) {
if (chunk.isReadable()) {
Buffer buff = Buffer.buffer(chunk.slice());
resp.handleChunk(buff);
}
if (end) {
handleEnd();
}
}

private void handleEnd() {
// Should use an shared immutable object ?
resp.handleEnd(new CaseInsensitiveHeaders());
}

@Override
public void writeHead(HttpMethod method, String uri, MultiMap headers, boolean chunked) {
throw new UnsupportedOperationException();
}

@Override
public void writeHeadWithContent(HttpMethod method, String uri, MultiMap headers, boolean chunked, ByteBuf buf, boolean end) {
Http2Headers h = new DefaultHttp2Headers();
h.method(method.name());
h.path(uri);
h.scheme("https");
encoder.writeHeaders(context, id, h, 0, end, context.newPromise());
context.flush();
}
@Override
public void writeBuffer(ByteBuf buf, boolean end) {
throw new UnsupportedOperationException();
}
@Override
public String hostHeader() {
throw new UnsupportedOperationException();
}
@Override
public Context getContext() {
throw new UnsupportedOperationException();
}
@Override
public void doSetWriteQueueMaxSize(int size) {
throw new UnsupportedOperationException();
}
@Override
public boolean isNotWritable() {
throw new UnsupportedOperationException();
}
@Override
public void handleInterestedOpsChanged() {
throw new UnsupportedOperationException();
}
@Override
public void endRequest() {
}
@Override
public void doPause() {
throw new UnsupportedOperationException();
}
@Override
public void doResume() {
throw new UnsupportedOperationException();
}
@Override
public void reportBytesWritten(long numberOfBytes) {
}
@Override
public void reportBytesRead(long s) {
}
@Override
public NetSocket createNetSocket() {
throw new UnsupportedOperationException();
}
}

class VertxClientHandler extends Http2ConnectionHandler implements Http2FrameListener {

private final ChannelHandlerContext handlerCtx;
private final ContextImpl context;
private final IntObjectMap<Http2ClientStream> streams = new IntObjectHashMap<>();

public VertxClientHandler(
ChannelHandlerContext handlerCtx,
ContextImpl context,
Http2ConnectionDecoder decoder,
Http2ConnectionEncoder encoder,
Http2Settings initialSettings) {
super(decoder, encoder, initialSettings);
this.handlerCtx = handlerCtx;
this.context = context;
}

void handle(Handler<HttpClientStream> handler, HttpClientRequestImpl req) {
Http2ClientStream stream = createStream(req);
handler.handle(stream);
}

Http2ClientStream createStream(HttpClientRequestImpl req) {
Http2ClientStream stream = new Http2ClientStream(req, handlerCtx, connection(), encoder());
streams.put(stream.id, stream);
return stream;
}

@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = streams.get(streamId);
stream.handleData(data, endOfStream);
return data.readableBytes() + padding;
}

@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding, boolean endOfStream) throws Http2Exception {
}

@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency, short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
Http2ClientStream stream = streams.get(streamId);
stream.handleHeaders(headers, endOfStream);
}

@Override
public void onPriorityRead(ChannelHandlerContext ctx, int streamId, int streamDependency, short weight, boolean exclusive) throws Http2Exception {
}

@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
}

@Override
public void onSettingsAckRead(ChannelHandlerContext ctx) throws Http2Exception {
}

@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings) throws Http2Exception {
}

@Override
public void onPingRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}

@Override
public void onPingAckRead(ChannelHandlerContext ctx, ByteBuf data) throws Http2Exception {
}

@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
}

@Override
public void onGoAwayRead(ChannelHandlerContext ctx, int lastStreamId, long errorCode, ByteBuf debugData) throws Http2Exception {
}

@Override
public void onWindowUpdateRead(ChannelHandlerContext ctx, int streamId, int windowSizeIncrement) throws Http2Exception {
}

@Override
public void onUnknownFrame(ChannelHandlerContext ctx, byte frameType, int streamId, Http2Flags flags, ByteBuf payload) throws Http2Exception {
}
}

class VertxClientHandlerBuilder extends AbstractHttp2ConnectionHandlerBuilder<VertxClientHandler, VertxClientHandlerBuilder> {

private final ChannelHandlerContext handlerCtx;
private final ContextImpl context;

public VertxClientHandlerBuilder(ChannelHandlerContext handlerCtx, ContextImpl context) {
this.handlerCtx = handlerCtx;
this.context = context;
}

@Override
protected VertxClientHandler build(Http2ConnectionDecoder decoder, Http2ConnectionEncoder encoder, Http2Settings initialSettings) throws Exception {
return new VertxClientHandler(handlerCtx, context, decoder, encoder, initialSettings);
}

public VertxClientHandler build(Http2Connection conn) {
connection(conn);
initialSettings(new Http2Settings());
frameListener(new Http2EventAdapter() {
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream) throws Http2Exception {
return super.onDataRead(ctx, streamId, data, padding, endOfStream);
}
});
return super.build();
}
}

void applyConnectionOptions(HttpClientOptions options, Bootstrap bootstrap) { void applyConnectionOptions(HttpClientOptions options, Bootstrap bootstrap) {
bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
if (options.getSendBufferSize() != -1) { if (options.getSendBufferSize() != -1) {
Expand Down

0 comments on commit 08f0fe0

Please sign in to comment.