Skip to content

Commit

Permalink
Refactor to provide a ChannelHandlerContext for ConnectionBase instea…
Browse files Browse the repository at this point in the history
…d of a Channel
  • Loading branch information
vietj committed Jun 25, 2017
1 parent 842eb94 commit c34e106
Show file tree
Hide file tree
Showing 21 changed files with 223 additions and 122 deletions.
Expand Up @@ -37,7 +37,7 @@ final class DatagramServerHandler extends VertxHandler<DatagramSocketImpl.Connec

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
conn = socket.createConnection();
conn = socket.createConnection(ctx);
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.DatagramChannel;
Expand Down Expand Up @@ -369,13 +370,13 @@ protected void finalize() throws Throwable {
super.finalize();
}

Connection createConnection() {
return new Connection(context.owner(), channel, context);
Connection createConnection(ChannelHandlerContext chctx) {
return new Connection(context.owner(), chctx, context);
}

class Connection extends ConnectionBase {

public Connection(VertxInternal vertx, Channel channel, ContextImpl context) {
public Connection(VertxInternal vertx, ChannelHandlerContext channel, ContextImpl context) {
super(vertx, channel, context);
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ClientConnection.java
Expand Up @@ -90,7 +90,7 @@ class ClientConnection extends ConnectionBase implements HttpClientConnection, H
private boolean paused;
private Buffer pausedChunk;

ClientConnection(HttpVersion version, HttpClientImpl client, Object endpointMetric, Channel channel, boolean ssl, String host,
ClientConnection(HttpVersion version, HttpClientImpl client, Object endpointMetric, ChannelHandlerContext channel, boolean ssl, String host,
int port, ContextImpl context, Http1xPool pool, HttpClientMetrics metrics) {
super(client.getVertx(), channel, context);
this.client = client;
Expand Down Expand Up @@ -577,7 +577,7 @@ public synchronized void close() {

public NetSocket createNetSocket() {
// connection was upgraded to raw TCP socket
NetSocketImpl socket = new NetSocketImpl(vertx, channel, context, client.getSslHelper(), metrics);
NetSocketImpl socket = new NetSocketImpl(vertx, chctx, context, client.getSslHelper(), metrics);
socket.metric(metric());
Map<Channel, NetSocketImpl> connectionMap = new HashMap<>(1);
connectionMap.put(channel, socket);
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/vertx/core/http/impl/ClientHandler.java
Expand Up @@ -39,12 +39,22 @@ class ClientHandler extends VertxHttpHandler<ClientConnection> {

private boolean closeFrameSent;
private ContextImpl context;
private ChannelHandlerContext chctx;

public ClientHandler(Channel ch, ContextImpl context, Map<Channel, ClientConnection> connectionMap) {
super(connectionMap, ch);
this.context = context;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
chctx = ctx;
}

public ChannelHandlerContext context() {
return chctx;
}

@Override
protected ContextImpl getContext(ClientConnection connection) {
return context;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/vertx/core/http/impl/Http1xPool.java
Expand Up @@ -117,14 +117,15 @@ void responseEnded(ClientConnection conn, boolean close) {
}

void createConn(HttpVersion version, ContextImpl context, int port, String host, Channel ch, Waiter waiter) {
ClientConnection conn = new ClientConnection(version, client, queue.metric, ch,
// Ugly : improve this
ClientHandler handler = ch.pipeline().get(ClientHandler.class);
ClientConnection conn = new ClientConnection(version, client, queue.metric, handler.context(),
ssl, host, port, context, this, metrics);
if (metrics != null) {
Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(metric);
metrics.endpointConnected(queue.metric, metric);
}
ClientHandler handler = ch.pipeline().get(ClientHandler.class);
handler.conn = conn;
synchronized (queue) {
allConnections.add(conn);
Expand Down
Expand Up @@ -18,7 +18,6 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
Expand All @@ -41,7 +40,6 @@
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.net.NetSocket;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.spi.metrics.NetworkMetrics;

import java.util.Map;

Expand All @@ -60,10 +58,9 @@ class Http2ClientConnection extends Http2ConnectionBase implements HttpClientCon
public Http2ClientConnection(Http2Pool http2Pool,
Object queueMetric,
ContextImpl context,
Channel channel,
VertxHttp2ConnectionHandler connHandler,
HttpClientMetrics metrics) {
super(channel, context, connHandler);
super(context, connHandler);
this.http2Pool = http2Pool;
this.metrics = metrics;
this.queueMetric = queueMetric;
Expand Down
10 changes: 3 additions & 7 deletions src/main/java/io/vertx/core/http/impl/Http2ConnectionBase.java
Expand Up @@ -92,18 +92,14 @@ static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
private boolean closed;
private int windowSize;

public Http2ConnectionBase(Channel channel, ContextImpl context, VertxHttp2ConnectionHandler handler) {
super((VertxInternal) context.owner(), channel, context);
this.channel = channel;
public Http2ConnectionBase(ContextImpl context, VertxHttp2ConnectionHandler handler) {
super(context.owner(), handler.context(), context);
this.channel = handler.context().channel();
this.handlerContext = channel.pipeline().context(handler);
this.handler = handler;
this.windowSize = handler.connection().local().flowController().windowSize(handler.connection().connectionStream());
}

void setHandlerContext(ChannelHandlerContext handlerContext) {
this.handlerContext = handlerContext;
}

VertxInternal vertx() {
return vertx;
}
Expand Down
13 changes: 3 additions & 10 deletions src/main/java/io/vertx/core/http/impl/Http2Pool.java
Expand Up @@ -19,17 +19,14 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.impl.ContextImpl;
import io.vertx.core.spi.metrics.HttpClientMetrics;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
Expand Down Expand Up @@ -85,15 +82,15 @@ public Http2ClientConnection pollConnection() {
}

void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade) throws Http2Exception {
ChannelPipeline p = ch.pipeline();
synchronized (queue) {
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>()
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>(ch)
.connectionMap(connectionMap)
.server(false)
.upgrade(upgrade)
.useCompression(client.getOptions().isTryUseCompression())
.initialSettings(client.getOptions().getInitialSettings())
.connectionFactory(connHandler -> {
Http2ClientConnection conn = new Http2ClientConnection(Http2Pool.this, queue.metric, context, ch, connHandler, metrics);
Http2ClientConnection conn = new Http2ClientConnection(Http2Pool.this, queue.metric, context, connHandler, metrics);
if (metrics != null) {
Object metric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(metric);
Expand All @@ -102,14 +99,10 @@ void createConn(ContextImpl context, Channel ch, Waiter waiter, boolean upgrade)
})
.logEnabled(logEnabled)
.build();
if (upgrade) {
handler.onHttpClientUpgrade();
}
Http2ClientConnection conn = handler.connection;
if (metrics != null) {
metrics.endpointConnected(queue.metric, conn.metric());
}
p.addLast(handler);
allConnections.add(conn);
if (windowSize > 0) {
conn.setWindowSize(windowSize);
Expand Down
Expand Up @@ -16,7 +16,6 @@

package io.vertx.core.http.impl;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
Expand Down Expand Up @@ -58,14 +57,13 @@ public class Http2ServerConnection extends Http2ConnectionBase {
private final ArrayDeque<Push> pendingPushes = new ArrayDeque<>(8);

Http2ServerConnection(
Channel channel,
ContextImpl context,
String serverOrigin,
VertxHttp2ConnectionHandler connHandler,
HttpServerOptions options,
Handler<HttpServerRequest> requestHandler,
HttpServerMetrics metrics) {
super(channel, context, connHandler);
super(context, connHandler);

this.options = options;
this.serverOrigin = serverOrigin;
Expand Down
54 changes: 29 additions & 25 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -341,19 +341,22 @@ public HttpServerImpl setConnectionExceptionHandler(Handler<Throwable> connectio
return this;
}

private VertxHttp2ConnectionHandler<Http2ServerConnection> createHttp2Handler(HandlerHolder<Handlers> holder, Channel ch) {
return new VertxHttp2ConnectionHandlerBuilder<Http2ServerConnection>()
private VertxHttp2ConnectionHandler<Http2ServerConnection> setHandler(HandlerHolder<Handlers> holder, Channel ch) {
return new VertxHttp2ConnectionHandlerBuilder<Http2ServerConnection>(ch)
.connectionMap(connectionMap2)
.server(true)
.useCompression(options.isCompressionSupported())
.useDecompression(options.isDecompressionSupported())
.compressionLevel(options.getCompressionLevel())
.initialSettings(options.getInitialSettings())
.connectionFactory(connHandler -> {
Http2ServerConnection conn = new Http2ServerConnection(ch, holder.context, serverOrigin, connHandler, options, holder.handler.requesthHandler, metrics);
Http2ServerConnection conn = new Http2ServerConnection(holder.context, serverOrigin, connHandler, options, holder.handler.requesthHandler, metrics);
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
if (options.getHttp2ConnectionWindowSize() > 0) {
conn.setWindowSize(options.getHttp2ConnectionWindowSize());
}
return conn;
})
.logEnabled(logEnabled)
Expand Down Expand Up @@ -404,41 +407,30 @@ private void configureHttp1(ChannelPipeline pipeline) {
pipeline.addLast("h2c", new Http2UpgradeHandler());
}
HandlerHolder<Handlers> holder = reqHandlerManager.chooseHandler(pipeline.channel().eventLoop());
ServerConnection conn = new ServerConnection(vertx,
sslHelper,
options,
pipeline.channel(),
holder.context,
serverOrigin,
metrics);
if (DISABLE_WEBSOCKETS) {
// As a performance optimisation you can set a system property to disable websockets altogether which avoids
// some casting and a header check
pipeline.addLast("handler", new ServerHandler(connectionMap, pipeline.channel(), holder, conn, metrics));
pipeline.addLast("handler", new ServerHandler(sslHelper, options, serverOrigin, connectionMap, pipeline.channel(), holder, metrics));
} else {
pipeline.addLast("handler", new ServerHandleWithWebSockets(pipeline.channel(), holder, conn, metrics));
pipeline.addLast("handler", new ServerHandleWithWebSockets(sslHelper, options, serverOrigin, pipeline.channel(), holder, metrics));
}
}

public void handleHttp2(Channel ch) {
HandlerHolder<Handlers> holder = reqHandlerManager.chooseHandler(ch.eventLoop());
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = createHttp2Handler(holder, ch);
configureHttp2(ch.pipeline(), handler);
configureHttp2(ch.pipeline());
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = setHandler(holder, ch);
if (holder.handler.connectionHandler != null) {
holder.context.executeFromIO(() -> {
holder.handler.connectionHandler.handle(handler.connection);
});
}
}

public void configureHttp2(ChannelPipeline pipeline, VertxHttp2ConnectionHandler<Http2ServerConnection> handler) {
public void configureHttp2(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout()));
}
pipeline.addLast("handler", handler);
if (options.getHttp2ConnectionWindowSize() > 0) {
handler.connection.setWindowSize(options.getHttp2ConnectionWindowSize());
}
}

@Override
Expand Down Expand Up @@ -589,8 +581,8 @@ public class ServerHandleWithWebSockets extends ServerHandler {
private boolean closeFrameSent;
private FullHttpRequest wsRequest;

public ServerHandleWithWebSockets(Channel ch, HandlerHolder<Handlers> holder, ServerConnection conn, HttpServerMetrics metrics) {
super(HttpServerImpl.this.connectionMap, ch, holder, conn, metrics);
public ServerHandleWithWebSockets(SSLHelper sslHelper, HttpServerOptions options, String serverOrigin, Channel ch, HandlerHolder<Handlers> holder, HttpServerMetrics metrics) {
super(sslHelper, options, serverOrigin, HttpServerImpl.this.connectionMap, ch, holder, metrics);
}

@Override
Expand Down Expand Up @@ -729,20 +721,32 @@ protected void handshake(FullHttpRequest request, Channel ch, ChannelHandlerCont

public static class ServerHandler extends VertxHttpHandler<ServerConnection> {

private final SSLHelper sslHelper;
private final HttpServerOptions options;
private final String serverOrigin;
private final HttpServerMetrics metrics;
private final HandlerHolder<Handlers> holder;

public ServerHandler(Map<Channel, ServerConnection> connectionMap, Channel ch, HandlerHolder<Handlers> holder, ServerConnection conn, HttpServerMetrics metrics) {
public ServerHandler(SSLHelper sslHelper, HttpServerOptions options, String serverOrigin, Map<Channel, ServerConnection> connectionMap, Channel ch, HandlerHolder<Handlers> holder, HttpServerMetrics metrics) {
super(connectionMap, ch);

this.holder = holder;
this.conn = conn;
this.metrics = metrics;
this.sslHelper = sslHelper;
this.options = options;
this.serverOrigin = serverOrigin;
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
conn = new ServerConnection(holder.context.owner(),
sslHelper,
options,
ctx,
holder.context,
serverOrigin,
metrics);
conn.requestHandler(holder.handler.requesthHandler);
connectionMap.put(ch, conn);
holder.context.executeFromIO(() -> {
Expand Down Expand Up @@ -1016,9 +1020,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
}
}
try {
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = createHttp2Handler(reqHandler, ctx.channel());
configureHttp2(pipeline);
VertxHttp2ConnectionHandler<Http2ServerConnection> handler = setHandler(reqHandler, ctx.channel());
handler.onHttpServerUpgrade(settings);
configureHttp2(pipeline, handler);
return;
} catch (Http2Exception ignore) {
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/ServerConnection.java
Expand Up @@ -120,7 +120,7 @@ public class ServerConnection extends ConnectionBase implements HttpConnection {
public ServerConnection(VertxInternal vertx,
SSLHelper sslHelper,
HttpServerOptions options,
Channel channel,
ChannelHandlerContext channel,
ContextImpl context,
String serverOrigin,
HttpServerMetrics metrics) {
Expand Down Expand Up @@ -255,7 +255,7 @@ ServerWebSocket upgrade(HttpServerRequest request, HttpRequest nettyReq) {
}

NetSocket createNetSocket() {
NetSocketImpl socket = new NetSocketImpl(vertx, channel, context, sslHelper, metrics);
NetSocketImpl socket = new NetSocketImpl(vertx, chctx, context, sslHelper, metrics);
socket.metric(metric());
Map<Channel, NetSocketImpl> connectionMap = new HashMap<>(1);
connectionMap.put(channel, socket);
Expand Down

0 comments on commit c34e106

Please sign in to comment.