diff --git a/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateFactory.java b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateFactory.java new file mode 100644 index 0000000000..d446dd9437 --- /dev/null +++ b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat.state; + +/** + * State machine state factory. + * + * @author Jordan Halterman + */ +public interface StateFactory { + + /** + * Creates a new state. + * + * @return A new state instance. + */ + T createState(); + +} diff --git a/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java index 16645ad5d5..1ce88d6875 100644 --- a/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java +++ b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachine.java @@ -1,5 +1,5 @@ /* - * Copyright 2014 the original author or authors. + * Copyright 2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,70 +16,41 @@ package net.kuujo.copycat.state; import net.kuujo.copycat.cluster.ClusterConfig; -import net.kuujo.copycat.resource.ResourceContext; -import net.kuujo.copycat.resource.internal.AbstractResource; +import net.kuujo.copycat.resource.PartitionContext; +import net.kuujo.copycat.resource.internal.AbstractPartitionedResource; -import java.lang.reflect.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.stream.Collectors; /** * State machine. * * @author Jordan Halterman */ -public class StateMachine extends AbstractResource> { - - /** - * Creates a new state machine with the given cluster and state machine configurations. - * - * @param config The state machine configuration. - * @param cluster The cluster configuration. - * @return A new state machine instance. - */ - public static StateMachine create(T state, StateMachineConfig config, ClusterConfig cluster) { - return new StateMachine<>(state, config, cluster); - } - - /** - * Creates a new state machine with the given cluster and state machine configurations. - * - * @param config The state machine configuration. - * @param cluster The cluster configuration. - * @param executor An executor on which to execute state machine callbacks. - * @return A new state machine instance. - */ - public static StateMachine create(T state, StateMachineConfig config, ClusterConfig cluster, Executor executor) { - return new StateMachine<>(state, config, cluster, executor); - } - - private T state; - private final StateLog> log; +public class StateMachine extends AbstractPartitionedResource, StateMachinePartition> { + private final StateFactory stateFactory; private final InvocationHandler handler = new StateProxyInvocationHandler(); - private final Map methodCache = new ConcurrentHashMap<>(); - public StateMachine(T state, StateMachineConfig config, ClusterConfig cluster) { - this(state, new ResourceContext(config, cluster)); + public StateMachine(StateFactory stateFactory, StateMachineConfig config, ClusterConfig cluster) { + super(config, cluster); + if (stateFactory == null) + throw new NullPointerException("stateFactory cannot be null"); + this.stateFactory = stateFactory; } - public StateMachine(T state, StateMachineConfig config, ClusterConfig cluster, Executor executor) { - this(state, new ResourceContext(config, cluster, executor)); + public StateMachine(StateFactory stateFactory, StateMachineConfig config, ClusterConfig cluster, Executor executor) { + super(config, cluster, executor); + if (stateFactory == null) + throw new NullPointerException("stateFactory cannot be null"); + this.stateFactory = stateFactory; } - @SuppressWarnings("unchecked") - public StateMachine(T state, ResourceContext context) { - super(context); - if (state == null) - throw new NullPointerException("state cannot be null"); - this.state = state; - this.log = new StateLog<>(context); - registerCommands(); + @Override + protected StateMachinePartition createPartition(PartitionContext context) { + return new StateMachinePartition<>(stateFactory.createState(), context); } /** @@ -94,81 +65,6 @@ public U createProxy(Class type) { return (U) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{type}, handler); } - @Override - public synchronized CompletableFuture> open() { - return log.open().thenApply(v -> this); - } - - @Override - public boolean isOpen() { - return log.isOpen(); - } - - @Override - public synchronized CompletableFuture close() { - return log.close(); - } - - @Override - public boolean isClosed() { - return log.isClosed(); - } - - /** - * Registers commands on the status log. - */ - private void registerCommands() { - for (Method method : state.getClass().getMethods()) { - Read query = method.getAnnotation(Read.class); - if (query != null) { - log.register(getOperationName(method), Command.Type.READ, wrapOperation(method), query.consistency()); - } else { - Delete delete = method.getAnnotation(Delete.class); - if (delete != null) { - log.register(getOperationName(method), Command.Type.DELETE, wrapOperation(method)); - } else { - Write command = method.getAnnotation(Write.class); - if (command != null || Modifier.isPublic(method.getModifiers())) { - log.register(getOperationName(method), Command.Type.WRITE, wrapOperation(method)); - } - } - } - } - } - - /** - * Gets the cached method operation name or generates and caches an operation name if it's not already cached. - */ - private String getOperationName(Method method) { - return methodCache.computeIfAbsent(method, m -> new StringBuilder() - .append(m.getName()) - .append('(') - .append(String.join(",", Arrays.asList(m.getParameterTypes()).stream().map(Class::getCanonicalName).collect(Collectors.toList()))) - .append(')') - .toString()); - } - - /** - * Wraps a status log operation for the given method. - * - * @param method The method for which to create the status log command. - * @return The generated status log command. - */ - private Command, Object> wrapOperation(Method method) { - return (key, values) -> { - try { - Object[] args = new Object[values.size() + 1]; - args[0] = key; - for (int i = 0; i < values.size(); i++) { - args[i+1] = values.get(i); - } - return method.invoke(state, args); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new IllegalStateException(e); - } - }; - } - /** * State proxy invocation handler. */ @@ -177,9 +73,9 @@ private class StateProxyInvocationHandler implements InvocationHandler { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Class returnType = method.getReturnType(); if (returnType == CompletableFuture.class) { - return log.submit(getOperationName(method), new ArrayList<>(Arrays.asList(args != null ? args : new Object[0]))); + return partition(args[0]).submit(method, args); } - return log.submit(getOperationName(method), new ArrayList<>(Arrays.asList(args != null ? args : new Object[0]))).get(); + return partition(args[0]).submit(method, args).get(); } } diff --git a/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java index 2e5106245a..7b3451d6c1 100644 --- a/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java +++ b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachineConfig.java @@ -15,6 +15,10 @@ */ package net.kuujo.copycat.state; +import net.kuujo.copycat.raft.Consistency; +import net.kuujo.copycat.resource.PartitionedResourceConfig; +import net.kuujo.copycat.resource.ResourceConfig; + /** * State machine configuration. * @@ -25,6 +29,14 @@ public class StateMachineConfig extends StateLogConfig { public StateMachineConfig() { } + public StateMachineConfig(ResourceConfig config) { + super(config); + } + + public StateMachineConfig(PartitionedResourceConfig config) { + super(config); + } + protected StateMachineConfig(StateMachineConfig config) { super(config); } @@ -34,4 +46,10 @@ public StateMachineConfig copy() { return new StateMachineConfig(this); } + @Override + public StateMachineConfig withDefaultConsistency(Consistency consistency) { + super.setDefaultConsistency(consistency); + return this; + } + } diff --git a/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachinePartition.java b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachinePartition.java new file mode 100644 index 0000000000..e2c378a61c --- /dev/null +++ b/resources/state-machine/src/main/java/net/kuujo/copycat/state/StateMachinePartition.java @@ -0,0 +1,129 @@ +/* + * Copyright 2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.kuujo.copycat.state; + +import net.kuujo.copycat.resource.PartitionContext; +import net.kuujo.copycat.resource.internal.AbstractPartition; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * State machine. + * + * @author Jordan Halterman + */ +public class StateMachinePartition extends AbstractPartition> { + private T state; + private final StateLogPartition> log; + private final Map methodCache = new ConcurrentHashMap<>(); + + public StateMachinePartition(T state, PartitionContext context) { + super(context); + if (state == null) + throw new NullPointerException("state cannot be null"); + this.state = state; + this.log = new StateLogPartition<>(context); + registerCommands(); + } + + @Override + public synchronized CompletableFuture> open() { + return log.open().thenApply(v -> this); + } + + @Override + public boolean isOpen() { + return log.isOpen(); + } + + @Override + public synchronized CompletableFuture close() { + return log.close(); + } + + @Override + public boolean isClosed() { + return log.isClosed(); + } + + /** + * Registers commands on the status log. + */ + private void registerCommands() { + for (Method method : state.getClass().getMethods()) { + Read query = method.getAnnotation(Read.class); + if (query != null) { + log.register(getOperationName(method), Command.Type.READ, wrapOperation(method), query.consistency()); + } else { + Delete delete = method.getAnnotation(Delete.class); + if (delete != null) { + log.register(getOperationName(method), Command.Type.DELETE, wrapOperation(method)); + } else { + Write command = method.getAnnotation(Write.class); + if (command != null || Modifier.isPublic(method.getModifiers())) { + log.register(getOperationName(method), Command.Type.WRITE, wrapOperation(method)); + } + } + } + } + } + + /** + * Gets the cached method operation name or generates and caches an operation name if it's not already cached. + */ + private String getOperationName(Method method) { + return methodCache.computeIfAbsent(method, m -> { + return m.getName() + "(" + String.join(",", Arrays.asList(m.getParameterTypes()).stream().map(Class::getCanonicalName).collect(Collectors.toList())) + ")"; + }); + } + + /** + * Wraps a status log operation for the given method. + * + * @param method The method for which to create the status log command. + * @return The generated status log command. + */ + private Command, Object> wrapOperation(Method method) { + return (key, values) -> { + try { + Object[] args = new Object[values.size() + 1]; + args[0] = key; + for (int i = 0; i < values.size(); i++) { + args[i+1] = values.get(i); + } + return method.invoke(state, args); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException(e); + } + }; + } + + /** + * Submits a command to the state machine partition. + */ + protected CompletableFuture submit(Method method, Object[] args) { + return log.submit(getOperationName(method), Arrays.asList(args)); + } + +} diff --git a/resources/state-machine/src/test/java/net/kuujo/copycat/state/StateMachineTest.java b/resources/state-machine/src/test/java/net/kuujo/copycat/state/StateMachinePartitionTest.java similarity index 95% rename from resources/state-machine/src/test/java/net/kuujo/copycat/state/StateMachineTest.java rename to resources/state-machine/src/test/java/net/kuujo/copycat/state/StateMachinePartitionTest.java index c31658499f..8a481625e2 100644 --- a/resources/state-machine/src/test/java/net/kuujo/copycat/state/StateMachineTest.java +++ b/resources/state-machine/src/test/java/net/kuujo/copycat/state/StateMachinePartitionTest.java @@ -23,5 +23,5 @@ * @author Jordan Halterman */ @Test -public class StateMachineTest { +public class StateMachinePartitionTest { }