From ef88483653ffc02a80d3b3db65bcd2dad3b45a12 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Sun, 12 Jun 2022 10:48:43 -0700 Subject: [PATCH] HBASE-26708 Make Netty channel bytebuf allocator configurable. Support site configuration of the bytebuf allocator that Netty will use for NettyRpcServer channels. Property name is 'hbase.netty.rpcserver.allocator'. Default is no value, which is equivalent to "pooled". Valid values are: - "pooled": use PooledByteBufAllocator - "unpooled": use UnpooledByteBufAllocator - "heap": use HeapByteBufAllocator, which is a PooledByteBufAllocator that preferentially allocates buffers on heap wherever possible --- .../hbase/ipc/HeapByteBufAllocator.java | 41 +++++++++++++++++++ .../hadoop/hbase/ipc/NettyRpcServer.java | 40 +++++++++++++++++- .../hadoop/hbase/ipc/TestNettyRpcServer.java | 28 +++++++++---- 3 files changed, 99 insertions(+), 10 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java new file mode 100644 index 000000000000..ad80ac4c62e3 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/HeapByteBufAllocator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; + +/** + * A PooledByteBufAllocator that does not prefer direct buffers regardless of platform settings. + *

+ * In some cases direct buffers are still required, like IO buffers where the buffer will be used in + * conjunction with a native method call, so we cannot force all buffer usage on heap. But we can + * strongly prefer it. + */ +@InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.CONFIG }) +public class HeapByteBufAllocator extends PooledByteBufAllocator { + + public static final HeapByteBufAllocator DEFAULT = new HeapByteBufAllocator(); + + public HeapByteBufAllocator() { + super(false /* preferDirect */); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 9c8319944e78..b24336e9552c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -35,6 +35,9 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator; import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer; import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; @@ -65,18 +68,35 @@ public class NettyRpcServer extends RpcServer { "hbase.netty.eventloop.rpcserver.thread.count"; private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0; + /** + * Name of property to change the byte buf allocator for the netty channels. Default is no value, + * which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled", + * and "heap". + *

+ * "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is + * controlled by platform specific code and documented system properties. + *

+ * "heap" will prefer heap arena allocations. + */ + public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator"; + static final String POOLED_ALLOCATOR_TYPE = "pooled"; + static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled"; + static final String HEAP_ALLOCATOR_TYPE = "heap"; + private final InetSocketAddress bindAddress; private final CountDownLatch closed = new CountDownLatch(1); private final Channel serverChannel; private final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true); + private final ByteBufAllocator channelAllocator; public NettyRpcServer(Server server, String name, List services, InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler, boolean reservoirEnabled) throws IOException { super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled); this.bindAddress = bindAddress; + this.channelAllocator = getChannelAllocator(conf); EventLoopGroup eventLoopGroup; Class channelClass; if (server instanceof HRegionServer) { @@ -97,9 +117,9 @@ public NettyRpcServer(Server server, String name, List() { - @Override protected void initChannel(Channel ch) throws Exception { + ch.config().setAllocator(channelAllocator); ChannelPipeline pipeline = ch.pipeline(); FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); preambleDecoder.setSingleDecode(true); @@ -120,6 +140,24 @@ protected void initChannel(Channel ch) throws Exception { this.scheduler.init(new RpcSchedulerContext(this)); } + private ByteBufAllocator getChannelAllocator(Configuration conf) { + final String allocatorType = conf.get(HBASE_NETTY_ALLOCATOR_KEY); + if (allocatorType != null) { + if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(allocatorType)) { + LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); + return PooledByteBufAllocator.DEFAULT; + } else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(allocatorType)) { + LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName()); + return UnpooledByteBufAllocator.DEFAULT; + } else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(allocatorType)) { + LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName()); + return HeapByteBufAllocator.DEFAULT; + } + } + LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName()); + return PooledByteBufAllocator.DEFAULT; + } + @InterfaceAudience.Private protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() { return new NettyRpcServerPreambleHandler(NettyRpcServer.this); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java index b1db87781460..c5f2cffc26b4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyRpcServer.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -33,16 +35,19 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; @Category({ RPCTests.class, MediumTests.class }) +@RunWith(Parameterized.class) public class TestNettyRpcServer { @ClassRule @@ -57,22 +62,27 @@ public class TestNettyRpcServer { private static byte[] FAMILY = Bytes.toBytes("f1"); private static byte[] PRIVATE_COL = Bytes.toBytes("private"); private static byte[] PUBLIC_COL = Bytes.toBytes("public"); + @Parameterized.Parameter + public String allocatorType; - @Before - public void setup() { - TABLE = TableName.valueOf(name.getMethodName()); + @Parameters + public static Collection parameters() { + return Arrays.asList(new Object[][] { { NettyRpcServer.POOLED_ALLOCATOR_TYPE }, + { NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE }, { NettyRpcServer.HEAP_ALLOCATOR_TYPE } }); } - @BeforeClass - public static void setupBeforeClass() throws Exception { + @Before + public void setup() throws Exception { + TABLE = TableName.valueOf(name.getMethodName().replace('[', '_').replace(']', '_')); TEST_UTIL = new HBaseTestingUtil(); TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, NettyRpcServer.class.getName()); + TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType); TEST_UTIL.startMiniCluster(); } - @AfterClass - public static void tearDownAfterClass() throws Exception { + @After + public void tearDown() throws Exception { TEST_UTIL.shutdownMiniCluster(); }