diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
index 8d2d7e2afb64..558c418b1216 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorService.java
@@ -37,6 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -50,11 +51,10 @@
* and a Runnable
that handles the object that is added to the queue.
*
*
In order to create a new service, create an instance of this class and
- * then do: instance.startExecutorService("myService");
. When done
- * call {@link #shutdown()}.
+ * then do: instance.startExecutorService(executorConfig);
. {@link ExecutorConfig}
+ * wraps the configuration needed by this service. When done call {@link #shutdown()}.
*
- *
In order to use the service created above, call
- * {@link #submit(EventHandler)}.
+ *
In order to use the service created above, call {@link #submit(EventHandler)}.
*/
@InterfaceAudience.Private
public class ExecutorService {
@@ -81,15 +81,16 @@ public ExecutorService(final String servername) {
/**
* Start an executor service with a given name. If there was a service already
* started with the same name, this throws a RuntimeException.
- * @param name Name of the service to start.
+ * @param config Configuration to use for the executor.
*/
- public void startExecutorService(String name, int maxThreads) {
+ public void startExecutorService(final ExecutorConfig config) {
+ final String name = config.getName();
Executor hbes = this.executorMap.compute(name, (key, value) -> {
if (value != null) {
throw new RuntimeException("An executor service with the name " + key +
" is already running!");
}
- return new Executor(key, maxThreads);
+ return new Executor(config);
});
LOG.debug(
@@ -119,34 +120,32 @@ Executor getExecutor(final ExecutorType type) {
}
Executor getExecutor(String name) {
- Executor executor = this.executorMap.get(name);
- return executor;
+ return this.executorMap.get(name);
}
public ThreadPoolExecutor getExecutorThreadPool(final ExecutorType type) {
return getExecutor(type).getThreadPoolExecutor();
}
- public void startExecutorService(final ExecutorType type, final int maxThreads) {
+ public void startExecutorService(final ExecutorType type, final ExecutorConfig config) {
String name = type.getExecutorName(this.servername);
if (isExecutorServiceRunning(name)) {
LOG.debug("Executor service {} already running on {}", this,
this.servername);
return;
}
- startExecutorService(name, maxThreads);
+ startExecutorService(config.setName(name));
}
/**
* Initialize the executor lazily, Note if an executor need to be initialized lazily, then all
* paths should use this method to get the executor, should not start executor by using
- * {@link ExecutorService#startExecutorService(ExecutorType, int)}
+ * {@link ExecutorService#startExecutorService(ExecutorConfig)}
*/
- public ThreadPoolExecutor getExecutorLazily(ExecutorType type, int maxThreads) {
+ public ThreadPoolExecutor getExecutorLazily(ExecutorType type, ExecutorConfig config) {
String name = type.getExecutorName(this.servername);
- return executorMap
- .computeIfAbsent(name, (executorName) -> new Executor(executorName, maxThreads))
- .getThreadPoolExecutor();
+ return executorMap.computeIfAbsent(name, (executorName) ->
+ new Executor(config.setName(name))).getThreadPoolExecutor();
}
public void submit(final EventHandler eh) {
@@ -182,12 +181,60 @@ public Map getAllExecutorStatuses() {
return ret;
}
+ /**
+ * Configuration wrapper for {@link Executor}.
+ */
+ public static class ExecutorConfig {
+ // Refer to ThreadPoolExecutor javadoc for details of these configuration.
+ // Argument validation and bound checks delegated to the underlying ThreadPoolExecutor
+ // implementation.
+ public static final long KEEP_ALIVE_TIME_MILLIS_DEFAULT = 1000;
+ private int corePoolSize = -1;
+ private boolean allowCoreThreadTimeout = false;
+ private long keepAliveTimeMillis = KEEP_ALIVE_TIME_MILLIS_DEFAULT;
+ private String name;
+
+ public int getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ public ExecutorConfig setCorePoolSize(int corePoolSize) {
+ this.corePoolSize = corePoolSize;
+ return this;
+ }
+
+ public boolean allowCoreThreadTimeout() {
+ return allowCoreThreadTimeout;
+ }
+
+ public ExecutorConfig setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
+ this.allowCoreThreadTimeout = allowCoreThreadTimeout;
+ return this;
+ }
+
+ public String getName() {
+ return Preconditions.checkNotNull(name);
+ }
+
+ public ExecutorConfig setName(String name) {
+ this.name = name;
+ return this;
+ }
+
+ public long getKeepAliveTimeMillis() {
+ return keepAliveTimeMillis;
+ }
+
+ public ExecutorConfig setKeepAliveTimeMillis(long keepAliveTimeMillis) {
+ this.keepAliveTimeMillis = keepAliveTimeMillis;
+ return this;
+ }
+ }
+
/**
* Executor instance.
*/
static class Executor {
- // how long to retain excess threads
- static final long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests
final TrackingThreadPoolExecutor threadPoolExecutor;
// work queue to use - unbounded queue
@@ -196,13 +243,15 @@ static class Executor {
private static final AtomicLong seqids = new AtomicLong(0);
private final long id;
- protected Executor(String name, int maxThreads) {
+ protected Executor(ExecutorConfig config) {
this.id = seqids.incrementAndGet();
- this.name = name;
+ this.name = config.getName();
// create the thread pool executor
this.threadPoolExecutor = new TrackingThreadPoolExecutor(
- maxThreads, maxThreads,
- keepAliveTimeInMillis, TimeUnit.MILLISECONDS, q);
+ // setting maxPoolSize > corePoolSize has no effect since we use an unbounded task queue.
+ config.getCorePoolSize(), config.getCorePoolSize(),
+ config.getKeepAliveTimeMillis(), TimeUnit.MILLISECONDS, q);
+ this.threadPoolExecutor.allowCoreThreadTimeOut(config.allowCoreThreadTimeout());
// name the threads for this threadpool
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat(this.name + "-%d");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 9911f014d639..fcda0576ec15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -88,6 +88,7 @@
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.MasterStoppedException;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.favored.FavoredNodesManager;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -1310,30 +1311,45 @@ public TableStateManager getTableStateManager() {
*/
private void startServiceThreads() throws IOException {
// Start the executor service pools
- this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
- HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
- this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
- HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
+ final int masterOpenRegionPoolSize = conf.getInt(
+ HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT);
+ this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
+ new ExecutorConfig().setCorePoolSize(masterOpenRegionPoolSize));
+ final int masterCloseRegionPoolSize = conf.getInt(
+ HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT);
+ this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
+ new ExecutorConfig().setCorePoolSize(masterCloseRegionPoolSize));
+ final int masterServerOpThreads = conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
+ HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
- conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
- HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
+ new ExecutorConfig().setCorePoolSize(masterServerOpThreads));
+ final int masterServerMetaOpsThreads = conf.getInt(
+ HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
+ HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT);
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
- conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
- HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
- this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
- HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
- this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, conf.getInt(
- SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT));
- this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS, conf.getInt(
- HConstants.MASTER_MERGE_DISPATCH_THREADS,
- HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT));
+ new ExecutorConfig().setCorePoolSize(masterServerMetaOpsThreads));
+ final int masterLogReplayThreads = conf.getInt(
+ HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT);
+ this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+ new ExecutorConfig().setCorePoolSize(masterLogReplayThreads));
+ final int masterSnapshotThreads = conf.getInt(
+ SnapshotManager.SNAPSHOT_POOL_THREADS_KEY, SnapshotManager.SNAPSHOT_POOL_THREADS_DEFAULT);
+ this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
+ new ExecutorConfig().setCorePoolSize(masterSnapshotThreads).setAllowCoreThreadTimeout(true));
+ final int masterMergeDispatchThreads = conf.getInt(HConstants.MASTER_MERGE_DISPATCH_THREADS,
+ HConstants.MASTER_MERGE_DISPATCH_THREADS_DEFAULT);
+ // Allow core threads to timeout since this is non-critical.
+ this.executorService.startExecutorService(ExecutorType.MASTER_MERGE_OPERATIONS,
+ new ExecutorConfig().setCorePoolSize(masterMergeDispatchThreads)
+ .setAllowCoreThreadTimeout(true));
// We depend on there being only one instance of this executor running
// at a time. To do concurrency, would need fencing of enable/disable of
// tables.
// Any time changing this maxThreads to > 1, pls see the comment at
// AccessController#postCompletedCreateTableAction
- this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+ this.executorService.startExecutorService(
+ ExecutorType.MASTER_TABLE_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
startProcedureExecutor();
// Create cleaner thread pool
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index e40e25158269..f2379ddd34d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -102,6 +102,7 @@
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.http.InfoServer;
@@ -2040,36 +2041,58 @@ private void startServices() throws IOException {
choreService.scheduleChore(compactedFileDischarger);
// Start executor services
+ final int openRegionThreads = conf.getInt("hbase.regionserver.executor.openregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_REGION,
- conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
+ new ExecutorConfig().setCorePoolSize(openRegionThreads));
+ final int openMetaThreads = conf.getInt("hbase.regionserver.executor.openmeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_META,
- conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
+ new ExecutorConfig().setCorePoolSize(openMetaThreads));
+ final int openPriorityRegionThreads =
+ conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_OPEN_PRIORITY_REGION,
- conf.getInt("hbase.regionserver.executor.openpriorityregion.threads", 3));
+ new ExecutorConfig().setCorePoolSize(openPriorityRegionThreads));
+ final int closeRegionThreads =
+ conf.getInt("hbase.regionserver.executor.closeregion.threads", 3);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_REGION,
- conf.getInt("hbase.regionserver.executor.closeregion.threads", 3));
+ new ExecutorConfig().setCorePoolSize(closeRegionThreads));
+ final int closeMetaThreads = conf.getInt("hbase.regionserver.executor.closemeta.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_CLOSE_META,
- conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+ new ExecutorConfig().setCorePoolSize(closeMetaThreads));
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
+ final int storeScannerParallelSeekThreads =
+ conf.getInt("hbase.storescanner.parallel.seek.threads", 10);
this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
- conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
+ new ExecutorConfig().setCorePoolSize(storeScannerParallelSeekThreads)
+ .setAllowCoreThreadTimeout(true));
}
- this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
- HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER));
+ final int logReplayOpsThreads = conf.getInt(
+ HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER);
+ this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
+ new ExecutorConfig().setCorePoolSize(logReplayOpsThreads).setAllowCoreThreadTimeout(true));
// Start the threads for compacted files discharger
+ final int compactionDischargerThreads =
+ conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10);
this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER,
- conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10));
+ new ExecutorConfig().setCorePoolSize(compactionDischargerThreads));
if (ServerRegionReplicaUtil.isRegionReplicaWaitForPrimaryFlushEnabled(conf)) {
+ final int regionReplicaFlushThreads = conf.getInt(
+ "hbase.regionserver.region.replica.flusher.threads", conf.getInt(
+ "hbase.regionserver.executor.openregion.threads", 3));
this.executorService.startExecutorService(ExecutorType.RS_REGION_REPLICA_FLUSH_OPS,
- conf.getInt("hbase.regionserver.region.replica.flusher.threads",
- conf.getInt("hbase.regionserver.executor.openregion.threads", 3)));
+ new ExecutorConfig().setCorePoolSize(regionReplicaFlushThreads));
}
+ final int refreshPeerThreads =
+ conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2);
this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER,
- conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2));
+ new ExecutorConfig().setCorePoolSize(refreshPeerThreads));
+ final int replaySyncReplicationWALThreads =
+ conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL,
- conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1));
+ new ExecutorConfig().setCorePoolSize(replaySyncReplicationWALThreads));
+ final int switchRpcThrottleThreads =
+ conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1);
this.executorService.startExecutorService(ExecutorType.RS_SWITCH_RPC_THROTTLE,
- conf.getInt("hbase.regionserver.executor.switch.rpc.throttle.threads", 1));
+ new ExecutorConfig().setCorePoolSize(switchRpcThrottleThreads));
Threads.setDaemonThreadRunning(this.walRoller, getName() + ".logRoller",
uncaughtExceptionHandler);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
index 06795a58545e..fc6b371193af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServicesForStores.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.wal.WAL;
@@ -96,8 +97,9 @@ private static synchronized ThreadPoolExecutor getInMemoryCompactionPoolForTest(
ThreadPoolExecutor getInMemoryCompactionPool() {
if (rsServices != null) {
+ ExecutorConfig config = new ExecutorConfig().setCorePoolSize(inMemoryPoolSize);
return rsServices.getExecutorService().getExecutorLazily(ExecutorType.RS_IN_MEMORY_COMPACTION,
- inMemoryPoolSize);
+ config);
} else {
// this could only happen in tests
return getInMemoryCompactionPoolForTest();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java
index ce3e8ffdf3f1..96df8ee5646d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestExecutorStatusChore.java
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.executor.TestExecutorService.TestEventHandler;
import org.apache.hadoop.hbase.regionserver.MetricsRegionServerSource;
@@ -58,8 +59,8 @@ public void testMetricsCollect() throws Exception {
// Start an executor service pool with max 5 threads
ExecutorService executorService = new ExecutorService("unit_test");
- executorService.startExecutorService(
- ExecutorType.RS_PARALLEL_SEEK, maxThreads);
+ executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
+ new ExecutorConfig().setCorePoolSize(maxThreads));
MetricsRegionServerSource serverSource = CompatibilitySingletonFactory
.getInstance(MetricsRegionServerSourceFactory.class).createServer(null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
index c93e951a4b25..1021d233a1ba 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/executor/TestExecutorService.java
@@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -70,8 +71,8 @@ public void testExecutorService() throws Exception {
// Start an executor service pool with max 5 threads
ExecutorService executorService = new ExecutorService("unit_test");
- executorService.startExecutorService(
- ExecutorType.MASTER_SERVER_OPERATIONS, maxThreads);
+ executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
+ new ExecutorConfig().setCorePoolSize(maxThreads));
Executor executor =
executorService.getExecutor(ExecutorType.MASTER_SERVER_OPERATIONS);
@@ -138,7 +139,7 @@ public void testExecutorService() throws Exception {
}
// Make sure threads are still around even after their timetolive expires.
- Thread.sleep(ExecutorService.Executor.keepAliveTimeInMillis * 2);
+ Thread.sleep(ExecutorConfig.KEEP_ALIVE_TIME_MILLIS_DEFAULT * 2);
assertEquals(maxThreads, pool.getPoolSize());
executorService.shutdown();
@@ -197,7 +198,7 @@ public void testAborting() throws Exception {
ExecutorService executorService = new ExecutorService("unit_test");
executorService.startExecutorService(
- ExecutorType.MASTER_SERVER_OPERATIONS, 1);
+ ExecutorType.MASTER_SERVER_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
executorService.submit(new EventHandler(server, EventType.M_SERVER_SHUTDOWN) {
@@ -229,7 +230,8 @@ public void testSnapshotHandlers() throws Exception {
when(server.getConfiguration()).thenReturn(conf);
ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
- executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);
+ executorService.startExecutorService(
+ ExecutorType.MASTER_SNAPSHOT_OPERATIONS, new ExecutorConfig().setCorePoolSize(1));
CountDownLatch latch = new CountDownLatch(1);
CountDownLatch waitForEventToStart = new CountDownLatch(1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 9ecdc455f5f1..8080a4f4ba36 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -62,6 +62,7 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResultImpl;
@@ -192,8 +193,7 @@ public void setUp() throws Exception {
String string = org.apache.hadoop.hbase.executor.EventType.RS_COMPACTED_FILES_DISCHARGER
.toString();
ExecutorService es = new ExecutorService(string);
- es.startExecutorService(
- string+"-"+string, 1);
+ es.startExecutorService(new ExecutorConfig().setCorePoolSize(1).setName(string + "-" + string));
when(rss.getExecutorService()).thenReturn(es);
primaryRegion = HRegion.createHRegion(primaryHri, rootDir, CONF, htd, walPrimary);
primaryRegion.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
index ce5466c57c9a..cd950d6b7afb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -211,7 +212,8 @@ public void setup() throws Exception {
SplitLogCounters.resetCounters();
executorService = new ExecutorService("TestSplitLogWorker");
- executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, 10);
+ executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS,
+ new ExecutorConfig().setCorePoolSize(10));
}
@After