Skip to content

Commit

Permalink
Initialize session with monotonically increasing service timestamp #981
Browse files Browse the repository at this point in the history
  • Loading branch information
shan committed Jan 31, 2019
1 parent c3abbfe commit d3974ee
Showing 1 changed file with 21 additions and 2 deletions.
Expand Up @@ -71,6 +71,7 @@ public class RaftServiceContext implements ServiceContext {
private long currentIndex;
private Session currentSession;
private long currentTimestamp;
private long timestampDelta;
private OperationType currentOperation;
private boolean deleted;
private final LogicalClock logicalClock = new LogicalClock() {
Expand Down Expand Up @@ -188,7 +189,23 @@ private void setOperation(OperationType operation) {
*/
private void tick(long index, long timestamp) {
this.currentIndex = index;
this.currentTimestamp = Math.max(currentTimestamp, timestamp);

// If the entry timestamp is less than the current state machine timestamp
// and the delta is not yet set, set the delta and do not change the current timestamp.
// If the entry timestamp is less than the current state machine timestamp
// and the delta is set, update the current timestamp to the entry timestamp plus the delta.
// If the entry timestamp is greater than or equal to the current timestamp, update the current
// timestamp and reset the delta.
if (timestamp < currentTimestamp) {
if (timestampDelta == 0) {
timestampDelta = currentTimestamp - timestamp;
} else {
currentTimestamp = timestamp + timestampDelta;
}
} else {
currentTimestamp = timestamp;
timestampDelta = 0;
}

// Set the current operation type to COMMAND to allow events to be sent.
setOperation(OperationType.COMMAND);
Expand Down Expand Up @@ -230,6 +247,7 @@ public void installSnapshot(SnapshotReader reader) {
String serviceName = reader.readString();
currentIndex = reader.readLong();
currentTimestamp = reader.readLong();
timestampDelta = reader.readLong();

int sessionCount = reader.readInt();
for (int i = 0; i < sessionCount; i++) {
Expand Down Expand Up @@ -280,6 +298,7 @@ public void takeSnapshot(SnapshotWriter writer) {
writer.writeString(serviceName);
writer.writeLong(currentIndex);
writer.writeLong(currentTimestamp);
writer.writeLong(timestampDelta);

writer.writeInt(sessions.getSessions().size());
for (RaftSession session : sessions.getSessions()) {
Expand Down Expand Up @@ -310,7 +329,7 @@ public long openSession(long index, long timestamp, RaftSession session) {
// Update the state machine index/timestamp.
tick(index, timestamp);

// Update the state machine index/ timestamp .
// Set the session timestamp to the current service timestamp.
session.setLastUpdated(currentTimestamp);

// Expire sessions that have timed out.
Expand Down

0 comments on commit d3974ee

Please sign in to comment.