From 21ecdc7c5ef7f4e2ad7a98239b5d70164ee83505 Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Tue, 16 Sep 2014 09:48:14 -0700 Subject: [PATCH 1/2] refactored Netty IPC code into separte classes --- .../NettyHelixIPCBackPressureHandler.java | 59 +++++ .../netty/NettyHelixIPCCallbackHandler.java | 160 ++++++++++++ .../helix/ipc/netty/NettyHelixIPCService.java | 236 +++--------------- .../helix/ipc/netty/NettyHelixIPCUtils.java | 64 +++++ .../helix/ipc/benchmark/BenchmarkDriver.java | 2 - 5 files changed, 314 insertions(+), 207 deletions(-) create mode 100644 helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java create mode 100644 helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java create mode 100644 helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java new file mode 100644 index 0000000000..9eb5eadce5 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java @@ -0,0 +1,59 @@ +package org.apache.helix.ipc.netty; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.log4j.Logger; + +public class NettyHelixIPCBackPressureHandler extends SimpleChannelInboundHandler { + private static final Logger LOG = Logger.getLogger(NettyHelixIPCBackPressureHandler.class); + + private final Object sync = new Object(); + + public NettyHelixIPCBackPressureHandler() { + super(false); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + ctx.fireChannelRead(msg); + } + + public void waitUntilWritable(Channel channel) throws InterruptedException { + synchronized (sync) { + while (channel.isOpen() && !channel.isWritable()) { + LOG.warn(channel + " is not writable, waiting until it is"); + sync.wait(); + } + } + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + synchronized (sync) { + if (ctx.channel().isWritable()) { + sync.notifyAll(); + } + } + } +} diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java new file mode 100644 index 0000000000..acd0af2604 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java @@ -0,0 +1,160 @@ +package org.apache.helix.ipc.netty; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.apache.helix.ipc.netty.NettyHelixIPCUtils.*; + +import com.codahale.metrics.Meter; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import org.apache.helix.ipc.HelixIPCCallback; +import org.apache.helix.resolver.HelixMessageScope; + +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; + +@ChannelHandler.Sharable +public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler { + + private final String instanceName; + private final ConcurrentMap callbacks; + private final Meter statRxMsg; + private final Meter statRxBytes; + + public NettyHelixIPCCallbackHandler(String instanceName, + ConcurrentMap callbacks, + Meter statRxMsg, + Meter statRxBytes) { + super(false); // we will manage reference + this.instanceName = instanceName; + this.callbacks = callbacks; + this.statRxMsg = statRxMsg; + this.statRxBytes = statRxBytes; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { + try { + int idx = 0; + + // Message length + int messageLength = byteBuf.readInt(); + idx += 4; + + // Message version + @SuppressWarnings("unused") + int messageVersion = byteBuf.readInt(); + idx += 4; + + // Message type + int messageType = byteBuf.readInt(); + idx += 4; + + // Message ID + UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong()); + idx += 16; + + // Cluster + byteBuf.readerIndex(idx); + int clusterSize = byteBuf.readInt(); + idx += 4; + checkLength("clusterSize", clusterSize, messageLength); + String clusterName = toNonEmptyString(clusterSize, byteBuf); + idx += clusterSize; + + // Resource + byteBuf.readerIndex(idx); + int resourceSize = byteBuf.readInt(); + idx += 4; + checkLength("resourceSize", resourceSize, messageLength); + String resourceName = toNonEmptyString(resourceSize, byteBuf); + idx += resourceSize; + + // Partition + byteBuf.readerIndex(idx); + int partitionSize = byteBuf.readInt(); + idx += 4; + checkLength("partitionSize", partitionSize, messageLength); + String partitionName = toNonEmptyString(partitionSize, byteBuf); + idx += partitionSize; + + // State + byteBuf.readerIndex(idx); + int stateSize = byteBuf.readInt(); + idx += 4; + checkLength("stateSize", stateSize, messageLength); + String state = toNonEmptyString(stateSize, byteBuf); + idx += stateSize; + + // Source instance + byteBuf.readerIndex(idx); + int srcInstanceSize = byteBuf.readInt(); + idx += 4; + checkLength("srcInstanceSize", srcInstanceSize, messageLength); + String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf); + idx += srcInstanceSize; + + // Destination instance + byteBuf.readerIndex(idx); + int dstInstanceSize = byteBuf.readInt(); + idx += 4; + checkLength("dstInstanceSize", dstInstanceSize, messageLength); + String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf); + idx += dstInstanceSize; + + // Position at message + byteBuf.readerIndex(idx + 4); + + // Error check + if (dstInstance == null) { + throw new IllegalStateException("Received message addressed to null destination from " + + srcInstance); + } else if (!dstInstance.equals(instanceName)) { + throw new IllegalStateException(instanceName + + " received message addressed to " + dstInstance + " from " + srcInstance); + } else if (callbacks.get(messageType) == null) { + throw new IllegalStateException("No callback registered for message type " + messageType); + } + + // Build scope + HelixMessageScope scope = + new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName) + .partition(partitionName).state(state).sourceInstance(srcInstance).build(); + + // Get callback + HelixIPCCallback callback = callbacks.get(messageType); + if (callback == null) { + throw new IllegalStateException("No callback registered for message type " + messageType); + } + + // Handle callback + callback.onMessage(scope, messageId, byteBuf); + + // Stats + statRxMsg.mark(); + statRxBytes.mark(messageLength); + } finally { + byteBuf.release(); + } + + } +} diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java index bf5161a688..f2bfccc9be 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java @@ -19,17 +19,16 @@ * under the License. */ +import static org.apache.helix.ipc.netty.NettyHelixIPCUtils.*; + import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -38,7 +37,6 @@ import io.netty.handler.codec.LengthFieldPrepender; import java.net.InetSocketAddress; -import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -51,7 +49,6 @@ import org.apache.helix.ipc.HelixIPCCallback; import org.apache.helix.ipc.HelixIPCService; import org.apache.helix.resolver.HelixAddress; -import org.apache.helix.resolver.HelixMessageScope; import org.apache.log4j.Logger; import com.codahale.metrics.Counter; @@ -152,18 +149,29 @@ public void start() throws Exception { // Report metrics via JMX jmxReporter = JmxReporter.forRegistry(metricRegistry).build(); jmxReporter.start(); + LOG.info("Registered JMX metrics reporter"); new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast( - new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, - LENGTH_FIELD_LENGTH, LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP)); - socketChannel.pipeline().addLast(new HelixIPCCallbackHandler()); - } + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast( + new LengthFieldBasedFrameDecoder( + MAX_FRAME_LENGTH, + LENGTH_FIELD_OFFSET, + LENGTH_FIELD_LENGTH, + LENGTH_ADJUSTMENT, + INITIAL_BYTES_TO_STRIP)); + socketChannel.pipeline().addLast( + new NettyHelixIPCCallbackHandler( + config.getInstanceName(), + callbacks, + statRxMsg, + statRxBytes)); + } }).bind(new InetSocketAddress(config.getPort())); + LOG.info("Listening on port " + config.getPort()); clientBootstrap = new Bootstrap().group(eventLoopGroup).channel(NioSocketChannel.class) @@ -171,9 +179,8 @@ protected void initChannel(SocketChannel socketChannel) throws Exception { .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast( - new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true)); - socketChannel.pipeline().addLast(new BackPressureHandler()); + socketChannel.pipeline().addLast(new LengthFieldPrepender(LENGTH_FIELD_LENGTH, true)); + socketChannel.pipeline().addLast(new NettyHelixIPCBackPressureHandler()); } }); } @@ -185,7 +192,10 @@ protected void initChannel(SocketChannel socketChannel) throws Exception { public void shutdown() throws Exception { if (!isShutdown.getAndSet(true)) { jmxReporter.stop(); + LOG.info("Stopped JMX reporter"); + eventLoopGroup.shutdownGracefully(); + LOG.info("Shut down event loop group"); } } @@ -194,6 +204,9 @@ public void shutdown() throws Exception { */ @Override public void send(HelixAddress destination, int messageType, UUID messageId, ByteBuf message) { + if (LOG.isTraceEnabled()) { + LOG.trace("Sending " + messageId); + } // Send message try { // Get list of channels @@ -266,7 +279,8 @@ public void send(HelixAddress destination, int messageType, UUID messageId, Byte } // Send - BackPressureHandler backPressureHandler = channel.pipeline().get(BackPressureHandler.class); + NettyHelixIPCBackPressureHandler backPressureHandler + = channel.pipeline().get(NettyHelixIPCBackPressureHandler.class); backPressureHandler.waitUntilWritable(channel); channel.writeAndFlush(fullByteBuf); @@ -281,201 +295,13 @@ public void send(HelixAddress destination, int messageType, UUID messageId, Byte @Override public void registerCallback(int messageType, HelixIPCCallback callback) { callbacks.put(messageType, callback); - } - - private class BackPressureHandler extends SimpleChannelInboundHandler { - private final Object sync = new Object(); - - BackPressureHandler() { - super(false); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - ctx.fireChannelRead(msg); - } - - public void waitUntilWritable(Channel channel) throws InterruptedException { - synchronized (sync) { - while (channel.isOpen() && !channel.isWritable()) { - LOG.warn(channel + " is not writable, waiting until it is"); - sync.wait(); - } - } - } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) { - synchronized (sync) { - if (ctx.channel().isWritable()) { - sync.notifyAll(); - } - } - } - } - - @ChannelHandler.Sharable - private class HelixIPCCallbackHandler extends SimpleChannelInboundHandler { - - HelixIPCCallbackHandler() { - super(false); // we will manage reference - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { - try { - int idx = 0; - - // Message length - int messageLength = byteBuf.readInt(); - idx += 4; - - // Message version - @SuppressWarnings("unused") - int messageVersion = byteBuf.readInt(); - idx += 4; - - // Message type - int messageType = byteBuf.readInt(); - idx += 4; - - // Message ID - UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong()); - idx += 16; - - // Cluster - byteBuf.readerIndex(idx); - int clusterSize = byteBuf.readInt(); - idx += 4; - checkLength("clusterSize", clusterSize, messageLength); - String clusterName = toNonEmptyString(clusterSize, byteBuf); - idx += clusterSize; - - // Resource - byteBuf.readerIndex(idx); - int resourceSize = byteBuf.readInt(); - idx += 4; - checkLength("resourceSize", resourceSize, messageLength); - String resourceName = toNonEmptyString(resourceSize, byteBuf); - idx += resourceSize; - - // Partition - byteBuf.readerIndex(idx); - int partitionSize = byteBuf.readInt(); - idx += 4; - checkLength("partitionSize", partitionSize, messageLength); - String partitionName = toNonEmptyString(partitionSize, byteBuf); - idx += partitionSize; - - // State - byteBuf.readerIndex(idx); - int stateSize = byteBuf.readInt(); - idx += 4; - checkLength("stateSize", stateSize, messageLength); - String state = toNonEmptyString(stateSize, byteBuf); - idx += stateSize; - - // Source instance - byteBuf.readerIndex(idx); - int srcInstanceSize = byteBuf.readInt(); - idx += 4; - checkLength("srcInstanceSize", srcInstanceSize, messageLength); - String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf); - idx += srcInstanceSize; - - // Destination instance - byteBuf.readerIndex(idx); - int dstInstanceSize = byteBuf.readInt(); - idx += 4; - checkLength("dstInstanceSize", dstInstanceSize, messageLength); - String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf); - idx += dstInstanceSize; - - // Position at message - byteBuf.readerIndex(idx + 4); - - // Error check - if (dstInstance == null) { - throw new IllegalStateException("Received message addressed to null destination from " - + srcInstance); - } else if (!dstInstance.equals(config.getInstanceName())) { - throw new IllegalStateException(config.getInstanceName() - + " received message addressed to " + dstInstance + " from " + srcInstance); - } else if (callbacks.get(messageType) == null) { - throw new IllegalStateException("No callback registered for message type " + messageType); - } - - // Build scope - HelixMessageScope scope = - new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName) - .partition(partitionName).state(state).sourceInstance(srcInstance).build(); - - // Get callback - HelixIPCCallback callback = callbacks.get(messageType); - if (callback == null) { - throw new IllegalStateException("No callback registered for message type " + messageType); - } - - // Handle callback - callback.onMessage(scope, messageId, byteBuf); - - // Stats - statRxMsg.mark(); - statRxBytes.mark(messageLength); - } finally { - byteBuf.release(); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) { - LOG.error(cause); - } - } - - /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */ - private static String toNonEmptyString(int length, ByteBuf byteBuf) { - if (byteBuf.readableBytes() >= length) { - return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); - } - return null; - } - - /** Writes [s.length(), s] to buf, or [0] if s is null */ - private static void writeStringWithLength(ByteBuf buf, String s) { - if (s == null) { - buf.writeInt(0); - return; - } - - buf.writeInt(s.length()); - for (int i = 0; i < s.length(); i++) { - buf.writeByte(s.charAt(i)); - } - } - - /** Returns the length of a string, or 0 if s is null */ - private static int getLength(String s) { - return s == null ? 0 : s.length(); - } - - /** - * @throws java.lang.IllegalArgumentException if length > messageLength (attempt to prevent OOM - * exceptions) - */ - private static void checkLength(String fieldName, int length, int messageLength) - throws IllegalArgumentException { - if (length > messageLength) { - throw new IllegalArgumentException(fieldName + "=" + length - + " is greater than messageLength=" + messageLength); - } + LOG.info("Registered callback " + callback + " for message type " + messageType); } public static class Config { private String instanceName; private int port; private int numConnections = 1; - private long maxChannelLifeMillis = 5000; public Config setInstanceName(String instanceName) { this.instanceName = instanceName; diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java new file mode 100644 index 0000000000..b60ea08033 --- /dev/null +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java @@ -0,0 +1,64 @@ +package org.apache.helix.ipc.netty; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.netty.buffer.ByteBuf; + +import java.nio.charset.Charset; + +public class NettyHelixIPCUtils { + /** Writes [s.length(), s] to buf, or [0] if s is null */ + public static void writeStringWithLength(ByteBuf buf, String s) { + if (s == null) { + buf.writeInt(0); + return; + } + + buf.writeInt(s.length()); + for (int i = 0; i < s.length(); i++) { + buf.writeByte(s.charAt(i)); + } + } + + /** Returns the length of a string, or 0 if s is null */ + public static int getLength(String s) { + return s == null ? 0 : s.length(); + } + + /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */ + public static String toNonEmptyString(int length, ByteBuf byteBuf) { + if (byteBuf.readableBytes() >= length) { + return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); + } + return null; + } + + /** + * @throws java.lang.IllegalArgumentException if length > messageLength (attempt to prevent OOM + * exceptions) + */ + public static void checkLength(String fieldName, int length, int messageLength) + throws IllegalArgumentException { + if (length > messageLength) { + throw new IllegalArgumentException(fieldName + "=" + length + + " is greater than messageLength=" + messageLength); + } + } +} diff --git a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java index 376e7aee48..e99114f343 100644 --- a/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java +++ b/helix-ipc/src/test/java/org/apache/helix/ipc/benchmark/BenchmarkDriver.java @@ -43,8 +43,6 @@ import org.apache.helix.resolver.HelixAddress; import org.apache.helix.resolver.HelixMessageScope; import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; import com.google.common.collect.ImmutableSet; From 58507f2b949c2a019d863ce15508af6bbc503495 Mon Sep 17 00:00:00 2001 From: Greg Brandt Date: Thu, 18 Sep 2014 09:13:41 -0700 Subject: [PATCH 2/2] changed indentation to 2 spaces, continuation to 4 spaces --- .../NettyHelixIPCBackPressureHandler.java | 48 ++-- .../netty/NettyHelixIPCCallbackHandler.java | 240 +++++++++--------- .../helix/ipc/netty/NettyHelixIPCService.java | 38 +-- .../helix/ipc/netty/NettyHelixIPCUtils.java | 60 ++--- 4 files changed, 193 insertions(+), 193 deletions(-) diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java index 9eb5eadce5..c250acafb7 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCBackPressureHandler.java @@ -26,34 +26,34 @@ import org.apache.log4j.Logger; public class NettyHelixIPCBackPressureHandler extends SimpleChannelInboundHandler { - private static final Logger LOG = Logger.getLogger(NettyHelixIPCBackPressureHandler.class); + private static final Logger LOG = Logger.getLogger(NettyHelixIPCBackPressureHandler.class); - private final Object sync = new Object(); + private final Object sync = new Object(); - public NettyHelixIPCBackPressureHandler() { - super(false); - } + public NettyHelixIPCBackPressureHandler() { + super(false); + } - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { - ctx.fireChannelRead(msg); - } + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + ctx.fireChannelRead(msg); + } - public void waitUntilWritable(Channel channel) throws InterruptedException { - synchronized (sync) { - while (channel.isOpen() && !channel.isWritable()) { - LOG.warn(channel + " is not writable, waiting until it is"); - sync.wait(); - } - } + public void waitUntilWritable(Channel channel) throws InterruptedException { + synchronized (sync) { + while (channel.isOpen() && !channel.isWritable()) { + LOG.warn(channel + " is not writable, waiting until it is"); + sync.wait(); + } } - - @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) { - synchronized (sync) { - if (ctx.channel().isWritable()) { - sync.notifyAll(); - } - } + } + + @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) { + synchronized (sync) { + if (ctx.channel().isWritable()) { + sync.notifyAll(); + } } + } } diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java index acd0af2604..164e6d18a2 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCCallbackHandler.java @@ -35,126 +35,126 @@ @ChannelHandler.Sharable public class NettyHelixIPCCallbackHandler extends SimpleChannelInboundHandler { - private final String instanceName; - private final ConcurrentMap callbacks; - private final Meter statRxMsg; - private final Meter statRxBytes; - - public NettyHelixIPCCallbackHandler(String instanceName, - ConcurrentMap callbacks, - Meter statRxMsg, - Meter statRxBytes) { - super(false); // we will manage reference - this.instanceName = instanceName; - this.callbacks = callbacks; - this.statRxMsg = statRxMsg; - this.statRxBytes = statRxBytes; + private final String instanceName; + private final ConcurrentMap callbacks; + private final Meter statRxMsg; + private final Meter statRxBytes; + + public NettyHelixIPCCallbackHandler(String instanceName, + ConcurrentMap callbacks, + Meter statRxMsg, + Meter statRxBytes) { + super(false); // we will manage reference + this.instanceName = instanceName; + this.callbacks = callbacks; + this.statRxMsg = statRxMsg; + this.statRxBytes = statRxBytes; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { + try { + int idx = 0; + + // Message length + int messageLength = byteBuf.readInt(); + idx += 4; + + // Message version + @SuppressWarnings("unused") + int messageVersion = byteBuf.readInt(); + idx += 4; + + // Message type + int messageType = byteBuf.readInt(); + idx += 4; + + // Message ID + UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong()); + idx += 16; + + // Cluster + byteBuf.readerIndex(idx); + int clusterSize = byteBuf.readInt(); + idx += 4; + checkLength("clusterSize", clusterSize, messageLength); + String clusterName = toNonEmptyString(clusterSize, byteBuf); + idx += clusterSize; + + // Resource + byteBuf.readerIndex(idx); + int resourceSize = byteBuf.readInt(); + idx += 4; + checkLength("resourceSize", resourceSize, messageLength); + String resourceName = toNonEmptyString(resourceSize, byteBuf); + idx += resourceSize; + + // Partition + byteBuf.readerIndex(idx); + int partitionSize = byteBuf.readInt(); + idx += 4; + checkLength("partitionSize", partitionSize, messageLength); + String partitionName = toNonEmptyString(partitionSize, byteBuf); + idx += partitionSize; + + // State + byteBuf.readerIndex(idx); + int stateSize = byteBuf.readInt(); + idx += 4; + checkLength("stateSize", stateSize, messageLength); + String state = toNonEmptyString(stateSize, byteBuf); + idx += stateSize; + + // Source instance + byteBuf.readerIndex(idx); + int srcInstanceSize = byteBuf.readInt(); + idx += 4; + checkLength("srcInstanceSize", srcInstanceSize, messageLength); + String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf); + idx += srcInstanceSize; + + // Destination instance + byteBuf.readerIndex(idx); + int dstInstanceSize = byteBuf.readInt(); + idx += 4; + checkLength("dstInstanceSize", dstInstanceSize, messageLength); + String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf); + idx += dstInstanceSize; + + // Position at message + byteBuf.readerIndex(idx + 4); + + // Error check + if (dstInstance == null) { + throw new IllegalStateException("Received message addressed to null destination from " + + srcInstance); + } else if (!dstInstance.equals(instanceName)) { + throw new IllegalStateException(instanceName + + " received message addressed to " + dstInstance + " from " + srcInstance); + } else if (callbacks.get(messageType) == null) { + throw new IllegalStateException("No callback registered for message type " + messageType); + } + + // Build scope + HelixMessageScope scope = + new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName) + .partition(partitionName).state(state).sourceInstance(srcInstance).build(); + + // Get callback + HelixIPCCallback callback = callbacks.get(messageType); + if (callback == null) { + throw new IllegalStateException("No callback registered for message type " + messageType); + } + + // Handle callback + callback.onMessage(scope, messageId, byteBuf); + + // Stats + statRxMsg.mark(); + statRxBytes.mark(messageLength); + } finally { + byteBuf.release(); } - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { - try { - int idx = 0; - - // Message length - int messageLength = byteBuf.readInt(); - idx += 4; - - // Message version - @SuppressWarnings("unused") - int messageVersion = byteBuf.readInt(); - idx += 4; - - // Message type - int messageType = byteBuf.readInt(); - idx += 4; - - // Message ID - UUID messageId = new UUID(byteBuf.readLong(), byteBuf.readLong()); - idx += 16; - - // Cluster - byteBuf.readerIndex(idx); - int clusterSize = byteBuf.readInt(); - idx += 4; - checkLength("clusterSize", clusterSize, messageLength); - String clusterName = toNonEmptyString(clusterSize, byteBuf); - idx += clusterSize; - - // Resource - byteBuf.readerIndex(idx); - int resourceSize = byteBuf.readInt(); - idx += 4; - checkLength("resourceSize", resourceSize, messageLength); - String resourceName = toNonEmptyString(resourceSize, byteBuf); - idx += resourceSize; - - // Partition - byteBuf.readerIndex(idx); - int partitionSize = byteBuf.readInt(); - idx += 4; - checkLength("partitionSize", partitionSize, messageLength); - String partitionName = toNonEmptyString(partitionSize, byteBuf); - idx += partitionSize; - - // State - byteBuf.readerIndex(idx); - int stateSize = byteBuf.readInt(); - idx += 4; - checkLength("stateSize", stateSize, messageLength); - String state = toNonEmptyString(stateSize, byteBuf); - idx += stateSize; - - // Source instance - byteBuf.readerIndex(idx); - int srcInstanceSize = byteBuf.readInt(); - idx += 4; - checkLength("srcInstanceSize", srcInstanceSize, messageLength); - String srcInstance = toNonEmptyString(srcInstanceSize, byteBuf); - idx += srcInstanceSize; - - // Destination instance - byteBuf.readerIndex(idx); - int dstInstanceSize = byteBuf.readInt(); - idx += 4; - checkLength("dstInstanceSize", dstInstanceSize, messageLength); - String dstInstance = toNonEmptyString(dstInstanceSize, byteBuf); - idx += dstInstanceSize; - - // Position at message - byteBuf.readerIndex(idx + 4); - - // Error check - if (dstInstance == null) { - throw new IllegalStateException("Received message addressed to null destination from " - + srcInstance); - } else if (!dstInstance.equals(instanceName)) { - throw new IllegalStateException(instanceName - + " received message addressed to " + dstInstance + " from " + srcInstance); - } else if (callbacks.get(messageType) == null) { - throw new IllegalStateException("No callback registered for message type " + messageType); - } - - // Build scope - HelixMessageScope scope = - new HelixMessageScope.Builder().cluster(clusterName).resource(resourceName) - .partition(partitionName).state(state).sourceInstance(srcInstance).build(); - - // Get callback - HelixIPCCallback callback = callbacks.get(messageType); - if (callback == null) { - throw new IllegalStateException("No callback registered for message type " + messageType); - } - - // Handle callback - callback.onMessage(scope, messageId, byteBuf); - - // Stats - statRxMsg.mark(); - statRxBytes.mark(messageLength); - } finally { - byteBuf.release(); - } - - } + } } diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java index f2bfccc9be..68f6fbc1ee 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCService.java @@ -60,7 +60,7 @@ * Provides partition/state-level messaging among nodes in a Helix cluster. *

* The message format is (where len == 4B, and contains the length of the next field) - * + * *

  *      +----------------------+
  *      | totalLength (4B)     |
@@ -86,7 +86,7 @@
  *      | len | message        |
  *      +----------------------+
  * 
- * + * *

*/ public class NettyHelixIPCService implements HelixIPCService { @@ -154,22 +154,22 @@ public void start() throws Exception { new ServerBootstrap().group(eventLoopGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast( - new LengthFieldBasedFrameDecoder( - MAX_FRAME_LENGTH, - LENGTH_FIELD_OFFSET, - LENGTH_FIELD_LENGTH, - LENGTH_ADJUSTMENT, - INITIAL_BYTES_TO_STRIP)); - socketChannel.pipeline().addLast( - new NettyHelixIPCCallbackHandler( - config.getInstanceName(), - callbacks, - statRxMsg, - statRxBytes)); - } + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast( + new LengthFieldBasedFrameDecoder( + MAX_FRAME_LENGTH, + LENGTH_FIELD_OFFSET, + LENGTH_FIELD_LENGTH, + LENGTH_ADJUSTMENT, + INITIAL_BYTES_TO_STRIP)); + socketChannel.pipeline().addLast( + new NettyHelixIPCCallbackHandler( + config.getInstanceName(), + callbacks, + statRxMsg, + statRxBytes)); + } }).bind(new InetSocketAddress(config.getPort())); LOG.info("Listening on port " + config.getPort()); @@ -280,7 +280,7 @@ public void send(HelixAddress destination, int messageType, UUID messageId, Byte // Send NettyHelixIPCBackPressureHandler backPressureHandler - = channel.pipeline().get(NettyHelixIPCBackPressureHandler.class); + = channel.pipeline().get(NettyHelixIPCBackPressureHandler.class); backPressureHandler.waitUntilWritable(channel); channel.writeAndFlush(fullByteBuf); diff --git a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java index b60ea08033..77b91234af 100644 --- a/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java +++ b/helix-ipc/src/main/java/org/apache/helix/ipc/netty/NettyHelixIPCUtils.java @@ -24,41 +24,41 @@ import java.nio.charset.Charset; public class NettyHelixIPCUtils { - /** Writes [s.length(), s] to buf, or [0] if s is null */ - public static void writeStringWithLength(ByteBuf buf, String s) { - if (s == null) { - buf.writeInt(0); - return; - } - - buf.writeInt(s.length()); - for (int i = 0; i < s.length(); i++) { - buf.writeByte(s.charAt(i)); - } + /** Writes [s.length(), s] to buf, or [0] if s is null */ + public static void writeStringWithLength(ByteBuf buf, String s) { + if (s == null) { + buf.writeInt(0); + return; } - /** Returns the length of a string, or 0 if s is null */ - public static int getLength(String s) { - return s == null ? 0 : s.length(); + buf.writeInt(s.length()); + for (int i = 0; i < s.length(); i++) { + buf.writeByte(s.charAt(i)); } + } + + /** Returns the length of a string, or 0 if s is null */ + public static int getLength(String s) { + return s == null ? 0 : s.length(); + } - /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */ - public static String toNonEmptyString(int length, ByteBuf byteBuf) { - if (byteBuf.readableBytes() >= length) { - return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); - } - return null; + /** Given a byte buf w/ a certain reader index, encodes the next length bytes as a String */ + public static String toNonEmptyString(int length, ByteBuf byteBuf) { + if (byteBuf.readableBytes() >= length) { + return byteBuf.toString(byteBuf.readerIndex(), length, Charset.defaultCharset()); } + return null; + } - /** - * @throws java.lang.IllegalArgumentException if length > messageLength (attempt to prevent OOM - * exceptions) - */ - public static void checkLength(String fieldName, int length, int messageLength) - throws IllegalArgumentException { - if (length > messageLength) { - throw new IllegalArgumentException(fieldName + "=" + length - + " is greater than messageLength=" + messageLength); - } + /** + * @throws java.lang.IllegalArgumentException if length > messageLength (attempt to prevent OOM + * exceptions) + */ + public static void checkLength(String fieldName, int length, int messageLength) + throws IllegalArgumentException { + if (length > messageLength) { + throw new IllegalArgumentException(fieldName + "=" + length + + " is greater than messageLength=" + messageLength); } + } }