From 6c0f13bb703f6949177775da84cfc15f11f84179 Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Mon, 15 Sep 2014 15:56:07 -0700 Subject: [PATCH] Back-pressure handler in NettyHelixIPCService --- .../helix/ipc/netty/NettyHelixIPCService.java | 59 ++++++++++++------- .../helix/ipc/benchmark/BenchmarkDriver.java | 14 +---- 2 files changed, 41 insertions(+), 32 deletions(-) diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java index 00d6157647..bf5161a688 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java @@ -109,7 +109,6 @@ public class NettyHelixIPCService implements HelixIPCService { private final Config config; private final AtomicBoolean isShutdown; private final Map> channelMap; - private final ConcurrentMap channelOpenTimes; private final MetricRegistry metricRegistry; private final ConcurrentMap callbacks; @@ -128,7 +127,6 @@ public NettyHelixIPCService(Config config) { this.config = config; this.isShutdown = new AtomicBoolean(true); this.channelMap = new HashMap>(); - this.channelOpenTimes = new ConcurrentHashMap(); this.metricRegistry = new MetricRegistry(); this.callbacks = new ConcurrentHashMap(); } @@ -175,6 +173,7 @@ protected void initChannel(SocketChannel socketChannel) throws Exception { protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast( new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true)); + socketChannel.pipeline().addLast(new BackPressureHandler()); } }); } @@ -215,17 +214,16 @@ public void send(HelixAddress destination, int messageType, UUID messageId, Byte // Pick the channel for this scope int idx = (Integer.MAX_VALUE & destination.getScope().hashCode()) % channels.size(); Channel channel = channels.get(idx); - if (channel == null || !channel.isOpen() || isExpired(channel)) { + if (channel == null || !channel.isOpen()) { synchronized (channelMap) { channel = channels.get(idx); - if (channel == null || !channel.isOpen() || isExpired(channel)) { + if (channel == null || !channel.isOpen()) { if (channel != null && channel.isOpen()) { channel.close(); } channel = clientBootstrap.connect(destination.getSocketAddress()).sync().channel(); channels.set(idx, channel); statChannelOpen.inc(); - channelOpenTimes.put(channel, System.currentTimeMillis()); } } } @@ -268,26 +266,54 @@ public void send(HelixAddress destination, int messageType, UUID messageId, Byte } // Send + BackPressureHandler backPressureHandler = channel.pipeline().get(BackPressureHandler.class); + backPressureHandler.waitUntilWritable(channel); + channel.writeAndFlush(fullByteBuf); + statTxMsg.mark(); statTxBytes.mark(fullByteBuf.readableBytes()); - channel.writeAndFlush(fullByteBuf); } catch (Exception e) { statError.inc(); throw new IllegalStateException("Could not send message to " + destination, e); } } - private boolean isExpired(Channel channel) { - Long channelOpenTime = channelOpenTimes.get(channel); - return channelOpenTime != null - && System.currentTimeMillis() - channelOpenTime >= config.getMaxChannelLifeMillis(); - } - @Override public void registerCallback(int messageType, HelixIPCCallback callback) { callbacks.put(messageType, callback); } + private class BackPressureHandler extends SimpleChannelInboundHandler { + private final Object sync = new Object(); + + BackPressureHandler() { + super(false); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + ctx.fireChannelRead(msg); + } + + public void waitUntilWritable(Channel channel) throws InterruptedException { + synchronized (sync) { + while (channel.isOpen() && !channel.isWritable()) { + LOG.warn(channel + " is not writable, waiting until it is"); + sync.wait(); + } + } + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + synchronized (sync) { + if (ctx.channel().isWritable()) { + sync.notifyAll(); + } + } + } + } + @ChannelHandler.Sharable private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler { @@ -466,11 +492,6 @@ public Config setNumConnections(int numConnections) { return this; } - public Config setMaxChannelLifeMillis(long maxChannelLifeMillis) { - this.maxChannelLifeMillis = maxChannelLifeMillis; - return this; - } - public String getInstanceName() { return instanceName; } @@ -482,9 +503,5 @@ public int getPort() { public int getNumConnections() { return numConnections; } - - public long getMaxChannelLifeMillis() { - return maxChannelLifeMillis; - } } } diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java index 30b02ce30f..376e7aee48 100644 --- a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java +++ b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java @@ -64,20 +64,17 @@ public class BenchmarkDriver implements Runnable { private final AtomicBoolean isShutdown; private final byte[] messageBytes; private final int numConnections; - private final long maxChannelLifeMillis; private HelixIPCService ipcService; private String localhost; private Thread[] trafficThreads; - public BenchmarkDriver(int port, int numPartitions, int numThreads, int messageSize, - int numConnections, long maxChannelLifeMillis) { + public BenchmarkDriver(int port, int numPartitions, int numThreads, int messageSize, int numConnections) { this.port = port; this.numPartitions = numPartitions; this.isShutdown = new AtomicBoolean(true); this.trafficThreads = new Thread[numThreads]; this.numConnections = numConnections; - this.maxChannelLifeMillis = maxChannelLifeMillis; StringBuilder sb = new StringBuilder(); for (int i = 0; i < messageSize; i++) { @@ -108,7 +105,7 @@ public void stopTraffic() { ipcService = new NettyHelixIPCService(new NettyHelixIPCService.Config() .setInstanceName(localhost + "_" + port).setPort(port) - .setNumConnections(numConnections).setMaxChannelLifeMillis(maxChannelLifeMillis)); + .setNumConnections(numConnections)); // Counts number of messages received, and ack them ipcService.registerCallback(MESSAGE_TYPE, new HelixIPCCallback() { @@ -182,21 +179,17 @@ private void stopTraffic() { @MXBean public interface Controller { void startTraffic(String remoteHost, int remotePort); - void stopTraffic(); } public static void main(String[] args) throws Exception { BasicConfigurator.configure(); - Logger.getRootLogger().setLevel(Level.DEBUG); Options options = new Options(); options.addOption("partitions", true, "Number of partitions"); options.addOption("threads", true, "Number of threads"); options.addOption("messageSize", true, "Message size in bytes"); options.addOption("numConnections", true, "Number of connections between nodes"); - options.addOption("maxChannelLifeMillis", true, - "Maximum length of time to keep Netty Channel open"); CommandLine commandLine = new GnuParser().parse(options, args); @@ -218,8 +211,7 @@ public void run() { new BenchmarkDriver(Integer.parseInt(commandLine.getArgs()[0]), Integer.parseInt(commandLine .getOptionValue("partitions", "1")), Integer.parseInt(commandLine.getOptionValue("threads", "1")), Integer.parseInt(commandLine.getOptionValue("messageSize", "1024")), - Integer.parseInt(commandLine.getOptionValue("numConnections", "1")), - Long.parseLong(commandLine.getOptionValue("maxChannelLifeMillis", "5000"))).run(); + Integer.parseInt(commandLine.getOptionValue("numConnections", "1"))).run(); latch.await(); }