Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -75,8 +74,6 @@ public final class NettyChannelBuilder
new ReflectiveChannelFactory<>(Utils.DEFAULT_CLIENT_CHANNEL_TYPE);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);

private final Map<ChannelOption<?>, Object> channelOptions =
new HashMap<>();
Expand Down Expand Up @@ -406,7 +403,7 @@ protected ClientTransportFactory buildTransportFactory() {

return new NettyTransportFactory(
negotiator, channelFactory, channelOptions,
eventLoopGroupPool, ALLOCATOR_POOL, flowControlWindow, maxInboundMessageSize(),
eventLoopGroupPool, flowControlWindow, maxInboundMessageSize(),
maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls,
transportTracerFactory, localSocketPicker, useGetForSafeMethods);
}
Expand Down Expand Up @@ -521,8 +518,6 @@ private static final class NettyTransportFactory implements ClientTransportFacto
private final Map<ChannelOption<?>, ?> channelOptions;
private final ObjectPool<? extends EventLoopGroup> groupPool;
private final EventLoopGroup group;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private final ByteBufAllocator allocator;
private final int flowControlWindow;
private final int maxMessageSize;
private final int maxHeaderListSize;
Expand All @@ -538,7 +533,6 @@ private static final class NettyTransportFactory implements ClientTransportFacto
NettyTransportFactory(ProtocolNegotiator protocolNegotiator,
ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, ObjectPool<? extends EventLoopGroup> groupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls,
TransportTracer.Factory transportTracerFactory, LocalSocketPicker localSocketPicker,
Expand All @@ -548,8 +542,6 @@ private static final class NettyTransportFactory implements ClientTransportFacto
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.groupPool = groupPool;
this.group = groupPool.getObject();
this.allocatorPool = allocatorPool;
this.allocator = allocatorPool.getObject();
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
this.maxHeaderListSize = maxHeaderListSize;
Expand Down Expand Up @@ -588,7 +580,7 @@ public void run() {

// TODO(carl-mastrangelo): Pass channelLogger in.
NettyClientTransport transport = new NettyClientTransport(
serverAddress, channelFactory, channelOptions, group, allocator,
serverAddress, channelFactory, channelOptions, group,
localNegotiator, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, options.getAuthority(), options.getUserAgent(),
Expand All @@ -609,7 +601,6 @@ public void close() {
}
closed = true;

allocatorPool.returnObject(allocator);
protocolNegotiator.close();
groupPool.returnObject(group);
}
Expand Down
6 changes: 1 addition & 5 deletions netty/src/main/java/io/grpc/netty/NettyClientTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import io.grpc.internal.TransportTracer;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -76,7 +75,6 @@ class NettyClientTransport implements ConnectionClientTransport {
private final SocketAddress remoteAddress;
private final ChannelFactory<? extends Channel> channelFactory;
private final EventLoopGroup group;
private final ByteBufAllocator allocator;
private final ProtocolNegotiator negotiator;
private final String authorityString;
private final AsciiString authority;
Expand Down Expand Up @@ -108,7 +106,6 @@ class NettyClientTransport implements ConnectionClientTransport {
NettyClientTransport(
SocketAddress address, ChannelFactory<? extends Channel> channelFactory,
Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
ByteBufAllocator allocator,
ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
Expand All @@ -119,7 +116,6 @@ class NettyClientTransport implements ConnectionClientTransport {
this.negotiationScheme = this.negotiator.scheme();
this.remoteAddress = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.allocator = Preconditions.checkNotNull(allocator, "allocator");
this.channelFactory = channelFactory;
this.channelOptions = Preconditions.checkNotNull(channelOptions, "channelOptions");
this.flowControlWindow = flowControlWindow;
Expand Down Expand Up @@ -230,7 +226,7 @@ public Runnable start(Listener transportListener) {
ChannelHandler negotiationHandler = negotiator.newHandler(handler);

Bootstrap b = new Bootstrap();
b.option(ALLOCATOR, allocator);
b.option(ALLOCATOR, Utils.getByteBufAllocator());
b.attr(LOGGER_KEY, channelLogger);
b.group(eventLoop);
b.channelFactory(channelFactory);
Expand Down
17 changes: 2 additions & 15 deletions netty/src/main/java/io/grpc/netty/NettyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -75,10 +74,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
private final int maxStreamsPerConnection;
private final ObjectPool<? extends EventLoopGroup> bossGroupPool;
private final ObjectPool<? extends EventLoopGroup> workerGroupPool;
private final ObjectPool<? extends ByteBufAllocator> allocatorPool;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private ByteBufAllocator allocator;
private ServerListener listener;
private Channel channel;
private final int flowControlWindow;
Expand All @@ -105,7 +102,6 @@ class NettyServer implements InternalServer, InternalWithLogId {
Map<ChannelOption<?>, ?> channelOptions,
ObjectPool<? extends EventLoopGroup> bossGroupPool,
ObjectPool<? extends EventLoopGroup> workerGroupPool,
ObjectPool<? extends ByteBufAllocator> allocatorPool,
ProtocolNegotiator protocolNegotiator,
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
Expand All @@ -121,10 +117,8 @@ class NettyServer implements InternalServer, InternalWithLogId {
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroupPool = checkNotNull(bossGroupPool, "bossGroupPool");
this.workerGroupPool = checkNotNull(workerGroupPool, "workerGroupPool");
this.allocatorPool = checkNotNull(allocatorPool, "allocatorPool");
this.bossGroup = bossGroupPool.getObject();
this.workerGroup = workerGroupPool.getObject();
this.allocator = allocatorPool.getObject();
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.transportTracerFactory = transportTracerFactory;
Expand Down Expand Up @@ -163,8 +157,8 @@ public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");

ServerBootstrap b = new ServerBootstrap();
b.option(ALLOCATOR, allocator);
b.childOption(ALLOCATOR, allocator);
b.option(ALLOCATOR, Utils.getByteBufAllocator());
b.childOption(ALLOCATOR, Utils.getByteBufAllocator());
b.group(bossGroup, workerGroup);
b.channelFactory(channelFactory);
// For non-socket based channel, the option will be ignored.
Expand Down Expand Up @@ -331,13 +325,6 @@ protected void deallocate() {
}
} finally {
workerGroup = null;
try {
if (allocator != null) {
allocatorPool.returnObject(allocator);
}
} finally {
allocator = null;
}
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -80,8 +79,6 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP);
private static final ObjectPool<? extends EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP_POOL =
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP);
private static final ObjectPool<ByteBufAllocator> ALLOCATOR_POOL =
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR);

private final List<SocketAddress> listenAddresses = new ArrayList<>();

Expand Down Expand Up @@ -544,7 +541,7 @@ protected List<NettyServer> buildTransportServers(
for (SocketAddress listenAddress : listenAddresses) {
NettyServer transportServer = new NettyServer(
listenAddress, channelFactory, channelOptions, bossEventLoopGroupPool,
workerEventLoopGroupPool, ALLOCATOR_POOL, negotiator, streamTracerFactories,
workerEventLoopGroupPool, negotiator, streamTracerFactories,
getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
Expand Down
68 changes: 34 additions & 34 deletions netty/src/main/java/io/grpc/netty/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,41 +86,37 @@ class Utils {
public static final Resource<EventLoopGroup> DEFAULT_BOSS_EVENT_LOOP_GROUP;
public static final Resource<EventLoopGroup> DEFAULT_WORKER_EVENT_LOOP_GROUP;

public static final Resource<ByteBufAllocator> BYTE_BUF_ALLOCATOR =
new Resource<ByteBufAllocator>() {
@Override
public ByteBufAllocator create() {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
// 2MiB, thus reducing the maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}
return new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
return ByteBufAllocator.DEFAULT;
}
// This class is initialized on first use, thus provides delayed allocator creation.
private static final class ByteBufAllocatorHolder {
private static final ByteBufAllocator allocator;

static {
if (Boolean.parseBoolean(
System.getProperty("io.grpc.netty.useCustomAllocator", "false"))) {
int maxOrder;
if (System.getProperty("io.netty.allocator.maxOrder") == null) {
// See the implementation of PooledByteBufAllocator. DEFAULT_MAX_ORDER in there is
// 11, which makes chunk size to be 8192 << 11 = 16 MiB. We want the chunk size to be
// 2MiB, thus reducing the maxOrder to 8.
maxOrder = 8;
} else {
maxOrder = PooledByteBufAllocator.defaultMaxOrder();
}

@Override
public void close(ByteBufAllocator allocator) {
// PooledByteBufAllocator doesn't provide a shutdown method. Leaving it to GC.
}
};
allocator = new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
maxOrder,
PooledByteBufAllocator.defaultTinyCacheSize(),
PooledByteBufAllocator.defaultSmallCacheSize(),
PooledByteBufAllocator.defaultNormalCacheSize(),
PooledByteBufAllocator.defaultUseCacheForAllThreads());
} else {
allocator = ByteBufAllocator.DEFAULT;
}
}
}

public static final ChannelFactory<? extends ServerChannel> DEFAULT_SERVER_CHANNEL_FACTORY;
public static final Class<? extends Channel> DEFAULT_CLIENT_CHANNEL_TYPE;
Expand Down Expand Up @@ -148,6 +144,10 @@ public void close(ByteBufAllocator allocator) {
}
}

public static ByteBufAllocator getByteBufAllocator() {
return ByteBufAllocatorHolder.allocator;
}

public static Metadata convertHeaders(Http2Headers http2Headers) {
if (http2Headers instanceof GrpcHttp2InboundHeaders) {
GrpcHttp2InboundHeaders h = (GrpcHttp2InboundHeaders) http2Headers;
Expand Down
14 changes: 4 additions & 10 deletions netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,9 @@
import io.grpc.internal.ServerStreamListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.SharedResourcePool;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.testing.TestUtils;
import io.grpc.netty.NettyChannelBuilder.LocalSocketPicker;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelDuplexHandler;
Expand Down Expand Up @@ -126,7 +123,6 @@ public class NettyClientTransportTest {
private final LinkedBlockingQueue<Attributes> serverTransportAttributesList =
new LinkedBlockingQueue<>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final ByteBufAllocator allocator = SharedResourceHolder.get(Utils.BYTE_BUF_ALLOCATOR);
private final EchoServerListener serverListener = new EchoServerListener();
private final InternalChannelz channelz = new InternalChannelz();
private Runnable tooManyPingsRunnable = new Runnable() {
Expand Down Expand Up @@ -157,7 +153,6 @@ public void teardown() throws Exception {
}

group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
SharedResourceHolder.release(Utils.BYTE_BUF_ALLOCATOR, allocator);
}

@Test
Expand Down Expand Up @@ -195,7 +190,7 @@ public void setSoLingerChannelOption() throws IOException {
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(NioSocketChannel.class), channelOptions, group,
allocator, newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority,
null /* user agent */, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY,
new SocketPicker(), new FakeChannelLogger(), false);
Expand Down Expand Up @@ -440,7 +435,7 @@ public void failingToConstructChannelShouldFailGracefully() throws Exception {
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport(
address, new ReflectiveChannelFactory<>(CantConstructChannel.class),
new HashMap<ChannelOption<?>, Object>(), group, allocator,
new HashMap<ChannelOption<?>, Object>(), group,
newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable, new TransportTracer(), Attributes.EMPTY, new SocketPicker(),
Expand Down Expand Up @@ -710,7 +705,7 @@ private NettyClientTransport newTransport(ProtocolNegotiator negotiator, int max
keepAliveTimeNano = KEEPALIVE_TIME_NANOS_DISABLED;
}
NettyClientTransport transport = new NettyClientTransport(
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group, allocator,
address, channelFactory, new HashMap<ChannelOption<?>, Object>(), group,
negotiator, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable,
Expand All @@ -728,8 +723,7 @@ private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) thr
TestUtils.testServerAddress(new InetSocketAddress(0)),
new ReflectiveChannelFactory<>(NioServerSocketChannel.class),
new HashMap<ChannelOption<?>, Object>(),
new FixedObjectPool<>(group), new FixedObjectPool<>(group),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR), negotiator,
new FixedObjectPool<>(group), new FixedObjectPool<>(group), negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
Expand Down
4 changes: 0 additions & 4 deletions netty/src/test/java/io/grpc/netty/NettyServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class TestProtocolNegotiator implements ProtocolNegotiator {
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
protocolNegotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
Expand Down Expand Up @@ -128,7 +127,6 @@ public void getPort_notStarted() throws Exception {
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
Expand Down Expand Up @@ -167,7 +165,6 @@ public void childChannelOptions() throws Exception {
channelOptions,
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
Expand Down Expand Up @@ -218,7 +215,6 @@ public void channelzListenSocket() throws Exception {
new HashMap<ChannelOption<?>, Object>(),
SharedResourcePool.forResource(Utils.DEFAULT_BOSS_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP),
SharedResourcePool.forResource(Utils.BYTE_BUF_ALLOCATOR),
ProtocolNegotiators.plaintext(),
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
Expand Down