Skip to content

Commit

Permalink
GEODE-7361: sever ties to PoolStatHelper & CoreLoggingExecutors (#4238)
Browse files Browse the repository at this point in the history
TcpServer no longer depends on PoolStateHelper or CoreLoggingExecutors
  • Loading branch information
Bill committed Oct 29, 2019
1 parent 2cb2c48 commit 7868cb6
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 30 deletions.
Expand Up @@ -45,6 +45,7 @@
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.net.SSLConfigurationFactory;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
Expand Down Expand Up @@ -200,8 +201,9 @@ private static class FakeTcpServer extends TcpServer {
public FakeTcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, RestartableTcpHandler handler, PoolStatHelper poolHelper,
String threadName) {
super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadName,
(socket, input, firstByte) -> false, DistributionStats::getStatTime);
super(port, bind_address, sslConfig, cfg, handler, threadName,
(socket, input, firstByte) -> false, DistributionStats::getStatTime,
TcpServerFactory.createExecutorServiceSupplier(poolHelper));
if (cfg == null) {
cfg = new DistributionConfigImpl(sslConfig);
}
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.apache.geode.distributed.internal.RestartableTcpHandler;
import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.net.DummySocketCreator;
import org.apache.geode.internal.net.SSLConfigurationFactory;
import org.apache.geode.internal.net.SocketCreator;
Expand Down Expand Up @@ -142,8 +143,9 @@ private class DummyTcpServer extends TcpServer {
public DummyTcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, RestartableTcpHandler handler, PoolStatHelper poolHelper,
String threadName) {
super(port, bind_address, sslConfig, cfg, handler, poolHelper, threadName,
(socket, input, firstByte) -> false, DistributionStats::getStatTime);
super(port, bind_address, sslConfig, cfg, handler, threadName,
(socket, input, firstByte) -> false, DistributionStats::getStatTime,
TcpServerFactory.createExecutorServiceSupplier(poolHelper));
if (cfg == null) {
cfg = new DistributionConfigImpl(sslConfig);
}
Expand Down
Expand Up @@ -32,9 +32,9 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import javax.net.ssl.SSLException;

Expand All @@ -46,9 +46,7 @@
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.internal.GemFireVersion;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThread;
import org.apache.geode.internal.net.SocketCreator;
Expand Down Expand Up @@ -105,9 +103,6 @@ public class TcpServer {
public static int OLDTESTVERSION = OLDGOSSIPVERSION;

public static final long SHUTDOWN_WAIT_TIME = 60 * 1000;
private static final int MAX_POOL_SIZE =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.MAX_POOL_SIZE", 100);
private static final int POOL_IDLE_TIMEOUT = 60 * 1000;

private static final Logger logger = LogService.getLogger();

Expand All @@ -123,10 +118,11 @@ public class TcpServer {
private ServerSocket srv_sock = null;
private InetAddress bind_address;
private volatile boolean shuttingDown = false;
private final PoolStatHelper poolHelper;
private final TcpHandler handler;

private ExecutorService executor;
private final Supplier<ExecutorService> executorServiceSupplier;

private final String threadName;
private volatile Thread serverThread;

Expand All @@ -148,15 +144,16 @@ private static Map<Integer, Short> createGossipToVersionMap() {
}

public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
DistributionConfigImpl cfg, TcpHandler handler,
String threadName, ProtocolChecker protocolChecker,
final LongSupplier nanoTimeSupplier) {
final LongSupplier nanoTimeSupplier,
final Supplier<ExecutorService> executorServiceSupplier) {
this.port = port;
this.bind_address = bind_address;
this.handler = handler;
this.poolHelper = poolHelper;
this.protocolChecker = protocolChecker;
this.executor = createExecutor(poolHelper);
this.executorServiceSupplier = executorServiceSupplier;
this.executor = executorServiceSupplier.get();
this.threadName = threadName;
this.nanoTimeSupplier = nanoTimeSupplier;

Expand All @@ -176,15 +173,10 @@ protected SocketCreator getSocketCreator() {
return socketCreator;
}

private static ExecutorService createExecutor(PoolStatHelper poolHelper) {
return CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("locator request thread ",
MAX_POOL_SIZE, poolHelper, POOL_IDLE_TIMEOUT, new ThreadPoolExecutor.CallerRunsPolicy());
}

public void restarting() throws IOException {
this.shuttingDown = false;
startServerThread();
this.executor = createExecutor(this.poolHelper);
this.executor = executorServiceSupplier.get();
logger.info("TcpServer@" + System.identityHashCode(this)
+ " restarting: completed. Server thread=" + this.serverThread + '@'
+ System.identityHashCode(this.serverThread) + ";alive=" + this.serverThread.isAlive());
Expand Down
Expand Up @@ -17,9 +17,13 @@

import java.net.InetAddress;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Supplier;

import org.apache.logging.log4j.Logger;

import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalLocator;
Expand All @@ -28,9 +32,13 @@
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolServiceLoader;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.logging.LogService;

public class TcpServerFactory {
private static final int MAX_POOL_SIZE =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "TcpServer.MAX_POOL_SIZE", 100);
private static final int POOL_IDLE_TIMEOUT = 60 * 1000;
private final ClientProtocolServiceLoader clientProtocolServiceLoader;
static final Logger logger = LogService.getLogger();

Expand All @@ -42,8 +50,13 @@ public TcpServer makeTcpServer(int port, InetAddress bind_address, Properties ss
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
String threadName, InternalLocator internalLocator) {

return new TcpServer(port, bind_address, sslConfig, cfg, handler, poolHelper,
return new TcpServer(port, bind_address, sslConfig, cfg, handler,
threadName, new ProtocolCheckerImpl(internalLocator, clientProtocolServiceLoader),
DistributionStats::getStatTime);
DistributionStats::getStatTime, createExecutorServiceSupplier(poolHelper));
}

public static Supplier<ExecutorService> createExecutorServiceSupplier(PoolStatHelper poolHelper) {
return () -> CoreLoggingExecutors.newThreadPoolWithSynchronousFeed("locator request thread ",
MAX_POOL_SIZE, poolHelper, POOL_IDLE_TIMEOUT, new ThreadPoolExecutor.CallerRunsPolicy());
}
}
Expand Up @@ -31,9 +31,7 @@
import org.apache.geode.SystemFailure;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.PoolStatHelper;
import org.apache.geode.internal.GemFireVersion;
import org.apache.geode.internal.logging.CoreLoggingExecutors;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingExecutors;
import org.apache.geode.internal.logging.LoggingThread;
Expand Down Expand Up @@ -75,14 +73,9 @@ public class TcpServerDependenciesTest {
.or(type(SocketCreatorFactory.class))
.or(type(SSLConfigurationFactory.class))

// TODO - stats
.or(type(PoolStatHelper.class))
.or(type(CoreLoggingExecutors.class))

// TODO - cancel excpetion
.or(type(CancelException.class))


// TODO - config
.or(type(DistributionConfigImpl.class))
.or(type(DistributionConfig.class))
Expand Down

0 comments on commit 7868cb6

Please sign in to comment.