diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index 84d47b8feee..173ed6533e8 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -40,6 +40,7 @@ import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; @@ -73,6 +74,8 @@ public final class NettyChannelBuilder new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE); private static final ObjectPool DEFAULT_EVENT_LOOP_GROUP_POOL = SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); + private static final ObjectPool ALLOCATOR_POOL = + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR); private final Map, Object> channelOptions = new HashMap<>(); @@ -419,7 +422,7 @@ protected ClientTransportFactory buildTransportFactory() { return new NettyTransportFactory( negotiator, channelFactory, channelOptions, - eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), + eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory, localSocketPicker, useGetForSafeMethods); } @@ -534,6 +537,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto private final Map, ?> channelOptions; private final ObjectPool groupPool; private final EventLoopGroup group; + private final ObjectPool allocatorPool; + private final ByteBufAllocator allocator; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -549,6 +554,7 @@ private static final class NettyTransportFactory implements ClientTransportFacto NettyTransportFactory(ProtocolNegotiator protocolNegotiator, ChannelFactory channelFactory, Map, ?> channelOptions, ObjectPool groupPool, + ObjectPool allocatorPool, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, @@ -558,6 +564,8 @@ private static final class NettyTransportFactory implements ClientTransportFacto this.channelOptions = new HashMap, Object>(channelOptions); this.groupPool = groupPool; this.group = groupPool.getObject(); + this.allocatorPool = allocatorPool; + this.allocator = allocatorPool.getObject(); this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -596,7 +604,7 @@ public void run() { // TODO(carl-mastrangelo): Pass channelLogger in. NettyClientTransport transport = new NettyClientTransport( - serverAddress, channelFactory, channelOptions, group, + serverAddress, channelFactory, channelOptions, group, allocator, localNegotiator, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), @@ -617,6 +625,7 @@ public void close() { } closed = true; + allocatorPool.returnObject(allocator); protocolNegotiator.close(); groupPool.returnObject(group); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index c1359d28143..61730c8b53a 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -17,6 +17,7 @@ package io.grpc.netty; import static io.grpc.internal.GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED; +import static io.netty.channel.ChannelOption.ALLOCATOR; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.annotations.VisibleForTesting; @@ -43,6 +44,7 @@ import io.grpc.internal.TransportTracer; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -74,6 +76,7 @@ class NettyClientTransport implements ConnectionClientTransport { private final SocketAddress remoteAddress; private final ChannelFactory channelFactory; private final EventLoopGroup group; + private final ByteBufAllocator allocator; private final ProtocolNegotiator negotiator; private final String authorityString; private final AsciiString authority; @@ -105,6 +108,7 @@ class NettyClientTransport implements ConnectionClientTransport { NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, Map, ?> channelOptions, EventLoopGroup group, + ByteBufAllocator allocator, ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, @@ -115,6 +119,7 @@ class NettyClientTransport implements ConnectionClientTransport { this.negotiationScheme = this.negotiator.scheme(); this.remoteAddress = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); + this.allocator = Preconditions.checkNotNull(allocator, "allocator"); this.channelFactory = channelFactory; this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); this.flowControlWindow = flowControlWindow; @@ -225,6 +230,7 @@ public Runnable start(Listener transportListener) { ChannelHandler negotiationHandler = negotiator.newHandler(handler); Bootstrap b = new Bootstrap(); + b.option(ALLOCATOR, allocator); b.attr(LOGGER_KEY, channelLogger); b.group(eventLoop); b.channelFactory(channelFactory); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index f0cfea99d7e..a1f534770c0 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; +import static io.netty.channel.ChannelOption.ALLOCATOR; import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; @@ -37,6 +38,7 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -73,8 +75,10 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxStreamsPerConnection; private final ObjectPool bossGroupPool; private final ObjectPool workerGroupPool; + private final ObjectPool allocatorPool; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; + private ByteBufAllocator allocator; private ServerListener listener; private Channel channel; private final int flowControlWindow; @@ -87,7 +91,8 @@ class NettyServer implements InternalServer, InternalWithLogId { private final long maxConnectionAgeGraceInNanos; private final boolean permitKeepAliveWithoutCalls; private final long permitKeepAliveTimeInNanos; - private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); + private final ReferenceCounted sharedResourceReferenceCounter = + new SharedResourceReferenceCounter(); private final List streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; private final InternalChannelz channelz; @@ -100,6 +105,7 @@ class NettyServer implements InternalServer, InternalWithLogId { Map, ?> channelOptions, ObjectPool bossGroupPool, ObjectPool workerGroupPool, + ObjectPool allocatorPool, ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, @@ -115,8 +121,10 @@ class NettyServer implements InternalServer, InternalWithLogId { this.channelOptions = new HashMap, Object>(channelOptions); this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool"); this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool"); + this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool"); this.bossGroup = bossGroupPool.getObject(); this.workerGroup = workerGroupPool.getObject(); + this.allocator = allocatorPool.getObject(); this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracerFactory = transportTracerFactory; @@ -155,6 +163,8 @@ public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); ServerBootstrap b = new ServerBootstrap(); + b.option(ALLOCATOR, allocator); + b.childOption(ALLOCATOR, allocator); b.group(bossGroup, workerGroup); b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. @@ -210,7 +220,7 @@ public void initChannel(Channel ch) { } // `channel` shutdown can race with `ch` initialization, so this is only safe to increment // inside the lock. - eventLoopReferenceCounter.retain(); + sharedResourceReferenceCounter.retain(); transportListener = listener.transportCreated(transport); } @@ -224,7 +234,7 @@ final class LoopReleaser implements ChannelFutureListener { public void operationComplete(ChannelFuture future) throws Exception { if (!done) { done = true; - eventLoopReferenceCounter.release(); + sharedResourceReferenceCounter.release(); } } } @@ -281,7 +291,7 @@ public void operationComplete(ChannelFuture future) throws Exception { synchronized (NettyServer.this) { listener.serverShutdown(); } - eventLoopReferenceCounter.release(); + sharedResourceReferenceCounter.release(); } }); try { @@ -305,7 +315,7 @@ public String toString() { .toString(); } - class EventLoopReferenceCounter extends AbstractReferenceCounted { + class SharedResourceReferenceCounter extends AbstractReferenceCounted { @Override protected void deallocate() { try { @@ -320,6 +330,13 @@ protected void deallocate() { } } finally { workerGroup = null; + try { + if (allocator != null) { + allocatorPool.returnObject(allocator); + } + } finally { + allocator = null; + } } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 22fab43a2eb..07fa2a0dd71 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -35,6 +35,7 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -78,6 +79,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL = SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); + private static final ObjectPool ALLOCATOR_POOL = + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR); private final List listenAddresses = new ArrayList<>(); @@ -534,7 +537,7 @@ protected List buildTransportServers( for (SocketAddress listenAddress : listenAddresses) { NettyServer transportServer = new NettyServer( listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool, - workerEventLoopGroupPool, negotiator, streamTracerFactories, + workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index 41d5699ea78..4822edbdec5 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -34,6 +34,8 @@ import io.grpc.internal.SharedResourceHolder.Resource; import io.grpc.netty.GrpcHttp2HeadersUtils.GrpcHttp2InboundHeaders; import io.grpc.netty.NettySocketSupport.NativeSocketOptions; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelFactory; @@ -83,6 +85,42 @@ class Utils { public static final Resource DEFAULT_BOSS_EVENT_LOOP_GROUP; public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP; + public static final Resource BYTE_BUF_ALLOCATOR = + new Resource() { + @Override + public ByteBufAllocator create() { + if (Boolean.parseBoolean( + System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) { + int maxOrder; + if (System.getProperty("io.netty.allocator.maxOrder") == null) { + // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is + // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be + // 2MiB, thus reducing the maxOrder to 8. + maxOrder = 8; + } else { + maxOrder = PooledByteBufAllocator.defaultMaxOrder(); + } + return new PooledByteBufAllocator( + PooledByteBufAllocator.defaultPreferDirect(), + PooledByteBufAllocator.defaultNumHeapArena(), + PooledByteBufAllocator.defaultNumDirectArena(), + PooledByteBufAllocator.defaultPageSize(), + maxOrder, + PooledByteBufAllocator.defaultTinyCacheSize(), + PooledByteBufAllocator.defaultSmallCacheSize(), + PooledByteBufAllocator.defaultNormalCacheSize(), + PooledByteBufAllocator.defaultUseCacheForAllThreads()); + } else { + return ByteBufAllocator.DEFAULT; + } + } + + @Override + public void close(ByteBufAllocator allocator) { + // PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC. + } + }; + public static final ChannelFactory DEFAULT_SERVER_CHANNEL_FACTORY; public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index dc1a4b99105..2642f32ba41 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -62,9 +62,12 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; +import io.grpc.internal.SharedResourceHolder; +import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestUtils; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelDuplexHandler; @@ -123,6 +126,7 @@ public class NettyClientTransportTest { private final LinkedBlockingQueue serverTransportAttributesList = new LinkedBlockingQueue<>(); private final NioEventLoopGroup group = new NioEventLoopGroup(1); + private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR); private final EchoServerListener serverListener = new EchoServerListener(); private final InternalChannelz channelz = new InternalChannelz(); private Runnable tooManyPingsRunnable = new Runnable() { @@ -153,6 +157,7 @@ public void teardown() throws Exception { } group.shutdownGracefully(0, 10, TimeUnit.SECONDS); + SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator); } @Test @@ -190,7 +195,7 @@ public void setSoLingerChannelOption() throws IOException { channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group, - newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -435,7 +440,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(CantConstructChannel.class), - new HashMap, Object>(), group, + new HashMap, Object>(), group, allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), @@ -705,7 +710,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; } NettyClientTransport transport = new NettyClientTransport( - address, channelFactory, new HashMap, Object>(), group, + address, channelFactory, new HashMap, Object>(), group, allocator, negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, @@ -723,7 +728,8 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr TestUtils.testServerAddress(new InetSocketAddress(0)), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), - new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator, + new FixedObjectPool<>(group), new FixedObjectPool<>(group), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index 51f0a50f27e..174ca1e0e0e 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -62,6 +62,7 @@ public void getPort() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -100,6 +101,7 @@ public void getPort_notStarted() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -138,6 +140,7 @@ public void childChannelOptions() throws Exception { channelOptions, SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -188,6 +191,7 @@ public void channelzListenSocket() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), + SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(),