HBASE-25547: Thread pools should release unused resources #2922

Merged (2 commits), Feb 16, 2021
Changes from all commits
---- File 1 of 5 (diff) ----
@@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -50,11 +51,10 @@
* and a <code>Runnable</code> that handles the object that is added to the queue.
*
* <p>In order to create a new service, create an instance of this class and
* then do: <code>instance.startExecutorService("myService");</code>. When done
* call {@link #shutdown()}.
* then do: <code>instance.startExecutorService(executorConfig);</code>. {@link ExecutorConfig}
* wraps the configuration needed by this service. When done call {@link #shutdown()}.
*
* <p>In order to use the service created above, call
* {@link #submit(EventHandler)}.
* <p>In order to use the service created above, call {@link #submit(EventHandler)}.
*/
@InterfaceAudience.Private
public class ExecutorService {
@@ -81,15 +81,16 @@ public ExecutorService(final String servername) {
/**
* Start an executor service with a given name. If there was a service already
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
* @param config Configuration to use for the executor.
*/
public void startExecutorService(String name, int maxThreads) {
public void startExecutorService(final ExecutorConfig config) {
final String name = config.getName();
Executor hbes = this.executorMap.compute(name, (key, value) -> {
if (value != null) {
throw new RuntimeException("An executor service with the name " + key +
" is already running!");
}
return new Executor(key, maxThreads);
return new Executor(config);
});

LOG.debug(
@@ -119,34 +120,32 @@ Executor getExecutor(final ExecutorType type) {
}

Executor getExecutor(String name) {
Executor executor = this.executorMap.get(name);
return executor;
return this.executorMap.get(name);
}

public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
return getExecutor(type).getThreadPoolExecutor();
}

public void startExecutorService(final ExecutorType type, final int maxThreads) {
public void startExecutorService(final ExecutorType type, final ExecutorConfig config) {
String name = type.getExecutorName(this.servername);
if (isExecutorServiceRunning(name)) {
LOG.debug("Executor service {} already running on {}", this,
this.servername);
return;
}
startExecutorService(name, maxThreads);
startExecutorService(config.setName(name));
}

/**
* Initializes the executor lazily. Note that if an executor needs to be initialized lazily, all
* paths should use this method to get the executor and should not start it via
* {@link ExecutorService#startExecutorService(ExecutorType, int)}
* {@link ExecutorService#startExecutorService(ExecutorConfig)}
*/
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) {
String name = type.getExecutorName(this.servername);
return executorMap
.computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
.getThreadPoolExecutor();
return executorMap.computeIfAbsent(name, (executorName) ->
new Executor(config.setName(name))).getThreadPoolExecutor();
}

public void submit(final EventHandler eh) {
@@ -182,12 +181,65 @@ public Map<String, ExecutorStatus> getAllExecutorStatuses() {
return ret;
}

/**
* Configuration wrapper for {@link Executor}.
*/
public static class ExecutorConfig {
// Refer to the ThreadPoolExecutor javadoc for details of these configuration parameters.
// Argument validation and bounds checks are delegated to the underlying ThreadPoolExecutor
// implementation.
public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
private int corePoolSize = -1;
private boolean allowCoreThreadTimeout = false;
private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
private String name;

public int getCorePoolSize() {
return corePoolSize;
}

public ExecutorConfig setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
return this;
}

public boolean allowCoreThreadTimeout() {
return allowCoreThreadTimeout;
}

/**
* Allows core threads to time out. Setting this on non-critical thread pools helps release
* unused resources. Refer to {@link ThreadPoolExecutor#allowCoreThreadTimeOut}
* for additional details.
*/
Review comment (Contributor), on lines +210 to +214: "Perfect!"

public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
this.allowCoreThreadTimeout = allowCoreThreadTimeout;
return this;
}

public String getName() {
return Preconditions.checkNotNull(name);
}

public ExecutorConfig setName(String name) {
this.name = name;
return this;
}

public long getKeepAliveTimeMillis() {
return keepAliveTimeMillis;
}

public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
this.keepAliveTimeMillis = keepAliveTimeMillis;
return this;
}
}

/**
* Executor instance.
*/
static class Executor {
// how long to retain excess threads
static final long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests
final TrackingThreadPoolExecutor threadPoolExecutor;
// work queue to use - unbounded queue
@@ -196,13 +248,15 @@ static class Executor {
private static final AtomicLong seqids = new AtomicLong(0);
private final long id;

protected Executor(String name, int maxThreads) {
protected Executor(ExecutorConfig config) {
this.id = seqids.incrementAndGet();
this.name = name;
this.name = config.getName();
// create the thread pool executor
this.threadPoolExecutor = new TrackingThreadPoolExecutor(
maxThreads, maxThreads,
keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
// setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
config.getCorePoolSize(), config.getCorePoolSize(),
config.getKeepAliveTimeMillis(), TimeUnit.MILLISECONDS, q);
this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
Review comment (Contributor): "Great"
// name the threads for this threadpool
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat(this.name + "-%d");
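
Taken together, the ExecutorService changes above replace the old (name, maxThreads) arguments with a single ExecutorConfig. Below is a minimal usage sketch of the new API, based only on the methods visible in this diff; the server name, executor type, and pool size are illustrative placeholders, not values from the patch.

import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;

public class ExecutorConfigUsageSketch {
  public static void main(String[] args) {
    ExecutorService executorService = new ExecutorService("example-server");

    // Fluent configuration replaces the bare maxThreads int. Because Executor uses an
    // unbounded work queue, corePoolSize is the effective maximum pool size.
    ExecutorConfig config = new ExecutorConfig()
        .setCorePoolSize(3)              // illustrative value
        .setAllowCoreThreadTimeout(true) // let idle core threads exit and free resources
        .setKeepAliveTimeMillis(1000);   // same as KEEP_ALIVE_TIME_MILLIS_DEFAULT

    // The ExecutorType overload derives the executor name from the type and server name.
    executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, config);

    // Work is submitted as EventHandlers via executorService.submit(eventHandler);
    // when the pools are no longer needed, release them:
    executorService.shutdown();
  }
}

Per the javadoc above, calling startExecutorService(config) twice with the same name throws a RuntimeException, while the ExecutorType overload logs and returns early if the pool is already running.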
---- File 2 of 5 (diff) ----
@@ -88,6 +88,7 @@
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -1310,30 +1311,44 @@ public TableStateManager getTableStateManager() {
*/
private void startServiceThreads() throws IOException {
// Start the executor service pools
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
final int masterOpenRegionPoolSize = conf.getInt(
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize));
final int masterCloseRegionPoolSize = conf.getInt(
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize));
final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
new ExecutorConfig().setCorePoolSize(masterServerOpThreads));
final int masterServerMetaOpsThreads = conf.getInt(
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt(
HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT));
new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads));
final int masterLogReplayThreads = conf.getInt(
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
new ExecutorConfig().setCorePoolSize(masterLogReplayThreads));
final int masterSnapshotThreads = conf.getInt(
SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS,
new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads)
.setAllowCoreThreadTimeout(true));

// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
// tables.
// Any time changing this maxThreads to > 1, pls see the comment at
// AccessController#postCompletedCreateTableAction
this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
this.executorService.startExecutorService(
ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
startProcedureExecutor();

// Create cleaner thread pool
---- File 3 of 5 (diff) ----
@@ -102,6 +102,7 @@
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
@@ -2040,36 +2041,58 @@ private void startServices() throws IOException {
choreService.scheduleChore(compactedFileDischarger);

// Start executor services
final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
new ExecutorConfig().setCorePoolSize(openRegionThreads));
final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
new ExecutorConfig().setCorePoolSize(openMetaThreads));
final int openPriorityRegionThreads =
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads));
final int closeRegionThreads =
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
new ExecutorConfig().setCorePoolSize(closeRegionThreads));
final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
new ExecutorConfig().setCorePoolSize(closeMetaThreads));
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
final int storeScannerParallelSeekThreads =
conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads)
.setAllowCoreThreadTimeout(true));
}
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
final int logReplayOpsThreads = conf.getInt(
HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
// Start the threads for compacted files discharger
final int compactionDischargerThreads =
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
new ExecutorConfig().setCorePoolSize(compactionDischargerThreads));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
final int regionReplicaFlushThreads = conf.getInt(
"hbase.regionserver.region.replica.flusher.threads", conf.getInt(
"hbase.regionserver.executor.openregion.threads", 3));
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
conf.getInt("hbase.regionserver.region.replica.flusher.threads",
conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads));
}
final int refreshPeerThreads =
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
new ExecutorConfig().setCorePoolSize(refreshPeerThreads));
final int replaySyncReplicationWALThreads =
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads));
final int switchRpcThrottleThreads =
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads));

Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler);
---- File 4 of 5 (diff) ----
@@ -22,6 +22,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.wal.WAL;
@@ -96,8 +97,9 @@ private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest(

ThreadPoolExecutor getInMemoryCompactionPool() {
if (rsServices != null) {
ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize);
return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
inMemoryPoolSize);
config);
} else {
// this could only happen in tests
return getInMemoryCompactionPoolForTest();
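
The getExecutorLazily(...) call above goes through the ConcurrentHashMap.computeIfAbsent path added to ExecutorService, so the pool is created — and the supplied ExecutorConfig applied — only on the first call for that executor type; later callers get the existing ThreadPoolExecutor back. A standalone sketch of that pattern using plain JDK types follows; the class and method names here are placeholders, not HBase code.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LazyPoolSketch {
  private final ConcurrentMap<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();

  // Creates the pool on first use; subsequent calls return the same instance and the
  // arguments they pass are ignored.
  ThreadPoolExecutor getPoolLazily(String name, int corePoolSize, long keepAliveMillis) {
    return pools.computeIfAbsent(name, n -> {
      ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, corePoolSize,
          keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
      pool.allowCoreThreadTimeOut(true); // mirrors the intent of this patch: shed idle threads
      return pool;
    });
  }

  public static void main(String[] args) {
    LazyPoolSketch sketch = new LazyPoolSketch();
    ThreadPoolExecutor first = sketch.getPoolLazily("in-memory-compaction", 10, 1000);
    ThreadPoolExecutor second = sketch.getPoolLazily("in-memory-compaction", 99, 1000);
    System.out.println(first == second); // true: the second configuration is never applied
  }
}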
---- File 5 of 5 (diff) ----
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
@@ -58,8 +59,8 @@ public void testMetricsCollect() throws Exception {

// Start an executor service pool with max 5 threads
ExecutorService executorService = new ExecutorService("unit_test");
executorService.startExecutorService(
ExecutorType.RS_PARALLEL_SEEK, maxThreads);
executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
new ExecutorConfig().setCorePoolSize(maxThreads));

MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).createServer(null);
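
For completeness, the resource-release behaviour this PR targets can be observed with plain JDK classes as well: with core == max, an unbounded queue, and allowCoreThreadTimeOut(true), an idle pool shrinks back to zero threads once the keep-alive elapses. A small demonstration sketch follows; it is not part of the patch, the timings are illustrative, and the sleep-based checks can be flaky on a loaded machine.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreThreadTimeoutDemo {
  public static void main(String[] args) throws InterruptedException {
    // Mirrors Executor's setup: core == max, unbounded queue, short keep-alive.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3,
        200, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    pool.allowCoreThreadTimeOut(true); // without this, the three core threads never exit

    for (int i = 0; i < 3; i++) {
      pool.execute(() -> { /* trivial task */ });
    }
    Thread.sleep(100);
    System.out.println("pool size right after work: " + pool.getPoolSize()); // up to 3

    Thread.sleep(1000); // well past the 200 ms keep-alive
    System.out.println("pool size after idling: " + pool.getPoolSize());     // 0

    pool.shutdown();
  }
}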