Skip to content

Commit

Permalink
Create the connection ahead instead of lazily
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Jun 25, 2017
1 parent 13b338e commit 53bb58f
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 73 deletions.
80 changes: 30 additions & 50 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -508,10 +508,6 @@ public SSLHelper getSslHelper() {
return sslHelper;
}

void removeChannel(Channel channel) {
connectionMap.remove(channel);
}

private void applyConnectionOptions(ServerBootstrap bootstrap) {
bootstrap.childOption(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
if (options.getSendBufferSize() != -1) {
Expand Down Expand Up @@ -623,11 +619,7 @@ protected void doMessageReceived(ServerConnection connection, ChannelHandlerCont
}
} else {
//HTTP request
if (conn == null) {
createConnAndHandle(ctx, ch, msg, null);
} else {
conn.handleMessage(msg);
}
conn.handleMessage(msg);
}
} else if (msg instanceof WebSocketFrameInternal) {
//Websocket frame
Expand All @@ -636,9 +628,7 @@ protected void doMessageReceived(ServerConnection connection, ChannelHandlerCont
case BINARY:
case CONTINUATION:
case TEXT:
if (conn != null) {
conn.handleMessage(msg);
}
conn.handleMessage(msg);
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
Expand Down Expand Up @@ -668,9 +658,7 @@ protected void doMessageReceived(ServerConnection connection, ChannelHandlerCont
return;
}
}
if (conn != null) {
conn.handleMessage(msg);
}
conn.handleMessage(msg);
} else {
throw new IllegalStateException("Invalid message " + msg);
}
Expand All @@ -684,36 +672,37 @@ public ServerHandler(Channel ch) {
}

@Override
protected void doMessageReceived(ServerConnection conn, ChannelHandlerContext ctx, Object msg) throws Exception {
if (conn == null) {
createConnAndHandle(ctx, ch, msg, null);
} else {
conn.handleMessage(msg);
}
}

protected void createConnAndHandle(ChannelHandlerContext ctx, Channel ch, Object msg, WebSocketServerHandshaker shake) {
HandlerHolder<Handlers> reqHandler = reqHandlerManager.chooseHandler(ch.eventLoop());
if (reqHandler != null) {

// Put in the connection map before executeFromIO
ServerConnection conn = new ServerConnection(vertx, HttpServerImpl.this, ch, reqHandler.context, serverOrigin, shake, metrics);
conn.requestHandler(reqHandler.handler.requesthHandler);
this.conn = conn;
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
HandlerHolder<Handlers> holder = reqHandlerManager.chooseHandler(ch.eventLoop());
if (holder != null) {
conn = new ServerConnection(vertx, HttpServerImpl.this, ch, holder.context, serverOrigin, metrics);
conn.requestHandler(holder.handler.requesthHandler);
connectionMap.put(ch, conn);
reqHandler.context.executeFromIO(() -> {
holder.context.executeFromIO(() -> {
if (metrics != null) {
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
Handler<HttpConnection> connHandler = reqHandler.handler.connectionHandler;
Handler<HttpConnection> connHandler = holder.handler.connectionHandler;
if (connHandler != null) {
connHandler.handle(conn);
}
conn.handleMessage(msg);
});
} else {
// ?
}
}

@Override
protected void channelRead(ServerConnection connection, ContextImpl context, ChannelHandlerContext chctx, Object msg) throws Exception {
context.executeFromIO(() -> doMessageReceived(conn, chctx, msg));
}

@Override
protected void doMessageReceived(ServerConnection conn, ChannelHandlerContext ctx, Object msg) throws Exception {
conn.handleMessage(msg);
}

protected void handshake(FullHttpRequest request, Channel ch, ChannelHandlerContext ctx) throws Exception {

WebSocketServerHandshaker shake = createHandshaker(ch, request);
Expand All @@ -723,7 +712,7 @@ protected void handshake(FullHttpRequest request, Channel ch, ChannelHandlerCont
HandlerHolder<Handlers> wsHandler = reqHandlerManager.chooseHandler(ch.eventLoop());

if (wsHandler == null || wsHandler.handler.wsHandler == null) {
createConnAndHandle(ctx, ch, request, shake);
conn.handleMessage(request);
} else {

wsHandler.context.executeFromIO(() -> {
Expand All @@ -734,33 +723,28 @@ protected void handshake(FullHttpRequest request, Channel ch, ChannelHandlerCont
throw new IllegalArgumentException("Invalid uri " + request.getUri()); //Should never happen
}

ServerConnection wsConn = new ServerConnection(vertx, HttpServerImpl.this, ch, wsHandler.context,
serverOrigin, shake, metrics);
if (metrics != null) {
wsConn.metric(metrics.connected(wsConn.remoteAddress(), wsConn.remoteName()));
conn.metric(metrics.connected(conn.remoteAddress(), conn.remoteName()));
}
wsConn.wsHandler(wsHandler.handler.wsHandler);
conn.wsHandler(shake, wsHandler.handler.wsHandler);

Runnable connectRunnable = () -> {
VertxHttpHandler<ServerConnection> handler = ch.pipeline().get(VertxHttpHandler.class);
handler.conn = wsConn;
connectionMap.put(ch, wsConn);
try {
shake.handshake(ch, request);
} catch (WebSocketHandshakeException e) {
wsConn.handleException(e);
conn.handleException(e);
} catch (Exception e) {
log.error("Failed to generate shake response", e);
}
};

ServerWebSocketImpl ws = new ServerWebSocketImpl(vertx, theURI.toString(), theURI.getPath(),
theURI.getQuery(), new HeadersAdaptor(request.headers()), wsConn, shake.version() != WebSocketVersion.V00,
theURI.getQuery(), new HeadersAdaptor(request.headers()), conn, shake.version() != WebSocketVersion.V00,
connectRunnable, options.getMaxWebsocketFrameSize(), options().getMaxWebsocketMessageSize());
if (METRICS_ENABLED && metrics != null) {
ws.setMetric(metrics.connected(wsConn.metric(), ws));
ws.setMetric(metrics.connected(conn.metric(), ws));
}
wsConn.handleWebsocketConnect(ws);
conn.handleWebsocketConnect(ws);
if (!ws.isRejected()) {
ChannelHandler handler = ctx.pipeline().get(HttpChunkContentCompressor.class);
if (handler != null) {
Expand Down Expand Up @@ -855,10 +839,6 @@ HttpServerOptions options() {
return options;
}

Map<Channel, ServerConnection> connectionMap() {
return connectionMap;
}

/*
Needs to be protected using the HttpServerImpl monitor as that protects the listening variable
In practice synchronized overhead should be close to zero assuming most access is from the same thread due
Expand Down
25 changes: 15 additions & 10 deletions src/main/java/io/vertx/core/http/impl/ServerConnection.java
Expand Up @@ -113,12 +113,10 @@ class ServerConnection extends ConnectionBase implements HttpConnection {
// queuing == true <=> (paused || (pendingResponse != null && msg instanceof HttpRequest) || !pending.isEmpty())
private boolean queueing;

ServerConnection(VertxInternal vertx, HttpServerImpl server, Channel channel, ContextImpl context, String serverOrigin,
WebSocketServerHandshaker handshaker, HttpServerMetrics metrics) {
ServerConnection(VertxInternal vertx, HttpServerImpl server, Channel channel, ContextImpl context, String serverOrigin, HttpServerMetrics metrics) {
super(vertx, channel, context);
this.serverOrigin = serverOrigin;
this.server = server;
this.handshaker = handshaker;
this.metrics = metrics;
}

Expand Down Expand Up @@ -188,7 +186,8 @@ synchronized void requestHandler(Handler<HttpServerRequest> handler) {
this.requestHandler = handler;
}

synchronized void wsHandler(Handler<ServerWebSocket> handler) {
synchronized void wsHandler(WebSocketServerHandshaker handshaker, Handler<ServerWebSocket> handler) {
this.handshaker = handshaker;
this.wsHandler = handler;
}

Expand Down Expand Up @@ -240,7 +239,6 @@ ServerWebSocket upgrade(HttpServerRequest request, HttpRequest nettyReq) {
// remove compressor as its not needed anymore once connection was upgraded to websockets
channel.pipeline().remove(handler);
}
server.connectionMap().put(channel, this);
return ws;
}

Expand Down Expand Up @@ -273,14 +271,14 @@ NetSocket createNetSocket() {
@Override
public void exceptionCaught(ChannelHandlerContext chctx, Throwable t) throws Exception {
// remove from the real mapping
server.removeChannel(channel);
connectionMap.remove(channel);
super.exceptionCaught(chctx, t);
}

@Override
public void channelInactive(ChannelHandlerContext chctx) throws Exception {
// remove from the real mapping
server.removeChannel(channel);
connectionMap.remove(channel);
super.channelInactive(chctx);
}

Expand Down Expand Up @@ -427,10 +425,17 @@ private void handleError(HttpObject obj) {
}
HttpResponseStatus status = causeMsg.startsWith("An HTTP line is larger than") ? HttpResponseStatus.REQUEST_URI_TOO_LONG : HttpResponseStatus.BAD_REQUEST;
DefaultFullHttpResponse resp = new DefaultFullHttpResponse(version, status);
writeToChannel(resp);
ChannelFuture fut = writeToChannel(resp);
fut.addListener(res -> {
if (res.isSuccess()) {
// That will close the connection as it is considered as unusable
channel.pipeline().fireExceptionCaught(result.cause());
}
});
} else {
// That will close the connection as it is considered as unusable
channel.pipeline().fireExceptionCaught(result.cause());
}
// That will close the connection as it is considered as unusable
channel.pipeline().fireExceptionCaught(result.cause());
}

private void processMessage(Object msg) {
Expand Down
14 changes: 1 addition & 13 deletions src/main/java/io/vertx/core/http/impl/VertxHttpHandler.java
Expand Up @@ -60,24 +60,12 @@ protected C getConnection() {
@Override
protected C removeConnection() {
connectionMap.remove(ch);
C conn = this.conn;
this.conn = null;
return conn;
}

@Override
protected void channelRead(final C connection, final ContextImpl context, final ChannelHandlerContext chctx, final Object msg) throws Exception {
if (connection != null) {
context.executeFromIO(() -> doMessageReceived(connection, chctx, msg));
} else {
// We execute this directly as we don't have a context yet, the context will have to be set manually
// inside doMessageReceived();
try {
doMessageReceived(null, chctx, msg);
} catch (Throwable t) {
chctx.pipeline().fireExceptionCaught(t);
}
}
context.executeFromIO(() -> doMessageReceived(connection, chctx, msg));
}

@Override
Expand Down

0 comments on commit 53bb58f

Please sign in to comment.