Skip to content

Commit

Permalink
Replace state machine interface with single implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Feb 16, 2015
1 parent 3bb00f0 commit 02a3674
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 271 deletions.
Expand Up @@ -15,26 +15,34 @@
*/ */
package net.kuujo.copycat.state; package net.kuujo.copycat.state;


import net.kuujo.copycat.cluster.Cluster;
import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.resource.Resource; import net.kuujo.copycat.resource.ResourceContext;
import net.kuujo.copycat.state.internal.DefaultStateMachine; import net.kuujo.copycat.resource.internal.AbstractResource;
import net.kuujo.copycat.util.internal.Assert;


import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;


/** /**
* State machine. * State machine.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public interface StateMachine<T> extends Resource<StateMachine<T>> { public class StateMachine<T> extends AbstractResource<StateMachine<T>> {


/** /**
* Creates a new state machine, loading the log configuration from the classpath. * Creates a new state machine, loading the log configuration from the classpath.
* *
* @param <T> The state machine entry type. * @param <T> The state machine entry type.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create() { public static <T> StateMachine<T> create() {
return create(new StateMachineConfig(), new ClusterConfig()); return create(new StateMachineConfig(), new ClusterConfig());
} }


Expand All @@ -44,7 +52,7 @@ static <T> StateMachine<T> create() {
* @param <T> The state machine entry type. * @param <T> The state machine entry type.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(Executor executor) { public static <T> StateMachine<T> create(Executor executor) {
return create(new StateMachineConfig(), new ClusterConfig(), executor); return create(new StateMachineConfig(), new ClusterConfig(), executor);
} }


Expand All @@ -55,7 +63,7 @@ static <T> StateMachine<T> create(Executor executor) {
* @param <T> The state machine entry type. * @param <T> The state machine entry type.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(String name) { public static <T> StateMachine<T> create(String name) {
return create(new StateMachineConfig(name), new ClusterConfig(String.format("cluster.%s", name))); return create(new StateMachineConfig(name), new ClusterConfig(String.format("cluster.%s", name)));
} }


Expand All @@ -67,7 +75,7 @@ static <T> StateMachine<T> create(String name) {
* @param <T> The state machine entry type. * @param <T> The state machine entry type.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(String name, Executor executor) { public static <T> StateMachine<T> create(String name, Executor executor) {
return create(new StateMachineConfig(name), new ClusterConfig(String.format("cluster.%s", name)), executor); return create(new StateMachineConfig(name), new ClusterConfig(String.format("cluster.%s", name)), executor);
} }


Expand All @@ -78,7 +86,7 @@ static <T> StateMachine<T> create(String name, Executor executor) {
* @param cluster The cluster configuration. * @param cluster The cluster configuration.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(String name, ClusterConfig cluster) { public static <T> StateMachine<T> create(String name, ClusterConfig cluster) {
return create(new StateMachineConfig(name), cluster); return create(new StateMachineConfig(name), cluster);
} }


Expand All @@ -90,7 +98,7 @@ static <T> StateMachine<T> create(String name, ClusterConfig cluster) {
* @param executor An executor on which to execute state machine callbacks. * @param executor An executor on which to execute state machine callbacks.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(String name, ClusterConfig cluster, Executor executor) { public static <T> StateMachine<T> create(String name, ClusterConfig cluster, Executor executor) {
return create(new StateMachineConfig(name), cluster, executor); return create(new StateMachineConfig(name), cluster, executor);
} }


Expand All @@ -101,8 +109,8 @@ static <T> StateMachine<T> create(String name, ClusterConfig cluster, Executor e
* @param cluster The cluster configuration. * @param cluster The cluster configuration.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(StateMachineConfig config, ClusterConfig cluster) { public static <T> StateMachine<T> create(StateMachineConfig config, ClusterConfig cluster) {
return new DefaultStateMachine<>(config, cluster); return new StateMachine<>(config, cluster);
} }


/** /**
Expand All @@ -113,8 +121,79 @@ static <T> StateMachine<T> create(StateMachineConfig config, ClusterConfig clust
* @param executor An executor on which to execute state machine callbacks. * @param executor An executor on which to execute state machine callbacks.
* @return A new state machine instance. * @return A new state machine instance.
*/ */
static <T> StateMachine<T> create(StateMachineConfig config, ClusterConfig cluster, Executor executor) { public static <T> StateMachine<T> create(StateMachineConfig config, ClusterConfig cluster, Executor executor) {
return new DefaultStateMachine<>(config, cluster, executor); return new StateMachine<>(config, cluster, executor);
}

private final Class<T> stateType;
private T state;
private final StateLog<List<Object>> log;
private final InvocationHandler handler = new StateProxyInvocationHandler();
private Map<String, Object> data = new HashMap<>(1024);
private final Map<Class<?>, Method> initializers = new HashMap<>();
private final Map<Method, String> methodCache = new ConcurrentHashMap<>();
private final StateContext<T> context = new StateContext<T>() {
@Override
public Cluster cluster() {
return StateMachine.this.cluster();
}

@Override
public T state() {
return state;
}

@Override
public StateContext<T> put(String key, Object value) {
data.put(key, value);
return this;
}

@Override
@SuppressWarnings("unchecked")
public <U> U get(String key) {
return (U) data.get(key);
}

@Override
@SuppressWarnings("unchecked")
public <U> U remove(String key) {
return (U) data.remove(key);
}

@Override
public StateContext<T> clear() {
data.clear();
return this;
}

@Override
public StateContext<T> transition(T state) {
StateMachine.this.state = state;
initialize();
return this;
}
};

public StateMachine(StateMachineConfig config, ClusterConfig cluster) {
this(new ResourceContext(config, cluster));
}

public StateMachine(StateMachineConfig config, ClusterConfig cluster, Executor executor) {
this(new ResourceContext(config, cluster, executor));
}

@SuppressWarnings("unchecked")
public StateMachine(ResourceContext context) {
super(context);
this.stateType = Assert.isNotNull(context.<StateMachineConfig>config().getStateType(), "stateType");
try {
this.state = (T) Assert.isNotNull(context.<StateMachineConfig>config().getInitialState(), "initialState").newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
this.log = new StateLog<>(context);
registerCommands();
} }


/** /**
Expand All @@ -124,6 +203,151 @@ static <T> StateMachine<T> create(StateMachineConfig config, ClusterConfig clust
* @param <U> The proxy type. * @param <U> The proxy type.
* @return The proxy object. * @return The proxy object.
*/ */
<U> U createProxy(Class<U> type); @SuppressWarnings("unchecked")
public <U> U createProxy(Class<U> type) {
return (U) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{type}, handler);
}

/**
* Takes a snapshot of the status machine status.
*/
private Map<String, Object> snapshot() {
Map<String, Object> snapshot = new HashMap<>(2);
snapshot.put("state", state.getClass().getName());
snapshot.put("data", data);
return snapshot;
}

/**
* Installs a snapshot of the status machine status.
*/
@SuppressWarnings("unchecked")
private void install(Map<String, Object> snapshot) {
Object stateClassName = snapshot.get("state");
if (stateClassName == null) {
throw new IllegalStateException("Invalid snapshot");
}
try {
Class<?> stateClass = Class.forName(stateClassName.toString());
this.state = (T) stateClass.newInstance();
initialize();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalStateException("Invalid snapshot state");
}
this.data = (Map<String, Object>) snapshot.get("data");
}

@Override
public synchronized CompletableFuture<StateMachine<T>> open() {
log.snapshotWith(this::snapshot);
log.installWith(this::install);
return log.open().thenApply(v -> this);
}

@Override
public boolean isOpen() {
return log.isOpen();
}

@Override
public synchronized CompletableFuture<Void> close() {
return log.close().whenComplete((result, error) -> {
log.snapshotWith(null);
log.installWith(null);
});
}

@Override
public boolean isClosed() {
return log.isClosed();
}

/**
* Registers commands on the status log.
*/
private void registerCommands() {
for (Method method : stateType.getMethods()) {
Query query = method.getAnnotation(Query.class);
if (query != null) {
log.registerQuery(getOperationName(method), wrapOperation(method), query.consistency());
} else {
Command command = method.getAnnotation(Command.class);
if (command != null || Modifier.isPublic(method.getModifiers())) {
log.registerCommand(getOperationName(method), wrapOperation(method));
}
}
}
initialize();
}

/**
* Initializes the current status by locating the @Initializer method on the status class and caching the method.
*/
private void initialize() {
Method initializer = initializers.get(state.getClass());
if (initializer == null) {
for (Method method : state.getClass().getMethods()) {
if (method.isAnnotationPresent(Initializer.class)) {
initializer = method;
break;
}
}
if (initializer != null) {
initializers.put(state.getClass(), initializer);
}
}
if (initializer != null) {
try {
initializer.invoke(state, context);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
}
}

/**
* Gets the cached method operation name or generates and caches an operation name if it's not already cached.
*/
private String getOperationName(Method method) {
return methodCache.computeIfAbsent(method, m -> {
return new StringBuilder()
.append(m.getName())
.append('(')
.append(String.join(",", Arrays.asList(m.getParameterTypes()).stream().map(Class::getCanonicalName).collect(Collectors
.toList())))
.append(')')
.toString();
});
}

/**
* Wraps a status log operation for the given method.
*
* @param method The method for which to create the status log command.
* @return The generated status log command.
*/
private Function<List<Object>, Object> wrapOperation(Method method) {
return values -> {
try {
return method.invoke(state, values.toArray(new Object[values.size()]));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
};
}

/**
* State proxy invocation handler.
*/
private class StateProxyInvocationHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> returnType = method.getReturnType();
if (returnType == CompletableFuture.class) {
return log.submit(getOperationName(method), new ArrayList<>(Arrays.asList(args != null ? args : new Object[0])));
}
return log.submit(getOperationName(method), new ArrayList<>(Arrays.asList(args != null ? args : new Object[0]))).get();
}
}


} }

0 comments on commit 02a3674

Please sign in to comment.