Skip to content

Commit

Permalink
merge: #10707
Browse files Browse the repository at this point in the history
10707: Periodically log the progress while partition is waiting to be ready r=deepthidevaki a=deepthidevaki

## Description

Introduced a throttled logger that logs only once every configured interval. Used this logger to log when new entries are committed while a node is waiting to be ready. Since using the thtrottle logger, it will not flood the log by logging each commit. 

To enable this, this PR also refactored `setCommitIndex`. Previously, setting first commit index and check if node is ready is done for every commit. This is unnecessary, as it is relevant only until the node is ready. So this is moved to a commit listener which is only  active while the node is waiting to be ready. So that path is not executed unnecessarily during normal execution. 

Also remove "volatile" from firstCommitIndex. It is not accessed outside of raft context. There are also other volatile fields, which I think can be removed. But they need to be done carefully. So I will leave it out of this PR. 

## Related issues

closes #9963 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@users.noreply.github.com>
  • Loading branch information
3 people committed Oct 13, 2022
2 parents 17e6b64 + 6ae07c7 commit 17161f6
Show file tree
Hide file tree
Showing 5 changed files with 386 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
*/
package io.atomix.raft;

/**
* Will be notified when a new batch is committed. It is not guaranteed to get notified for each
* index. For example, if records at index 1 to 5 are committed together, the listener can be
* invoked just once with index 5.
*/
@FunctionalInterface
public interface RaftCommitListener {

Expand Down
39 changes: 29 additions & 10 deletions atomix/cluster/src/main/java/io/atomix/raft/impl/RaftContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
import io.camunda.zeebe.util.logging.ThrottledLogger;
import java.time.Duration;
import java.util.Objects;
import java.util.Random;
Expand Down Expand Up @@ -120,7 +121,7 @@ public class RaftContext implements AutoCloseable, HealthMonitorable {
private volatile long term;
private MemberId lastVotedFor;
private long commitIndex;
private volatile long firstCommitIndex;
private long firstCommitIndex;
private volatile boolean started;
private EntryValidator entryValidator;
// Used for randomizing election timeout
Expand Down Expand Up @@ -216,6 +217,8 @@ public RaftContext(
// Register protocol listeners.
registerHandlers(protocol);
started = true;

addCommitListener(new AwaitingReadyCommitListener());
}

private void setSnapshot(final PersistedSnapshot persistedSnapshot) {
Expand Down Expand Up @@ -413,8 +416,7 @@ public void notifyCommitListeners(final long lastCommitIndex) {
*
* @param committedEntry the most recently committed entry
*/
public void notifyCommitListeners(final IndexedRaftLogEntry committedEntry) {
commitListeners.forEach(listener -> listener.onCommit(committedEntry.index()));
public void notifyCommittedEntryListeners(final IndexedRaftLogEntry committedEntry) {
committedEntryListeners.forEach(listener -> listener.onCommit(committedEntry));
}

Expand All @@ -439,14 +441,8 @@ public long setCommitIndex(final long commitIndex) {
if (configurationIndex > previousCommitIndex && configurationIndex <= commitIndex) {
cluster.commit();
}
setFirstCommitIndex(commitIndex);
// On start up, set the state to READY after the follower has caught up with the leader
// https://github.com/zeebe-io/zeebe/issues/4877
if (state == State.ACTIVE && commitIndex >= firstCommitIndex) {
state = State.READY;
stateChangeListeners.forEach(l -> l.accept(state));
}
replicationMetrics.setCommitIndex(commitIndex);
notifyCommitListeners(commitIndex);
}
return previousCommitIndex;
}
Expand Down Expand Up @@ -1159,4 +1155,27 @@ private enum MissedSnapshotReplicationEvents {
STARTED,
COMPLETED
}

/** Commit listener is active only until the server is ready * */
final class AwaitingReadyCommitListener implements RaftCommitListener {
private final Logger throttledLogger = new ThrottledLogger(log, Duration.ofSeconds(30));

@Override
public void onCommit(final long index) {
setFirstCommitIndex(index);
// On start up, set the state to READY after the follower has caught up with the leader
// https://github.com/zeebe-io/zeebe/issues/4877
if (index >= firstCommitIndex) {
state = State.READY;
log.info("Commit index is {}. RaftServer is ready", index);
stateChangeListeners.forEach(l -> l.accept(state));
removeCommitListener(this);
} else {
throttledLogger.info(
"Commit index is {}. RaftServer is ready only after it has committed events up to index {}",
commitIndex,
firstCommitIndex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ private void replicate(final IndexedRaftLogEntry indexed, final AppendListener a
// entries properly on fail over
if (commitError == null) {
appendListener.onCommit(indexed);
raft.notifyCommitListeners(indexed);
raft.notifyCommittedEntryListeners(indexed);
} else {
appendListener.onCommitError(indexed, commitError);
// replicating the entry will be retried on the next append request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,6 @@ protected void appendEntries(
final long previousCommitIndex = raft.setCommitIndex(commitIndex);
if (previousCommitIndex < commitIndex) {
log.trace("Committed entries up to index {}", commitIndex);
raft.notifyCommitListeners(commitIndex);
}

// Make sure all entries are flushed before ack to ensure we have persisted what we acknowledge
Expand Down

0 comments on commit 17161f6

Please sign in to comment.