Skip to content

Commit

Permalink
Wrap all internal state machine filters and operations in predicates …
Browse files Browse the repository at this point in the history
…and functions respectively.
  • Loading branch information
kuujo committed Jun 13, 2015
1 parent d25f007 commit ea49b5a
Showing 1 changed file with 75 additions and 35 deletions.
110 changes: 75 additions & 35 deletions raft/src/main/java/net/kuujo/copycat/raft/StateMachine.java
Expand Up @@ -24,6 +24,8 @@
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;


/** /**
* Raft state machine. * Raft state machine.
Expand All @@ -32,10 +34,10 @@
*/ */
public abstract class StateMachine { public abstract class StateMachine {
private final Logger LOGGER = LoggerFactory.getLogger(getClass()); private final Logger LOGGER = LoggerFactory.getLogger(getClass());
private final Map<Compaction.Type, Map<Class<? extends Command>, Method>> filters = new HashMap<>(); private final Map<Compaction.Type, Map<Class<? extends Command>, BiPredicate<Commit<?>, Compaction>>> filters = new HashMap<>();
private Map<Compaction.Type, Method> allFilters = new HashMap<>(); private Map<Compaction.Type, BiPredicate<Commit<?>, Compaction>> allFilters = new HashMap<>();
private final Map<Class<? extends Operation>, Method> operations = new HashMap<>(); private final Map<Class<? extends Operation>, Function<Commit<?>, ?>> operations = new HashMap<>();
private Method allOperation; private Function<Commit<?>, ?> allOperation;


protected StateMachine() { protected StateMachine() {
init(); init();
Expand Down Expand Up @@ -79,48 +81,75 @@ private void declareFilters(Method method) {
for (Class<? extends Command> command : filter.value()) { for (Class<? extends Command> command : filter.value()) {
if (command == Filter.All.class) { if (command == Filter.All.class) {
if (!allFilters.containsKey(filter.compaction())) { if (!allFilters.containsKey(filter.compaction())) {
allFilters.put(filter.compaction(), method); if (method.getParameterCount() == 1) {
allFilters.put(filter.compaction(), wrapFilter(method));
}
} }
} else { } else {
Map<Class<? extends Command>, Method> filters = this.filters.get(filter.compaction()); Map<Class<? extends Command>, BiPredicate<Commit<?>, Compaction>> filters = this.filters.get(filter.compaction());
if (filters == null) { if (filters == null) {
filters = new HashMap<>(); filters = new HashMap<>();
this.filters.put(filter.compaction(), filters); this.filters.put(filter.compaction(), filters);
} }
if (!filters.containsKey(command)) { if (!filters.containsKey(command)) {
filters.put(command, method); filters.put(command, wrapFilter(method));
} }
} }
} }
} }
} }


/**
* Wraps a filter method.
*/
private BiPredicate<Commit<?>, Compaction> wrapFilter(Method method) {
if (method.getParameterCount() == 1) {
return (commit, compaction) -> {
try {
return (boolean) method.invoke(this, commit);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new ApplicationException("failed to filter command", e);
}
};
} else if (method.getParameterCount() == 2) {
return (commit, compaction) -> {
try {
return (boolean) method.invoke(this, commit, compaction);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new ApplicationException("failed to filter command", e);
}
};
} else {
throw new IllegalStateException("invalid filter method: too many parameters");
}
}

/** /**
* Finds the filter method for the given command. * Finds the filter method for the given command.
*/ */
private Method findFilter(Class<? extends Command> type, Compaction.Type compaction) { private BiPredicate<Commit<?>, Compaction> findFilter(Class<? extends Command> type, Compaction.Type compaction) {
Map<Class<? extends Command>, Method> filters = this.filters.get(compaction); Map<Class<? extends Command>, BiPredicate<Commit<?>, Compaction>> filters = this.filters.get(compaction);
if (filters == null) { if (filters == null) {
Method method = allFilters.get(compaction); BiPredicate<Commit<?>, Compaction> filter = allFilters.get(compaction);
if (method == null) { if (filter == null) {
throw new IllegalArgumentException("unknown command type: " + type); throw new IllegalArgumentException("unknown command type: " + type);
} }
return method; return filter;
} }


Method method = filters.computeIfAbsent(type, t -> { BiPredicate<Commit<?>, Compaction> filter = filters.computeIfAbsent(type, t -> {
for (Map.Entry<Class<? extends Command>, Method> entry : filters.entrySet()) { for (Map.Entry<Class<? extends Command>, BiPredicate<Commit<?>, Compaction>> entry : filters.entrySet()) {
if (entry.getKey().isAssignableFrom(type)) { if (entry.getKey().isAssignableFrom(type)) {
return entry.getValue(); return entry.getValue();
} }
} }
return allFilters.get(compaction); return allFilters.get(compaction);
}); });


if (method == null) { if (filter == null) {
throw new IllegalArgumentException("unknown command type: " + type); throw new IllegalArgumentException("unknown command type: " + type);
} }
return method; return filter;
} }


/** /**
Expand All @@ -132,31 +161,50 @@ private void declareOperations(Method method) {
method.setAccessible(true); method.setAccessible(true);
for (Class<? extends Operation> operation : apply.value()) { for (Class<? extends Operation> operation : apply.value()) {
if (operation == Apply.All.class) { if (operation == Apply.All.class) {
allOperation = method; allOperation = wrapOperation(method);
} else if (!operations.containsKey(operation)) { } else if (!operations.containsKey(operation)) {
operations.put(operation, method); operations.put(operation, wrapOperation(method));
} }
} }
} }
} }


/**
* Wraps an operation method.
*/
private Function<Commit<?>, ?> wrapOperation(Method method) {
if (method.getParameterCount() < 1) {
throw new IllegalStateException("invalid operation method: not enough arguments");
} else if (method.getParameterCount() > 1) {
throw new IllegalStateException("invalid operation method: too many arguments");
} else {
return commit -> {
try {
return method.invoke(this, commit);
} catch (IllegalAccessException | InvocationTargetException e) {
return new ApplicationException("failed to invoke operation", e);
}
};
}
}

/** /**
* Finds the operation method for the given operation. * Finds the operation method for the given operation.
*/ */
private Method findOperation(Class<? extends Operation> type) { private Function<Commit<?>, ?> findOperation(Class<? extends Operation> type) {
Method method = operations.computeIfAbsent(type, t -> { Function<Commit<?>, ?> operation = operations.computeIfAbsent(type, t -> {
for (Map.Entry<Class<? extends Operation>, Method> entry : operations.entrySet()) { for (Map.Entry<Class<? extends Operation>, Function<Commit<?>, ?>> entry : operations.entrySet()) {
if (entry.getKey().isAssignableFrom(type)) { if (entry.getKey().isAssignableFrom(type)) {
return entry.getValue(); return entry.getValue();
} }
} }
return allOperation; return allOperation;
}); });


if (method == null) { if (operation == null) {
throw new IllegalArgumentException("unknown operation type: " + type); throw new IllegalArgumentException("unknown operation type: " + type);
} }
return method; return operation;
} }


/** /**
Expand All @@ -176,12 +224,8 @@ public void register(Session session) {
* @return Whether to keep the commit. * @return Whether to keep the commit.
*/ */
public boolean filter(Commit<? extends Command> commit, Compaction compaction) { public boolean filter(Commit<? extends Command> commit, Compaction compaction) {
LOGGER.debug("filter {}", commit); LOGGER.debug("Filtering {}", commit);
try { return findFilter(commit.type(), compaction.type()).test(commit, compaction);
return (boolean) findFilter(commit.type(), compaction.type()).invoke(this, commit, compaction);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new ApplicationException("failed to filter command", e);
}
} }


/** /**
Expand All @@ -191,12 +235,8 @@ public boolean filter(Commit<? extends Command> commit, Compaction compaction) {
* @return The operation result. * @return The operation result.
*/ */
public Object apply(Commit<? extends Operation> commit) { public Object apply(Commit<? extends Operation> commit) {
LOGGER.debug("apply {}", commit); LOGGER.debug("Applying {}", commit);
try { return findOperation(commit.type()).apply(commit);
return findOperation(commit.type()).invoke(this, commit);
} catch (IllegalAccessException | InvocationTargetException e) {
return new ApplicationException("failed to invoke operation", e);
}
} }


/** /**
Expand Down

0 comments on commit ea49b5a

Please sign in to comment.