Skip to content

Commit

Permalink
Add timeout parameter to RaftNode#query() method
Browse files Browse the repository at this point in the history
This commit implements a new RaftNode#query() overload which takes a timeout
parameter. When a monotonic reads query is issued with
QueryPolicy#EVENTUAL_CONSISTENCY, a minimum commit index and a timeout,
RaftNode waits for the given timeout duration until the local commit index
becomes greater than or equal to the given minimum commit index.
  • Loading branch information
metanet committed Apr 2, 2023
1 parent b06a95c commit bf41594
Show file tree
Hide file tree
Showing 12 changed files with 739 additions and 190 deletions.
20 changes: 17 additions & 3 deletions microraft/src/main/java/io/microraft/QueryPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,24 @@

/**
* Policies to decide how a query operation will be executed on the state
* machine. Each policy offers different consistency guarantees.
* machine. Each policy offers a different consistency guarantee.
*
* @see RaftNode#query(Object, QueryPolicy, long)
* @see RaftNode#query(Object, QueryPolicy, java.util.Optional,
* java.util.Optional)
*/
public enum QueryPolicy {

/**
* Runs the query on the local state machine of any Raft node.
* <p>
* Reading stale value is highly likely if queries are issued on follower or
* learner Raft nodes when the leader Raft node is committing new operations.
* Reading stale value is likely if queries are issued on follower or learner
* Raft nodes while the leader Raft node is committing new operations.
* <p>
* Callers can achieve monotonic reads by keeping track of the highest commit
* index they observed via return values of RaftNode's methods and passing it to
* queries. In this case, RaftNode executes a given query only if its local
* commit index is greater than or equal to the given commit index.
*/
EVENTUAL_CONSISTENCY,

Expand All @@ -42,6 +51,11 @@ public enum QueryPolicy {
* Reading stale value is possible when a follower or a learner Raft node lags
* behind the leader but the staleness is bounded by the leader heartbeat
* timeout configuration.
* <p>
* Callers can achieve monotonic reads by keeping track of the highest commit
* index they observed via return values of RaftNode's methods and passing it to
* queries. In this case, RaftNode executes a given query only if its local
* commit index is greater than or equal to the given commit index.
*/
BOUNDED_STALENESS,

Expand Down
165 changes: 113 additions & 52 deletions microraft/src/main/java/io/microraft/RaftNode.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import io.microraft.QueryPolicy;
import io.microraft.RaftEndpoint;
import io.microraft.RaftNode;
import java.util.Optional;

/**
* Thrown when a Raft node's current commit index is smaller than the commit
* index specified in a {@link RaftNode#query(Object, QueryPolicy, long)} call.
* This exception means that the Raft node instance cannot execute the given
* query by preserving the monotonicity of the observed state. Please see the
* <i>Section: 6.4 Processing read-only queries more efficiently</i> of the Raft
* index specified in a {@link RaftNode#query(Object, QueryPolicy, long)} or
* {@link RaftNode#query(Object, QueryPolicy, Optional, Optional)} call. This
* exception means that the Raft node instance cannot execute the given query by
* preserving the monotonicity of the observed state. Please see the <i>Section:
* 6.4 Processing read-only queries more efficiently</i> of the Raft
* dissertation for more details.
*/
public class LaggingCommitIndexException extends RaftException {
Expand Down
189 changes: 87 additions & 102 deletions microraft/src/main/java/io/microraft/impl/RaftNodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,25 @@
import static java.util.Collections.shuffle;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -92,6 +94,7 @@
import io.microraft.impl.state.RaftGroupMembersState;
import io.microraft.impl.state.RaftState;
import io.microraft.impl.state.RaftTermState;
import io.microraft.impl.state.QueryState.QueryContainer;
import io.microraft.impl.task.HeartbeatTask;
import io.microraft.impl.task.LeaderBackoffResetTask;
import io.microraft.impl.task.LeaderElectionTimeoutTask;
Expand All @@ -103,7 +106,6 @@
import io.microraft.impl.task.RaftStateSummaryPublishTask;
import io.microraft.impl.task.ReplicateTask;
import io.microraft.impl.task.TransferLeadershipTask;
import io.microraft.impl.util.Long2ObjectHashMap;
import io.microraft.impl.util.OrderedFuture;
import io.microraft.lifecycle.RaftNodeLifecycleAware;
import io.microraft.model.RaftModelFactory;
Expand Down Expand Up @@ -162,7 +164,6 @@ public final class RaftNodeImpl implements RaftNode {
private final RaftStore store;
private final RaftNodeReportListener raftNodeReportListener;
private final String localEndpointStr;
private final Long2ObjectHashMap<OrderedFuture> futures = new Long2ObjectHashMap<>();

private final Random random;
private final Clock clock;
Expand Down Expand Up @@ -616,12 +617,11 @@ public CompletableFuture<Ordered<Object>> terminate() {
try {
if (shouldTerminate) {
toFollower(state.term());
invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
setStatus(TERMINATED);
state.completeLeadershipTransfer(newNotLeaderException());
} else {
setStatus(TERMINATED);
}
setStatus(TERMINATED);
state.invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
state.invalidateScheduledQueries();
state.completeLeadershipTransfer(newNotLeaderException());
} catch (Throwable t) {
failure = t;
LOGGER.error("Failure during termination of " + localEndpointStr, t);
Expand Down Expand Up @@ -704,7 +704,18 @@ public <T> CompletableFuture<Ordered<T>> replicate(@Nonnull Object operation) {
public <T> CompletableFuture<Ordered<T>> query(@Nonnull Object operation, @Nonnull QueryPolicy queryPolicy,
long minCommitIndex) {
OrderedFuture<T> future = new OrderedFuture<>();
Runnable task = new QueryTask(this, requireNonNull(operation), queryPolicy, minCommitIndex, future);
Runnable task = new QueryTask(this, requireNonNull(operation), queryPolicy, Math.max(minCommitIndex, 0L),
Optional.empty(), future);
return executeIfRunning(task, future);
}

/**
 * Submits a query together with an optional minimum commit index and an
 * optional timeout.
 * <p>
 * An absent or negative minimum commit index is treated as {@code 0}. The
 * timeout is handed to the created {@link QueryTask} untouched.
 *
 * @param operation
 *            the query operation to run, must not be null
 * @param queryPolicy
 *            the policy deciding how the query is executed
 * @param minCommitIndex
 *            the optional lower bound for the commit index
 * @param timeout
 *            the optional duration passed on to the query task
 * @return the future returned by {@code executeIfRunning} for the created
 *         task
 */
@Nonnull
@Override
public <T> CompletableFuture<Ordered<T>> query(@Nonnull Object operation, @Nonnull QueryPolicy queryPolicy,
        Optional<Long> minCommitIndex, Optional<Duration> timeout) {
    OrderedFuture<T> future = new OrderedFuture<>();
    Object checkedOperation = requireNonNull(operation);
    // Clamp so that neither an absent nor a negative index goes below zero.
    long effectiveMinCommitIndex = Math.max(minCommitIndex.orElse(0L), 0L);
    Runnable task = new QueryTask(this, checkedOperation, queryPolicy, effectiveMinCommitIndex, timeout, future);
    return executeIfRunning(task, future);
}

Expand Down Expand Up @@ -823,6 +834,13 @@ private RuntimeException newNotRunningException() {
return new IllegalStateException(localEndpointStr + " is not running!");
}

/**
 * Creates a {@link LaggingCommitIndexException} that carries the current
 * commit index, the requested minimum commit index and the currently known
 * leader.
 *
 * @param minCommitIndex
 *            the minimum commit index that could not be satisfied; expected
 *            to be greater than the current commit index (checked with an
 *            assertion)
 * @return the exception to fail the caller with
 */
public RaftException newLaggingCommitIndexException(long minCommitIndex) {
    long currentCommitIndex = state.commitIndex();
    assert minCommitIndex > currentCommitIndex
            : "Cannot create LaggingCommitIndexException since min commit index: " + minCommitIndex
                    + " is not greater than commit index: " + currentCommitIndex;
    return new LaggingCommitIndexException(currentCommitIndex, minCommitIndex, state.leader());
}

/**
* Returns the leader Raft endpoint currently known by the local Raft node. The
* returned leader information might be stale.
Expand Down Expand Up @@ -947,7 +965,7 @@ private void applyLogEntry(LogEntry entry) {
}

state.lastApplied(logIndex);
completeFuture(logIndex, response);
state.completeFuture(logIndex, response);
}

/**
Expand All @@ -966,60 +984,6 @@ public RaftState state() {
return state;
}

/**
 * Stores the given future under the given log index so that it can later be
 * completed or failed through that index.
 * <p>
 * At most one future may be registered per index; a second registration for
 * the same index trips an assertion.
 *
 * @param entryIndex
 *            the log index the future belongs to
 * @param future
 *            the future to keep for that log index
 */
public void registerFuture(long entryIndex, OrderedFuture future) {
    OrderedFuture previous = futures.put(entryIndex, future);
    assert previous == null
            : localEndpointStr + " future object is already registered for entry index: " + entryIndex;
}

/**
 * Removes the future registered for the given log index, if any, and
 * finishes it: a {@link Throwable} result fails the future, any other result
 * completes it with the log index.
 */
private void completeFuture(long logIndex, Object result) {
    OrderedFuture registered = futures.remove(logIndex);
    if (registered == null) {
        // Nothing was registered for this index.
        return;
    }

    if (!(result instanceof Throwable)) {
        registered.complete(logIndex, result);
        return;
    }

    registered.fail((Throwable) result);
}

/**
 * Fails and unregisters every future whose log index is at or above the
 * given index (the bound is inclusive), then logs how many were invalidated.
 *
 * @param startIndexInclusive
 *            the first log index (inclusive) whose future is failed
 * @param e
 *            the exception the matching futures are failed with
 */
public void invalidateFuturesFrom(long startIndexInclusive, RaftException e) {
    int invalidated = 0;
    for (Iterator<Entry<Long, OrderedFuture>> it = futures.entrySet().iterator(); it.hasNext();) {
        Entry<Long, OrderedFuture> registered = it.next();
        long index = registered.getKey();
        if (index < startIndexInclusive) {
            continue;
        }

        registered.getValue().fail(e);
        it.remove();
        invalidated++;
    }

    if (invalidated > 0) {
        LOGGER.warn("{} Invalidated {} futures from log index: {} with: {}", localEndpointStr, invalidated,
                startIndexInclusive, e);
    }
}

private void takeSnapshot(RaftLog log, long snapshotIndex) {
if (snapshotIndex == log.snapshotIndex()) {
LOGGER.warn("{} is skipping to take snapshot at index: {} because it is the latest snapshot index.",
Expand Down Expand Up @@ -1168,10 +1132,11 @@ public void installSnapshot(SnapshotEntry snapshotEntry) {
}

state.lastApplied(snapshotEntry.getIndex());
invalidateFuturesUntil(snapshotEntry.getIndex(), new IndeterminateStateException(state.leader()));

LOGGER.info("{} snapshot is installed at commit index: {}", localEndpointStr, snapshotEntry.getIndex());

state.invalidateFuturesUntil(snapshotEntry.getIndex(), new IndeterminateStateException(state.leader()));
runScheduledQueries();

// log.setSnapshot() truncates stale log entries from disk.
// we are submitting an async flush task here to flush those
// changes to the storage.
Expand Down Expand Up @@ -1646,13 +1611,14 @@ private void commitEntries(long commitIndex) {

if (status == ACTIVE) {
applyLogEntries();
broadcastAppendEntriesRequest();
tryRunQueries();
} else {
tryRunQueries();
applyLogEntries();
broadcastAppendEntriesRequest();
runScheduledQueries();
}

broadcastAppendEntriesRequest();
}

public void tryAckQuery(long querySequenceNumber, RaftEndpoint sender) {
Expand Down Expand Up @@ -1688,20 +1654,38 @@ public void tryRunQueries() {
return;
}

Collection<Entry<Object, OrderedFuture>> operations = queryState.queries();
Collection<QueryContainer> operations = queryState.queries();

if (LOGGER.isDebugEnabled()) {
LOGGER.debug(localEndpointStr + " running " + operations.size() + " queries at commit index: " + commitIndex
+ ", query sequence number: " + queryState.querySequenceNumber());
}

for (Entry<Object, OrderedFuture> t : operations) {
runQuery(t.getKey(), state.commitIndex(), t.getValue());
for (QueryContainer query : operations) {
query.run(commitIndex, stateMachine);
}

queryState.reset();
}

/**
 * Runs the scheduled queries that the Raft state reports as ready to
 * execute, using the last applied log index.
 * <p>
 * If this node is already {@code TERMINATED}, nothing is run and the
 * scheduled queries are invalidated instead.
 */
public void runScheduledQueries() {
    if (status != TERMINATED) {
        final long appliedIndex = state.lastApplied();
        final Collection<QueryContainer> readyQueries = state.collectScheduledQueriesToExecute();
        readyQueries.forEach(query -> query.run(appliedIndex, stateMachine));

        if (!readyQueries.isEmpty() && LOGGER.isDebugEnabled()) {
            LOGGER.debug("{} executed {} waiting queries at log index: {}.", localEndpointStr,
                    readyQueries.size(), appliedIndex);
        }
    } else {
        state.invalidateScheduledQueries();
    }
}

/**
* Executes the given query operation and sets execution result to the future if
* the current commit index is greater than or equal to the given commit index.
Expand All @@ -1720,17 +1704,40 @@ public void tryRunQueries() {
* if the current commit index is smaller than the given commit
* index
*/
public void runQuery(Object operation, long minCommitIndex, OrderedFuture future) {
public void runOrScheduleQuery(QueryContainer query, long minCommitIndex, Optional<Duration> timeout) {
try {
long commitIndex = state.commitIndex();
if (commitIndex >= minCommitIndex) {
Object result = stateMachine.runOperation(commitIndex, operation);
future.complete(commitIndex, result);
long lastApplied = state.lastApplied();
if (lastApplied >= minCommitIndex) {
query.run(lastApplied, stateMachine);
} else if (timeout.isPresent()) {
long timeoutNanos = timeout.get().toNanos();
if (timeoutNanos <= 0) {
query.fail(newLaggingCommitIndexException(minCommitIndex));
} else {
state.addScheduledQuery(minCommitIndex, query);
executor.schedule(() -> {
try {
if (state.removeScheduledQuery(minCommitIndex, query)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"{} query waiting to be executed at commit index: {} timed out! Current commit index: {}",
localEndpointStr, minCommitIndex, state.commitIndex());
}
query.fail(newLaggingCommitIndexException(minCommitIndex));
}
} catch (Throwable t) {
LOGGER.error(localEndpointStr + " timing out of query for expected commit index: "
+ minCommitIndex + " failed.", t);
query.fail(t);
}
}, timeoutNanos, NANOSECONDS);
}
} else {
future.fail(new LaggingCommitIndexException(state.commitIndex(), minCommitIndex, state.leader()));
query.fail(newLaggingCommitIndexException(minCommitIndex));
}
} catch (Throwable t) {
future.fail(t);
LOGGER.error(localEndpointStr + " query scheduling failed with {}", t);
query.fail(t);
}
}

Expand Down Expand Up @@ -1819,7 +1826,7 @@ public boolean demoteToFollowerIfQuorumHeartbeatTimeoutElapsed() {
"{} Demoting to {} since not received append entries responses from majority recently. Latest quorum timestamp: {}",
localEndpointStr, FOLLOWER, quorumTimestamp.get());
toFollower(state.term());
invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
state.invalidateFuturesFrom(state.commitIndex() + 1, new IndeterminateStateException());
}

return demoteToFollower;
Expand All @@ -1834,29 +1841,6 @@ private Optional<Long> getQuorumHeartbeatTimestamp() {
return Optional.of(leaderState.quorumResponseTimestamp(state.logReplicationQuorumSize(), clock.millis()));
}

/**
 * Fails and unregisters every future whose log index is at or below the
 * given index (the bound is inclusive), then logs how many were completed.
 *
 * @param endIndexInclusive
 *            the last log index (inclusive) whose future is failed
 * @param e
 *            the exception the matching futures are failed with
 */
private void invalidateFuturesUntil(long endIndexInclusive, RaftException e) {
    int completed = 0;
    for (Iterator<Entry<Long, OrderedFuture>> it = futures.entrySet().iterator(); it.hasNext();) {
        Entry<Long, OrderedFuture> registered = it.next();
        long index = registered.getKey();
        if (index > endIndexInclusive) {
            continue;
        }

        registered.getValue().fail(e);
        it.remove();
        completed++;
    }

    if (completed > 0) {
        LOGGER.warn("{} Completed {} futures until log index: {} with {}", localEndpointStr, completed,
                endIndexInclusive, e);
    }
}

/**
* Switches this node to the follower role by clearing the known leader endpoint
* and (pre) candidate states, and updating the term. If this Raft node was
Expand Down Expand Up @@ -1904,4 +1888,5 @@ public void toSingletonLeader() {
/**
 * Returns the clock held by this Raft node.
 *
 * @return the {@link Clock} stored in the {@code clock} field
 */
public Clock getClock() {
    return this.clock;
}

}

0 comments on commit bf41594

Please sign in to comment.