Skip to content

Commit

Permalink
Fix a linearizable query commit index bug
Browse files Browse the repository at this point in the history
This commit fixes a rare bug that can occur during membership changes.
The fix is validated in the new test case added to LinearizableQueryTest.java.
When a membership change is appended to the leader but not yet committed,
if the current commit index is X and the last log index is Y where Y > X,
when a linearizable query is executed, Y is passed to
StateMachine.runOperation() instead of X. This does not violate any consistency
property since the query is executed on the latest commit state of
the StateMachine, however the incorrect commit index passed to StateMachine
might be used for secondary purposes, such as logging, etc.

Secondarily, this commit fixes another small bug where a leader Raft node does
not gracefully shutdown its components automatically which required
a RaftNode#terminate() call after the membership change is committed.
  • Loading branch information
metanet committed Apr 4, 2023
1 parent bf41594 commit 8cef993
Show file tree
Hide file tree
Showing 13 changed files with 275 additions and 42 deletions.
46 changes: 27 additions & 19 deletions microraft/src/main/java/io/microraft/impl/RaftNodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import io.microraft.impl.state.RaftState;
import io.microraft.impl.state.RaftTermState;
import io.microraft.impl.state.QueryState.QueryContainer;
import io.microraft.impl.statemachine.InternalCommitAware;
import io.microraft.impl.task.HeartbeatTask;
import io.microraft.impl.task.LeaderBackoffResetTask;
import io.microraft.impl.task.LeaderElectionTimeoutTask;
Expand Down Expand Up @@ -619,9 +620,7 @@ public CompletableFuture<Ordered<Object>> terminate() {
toFollower(state.term());
}
setStatus(TERMINATED);
state.invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
state.invalidateScheduledQueries();
state.completeLeadershipTransfer(newNotLeaderException());
} catch (Throwable t) {
failure = t;
LOGGER.error("Failure during termination of " + localEndpointStr, t);
Expand Down Expand Up @@ -950,6 +949,10 @@ private void applyLogEntry(LogEntry entry) {
}

response = state.committedGroupMembers();

if (stateMachine instanceof InternalCommitAware) {
((InternalCommitAware) stateMachine).onInternalCommit(logIndex);
}
} else {
response = new IllegalArgumentException("Invalid Raft group operation: " + operation);
}
Expand Down Expand Up @@ -1045,6 +1048,9 @@ private void takeSnapshot(RaftLog log, long snapshotIndex) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(localEndpointStr + " " + snapshotEntry + " is taken. " + truncatedEntryCount + " entries are "
+ "truncated.");
} else {
LOGGER.info("{} took snapshot at term: {} and log index: {} and truncated {} entries.", localEndpointStr,
snapshotEntry.getTerm(), snapshotEntry.getIndex(), truncatedEntryCount);
}

publishRaftNodeReport(RaftNodeReportReason.TAKE_SNAPSHOT);
Expand Down Expand Up @@ -1135,7 +1141,7 @@ public void installSnapshot(SnapshotEntry snapshotEntry) {
LOGGER.info("{} snapshot is installed at commit index: {}", localEndpointStr, snapshotEntry.getIndex());

state.invalidateFuturesUntil(snapshotEntry.getIndex(), new IndeterminateStateException(state.leader()));
runScheduledQueries();
tryRunScheduledQueries();

// log.setSnapshot() truncates stale log entries from disk.
// we are submitting an async flush task here to flush those
Expand Down Expand Up @@ -1608,16 +1614,19 @@ private void commitEntries(long commitIndex) {
}

state.commitIndex(commitIndex);

if (status == ACTIVE) {
applyLogEntries();
broadcastAppendEntriesRequest();
applyLogEntries();
// the leader might have left the Raft group, but still we can send
// an append request at this point
broadcastAppendEntriesRequest();
if (status != TERMINATED) {
// the leader is still part of the Raft group
tryRunQueries();
tryRunScheduledQueries();
} else {
tryRunQueries();
applyLogEntries();
broadcastAppendEntriesRequest();
runScheduledQueries();
// the leader has left the Raft group
state.invalidateScheduledQueries();
toFollower(state.term());
terminateComponents();
}
}

Expand Down Expand Up @@ -1648,7 +1657,12 @@ public String localEndpointStr() {
}

public void tryRunQueries() {
QueryState queryState = state.leaderState().queryState();
LeaderState leaderState = state.leaderState();
if (leaderState == null) {
return;
}

QueryState queryState = leaderState.queryState();
long commitIndex = state.commitIndex();
if (!queryState.isQuorumAckReceived(commitIndex, state.logReplicationQuorumSize())) {
return;
Expand All @@ -1668,12 +1682,7 @@ public void tryRunQueries() {
queryState.reset();
}

public void runScheduledQueries() {
if (status == TERMINATED) {
state.invalidateScheduledQueries();
return;
}

public void tryRunScheduledQueries() {
long lastApplied = state.lastApplied();
Collection<QueryContainer> queries = state.collectScheduledQueriesToExecute();
for (QueryContainer query : queries) {
Expand Down Expand Up @@ -1826,7 +1835,6 @@ public boolean demoteToFollowerIfQuorumHeartbeatTimeoutElapsed() {
"{} Demoting to {} since not received append entries responses from majority recently. Latest quorum timestamp: {}",
localEndpointStr, FOLLOWER, quorumTimestamp.get());
toFollower(state.term());
state.invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
}

return demoteToFollower;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ protected void handle(@Nonnull AppendEntriesRequest request) {
}

if (commitIndexAdvanced) {
node.runScheduledQueries();
node.tryRunScheduledQueries();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,14 @@ private boolean updateFollowerIndices(AppendEntriesSuccessResponse response) {
}

private void checkIfQueryAckNeeded(AppendEntriesSuccessResponse response) {
QueryState queryState = state.leaderState().queryState();
LeaderState leaderState = state.leaderState();
if (leaderState == null) {
// this can happen if this node was removed from the group when
// the commit index was advanced.
return;
}

QueryState queryState = leaderState.queryState();
if (queryState.isAckNeeded(response.getSender(), state.logReplicationQuorumSize())) {
node.sendAppendEntriesRequest(response.getSender());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -43,6 +42,7 @@

import io.microraft.RaftEndpoint;
import io.microraft.RaftRole;
import io.microraft.exception.IndeterminateStateException;
import io.microraft.exception.LaggingCommitIndexException;
import io.microraft.exception.NotLeaderException;
import io.microraft.exception.RaftException;
Expand Down Expand Up @@ -447,6 +447,7 @@ public void toFollower(int term) {
// this is done here to read the updated leader field
currentLeaderState.queryState().fail(new NotLeaderException(localEndpoint, leader()));
}
invalidateFuturesFrom(commitIndex + 1, new IndeterminateStateException());
}

private void persistTerm(RaftTermState termStateToPersist) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2023, MicroRaft.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.microraft.impl.statemachine;

// mainly used for testing
public interface InternalCommitAware {

default void onInternalCommit(long commitIndex) {
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023, MicroRaft.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.microraft.model.persistence;

import javax.annotation.Nonnull;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023, MicroRaft.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.microraft.model.persistence;

import javax.annotation.Nonnegative;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright (c) 2023, MicroRaft.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.microraft.persistence;

import javax.annotation.Nonnull;
Expand Down

0 comments on commit 8cef993

Please sign in to comment.