diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java index b8de4082b75..0a0f952c531 100644 --- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java @@ -40,7 +40,6 @@ import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; @@ -75,8 +74,6 @@ public final class NettyChannelBuilder new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE); private static final ObjectPool DEFAULT_EVENT_LOOP_GROUP_POOL = SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); - private static final ObjectPool ALLOCATOR_POOL = - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR); private final Map, Object> channelOptions = new HashMap<>(); @@ -406,7 +403,7 @@ protected ClientTransportFactory buildTransportFactory() { return new NettyTransportFactory( negotiator, channelFactory, channelOptions, - eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(), + eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(), maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls, transportTracerFactory, localSocketPicker, useGetForSafeMethods); } @@ -521,8 +518,6 @@ private static final class NettyTransportFactory implements ClientTransportFacto private final Map, ?> channelOptions; private final ObjectPool groupPool; private final EventLoopGroup group; - private final ObjectPool allocatorPool; - private final ByteBufAllocator allocator; private final int flowControlWindow; private final int maxMessageSize; private final int maxHeaderListSize; @@ -538,7 +533,6 @@ private static final class NettyTransportFactory implements ClientTransportFacto NettyTransportFactory(ProtocolNegotiator protocolNegotiator, ChannelFactory channelFactory, Map, ?> channelOptions, ObjectPool groupPool, - ObjectPool allocatorPool, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker, @@ -548,8 +542,6 @@ private static final class NettyTransportFactory implements ClientTransportFacto this.channelOptions = new HashMap, Object>(channelOptions); this.groupPool = groupPool; this.group = groupPool.getObject(); - this.allocatorPool = allocatorPool; - this.allocator = allocatorPool.getObject(); this.flowControlWindow = flowControlWindow; this.maxMessageSize = maxMessageSize; this.maxHeaderListSize = maxHeaderListSize; @@ -588,7 +580,7 @@ public void run() { // TODO(carl-mastrangelo): Pass channelLogger in. NettyClientTransport transport = new NettyClientTransport( - serverAddress, channelFactory, channelOptions, group, allocator, + serverAddress, channelFactory, channelOptions, group, localNegotiator, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos, keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(), @@ -609,7 +601,6 @@ public void close() { } closed = true; - allocatorPool.returnObject(allocator); protocolNegotiator.close(); groupPool.returnObject(group); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 61730c8b53a..753c1e4842b 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -44,7 +44,6 @@ import io.grpc.internal.TransportTracer; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -76,7 +75,6 @@ class NettyClientTransport implements ConnectionClientTransport { private final SocketAddress remoteAddress; private final ChannelFactory channelFactory; private final EventLoopGroup group; - private final ByteBufAllocator allocator; private final ProtocolNegotiator negotiator; private final String authorityString; private final AsciiString authority; @@ -108,7 +106,6 @@ class NettyClientTransport implements ConnectionClientTransport { NettyClientTransport( SocketAddress address, ChannelFactory channelFactory, Map, ?> channelOptions, EventLoopGroup group, - ByteBufAllocator allocator, ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize, int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent, @@ -119,7 +116,6 @@ class NettyClientTransport implements ConnectionClientTransport { this.negotiationScheme = this.negotiator.scheme(); this.remoteAddress = Preconditions.checkNotNull(address, "address"); this.group = Preconditions.checkNotNull(group, "group"); - this.allocator = Preconditions.checkNotNull(allocator, "allocator"); this.channelFactory = channelFactory; this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions"); this.flowControlWindow = flowControlWindow; @@ -230,7 +226,7 @@ public Runnable start(Listener transportListener) { ChannelHandler negotiationHandler = negotiator.newHandler(handler); Bootstrap b = new Bootstrap(); - b.option(ALLOCATOR, allocator); + b.option(ALLOCATOR, Utils.getByteBufAllocator()); b.attr(LOGGER_KEY, channelLogger); b.group(eventLoop); b.channelFactory(channelFactory); diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index ec1dfc2c914..5c400198817 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -38,7 +38,6 @@ import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; @@ -75,10 +74,8 @@ class NettyServer implements InternalServer, InternalWithLogId { private final int maxStreamsPerConnection; private final ObjectPool bossGroupPool; private final ObjectPool workerGroupPool; - private final ObjectPool allocatorPool; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; - private ByteBufAllocator allocator; private ServerListener listener; private Channel channel; private final int flowControlWindow; @@ -105,7 +102,6 @@ class NettyServer implements InternalServer, InternalWithLogId { Map, ?> channelOptions, ObjectPool bossGroupPool, ObjectPool workerGroupPool, - ObjectPool allocatorPool, ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, @@ -121,10 +117,8 @@ class NettyServer implements InternalServer, InternalWithLogId { this.channelOptions = new HashMap, Object>(channelOptions); this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool"); this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool"); - this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool"); this.bossGroup = bossGroupPool.getObject(); this.workerGroup = workerGroupPool.getObject(); - this.allocator = allocatorPool.getObject(); this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories"); this.transportTracerFactory = transportTracerFactory; @@ -163,8 +157,8 @@ public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); ServerBootstrap b = new ServerBootstrap(); - b.option(ALLOCATOR, allocator); - b.childOption(ALLOCATOR, allocator); + b.option(ALLOCATOR, Utils.getByteBufAllocator()); + b.childOption(ALLOCATOR, Utils.getByteBufAllocator()); b.group(bossGroup, workerGroup); b.channelFactory(channelFactory); // For non-socket based channel, the option will be ignored. @@ -331,13 +325,6 @@ protected void deallocate() { } } finally { workerGroup = null; - try { - if (allocator != null) { - allocatorPool.returnObject(allocator); - } - } finally { - allocator = null; - } } } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 96d087f0357..d9547c0e6c1 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -35,7 +35,6 @@ import io.grpc.internal.KeepAliveManager; import io.grpc.internal.ObjectPool; import io.grpc.internal.SharedResourcePool; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; @@ -80,8 +79,6 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL = SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP); - private static final ObjectPool ALLOCATOR_POOL = - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR); private final List listenAddresses = new ArrayList<>(); @@ -544,7 +541,7 @@ protected List buildTransportServers( for (SocketAddress listenAddress : listenAddresses) { NettyServer transportServer = new NettyServer( listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool, - workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories, + workerEventLoopGroupPool, negotiator, streamTracerFactories, getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, diff --git a/netty/src/main/java/io/grpc/netty/Utils.java b/netty/src/main/java/io/grpc/netty/Utils.java index df8d7a51cc8..c5df0cb15d5 100644 --- a/netty/src/main/java/io/grpc/netty/Utils.java +++ b/netty/src/main/java/io/grpc/netty/Utils.java @@ -86,41 +86,37 @@ class Utils { public static final Resource DEFAULT_BOSS_EVENT_LOOP_GROUP; public static final Resource DEFAULT_WORKER_EVENT_LOOP_GROUP; - public static final Resource BYTE_BUF_ALLOCATOR = - new Resource() { - @Override - public ByteBufAllocator create() { - if (Boolean.parseBoolean( - System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) { - int maxOrder; - if (System.getProperty("io.netty.allocator.maxOrder") == null) { - // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is - // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be - // 2MiB, thus reducing the maxOrder to 8. - maxOrder = 8; - } else { - maxOrder = PooledByteBufAllocator.defaultMaxOrder(); - } - return new PooledByteBufAllocator( - PooledByteBufAllocator.defaultPreferDirect(), - PooledByteBufAllocator.defaultNumHeapArena(), - PooledByteBufAllocator.defaultNumDirectArena(), - PooledByteBufAllocator.defaultPageSize(), - maxOrder, - PooledByteBufAllocator.defaultTinyCacheSize(), - PooledByteBufAllocator.defaultSmallCacheSize(), - PooledByteBufAllocator.defaultNormalCacheSize(), - PooledByteBufAllocator.defaultUseCacheForAllThreads()); - } else { - return ByteBufAllocator.DEFAULT; - } + // This class is initialized on first use, thus provides delayed allocator creation. + private static final class ByteBufAllocatorHolder { + private static final ByteBufAllocator allocator; + + static { + if (Boolean.parseBoolean( + System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) { + int maxOrder; + if (System.getProperty("io.netty.allocator.maxOrder") == null) { + // See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is + // 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be + // 2MiB, thus reducing the maxOrder to 8. + maxOrder = 8; + } else { + maxOrder = PooledByteBufAllocator.defaultMaxOrder(); } - - @Override - public void close(ByteBufAllocator allocator) { - // PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC. - } - }; + allocator = new PooledByteBufAllocator( + PooledByteBufAllocator.defaultPreferDirect(), + PooledByteBufAllocator.defaultNumHeapArena(), + PooledByteBufAllocator.defaultNumDirectArena(), + PooledByteBufAllocator.defaultPageSize(), + maxOrder, + PooledByteBufAllocator.defaultTinyCacheSize(), + PooledByteBufAllocator.defaultSmallCacheSize(), + PooledByteBufAllocator.defaultNormalCacheSize(), + PooledByteBufAllocator.defaultUseCacheForAllThreads()); + } else { + allocator = ByteBufAllocator.DEFAULT; + } + } + } public static final ChannelFactory DEFAULT_SERVER_CHANNEL_FACTORY; public static final Class DEFAULT_CLIENT_CHANNEL_TYPE; @@ -148,6 +144,10 @@ public void close(ByteBufAllocator allocator) { } } + public static ByteBufAllocator getByteBufAllocator() { + return ByteBufAllocatorHolder.allocator; + } + public static Metadata convertHeaders(Http2Headers http2Headers) { if (http2Headers instanceof GrpcHttp2InboundHeaders) { GrpcHttp2InboundHeaders h = (GrpcHttp2InboundHeaders) http2Headers; diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 2642f32ba41..dc1a4b99105 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -62,12 +62,9 @@ import io.grpc.internal.ServerStreamListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; -import io.grpc.internal.SharedResourceHolder; -import io.grpc.internal.SharedResourcePool; import io.grpc.internal.TransportTracer; import io.grpc.internal.testing.TestUtils; import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker; -import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelConfig; import io.netty.channel.ChannelDuplexHandler; @@ -126,7 +123,6 @@ public class NettyClientTransportTest { private final LinkedBlockingQueue serverTransportAttributesList = new LinkedBlockingQueue<>(); private final NioEventLoopGroup group = new NioEventLoopGroup(1); - private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR); private final EchoServerListener serverListener = new EchoServerListener(); private final InternalChannelz channelz = new InternalChannelz(); private Runnable tooManyPingsRunnable = new Runnable() { @@ -157,7 +153,6 @@ public void teardown() throws Exception { } group.shutdownGracefully(0, 10, TimeUnit.SECONDS); - SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator); } @Test @@ -195,7 +190,7 @@ public void setSoLingerChannelOption() throws IOException { channelOptions.put(ChannelOption.SO_LINGER, soLinger); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group, - allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, + newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), new FakeChannelLogger(), false); @@ -440,7 +435,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception { authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); NettyClientTransport transport = new NettyClientTransport( address, new ReflectiveChannelFactory<>(CantConstructChannel.class), - new HashMap, Object>(), group, allocator, + new HashMap, Object>(), group, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority, null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(), @@ -710,7 +705,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED; } NettyClientTransport transport = new NettyClientTransport( - address, channelFactory, new HashMap, Object>(), group, allocator, + address, channelFactory, new HashMap, Object>(), group, negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize, keepAliveTimeNano, keepAliveTimeoutNano, false, authority, userAgent, tooManyPingsRunnable, @@ -728,8 +723,7 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr TestUtils.testServerAddress(new InetSocketAddress(0)), new ReflectiveChannelFactory<>(NioServerSocketChannel.class), new HashMap, Object>(), - new FixedObjectPool<>(group), new FixedObjectPool<>(group), - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator, + new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index f1f96791cd5..b873785e4ff 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -83,7 +83,6 @@ class TestProtocolNegotiator implements ProtocolNegotiator { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), protocolNegotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -128,7 +127,6 @@ public void getPort_notStarted() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -167,7 +165,6 @@ public void childChannelOptions() throws Exception { channelOptions, SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(), @@ -218,7 +215,6 @@ public void channelzListenSocket() throws Exception { new HashMap, Object>(), SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP), SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP), - SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), ProtocolNegotiators.plaintext(), Collections.emptyList(), TransportTracer.getDefaultFactory(),