Skip to content

Commit

Permalink
Update Netty cluster implementation to handle automatic reconnects.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 4, 2015
1 parent d64575a commit 52a7cd7
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 38 deletions.
Expand Up @@ -17,6 +17,8 @@


import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
Expand Down Expand Up @@ -48,12 +50,14 @@ public class NettyLocalMember extends ManagedLocalMember implements NettyMember{
private static final int TASK = 1; private static final int TASK = 1;
private static final int STATUS_FAILURE = 0; private static final int STATUS_FAILURE = 0;
private static final int STATUS_SUCCESS = 1; private static final int STATUS_SUCCESS = 1;
private static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true);
private static final ThreadLocal<ByteBufBuffer> BUFFER = new ThreadLocal<ByteBufBuffer>() { private static final ThreadLocal<ByteBufBuffer> BUFFER = new ThreadLocal<ByteBufBuffer>() {
@Override @Override
protected ByteBufBuffer initialValue() { protected ByteBufBuffer initialValue() {
return new ByteBufBuffer(); return new ByteBufBuffer();
} }
}; };

private final Map<Integer, HandlerHolder> handlers = new ConcurrentHashMap<>(); private final Map<Integer, HandlerHolder> handlers = new ConcurrentHashMap<>();
private final Map<String, Integer> hashMap = new HashMap<>(); private final Map<String, Integer> hashMap = new HashMap<>();
private final NettyMemberInfo info; private final NettyMemberInfo info;
Expand Down Expand Up @@ -161,6 +165,7 @@ public void initChannel(SocketChannel channel) throws Exception {
bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, ALLOCATOR);


bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);


Expand Down
Expand Up @@ -17,52 +17,59 @@


import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.LengthFieldPrepender;
import net.kuujo.copycat.Task; import net.kuujo.copycat.Task;
import net.kuujo.copycat.io.util.HashFunctions; import net.kuujo.copycat.io.util.HashFunctions;
import net.kuujo.copycat.util.ExecutionContext; import net.kuujo.copycat.util.ExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


/** /**
* Netty remote member. * Netty remote member.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class NettyRemoteMember extends ManagedRemoteMember implements NettyMember { public class NettyRemoteMember extends ManagedRemoteMember implements NettyMember {
private static final int RETRY_ATTEMPTS = 3; private static final Logger LOGGER = LoggerFactory.getLogger(NettyRemoteMember.class);
private static final long RECONNECT_INTERVAL = 1000; private static final long MAX_RECONNECT_INTERVAL = 60000;
private static final long INITIAL_RECONNECT_INTERVAL = 100;
private static final int MESSAGE = 0; private static final int MESSAGE = 0;
private static final int TASK = 1; private static final int TASK = 1;
private static final int STATUS_FAILURE = 0; private static final int STATUS_FAILURE = 0;
private static final int STATUS_SUCCESS = 1; private static final int STATUS_SUCCESS = 1;
private static ThreadLocal<ByteBufBuffer> BUFFER = new ThreadLocal<ByteBufBuffer>() { private static final ByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true);
private static final ThreadLocal<ByteBufBuffer> BUFFER = new ThreadLocal<ByteBufBuffer>() {
@Override @Override
protected ByteBufBuffer initialValue() { protected ByteBufBuffer initialValue() {
return new ByteBufBuffer(); return new ByteBufBuffer();
} }
}; };

private final NettyMemberInfo info; private final NettyMemberInfo info;
private EventLoopGroup eventLoopGroup; private EventLoopGroup eventLoopGroup;
private boolean eventLoopInitialized; private boolean eventLoopInitialized;
private Channel channel; private Channel channel;
private ChannelHandlerContext context; private ChannelHandlerContext context;
private final Map<String, Integer> hashMap = new HashMap<>(); private final Map<String, Integer> hashMap = new HashMap<>();
private final Map<Object, ContextualFuture> responseFutures = new HashMap<>(1024); private final Map<Object, ContextualFuture> responseFutures = new HashMap<>(1024);
private boolean connected; private final AtomicBoolean connecting = new AtomicBoolean();
private final AtomicBoolean connected = new AtomicBoolean();
private long requestId; private long requestId;
private CompletableFuture<RemoteMember> connectFuture;
private CompletableFuture<Void> closeFuture; private CompletableFuture<Void> closeFuture;
private ScheduledFuture<?> reconnectFuture; private ScheduledFuture<?> connectFuture;


NettyRemoteMember(NettyMemberInfo info, Type type) { NettyRemoteMember(NettyMemberInfo info, Type type) {
super(info, type); super(info, type);
Expand Down Expand Up @@ -153,28 +160,32 @@ public <T> CompletableFuture<T> submit(Task<T> task) {
} }


@Override @Override
public CompletableFuture<RemoteMember> connect() { public synchronized CompletableFuture<RemoteMember> connect() {
if (connected) if (connecting.compareAndSet(false, true))
return CompletableFuture.completedFuture(this); connect(INITIAL_RECONNECT_INTERVAL);
return CompletableFuture.completedFuture(this);
}


if (connectFuture == null) { /**
synchronized (this) { * Attempts to connect to the server.
if (connectFuture == null) { */
connectFuture = new CompletableFuture<>(); private synchronized void connect(long timeout) {
if (eventLoopGroup == null) { LOGGER.info("Connecting to {}...", info.address);
eventLoopGroup = new NioEventLoopGroup(1); doConnect(timeout, () -> connect(Math.min(timeout * 2, MAX_RECONNECT_INTERVAL)));
}
connect(RETRY_ATTEMPTS, RECONNECT_INTERVAL);
}
}
}
return connectFuture;
} }


/** /**
* Attempts to connect for the given number of attempts. * Attempts to reconnect to the server.
*/ */
private void connect(int attempts, long timeout) { private void reconnect(long timeout) {
LOGGER.info("Reconnecting to {}...", info.address);
doConnect(timeout, () -> reconnect(Math.min(timeout * 2, MAX_RECONNECT_INTERVAL)));
}

/**
* Attempts to connect to the server.
*/
private void doConnect(long timeout, Runnable reschedule) {
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup) bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
Expand All @@ -189,30 +200,28 @@ protected void initChannel(SocketChannel channel) throws Exception {


bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 15000); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
bootstrap.option(ChannelOption.ALLOCATOR, ALLOCATOR);


bootstrap.connect(info.address().getHostString(), info.address().getPort()).addListener((ChannelFutureListener) channelFuture -> { bootstrap.connect(info.address().getHostString(), info.address().getPort()).addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) { if (channelFuture.isSuccess()) {
channel = channelFuture.channel(); channel = channelFuture.channel();
connected = true; connecting.set(false);
connectFuture.complete(this); connected.set(true);
} else if (attempts > 0) { } else {
reconnectFuture = eventLoopGroup.schedule(() -> connect(attempts - 1, timeout * 2), timeout, TimeUnit.MILLISECONDS); connectFuture = eventLoopGroup.schedule(reschedule, timeout, TimeUnit.MILLISECONDS);
} else {
connectFuture.completeExceptionally(channelFuture.cause());
} }
}); });
} }


@Override @Override
public CompletableFuture<Void> close() { public CompletableFuture<Void> close() {
if (reconnectFuture != null) { if (connectFuture != null) {
reconnectFuture.cancel(false); connectFuture.cancel(false);
reconnectFuture = null; connectFuture = null;
} }


if (!connected) connecting.set(false);
return CompletableFuture.completedFuture(null);


if (closeFuture == null) { if (closeFuture == null) {
synchronized (this) { synchronized (this) {
Expand All @@ -221,7 +230,7 @@ public CompletableFuture<Void> close() {
if (channel != null) { if (channel != null) {
channel.close().addListener((ChannelFutureListener) channelFuture -> { channel.close().addListener((ChannelFutureListener) channelFuture -> {
channel = null; channel = null;
connected = false; connected.set(false);
if (!eventLoopInitialized && eventLoopGroup != null) { if (!eventLoopInitialized && eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully();
eventLoopGroup = null; eventLoopGroup = null;
Expand All @@ -233,7 +242,7 @@ public CompletableFuture<Void> close() {
} }
}); });
} else { } else {
connected = false; connected.set(false);
if (!eventLoopInitialized && eventLoopGroup != null) { if (!eventLoopInitialized && eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully();
eventLoopGroup = null; eventLoopGroup = null;
Expand Down Expand Up @@ -287,6 +296,17 @@ public void channelRead(ChannelHandlerContext context, Object message) {
} }
response.release(); response.release();
} }

@Override
public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
context.close();
connected.set(false);
channel = null;
NettyRemoteMember.this.context = null;
if (connecting.compareAndSet(false, true)) {
reconnect(INITIAL_RECONNECT_INTERVAL);
}
}
} }


} }

0 comments on commit 52a7cd7

Please sign in to comment.