Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
Expand Down Expand Up @@ -52,7 +53,7 @@
* This class tracks the log entries that have been committed in a quorum and
* applies them to the state machine. We let a separate thread do this work
* asynchronously so that this will not block normal raft protocol.
*
* <p>
* If the auto log compaction is enabled, the state machine updater thread will
* trigger a snapshot of the state machine by calling
* {@link StateMachine#takeSnapshot} when the log size exceeds a limit.
Expand Down Expand Up @@ -84,7 +85,8 @@ enum State {
private volatile State state = State.RUNNING;

private final SnapshotRetentionPolicy snapshotRetentionPolicy;
private StateMachineMetrics stateMachineMetrics = null;

private final MemoizedSupplier<StateMachineMetrics> stateMachineMetrics;

StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
Expand Down Expand Up @@ -112,27 +114,23 @@ public int getNumSnapshotsRetained() {

updater = new Daemon(this);
this.awaitForSignal = new AwaitForSignal(name);
this.stateMachineMetrics = MemoizedSupplier.valueOf(
() -> StateMachineMetrics.getStateMachineMetrics(server, appliedIndex, stateMachine));
}

void start() {
//wait for RaftServerImpl and ServerState constructors to complete
initializeMetrics();
stateMachineMetrics.get();
updater.start();
}

private void initializeMetrics() {
if (stateMachineMetrics == null) {
stateMachineMetrics =
StateMachineMetrics.getStateMachineMetrics(
server, appliedIndex, stateMachine);
}
}

private void stop() {
state = State.STOP;
try {
stateMachine.close();
stateMachineMetrics.unregister();
if (stateMachineMetrics.isInitialized()) {
stateMachineMetrics.get().unregister();
}
} catch(Throwable t) {
LOG.warn(name + ": Failed to close " + JavaUtils.getClassSimpleName(stateMachine.getClass())
+ " " + stateMachine, t);
Expand Down Expand Up @@ -266,7 +264,7 @@ private void checkAndTakeSnapshot(MemoizedSupplier<List<CompletableFuture<Messag
private void takeSnapshot() {
final long i;
try {
Timer.Context takeSnapshotTimerContext = stateMachineMetrics.getTakeSnapshotTimer().time();
Timer.Context takeSnapshotTimerContext = stateMachineMetrics.get().getTakeSnapshotTimer().time();
i = stateMachine.takeSnapshot();
takeSnapshotTimerContext.stop();
server.getSnapshotRequestHandler().completeTakingSnapshot(i);
Expand Down Expand Up @@ -323,6 +321,8 @@ private long getLastAppliedIndex() {
}

long getStateMachineLastAppliedIndex() {
return stateMachine.getLastAppliedTermIndex().getIndex();
return Optional.ofNullable(stateMachine.getLastAppliedTermIndex())
.map(TermIndex::getIndex)
.orElse(RaftLog.INVALID_LOG_INDEX);
}
}