Skip to content

Commit

Permalink
Update state machine resource to support partitioning.
Browse files Browse the repository at this point in the history
  • Loading branch information
kuujo committed Apr 17, 2015
1 parent 7dded8a commit f05109e
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 128 deletions.
@@ -0,0 +1,32 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.state;

/**
* State machine state factory.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public interface StateFactory<T> {

/**
* Creates a new state.
*
* @return A new state instance.
*/
T createState();

}
@@ -1,5 +1,5 @@
/* /*
* Copyright 2014 the original author or authors. * Copyright 2015 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
Expand All @@ -16,70 +16,41 @@
package net.kuujo.copycat.state; package net.kuujo.copycat.state;


import net.kuujo.copycat.cluster.ClusterConfig; import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.resource.ResourceContext; import net.kuujo.copycat.resource.PartitionContext;
import net.kuujo.copycat.resource.internal.AbstractResource; import net.kuujo.copycat.resource.internal.AbstractPartitionedResource;


import java.lang.reflect.*; import java.lang.reflect.InvocationHandler;
import java.util.ArrayList; import java.lang.reflect.Method;
import java.util.Arrays; import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.stream.Collectors;


/** /**
* State machine. * State machine.
* *
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
public class StateMachine<T> extends AbstractResource<StateMachine<T>> { public class StateMachine<T> extends AbstractPartitionedResource<StateMachine<T>, StateMachinePartition<T>> {

private final StateFactory<T> stateFactory;
/**
* Creates a new state machine with the given cluster and state machine configurations.
*
* @param config The state machine configuration.
* @param cluster The cluster configuration.
* @return A new state machine instance.
*/
public static <T> StateMachine<T> create(T state, StateMachineConfig config, ClusterConfig cluster) {
return new StateMachine<>(state, config, cluster);
}

/**
* Creates a new state machine with the given cluster and state machine configurations.
*
* @param config The state machine configuration.
* @param cluster The cluster configuration.
* @param executor An executor on which to execute state machine callbacks.
* @return A new state machine instance.
*/
public static <T> StateMachine<T> create(T state, StateMachineConfig config, ClusterConfig cluster, Executor executor) {
return new StateMachine<>(state, config, cluster, executor);
}

private T state;
private final StateLog<Object, List<Object>> log;
private final InvocationHandler handler = new StateProxyInvocationHandler(); private final InvocationHandler handler = new StateProxyInvocationHandler();
private final Map<Method, String> methodCache = new ConcurrentHashMap<>();


public StateMachine(T state, StateMachineConfig config, ClusterConfig cluster) { public StateMachine(StateFactory<T> stateFactory, StateMachineConfig config, ClusterConfig cluster) {
this(state, new ResourceContext(config, cluster)); super(config, cluster);
if (stateFactory == null)
throw new NullPointerException("stateFactory cannot be null");
this.stateFactory = stateFactory;
} }


public StateMachine(T state, StateMachineConfig config, ClusterConfig cluster, Executor executor) { public StateMachine(StateFactory<T> stateFactory, StateMachineConfig config, ClusterConfig cluster, Executor executor) {
this(state, new ResourceContext(config, cluster, executor)); super(config, cluster, executor);
if (stateFactory == null)
throw new NullPointerException("stateFactory cannot be null");
this.stateFactory = stateFactory;
} }


@SuppressWarnings("unchecked") @Override
public StateMachine(T state, ResourceContext context) { protected StateMachinePartition<T> createPartition(PartitionContext context) {
super(context); return new StateMachinePartition<>(stateFactory.createState(), context);
if (state == null)
throw new NullPointerException("state cannot be null");
this.state = state;
this.log = new StateLog<>(context);
registerCommands();
} }


/** /**
Expand All @@ -94,81 +65,6 @@ public <U> U createProxy(Class<U> type) {
return (U) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{type}, handler); return (U) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{type}, handler);
} }


@Override
public synchronized CompletableFuture<StateMachine<T>> open() {
return log.open().thenApply(v -> this);
}

@Override
public boolean isOpen() {
return log.isOpen();
}

@Override
public synchronized CompletableFuture<Void> close() {
return log.close();
}

@Override
public boolean isClosed() {
return log.isClosed();
}

/**
* Registers commands on the status log.
*/
private void registerCommands() {
for (Method method : state.getClass().getMethods()) {
Read query = method.getAnnotation(Read.class);
if (query != null) {
log.register(getOperationName(method), Command.Type.READ, wrapOperation(method), query.consistency());
} else {
Delete delete = method.getAnnotation(Delete.class);
if (delete != null) {
log.register(getOperationName(method), Command.Type.DELETE, wrapOperation(method));
} else {
Write command = method.getAnnotation(Write.class);
if (command != null || Modifier.isPublic(method.getModifiers())) {
log.register(getOperationName(method), Command.Type.WRITE, wrapOperation(method));
}
}
}
}
}

/**
* Gets the cached method operation name or generates and caches an operation name if it's not already cached.
*/
private String getOperationName(Method method) {
return methodCache.computeIfAbsent(method, m -> new StringBuilder()
.append(m.getName())
.append('(')
.append(String.join(",", Arrays.asList(m.getParameterTypes()).stream().map(Class::getCanonicalName).collect(Collectors.toList())))
.append(')')
.toString());
}

/**
* Wraps a status log operation for the given method.
*
* @param method The method for which to create the status log command.
* @return The generated status log command.
*/
private Command<Object, List<Object>, Object> wrapOperation(Method method) {
return (key, values) -> {
try {
Object[] args = new Object[values.size() + 1];
args[0] = key;
for (int i = 0; i < values.size(); i++) {
args[i+1] = values.get(i);
}
return method.invoke(state, args);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
};
}

/** /**
* State proxy invocation handler. * State proxy invocation handler.
*/ */
Expand All @@ -177,9 +73,9 @@ private class StateProxyInvocationHandler implements InvocationHandler {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> returnType = method.getReturnType(); Class<?> returnType = method.getReturnType();
if (returnType == CompletableFuture.class) { if (returnType == CompletableFuture.class) {
return log.submit(getOperationName(method), new ArrayList<>(Arrays.asList(args != null ? args : new Object[0]))); return partition(args[0]).submit(method, args);
} }
return log.submit(getOperationName(method), new ArrayList<>(Arrays.asList(args != null ? args : new Object[0]))).get(); return partition(args[0]).submit(method, args).get();
} }
} }


Expand Down
Expand Up @@ -15,6 +15,10 @@
*/ */
package net.kuujo.copycat.state; package net.kuujo.copycat.state;


import net.kuujo.copycat.raft.Consistency;
import net.kuujo.copycat.resource.PartitionedResourceConfig;
import net.kuujo.copycat.resource.ResourceConfig;

/** /**
* State machine configuration. * State machine configuration.
* *
Expand All @@ -25,6 +29,14 @@ public class StateMachineConfig extends StateLogConfig {
public StateMachineConfig() { public StateMachineConfig() {
} }


public StateMachineConfig(ResourceConfig<?> config) {
super(config);
}

public StateMachineConfig(PartitionedResourceConfig<?> config) {
super(config);
}

protected StateMachineConfig(StateMachineConfig config) { protected StateMachineConfig(StateMachineConfig config) {
super(config); super(config);
} }
Expand All @@ -34,4 +46,10 @@ public StateMachineConfig copy() {
return new StateMachineConfig(this); return new StateMachineConfig(this);
} }


@Override
public StateMachineConfig withDefaultConsistency(Consistency consistency) {
super.setDefaultConsistency(consistency);
return this;
}

} }
@@ -0,0 +1,129 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.kuujo.copycat.state;

import net.kuujo.copycat.resource.PartitionContext;
import net.kuujo.copycat.resource.internal.AbstractPartition;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* State machine.
*
* @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/
public class StateMachinePartition<T> extends AbstractPartition<StateMachinePartition<T>> {
private T state;
private final StateLogPartition<Object, List<Object>> log;
private final Map<Method, String> methodCache = new ConcurrentHashMap<>();

public StateMachinePartition(T state, PartitionContext context) {
super(context);
if (state == null)
throw new NullPointerException("state cannot be null");
this.state = state;
this.log = new StateLogPartition<>(context);
registerCommands();
}

@Override
public synchronized CompletableFuture<StateMachinePartition<T>> open() {
return log.open().thenApply(v -> this);
}

@Override
public boolean isOpen() {
return log.isOpen();
}

@Override
public synchronized CompletableFuture<Void> close() {
return log.close();
}

@Override
public boolean isClosed() {
return log.isClosed();
}

/**
* Registers commands on the status log.
*/
private void registerCommands() {
for (Method method : state.getClass().getMethods()) {
Read query = method.getAnnotation(Read.class);
if (query != null) {
log.register(getOperationName(method), Command.Type.READ, wrapOperation(method), query.consistency());
} else {
Delete delete = method.getAnnotation(Delete.class);
if (delete != null) {
log.register(getOperationName(method), Command.Type.DELETE, wrapOperation(method));
} else {
Write command = method.getAnnotation(Write.class);
if (command != null || Modifier.isPublic(method.getModifiers())) {
log.register(getOperationName(method), Command.Type.WRITE, wrapOperation(method));
}
}
}
}
}

/**
* Gets the cached method operation name or generates and caches an operation name if it's not already cached.
*/
private String getOperationName(Method method) {
return methodCache.computeIfAbsent(method, m -> {
return m.getName() + "(" + String.join(",", Arrays.asList(m.getParameterTypes()).stream().map(Class::getCanonicalName).collect(Collectors.toList())) + ")";
});
}

/**
* Wraps a status log operation for the given method.
*
* @param method The method for which to create the status log command.
* @return The generated status log command.
*/
private Command<Object, List<Object>, Object> wrapOperation(Method method) {
return (key, values) -> {
try {
Object[] args = new Object[values.size() + 1];
args[0] = key;
for (int i = 0; i < values.size(); i++) {
args[i+1] = values.get(i);
}
return method.invoke(state, args);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new IllegalStateException(e);
}
};
}

/**
* Submits a command to the state machine partition.
*/
protected <U> CompletableFuture<U> submit(Method method, Object[] args) {
return log.submit(getOperationName(method), Arrays.asList(args));
}

}
Expand Up @@ -23,5 +23,5 @@
* @author <a href="http://github.com/kuujo">Jordan Halterman</a> * @author <a href="http://github.com/kuujo">Jordan Halterman</a>
*/ */
@Test @Test
public class StateMachineTest { public class StateMachinePartitionTest {
} }

0 comments on commit f05109e

Please sign in to comment.