From b36993985271ee58a011cbed6c57acaa859b0ab1 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 30 Oct 2019 17:54:21 -0700 Subject: [PATCH 1/8] WIP: trying to lower netty's direct chunk size --- .../main/java/io/grpc/netty/NettyClientTransport.java | 1 + .../io/grpc/netty/NettyWritableBufferAllocator.java | 2 +- netty/src/main/java/io/grpc/netty/Utils.java | 11 +++++++++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index c1359d28143..9b4e4243771 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -225,6 +225,7 @@ public Runnable start(Listener transportListener) { ChannelHandler negotiationHandler = negotiator.newHandler(handler); Bootstrap b = new Bootstrap(); + b.option(ALLOCATOR, SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR)); b.attr(LOGGER_KEY, channelLogger); b.group(eventLoop); b.channelFactory(channelFactory); diff --git a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java index d0db923636d..4ccbea4da57 100644 --- a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java +++ b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java @@ -37,7 +37,7 @@ class NettyWritableBufferAllocator implements WritableBufferAllocator { private static final int MIN_BUFFER = 4096; // Set the maximum buffer size to 1MB - private static final int MAX_BUFFER = 1024 * 1024; + static final int MAX_BUFFER = 1024 * 1024; private final ByteBufAllocator allocator; diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 41d5699ea78..aed3ad564c4 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -82,6 +82,17 @@ class Utils { = new DefaultEventLoopGroupResource(0, "grpc-nio-worker-ELG", EventLoopGroupType.NIO); public static final Resource DEFAULT_BOSS_EVENT_LOOP_GROUP; public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP; + public static final Resource BYTE_BUF_ALLOCATOR = + new Resource() { + @Override + public ByteBufAllocator create() { + return PooledByteBufAllocator() + // TODO(zhangkun): find out how to copy the default values of most parameters from: + // https://github.com/netty/netty/blob/4.1/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java + // And https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/internal/PlatformDependent.java#L199 + // for whether to prefer direct byte bufs + } + } public static final ChannelFactory DEFAULT_SERVER_CHANNEL_FACTORY; public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; From 516902437ab05e5ed9c559299fc292134426f635 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Mon, 4 Nov 2019 16:40:27 -0800 Subject: [PATCH 2/8] WIP: setting up PooledByteBufAllocator --- .../io/grpc/netty/NettyClientTransport.java | 4 +++- .../netty/NettyWritableBufferAllocator.java | 2 +- netty/src/main/java/io/grpc/netty/Utils.java | 24 +++++++++++++++---- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 9b4e4243771..e51670ae27e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -101,6 +101,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final LocalSocketPicker localSocketPicker; private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; + private PooledByteBufAllocator allocator; NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, @@ -225,7 +226,8 @@ public Runnable start(Listener transportListener) { ChannelHandler negotiationHandler = negotiator.newHandler(handler); Bootstrap b = new Bootstrap(); - b.option(ALLOCATOR, SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR)); + allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR); + b.option(ALLOCATOR, allocator); b.attr(LOGGER_KEY, channelLogger); b.group(eventLoop); b.channelFactory(channelFactory); diff --git a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java index 4ccbea4da57..d0db923636d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java +++ b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java @@ -37,7 +37,7 @@ class NettyWritableBufferAllocator implements WritableBufferAllocator { private static final int MIN_BUFFER = 4096; // Set the maximum buffer size to 1MB - static final int MAX_BUFFER = 1024 * 1024; + private static final int MAX_BUFFER = 1024 * 1024; private final ByteBufAllocator allocator; diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index aed3ad564c4..1f5aa4b13e9 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -34,6 +34,8 @@ import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders; import io.grpc.netty.NettySocketSupport.NativeSocketOptions; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFactory; @@ -82,15 +84,27 @@ class Utils { = new DefaultEventLoopGroupResource(0, "grpc-nio-worker-ELG", EventLoopGroupType.NIO); public static final Resource DEFAULT_BOSS_EVENT_LOOP_GROUP; public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP; + public static final Resource BYTE_BUF_ALLOCATOR = new Resource() { @Override public ByteBufAllocator create() { - return PooledByteBufAllocator() - // TODO(zhangkun): find out how to copy the default values of most parameters from: - // https://github.com/netty/netty/blob/4.1/buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java - // And https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/internal/PlatformDependent.java#L199 - // for whether to prefer direct byte bufs + int maxOrder; + if (System.getProperty("io.netty.allocator.maxOrder") == null) { + maxOrder = 7; + } else { + maxOrder = PooledByteBufAllocator.defaultMaxOrder(); + } + return PooledByteBufAllocator( + PooledByteBufAllocator.defaultPreferDirect(), + PooledByteBufAllocator.defaultNumHeapArena(), + PooledByteBufAllocator.defaultNumDirectArena(), + PooledByteBufAllocator.defaultPageSize(), + maxOrder, + PooledByteBufAllocator.defaultTinyCacheSize(), + PooledByteBufAllocator.defaultSmallCacheSize(), + PooledByteBufAllocator.defaultNormalCacheSize(), + PooledByteBufAllocator.defaultUseCacheForAllThreads()); } } From 816593e2b627d952304569b170e04ee8b97f0fab Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Tue, 5 Nov 2019 16:53:01 -0800 Subject: [PATCH 3/8] Use custom PooledByteBufAllocator for client --- .../java/io/grpc/netty/NettyChannelBuilder.java | 13 +++++++++++-- .../java/io/grpc/netty/NettyClientTransport.java | 7 +++++-- netty/src/main/java/io/grpc/netty/Utils.java | 9 +++++++-- .../io/grpc/netty/NettyClientTransportTest.java | 10 +++++++--- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 84d47b8feee..8f999403a74 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -40,6 +40,7 @@ import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; @@ -73,6 +74,8 @@ public final class NettyChannelBuilder new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE); private static final ObjectPool DEFAULT_EVENT_LOOP_GROUP_POOL = SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); + private static final ObjectPool ALLOCATOR_POOL = + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR); private final Map, Object> channelOptions = new HashMap<>(); @@ -419,7 +422,7 @@ protected ClientTransportFactory buildTransportFactory() { return new NettyTransportFactory( negotiator, channelFactory, channelOptions, - eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), + eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory, localSocketPicker, useGetForSafeMethods); } @@ -534,6 +537,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto private final Map, ?> channelOptions; private final ObjectPool groupPool; private final EventLoopGroup group; + private final ObjectPool allocatorPool; + private final ByteBufAllocator allocator; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -549,6 +554,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto NettyTransportFactory(ProtocolNegotiator protocolNegotiator, ChannelFactory channelFactory, Map, ?> channelOptions, ObjectPool groupPool, + ObjectPool allocatorPool, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, @@ -558,6 +564,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto this.channelOptions = new HashMap, Object>(channelOptions); this.groupPool = groupPool; this.group = groupPool.getObject(); + this.allocatorPool = allocatorPool; + this.allocator = allocatorPool.getObject(); this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -596,7 +604,7 @@ public void run() { // TODO(carl-mastrangelo): Pass channelLogger in. NettyClientTransport transport = new NettyClientTransport( - serverAddress, channelFactory, channelOptions, group, + serverAddress, channelFactory, channelOptions, group, allocator, localNegotiator, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), @@ -619,6 +627,7 @@ public void close() { protocolNegotiator.close(); groupPool.returnObject(group); + allocatorPool.returnObject(allocator); } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index e51670ae27e..61730c8b53a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -17,6 +17,7 @@ package io.grpc.netty; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; +import static io.netty.channel.ChannelOption.ALLOCATOR; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.annotations.VisibleForTesting; @@ -43,6 +44,7 @@ import io.grpc.internal.TransportTracer; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -74,6 +76,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final SocketAddress remoteAddress; private final ChannelFactory channelFactory; private final EventLoopGroup group; + private final ByteBufAllocator allocator; private final ProtocolNegotiator negotiator; private final String authorityString; private final AsciiString authority; @@ -101,11 +104,11 @@ class NettyClientTransport implements ConnectionClientTransport { private final LocalSocketPicker localSocketPicker; private final ChannelLogger channelLogger; private final boolean useGetForSafeMethods; - private PooledByteBufAllocator allocator; NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, Map, ?> channelOptions, EventLoopGroup group, + ByteBufAllocator allocator, ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, @@ -116,6 +119,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.negotiationScheme = this.negotiator.scheme(); this.remoteAddress = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); + this.allocator = Preconditions.checkNotNull(allocator, "allocator"); this.channelFactory = channelFactory; this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); this.flowControlWindow = flowControlWindow; @@ -226,7 +230,6 @@ public Runnable start(Listener transportListener) { ChannelHandler negotiationHandler = negotiator.newHandler(handler); Bootstrap b = new Bootstrap(); - allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR); b.option(ALLOCATOR, allocator); b.attr(LOGGER_KEY, channelLogger); b.group(eventLoop); diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 1f5aa4b13e9..9d4a29f4bff 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -95,7 +95,7 @@ public ByteBufAllocator create() { } else { maxOrder = PooledByteBufAllocator.defaultMaxOrder(); } - return PooledByteBufAllocator( + return new PooledByteBufAllocator( PooledByteBufAllocator.defaultPreferDirect(), PooledByteBufAllocator.defaultNumHeapArena(), PooledByteBufAllocator.defaultNumDirectArena(), @@ -106,7 +106,12 @@ public ByteBufAllocator create() { PooledByteBufAllocator.defaultNormalCacheSize(), PooledByteBufAllocator.defaultUseCacheForAllThreads()); } - } + + @Override + public void close(ByteBufAllocator allocator) { + // TODO(zhangkun83): do anything? + } + }; public static final ChannelFactory DEFAULT_SERVER_CHANNEL_FACTORY; public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index dc1a4b99105..6a16d61bbe3 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -62,9 +62,11 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestUtils; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelDuplexHandler; @@ -123,6 +125,7 @@ public class NettyClientTransportTest { private final LinkedBlockingQueue serverTransportAttributesList = new LinkedBlockingQueue<>(); private final NioEventLoopGroup group = new NioEventLoopGroup(1); + private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR); private final EchoServerListener serverListener = new EchoServerListener(); private final InternalChannelz channelz = new InternalChannelz(); private Runnable tooManyPingsRunnable = new Runnable() { @@ -153,6 +156,7 @@ public void teardown() throws Exception { } group.shutdownGracefully(0, 10, TimeUnit.SECONDS); + SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator); } @Test @@ -190,7 +194,7 @@ public void setSoLingerChannelOption() throws IOException { channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group, - newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -435,7 +439,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(CantConstructChannel.class), - new HashMap, Object>(), group, + new HashMap, Object>(), group, allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), @@ -705,7 +709,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; } NettyClientTransport transport = new NettyClientTransport( - address, channelFactory, new HashMap, Object>(), group, + address, channelFactory, new HashMap, Object>(), group, allocator, negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, From c1ebf2f7a172a370e7c88a1d8055227a109d7ad2 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Wed, 6 Nov 2019 15:56:22 -0800 Subject: [PATCH 4/8] Use custom PooledByteBufAllocator for server --- .../io/grpc/netty/NettyChannelBuilder.java | 2 +- .../main/java/io/grpc/netty/NettyServer.java | 26 +++++++++++++++---- .../io/grpc/netty/NettyServerBuilder.java | 5 +++- .../grpc/netty/NettyClientTransportTest.java | 4 ++- .../java/io/grpc/netty/NettyServerTest.java | 4 +++ 5 files changed, 33 insertions(+), 8 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 8f999403a74..173ed6533e8 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -625,9 +625,9 @@ public void close() { } closed = true; + allocatorPool.returnObject(allocator); protocolNegotiator.close(); groupPool.returnObject(group); - allocatorPool.returnObject(allocator); } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index f0cfea99d7e..2f168a8cf0f 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; +import static io.netty.channel.ChannelOption.ALLOCATOR; import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; @@ -37,6 +38,7 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -73,8 +75,10 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxStreamsPerConnection; private final ObjectPool bossGroupPool; private final ObjectPool workerGroupPool; + private final ObjectPool allocatorPool; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; + private ByteBufAllocator allocator; private ServerListener listener; private Channel channel; private final int flowControlWindow; @@ -87,7 +91,8 @@ class NettyServer implements InternalServer, InternalWithLogId { private final long maxConnectionAgeGraceInNanos; private final boolean permitKeepAliveWithoutCalls; private final long permitKeepAliveTimeInNanos; - private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); + private final ReferenceCounted sharedResourceReferenceCounter = + new SharedResourceReferenceCounter(); private final List streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; private final InternalChannelz channelz; @@ -100,6 +105,7 @@ class NettyServer implements InternalServer, InternalWithLogId { Map, ?> channelOptions, ObjectPool bossGroupPool, ObjectPool workerGroupPool, + ObjectPool allocatorPool, ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, @@ -115,8 +121,10 @@ class NettyServer implements InternalServer, InternalWithLogId { this.channelOptions = new HashMap, Object>(channelOptions); this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool"); this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool"); + this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool"); this.bossGroup = bossGroupPool.getObject(); this.workerGroup = workerGroupPool.getObject(); + this.allocator = allocatorPool.getObject(); this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracerFactory = transportTracerFactory; @@ -155,6 +163,7 @@ public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); ServerBootstrap b = new ServerBootstrap(); + b.option(ALLOCATOR, allocator); b.group(bossGroup, workerGroup); b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. @@ -210,7 +219,7 @@ public void initChannel(Channel ch) { } // `channel` shutdown can race with `ch` initialization, so this is only safe to increment // inside the lock. - eventLoopReferenceCounter.retain(); + sharedResourceReferenceCounter.retain(); transportListener = listener.transportCreated(transport); } @@ -224,7 +233,7 @@ final class LoopReleaser implements ChannelFutureListener { public void operationComplete(ChannelFuture future) throws Exception { if (!done) { done = true; - eventLoopReferenceCounter.release(); + sharedResourceReferenceCounter.release(); } } } @@ -281,7 +290,7 @@ public void operationComplete(ChannelFuture future) throws Exception { synchronized (NettyServer.this) { listener.serverShutdown(); } - eventLoopReferenceCounter.release(); + sharedResourceReferenceCounter.release(); } }); try { @@ -305,7 +314,7 @@ public String toString() { .toString(); } - class EventLoopReferenceCounter extends AbstractReferenceCounted { + class SharedResourceReferenceCounter extends AbstractReferenceCounted { @Override protected void deallocate() { try { @@ -320,6 +329,13 @@ protected void deallocate() { } } finally { workerGroup = null; + try { + if (allocator != null) { + allocatorPool.returnObject(allocator); + } + } finally { + allocator = null; + } } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 22fab43a2eb..07fa2a0dd71 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -35,6 +35,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -78,6 +79,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL = SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); + private static final ObjectPool ALLOCATOR_POOL = + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR); private final List listenAddresses = new ArrayList<>(); @@ -534,7 +537,7 @@ protected List buildTransportServers( for (SocketAddress listenAddress : listenAddresses) { NettyServer transportServer = new NettyServer( listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool, - workerEventLoopGroupPool, negotiator, streamTracerFactories, + workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 6a16d61bbe3..2642f32ba41 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -63,6 +63,7 @@ import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.SharedResourceHolder; +import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestUtils; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; @@ -727,7 +728,8 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr TestUtils.testServerAddress(new InetSocketAddress(0)), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), - new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator, + new FixedObjectPool<>(group), new FixedObjectPool<>(group), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index 51f0a50f27e..174ca1e0e0e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -62,6 +62,7 @@ public void getPort() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -100,6 +101,7 @@ public void getPort_notStarted() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -138,6 +140,7 @@ public void childChannelOptions() throws Exception { channelOptions, SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -188,6 +191,7 @@ public void channelzListenSocket() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), From ce460319c6e3ff0fca8a09f340338470bf60a388 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Thu, 7 Nov 2019 14:10:03 -0800 Subject: [PATCH 5/8] Also make server channels use custom allocator --- netty/src/main/java/io/grpc/netty/NettyServer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 2f168a8cf0f..a1f534770c0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -164,6 +164,7 @@ public void start(ServerListener serverListener) throws IOException { ServerBootstrap b = new ServerBootstrap(); b.option(ALLOCATOR, allocator); + b.childOption(ALLOCATOR, allocator); b.group(bossGroup, workerGroup); b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. From c7dc30b92d89369e9f862b7bd21ac97701d591ad Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 8 Nov 2019 13:26:38 -0800 Subject: [PATCH 6/8] Clarify comments --- netty/src/main/java/io/grpc/netty/Utils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 9d4a29f4bff..55fb3b02203 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -109,7 +109,7 @@ public ByteBufAllocator create() { @Override public void close(ByteBufAllocator allocator) { - // TODO(zhangkun83): do anything? + // PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC. } }; From 5aafb0b4504a5d67e8c0ddc3a324bd39c2855809 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 8 Nov 2019 13:38:21 -0800 Subject: [PATCH 7/8] More comments --- netty/src/main/java/io/grpc/netty/Utils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 55fb3b02203..2e19d167221 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -91,6 +91,9 @@ class Utils { public ByteBufAllocator create() { int maxOrder; if (System.getProperty("io.netty.allocator.maxOrder") == null) { + // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is + // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be + // 1MiB, thus reducing the maxOrder to 7. maxOrder = 7; } else { maxOrder = PooledByteBufAllocator.defaultMaxOrder(); From 01d6e4df28eade645008a8f72da1c118f77ade31 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Thu, 14 Nov 2019 11:00:49 -0800 Subject: [PATCH 8/8] Raise chunk size to 2MiB for lower performance impact. Make it protected by system property --- netty/src/main/java/io/grpc/netty/Utils.java | 39 +++++++++++--------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 2e19d167221..4822edbdec5 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -89,25 +89,30 @@ class Utils { new Resource() { @Override public ByteBufAllocator create() { - int maxOrder; - if (System.getProperty("io.netty.allocator.maxOrder") == null) { - // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is - // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be - // 1MiB, thus reducing the maxOrder to 7. - maxOrder = 7; + if (Boolean.parseBoolean( + System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) { + int maxOrder; + if (System.getProperty("io.netty.allocator.maxOrder") == null) { + // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is + // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be + // 2MiB, thus reducing the maxOrder to 8. + maxOrder = 8; + } else { + maxOrder = PooledByteBufAllocator.defaultMaxOrder(); + } + return new PooledByteBufAllocator( + PooledByteBufAllocator.defaultPreferDirect(), + PooledByteBufAllocator.defaultNumHeapArena(), + PooledByteBufAllocator.defaultNumDirectArena(), + PooledByteBufAllocator.defaultPageSize(), + maxOrder, + PooledByteBufAllocator.defaultTinyCacheSize(), + PooledByteBufAllocator.defaultSmallCacheSize(), + PooledByteBufAllocator.defaultNormalCacheSize(), + PooledByteBufAllocator.defaultUseCacheForAllThreads()); } else { - maxOrder = PooledByteBufAllocator.defaultMaxOrder(); + return ByteBufAllocator.DEFAULT; } - return new PooledByteBufAllocator( - PooledByteBufAllocator.defaultPreferDirect(), - PooledByteBufAllocator.defaultNumHeapArena(), - PooledByteBufAllocator.defaultNumDirectArena(), - PooledByteBufAllocator.defaultPageSize(), - maxOrder, - PooledByteBufAllocator.defaultTinyCacheSize(), - PooledByteBufAllocator.defaultSmallCacheSize(), - PooledByteBufAllocator.defaultNormalCacheSize(), - PooledByteBufAllocator.defaultUseCacheForAllThreads()); } @Override