Skip to content

Commit

Permalink
Rename RaftCommit to Commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 12, 2017
1 parent d4b2374 commit 8dde86b
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 56 deletions.
Expand Up @@ -46,15 +46,15 @@ public void init(ServiceContext context) {
} }


@Override @Override
public byte[] apply(RaftCommit<byte[]> commit) { public byte[] apply(Commit<byte[]> commit) {
return executor.apply(commit); return executor.apply(commit);
} }


/** /**
* Configures the state machine. * Configures the state machine.
* <p> * <p>
* By default, this method will configure state machine operations by extracting public methods with * By default, this method will configure state machine operations by extracting public methods with
* a single {@link RaftCommit} parameter via reflection. Override this method to explicitly register * a single {@link Commit} parameter via reflection. Override this method to explicitly register
* state machine operations via the provided {@link RaftServiceExecutor}. * state machine operations via the provided {@link RaftServiceExecutor}.
* *
* @param executor The state machine executor. * @param executor The state machine executor.
Expand Down
Expand Up @@ -27,7 +27,7 @@
/** /**
* Represents the committed state and metadata of a Raft state machine operation. * Represents the committed state and metadata of a Raft state machine operation.
*/ */
public interface RaftCommit<T> { public interface Commit<T> {


/** /**
* Returns the commit index. * Returns the commit index.
Expand All @@ -48,7 +48,7 @@ public interface RaftCommit<T> {
* Returns the session that submitted the operation. * Returns the session that submitted the operation.
* <p> * <p>
* The returned {@link RaftSession} is representative of the session that submitted the operation * The returned {@link RaftSession} is representative of the session that submitted the operation
* that resulted in this {@link RaftCommit}. The session can be used to {@link RaftSession#publish(RaftEvent)} * that resulted in this {@link Commit}. The session can be used to {@link RaftSession#publish(RaftEvent)}
* event messages to the client. * event messages to the client.
* *
* @return The session that created the commit. * @return The session that created the commit.
Expand All @@ -71,7 +71,7 @@ public interface RaftCommit<T> {
* commit times are guaranteed to progress monotonically, never going back in time. * commit times are guaranteed to progress monotonically, never going back in time.
* <p> * <p>
* Users should <em>never</em> use {@code System} time to control behavior in a state machine and should instead rely * Users should <em>never</em> use {@code System} time to control behavior in a state machine and should instead rely
* upon {@link RaftCommit} times or use the {@link RaftServiceExecutor} for time-based controls. * upon {@link Commit} times or use the {@link RaftServiceExecutor} for time-based controls.
* *
* @return The commit time. * @return The commit time.
*/ */
Expand All @@ -98,13 +98,13 @@ public interface RaftCommit<T> {
* @param <U> the output commit value type * @param <U> the output commit value type
* @return the mapped commit * @return the mapped commit
*/ */
<U> RaftCommit<U> map(Function<T, U> transcoder); <U> Commit<U> map(Function<T, U> transcoder);


/** /**
* Converts the commit to a null valued commit. * Converts the commit to a null valued commit.
* *
* @return the mapped commit * @return the mapped commit
*/ */
RaftCommit<Void> mapToNull(); Commit<Void> mapToNull();


} }
Expand Up @@ -29,7 +29,7 @@
* Users should extend this class to create a state machine for use within a {@link RaftServer}. * Users should extend this class to create a state machine for use within a {@link RaftServer}.
* <p> * <p>
* State machines are responsible for handling {@link RaftOperation operations} submitted to the Raft cluster and * State machines are responsible for handling {@link RaftOperation operations} submitted to the Raft cluster and
* filtering {@link RaftCommit committed} operations out of the Raft log. The most important rule of state machines is * filtering {@link Commit committed} operations out of the Raft log. The most important rule of state machines is
* that <em>state machines must be deterministic</em> in order to maintain Raft's consistency guarantees. That is, * that <em>state machines must be deterministic</em> in order to maintain Raft's consistency guarantees. That is,
* state machines must not change their behavior based on external influences and have no side effects. Users should * state machines must not change their behavior based on external influences and have no side effects. Users should
* <em>never</em> use {@code System} time to control behavior within a state machine. * <em>never</em> use {@code System} time to control behavior within a state machine.
Expand All @@ -41,7 +41,7 @@
* <h3>State machine operations</h3> * <h3>State machine operations</h3>
* State machine operations are implemented as methods on the state machine. Operations can be automatically detected * State machine operations are implemented as methods on the state machine. Operations can be automatically detected
* by the state machine during setup or can be explicitly registered by overriding the {@link #configure(RaftServiceExecutor)} * by the state machine during setup or can be explicitly registered by overriding the {@link #configure(RaftServiceExecutor)}
* method. Each operation method must take a single {@link RaftCommit} argument for a specific operation type. * method. Each operation method must take a single {@link Commit} argument for a specific operation type.
* <pre> * <pre>
* {@code * {@code
* public class MapStateMachine extends StateMachine { * public class MapStateMachine extends StateMachine {
Expand Down Expand Up @@ -69,12 +69,12 @@
* } * }
* } * }
* </pre> * </pre>
* When operations are applied to the state machine they're wrapped in a {@link RaftCommit} object. The commit provides the * When operations are applied to the state machine they're wrapped in a {@link Commit} object. The commit provides the
* context of how the command or query was committed to the cluster, including the log {@link RaftCommit#index()}, the * context of how the command or query was committed to the cluster, including the log {@link Commit#index()}, the
* {@link RaftSession} from which the operation was submitted, and the approximate * {@link RaftSession} from which the operation was submitted, and the approximate
* wall-clock {@link RaftCommit#wallClockTime()} at which the commit was written to the Raft log. Note that the commit time is * wall-clock {@link Commit#wallClockTime()} at which the commit was written to the Raft log. Note that the commit time is
* guaranteed to progress monotonically, but it may not be representative of the progress of actual time. See the * guaranteed to progress monotonically, but it may not be representative of the progress of actual time. See the
* {@link RaftCommit} documentation for more information. * {@link Commit} documentation for more information.
* <p> * <p>
* State machine operations are guaranteed to be executed in the order in which they were submitted by the client, * State machine operations are guaranteed to be executed in the order in which they were submitted by the client,
* always in the same thread, and thus always sequentially. State machines do not need to be thread safe, but they must * always in the same thread, and thus always sequentially. State machines do not need to be thread safe, but they must
Expand Down Expand Up @@ -151,7 +151,7 @@
* method. Once the state machine has written a snapshot of its state, Raft will automatically remove all commands * method. Once the state machine has written a snapshot of its state, Raft will automatically remove all commands
* associated with the state machine from the underlying log. * associated with the state machine from the underlying log.
* *
* @see RaftCommit * @see Commit
* @see ServiceContext * @see ServiceContext
* @see RaftServiceExecutor * @see RaftServiceExecutor
*/ */
Expand All @@ -171,7 +171,7 @@ public interface RaftService extends Snapshottable, RaftSessionListener {
* @param commit the commit to apply * @param commit the commit to apply
* @return the commit result * @return the commit result
*/ */
byte[] apply(RaftCommit<byte[]> commit); byte[] apply(Commit<byte[]> commit);


/** /**
* Closes the state machine. * Closes the state machine.
Expand Down
Expand Up @@ -20,9 +20,6 @@
import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType; import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.operation.RaftOperation; import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.service.RaftCommit;
import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.service.ServiceContext;
import io.atomix.storage.buffer.HeapBytes; import io.atomix.storage.buffer.HeapBytes;
import io.atomix.utils.concurrent.ThreadContext; import io.atomix.utils.concurrent.ThreadContext;


Expand Down Expand Up @@ -76,7 +73,7 @@ public interface RaftServiceExecutor extends ThreadContext {
* @param commit the commit to apply * @param commit the commit to apply
* @return the commit result * @return the commit result
*/ */
byte[] apply(RaftCommit<byte[]> commit); byte[] apply(Commit<byte[]> commit);


/** /**
* Registers a operation callback. * Registers a operation callback.
Expand All @@ -85,7 +82,7 @@ public interface RaftServiceExecutor extends ThreadContext {
* @param callback the operation callback * @param callback the operation callback
* @throws NullPointerException if the {@code operationId} or {@code callback} is null * @throws NullPointerException if the {@code operationId} or {@code callback} is null
*/ */
void handle(OperationId operationId, Function<RaftCommit<byte[]>, byte[]> callback); void handle(OperationId operationId, Function<Commit<byte[]>, byte[]> callback);


/** /**
* Registers a operation callback. * Registers a operation callback.
Expand All @@ -94,7 +91,7 @@ public interface RaftServiceExecutor extends ThreadContext {
* @param callback the operation callback * @param callback the operation callback
* @throws NullPointerException if the {@code operationId} or {@code callback} is null * @throws NullPointerException if the {@code operationId} or {@code callback} is null
*/ */
default void register(OperationId operationId, Consumer<RaftCommit<Void>> callback) { default void register(OperationId operationId, Consumer<Commit<Void>> callback) {
checkNotNull(operationId, "operationId cannot be null"); checkNotNull(operationId, "operationId cannot be null");
checkNotNull(callback, "callback cannot be null"); checkNotNull(callback, "callback cannot be null");
handle(operationId, commit -> { handle(operationId, commit -> {
Expand All @@ -110,7 +107,7 @@ default void register(OperationId operationId, Consumer<RaftCommit<Void>> callba
* @param callback the operation callback * @param callback the operation callback
* @throws NullPointerException if the {@code operationId} or {@code callback} is null * @throws NullPointerException if the {@code operationId} or {@code callback} is null
*/ */
default <R> void register(OperationId operationId, Function<RaftCommit<Void>, R> callback, Function<R, byte[]> encoder) { default <R> void register(OperationId operationId, Function<Commit<Void>, R> callback, Function<R, byte[]> encoder) {
checkNotNull(operationId, "operationId cannot be null"); checkNotNull(operationId, "operationId cannot be null");
checkNotNull(callback, "callback cannot be null"); checkNotNull(callback, "callback cannot be null");
checkNotNull(encoder, "encoder cannot be null"); checkNotNull(encoder, "encoder cannot be null");
Expand All @@ -125,7 +122,7 @@ default <R> void register(OperationId operationId, Function<RaftCommit<Void>, R>
* @param callback the operation callback * @param callback the operation callback
* @throws NullPointerException if the {@code operationId} or {@code callback} is null * @throws NullPointerException if the {@code operationId} or {@code callback} is null
*/ */
default <T> void register(OperationId operationId, Function<byte[], T> decoder, Consumer<RaftCommit<T>> callback) { default <T> void register(OperationId operationId, Function<byte[], T> decoder, Consumer<Commit<T>> callback) {
checkNotNull(operationId, "operationId cannot be null"); checkNotNull(operationId, "operationId cannot be null");
checkNotNull(decoder, "decoder cannot be null"); checkNotNull(decoder, "decoder cannot be null");
checkNotNull(callback, "callback cannot be null"); checkNotNull(callback, "callback cannot be null");
Expand All @@ -144,7 +141,7 @@ default <T> void register(OperationId operationId, Function<byte[], T> decoder,
* @param encoder the output encoder * @param encoder the output encoder
* @throws NullPointerException if the {@code operationId} or {@code callback} is null * @throws NullPointerException if the {@code operationId} or {@code callback} is null
*/ */
default <T, R> void register(OperationId operationId, Function<byte[], T> decoder, Function<RaftCommit<T>, R> callback, Function<R, byte[]> encoder) { default <T, R> void register(OperationId operationId, Function<byte[], T> decoder, Function<Commit<T>, R> callback, Function<R, byte[]> encoder) {
checkNotNull(operationId, "operationId cannot be null"); checkNotNull(operationId, "operationId cannot be null");
checkNotNull(decoder, "decoder cannot be null"); checkNotNull(decoder, "decoder cannot be null");
checkNotNull(callback, "callback cannot be null"); checkNotNull(callback, "callback cannot be null");
Expand Down
Expand Up @@ -16,7 +16,7 @@
package io.atomix.protocols.raft.service.impl; package io.atomix.protocols.raft.service.impl;


import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.service.RaftCommit; import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.time.LogicalTimestamp; import io.atomix.time.LogicalTimestamp;
import io.atomix.time.WallClockTimestamp; import io.atomix.time.WallClockTimestamp;
Expand All @@ -30,14 +30,14 @@
/** /**
* Server commit. * Server commit.
*/ */
public class DefaultRaftCommit<T> implements RaftCommit<T> { public class DefaultCommit<T> implements Commit<T> {
private final long index; private final long index;
private final RaftSession session; private final RaftSession session;
private final long timestamp; private final long timestamp;
private final OperationId operation; private final OperationId operation;
private final T value; private final T value;


public DefaultRaftCommit(long index, OperationId operation, T value, RaftSession session, long timestamp) { public DefaultCommit(long index, OperationId operation, T value, RaftSession session, long timestamp) {
this.index = index; this.index = index;
this.session = session; this.session = session;
this.timestamp = timestamp; this.timestamp = timestamp;
Expand Down Expand Up @@ -77,23 +77,23 @@ public T value() {


@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(RaftCommit.class, index, session.sessionId(), operation); return Objects.hash(Commit.class, index, session.sessionId(), operation);
} }


@Override @Override
public <U> RaftCommit<U> map(Function<T, U> transcoder) { public <U> Commit<U> map(Function<T, U> transcoder) {
return new DefaultRaftCommit<>(index, operation, transcoder.apply(value), session, timestamp); return new DefaultCommit<>(index, operation, transcoder.apply(value), session, timestamp);
} }


@Override @Override
public RaftCommit<Void> mapToNull() { public Commit<Void> mapToNull() {
return new DefaultRaftCommit<>(index, operation, null, session, timestamp); return new DefaultCommit<>(index, operation, null, session, timestamp);
} }


@Override @Override
public boolean equals(Object object) { public boolean equals(Object object) {
if (object instanceof RaftCommit) { if (object instanceof Commit) {
RaftCommit commit = (RaftCommit) object; Commit commit = (Commit) object;
return commit.index() == index return commit.index() == index
&& commit.session().equals(session) && commit.session().equals(session)
&& commit.operation().equals(operation) && commit.operation().equals(operation)
Expand Down
Expand Up @@ -18,7 +18,7 @@
import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType; import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.service.RaftCommit; import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftService; import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.service.RaftServiceExecutor; import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.service.ServiceContext; import io.atomix.protocols.raft.service.ServiceContext;
Expand Down Expand Up @@ -48,7 +48,7 @@ public class DefaultRaftServiceExecutor implements RaftServiceExecutor {
private final Queue<Runnable> tasks = new LinkedList<>(); private final Queue<Runnable> tasks = new LinkedList<>();
private final List<ScheduledTask> scheduledTasks = new ArrayList<>(); private final List<ScheduledTask> scheduledTasks = new ArrayList<>();
private final List<ScheduledTask> complete = new ArrayList<>(); private final List<ScheduledTask> complete = new ArrayList<>();
private final Map<OperationId, Function<RaftCommit<byte[]>, byte[]>> operations = new HashMap<>(); private final Map<OperationId, Function<Commit<byte[]>, byte[]>> operations = new HashMap<>();
private OperationType operationType; private OperationType operationType;
private long timestamp; private long timestamp;


Expand All @@ -65,7 +65,7 @@ public DefaultRaftServiceExecutor(ServiceContext context) {
* *
* @param commit the current commit * @param commit the current commit
*/ */
private void prepareOperation(RaftCommit<byte[]> commit) { private void prepareOperation(Commit<byte[]> commit) {
long timestamp = commit.wallClockTime().unixTimestamp(); long timestamp = commit.wallClockTime().unixTimestamp();


// Trigger scheduled tasks if this is a command and tasks are waiting to be executed. // Trigger scheduled tasks if this is a command and tasks are waiting to be executed.
Expand Down Expand Up @@ -111,21 +111,21 @@ private void checkOperation(OperationType type, String message) {
} }


@Override @Override
public void handle(OperationId operationId, Function<RaftCommit<byte[]>, byte[]> callback) { public void handle(OperationId operationId, Function<Commit<byte[]>, byte[]> callback) {
checkNotNull(operationId, "operationId cannot be null"); checkNotNull(operationId, "operationId cannot be null");
checkNotNull(callback, "callback cannot be null"); checkNotNull(callback, "callback cannot be null");
operations.put(operationId, callback); operations.put(operationId, callback);
log.debug("Registered operation callback {}", operationId); log.debug("Registered operation callback {}", operationId);
} }


@Override @Override
public byte[] apply(RaftCommit<byte[]> commit) { public byte[] apply(Commit<byte[]> commit) {
log.trace("Executing {}", commit); log.trace("Executing {}", commit);


prepareOperation(commit); prepareOperation(commit);


// Look up the registered callback for the operation. // Look up the registered callback for the operation.
Function<RaftCommit<byte[]>, byte[]> callback = operations.get(commit.operation()); Function<Commit<byte[]>, byte[]> callback = operations.get(commit.operation());


if (callback == null) { if (callback == null) {
throw new IllegalStateException("Unknown state machine operation: " + commit.operation()); throw new IllegalStateException("Unknown state machine operation: " + commit.operation());
Expand Down
Expand Up @@ -24,7 +24,7 @@
import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType; import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.operation.RaftOperation; import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.service.RaftCommit; import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftService; import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.service.ServiceContext; import io.atomix.protocols.raft.service.ServiceContext;
import io.atomix.protocols.raft.service.ServiceId; import io.atomix.protocols.raft.service.ServiceId;
Expand Down Expand Up @@ -561,7 +561,7 @@ private void applyCommand(long index, long sequence, long timestamp, RaftOperati
return; return;
} }


RaftCommit<byte[]> commit = new DefaultRaftCommit<>(index, operation.id(), operation.value(), session, timestamp); Commit<byte[]> commit = new DefaultCommit<>(index, operation.id(), operation.value(), session, timestamp);


long eventIndex = session.getEventIndex(); long eventIndex = session.getEventIndex();


Expand Down Expand Up @@ -657,7 +657,7 @@ private void applyQuery(long timestamp, RaftSessionContext session, RaftOperatio
// Set the current operation type to QUERY to prevent events from being sent to clients. // Set the current operation type to QUERY to prevent events from being sent to clients.
setOperation(OperationType.QUERY); setOperation(OperationType.QUERY);


RaftCommit<byte[]> commit = new DefaultRaftCommit<>(session.getLastApplied(), operation.id(), operation.value(), session, timestamp); Commit<byte[]> commit = new DefaultCommit<>(session.getLastApplied(), operation.id(), operation.value(), session, timestamp);


long eventIndex = session.getEventIndex(); long eventIndex = session.getEventIndex();


Expand Down
Expand Up @@ -27,7 +27,7 @@
import io.atomix.protocols.raft.protocol.TestRaftProtocolFactory; import io.atomix.protocols.raft.protocol.TestRaftProtocolFactory;
import io.atomix.protocols.raft.proxy.RaftProxy; import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.service.AbstractRaftService; import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.RaftCommit; import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor; import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.service.ServiceType; import io.atomix.protocols.raft.service.ServiceType;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
Expand Down Expand Up @@ -1402,8 +1402,8 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
* Test state machine. * Test state machine.
*/ */
public static class TestStateMachine extends AbstractRaftService { public static class TestStateMachine extends AbstractRaftService {
private RaftCommit<Void> expire; private Commit<Void> expire;
private RaftCommit<Void> close; private Commit<Void> close;


@Override @Override
protected void configure(RaftServiceExecutor executor) { protected void configure(RaftServiceExecutor executor) {
Expand Down Expand Up @@ -1438,15 +1438,15 @@ public void install(SnapshotReader reader) {
assertEquals(10, reader.readLong()); assertEquals(10, reader.readLong());
} }


protected long write(RaftCommit<Void> commit) { protected long write(Commit<Void> commit) {
return commit.index(); return commit.index();
} }


protected long read(RaftCommit<Void> commit) { protected long read(Commit<Void> commit) {
return commit.index(); return commit.index();
} }


protected long event(RaftCommit<Boolean> commit) { protected long event(Commit<Boolean> commit) {
if (commit.value()) { if (commit.value()) {
commit.session().publish(CHANGE_EVENT, clientSerializer::encode, commit.index()); commit.session().publish(CHANGE_EVENT, clientSerializer::encode, commit.index());
} else { } else {
Expand All @@ -1457,11 +1457,11 @@ protected long event(RaftCommit<Boolean> commit) {
return commit.index(); return commit.index();
} }


public void close(RaftCommit<Void> commit) { public void close(Commit<Void> commit) {
this.close = commit; this.close = commit;
} }


public void expire(RaftCommit<Void> commit) { public void expire(Commit<Void> commit) {
this.expire = commit; this.expire = commit;
} }
} }
Expand Down

0 comments on commit 8dde86b

Please sign in to comment.