Skip to content

Commit

Permalink
Fix linearizable query quorum calculation when learners are present
Browse files Browse the repository at this point in the history
Learners should be excluded from the LINEARIZABLE query quorum, similar to
how they are excluded from the replication quorum.
  • Loading branch information
metanet committed Apr 7, 2023
1 parent d3696fe commit c768342
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 11 deletions.
12 changes: 9 additions & 3 deletions microraft/src/main/java/io/microraft/impl/RaftNodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ public void sendSnapshotChunk(RaftEndpoint follower, long snapshotIndex, int req
.setSnapshotTerm(snapshotEntry.getTerm()).setSnapshotIndex(snapshotEntry.getIndex())
.setTotalSnapshotChunkCount(snapshotEntry.getSnapshotChunkCount()).setSnapshotChunk(snapshotChunk)
.setSnapshottedMembers(snapshottedMembers).setGroupMembersView(snapshotEntry.getGroupMembersView())
.setQuerySequenceNumber(leaderState != null ? leaderState.querySequenceNumber() : 0)
.setQuerySequenceNumber(
(leaderState != null) ? leaderState.querySequenceNumber(state.isVotingMember(follower)) : 0)
.setFlowControlSequenceNumber(followerState != null ? enableBackoff(followerState) : 0).build();

send(follower, request);
Expand Down Expand Up @@ -1290,6 +1291,9 @@ public void sendAppendEntriesRequest(RaftEndpoint target) {
}

long nextIndex = followerState.nextIndex();
// we never send query sequencer number to learners
// since they are excluded from the replication quorum.
long querySequenceNumber = leaderState.querySequenceNumber(state.isVotingMember(target));

// if the first log entry to be sent is put into the snapshot, check if we still
// keep it in the log
Expand All @@ -1306,7 +1310,7 @@ public void sendAppendEntriesRequest(RaftEndpoint target) {
.setSnapshotTerm(snapshotEntry.getTerm()).setSnapshotIndex(snapshotEntry.getIndex())
.setTotalSnapshotChunkCount(snapshotEntry.getSnapshotChunkCount()).setSnapshotChunk(null)
.setSnapshottedMembers(snapshottedMembers).setGroupMembersView(snapshotEntry.getGroupMembersView())
.setQuerySequenceNumber(leaderState.querySequenceNumber())
.setQuerySequenceNumber(querySequenceNumber)
.setFlowControlSequenceNumber(enableBackoff(followerState)).build();

if (LOGGER.isDebugEnabled()) {
Expand All @@ -1322,7 +1326,7 @@ public void sendAppendEntriesRequest(RaftEndpoint target) {

AppendEntriesRequestBuilder requestBuilder = modelFactory.createAppendEntriesRequestBuilder()
.setGroupId(getGroupId()).setSender(getLocalEndpoint()).setTerm(state.term())
.setCommitIndex(state.commitIndex()).setQuerySequenceNumber(leaderState.querySequenceNumber());
.setCommitIndex(state.commitIndex()).setQuerySequenceNumber(querySequenceNumber);
List<LogEntry> entries;
boolean backoff = true;
long lastLogIndex = log.lastLogOrSnapshotIndex();
Expand Down Expand Up @@ -1632,6 +1636,8 @@ public void tryAckQuery(long querySequenceNumber, RaftEndpoint sender) {
LeaderState leaderState = state.leaderState();
if (leaderState == null) {
return;
} else if (!state.isVotingMember(sender)) {
return;
}

QueryState queryState = leaderState.queryState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,8 @@ private boolean updateFollowerIndices(AppendEntriesSuccessResponse response) {
return false;
}

QueryState queryState = leaderState.queryState();

if (queryState.tryAck(response.getQuerySequenceNumber(), follower)) {
if (state.isVotingMember(follower)
&& leaderState.queryState().tryAck(response.getQuerySequenceNumber(), follower)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(localEndpointStr() + " ack from " + follower.getId() + " for query sequence number: "
+ response.getQuerySequenceNumber());
Expand Down Expand Up @@ -131,6 +130,9 @@ private void checkIfQueryAckNeeded(AppendEntriesSuccessResponse response) {
// this can happen if this node was removed from the group when
// the commit index was advanced.
return;
} else if (!state.isVotingMember(response.getSender())) {
// learners are not part of the replication quorum.
return;
}

QueryState queryState = leaderState.queryState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,11 @@ public QueryState queryState() {

/**
* Returns the query sequence number to be acked by the log replication quorum
* to execute the currently waiting queries.
* to execute the currently waiting queries. Query sequencer numbers are not
* sent to learners since they are excluded from the replication quorum.
*/
public long querySequenceNumber() {
return queryState.querySequenceNumber();
public long querySequenceNumber(boolean forVotingMember) {
return forVotingMember ? queryState.querySequenceNumber() : 0;
}

public boolean isRequestBackoffResetTaskScheduled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ public boolean isKnownMember(RaftEndpoint endpoint) {
return members.contains(endpoint);
}

/**
* Returns true if the given endpoint is a voting member of the Raft group, false
* otherwise.
*/
public boolean isVotingMember(RaftEndpoint endpoint) {
return votingMembers.contains(endpoint);
}

public RaftGroupMembersView populate(RaftGroupMembersViewBuilder builder) {
return builder.setLogIndex(index).setMembers(members).setVotingMembers(votingMembers).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,14 @@ public boolean isKnownMember(RaftEndpoint endpoint) {
return effectiveGroupMembers.isKnownMember(endpoint);
}

/**
* Returns true if the given endpoint is a voting member in the effective group members, false
* otherwise.
*/
public boolean isVotingMember(RaftEndpoint endpoint) {
return effectiveGroupMembers.isVotingMember(endpoint);
}

/**
* Initializes the pre-candidate state for pre-voting and grants a vote for the
* local endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.microraft.impl;

import static io.microraft.MembershipChangeMode.ADD_LEARNER;
import static io.microraft.MembershipChangeMode.ADD_OR_PROMOTE_TO_FOLLOWER;
import static io.microraft.QueryPolicy.LINEARIZABLE;
import static io.microraft.RaftRole.FOLLOWER;
import static io.microraft.impl.local.SimpleStateMachine.applyValue;
Expand Down Expand Up @@ -362,7 +363,7 @@ public void when_followerFallsBehind_then_queryIsExecutedOnLeaderWithAppendEntri
group.dropMessagesTo(newFollower.getLocalEndpoint(), leader.getLocalEndpoint(),
AppendEntriesFailureResponse.class);

leader.changeMembership(newFollower.getLocalEndpoint(), ADD_LEARNER, 0).join();
leader.changeMembership(newFollower.getLocalEndpoint(), ADD_OR_PROMOTE_TO_FOLLOWER, 0).join();

group.dropMessagesTo(leader.getLocalEndpoint(), follower.getLocalEndpoint(), AppendEntriesRequest.class);

Expand Down Expand Up @@ -458,4 +459,36 @@ public void when_newCommitsAreDoneWhileThereAreMembershipChangeAndMultipleQuerie
assertThat(queryResult2.getCommitIndex()).isEqualTo(commitIndex);
}

@Test(timeout = 300_000)
public void when_learnerPresentInRaftGroup_then_queryQuorumIgnoresLearner() {
startGroup(3, TEST_RAFT_CONFIG);

RaftNodeImpl leader = group.waitUntilLeaderElected();
leader.replicate(applyValue("value1")).join();

RaftNodeImpl learner = group.createNewNode();
leader.changeMembership(learner.getLocalEndpoint(), ADD_LEARNER, 0).join();

List<RaftNodeImpl> otherNodes = group.getNodesExcept(leader.getLocalEndpoint());
for (RaftNodeImpl node : otherNodes) {
if (node != learner) {
group.dropMessagesTo(leader.getLocalEndpoint(), node.getLocalEndpoint(), AppendEntriesRequest.class);
group.dropMessagesTo(node.getLocalEndpoint(), leader.getLocalEndpoint(),
AppendEntriesSuccessResponse.class);
group.dropMessagesTo(node.getLocalEndpoint(), leader.getLocalEndpoint(),
AppendEntriesFailureResponse.class);
}
}

CompletableFuture<Ordered<Object>> queryFuture = leader.query(queryLastValue(), LINEARIZABLE, Optional.empty(),
Optional.empty());

try {
queryFuture.join();
fail();
} catch (CompletionException e) {
assertThat(e).hasCauseInstanceOf(NotLeaderException.class);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public static long getLeaderQuerySequenceNumber(RaftNodeImpl leader) {
Callable<Long> task = () -> {
LeaderState leaderState = leader.state().leaderState();
assertNotNull(leader.getLocalEndpoint() + " has no leader state!", leaderState);
return leaderState.querySequenceNumber();
return leaderState.querySequenceNumber(true);
};

return readRaftState(leader, task);
Expand Down

0 comments on commit c768342

Please sign in to comment.