Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,59 @@


/**
* Utility class for setting limits in the PooledByteBufAllocator.
* Utility class that creates a {@link PooledByteBufAllocator} with a reduced number of direct arenas to limit the
* direct memory retained by the pool, and owns the process-wide shared instance of it. Thread-safe.
*/
public class PooledByteBufAllocatorWithLimits {
private static final Logger LOGGER = LoggerFactory.getLogger(PooledByteBufAllocatorWithLimits.class);
private static volatile PooledByteBufAllocator _sharedBufferAllocatorWithLimits;

// Private constructor: prevents instantiation from outside this class.
private PooledByteBufAllocatorWithLimits() {
}

/**
 * Lazily creates and returns the single shared allocator.
 *
 * <p>Every unshaded Netty query transport in the process (all broker side {@link ServerChannels} and the server
 * side {@link QueryServer}) must use this one instance. Pooled arenas retain chunk memory after the buffers
 * allocated from them are released, and free space in one allocator's pool can never serve another allocator's
 * allocations, so per-connection allocators can retain many times the intended amount of direct memory and
 * exhaust it.
 *
 * <p>Note that the reduced arena count limits the worst case retention but is not a hard cap on direct memory
 * usage. The gRPC based transports use shaded Netty classes and maintain their own allocators.
 *
 * @return the shared allocator, created on the first call and reused afterwards
 */
public static PooledByteBufAllocator getSharedBufferAllocatorWithLimits() {
  // Fast path: a single volatile read once the allocator has been published.
  PooledByteBufAllocator allocator = _sharedBufferAllocatorWithLimits;
  if (allocator != null) {
    return allocator;
  }
  synchronized (PooledByteBufAllocatorWithLimits.class) {
    // Re-read under the lock: another thread may have created the allocator while this one was waiting.
    allocator = _sharedBufferAllocatorWithLimits;
    if (allocator == null) {
      allocator = getBufferAllocatorWithLimits(PooledByteBufAllocator.DEFAULT.metric());
      _sharedBufferAllocatorWithLimits = allocator;
    }
    return allocator;
  }
}

// Creates a PooledByteBufAllocator with a reduced number of direct arenas, to limit the direct memory usage of the
// netty channels on broker and server side. The heap arena count and the small/normal cache sizes are copied from
// the given metric; page size, max order, direct arena count and useCacheForAllThreads honor the
// io.netty.allocator.* system properties read below.
// NOTE(review): two signature lines and two default-expression lines appear back to back below; this looks like
// both sides of a diff rendered together - confirm which side is current before relying on either.
public static PooledByteBufAllocator getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
private static PooledByteBufAllocator getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
int defaultPageSize = SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
// Twice the processor count reported by Netty; used as the upper bound of the default direct arena count
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
int defaultMaxOrder = SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 9);
// Chunk size is the page size shifted left by the max order, i.e. pageSize * 2^maxOrder
final int defaultChunkSize = defaultPageSize << defaultMaxOrder;
long maxDirectMemory = PlatformDependent.maxDirectMemory();
// What is left of the platform maximum after subtracting getReservedMemory()
long remainingDirectMemory = maxDirectMemory - getReservedMemory();

// Floor the default at 1: this allocator is created once and shared for the lifetime of the process, so a
// depleted direct memory snapshot at creation time must not permanently disable pooling. An explicit
// io.netty.allocator.numDirectArenas=0 still disables direct arenas.
// The default allows one arena per five chunks that fit into the remaining direct memory, capped at
// defaultMinNumArena; the outer Math.max(0, ...) clamps a negative system property value to 0.
int numDirectArenas = Math.max(0, SystemPropertyUtil.getInt("io.netty.allocator.numDirectArenas",
(int) Math.min(defaultMinNumArena, remainingDirectMemory / defaultChunkSize / 5)));
(int) Math.max(1, Math.min(defaultMinNumArena, remainingDirectMemory / defaultChunkSize / 5))));
// useCacheForAllThreads defaults to false here unless the system property overrides it
boolean useCacheForAllThreads = SystemPropertyUtil.getBoolean("io.netty.allocator.useCacheForAllThreads", false);

LOGGER.info("Creating PooledByteBufAllocator with numDirectArenas: {}, numHeapArenas: {}, chunkSize: {}, "
+ "remainingDirectMemory: {}", numDirectArenas, metric.numHeapArenas(), defaultChunkSize,
remainingDirectMemory);
// The first constructor argument (true) is Netty's preferDirect flag
return new PooledByteBufAllocator(true, metric.numHeapArenas(), numDirectArenas, defaultPageSize, defaultMaxOrder,
metric.smallCacheSize(), metric.normalCacheSize(), useCacheForAllThreads);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.PlatformDependent;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.config.NettyConfig;
Expand Down Expand Up @@ -117,12 +118,10 @@ public void start() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();

PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
ServerMetrics metrics = ServerMetrics.get();
PooledByteBufAllocator bufAllocatorWithLimits =
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
metric = bufAllocatorWithLimits.metric();
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits();
PooledByteBufAllocatorMetric metric = bufAllocatorWithLimits.metric();
ServerMetrics metrics = ServerMetrics.get();
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas);
Expand Down Expand Up @@ -186,4 +185,9 @@ ServerSocketChannel getChannel() {
/** Returns the current number of entries in {@code _allChannels}. */
int getConnectedChannelCount() {
  final int connectedCount = _allChannels.size();
  return connectedCount;
}

/** Test-only accessor returning the key set of {@code _allChannels}. */
@VisibleForTesting
Set<SocketChannel> getConnectedChannels() {
  final Set<SocketChannel> connectedChannels = _allChannels.keySet();
  return connectedChannels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class ServerChannels {
private final EventLoopGroup _eventLoopGroup;
private final Class<? extends SocketChannel> _channelClass;
private final ThreadAccountant _threadAccountant;
private final PooledByteBufAllocator _bufAllocatorWithLimits;

private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();
private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel> _serverToChannelMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -126,6 +127,17 @@ public ServerChannels(QueryRouter queryRouter, @Nullable NettyConfig nettyConfig
_queryRouter = queryRouter;
_tlsConfig = tlsConfig;
_threadAccountant = threadAccountant;

_bufAllocatorWithLimits = PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits();
PooledByteBufAllocatorMetric metric = _bufAllocatorWithLimits.metric();
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP, metric::numHeapArenas);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL, metric::smallCacheSize);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, metric::normalCacheSize);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE, metric::numThreadLocalCaches);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, metric::chunkSize);
}

public void sendRequest(String rawTableName, AsyncQueryResponse asyncQueryResponse,
Expand Down Expand Up @@ -165,22 +177,8 @@ class ServerChannel {

ServerChannel(ServerRoutingInstance serverRoutingInstance) {
_serverRoutingInstance = serverRoutingInstance;
PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
PooledByteBufAllocator bufAllocatorWithLimits =
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
metric = bufAllocatorWithLimits.metric();
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP, metric::numHeapArenas);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL, metric::smallCacheSize);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, metric::normalCacheSize);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE, metric::numThreadLocalCaches);
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, metric::chunkSize);

_bootstrap = new Bootstrap().remoteAddress(serverRoutingInstance.getHostname(), serverRoutingInstance.getPort())
.option(ChannelOption.ALLOCATOR, bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
.option(ChannelOption.ALLOCATOR, _bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
.option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.core.transport;

import io.netty.channel.ChannelHandler;
import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.commons.io.IOUtils;
Expand All @@ -34,6 +35,7 @@

import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;


Expand All @@ -58,6 +60,10 @@ public void startAndStop(final boolean nativeTransportEnabled) {
QueryServer server = new QueryServer(0, nettyConfig, tlsConfig, channelHandler);
server.start();

// The server should use the shared process-wide bounded allocator
assertSame(server.getChannel().config().getAllocator(),
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());

final InetSocketAddress serverAddress = server.getChannel().localAddress();

assertTrue(connectionOk(serverAddress));
Expand All @@ -82,6 +88,11 @@ public void testAllChannelsCleanupOnClose()
TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() > 0, 5_000L,
"Channel was not registered in _allChannels");

// The accepted child channels (which allocate the request/response buffers) must also use the shared allocator
SocketChannel connectedChannel = server.getConnectedChannels().iterator().next();
assertSame(connectedChannel.config().getAllocator(),
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());

socket.close();

TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() == 0, 5_000L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pinot.core.transport;

import com.sun.net.httpserver.HttpServer;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import org.apache.pinot.common.config.NettyConfig;
Expand All @@ -42,6 +44,8 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertSame;


public class ServerChannelsTest {
Expand Down Expand Up @@ -91,6 +95,33 @@ public void testConnect(boolean nativeTransportEnabled)
}
}

/**
 * Verifies that the bootstraps of channels created by the same {@link ServerChannels} and by a second
 * {@link ServerChannels} instance are all configured with the shared allocator returned by
 * {@link PooledByteBufAllocatorWithLimits#getSharedBufferAllocatorWithLimits()}.
 */
@Test
public void testChannelsShareBufferAllocator() {
  ServerChannels firstChannels =
      new ServerChannels(mock(QueryRouter.class), null, null, ThreadAccountantUtils.getNoOpAccountant());
  ServerChannels secondChannels =
      new ServerChannels(mock(QueryRouter.class), null, null, ThreadAccountantUtils.getNoOpAccountant());
  try {
    // First channel: its allocator must be set and must be the shared instance
    ServerRoutingInstance offlineInstance = new ServerRoutingInstance("localhost", 12345, TableType.OFFLINE);
    ByteBufAllocator expectedAllocator =
        getBootstrapAllocator(firstChannels.getOrCreateServerChannel(offlineInstance));
    assertNotNull(expectedAllocator);
    assertSame(expectedAllocator, PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());

    // All channels created by a ServerChannels use the same allocator
    ServerRoutingInstance realtimeInstance = new ServerRoutingInstance("localhost", 12346, TableType.REALTIME);
    ByteBufAllocator sameOwnerAllocator =
        getBootstrapAllocator(firstChannels.getOrCreateServerChannel(realtimeInstance));
    assertSame(sameOwnerAllocator, expectedAllocator);

    // Channels created by another ServerChannels instance (e.g. the TLS one) share it as well
    ServerRoutingInstance otherOwnerInstance = new ServerRoutingInstance("localhost", 12347, TableType.OFFLINE);
    ByteBufAllocator otherOwnerAllocator =
        getBootstrapAllocator(secondChannels.getOrCreateServerChannel(otherOwnerInstance));
    assertSame(otherOwnerAllocator, expectedAllocator);
  } finally {
    firstChannels.shutDown();
    secondChannels.shutDown();
  }
}

/**
 * Reads the value registered under {@link ChannelOption#ALLOCATOR} in the options of the given server channel's
 * bootstrap.
 *
 * @param serverChannel channel whose bootstrap configuration is inspected
 * @return the configured allocator, or {@code null} when the option is absent
 */
private static ByteBufAllocator getBootstrapAllocator(ServerChannels.ServerChannel serverChannel) {
  Object configuredAllocator = serverChannel._bootstrap.config().options().get(ChannelOption.ALLOCATOR);
  return (ByteBufAllocator) configuredAllocator;
}

@SuppressWarnings("unchecked")
@Test
public void testWriteFailureClosesChannelAndFailsQuery() {
Expand Down
Loading