Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-27111 Make Netty channel bytebuf allocator configurable. #4525

Merged
merged 1 commit into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ public static <T> T instantiate(final String className, Constructor<T> ctor, Obj
}
}

@SuppressWarnings("unchecked")
public static <T> T newInstance(String className, Object... params) {
Class<T> type;
try {
type = (Class<T>) getClassLoader().loadClass(className);
} catch (ClassNotFoundException | ClassCastException e) {
throw new UnsupportedOperationException("Unable to load specified class " + className, e);
}
return instantiate(type.getName(), findConstructor(type, params), params);
}

public static ClassLoader getClassLoader() {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = ReflectionUtils.class.getClassLoader();
}
if (cl == null) {
cl = ClassLoader.getSystemClassLoader();
}
if (cl == null) {
throw new RuntimeException("A ClassLoader could not be found");
}
return cl;
}

public static <T> T newInstance(Class<T> type, Object... params) {
return instantiate(type.getName(), findConstructor(type, params), params);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;

/**
* A pooled ByteBufAllocator that does not prefer direct buffers regardless of platform settings.
* <p>
* In some cases direct buffers are still required, like IO buffers where the buffer will be used in
* conjunction with a native method call, so we cannot force all buffer usage on heap. But we can
* strongly prefer it.
*/
@InterfaceAudience.Private
public class HeapByteBufAllocator extends AbstractByteBufAllocator {

public static final HeapByteBufAllocator DEFAULT = new HeapByteBufAllocator();

private final PooledByteBufAllocator delegate =
new PooledByteBufAllocator(false /* preferDirect */);

@Override
public boolean isDirectBufferPooled() {
return delegate.isDirectBufferPooled();
}

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return delegate.heapBuffer(initialCapacity, maxCapacity);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
return delegate.directBuffer(initialCapacity, maxCapacity);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -65,18 +69,35 @@ public class NettyRpcServer extends RpcServer {
"hbase.netty.eventloop.rpcserver.thread.count";
private static final int EVENTLOOP_THREADCOUNT_DEFAULT = 0;

/**
* Name of property to change the byte buf allocator for the netty channels. Default is no value,
* which causes us to use PooledByteBufAllocator. Valid settings here are "pooled", "unpooled",
* and "heap", or, the name of a class implementing ByteBufAllocator.
* <p>
* "pooled" and "unpooled" may prefer direct memory depending on netty configuration, which is
* controlled by platform specific code and documented system properties.
* <p>
* "heap" will prefer heap arena allocations.
*/
public static final String HBASE_NETTY_ALLOCATOR_KEY = "hbase.netty.rpcserver.allocator";
static final String POOLED_ALLOCATOR_TYPE = "pooled";
static final String UNPOOLED_ALLOCATOR_TYPE = "unpooled";
static final String HEAP_ALLOCATOR_TYPE = "heap";

private final InetSocketAddress bindAddress;

private final CountDownLatch closed = new CountDownLatch(1);
private final Channel serverChannel;
private final ChannelGroup allChannels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE, true);
private final ByteBufAllocator channelAllocator;

public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterface> services,
InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler,
boolean reservoirEnabled) throws IOException {
super(server, name, services, bindAddress, conf, scheduler, reservoirEnabled);
this.bindAddress = bindAddress;
this.channelAllocator = getChannelAllocator(conf);
EventLoopGroup eventLoopGroup;
Class<? extends ServerChannel> channelClass;
if (server instanceof HRegionServer) {
Expand All @@ -97,9 +118,9 @@ public NettyRpcServer(Server server, String name, List<BlockingServiceAndInterfa
.childOption(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
.childOption(ChannelOption.SO_REUSEADDR, true)
.childHandler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {
ch.config().setAllocator(channelAllocator);
ChannelPipeline pipeline = ch.pipeline();
FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6);
preambleDecoder.setSingleDecode(true);
Expand All @@ -120,6 +141,36 @@ protected void initChannel(Channel ch) throws Exception {
this.scheduler.init(new RpcSchedulerContext(this));
}

private ByteBufAllocator getChannelAllocator(Configuration conf) throws IOException {
final String value = conf.get(HBASE_NETTY_ALLOCATOR_KEY);
if (value != null) {
if (POOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
return PooledByteBufAllocator.DEFAULT;
} else if (UNPOOLED_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
LOG.info("Using {} for buffer allocation", UnpooledByteBufAllocator.class.getName());
return UnpooledByteBufAllocator.DEFAULT;
} else if (HEAP_ALLOCATOR_TYPE.equalsIgnoreCase(value)) {
LOG.info("Using {} for buffer allocation", HeapByteBufAllocator.class.getName());
return HeapByteBufAllocator.DEFAULT;
} else {
// If the value is none of the recognized labels, treat it as a class name. This allows the
// user to supply a custom implementation, perhaps for debugging.
try {
// ReflectionUtils throws UnsupportedOperationException if there are any problems.
ByteBufAllocator alloc = (ByteBufAllocator) ReflectionUtils.newInstance(value);
LOG.info("Using {} for buffer allocation", value);
return alloc;
} catch (ClassCastException | UnsupportedOperationException e) {
throw new IOException(e);
}
}
} else {
LOG.info("Using {} for buffer allocation", PooledByteBufAllocator.class.getName());
return PooledByteBufAllocator.DEFAULT;
}
}

@InterfaceAudience.Private
protected NettyRpcServerPreambleHandler createNettyRpcServerPreambleHandler() {
return new NettyRpcServerPreambleHandler(NettyRpcServer.this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.buffer.AbstractByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;

/**
* A custom byte buf allocator for TestNettyRpcServer.
*/
public class SimpleByteBufAllocator extends AbstractByteBufAllocator implements ByteBufAllocator {

static final Logger LOG = LoggerFactory.getLogger(SimpleByteBufAllocator.class);

@Override
public boolean isDirectBufferPooled() {
return false;
}

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
LOG.info("newHeapBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, this is good for debugging

return Unpooled.buffer(initialCapacity, maxCapacity);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
LOG.info("newDirectBuffer initialCapacity={}, maxCapacity={}", initialCapacity, maxCapacity);
return Unpooled.directBuffer(initialCapacity, maxCapacity);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
Expand All @@ -33,16 +35,19 @@
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RPCTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

@Category({ RPCTests.class, MediumTests.class })
@RunWith(Parameterized.class)
public class TestNettyRpcServer {

@ClassRule
Expand All @@ -57,22 +62,28 @@ public class TestNettyRpcServer {
private static byte[] FAMILY = Bytes.toBytes("f1");
private static byte[] PRIVATE_COL = Bytes.toBytes("private");
private static byte[] PUBLIC_COL = Bytes.toBytes("public");
@Parameterized.Parameter
public String allocatorType;

@Before
public void setup() {
TABLE = TableName.valueOf(name.getMethodName());
@Parameters
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] { { NettyRpcServer.POOLED_ALLOCATOR_TYPE },
{ NettyRpcServer.UNPOOLED_ALLOCATOR_TYPE }, { NettyRpcServer.HEAP_ALLOCATOR_TYPE },
{ SimpleByteBufAllocator.class.getName() } });
}

@BeforeClass
public static void setupBeforeClass() throws Exception {
@Before
public void setup() throws Exception {
TABLE = TableName.valueOf(name.getMethodName().replace('[', '_').replace(']', '_'));
TEST_UTIL = new HBaseTestingUtil();
TEST_UTIL.getConfiguration().set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY,
NettyRpcServer.class.getName());
TEST_UTIL.getConfiguration().set(NettyRpcServer.HBASE_NETTY_ALLOCATOR_KEY, allocatorType);
TEST_UTIL.startMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

Expand Down