diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java index 82a529ecd975..905de8dc52d5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelPoolFactory.java @@ -20,13 +20,11 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.ChannelHealthChecker; import io.netty.channel.pool.ChannelPool; import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler; import org.apache.nifi.remote.VersionNegotiatorFactory; import org.apache.nifi.ssl.SSLContextService; @@ -39,10 +37,9 @@ * methods. Cache clients include the NiFi services {@link DistributedSetCacheClientService} * and {@link DistributedMapCacheClientService}. */ -public class CacheClientChannelPoolFactory { +class CacheClientChannelPoolFactory { private static final int MAX_PENDING_ACQUIRES = 1024; - private static final boolean DAEMON_THREAD_ENABLED = true; private int maxConnections = Runtime.getRuntime().availableProcessors() * 2; @@ -64,7 +61,7 @@ public void setMaxConnections(final int maxConnections) { * @param sslContextService the SSL context (if any) associated with requests to the service; if not specified, * communications will not be encrypted * @param factory creator of object used to broker the version of the distributed cache protocol with the service - * @param poolName channel pool name, used for threads name prefix + * @param eventLoopGroup Netty Event Loop Group providing threads for managing connections * @return a channel pool object from which {@link Channel} objects may be obtained */ public ChannelPool createChannelPool(final String hostname, @@ -72,12 +69,11 @@ public ChannelPool createChannelPool(final String hostname, final int timeoutMillis, final SSLContextService sslContextService, final VersionNegotiatorFactory factory, - final String poolName) { + final EventLoopGroup eventLoopGroup) { final SSLContext sslContext = (sslContextService == null) ? null : sslContextService.createContext(); - final EventLoopGroup group = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED)); final Bootstrap bootstrap = new Bootstrap(); final CacheClientChannelInitializer initializer = new CacheClientChannelInitializer(sslContext, factory, Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis)); - bootstrap.group(group) + bootstrap.group(eventLoopGroup) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis) .remoteAddress(hostname, port) .channel(NioSocketChannel.class); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java index 8ebf9bacbda4..6a65e26ed190 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedCacheClient.java @@ -17,7 +17,10 @@ package org.apache.nifi.distributed.cache.client; import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.ChannelPool; +import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; import org.apache.nifi.remote.VersionNegotiatorFactory; @@ -31,11 +34,15 @@ */ public class DistributedCacheClient { + private static final boolean DAEMON_THREAD_ENABLED = true; + /** * The pool of network connections used to service client requests. */ private final ChannelPool channelPool; + private final EventLoopGroup eventLoopGroup; + /** * Constructor. * @@ -53,9 +60,10 @@ protected DistributedCacheClient(final String hostname, final SSLContextService sslContextService, final VersionNegotiatorFactory factory, final String identifier) { - String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier); + final String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier); + this.eventLoopGroup = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED)); this.channelPool = new CacheClientChannelPoolFactory().createChannelPool( - hostname, port, timeoutMillis, sslContextService, factory, poolName); + hostname, port, timeoutMillis, sslContextService, factory, eventLoopGroup); } /** @@ -76,9 +84,10 @@ protected void invoke(final OutboundAdapter outboundAdapter, final InboundAdapte } /** - * Shutdown {@link ChannelPool} cleanly. + * Close Channel Pool and supporting Event Loop Group */ protected void closeChannelPool() { channelPool.close(); + eventLoopGroup.close(); } }