New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RATIS-1481. make state upgradate in notifyStateMachineToInstallSnapshot serialized #573
Conversation
10baf26
to
23b623d
Compare
The time for the user-defined state machine to install snapshot right now only leaves 1 sec. This might be too tight. Shall we make it configurable? @szetszwo |
Yes, it is a good idea to make it configurable. |
.notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex); | ||
// wait for at most 5 seconds for statemachine to install snapshot | ||
TermIndex reply = future.get(RaftServerConfigKeys.Notification.notifyInstallSnapshotTimeout( | ||
getRaftServer().getProperties()).toLong(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are essentially going to be blocking the main thread for 5 seconds (or more if configured that way). I don't think we should be blocking the main thread.
Also, inProgressInstallSnapshotRequest is not being set to 0 when the future completes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main thread blocking should be fine as it essentially can do nothing but wait for the snapshot done.
@Xushaohong, thanks for working on this.
This is intentional so that we don't block the main thread while the SM is installing the snapshot. Instead we send a IN_PROGRESS notification back to the leader. |
@hanishakoneru The problem is due to To avoid this, I think the state change part and the following check part should be serialized. |
Leader sends a snapshot (request or notification) to follower when either LogAppender#shouldInstallSnapshot() or GrpcLogAppender#shouldNotifyToInstallSnapshot() are true. And once these are true, without a state change acknowledgement from the follower, Leader will not send an appendEntries request. Also, if the follower does not send a response back till it finishes installing the snapshot, the Leader will not be able to update the lastRpcResponseTime for this follower. This could lead to the leader stepping down thinking that it has lost the connection with this follower. |
Actually, Leader will send an appendEntries request. It keeps send empty appendEntries requests to follower to keep in contact with follower as shouldNotifyToInstallSnapshot() is true as the below.
Hence The follower will send back INCONSISTENCY to the leader and the leader will updateNextIndex.
This is also explained above. append log (heartbeat) will keep the follower in contact! @hanishakoneru PTAL |
@Xushaohong, please bear with me as I try to understand what is happening here and root causing the issue.
What is the follower sends SNAPSHOT_INSTALLATION_IN_PROGRESS status instead of INCONSISTENCY status to the appendEntries request? IIUC, the issue is occurring because of this inconsistency reply to appendEntries request from Leader. |
Sry, I do not get this since I have not modified the follower to send back this status.
Yep, this is nearly the root cause. Precondition: The statemachine installing snapshot(let's say thread A) and return progress of installing snapshot(lets say t thread B) are asynchronous. The update of the index(commit index and next index) is executed serially by thread A. If we keep the non-blocking style to let the follower respond to the leader with its instant status, there are two problems.
My solution: let update of index be executed by the main thread B, so the follower can ensure to send out the SNAPSHOT_INSTALLED after update of index.
My solution: replace |
Thanks @Xushaohong for the detailed explanation. Appreciate it. I understand the problem now and your change makes sense to me. The overall logic LGTM. I will take closer look at it tomorrow. Can you please check the CI failures if they are related. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Patch LGTM overall.
AppendEntries should not return INCONSISTENCY result when a snapshot is being installed. It would be good to add another result type to AppendEntries request. We can open another Jira to address that.
@@ -1561,7 +1561,8 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt | |||
} | |||
|
|||
if (reply != null) { | |||
if (request.hasLastRaftConfigurationLogEntryProto()) { | |||
if (request.hasLastRaftConfigurationLogEntryProto() && reply.getResult() == | |||
InstallSnapshotResult.SNAPSHOT_INSTALLED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SNAPSHOT_INSTALLED result is returned when SM is notified to install snapshot. For the case when snapshot is sent as part of installSnapshot request by Ratis, SUCCESS is returned as the result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, I missed that. Now I added the difference check to reduce the frequency of resetting conf. Coz this is redundant to overwrite the conf for each request.
stateMachine.event().notifyConfigurationChanged(newConfLogEntryProto.getTerm(), newConfLogEntryProto.getIndex(), | ||
newConfLogEntryProto.getConfigurationEntry()); | ||
LogEntryProto newConfLogEntryProto = request.getLastRaftConfigurationLogEntryProto(); | ||
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(newConfLogEntryProto))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do not call state.writeRaftConfiguration(newConfLogEntryProto) here, a bootstrapping follower might not write it's configuration to disk at all till the StateMachine applies a log containing a configuration entry.
ServerState#writeRaftConfiguration() is called only from 2 places - installSnapshot and applyLog. If the conf is persisted on disk some other way also, then we are good. If not, we should check that condition also here before deciding to not update the conf entry on disk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here the conf entry on disk is only loaded when the server is restarted, as if the server is the first time to start, it has no persisted conf.
// read configuration from the storage
Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
Thus there are two cases:
The init conf is as same as the conf which is about to update. This won't affect at all, as it will still write conf when actually applied a log.
The init conf is different from the conf which is about to update. Here the equivalence check will ensure it will do the write.
@@ -1651,7 +1651,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( | |||
} | |||
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); | |||
state.setLeader(leaderId, "installSnapshot"); | |||
long snapshotIndex = state.getSnapshotIndex(); | |||
long snapshotIndex = state.getLog().getSnapshotIndex(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this change required? ServerState snapshotIndex should be the same as RaftLog snapshotIndex.
Please let me know if I am missing something here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is the possibility that after statemachine just installed the snapshot and before the main thread return the SNAPSHOT_INSTALLED, the snapshotIndex here will get from the statemachine instead of memory and return ALREADY_INSTALLED, which will cause the follower not to update the index and reload statemachine
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Thanks for explaining.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is the possibility that after statemachine just installed the snapshot and before the main thread return the SNAPSHOT_INSTALLED, the snapshotIndex here will get from the statemachine instead of memory and return ALREADY_INSTALLED, which will cause the follower not to update the index and reload statemachine
before the main thread return the SNAPSHOT_INSTALLED, inProgressInstallSnapshotRequest
will not be set back to 0 if no exception is thrown. so the following code will not be executed until inProgressInstallSnapshotRequest
is set back to 0.
if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}
if inProgressInstallSnapshotRequest
is set back to 0, it means the snapshot has been installed successfully, so now ServerState snapshotIndex should be the same as RaftLog snapshotIndex because the statemachine has been reloaded.
so seems the changes here is unnecessary , please correct me if i miss something here!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Xushaohong for working on this and fixing it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @Xushaohong for the work!
in the background thread , which is started by stateMachine.followerEvent().notifyInstallSnapshotFromLeader.whenComplete()
, inProgressInstallSnapshotRequest
will be set back to 0
if snapshot is installed successfully.
there may be a case that inProgressInstallSnapshotRequest
is just set back to 0
in the background thread, a notifyInstallSnapshot
request from leader comes and stateMachine.followerEvent().notifyInstallSnapshotFromLeader.whenComplete()
will be called again. is this a problem?
i think we should add some check to see if thefirstAvailableLogTermIndex
included in the request is larger than the lastTermIndex in the current follower`s log
Please let me know if I am missing something here.
Only when an exception happens will the @JacksonYao287 thx Jackson for the review~ The compare between lastTermIndex in the current follower`s log and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @Xushaohong for the explanation. the changes overall looks good to me. i left several comments , please take a look.
when the follower is installing snapshot , leader will send a lot of notifyInstallSnapshot requests repeatedly, which is unnecessary。so can you please create a jira to solve this problem? or handle this issue in this patch?
if (exception != null) { | ||
LOG.error("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", | ||
getMemberId(), exception.getMessage()); | ||
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems it is better to use inProgressInstallSnapshotRequest.set(0)
if an exception is thrown, no need to compare!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should not have the case when an exception happens, the inProgressInstallSnapshotRequest is not the value of firstAvailableLogIndex
. On the other hand, this is somehow protection that if the corner case happens, there
must be something wrong, and this will leave the follower keeping the state of IN_PROGRESS and alert the maintainer.
@@ -1650,7 +1649,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( | |||
} | |||
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); | |||
state.setLeader(leaderId, "installSnapshot"); | |||
long snapshotIndex = state.getSnapshotIndex(); | |||
long snapshotIndex = state.getLog().getSnapshotIndex(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe we can put this line before if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it is protected by the descriptor synchronized
, it should be fine here
@@ -1651,7 +1651,7 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( | |||
} | |||
changeToFollowerAndPersistMetadata(leaderTerm, "installSnapshot"); | |||
state.setLeader(leaderId, "installSnapshot"); | |||
long snapshotIndex = state.getSnapshotIndex(); | |||
long snapshotIndex = state.getLog().getSnapshotIndex(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is the possibility that after statemachine just installed the snapshot and before the main thread return the SNAPSHOT_INSTALLED, the snapshotIndex here will get from the statemachine instead of memory and return ALREADY_INSTALLED, which will cause the follower not to update the index and reload statemachine
before the main thread return the SNAPSHOT_INSTALLED, inProgressInstallSnapshotRequest
will not be set back to 0 if no exception is thrown. so the following code will not be executed until inProgressInstallSnapshotRequest
is set back to 0.
if (snapshotIndex + 1 >= firstAvailableLogIndex && firstAvailableLogIndex > 0) {
// State Machine has already installed the snapshot. Return the
// latest snapshot index to the Leader.
inProgressInstallSnapshotRequest.compareAndSet(firstAvailableLogIndex, 0);
LOG.info("{}: InstallSnapshot notification result: {}, current snapshot index: {}", getMemberId(),
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(), currentTerm,
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
}
if inProgressInstallSnapshotRequest
is set back to 0, it means the snapshot has been installed successfully, so now ServerState snapshotIndex should be the same as RaftLog snapshotIndex because the statemachine has been reloaded.
so seems the changes here is unnecessary , please correct me if i miss something here!
Yep, i have a patch in my local env to fix it and this PR is the prerequisite. I would create a new PR ASAP. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. thanks @Xushaohong for this work!
@szetszwo please take a look
@Xushaohong , sorry that there are some conflicts. Could you update it? |
Sure, I have updated the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Xushaohong , thanks for the update. Some minor comments inlined.
state.writeRaftConfiguration(proto); | ||
server.getStateMachine().event().notifyConfigurationChanged( | ||
proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry()); | ||
final LogEntryProto newConfLogEntryProto = request.getLastRaftConfigurationLogEntryProto(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please keep using "proto" as the name so that we can keep the lines short.
final RoleInfoProto proto = leaderProto == null || server.getRaftConf().getPeer(state.getLeaderId()) != null? | ||
server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); | ||
final RoleInfoProto roleInfoProto = leaderProto == null || server.getRaftConf().getPeer(state. | ||
getLeaderId()) != null? server.getRoleInfoProto(): getRoleInfoProto(ProtoUtils.toRaftPeer(leaderProto)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please revert this. It only has name change and format change.
long latestInstalledSnapshotIndex = this.installedSnapshotIndex.getAndSet(0); | ||
if (latestInstalledSnapshotIndex > 0) { | ||
// installedSnapshotIndex to (0,0). | ||
final TermIndex latestInstalledSnapshotTermIndex = this.installedSnapshotTermIndex.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use getAndSet or getAndUpdate. Otherwise, the operation is not atomic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here I have a question, why the operation is not atomic here?
- The process is protected by the ```synchronized`` lock.
- With this lock protection, here the processing logic is idempotent. The second request would not affect the previous one's result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 the change looks good.
What changes were proposed in this pull request?
The state upgrade
state.updateInstalledSnapshotIndex(reply); state.reloadStateMachine(reply.getIndex()); installedSnapshotIndex.set(reply.getIndex());
in
notifyStateMachineToInstallSnapshot
should be synchronized in main thread.Currently it is executed in whenComplete stage after
stateMachine.followerEvent().notifyInstallSnapshotFromLeader
, which could lead to two inappropriate logic.if (latestInstalledSnapshotIndex > 0)
will be executed before state upgrade, which cause the raft server could not respond SNAPSHOT_INSTALLED in one request.What is the link to the Apache JIRA
https://issues.apache.org/jira/browse/RATIS-1481
How was this patch tested?
manual test on ozone