Skip to content

Commit

Permalink
Refactor log readers/writers to simplify consistency checks.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jun 24, 2017
1 parent afb410e commit eebd044
Show file tree
Hide file tree
Showing 23 changed files with 361 additions and 429 deletions.
Expand Up @@ -177,15 +177,15 @@ private <T> CompletableFuture<T> applyIndex(long index) {
if (nextIndex < index) {
Indexed<RaftLogEntry> entry = reader.next();
applyEntry(entry);
setLastApplied(entry.getIndex());
setLastApplied(entry.index());
}
// If the next index is equal to the applied index, apply it and return the result.
else if (nextIndex == index) {
// Read the entry from the log. If the entry is non-null then apply it, otherwise
// simply update the last applied index and return a null result.
try {
Indexed<RaftLogEntry> entry = reader.next();
if (entry.getIndex() != index) {
if (entry.index() != index) {
throw new IllegalStateException("inconsistent index applying entry " + index + ": " + entry);
}
return applyEntry(entry);
Expand Down Expand Up @@ -247,21 +247,21 @@ private <T> CompletableFuture<T> applyEntry(Indexed<? extends RaftLogEntry> entr
@SuppressWarnings("unchecked")
private <T> CompletableFuture<T> applyEntry(Indexed<? extends RaftLogEntry> entry) {
LOGGER.trace("{} - Applying {}", state.getCluster().getMember().memberId(), entry);
if (entry.getType() == QueryEntry.class) {
if (entry.type() == QueryEntry.class) {
return (CompletableFuture<T>) applyQuery(entry.cast());
} else if (entry.getType() == CommandEntry.class) {
} else if (entry.type() == CommandEntry.class) {
return (CompletableFuture<T>) applyCommand(entry.cast());
} else if (entry.getType() == OpenSessionEntry.class) {
} else if (entry.type() == OpenSessionEntry.class) {
return (CompletableFuture<T>) applyOpenSession(entry.cast());
} else if (entry.getType() == KeepAliveEntry.class) {
} else if (entry.type() == KeepAliveEntry.class) {
return (CompletableFuture<T>) applyKeepAlive(entry.cast());
} else if (entry.getType() == CloseSessionEntry.class) {
} else if (entry.type() == CloseSessionEntry.class) {
return (CompletableFuture<T>) applyCloseSession(entry.cast());
} else if (entry.getType() == MetadataEntry.class) {
} else if (entry.type() == MetadataEntry.class) {
return (CompletableFuture<T>) applyMetadata(entry.cast());
} else if (entry.getType() == InitializeEntry.class) {
} else if (entry.type() == InitializeEntry.class) {
return (CompletableFuture<T>) applyInitialize(entry.cast());
} else if (entry.getType() == ConfigurationEntry.class) {
} else if (entry.type() == ConfigurationEntry.class) {
return (CompletableFuture<T>) applyConfiguration(entry.cast());
}
return Futures.exceptionalFuture(new InternalException("Unknown entry type"));
Expand Down Expand Up @@ -313,9 +313,9 @@ private CompletableFuture<Void> applyConfiguration(Indexed<ConfigurationEntry> e
*/
private CompletableFuture<Void> applyKeepAlive(Indexed<KeepAliveEntry> entry) {
// Store the session/command/event sequence and event index instead of acquiring a reference to the entry.
long[] sessionIds = entry.getEntry().sessionIds();
long[] commandSequences = entry.getEntry().commandSequenceNumbers();
long[] eventIndexes = entry.getEntry().eventIndexes();
long[] sessionIds = entry.entry().sessionIds();
long[] commandSequences = entry.entry().commandSequenceNumbers();
long[] eventIndexes = entry.entry().eventIndexes();

for (int i = 0; i < sessionIds.length; i++) {
long sessionId = sessionIds[i];
Expand All @@ -324,7 +324,7 @@ private CompletableFuture<Void> applyKeepAlive(Indexed<KeepAliveEntry> entry) {

RaftSessionContext session = sessionManager.getSession(sessionId);
if (session != null) {
session.getStateMachineExecutor().keepAlive(entry.getIndex(), entry.getEntry().timestamp(), session, commandSequence, eventIndex);
session.getStateMachineExecutor().keepAlive(entry.index(), entry.entry().timestamp(), session, commandSequence, eventIndex);
}
}
return CompletableFuture.completedFuture(null);
Expand All @@ -335,65 +335,65 @@ private CompletableFuture<Void> applyKeepAlive(Indexed<KeepAliveEntry> entry) {
*/
private CompletableFuture<Long> applyOpenSession(Indexed<OpenSessionEntry> entry) {
// Get the state machine executor or create one if it doesn't already exist.
RaftServerStateMachineExecutor stateMachineExecutor = stateMachines.get(entry.getEntry().name());
RaftServerStateMachineExecutor stateMachineExecutor = stateMachines.get(entry.entry().name());
if (stateMachineExecutor == null) {
Supplier<RaftStateMachine> stateMachineSupplier = state.getStateMachineRegistry().getFactory(entry.getEntry().typeName());
Supplier<RaftStateMachine> stateMachineSupplier = state.getStateMachineRegistry().getFactory(entry.entry().typeName());
if (stateMachineSupplier == null) {
return Futures.exceptionalFuture(new UnknownStateMachineException("Unknown state machine type " + entry.getEntry().typeName()));
return Futures.exceptionalFuture(new UnknownStateMachineException("Unknown state machine type " + entry.entry().typeName()));
}

StateMachineId stateMachineId = StateMachineId.from(entry.getIndex());
StateMachineId stateMachineId = StateMachineId.from(entry.index());
stateMachineExecutor = new RaftServerStateMachineExecutor(
stateMachineId,
entry.getEntry().name(),
entry.getEntry().typeName(),
entry.entry().name(),
entry.entry().typeName(),
stateMachineSupplier.get(),
state,
sessionManager,
new ThreadPoolContext(threadPool),
new ThreadPoolContext(threadPool));
stateMachines.put(entry.getEntry().name(), stateMachineExecutor);
stateMachines.put(entry.entry().name(), stateMachineExecutor);
}

SessionId sessionId = SessionId.from(entry.getIndex());
SessionId sessionId = SessionId.from(entry.index());
RaftSessionContext session = new RaftSessionContext(
sessionId,
entry.getEntry().memberId(),
entry.getEntry().name(),
entry.getEntry().typeName(),
entry.getEntry().timeout(),
entry.entry().memberId(),
entry.entry().name(),
entry.entry().typeName(),
entry.entry().timeout(),
stateMachineExecutor,
state);
return stateMachineExecutor.openSession(entry.getIndex(), entry.getEntry().timestamp(), session);
return stateMachineExecutor.openSession(entry.index(), entry.entry().timestamp(), session);
}

/**
* Applies a close session entry to the state machine.
*/
private CompletableFuture<Void> applyCloseSession(Indexed<CloseSessionEntry> entry) {
RaftSessionContext session = sessionManager.getSession(entry.getEntry().session());
RaftSessionContext session = sessionManager.getSession(entry.entry().session());

// If the server session is null, the session either never existed or already expired.
if (session == null) {
return Futures.exceptionalFuture(new UnknownSessionException("Unknown session: " + entry.getEntry().session()));
return Futures.exceptionalFuture(new UnknownSessionException("Unknown session: " + entry.entry().session()));
}

// Get the state machine executor associated with the session and unregister the session.
RaftServerStateMachineExecutor stateMachineExecutor = session.getStateMachineExecutor();
return stateMachineExecutor.closeSession(entry.getIndex(), entry.getEntry().timestamp(), session);
return stateMachineExecutor.closeSession(entry.index(), entry.entry().timestamp(), session);
}

/**
* Applies a metadata entry to the state machine.
*/
private CompletableFuture<RaftMetadataResult> applyMetadata(Indexed<MetadataEntry> entry) {
// If the session ID is non-zero, read the metadata for the associated state machine.
if (entry.getEntry().session() > 0) {
RaftSessionContext session = sessionManager.getSession(entry.getEntry().session());
if (entry.entry().session() > 0) {
RaftSessionContext session = sessionManager.getSession(entry.entry().session());

// If the session is null, return an UnknownSessionException.
if (session == null) {
return Futures.exceptionalFuture(new UnknownSessionException("Unknown session: " + entry.getEntry().session()));
return Futures.exceptionalFuture(new UnknownSessionException("Unknown session: " + entry.entry().session()));
}

Set<RaftSessionMetadata> sessions = new HashSet<>();
Expand Down Expand Up @@ -429,17 +429,17 @@ private CompletableFuture<RaftMetadataResult> applyMetadata(Indexed<MetadataEntr
*/
private CompletableFuture<RaftOperationResult> applyCommand(Indexed<CommandEntry> entry) {
// First check to ensure that the session exists.
RaftSessionContext session = sessionManager.getSession(entry.getEntry().session());
RaftSessionContext session = sessionManager.getSession(entry.entry().session());

// If the session is null, return an UnknownSessionException. Commands applied to the state machine must
// have a session. We ensure that session register/unregister entries are not compacted from the log
// until all associated commands have been cleaned.
if (session == null) {
return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getEntry().session()));
return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.entry().session()));
}

// Execute the command using the state machine associated with the session.
return session.getStateMachineExecutor().executeCommand(entry.getIndex(), entry.getEntry().sequenceNumber(), entry.getEntry().timestamp(), session, entry.getEntry().bytes());
return session.getStateMachineExecutor().executeCommand(entry.index(), entry.entry().sequenceNumber(), entry.entry().timestamp(), session, entry.entry().bytes());
}

/**
Expand All @@ -461,16 +461,16 @@ private CompletableFuture<RaftOperationResult> applyCommand(Indexed<CommandEntry
* fault-tolerance and consistency across the cluster.
*/
private CompletableFuture<RaftOperationResult> applyQuery(Indexed<QueryEntry> entry) {
RaftSessionContext session = sessionManager.getSession(entry.getEntry().session());
RaftSessionContext session = sessionManager.getSession(entry.entry().session());

// If the session is null then that indicates that the session already timed out or it never existed.
// Return with an UnknownSessionException.
if (session == null) {
return Futures.exceptionalFuture(new UnknownSessionException("unknown session " + entry.getEntry().session()));
return Futures.exceptionalFuture(new UnknownSessionException("unknown session " + entry.entry().session()));
}

// Execute the query using the state machine associated with the session.
return session.getStateMachineExecutor().executeQuery(entry.getIndex(), entry.getEntry().sequenceNumber(), entry.getEntry().timestamp(), session, entry.getEntry().bytes());
return session.getStateMachineExecutor().executeQuery(entry.index(), entry.entry().sequenceNumber(), entry.entry().timestamp(), session, entry.entry().bytes());
}

/**
Expand Down
Expand Up @@ -17,7 +17,6 @@

import io.atomix.protocols.raft.cluster.MemberId;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.storage.journal.Indexed;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -47,16 +46,16 @@ public static Builder newBuilder() {

private final long term;
private final MemberId leader;
private final long logIndex;
private final long logTerm;
private final List<Indexed<RaftLogEntry>> entries;
private final long prevLogIndex;
private final long prevLogTerm;
private final List<RaftLogEntry> entries;
private final long commitIndex;

public AppendRequest(long term, MemberId leader, long logIndex, long logTerm, List<Indexed<RaftLogEntry>> entries, long commitIndex) {
public AppendRequest(long term, MemberId leader, long prevLogIndex, long prevLogTerm, List<RaftLogEntry> entries, long commitIndex) {
this.term = term;
this.leader = leader;
this.logIndex = logIndex;
this.logTerm = logTerm;
this.prevLogIndex = prevLogIndex;
this.prevLogTerm = prevLogTerm;
this.entries = entries;
this.commitIndex = commitIndex;
}
Expand Down Expand Up @@ -84,25 +83,25 @@ public MemberId leader() {
*
* @return The index of the log entry preceding the new entry.
*/
public long previousLogIndex() {
return logIndex;
public long prevLogIndex() {
return prevLogIndex;
}

/**
* Returns the term of the log entry preceding the new entry.
*
* @return The index of the term preceding the new entry.
*/
public long previousLogTerm() {
return logTerm;
public long prevLogTerm() {
return prevLogTerm;
}

/**
* Returns the log entries to append.
*
* @return A list of log entries.
*/
public List<Indexed<RaftLogEntry>> entries() {
public List<RaftLogEntry> entries() {
return entries;
}

Expand All @@ -117,7 +116,7 @@ public long commitIndex() {

@Override
public int hashCode() {
return Objects.hash(getClass(), term, leader, logIndex, logTerm, entries, commitIndex);
return Objects.hash(getClass(), term, leader, prevLogIndex, prevLogTerm, entries, commitIndex);
}

@Override
Expand All @@ -126,8 +125,8 @@ public boolean equals(Object object) {
AppendRequest request = (AppendRequest) object;
return request.term == term
&& request.leader == leader
&& request.logIndex == logIndex
&& request.logTerm == logTerm
&& request.prevLogIndex == prevLogIndex
&& request.prevLogTerm == prevLogTerm
&& request.entries.equals(entries)
&& request.commitIndex == commitIndex;
}
Expand All @@ -139,8 +138,8 @@ public String toString() {
return toStringHelper(this)
.add("term", term)
.add("leader", leader)
.add("previousLogIndex", logIndex)
.add("previousLogTerm", logTerm)
.add("prevLogIndex", prevLogIndex)
.add("prevLogTerm", prevLogTerm)
.add("entries", entries.size())
.add("commitIndex", commitIndex)
.toString();
Expand All @@ -154,7 +153,7 @@ public static class Builder extends AbstractRaftRequest.Builder<Builder, AppendR
private MemberId leader;
private long logIndex;
private long logTerm;
private List<Indexed<RaftLogEntry>> entries;
private List<RaftLogEntry> entries;
private long commitIndex = -1;

/**
Expand Down Expand Up @@ -190,7 +189,7 @@ public Builder withLeader(MemberId leader) {
* @throws IllegalArgumentException if the {@code index} is not positive
*/
public Builder withLogIndex(long logIndex) {
checkArgument(logIndex >= 0, "previousLogIndex must be positive");
checkArgument(logIndex >= 0, "prevLogIndex must be positive");
this.logIndex = logIndex;
return this;
}
Expand All @@ -203,7 +202,7 @@ public Builder withLogIndex(long logIndex) {
* @throws IllegalArgumentException if the {@code term} is not positive
*/
public Builder withLogTerm(long logTerm) {
checkArgument(logTerm >= 0, "previousLogTerm must be positive");
checkArgument(logTerm >= 0, "prevLogTerm must be positive");
this.logTerm = logTerm;
return this;
}
Expand All @@ -215,7 +214,7 @@ public Builder withLogTerm(long logTerm) {
* @return The append request builder.
* @throws NullPointerException if {@code entries} is null
*/
public Builder withEntries(Indexed<RaftLogEntry>... entries) {
public Builder withEntries(RaftLogEntry... entries) {
return withEntries(Arrays.asList(checkNotNull(entries, "entries cannot be null")));
}

Expand All @@ -227,7 +226,7 @@ public Builder withEntries(Indexed<RaftLogEntry>... entries) {
* @throws NullPointerException if {@code entries} is null
*/
@SuppressWarnings("unchecked")
public Builder withEntries(List<Indexed<RaftLogEntry>> entries) {
public Builder withEntries(List<RaftLogEntry> entries) {
this.entries = checkNotNull(entries, "entries cannot be null");
return this;
}
Expand All @@ -239,7 +238,7 @@ public Builder withEntries(List<Indexed<RaftLogEntry>> entries) {
* @return The request builder.
* @throws NullPointerException if {@code entry} is {@code null}
*/
public Builder addEntry(Indexed<RaftLogEntry> entry) {
public Builder addEntry(RaftLogEntry entry) {
this.entries.add(checkNotNull(entry, "entry"));
return this;
}
Expand All @@ -262,8 +261,8 @@ protected void validate() {
super.validate();
checkArgument(term > 0, "term must be positive");
checkNotNull(leader, "leader cannot be null");
checkArgument(logIndex >= 0, "previousLogIndex must be positive");
checkArgument(logTerm >= 0, "previousLogTerm must be positive");
checkArgument(logIndex >= 0, "prevLogIndex must be positive");
checkArgument(logTerm >= 0, "prevLogTerm must be positive");
checkNotNull(entries, "entries cannot be null");
checkArgument(commitIndex >= 0, "commitIndex must be positive");
}
Expand Down
Expand Up @@ -98,7 +98,7 @@ public String toString() {
.add("status", status)
.add("term", term)
.add("succeeded", succeeded)
.add("previousLogIndex", logIndex)
.add("prevLogIndex", logIndex)
.toString();
} else {
return toStringHelper(this)
Expand Down Expand Up @@ -148,7 +148,7 @@ public Builder withSucceeded(boolean succeeded) {
* @throws IllegalArgumentException if {@code index} is negative
*/
public Builder withLogIndex(long logIndex) {
checkArgument(logIndex >= 0, "previousLogIndex must be positive");
checkArgument(logIndex >= 0, "prevLogIndex must be positive");
this.logIndex = logIndex;
return this;
}
Expand All @@ -158,7 +158,7 @@ protected void validate() {
super.validate();
if (status == Status.OK) {
checkArgument(term > 0, "term must be positive");
checkArgument(logIndex >= 0, "previousLogIndex must be positive");
checkArgument(logIndex >= 0, "prevLogIndex must be positive");
}
}

Expand Down

0 comments on commit eebd044

Please sign in to comment.