diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftServer.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftServer.java index a0b8c382b2..b04b17d060 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftServer.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/RaftServer.java @@ -534,6 +534,7 @@ abstract class Builder implements io.atomix.utils.Builder { private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(750); private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(250); private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofMillis(5000); + private static final ThreadModel DEFAULT_THREAD_MODEL = ThreadModel.SHARED_THREAD_POOL; private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors(); protected String name; @@ -545,6 +546,7 @@ abstract class Builder implements io.atomix.utils.Builder { protected Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL; protected Duration sessionTimeout = DEFAULT_SESSION_TIMEOUT; protected final RaftServiceRegistry serviceRegistry = new RaftServiceRegistry(); + protected ThreadModel threadModel = DEFAULT_THREAD_MODEL; protected int threadPoolSize = DEFAULT_THREAD_POOL_SIZE; protected Builder(MemberId localMemberId) { @@ -586,6 +588,17 @@ public Builder withProtocol(RaftServerProtocol protocol) { return this; } + /** + * Sets the server thread model. + * + * @param threadModel the server thread model + * @return the server builder + */ + public Builder withThreadModel(ThreadModel threadModel) { + this.threadModel = checkNotNull(threadModel, "threadModel cannot be null"); + return this; + } + /** * Sets the storage module. * diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/ThreadModel.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/ThreadModel.java new file mode 100644 index 0000000000..5e2c520168 --- /dev/null +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/ThreadModel.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.protocols.raft; + +import io.atomix.utils.concurrent.SingleThreadContextFactory; +import io.atomix.utils.concurrent.ThreadContextFactory; +import io.atomix.utils.concurrent.ThreadPoolContextFactory; +import org.slf4j.Logger; + +/** + * Raft thread model. + */ +public enum ThreadModel { + + /** + * A thread model that creates a thread pool to be shared by all services. + */ + SHARED_THREAD_POOL { + @Override + public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) { + return new SingleThreadContextFactory(nameFormat, logger); + } + }, + + /** + * A thread model that creates a thread for each Raft service. + */ + THREAD_PER_SERVICE { + @Override + public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) { + return new ThreadPoolContextFactory(nameFormat, threadPoolSize, logger); + } + }; + + /** + * Returns a thread context factory. + * + * @param nameFormat the thread name format + * @param threadPoolSize the thread pool size + * @param logger the thread logger + * @return the thread context factory + */ + public abstract ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger); +} diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftServer.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftServer.java index 76b20be716..5c4eb309fc 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftServer.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/DefaultRaftServer.java @@ -253,7 +253,7 @@ public RaftServer build() { storage = RaftStorage.newBuilder().build(); } - RaftContext raft = new RaftContext(name, type, localMemberId, protocol, storage, serviceRegistry, threadPoolSize); + RaftContext raft = new RaftContext(name, type, localMemberId, protocol, storage, serviceRegistry, threadModel, threadPoolSize); raft.setElectionTimeout(electionTimeout); raft.setHeartbeatInterval(heartbeatInterval); raft.setSessionTimeout(sessionTimeout); diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftContext.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftContext.java index 593a4e3650..184caf0b83 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftContext.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftContext.java @@ -17,6 +17,7 @@ import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.RaftServer; +import io.atomix.protocols.raft.ThreadModel; import io.atomix.protocols.raft.cluster.MemberId; import io.atomix.protocols.raft.cluster.RaftMember; import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember; @@ -38,9 +39,9 @@ import io.atomix.protocols.raft.storage.log.RaftLogWriter; import io.atomix.protocols.raft.storage.snapshot.SnapshotStore; import io.atomix.protocols.raft.storage.system.MetaStore; -import io.atomix.utils.concurrent.Futures; import io.atomix.utils.concurrent.SingleThreadContext; import io.atomix.utils.concurrent.ThreadContext; +import io.atomix.utils.concurrent.ThreadContextFactory; import io.atomix.utils.logging.ContextualLoggerFactory; import io.atomix.utils.logging.LoggerContext; import org.slf4j.Logger; @@ -51,9 +52,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Supplier; @@ -86,8 +84,8 @@ public class RaftContext implements AutoCloseable { private RaftLogReader logReader; private SnapshotStore snapshotStore; private RaftServiceManager stateMachine; - protected final ScheduledExecutorService threadPool; - protected final ThreadContext stateContext; + private final ThreadContextFactory threadContextFactory; + private final ThreadContext stateContext; protected RaftRole role = new InactiveRole(this); private Duration electionTimeout = Duration.ofMillis(500); private Duration sessionTimeout = Duration.ofMillis(5000); @@ -100,7 +98,15 @@ public class RaftContext implements AutoCloseable { private volatile long lastApplied; @SuppressWarnings("unchecked") - public RaftContext(String name, RaftMember.Type type, MemberId localMemberId, RaftServerProtocol protocol, RaftStorage storage, RaftServiceRegistry registry, int threadPoolSize) { + public RaftContext( + String name, + RaftMember.Type type, + MemberId localMemberId, + RaftServerProtocol protocol, + RaftStorage storage, + RaftServiceRegistry registry, + ThreadModel threadModel, + int threadPoolSize) { this.name = checkNotNull(name, "name cannot be null"); this.protocol = checkNotNull(protocol, "protocol cannot be null"); this.storage = checkNotNull(storage, "storage cannot be null"); @@ -112,7 +118,8 @@ public RaftContext(String name, RaftMember.Type type, MemberId localMemberId, Ra String baseThreadName = String.format("raft-server-%s", name); this.threadContext = new SingleThreadContext(namedThreads(baseThreadName, log)); this.stateContext = new SingleThreadContext(namedThreads(baseThreadName + "-state", log)); - this.threadPool = Executors.newScheduledThreadPool(threadPoolSize, namedThreads(baseThreadName + "-%d", log)); + + this.threadContextFactory = threadModel.factory(baseThreadName + "-%d", threadPoolSize, log); // Open the metadata store. this.meta = storage.openMetaStore(); @@ -574,7 +581,7 @@ public void reset() { snapshotStore = storage.openSnapshotStore(); // Create a new internal server state machine. - this.stateMachine = new RaftServiceManager(this, threadPool, stateContext); + this.stateMachine = new RaftServiceManager(this, threadContextFactory, stateContext); } /** @@ -803,12 +810,7 @@ public void close() { stateMachine.close(); threadContext.close(); stateContext.close(); - threadPool.shutdownNow(); - - try { - threadPool.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - } + threadContextFactory.close(); } /** diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftServiceManager.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftServiceManager.java index f7c252d520..011ac90ff4 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftServiceManager.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/impl/RaftServiceManager.java @@ -46,6 +46,7 @@ import io.atomix.utils.concurrent.ComposableFuture; import io.atomix.utils.concurrent.Futures; import io.atomix.utils.concurrent.ThreadContext; +import io.atomix.utils.concurrent.ThreadContextFactory; import io.atomix.utils.logging.ContextualLoggerFactory; import io.atomix.utils.logging.LoggerContext; import org.slf4j.Logger; @@ -60,7 +61,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkNotNull; @@ -81,7 +81,7 @@ public class RaftServiceManager implements AutoCloseable { private final Logger logger; private final RaftContext raft; - private final ScheduledExecutorService threadPool; + private final ThreadContextFactory threadContextFactory; private final ThreadContext threadContext; private final RaftLog log; private final RaftLogReader reader; @@ -92,11 +92,11 @@ public class RaftServiceManager implements AutoCloseable { private long lastPrepared; private long lastCompacted; - public RaftServiceManager(RaftContext raft, ScheduledExecutorService threadPool, ThreadContext threadContext) { + public RaftServiceManager(RaftContext raft, ThreadContextFactory threadContextFactory, ThreadContext threadContext) { this.raft = checkNotNull(raft, "state cannot be null"); this.log = raft.getLog(); this.reader = log.openReader(1, RaftLogReader.Mode.COMMITS); - this.threadPool = threadPool; + this.threadContextFactory = threadContextFactory; this.threadContext = threadContext; this.loadCounter = new SlidingWindowCounter(WINDOW_SIZE, threadContext); this.logger = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class) @@ -263,7 +263,7 @@ private void prepareIndex(long index) { sessionTimeout, service, raft, - threadPool); + threadContextFactory); session.setTimestamp(sessionTimestamp); session.setRequestSequence(reader.readLong()); session.setCommandSequence(reader.readLong()); @@ -389,7 +389,7 @@ private DefaultServiceContext getOrInitializeService(ServiceId serviceId, Servic serviceFactory.get(), raft, sessionManager, - threadPool); + threadContextFactory); services.put(serviceName, service); } return service; @@ -418,7 +418,7 @@ private CompletableFuture applyOpenSession(Indexed entry entry.entry().timeout(), service, raft, - threadPool); + threadContextFactory); sessionManager.registerSession(session); return service.openSession(entry.index(), entry.entry().timestamp(), session); } diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/service/impl/DefaultServiceContext.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/service/impl/DefaultServiceContext.java index a6c0207790..3cc6c4746d 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/service/impl/DefaultServiceContext.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/service/impl/DefaultServiceContext.java @@ -44,7 +44,7 @@ import io.atomix.time.WallClockTimestamp; import io.atomix.utils.SlidingWindowCounter; import io.atomix.utils.concurrent.ThreadContext; -import io.atomix.utils.concurrent.ThreadPoolContext; +import io.atomix.utils.concurrent.ThreadContextFactory; import io.atomix.utils.logging.ContextualLoggerFactory; import io.atomix.utils.logging.LoggerContext; import org.slf4j.Logger; @@ -52,7 +52,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ScheduledExecutorService; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkNotNull; @@ -75,7 +74,7 @@ public class DefaultServiceContext implements ServiceContext { private final DefaultServiceSessions sessions; private final ThreadContext serviceExecutor; private final ThreadContext snapshotExecutor; - private final ScheduledExecutorService threadPool; + private final ThreadContextFactory threadContextFactory; private final SlidingWindowCounter loadCounter; private final Map pendingSnapshots = new ConcurrentSkipListMap<>(); private long snapshotIndex; @@ -102,17 +101,17 @@ public DefaultServiceContext( RaftService service, RaftContext server, RaftSessionManager sessionManager, - ScheduledExecutorService threadPool) { + ThreadContextFactory threadContextFactory) { this.serviceId = checkNotNull(serviceId); this.serviceName = checkNotNull(serviceName); this.serviceType = checkNotNull(serviceType); this.service = checkNotNull(service); this.server = checkNotNull(server); this.sessions = new DefaultServiceSessions(serviceId, sessionManager); - this.serviceExecutor = new ThreadPoolContext(threadPool); - this.snapshotExecutor = new ThreadPoolContext(threadPool); + this.serviceExecutor = threadContextFactory.createContext(); + this.snapshotExecutor = threadContextFactory.createContext(); this.loadCounter = new SlidingWindowCounter(WINDOW_SIZE, serviceExecutor); - this.threadPool = checkNotNull(threadPool); + this.threadContextFactory = threadContextFactory; this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftService.class) .addValue(serviceId) .add("type", serviceType) @@ -284,7 +283,7 @@ private void maybeInstallSnapshot(long index) { sessionTimeout, this, server, - threadPool); + threadContextFactory); session.setTimestamp(sessionTimestamp); session.setRequestSequence(reader.readLong()); session.setCommandSequence(reader.readLong()); diff --git a/protocols/raft/src/main/java/io/atomix/protocols/raft/session/impl/RaftSessionContext.java b/protocols/raft/src/main/java/io/atomix/protocols/raft/session/impl/RaftSessionContext.java index e0011d71b4..8509f187b4 100644 --- a/protocols/raft/src/main/java/io/atomix/protocols/raft/session/impl/RaftSessionContext.java +++ b/protocols/raft/src/main/java/io/atomix/protocols/raft/session/impl/RaftSessionContext.java @@ -31,7 +31,7 @@ import io.atomix.protocols.raft.session.SessionId; import io.atomix.utils.TimestampPrinter; import io.atomix.utils.concurrent.ThreadContext; -import io.atomix.utils.concurrent.ThreadPoolContext; +import io.atomix.utils.concurrent.ThreadContextFactory; import io.atomix.utils.logging.ContextualLoggerFactory; import io.atomix.utils.logging.LoggerContext; import org.slf4j.Logger; @@ -44,7 +44,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ScheduledExecutorService; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; @@ -88,7 +87,7 @@ public RaftSessionContext( long timeout, DefaultServiceContext context, RaftContext server, - ScheduledExecutorService threadPool) { + ThreadContextFactory threadContextFactory) { this.sessionId = sessionId; this.member = member; this.name = name; @@ -101,7 +100,7 @@ public RaftSessionContext( this.protocol = server.getProtocol(); this.context = context; this.server = server; - this.eventExecutor = new ThreadPoolContext(threadPool); + this.eventExecutor = threadContextFactory.createContext(); this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftSession.class) .addValue(sessionId) .add("type", context.serviceType()) diff --git a/protocols/raft/src/test/java/io/atomix/protocols/raft/session/impl/RaftSessionManagerTest.java b/protocols/raft/src/test/java/io/atomix/protocols/raft/session/impl/RaftSessionManagerTest.java index 1ca9130da0..7be62bfc2c 100644 --- a/protocols/raft/src/test/java/io/atomix/protocols/raft/session/impl/RaftSessionManagerTest.java +++ b/protocols/raft/src/test/java/io/atomix/protocols/raft/session/impl/RaftSessionManagerTest.java @@ -26,6 +26,7 @@ import io.atomix.protocols.raft.session.RaftSessionListener; import io.atomix.protocols.raft.session.SessionId; import io.atomix.utils.concurrent.ThreadContext; +import io.atomix.utils.concurrent.ThreadContextFactory; import org.junit.Test; import java.util.concurrent.ArrayBlockingQueue; @@ -121,7 +122,7 @@ private RaftSessionContext createSession(long sessionId) { 5000, context, server, - mock(ScheduledExecutorService.class)); + mock(ThreadContextFactory.class)); } private class TestSessionListener implements RaftSessionListener { diff --git a/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java b/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java index 959f75b914..f148a09049 100644 --- a/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java +++ b/tests/src/main/java/io/atomix/protocols/raft/RaftPerformanceTest.java @@ -119,11 +119,11 @@ public class RaftPerformanceTest implements Runnable { private static final boolean USE_NETTY = true; - private static final int ITERATIONS = 10; + private static final int ITERATIONS = 1; private static final int TOTAL_OPERATIONS = 1000000; private static final int WRITE_RATIO = 10; - private static final int NUM_CLIENTS = 20; + private static final int NUM_CLIENTS = 5; private static final ReadConsistency READ_CONSISTENCY = ReadConsistency.LINEARIZABLE; private static final CommunicationStrategy COMMUNICATION_STRATEGY = CommunicationStrategy.ANY; @@ -464,11 +464,13 @@ private RaftServer createServer(RaftMember member) throws UnknownHostException { RaftServer.Builder builder = RaftServer.newBuilder(member.memberId()) .withType(member.getType()) .withProtocol(protocol) + .withThreadModel(ThreadModel.THREAD_PER_SERVICE) .withStorage(RaftStorage.newBuilder() - .withStorageLevel(StorageLevel.DISK) + .withStorageLevel(StorageLevel.MAPPED) .withDirectory(new File(String.format("target/perf-logs/%s", member.memberId()))) .withSerializer(storageSerializer) - .withMaxSegmentSize(1024 * 1024) + .withMaxEntriesPerSegment(32768) + .withMaxSegmentSize(1024 * 1024 * 64) .build()) .addService("test", PerformanceStateMachine::new); diff --git a/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java b/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java index 1daf3e3dc9..d2e49b62c9 100644 --- a/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java +++ b/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContext.java @@ -42,7 +42,7 @@ * @author Jordan Halterman */ public class SingleThreadContext implements ThreadContext { - private static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadContext.class); + protected static final Logger LOGGER = LoggerFactory.getLogger(SingleThreadContext.class); private final ScheduledExecutorService executor; private final Executor wrappedExecutor = new Executor() { @Override diff --git a/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContextFactory.java b/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContextFactory.java new file mode 100644 index 0000000000..bae76d2dce --- /dev/null +++ b/utils/src/main/java/io/atomix/utils/concurrent/SingleThreadContextFactory.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.utils.concurrent; + +import org.slf4j.Logger; + +import java.util.concurrent.ThreadFactory; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.atomix.utils.concurrent.Threads.namedThreads; + +/** + * Single thread context factory. + */ +public class SingleThreadContextFactory implements ThreadContextFactory { + private final ThreadFactory threadFactory; + + public SingleThreadContextFactory(String nameFormat, Logger logger) { + this(namedThreads(nameFormat, logger)); + } + + public SingleThreadContextFactory(ThreadFactory threadFactory) { + this.threadFactory = checkNotNull(threadFactory); + } + + @Override + public ThreadContext createContext() { + return new SingleThreadContext(threadFactory); + } +} diff --git a/utils/src/main/java/io/atomix/utils/concurrent/ThreadContextFactory.java b/utils/src/main/java/io/atomix/utils/concurrent/ThreadContextFactory.java new file mode 100644 index 0000000000..09b625696d --- /dev/null +++ b/utils/src/main/java/io/atomix/utils/concurrent/ThreadContextFactory.java @@ -0,0 +1,36 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.utils.concurrent; + +/** + * Thread context factory. + */ +public interface ThreadContextFactory { + + /** + * Creates a new thread context. + * + * @return a new thread context + */ + ThreadContext createContext(); + + /** + * Closes the factory. + */ + default void close() { + } + +} diff --git a/utils/src/main/java/io/atomix/utils/concurrent/ThreadPoolContextFactory.java b/utils/src/main/java/io/atomix/utils/concurrent/ThreadPoolContextFactory.java new file mode 100644 index 0000000000..4ed128f58e --- /dev/null +++ b/utils/src/main/java/io/atomix/utils/concurrent/ThreadPoolContextFactory.java @@ -0,0 +1,53 @@ +/* + * Copyright 2017-present Open Networking Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.atomix.utils.concurrent; + +import org.slf4j.Logger; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; + +import static io.atomix.utils.concurrent.Threads.namedThreads; + +/** + * Thread pool context factory. + */ +public class ThreadPoolContextFactory implements ThreadContextFactory { + private final ScheduledExecutorService executor; + + public ThreadPoolContextFactory(String name, int threadPoolSize, Logger logger) { + this(threadPoolSize, namedThreads(name, logger)); + } + + public ThreadPoolContextFactory(int threadPoolSize, ThreadFactory threadFactory) { + this(Executors.newScheduledThreadPool(threadPoolSize, threadFactory)); + } + + public ThreadPoolContextFactory(ScheduledExecutorService executor) { + this.executor = executor; + } + + @Override + public ThreadContext createContext() { + return new ThreadPoolContext(executor); + } + + @Override + public void close() { + executor.shutdownNow(); + } +}