Skip to content

Commit

Permalink
Move Raft operation executor to service package.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Jul 3, 2017
1 parent 08825bc commit 492a2b7
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 23 deletions.
Expand Up @@ -15,8 +15,7 @@
*/ */
package io.atomix.protocols.raft.service; package io.atomix.protocols.raft.service;


import io.atomix.protocols.raft.operation.RaftOperationExecutor; import io.atomix.protocols.raft.service.impl.DefaultRaftServiceExecutor;
import io.atomix.protocols.raft.operation.impl.DefaultRaftOperationExecutor;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.session.RaftSessions; import io.atomix.protocols.raft.session.RaftSessions;
import io.atomix.time.LogicalClock; import io.atomix.time.LogicalClock;
Expand All @@ -28,12 +27,12 @@
*/ */
public abstract class AbstractRaftService implements RaftService { public abstract class AbstractRaftService implements RaftService {
private ServiceContext context; private ServiceContext context;
private RaftOperationExecutor executor; private RaftServiceExecutor executor;


@Override @Override
public void init(ServiceContext context) { public void init(ServiceContext context) {
this.context = context; this.context = context;
this.executor = new DefaultRaftOperationExecutor(context); this.executor = new DefaultRaftServiceExecutor(context);
configure(executor); configure(executor);
} }


Expand All @@ -47,11 +46,11 @@ public byte[] apply(RaftCommit<byte[]> commit) {
* <p> * <p>
* By default, this method will configure state machine operations by extracting public methods with * By default, this method will configure state machine operations by extracting public methods with
* a single {@link RaftCommit} parameter via reflection. Override this method to explicitly register * a single {@link RaftCommit} parameter via reflection. Override this method to explicitly register
* state machine operations via the provided {@link RaftOperationExecutor}. * state machine operations via the provided {@link RaftServiceExecutor}.
* *
* @param executor The state machine executor. * @param executor The state machine executor.
*/ */
protected abstract void configure(RaftOperationExecutor executor); protected abstract void configure(RaftServiceExecutor executor);


/** /**
* Returns the service context. * Returns the service context.
Expand Down
Expand Up @@ -18,7 +18,6 @@
import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.event.RaftEvent; import io.atomix.protocols.raft.event.RaftEvent;
import io.atomix.protocols.raft.operation.RaftOperation; import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.operation.RaftOperationExecutor;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.time.LogicalTimestamp; import io.atomix.time.LogicalTimestamp;
import io.atomix.time.WallClockTimestamp; import io.atomix.time.WallClockTimestamp;
Expand Down Expand Up @@ -72,7 +71,7 @@ public interface RaftCommit<T> {
* commit times are guaranteed to progress monotonically, never going back in time. * commit times are guaranteed to progress monotonically, never going back in time.
* <p> * <p>
* Users should <em>never</em> use {@code System} time to control behavior in a state machine and should instead rely * Users should <em>never</em> use {@code System} time to control behavior in a state machine and should instead rely
* upon {@link RaftCommit} times or use the {@link RaftOperationExecutor} for time-based controls. * upon {@link RaftCommit} times or use the {@link RaftServiceExecutor} for time-based controls.
* *
* @return The commit time. * @return The commit time.
*/ */
Expand Down
Expand Up @@ -17,7 +17,6 @@


import io.atomix.protocols.raft.event.RaftEvent; import io.atomix.protocols.raft.event.RaftEvent;
import io.atomix.protocols.raft.operation.RaftOperation; import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.operation.RaftOperationExecutor;
import io.atomix.protocols.raft.RaftServer; import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.session.RaftSession; import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.session.RaftSessionListener; import io.atomix.protocols.raft.session.RaftSessionListener;
Expand All @@ -41,7 +40,7 @@
* <p> * <p>
* <h3>State machine operations</h3> * <h3>State machine operations</h3>
* State machine operations are implemented as methods on the state machine. Operations can be automatically detected * State machine operations are implemented as methods on the state machine. Operations can be automatically detected
* by the state machine during setup or can be explicitly registered by overriding the {@link #configure(RaftOperationExecutor)} * by the state machine during setup or can be explicitly registered by overriding the {@link #configure(RaftServiceExecutor)}
* method. Each operation method must take a single {@link RaftCommit} argument for a specific operation type. * method. Each operation method must take a single {@link RaftCommit} argument for a specific operation type.
* <pre> * <pre>
* {@code * {@code
Expand Down Expand Up @@ -85,11 +84,11 @@
* that will be sent back to the client. * that will be sent back to the client.
* <p> * <p>
* <h3>Deterministic scheduling</h3> * <h3>Deterministic scheduling</h3>
* The {@link RaftOperationExecutor} is responsible for executing state machine operations sequentially and provides an * The {@link RaftServiceExecutor} is responsible for executing state machine operations sequentially and provides an
* interface similar to that of {@link java.util.concurrent.ScheduledExecutorService} to allow state machines to schedule * interface similar to that of {@link java.util.concurrent.ScheduledExecutorService} to allow state machines to schedule
* time-based callbacks. Because of the determinism requirement, scheduled callbacks are guaranteed to be executed * time-based callbacks. Because of the determinism requirement, scheduled callbacks are guaranteed to be executed
* deterministically as well. The executor can be accessed via the {@link #executor} field. * deterministically as well. The executor can be accessed via the {@link #executor} field.
* See the {@link RaftOperationExecutor} documentation for more information. * See the {@link RaftServiceExecutor} documentation for more information.
* <pre> * <pre>
* {@code * {@code
* public void putWithTtl(Commit<PutWithTtl> commit) { * public void putWithTtl(Commit<PutWithTtl> commit) {
Expand Down Expand Up @@ -154,7 +153,7 @@
* *
* @see RaftCommit * @see RaftCommit
* @see ServiceContext * @see ServiceContext
* @see RaftOperationExecutor * @see RaftServiceExecutor
*/ */
public interface RaftService extends Snapshottable, RaftSessionListener { public interface RaftService extends Snapshottable, RaftSessionListener {


Expand Down
Expand Up @@ -14,9 +14,12 @@
* limitations under the License. * limitations under the License.
*/ */


package io.atomix.protocols.raft.operation; package io.atomix.protocols.raft.service;


import io.atomix.protocols.raft.ReadConsistency; import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.service.RaftCommit; import io.atomix.protocols.raft.service.RaftCommit;
import io.atomix.protocols.raft.service.RaftService; import io.atomix.protocols.raft.service.RaftService;
import io.atomix.protocols.raft.service.ServiceContext; import io.atomix.protocols.raft.service.ServiceContext;
Expand Down Expand Up @@ -65,7 +68,7 @@
* @see RaftService * @see RaftService
* @see ServiceContext * @see ServiceContext
*/ */
public interface RaftOperationExecutor extends ThreadContext { public interface RaftServiceExecutor extends ThreadContext {


/** /**
* Applies the given commit to the executor. * Applies the given commit to the executor.
Expand Down
Expand Up @@ -13,13 +13,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.atomix.protocols.raft.operation.impl; package io.atomix.protocols.raft.service.impl;


import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType; import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.service.RaftCommit; import io.atomix.protocols.raft.service.RaftCommit;
import io.atomix.protocols.raft.RaftException; import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.operation.RaftOperationExecutor; import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.service.AbstractRaftService; import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.ServiceContext; import io.atomix.protocols.raft.service.ServiceContext;
import io.atomix.utils.concurrent.Scheduled; import io.atomix.utils.concurrent.Scheduled;
Expand All @@ -43,7 +43,7 @@
/** /**
* Default operation executor. * Default operation executor.
*/ */
public class DefaultRaftOperationExecutor implements RaftOperationExecutor { public class DefaultRaftServiceExecutor implements RaftServiceExecutor {
private final Logger log; private final Logger log;
private final Queue<Runnable> tasks = new LinkedList<>(); private final Queue<Runnable> tasks = new LinkedList<>();
private final List<ScheduledTask> scheduledTasks = new ArrayList<>(); private final List<ScheduledTask> scheduledTasks = new ArrayList<>();
Expand All @@ -52,7 +52,7 @@ public class DefaultRaftOperationExecutor implements RaftOperationExecutor {
private OperationType operationType; private OperationType operationType;
private long timestamp; private long timestamp;


public DefaultRaftOperationExecutor(ServiceContext context) { public DefaultRaftServiceExecutor(ServiceContext context) {
this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(AbstractRaftService.class) this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(AbstractRaftService.class)
.addValue(context.serviceId()) .addValue(context.serviceId())
.add("type", context.serviceType()) .add("type", context.serviceType())
Expand Down
Expand Up @@ -24,7 +24,7 @@
import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType; import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.operation.RaftOperation; import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.operation.RaftOperationExecutor; import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.protocol.TestRaftProtocolFactory; import io.atomix.protocols.raft.protocol.TestRaftProtocolFactory;
import io.atomix.protocols.raft.proxy.RaftProxy; import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.service.AbstractRaftService; import io.atomix.protocols.raft.service.AbstractRaftService;
Expand Down Expand Up @@ -1326,7 +1326,7 @@ public static class TestStateMachine extends AbstractRaftService {
private RaftCommit<Void> close; private RaftCommit<Void> close;


@Override @Override
protected void configure(RaftOperationExecutor executor) { protected void configure(RaftServiceExecutor executor) {
executor.register(WRITE, this::write, clientSerializer::encode); executor.register(WRITE, this::write, clientSerializer::encode);
executor.register(READ, this::read, clientSerializer::encode); executor.register(READ, this::read, clientSerializer::encode);
executor.register(EVENT, clientSerializer::decode, this::event, clientSerializer::encode); executor.register(EVENT, clientSerializer::decode, this::event, clientSerializer::encode);
Expand Down
Expand Up @@ -25,7 +25,7 @@
import io.atomix.protocols.raft.operation.OperationId; import io.atomix.protocols.raft.operation.OperationId;
import io.atomix.protocols.raft.operation.OperationType; import io.atomix.protocols.raft.operation.OperationType;
import io.atomix.protocols.raft.operation.RaftOperation; import io.atomix.protocols.raft.operation.RaftOperation;
import io.atomix.protocols.raft.operation.RaftOperationExecutor; import io.atomix.protocols.raft.service.RaftServiceExecutor;
import io.atomix.protocols.raft.protocol.AppendRequest; import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse; import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest; import io.atomix.protocols.raft.protocol.CloseSessionRequest;
Expand Down Expand Up @@ -528,7 +528,7 @@ public class PerformanceStateMachine extends AbstractRaftService {
private Map<String, String> map = new HashMap<>(); private Map<String, String> map = new HashMap<>();


@Override @Override
protected void configure(RaftOperationExecutor executor) { protected void configure(RaftServiceExecutor executor) {
executor.register(PUT, clientSerializer::decode, this::put, clientSerializer::encode); executor.register(PUT, clientSerializer::decode, this::put, clientSerializer::encode);
executor.register(GET, clientSerializer::decode, this::get, clientSerializer::encode); executor.register(GET, clientSerializer::decode, this::get, clientSerializer::encode);
executor.register(REMOVE, clientSerializer::decode, this::remove, clientSerializer::encode); executor.register(REMOVE, clientSerializer::decode, this::remove, clientSerializer::encode);
Expand Down

0 comments on commit 492a2b7

Please sign in to comment.