Skip to content

Commit

Permalink
Implement hard disk usage limit after which writes to the Raft cluste…
Browse files Browse the repository at this point in the history
…r are blocked.
  • Loading branch information
kuujo committed Oct 27, 2017
1 parent 2154af4 commit 5a4ce28
Show file tree
Hide file tree
Showing 14 changed files with 848 additions and 525 deletions.
Expand Up @@ -19,7 +19,7 @@
import io.atomix.protocols.raft.cluster.RaftCluster;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.impl.DefaultRaftServer;
import io.atomix.protocols.raft.impl.RaftServiceRegistry;
import io.atomix.protocols.raft.impl.RaftServiceFactoryRegistry;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.storage.RaftStorage;
Expand Down Expand Up @@ -553,7 +553,7 @@ abstract class Builder implements io.atomix.utils.Builder<RaftServer> {
protected Duration electionTimeout = DEFAULT_ELECTION_TIMEOUT;
protected Duration heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL;
protected Duration sessionTimeout = DEFAULT_SESSION_TIMEOUT;
protected final RaftServiceRegistry serviceRegistry = new RaftServiceRegistry();
protected final RaftServiceFactoryRegistry serviceRegistry = new RaftServiceFactoryRegistry();
protected ThreadModel threadModel = DEFAULT_THREAD_MODEL;
protected int threadPoolSize = DEFAULT_THREAD_POOL_SIZE;

Expand Down
Expand Up @@ -34,11 +34,13 @@
import io.atomix.protocols.raft.roles.RaftRole;
import io.atomix.protocols.raft.roles.ReserveRole;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.protocols.raft.storage.compactor.RaftLogCompactor;
import io.atomix.protocols.raft.storage.log.RaftLog;
import io.atomix.protocols.raft.storage.log.RaftLogReader;
import io.atomix.protocols.raft.storage.log.RaftLogWriter;
import io.atomix.protocols.raft.storage.snapshot.SnapshotStore;
import io.atomix.protocols.raft.storage.system.MetaStore;
import io.atomix.protocols.raft.utils.LoadMonitor;
import io.atomix.utils.concurrent.SingleThreadContext;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
Expand Down Expand Up @@ -67,25 +69,34 @@
* is stored in the cluster state. This includes Raft-specific state like the current leader and term, the log, and the cluster configuration.
*/
public class RaftContext implements AutoCloseable {
private static final int LOAD_WINDOW_SIZE = 5;
private static final int LOAD_WINDOW = 2;
private static final int HIGH_LOAD_THRESHOLD = 10;

private final Logger log;
private final Set<Consumer<RaftServer.Role>> roleChangeListeners = new CopyOnWriteArraySet<>();
private final Set<Consumer<State>> stateChangeListeners = new CopyOnWriteArraySet<>();
private final Set<Consumer<RaftMember>> electionListeners = new CopyOnWriteArraySet<>();
protected final String name;
protected final ThreadContext threadContext;
protected final RaftServiceRegistry registry;
protected final RaftServiceFactoryRegistry serviceFactories;
protected final RaftClusterContext cluster;
protected final RaftServerProtocol protocol;
protected final RaftStorage storage;
protected final RaftServiceRegistry services = new RaftServiceRegistry();
private final LoadMonitor loadMonitor;
private volatile State state = State.ACTIVE;
private MetaStore meta;
private RaftLog raftLog;
private RaftLogWriter logWriter;
private RaftLogReader logReader;
private RaftLogCompactor logCompactor;
private SnapshotStore snapshotStore;
private RaftServiceManager stateMachine;
private final ThreadContextFactory threadContextFactory;
private final ThreadContext stateContext;
private final ThreadContext loadContext;
private final ThreadContext compactionContext;
protected RaftRole role = new InactiveRole(this);
private Duration electionTimeout = Duration.ofMillis(500);
private Duration sessionTimeout = Duration.ofMillis(5000);
Expand All @@ -104,23 +115,27 @@ public RaftContext(
MemberId localMemberId,
RaftServerProtocol protocol,
RaftStorage storage,
RaftServiceRegistry registry,
RaftServiceFactoryRegistry serviceFactories,
ThreadModel threadModel,
int threadPoolSize) {
this.name = checkNotNull(name, "name cannot be null");
this.protocol = checkNotNull(protocol, "protocol cannot be null");
this.storage = checkNotNull(storage, "storage cannot be null");
this.registry = checkNotNull(registry, "registry cannot be null");
this.serviceFactories = checkNotNull(serviceFactories, "registry cannot be null");
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class)
.addValue(name)
.build());

String baseThreadName = String.format("raft-server-%s", name);
this.threadContext = new SingleThreadContext(namedThreads(baseThreadName, log));
this.stateContext = new SingleThreadContext(namedThreads(baseThreadName + "-state", log));
this.loadContext = new SingleThreadContext(namedThreads(baseThreadName + "-load", log));
this.compactionContext = new SingleThreadContext(namedThreads(baseThreadName + "-compaction", log));

this.threadContextFactory = threadModel.factory(baseThreadName + "-%d", threadPoolSize, log);

this.loadMonitor = new LoadMonitor(LOAD_WINDOW_SIZE, HIGH_LOAD_THRESHOLD, loadContext);

// Open the metadata store.
this.meta = storage.openMetaStore();

Expand Down Expand Up @@ -501,6 +516,15 @@ public long getLastApplied() {
return lastApplied;
}

/**
* Returns the server load monitor.
*
* @return the server load monitor
*/
public LoadMonitor getLoadMonitor() {
return loadMonitor;
}

/**
* Returns the server state machine.
*
Expand All @@ -510,13 +534,22 @@ public RaftServiceManager getStateMachine() {
return stateMachine;
}

/**
* Returns the server service registry.
*
* @return the server service registry
*/
public RaftServiceRegistry getServices() {
return services;
}

/**
* Returns the server state machine registry.
*
* @return The server state machine registry.
*/
public RaftServiceRegistry getServiceRegistry() {
return registry;
public RaftServiceFactoryRegistry getServiceFactories() {
return serviceFactories;
}

/**
Expand Down Expand Up @@ -573,6 +606,15 @@ public RaftLogReader getLogReader() {
return logReader;
}

/**
* Returns the Raft log compactor.
*
* @return the Raft log compactor
*/
public RaftLogCompactor getLogCompactor() {
return logCompactor;
}

/**
* Resets the log and state machine.
*/
Expand All @@ -593,12 +635,13 @@ public void reset() {
raftLog = storage.openLog();
logWriter = raftLog.writer();
logReader = raftLog.openReader(1, RaftLogReader.Mode.ALL);
logCompactor = new RaftLogCompactor(this, compactionContext);

// Open the snapshot store.
snapshotStore = storage.openSnapshotStore();

// Create a new internal server state machine.
this.stateMachine = new RaftServiceManager(this, threadContextFactory, stateContext);
this.stateMachine = new RaftServiceManager(this, threadContextFactory);
}

/**
Expand Down Expand Up @@ -827,6 +870,8 @@ public void close() {
stateMachine.close();
threadContext.close();
stateContext.close();
loadContext.close();
compactionContext.close();
threadContextFactory.close();
}

Expand Down
@@ -0,0 +1,82 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.protocols.raft.impl;

import io.atomix.protocols.raft.service.RaftService;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* State machine registry.
*/
public class RaftServiceFactoryRegistry {
private final Map<String, Supplier<RaftService>> stateMachines = new ConcurrentHashMap<>();

/**
* Returns the number of registered state machines.
*
* @return The number of registered state machines.
*/
public int size() {
return stateMachines.size();
}

/**
* Registers a new state machine type.
*
* @param type The state machine type to register.
* @param factory The state machine factory.
* @return The state machine registry.
*/
public RaftServiceFactoryRegistry register(String type, Supplier<RaftService> factory) {
stateMachines.put(checkNotNull(type, "type cannot be null"), checkNotNull(factory, "factory cannot be null"));
return this;
}

/**
* Unregisters the given state machine type.
*
* @param type The state machine type to unregister.
* @return The state machine registry.
*/
public RaftServiceFactoryRegistry unregister(String type) {
stateMachines.remove(type);
return this;
}

/**
* Returns the factory for the given state machine type.
*
* @param type The state machine type for which to return the factory.
* @return The factory for the given state machine type or {@code null} if the type is not registered.
*/
public Supplier<RaftService> getFactory(String type) {
return stateMachines.get(type);
}

@Override
public String toString() {
return toStringHelper(this)
.add("stateMachines", stateMachines)
.toString();
}

}

0 comments on commit 5a4ce28

Please sign in to comment.