Skip to content

Commit

Permalink
Implement barrier API
Browse files Browse the repository at this point in the history
  • Loading branch information
metanet committed Apr 5, 2023
1 parent c62c819 commit 8fcf980
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 4 deletions.
32 changes: 29 additions & 3 deletions microraft/src/main/java/io/microraft/RaftNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ static RaftNodeBuilder newBuilder() {
* @param queryPolicy
* the query policy to decide how to execute the given query
* @param minCommitIndex
* the optional non-negative minimum commit index that this Raft node
* has to
* (optional) minimum commit index that this Raft node's local commit
* index needs to advance
* @param timeout
* the optional duration to wait before timing out the query if the
* (optional) duration to wait before timing out the query if the
* local commit index cannot advance until the given commit index
* have in order to execute the given query.
* @param <T>
Expand All @@ -312,9 +312,35 @@ static RaftNodeBuilder newBuilder() {
* @see CannotReplicateException
* @see LaggingCommitIndexException
*/
@Nonnull
<T> CompletableFuture<Ordered<T>> query(@Nonnull Object operation, @Nonnull QueryPolicy queryPolicy,
Optional<Long> minCommitIndex, Optional<Duration> timeout);

/**
* The returned future is completed when the Raft node's last applied log index
* becomes greater than or equal to the given commit index. It means all
* operations up to that log index are committed and applied to the state
* machine. If timeout occurs before the Raft node's commit index advances up to
* the given commit index, the returned future is completed with
* {@link LaggingCommitIndexException}.
* <p>
* This is a barrier-like API which can be used for achieving read your writes.
* After a client replicates an operation via the leader, it can learn the
* commit index of its operation, and waits until a particular follower applies
* the operation on the returned commit index.
* <p>
* Basically, this method is a shorthand for RaftNode.query(no-op,
* QueryPolicy.EVENTUAL_CONSISTENCY, minCommitIndex, timeout).
*
* @param minCommitIndex
* (optional) minimum commit index that this Raft node's local commit
* index needs to advance
* @param timeout
* (optional) duration to wait before timing out the return future
*/
@Nonnull
CompletableFuture<Ordered<Object>> waitFor(long minCommitIndex, Duration timeout);

/**
* Replicates and commits the given membership change to the Raft group, if the
* given group members commit index is equal to the current group members commit
Expand Down
8 changes: 8 additions & 0 deletions microraft/src/main/java/io/microraft/impl/RaftNodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import io.microraft.impl.state.RaftTermState;
import io.microraft.impl.state.QueryState.QueryContainer;
import io.microraft.impl.statemachine.InternalCommitAware;
import io.microraft.impl.statemachine.NoOp;
import io.microraft.impl.task.HeartbeatTask;
import io.microraft.impl.task.LeaderBackoffResetTask;
import io.microraft.impl.task.LeaderElectionTimeoutTask;
Expand Down Expand Up @@ -708,6 +709,13 @@ public <T> CompletableFuture<Ordered<T>> query(@Nonnull Object operation, @Nonnu
return executeIfRunning(task, future);
}

@Nonnull
@Override
public CompletableFuture<Ordered<Object>> waitFor(long minCommitIndex, Duration timeout) {
return query(NoOp.INSTANCE, QueryPolicy.EVENTUAL_CONSISTENCY, Optional.of(minCommitIndex),
Optional.of(timeout));
}

@Nonnull
@Override
public CompletableFuture<Ordered<RaftGroupMembers>> changeMembership(@Nonnull RaftEndpoint endpoint,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import io.microraft.RaftEndpoint;
import io.microraft.impl.statemachine.NoOp;
import io.microraft.impl.util.OrderedFuture;
import io.microraft.statemachine.StateMachine;

Expand Down Expand Up @@ -204,7 +205,10 @@ public QueryContainer(Object operation, OrderedFuture future) {

public void run(long commitIndex, StateMachine stateMachine) {
try {
Object result = stateMachine.runOperation(commitIndex, operation);
Object result = null;
if (!(operation instanceof NoOp)) {
result = stateMachine.runOperation(commitIndex, operation);
}
future.complete(commitIndex, result);
} catch (Throwable t) {
fail(t);
Expand Down
10 changes: 10 additions & 0 deletions microraft/src/main/java/io/microraft/impl/statemachine/NoOp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.microraft.impl.statemachine;

public final class NoOp {

public static final NoOp INSTANCE = new NoOp();

private NoOp() {
}

}
120 changes: 120 additions & 0 deletions microraft/src/test/java/io/microraft/impl/QueryTimeoutTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,24 @@ public void when_followerQueriedWithFutureCommitIndex_then_queryTimesOutIfCommit
}
}

@Test(timeout = 300_000)
public void when_followerQueriedWithFutureCommitIndexAndNegativeTimeout_then_queryFailsImmediately() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();
RaftNodeImpl follower = group.getAnyNodeExcept(leader.getLocalEndpoint());

long commitIndex = leader.replicate(applyValue("value")).join().getCommitIndex();

try {
follower.query(queryLastValue(), EVENTUAL_CONSISTENCY, Optional.of(commitIndex + 1),
Optional.of(Duration.ofSeconds(-1))).join();
fail();
} catch (CompletionException e) {
assertThat(e).hasCauseInstanceOf(LaggingCommitIndexException.class);
}
}

@Test(timeout = 300_000)
public void when_leaderQueriedWithLeaderLeaseAndFutureCommitIndex_then_queryFailsImmediately() {
group = LocalRaftGroup.start(3);
Expand Down Expand Up @@ -266,4 +284,106 @@ public void when_leaderQueriedWithFutureCommitIndex_then_queryFailsWhenLeaderLea
}
}

@Test(timeout = 300_000)
public void when_followerWaitsForPastCommitIndex_then_barrierCompletesImmediately() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();
RaftNodeImpl follower = group.getAnyNodeExcept(leader.getLocalEndpoint());

long commitIndex1 = leader.replicate(applyValue("value1")).join().getCommitIndex();
long commitIndex2 = leader.replicate(applyValue("value2")).join().getCommitIndex();

eventually(() -> {
assertThat(getCommitIndex(follower)).isEqualTo(commitIndex2);
});

Ordered<Object> o = follower.waitFor(commitIndex1, Duration.ofSeconds(100)).join();

assertThat(o.getResult()).isNull();
assertThat(o.getCommitIndex()).isEqualTo(commitIndex2);
}

@Test(timeout = 300_000)
public void when_followerWaitsForCurrentCommitIndex_then_barrierCompletesImmediately() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();
RaftNodeImpl follower = group.getAnyNodeExcept(leader.getLocalEndpoint());

long commitIndex = leader.replicate(applyValue("value1")).join().getCommitIndex();

eventually(() -> {
assertThat(getCommitIndex(follower)).isEqualTo(commitIndex);
});

Ordered<Object> o = follower.waitFor(commitIndex, Duration.ofSeconds(100)).join();

assertThat(o.getResult()).isNull();
assertThat(o.getCommitIndex()).isEqualTo(commitIndex);
}

@Test(timeout = 300_000)
public void when_followerWaitsForFutureCommitIndex_then_barrierFailsWithTimeout() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();
RaftNodeImpl follower = group.getAnyNodeExcept(leader.getLocalEndpoint());
group.dropMessagesTo(leader.getLocalEndpoint(), follower.getLocalEndpoint(), AppendEntriesRequest.class);

long commitIndex = leader.replicate(applyValue("value1")).join().getCommitIndex();

try {
follower.waitFor(commitIndex, Duration.ofSeconds(5)).join();
fail();
} catch (CompletionException e) {
assertThat(e).hasCauseInstanceOf(LaggingCommitIndexException.class);
}
}

@Test(timeout = 300_000)
public void when_leaderWaitsForPastCommitIndex_then_barrierCompletesImmediately() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();

long commitIndex1 = leader.replicate(applyValue("value1")).join().getCommitIndex();
long commitIndex2 = leader.replicate(applyValue("value2")).join().getCommitIndex();

Ordered<Object> o = leader.waitFor(commitIndex1, Duration.ofSeconds(100)).join();

assertThat(o.getResult()).isNull();
assertThat(o.getCommitIndex()).isEqualTo(commitIndex2);
}

@Test(timeout = 300_000)
public void when_leaderWaitsForCurrentCommitIndex_then_barrierCompletesImmediately() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();

long commitIndex = leader.replicate(applyValue("value1")).join().getCommitIndex();

Ordered<Object> o = leader.waitFor(commitIndex, Duration.ofSeconds(100)).join();

assertThat(o.getResult()).isNull();
assertThat(o.getCommitIndex()).isEqualTo(commitIndex);
}

@Test(timeout = 300_000)
public void when_leaderWaitsForFutureCommitIndex_then_barrierFailsWithTimeout() {
group = LocalRaftGroup.start(3);

RaftNodeImpl leader = group.waitUntilLeaderElected();

long commitIndex = leader.replicate(applyValue("value1")).join().getCommitIndex();

try {
leader.waitFor(commitIndex + 1, Duration.ofSeconds(5)).join();
fail();
} catch (CompletionException e) {
assertThat(e).hasCauseInstanceOf(LaggingCommitIndexException.class);
}
}

}

0 comments on commit 8fcf980

Please sign in to comment.