Skip to content

Commit

Permalink
Add support for configuring the Raft server thread model.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Sep 20, 2017
1 parent de29f41 commit 1b0b81c
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 41 deletions.
Expand Up @@ -534,6 +534,7 @@ abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
private static final Duration DEFAULT_ELECTION_TIMEOUT = Duration.ofMillis(750);
private static final Duration DEFAULT_HEARTBEAT_INTERVAL = Duration.ofMillis(250);
private static final Duration DEFAULT_SESSION_TIMEOUT = Duration.ofMillis(5000);
private static final ThreadModel DEFAULT_THREAD_MODEL = ThreadModel.SHARED_THREAD_POOL;
private static final int DEFAULT_THREAD_POOL_SIZE = Runtime.getRuntime().availableProcessors();

protected String name;
Expand All @@ -545,6 +546,7 @@ abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
protected Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
protected Duration sessionTimeout = DEFAULT_SESSION_TIMEOUT;
protected final RaftServiceRegistry serviceRegistry = new RaftServiceRegistry();
protected ThreadModel threadModel = DEFAULT_THREAD_MODEL;
protected int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;

protected Builder(MemberId localMemberId) {
Expand Down Expand Up @@ -586,6 +588,17 @@ public Builder withProtocol(RaftServerProtocol protocol) {
return this;
}

/**
* Sets the server thread model.
*
* @param threadModel the server thread model
* @return the server builder
*/
public Builder withThreadModel(ThreadModel threadModel) {
this.threadModel = checkNotNull(threadModel, "threadModel cannot be null");
return this;
}

/**
* Sets the storage module.
*
Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft;

import io.atomix.utils.concurrent.SingleThreadContextFactory;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.concurrent.ThreadPoolContextFactory;
import org.slf4j.Logger;

/**
* Raft thread model.
*/
public enum ThreadModel {

/**
* A thread model that creates a thread pool to be shared by all services.
*/
SHARED_THREAD_POOL {
@Override
public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) {
return new SingleThreadContextFactory(nameFormat, logger);
}
},

/**
* A thread model that creates a thread for each Raft service.
*/
THREAD_PER_SERVICE {
@Override
public ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger) {
return new ThreadPoolContextFactory(nameFormat, threadPoolSize, logger);
}
};

/**
* Returns a thread context factory.
*
* @param nameFormat the thread name format
* @param threadPoolSize the thread pool size
* @param logger the thread logger
* @return the thread context factory
*/
public abstract ThreadContextFactory factory(String nameFormat, int threadPoolSize, Logger logger);
}
Expand Up @@ -253,7 +253,7 @@ public RaftServer build() {
storage = RaftStorage.newBuilder().build();
}

RaftContext raft = new RaftContext(name, type, localMemberId, protocol, storage, serviceRegistry, threadPoolSize);
RaftContext raft = new RaftContext(name, type, localMemberId, protocol, storage, serviceRegistry, threadModel, threadPoolSize);
raft.setElectionTimeout(electionTimeout);
raft.setHeartbeatInterval(heartbeatInterval);
raft.setSessionTimeout(sessionTimeout);
Expand Down
Expand Up @@ -17,6 +17,7 @@

import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.ThreadModel;
import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
Expand All @@ -38,9 +39,9 @@
import io.atomix.protocols.raft.storage.log.RaftLogWriter;
import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
import io.atomix.protocols.raft.storage.system.MetaStore;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import org.slf4j.Logger;
Expand All @@ -51,9 +52,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

Expand Down Expand Up @@ -86,8 +84,8 @@ public class RaftContext implements AutoCloseable {
private RaftLogReader logReader;
private SnapshotStore snapshotStore;
private RaftServiceManager stateMachine;
protected final ScheduledExecutorService threadPool;
protected final ThreadContext stateContext;
private final ThreadContextFactory threadContextFactory;
private final ThreadContext stateContext;
protected RaftRole role = new InactiveRole(this);
private Duration electionTimeout = Duration.ofMillis(500);
private Duration sessionTimeout = Duration.ofMillis(5000);
Expand All @@ -100,7 +98,15 @@ public class RaftContext implements AutoCloseable {
private volatile long lastApplied;

@SuppressWarnings("unchecked")
public RaftContext(String name, RaftMember.Type type, MemberId localMemberId, RaftServerProtocol protocol, RaftStorage storage, RaftServiceRegistry registry, int threadPoolSize) {
public RaftContext(
String name,
RaftMember.Type type,
MemberId localMemberId,
RaftServerProtocol protocol,
RaftStorage storage,
RaftServiceRegistry registry,
ThreadModel threadModel,
int threadPoolSize) {
this.name = checkNotNull(name, "name cannot be null");
this.protocol = checkNotNull(protocol, "protocol cannot be null");
this.storage = checkNotNull(storage, "storage cannot be null");
Expand All @@ -112,7 +118,8 @@ public RaftContext(String name, RaftMember.Type type, MemberId localMemberId, Ra
String baseThreadName = String.format("raft-server-%s", name);
this.threadContext = new SingleThreadContext(namedThreads(baseThreadName, log));
this.stateContext = new SingleThreadContext(namedThreads(baseThreadName + "-state", log));
this.threadPool = Executors.newScheduledThreadPool(threadPoolSize, namedThreads(baseThreadName + "-%d", log));

this.threadContextFactory = threadModel.factory(baseThreadName + "-%d", threadPoolSize, log);

// Open the metadata store.
this.meta = storage.openMetaStore();
Expand Down Expand Up @@ -574,7 +581,7 @@ public void reset() {
snapshotStore = storage.openSnapshotStore();

// Create a new internal server state machine.
this.stateMachine = new RaftServiceManager(this, threadPool, stateContext);
this.stateMachine = new RaftServiceManager(this, threadContextFactory, stateContext);
}

/**
Expand Down Expand Up @@ -803,12 +810,7 @@ public void close() {
stateMachine.close();
threadContext.close();
stateContext.close();
threadPool.shutdownNow();

try {
threadPool.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
threadContextFactory.close();
}

/**
Expand Down
Expand Up @@ -46,6 +46,7 @@
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import org.slf4j.Logger;
Expand All @@ -60,7 +61,6 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -81,7 +81,7 @@ public class RaftServiceManager implements AutoCloseable {

private final Logger logger;
private final RaftContext raft;
private final ScheduledExecutorService threadPool;
private final ThreadContextFactory threadContextFactory;
private final ThreadContext threadContext;
private final RaftLog log;
private final RaftLogReader reader;
Expand All @@ -92,11 +92,11 @@ public class RaftServiceManager implements AutoCloseable {
private long lastPrepared;
private long lastCompacted;

public RaftServiceManager(RaftContext raft, ScheduledExecutorService threadPool, ThreadContext threadContext) {
public RaftServiceManager(RaftContext raft, ThreadContextFactory threadContextFactory, ThreadContext threadContext) {
this.raft = checkNotNull(raft, "state cannot be null");
this.log = raft.getLog();
this.reader = log.openReader(1, RaftLogReader.Mode.COMMITS);
this.threadPool = threadPool;
this.threadContextFactory = threadContextFactory;
this.threadContext = threadContext;
this.loadCounter = new SlidingWindowCounter(WINDOW_SIZE, threadContext);
this.logger = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class)
Expand Down Expand Up @@ -263,7 +263,7 @@ private void prepareIndex(long index) {
sessionTimeout,
service,
raft,
threadPool);
threadContextFactory);
session.setTimestamp(sessionTimestamp);
session.setRequestSequence(reader.readLong());
session.setCommandSequence(reader.readLong());
Expand Down Expand Up @@ -389,7 +389,7 @@ private DefaultServiceContext getOrInitializeService(ServiceId serviceId, Servic
serviceFactory.get(),
raft,
sessionManager,
threadPool);
threadContextFactory);
services.put(serviceName, service);
}
return service;
Expand Down Expand Up @@ -418,7 +418,7 @@ private CompletableFuture<Long> applyOpenSession(Indexed<OpenSessionEntry> entry
entry.entry().timeout(),
service,
raft,
threadPool);
threadContextFactory);
sessionManager.registerSession(session);
return service.openSession(entry.index(), entry.entry().timestamp(), session);
}
Expand Down
Expand Up @@ -44,15 +44,14 @@
import io.atomix.time.WallClockTimestamp;
import io.atomix.utils.SlidingWindowCounter;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadPoolContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import org.slf4j.Logger;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ScheduledExecutorService;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -75,7 +74,7 @@ public class DefaultServiceContext implements ServiceContext {
private final DefaultServiceSessions sessions;
private final ThreadContext serviceExecutor;
private final ThreadContext snapshotExecutor;
private final ScheduledExecutorService threadPool;
private final ThreadContextFactory threadContextFactory;
private final SlidingWindowCounter loadCounter;
private final Map<Long, PendingSnapshot> pendingSnapshots = new ConcurrentSkipListMap<>();
private long snapshotIndex;
Expand All @@ -102,17 +101,17 @@ public DefaultServiceContext(
RaftService service,
RaftContext server,
RaftSessionManager sessionManager,
ScheduledExecutorService threadPool) {
ThreadContextFactory threadContextFactory) {
this.serviceId = checkNotNull(serviceId);
this.serviceName = checkNotNull(serviceName);
this.serviceType = checkNotNull(serviceType);
this.service = checkNotNull(service);
this.server = checkNotNull(server);
this.sessions = new DefaultServiceSessions(serviceId, sessionManager);
this.serviceExecutor = new ThreadPoolContext(threadPool);
this.snapshotExecutor = new ThreadPoolContext(threadPool);
this.serviceExecutor = threadContextFactory.createContext();
this.snapshotExecutor = threadContextFactory.createContext();
this.loadCounter = new SlidingWindowCounter(WINDOW_SIZE, serviceExecutor);
this.threadPool = checkNotNull(threadPool);
this.threadContextFactory = threadContextFactory;
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftService.class)
.addValue(serviceId)
.add("type", serviceType)
Expand Down Expand Up @@ -284,7 +283,7 @@ private void maybeInstallSnapshot(long index) {
sessionTimeout,
this,
server,
threadPool);
threadContextFactory);
session.setTimestamp(sessionTimestamp);
session.setRequestSequence(reader.readLong());
session.setCommandSequence(reader.readLong());
Expand Down
Expand Up @@ -31,7 +31,7 @@
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.utils.TimestampPrinter;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadPoolContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import org.slf4j.Logger;
Expand All @@ -44,7 +44,6 @@
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -88,7 +87,7 @@ public RaftSessionContext(
long timeout,
DefaultServiceContext context,
RaftContext server,
ScheduledExecutorService threadPool) {
ThreadContextFactory threadContextFactory) {
this.sessionId = sessionId;
this.member = member;
this.name = name;
Expand All @@ -101,7 +100,7 @@ public RaftSessionContext(
this.protocol = server.getProtocol();
this.context = context;
this.server = server;
this.eventExecutor = new ThreadPoolContext(threadPool);
this.eventExecutor = threadContextFactory.createContext();
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftSession.class)
.addValue(sessionId)
.add("type", context.serviceType())
Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.atomix.protocols.raft.session.RaftSessionListener;
import io.atomix.protocols.raft.session.SessionId;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import org.junit.Test;

import java.util.concurrent.ArrayBlockingQueue;
Expand Down Expand Up @@ -121,7 +122,7 @@ private RaftSessionContext createSession(long sessionId) {
5000,
context,
server,
mock(ScheduledExecutorService.class));
mock(ThreadContextFactory.class));
}

private class TestSessionListener implements RaftSessionListener {
Expand Down
Expand Up @@ -119,11 +119,11 @@ public class RaftPerformanceTest implements Runnable {

private static final boolean USE_NETTY = true;

private static final int ITERATIONS = 10;
private static final int ITERATIONS = 1;

private static final int TOTAL_OPERATIONS = 1000000;
private static final int WRITE_RATIO = 10;
private static final int NUM_CLIENTS = 20;
private static final int NUM_CLIENTS = 5;

private static final ReadConsistency READ_CONSISTENCY = ReadConsistency.LINEARIZABLE;
private static final CommunicationStrategy COMMUNICATION_STRATEGY = CommunicationStrategy.ANY;
Expand Down Expand Up @@ -464,11 +464,13 @@ private RaftServer createServer(RaftMember member) throws UnknownHostException {
RaftServer.Builder builder = RaftServer.newBuilder(member.memberId())
.withType(member.getType())
.withProtocol(protocol)
.withThreadModel(ThreadModel.THREAD_PER_SERVICE)
.withStorage(RaftStorage.newBuilder()
.withStorageLevel(StorageLevel.DISK)
.withStorageLevel(StorageLevel.MAPPED)
.withDirectory(new File(String.format("target/perf-logs/%s", member.memberId())))
.withSerializer(storageSerializer)
.withMaxSegmentSize(1024 * 1024)
.withMaxEntriesPerSegment(32768)
.withMaxSegmentSize(1024 * 1024 * 64)
.build())
.addService("test", PerformanceStateMachine::new);

Expand Down

0 comments on commit 1b0b81c

Please sign in to comment.