Skip to content

Commit

Permalink
Store connection on channel to avoid hash lookup every time
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Jun 17, 2015
1 parent 9751091 commit 14b0d01
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 48 deletions.
Expand Up @@ -17,31 +17,40 @@


import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.DatagramPacket; import io.netty.channel.socket.DatagramPacket;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.net.impl.VertxHandler; import io.vertx.core.net.impl.VertxHandler;


import java.util.HashMap;

/** /**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a> * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/ */
final class DatagramServerHandler extends VertxHandler<DatagramSocketImpl> { final class DatagramServerHandler extends VertxHandler<DatagramSocketImpl> {
private final DatagramSocketImpl server;


DatagramServerHandler(DatagramSocketImpl server) { private final DatagramSocketImpl socket;
super(new HashMap<>());
this.server = server; DatagramServerHandler(DatagramSocketImpl socket) {
this.socket = socket;
} }


@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx); super.handlerAdded(ctx);
connectionMap.put(ctx.channel(), server);
} }


@Override
protected DatagramSocketImpl getConnection(Channel channel) {
return socket;
}

@Override
protected DatagramSocketImpl removeConnection(Channel channel) {
return socket;
}


@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected void channelRead(final DatagramSocketImpl server, final ContextImpl context, ChannelHandlerContext chctx, final Object msg) throws Exception { protected void channelRead(final DatagramSocketImpl server, final ContextImpl context, ChannelHandlerContext chctx, final Object msg) throws Exception {
Expand Down
30 changes: 6 additions & 24 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Expand Up @@ -17,47 +17,29 @@
package io.vertx.core.http.impl; package io.vertx.core.http.impl;


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.*;
import io.netty.channel.ChannelFuture; import io.netty.handler.codec.http.*;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.Future; import io.vertx.core.Future;
import io.vertx.core.Handler; import io.vertx.core.Handler;
import io.vertx.core.MultiMap; import io.vertx.core.MultiMap;
import io.vertx.core.VertxException; import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer; import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient; import io.vertx.core.http.*;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod; import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.WebSocket;
import io.vertx.core.http.WebSocketStream;
import io.vertx.core.http.WebsocketVersion;
import io.vertx.core.http.impl.ws.WebSocketFrameImpl; import io.vertx.core.http.impl.ws.WebSocketFrameImpl;
import io.vertx.core.http.impl.ws.WebSocketFrameInternal; import io.vertx.core.http.impl.ws.WebSocketFrameInternal;
import io.vertx.core.impl.Closeable; import io.vertx.core.impl.Closeable;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;
import io.vertx.core.impl.VertxInternal; import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger; import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory; import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.net.impl.KeyStoreHelper; import io.vertx.core.net.impl.KeyStoreHelper;
import io.vertx.core.net.impl.PartialPooledByteBufAllocator; import io.vertx.core.net.impl.PartialPooledByteBufAllocator;
import io.vertx.core.net.impl.SSLHelper; import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider; import io.vertx.core.spi.metrics.MetricsProvider;


import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;
Expand Down Expand Up @@ -647,7 +629,7 @@ private void internalConnect(ContextImpl context, int port, String host, Handler
Handler<Throwable> connectErrorHandler, ConnectionLifeCycleListener listener) { Handler<Throwable> connectErrorHandler, ConnectionLifeCycleListener listener) {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.eventLoop()); bootstrap.group(context.eventLoop());
bootstrap.channel(NioSocketChannel.class); bootstrap.channelFactory(new VertxNioSocketChannelFactory());
sslHelper.validate(vertx); sslHelper.validate(vertx);
bootstrap.handler(new ChannelInitializer<Channel>() { bootstrap.handler(new ChannelInitializer<Channel>() {
@Override @Override
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Expand Up @@ -21,7 +21,6 @@
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
Expand Down Expand Up @@ -184,7 +183,7 @@ public synchronized HttpServer listen(int port, String host, Handler<AsyncResult
serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE); serverChannelGroup = new DefaultChannelGroup("vertx-acceptor-channels", GlobalEventExecutor.INSTANCE);
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(availableWorkers); bootstrap.group(availableWorkers);
bootstrap.channel(NioServerSocketChannel.class); bootstrap.channelFactory(new VertxNioServerChannelFactory());
applyConnectionOptions(bootstrap); applyConnectionOptions(bootstrap);
sslHelper.validate(vertx); sslHelper.validate(vertx);
bootstrap.childHandler(new ChannelInitializer<Channel>() { bootstrap.childHandler(new ChannelInitializer<Channel>() {
Expand Down
33 changes: 30 additions & 3 deletions src/main/java/io/vertx/core/http/impl/VertxHttpHandler.java
Expand Up @@ -39,12 +39,39 @@
*/ */
public abstract class VertxHttpHandler<C extends ConnectionBase> extends VertxHandler<C> { public abstract class VertxHttpHandler<C extends ConnectionBase> extends VertxHandler<C> {


private static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocator) {
return safeBuffer(holder.content(), allocator);
}

protected Map<Channel, C> connectionMap;

protected VertxHttpHandler(Map<Channel, C> connectionMap) { protected VertxHttpHandler(Map<Channel, C> connectionMap) {
super(connectionMap); this.connectionMap = connectionMap;
} }


private static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocator) { @Override
return safeBuffer(holder.content(), allocator); protected C getConnection(Channel channel) {
@SuppressWarnings("unchecked")
VertxNioSocketChannel<C> vch = (VertxNioSocketChannel<C>)channel;
// As an optimisation we store the connection on the channel - this prevents a lookup every time
// an event from Netty arrives
if (vch.conn != null) {
return vch.conn;
} else {
C conn = connectionMap.get(channel);
if (conn != null) {
vch.conn = conn;
}
return conn;
}
}

@Override
protected C removeConnection(Channel channel) {
@SuppressWarnings("unchecked")
VertxNioSocketChannel<C> vch = (VertxNioSocketChannel<C>)channel;
vch.conn = null;
return connectionMap.remove(channel);
} }


@Override @Override
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright 2014 Red Hat, Inc.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.bootstrap.ChannelFactory;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class VertxNioServerChannelFactory implements ChannelFactory<VertxNioServerSocketChannel> {

@Override
public VertxNioServerSocketChannel newChannel() {
return new VertxNioServerSocketChannel();
}
}
@@ -0,0 +1,54 @@
/*
* Copyright 2014 Red Hat, Inc.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;

import java.nio.channels.SocketChannel;
import java.util.List;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class VertxNioServerSocketChannel extends NioServerSocketChannel {

private static final Logger log = LoggerFactory.getLogger(VertxNioServerSocketChannel.class);

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();

try {
if (ch != null) {
buf.add(new VertxNioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
log.warn("Failed to create a new channel from an accepted socket.", t);

try {
ch.close();
} catch (Throwable t2) {
log.warn("Failed to close a socket.", t2);
}
}

return 0;
}
}
42 changes: 42 additions & 0 deletions src/main/java/io/vertx/core/http/impl/VertxNioSocketChannel.java
@@ -0,0 +1,42 @@
/*
* Copyright 2014 Red Hat, Inc.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.channel.Channel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.nio.channels.SocketChannel;

/**
* We provide this class so we can store a reference to the connection on it.
*
* This means we don't have to do a lookup in the connectionMap for every message received which improves
* performance.
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class VertxNioSocketChannel<C> extends NioSocketChannel {

public C conn;

public VertxNioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
}

public VertxNioSocketChannel() {
}
}
@@ -0,0 +1,30 @@
/*
* Copyright 2014 Red Hat, Inc.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*/

package io.vertx.core.http.impl;

import io.netty.bootstrap.ChannelFactory;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class VertxNioSocketChannelFactory implements ChannelFactory<VertxNioSocketChannel> {

@Override
public VertxNioSocketChannel newChannel() {
return new VertxNioSocketChannel<>();
}
}
18 changes: 7 additions & 11 deletions src/main/java/io/vertx/core/net/impl/VertxHandler.java
Expand Up @@ -26,18 +26,14 @@
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import io.vertx.core.impl.ContextImpl; import io.vertx.core.impl.ContextImpl;


import java.util.Map;

/** /**
* @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a> * @author <a href="mailto:nmaurer@redhat.com">Norman Maurer</a>
*/ */
public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler { public abstract class VertxHandler<C extends ConnectionBase> extends ChannelDuplexHandler {


protected final Map<Channel, C> connectionMap; protected abstract C getConnection(Channel ch);


protected VertxHandler(Map<Channel, C> connectionMap) { protected abstract C removeConnection(Channel ch);
this.connectionMap = connectionMap;
}


protected ContextImpl getContext(C connection) { protected ContextImpl getContext(C connection) {
return connection.getContext(); return connection.getContext();
Expand Down Expand Up @@ -66,7 +62,7 @@ protected static ByteBuf safeBuffer(ByteBuf buf, ByteBufAllocator allocator) {
@Override @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel(); Channel ch = ctx.channel();
C conn = connectionMap.get(ch); C conn = getConnection(ch);
if (conn != null) { if (conn != null) {
ContextImpl context = getContext(conn); ContextImpl context = getContext(conn);
context.executeFromIO(conn::handleInterestedOpsChanged); context.executeFromIO(conn::handleInterestedOpsChanged);
Expand All @@ -77,7 +73,7 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio
public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) throws Exception { public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) throws Exception {
Channel ch = chctx.channel(); Channel ch = chctx.channel();
// Don't remove the connection at this point, or the handleClosed won't be called when channelInactive is called! // Don't remove the connection at this point, or the handleClosed won't be called when channelInactive is called!
C connection = connectionMap.get(ch); C connection = getConnection(ch);
if (connection != null) { if (connection != null) {
ContextImpl context = getContext(connection); ContextImpl context = getContext(connection);
context.executeFromIO(() -> { context.executeFromIO(() -> {
Expand All @@ -97,7 +93,7 @@ public void exceptionCaught(ChannelHandlerContext chctx, final Throwable t) thro
@Override @Override
public void channelInactive(ChannelHandlerContext chctx) throws Exception { public void channelInactive(ChannelHandlerContext chctx) throws Exception {
Channel ch = chctx.channel(); Channel ch = chctx.channel();
C connection = connectionMap.remove(ch); C connection = removeConnection(ch);
if (connection != null) { if (connection != null) {
ContextImpl context = getContext(connection); ContextImpl context = getContext(connection);
context.executeFromIO(connection::handleClosed); context.executeFromIO(connection::handleClosed);
Expand All @@ -106,7 +102,7 @@ public void channelInactive(ChannelHandlerContext chctx) throws Exception {


@Override @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
C conn = connectionMap.get(ctx.channel()); C conn = getConnection(ctx.channel());
if (conn != null) { if (conn != null) {
ContextImpl context = getContext(conn); ContextImpl context = getContext(conn);
context.executeFromIO(conn::endReadAndFlush); context.executeFromIO(conn::endReadAndFlush);
Expand All @@ -116,7 +112,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
@Override @Override
public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
Object message = safeObject(msg, chctx.alloc()); Object message = safeObject(msg, chctx.alloc());
C connection = connectionMap.get(chctx.channel()); C connection = getConnection(chctx.channel());


ContextImpl context; ContextImpl context;
if (connection != null) { if (connection != null) {
Expand Down

0 comments on commit 14b0d01

Please sign in to comment.