Skip to content

Commit

Permalink
Introduce a flow control to fix exponential backoff behaviour for CP …
Browse files Browse the repository at this point in the history
…subsystem [HZ-2702] (#25055)

Added `flowControlSequenceNumber` to Append/InstallSnapshot requests and
responses to perform matching between them.
This allows reset the backoff only for the corresponding request.

Fixes #24958

Breaking changes (list specific methods/types/messages):
* `AppendRequest`, `InstallSnapshot`, `AppendSuccessResponse`,`AppendFailureResponse`
  • Loading branch information
arodionov committed Jul 25, 2023
1 parent b394495 commit 8e3ffce
Show file tree
Hide file tree
Showing 17 changed files with 682 additions and 54 deletions.
9 changes: 8 additions & 1 deletion NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ The class com.hazelcast.jet.kafka.impl.ResumeTransactionUtil contains
code derived from the Apache Flink project.

The class com.hazelcast.internal.util.ConcurrentReferenceHashMap contains code written by Doug Lea
and updated within the WildFly project (https://github.com/wildfly/wildfly).
and updated within the WildFly project (https://github.com/wildfly/wildfly).

The classes:
com.hazelcast.cp.internal.raft.impl.state.FollowerState
com.hazelcast.cp.internal.raft.impl.state.FollowerStateTest
com.hazelcast.cp.internal.raft.impl.SlowFollowerBackoffTest

contain code originating from the MicroRaft project (https://github.com/MicroRaft/MicroRaft)
5 changes: 5 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<suppress checks="Header" files="com[\\/]hazelcast[\\/]internal[\\/]cluster[\\/]fd[\\/]PhiAccrualFailureDetector"/>
<suppress checks="Header" files="com[\\/]hazelcast[\\/]instance[\\/]impl[\\/]MobyNames"/>

<!-- Suppress checking of copyright notice, adapted from the MicroRaft project -->
<suppress checks="Header" files="com[\\/]hazelcast[\\/]cp[\\/]internal[\\/]raft[\\/]impl[\\/]state[\\/]FollowerState"/>
<suppress checks="Header" files="com[\\/]hazelcast[\\/]cp[\\/]internal[\\/]raft[\\/]impl[\\/]state[\\/]FollowerStateTest"/>
<suppress checks="Header" files="com[\\/]hazelcast[\\/]cp[\\/]internal[\\/]raft[\\/]impl[\\/]SlowFollowerBackoffTest"/>

<!-- Suppress checking of copyright notice, adapted from Agrona project -->
<suppress checks="Header" files="com[\\/]hazelcast[\\/]internal[\\/]util[\\/]HashUtil"/>
<suppress checks="Header" files="com[\\/]hazelcast[\\/]internal[\\/]util[\\/]QuickMath"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,14 +683,15 @@ public void sendAppendRequest(RaftEndpoint follower) {
if (nextIndex <= raftLog.snapshotIndex()
&& (!raftLog.containsLogEntry(nextIndex) || (nextIndex > 1 && !raftLog.containsLogEntry(nextIndex - 1)))) {
InstallSnapshot installSnapshot = new InstallSnapshot(state.localEndpoint(), state.term(), raftLog.snapshot(),
leaderState.queryRound());
leaderState.queryRound(), followerState.setAppendRequestBackoff());
if (logger.isFineEnabled()) {
logger.fine("Sending " + installSnapshot + " to " + follower + " since next index: " + nextIndex
+ " <= snapshot index: " + raftLog.snapshotIndex());
}

// no need to submit the flush task here because we send committed state...
raftIntegration.send(installSnapshot, follower);
// due to the potentially large size of the snapshot, set the backoff to the maximum value
followerState.setMaxAppendRequestBackoff();
scheduleAppendAckResetTask();
return;
Expand Down Expand Up @@ -735,8 +736,12 @@ public void sendAppendRequest(RaftEndpoint follower) {
shouldBackoff = false;
}

if (shouldBackoff) {
followerState.setAppendRequestBackoff();
}

AppendRequest request = new AppendRequest(getLocalMember(), state.term(), prevEntryTerm, prevEntryIndex,
state.commitIndex(), entries, leaderState.queryRound());
state.commitIndex(), entries, leaderState.queryRound(), followerState.flowControlSequenceNumber());

if (logger.isFineEnabled()) {
logger.fine("Sending " + request + " to " + follower + " with next index: " + nextIndex);
Expand All @@ -753,7 +758,6 @@ public void sendAppendRequest(RaftEndpoint follower) {
}

if (shouldBackoff) {
followerState.setAppendRequestBackoff();
scheduleAppendAckResetTask();
}
}
Expand Down Expand Up @@ -1437,22 +1441,24 @@ private class AppendRequestBackoffResetTask extends RaftNodeStatusAwareTask {
@Override
protected void innerRun() {
appendRequestBackoffResetTaskScheduled = false;

LeaderState leaderState = state.leaderState();
if (leaderState == null) {
return;
}

if (leaderState != null) {
Map<RaftEndpoint, FollowerState> followerStates = leaderState.getFollowerStates();
for (Entry<RaftEndpoint, FollowerState> entry : followerStates.entrySet()) {
FollowerState followerState = entry.getValue();
if (!followerState.isAppendRequestBackoffSet()) {
continue;
}
Map<RaftEndpoint, FollowerState> followerStates = leaderState.getFollowerStates();
for (Entry<RaftEndpoint, FollowerState> entry : followerStates.entrySet()) {
FollowerState followerState = entry.getValue();
if (followerState.isAppendRequestBackoffSet()) {
if (followerState.completeAppendRequestBackoffRound()) {
// This follower has not sent a response to the last append request.
// Send another append request
sendAppendRequest(entry.getKey());
} else {
// Schedule the task again, we still have backoff flag set followers
scheduleAppendAckResetTask();
}
// Schedule the task again, we still have backoff flag set followers
scheduleAppendAckResetTask();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;

import java.io.EOFException;
import java.io.IOException;

/**
Expand All @@ -39,14 +40,17 @@ public class AppendFailureResponse implements IdentifiedDataSerializable {
private RaftEndpoint follower;
private int term;
private long expectedNextIndex;
private long flowControlSequenceNumber;

public AppendFailureResponse() {
}

public AppendFailureResponse(RaftEndpoint follower, int term, long expectedNextIndex) {
public AppendFailureResponse(RaftEndpoint follower, int term, long expectedNextIndex,
long flowControlSequenceNumber) {
this.follower = follower;
this.term = term;
this.expectedNextIndex = expectedNextIndex;
this.flowControlSequenceNumber = flowControlSequenceNumber;
}

public RaftEndpoint follower() {
Expand All @@ -61,6 +65,10 @@ public long expectedNextIndex() {
return expectedNextIndex;
}

public long flowControlSequenceNumber() {
return flowControlSequenceNumber;
}

@Override
public int getFactoryId() {
return RaftDataSerializerHook.F_ID;
Expand All @@ -76,19 +84,26 @@ public void writeData(ObjectDataOutput out) throws IOException {
out.writeInt(term);
out.writeObject(follower);
out.writeLong(expectedNextIndex);
out.writeLong(flowControlSequenceNumber);
}

@Override
public void readData(ObjectDataInput in) throws IOException {
term = in.readInt();
follower = in.readObject();
expectedNextIndex = in.readLong();
try {
flowControlSequenceNumber = in.readLong();
// TODO RU_COMPAT_5_3 added for Version 5.3 compatibility. Should be removed at Version 5.5
} catch (EOFException e) {
flowControlSequenceNumber = -1;
}
}

@Override
public String toString() {
return "AppendFailureResponse{" + "follower=" + follower + ", term=" + term + ", expectedNextIndex="
+ expectedNextIndex + '}';
+ expectedNextIndex + ", flowControlSequenceNumber=" + flowControlSequenceNumber + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

Expand All @@ -46,20 +47,22 @@ public class AppendRequest implements IdentifiedDataSerializable {
private long leaderCommitIndex;
private LogEntry[] entries;
private long queryRound;
private long flowControlSequenceNumber;

public AppendRequest() {
}

@SuppressFBWarnings("EI_EXPOSE_REP2")
public AppendRequest(RaftEndpoint leader, int term, int prevLogTerm, long prevLogIndex, long leaderCommitIndex,
LogEntry[] entries, long queryRound) {
LogEntry[] entries, long queryRound, long flowControlSequenceNumber) {
this.leader = leader;
this.term = term;
this.prevLogTerm = prevLogTerm;
this.prevLogIndex = prevLogIndex;
this.leaderCommitIndex = leaderCommitIndex;
this.entries = entries;
this.queryRound = queryRound;
this.flowControlSequenceNumber = flowControlSequenceNumber;
}

public RaftEndpoint leader() {
Expand Down Expand Up @@ -95,6 +98,10 @@ public long queryRound() {
return queryRound;
}

public long flowControlSequenceNumber() {
return flowControlSequenceNumber;
}

@Override
public int getFactoryId() {
return RaftDataSerializerHook.F_ID;
Expand All @@ -119,6 +126,7 @@ public void writeData(ObjectDataOutput out) throws IOException {
}

out.writeLong(queryRound);
out.writeLong(flowControlSequenceNumber);
}

@Override
Expand All @@ -136,13 +144,20 @@ public void readData(ObjectDataInput in) throws IOException {
}

queryRound = in.readLong();

try {
flowControlSequenceNumber = in.readLong();
// TODO RU_COMPAT_5_3 added for Version 5.3 compatibility. Should be removed at Version 5.5
} catch (EOFException e) {
flowControlSequenceNumber = -1;
}
}

@Override
public String toString() {
return "AppendRequest{" + "leader=" + leader + ", term=" + term + ", prevLogTerm=" + prevLogTerm
+ ", prevLogIndex=" + prevLogIndex + ", leaderCommitIndex=" + leaderCommitIndex + ", queryRound=" + queryRound
+ ", entries=" + Arrays.toString(entries) + '}';
+ ", flowControlSequenceNumber=" + flowControlSequenceNumber + ", entries=" + Arrays.toString(entries) + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;

import java.io.EOFException;
import java.io.IOException;

/**
Expand All @@ -40,15 +41,18 @@ public class AppendSuccessResponse implements IdentifiedDataSerializable {
private int term;
private long lastLogIndex;
private long queryRound;
private long flowControlSequenceNumber;

public AppendSuccessResponse() {
}

public AppendSuccessResponse(RaftEndpoint follower, int term, long lastLogIndex, long queryRound) {
public AppendSuccessResponse(RaftEndpoint follower, int term, long lastLogIndex, long queryRound,
long flowControlSequenceNumber) {
this.follower = follower;
this.term = term;
this.lastLogIndex = lastLogIndex;
this.queryRound = queryRound;
this.flowControlSequenceNumber = flowControlSequenceNumber;
}

public RaftEndpoint follower() {
Expand All @@ -67,6 +71,10 @@ public long queryRound() {
return queryRound;
}

public long flowControlSequenceNumber() {
return flowControlSequenceNumber;
}

@Override
public int getFactoryId() {
return RaftDataSerializerHook.F_ID;
Expand All @@ -83,6 +91,7 @@ public void writeData(ObjectDataOutput out) throws IOException {
out.writeObject(follower);
out.writeLong(lastLogIndex);
out.writeLong(queryRound);
out.writeLong(flowControlSequenceNumber);
}

@Override
Expand All @@ -91,12 +100,18 @@ public void readData(ObjectDataInput in) throws IOException {
follower = in.readObject();
lastLogIndex = in.readLong();
queryRound = in.readLong();
try {
flowControlSequenceNumber = in.readLong();
// TODO RU_COMPAT_5_3 added for Version 5.3 compatibility. Should be removed at Version 5.5
} catch (EOFException e) {
flowControlSequenceNumber = -1;
}
}

@Override
public String toString() {
return "AppendSuccessResponse{" + "follower=" + follower + ", term=" + term + ", lastLogIndex="
+ lastLogIndex + ", queryRound=" + queryRound + '}';
+ lastLogIndex + ", queryRound=" + queryRound + ", flowControlSequenceNumber=" + flowControlSequenceNumber + '}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;

import java.io.EOFException;
import java.io.IOException;

/**
Expand All @@ -41,15 +42,18 @@ public class InstallSnapshot implements IdentifiedDataSerializable {
private int term;
private SnapshotEntry snapshot;
private long queryRound;
private long flowControlSequenceNumber;

public InstallSnapshot() {
}

public InstallSnapshot(RaftEndpoint leader, int term, SnapshotEntry snapshot, long queryRound) {
public InstallSnapshot(RaftEndpoint leader, int term, SnapshotEntry snapshot, long queryRound,
long flowControlSequenceNumber) {
this.leader = leader;
this.term = term;
this.snapshot = snapshot;
this.queryRound = queryRound;
this.flowControlSequenceNumber = flowControlSequenceNumber;
}

public RaftEndpoint leader() {
Expand All @@ -68,6 +72,10 @@ public long queryRound() {
return queryRound;
}

public long flowControlSequenceNumber() {
return flowControlSequenceNumber;
}

@Override
public int getFactoryId() {
return RaftDataSerializerHook.F_ID;
Expand All @@ -84,6 +92,7 @@ public void writeData(ObjectDataOutput out) throws IOException {
out.writeInt(term);
out.writeObject(snapshot);
out.writeLong(queryRound);
out.writeLong(flowControlSequenceNumber);
}

@Override
Expand All @@ -92,12 +101,18 @@ public void readData(ObjectDataInput in) throws IOException {
term = in.readInt();
snapshot = in.readObject();
queryRound = in.readLong();
try {
flowControlSequenceNumber = in.readLong();
// TODO RU_COMPAT_5_3 added for Version 5.3 compatibility. Should be removed at Version 5.5
} catch (EOFException e) {
flowControlSequenceNumber = -1;
}
}

@Override
public String toString() {
return "InstallSnapshot{" + "leader=" + leader + ", term=" + term + ", snapshot=" + snapshot + ", queryRound="
+ queryRound + '}';
+ queryRound + ", flowControlSequenceNumber=" + flowControlSequenceNumber + '}';
}

}

0 comments on commit 8e3ffce

Please sign in to comment.