Skip to content

Commit

Permalink
NIFI-12370 Fixed Distributed Map Cache Client Service Shutdown
Browse files Browse the repository at this point in the history
- Moved EventLoopGroup from CacheClientChannelPoolFactory to DistributedCacheClient to enable closing the EventLoopGroup after closing the ChannelPool

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8027.
  • Loading branch information
exceptionfactory authored and pvillard31 committed Nov 15, 2023
1 parent 70b9eae commit 28421e3
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.nifi.event.transport.netty.channel.pool.InitializingChannelPoolHandler;
import org.apache.nifi.remote.VersionNegotiatorFactory;
import org.apache.nifi.ssl.SSLContextService;
Expand All @@ -39,10 +37,9 @@
* methods. Cache clients include the NiFi services {@link DistributedSetCacheClientService}
* and {@link DistributedMapCacheClientService}.
*/
public class CacheClientChannelPoolFactory {
class CacheClientChannelPoolFactory {

private static final int MAX_PENDING_ACQUIRES = 1024;
private static final boolean DAEMON_THREAD_ENABLED = true;

private int maxConnections = Runtime.getRuntime().availableProcessors() * 2;

Expand All @@ -64,20 +61,19 @@ public void setMaxConnections(final int maxConnections) {
* @param sslContextService the SSL context (if any) associated with requests to the service; if not specified,
* communications will not be encrypted
* @param factory creator of object used to broker the version of the distributed cache protocol with the service
* @param poolName channel pool name, used for threads name prefix
* @param eventLoopGroup Netty Event Loop Group providing threads for managing connections
* @return a channel pool object from which {@link Channel} objects may be obtained
*/
public ChannelPool createChannelPool(final String hostname,
final int port,
final int timeoutMillis,
final SSLContextService sslContextService,
final VersionNegotiatorFactory factory,
final String poolName) {
final EventLoopGroup eventLoopGroup) {
final SSLContext sslContext = (sslContextService == null) ? null : sslContextService.createContext();
final EventLoopGroup group = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
final Bootstrap bootstrap = new Bootstrap();
final CacheClientChannelInitializer initializer = new CacheClientChannelInitializer(sslContext, factory, Duration.ofMillis(timeoutMillis), Duration.ofMillis(timeoutMillis));
bootstrap.group(group)
bootstrap.group(eventLoopGroup)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMillis)
.remoteAddress(hostname, port)
.channel(NioSocketChannel.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.apache.nifi.distributed.cache.client;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.nifi.distributed.cache.client.adapter.InboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import org.apache.nifi.remote.VersionNegotiatorFactory;
Expand All @@ -31,11 +34,15 @@
*/
public class DistributedCacheClient {

private static final boolean DAEMON_THREAD_ENABLED = true;

/**
* The pool of network connections used to service client requests.
*/
private final ChannelPool channelPool;

private final EventLoopGroup eventLoopGroup;

/**
* Constructor.
*
Expand All @@ -53,9 +60,10 @@ protected DistributedCacheClient(final String hostname,
final SSLContextService sslContextService,
final VersionNegotiatorFactory factory,
final String identifier) {
String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier);
final String poolName = String.format("%s[%s]", getClass().getSimpleName(), identifier);
this.eventLoopGroup = new NioEventLoopGroup(new DefaultThreadFactory(poolName, DAEMON_THREAD_ENABLED));
this.channelPool = new CacheClientChannelPoolFactory().createChannelPool(
hostname, port, timeoutMillis, sslContextService, factory, poolName);
hostname, port, timeoutMillis, sslContextService, factory, eventLoopGroup);
}

/**
Expand All @@ -76,9 +84,10 @@ protected void invoke(final OutboundAdapter outboundAdapter, final InboundAdapte
}

/**
* Shutdown {@link ChannelPool} cleanly.
* Close Channel Pool and supporting Event Loop Group
*/
protected void closeChannelPool() {
channelPool.close();
eventLoopGroup.close();
}
}

0 comments on commit 28421e3

Please sign in to comment.