HBASE-25547: Thread pools should release unused resources
Enabled core thread timeout for some low-priority thread pools to
reclaim unused resources.
bharathv committed Feb 16, 2021
1 parent b6649a8 commit 881e4d9
Showing 8 changed files with 158 additions and 63 deletions.
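The reclamation mechanism in this commit is plain java.util.concurrent behavior: a keep-alive time passed to the ThreadPoolExecutor constructor plus allowCoreThreadTimeOut(true), which lets even core threads exit once they have been idle for the keep-alive period. A minimal standalone sketch of that behavior (plain JDK code, not part of this commit; the pool size and sleep durations are arbitrary):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreThreadTimeoutDemo {
  public static void main(String[] args) throws InterruptedException {
    // 3 core threads, unbounded queue, 1 second keep-alive (mirrors the defaults in this change).
    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3,
        1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    // Without this call, idle core threads would be retained forever.
    pool.allowCoreThreadTimeOut(true);

    for (int i = 0; i < 3; i++) {
      pool.execute(() -> { /* trivial task */ });
    }
    System.out.println("pool size after submit: " + pool.getPoolSize()); // typically 3

    Thread.sleep(3000); // idle for well over the keep-alive time
    System.out.println("pool size after idling: " + pool.getPoolSize()); // 0, threads reclaimed

    pool.shutdown();
  }
}

Pools that never call allowCoreThreadTimeOut keep their core threads alive indefinitely even when idle, which is exactly the retention this commit trims for low-priority executors.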
@@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -50,11 +51,10 @@
* and a <code>Runnable</code> that handles the object that is added to the queue.
*
* <p>In order to create a new service, create an instance of this class and
* then do: <code>instance.startExecutorService("myService");</code>. When done
* call {@link #shutdown()}.
* then do: <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig}
* wraps the configuration needed by this service. When done call {@link #shutdown()}.
*
* <p>In order to use the service created above, call
* {@link #submit(EventHandler)}.
* <p>In order to use the service created above, call {@link #submit(EventHandler)}.
*/
@InterfaceAudience.Private
public class ExecutorService {
@@ -81,15 +81,16 @@ public ExecutorService(final String servername) {
/**
* Start an executor service with a given name. If there was a service already
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
* @param config Configuration to use for the executor.
*/
public void startExecutorService(String name, int maxThreads) {
public void startExecutorService(final ExecutorConfig config) {
final String name = config.getName();
Executor hbes = this.executorMap.compute(name, (key, value) -> {
if (value != null) {
throw new RuntimeException("An executor service with the name " + key +
" is already running!");
}
return new Executor(key, maxThreads);
return new Executor(config);
});

LOG.debug(
@@ -119,34 +120,32 @@ Executor getExecutor(final ExecutorType type) {
}

Executor getExecutor(String name) {
Executor executor = this.executorMap.get(name);
return executor;
return this.executorMap.get(name);
}

public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
return getExecutor(type).getThreadPoolExecutor();
}

public void startExecutorService(final ExecutorType type, final int maxThreads) {
public void startExecutorService(final ExecutorType type, final ExecutorConfig config) {
String name = type.getExecutorName(this.servername);
if (isExecutorServiceRunning(name)) {
LOG.debug("Executor service {} already running on {}", this,
this.servername);
return;
}
startExecutorService(name, maxThreads);
startExecutorService(config.setName(name));
}

/**
* Initializes the executor lazily. Note that if an executor needs to be initialized lazily,
* all paths should use this method to get the executor and should not start it via
* {@link ExecutorService#startExecutorService(ExecutorType, int)}
* {@link ExecutorService#startExecutorService(ExecutorConfig)}
*/
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) {
String name = type.getExecutorName(this.servername);
return executorMap
.computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
.getThreadPoolExecutor();
return executorMap.computeIfAbsent(name, (executorName) ->
new Executor(config.setName(name))).getThreadPoolExecutor();
}

public void submit(final EventHandler eh) {
@@ -182,12 +181,60 @@ public Map<String, ExecutorStatus> getAllExecutorStatuses() {
return ret;
}

/**
* Configuration wrapper for {@link Executor}.
*/
public static class ExecutorConfig {
// Refer to the ThreadPoolExecutor javadoc for details of these configurations.
// Argument validation and bounds checks are delegated to the underlying ThreadPoolExecutor
// implementation.
public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
private int corePoolSize = -1;
private boolean allowCoreThreadTimeout = false;
private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
private String name;

public int getCorePoolSize() {
return corePoolSize;
}

public ExecutorConfig setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}

public boolean allowCoreThreadTimeout() {
return allowCoreThreadTimeout;
}

public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
return this;
}

public String getName() {
return Preconditions.checkNotNull(name);
}

public ExecutorConfig setName(String name) {
this.name = name;
return this;
}

public long getKeepAliveTimeMillis() {
return keepAliveTimeMillis;
}

public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
this.keepAliveTimeMillis = keepAliveTimeMillis;
return this;
}
}
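For illustration, callers wire an ExecutorConfig up fluently. The snippet below is a sketch rather than code from this commit (the pool name, sizes, and keep-alive are invented); note that setName and setCorePoolSize are effectively mandatory, since getName() enforces non-null via Preconditions and the -1 default core size is rejected by the underlying ThreadPoolExecutor:

// Assumes the HBase executor imports; the name and sizes here are hypothetical.
ExecutorConfig config = new ExecutorConfig()
    .setName("demo-pool")               // required: getName() checks non-null
    .setCorePoolSize(2)                 // required: the -1 default fails ThreadPoolExecutor validation
    .setAllowCoreThreadTimeout(true)    // optional: reclaim idle core threads
    .setKeepAliveTimeMillis(5000);      // optional: defaults to KEEP_ALIVE_TIME_MILLIS_DEFAULT (1000 ms)

ExecutorService executorService = new ExecutorService("demo-server");
executorService.startExecutorService(config);
// ... executorService.submit(eventHandler) as needed ...
executorService.shutdown();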

/**
* Executor instance.
*/
static class Executor {
// how long to retain excess threads
static final long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests
final TrackingThreadPoolExecutor threadPoolExecutor;
// work queue to use - unbounded queue
@@ -196,13 +243,15 @@ static class Executor {
private static final AtomicLong seqids = new AtomicLong(0);
private final long id;

protected Executor(String name, int maxThreads) {
protected Executor(ExecutorConfig config) {
this.id = seqids.incrementAndGet();
this.name = name;
this.name = config.getName();
// create the thread pool executor
this.threadPoolExecutor = new TrackingThreadPoolExecutor(
maxThreads, maxThreads,
keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
// setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
config.getCorePoolSize(), config.getCorePoolSize(),
config.getKeepAliveTimeMillis(), TimeUnit.MILLISECONDS, q);
this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
// name the threads for this threadpool
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat(this.name + "-%d");
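The constructor comment about maxPoolSize is the key subtlety in Executor: because the pool is fed from an unbounded work queue, ThreadPoolExecutor never grows past corePoolSize, so the core and max sizes are passed as the same value and allowCoreThreadTimeOut is the only way idle threads are ever released. A generic sketch (plain JDK code, not taken from this commit) of why an unbounded queue suppresses the max size:

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {
  public static void main(String[] args) {
    // core=1, max=10: with an unbounded queue the pool still never exceeds 1 thread, because
    // ThreadPoolExecutor only spawns threads beyond corePoolSize when the queue rejects an offer.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
        1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    for (int i = 0; i < 100; i++) {
      pool.execute(() -> {
        try {
          Thread.sleep(50);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    System.out.println("largest pool size: " + pool.getLargestPoolSize()); // 1, despite max=10
    pool.shutdown();
  }
}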
@@ -88,6 +88,7 @@
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -1310,30 +1311,45 @@ public TableStateManager getTableStateManager() {
*/
private void startServiceThreads() throws IOException {
// Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
final int masterOpenRegionPoolSize = conf.getInt(
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize));
final int masterCloseRegionPoolSize = conf.getInt(
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize));
final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
new ExecutorConfig().setCorePoolSize(masterServerOpThreads));
final int masterServerMetaOpsThreads = conf.getInt(
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt(
HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT));
new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads));
final int masterLogReplayThreads = conf.getInt(
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
new ExecutorConfig().setCorePoolSize(masterLogReplayThreads));
final int masterSnapshotThreads = conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
// Allow core threads to time out since this is non-critical.
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS,
new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads)
.setAllowCoreThreadTimeout(true));

// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
// tables.
// Any time changing this maxThreads to > 1, please see the comment at
// AccessController#postCompletedCreateTableAction
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
this.executorService.startExecutorService(
ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
startProcedureExecutor();

// Create cleaner thread pool
@@ -102,6 +102,7 @@
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
@@ -2040,36 +2041,58 @@ private void startServices() throws IOException {
choreService.scheduleChore(compactedFileDischarger);

// Start executor services
final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
new ExecutorConfig().setCorePoolSize(openRegionThreads));
final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
new ExecutorConfig().setCorePoolSize(openMetaThreads));
final int openPriorityRegionThreads =
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads));
final int closeRegionThreads =
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
new ExecutorConfig().setCorePoolSize(closeRegionThreads));
final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
new ExecutorConfig().setCorePoolSize(closeMetaThreads));
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
final int storeScannerParallelSeekThreads =
conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads)
.setAllowCoreThreadTimeout(true));
}
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
final int logReplayOpsThreads = conf.getInt(
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
// Start the threads for compacted files discharger
final int compactionDischargerThreads =
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
new ExecutorConfig().setCorePoolSize(compactionDischargerThreads));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
final int regionReplicaFlushThreads = conf.getInt(
"hbase.regionserver.region.replica.flusher.threads", conf.getInt(
"hbase.regionserver.executor.openregion.threads", 3));
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads));
}
final int refreshPeerThreads =
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
new ExecutorConfig().setCorePoolSize(refreshPeerThreads));
final int replaySyncReplicationWALThreads =
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads));
final int switchRpcThrottleThreads =
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads));

Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler);
@@ -22,6 +22,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.wal.WAL;
@@ -96,8 +97,9 @@ private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest(

ThreadPoolExecutor getInMemoryCompactionPool() {
if (rsServices != null) {
ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize);
return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
inMemoryPoolSize);
config);
} else {
// this could only happen in tests
return getInMemoryCompactionPoolForTest();
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
@@ -58,8 +59,8 @@ public void testMetricsCollect() throws Exception {

// Start an executor service pool with max 5 threads
ExecutorService executorService = new ExecutorService("unit_test");
executorService.startExecutorService(
ExecutorType.RS_PARALLEL_SEEK, maxThreads);
executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
new ExecutorConfig().setCorePoolSize(maxThreads));

MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).createServer(null);
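A hypothetical end-to-end check of the new behavior, going through the HBase ExecutorService API rather than a raw pool (this is not one of the commit's tests; it assumes the usual executor imports and a test method that can throw InterruptedException):

ExecutorService executorService = new ExecutorService("timeout_demo");
executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
    new ExecutorConfig().setCorePoolSize(5).setAllowCoreThreadTimeout(true)
        .setKeepAliveTimeMillis(100));
ThreadPoolExecutor pool = executorService.getExecutorThreadPool(ExecutorType.RS_PARALLEL_SEEK);
// ... submit a few EventHandlers and wait for them to complete ...
Thread.sleep(1000);                      // idle for well over the keep-alive time
System.out.println(pool.getPoolSize());  // expected to drop to 0 once the idle core threads expire
executorService.shutdown();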