Skip to content

Commit

Permalink
Workflow definition refactoring (#432)
Browse files Browse the repository at this point in the history
* extend AbstractWorkflowDefinition and deprecate enum-based WorkflowDefinition

* remove WorkflowDefinition

* fix state method name

* use common test states

* remove deprecated stuff, remove some enum-based states from tests

* rename SimpleState -> State and add javadocs

* update javadocs regarding registering workflow states

* restore required maven version to 3.6

* add junit-platform-commons to test deps to enable running tests in eclipse

* auto-register workflow states defined as static fields in workflow definition class and ensure all registered states have valid state method if needed

* add test to check states are registered as expected in AbstractWorkflowDefinition

* add tests to ensure states cannot be registered with invalid state methods

* update module pom versions to 8.0.0-SNAPSHOT

* add try-catch around setAccessible

* fix bare-minimum and full-stack examples

Co-authored-by: Edvard Fonsell <edvard.fonsell@nitorcreations.com>
  • Loading branch information
efonsell and Edvard Fonsell committed Jan 21, 2022
1 parent e54f774 commit 891c719
Show file tree
Hide file tree
Showing 117 changed files with 1,624 additions and 2,123 deletions.
18 changes: 14 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
## 7.4.1-SNAPSHOT (future release)
## 8.0.0-SNAPSHOT (future release)

**Highlights**
- `nflow-engine`
- BREAKING CHANGE: Remove `WorkflowDefinition`, workflow definitions should extend `AbstractWorkflowDefinition` instead.
- BREAKING CHANGE: Remove deprecated `WorkflowState.isRetryAllowed`, set exception analyzer for workflow definition instead (if needed).

**Details**
- `nflow-engine`
- Dependency updates:
- minimum supported maven version for buliding is 3.8
- Workflow definitions that used to extend `WorkflowDefinition` should now extend `AbstractWorkflowDefinition` instead.
- It is not necessary to define the workflow states as an enum anymore, which should make it easier to extend and reuse states across different workflow definitions.
- You can define the states as instances of `io.nflow.engine.workflow.curated.State` or anything else that implements the required `io.nflow.engine.workflow.definition.WorkflowState` interface.
- The workflow definitions must now register all possible states as described in `io.nflow.engine.workflow.definition.AbstractWorkflowDefinition`.
- `WorkflowState.isRetryAllowed` was removed. If it was overridden, you can use `new WorkflowSettings.Builder().setExceptionAnalyzer(...)` to change the behavior. The default behavior was not changed.
- Dependency updates
- logback-classic update to version 1.2.10
- http://mailman.qos.ch/pipermail/announce/2021/000164.html
- https://jira.qos.ch/browse/LOGBACK-1591
Expand All @@ -19,6 +26,7 @@
- jodatime 2.10.3
- slf4j 1.7.32
- `nflow-rest-api`
- Dependency updates
- swagger 1.6.4
- `nflow-jetty`
- Dependency updates
Expand All @@ -31,12 +39,14 @@
- Dependency updates
- metrics 4.2.7
- `nflow-tests`
- Dependency updates
- h2 2.0.206
- Note: If you have persisted any h2 databases you must take a backup and restore. Also the nflow h2 schema changed to work with 2.x release of h2.
- Note: If you have persisted any h2 databases you must take a backup and restore. Also the nFlow h2 schema changed to work with 2.x release of h2.
- mssql 9.4.1
- mysql 8.0.27
- mariadb 2.7.4
- postgresql 42.3.1
- Minimum supported Maven version for building is 3.6

## 7.4.0 (2021-12-27)

Expand Down
2 changes: 1 addition & 1 deletion nflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>7.4.1-SNAPSHOT</version>
<version>8.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down
4 changes: 1 addition & 3 deletions nflow-engine/src/main/java/io/nflow/engine/NflowEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.nflow.engine.service.WorkflowExecutorService;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* NflowEngine starts up nflow-engine with given database and workflow definitions.
Expand Down Expand Up @@ -53,8 +52,7 @@ public class NflowEngine implements AutoCloseable {
* @param workflowDefinitions
* The registered workflow definitions.
*/
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants,
Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants, Collection<AbstractWorkflowDefinition> workflowDefinitions) {
ctx = new AnnotationConfigApplicationContext();

ctx.registerBean("nflowDatasource", DataSource.class, () -> dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,31 @@

@Singleton
public class NflowController {
private final WorkflowLifecycle lifecycle;
private final WorkflowDefinitionService workflowDefinitionService;
private final MaintenanceWorkflowStarter maintenanceWorkflowStarter;
private final Set<AbstractWorkflowDefinition<?>> workflows;
private final WorkflowLifecycle lifecycle;
private final WorkflowDefinitionService workflowDefinitionService;
private final MaintenanceWorkflowStarter maintenanceWorkflowStarter;
private final Set<AbstractWorkflowDefinition> workflows;

@Inject
public NflowController(WorkflowLifecycle lifecycle,
WorkflowDefinitionService workflowDefinitionService,
MaintenanceWorkflowStarter maintenanceWorkflowStarter,
Set<AbstractWorkflowDefinition<?>> workflowDefinitions
) {
this.lifecycle = lifecycle;
this.workflowDefinitionService = workflowDefinitionService;
this.maintenanceWorkflowStarter = maintenanceWorkflowStarter;
this.workflows = workflowDefinitions;
}
@Inject
public NflowController(WorkflowLifecycle lifecycle, WorkflowDefinitionService workflowDefinitionService,
MaintenanceWorkflowStarter maintenanceWorkflowStarter, Set<AbstractWorkflowDefinition> workflowDefinitions) {
this.lifecycle = lifecycle;
this.workflowDefinitionService = workflowDefinitionService;
this.maintenanceWorkflowStarter = maintenanceWorkflowStarter;
this.workflows = workflowDefinitions;
}

public void start() {
try {
workflows.forEach(workflowDefinitionService::addWorkflowDefinition);
maintenanceWorkflowStarter.start();
} catch (Exception e) {
throw new RuntimeException("Failed to register workflows", e);
}
lifecycle.start();
public void start() {
try {
workflows.forEach(workflowDefinitionService::addWorkflowDefinition);
maintenanceWorkflowStarter.start();
} catch (Exception e) {
throw new RuntimeException("Failed to register workflows", e);
}
lifecycle.start();
}

public void stop() {
lifecycle.stop();
}
public void stop() {
lifecycle.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public WorkflowDefinitionDao(SQLVariants sqlVariants,
this.executorInfo = executorDao;
}

public void storeWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
public void storeWorkflowDefinition(AbstractWorkflowDefinition definition) {
StoredWorkflowDefinition storedDefinition = convert(definition);
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("type", definition.getType());
Expand Down Expand Up @@ -111,7 +111,7 @@ public StoredWorkflowDefinition mapRow(ResultSet rs, int rowNum) throws SQLExcep
});
}

StoredWorkflowDefinition convert(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
StoredWorkflowDefinition convert(AbstractWorkflowDefinition definition) {
StoredWorkflowDefinition resp = new StoredWorkflowDefinition();
resp.type = definition.getType();
resp.description = definition.getDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void runImpl() {
logger.debug("Starting.");
WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, EnumSet.of(CURRENT_STATE_VARIABLES), null);
logIfLagging(instance);
AbstractWorkflowDefinition<? extends WorkflowState> definition = workflowDefinitions.getWorkflowDefinition(instance.type);
AbstractWorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type);
if (definition == null) {
rescheduleUnknownWorkflowType(instance);
return;
Expand Down Expand Up @@ -263,7 +263,7 @@ private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, i
}

private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance,
AbstractWorkflowDefinition<?> definition, WorkflowInstanceAction.Builder actionBuilder) {
AbstractWorkflowDefinition definition, WorkflowInstanceAction.Builder actionBuilder) {
if (definition.getMethod(execution.getNextState()) == null && execution.getNextActivation() != null) {
logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
Expand All @@ -272,8 +272,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) {
try {
String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId);
AbstractWorkflowDefinition<? extends WorkflowState> parentDefinition = workflowDefinitions
.getWorkflowDefinition(parentType);
AbstractWorkflowDefinition parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType);
String[] waitStates = parentDefinition.getStates().stream() //
.filter(state -> state.getType() == WorkflowStateType.wait) //
.map(WorkflowState::name) //
Expand Down Expand Up @@ -381,8 +380,8 @@ private boolean isNextActivationImmediately(StateExecutionImpl execution) {
&& !execution.getNextActivation().isAfterNow();
}

private NextAction processWithListeners(WorkflowInstance instance,
AbstractWorkflowDefinition<? extends WorkflowState> definition, StateExecutionImpl execution, WorkflowState state) {
private NextAction processWithListeners(WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState state) {
ProcessingExecutorListener processingListener = new ProcessingExecutorListener(instance, definition, execution, state);
List<WorkflowExecutorListener> chain = new ArrayList<>(executorListeners.size() + 1);
chain.addAll(executorListeners);
Expand Down Expand Up @@ -431,11 +430,11 @@ public NextAction next(ListenerContext context) {

private class ProcessingExecutorListener implements WorkflowExecutorListener {
private final WorkflowInstance instance;
private final AbstractWorkflowDefinition<? extends WorkflowState> definition;
private final AbstractWorkflowDefinition definition;
private final StateExecutionImpl execution;
private final WorkflowState state;

public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition<? extends WorkflowState> definition,
public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState state) {
this.instance = instance;
this.definition = definition;
Expand All @@ -451,7 +450,7 @@ public NextAction process(ListenerContext context, ListenerChain chain) {

private class NormalStateHandler extends StateHandler {

public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution,
WorkflowState currentState) {
super(instance, definition, execution, currentState);
}
Expand All @@ -466,7 +465,7 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object... args) {
private class SkippedStateHandler extends StateHandler {
private final NextAction nextAction;

public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition<?> definition,
public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState currentState) {
super(instance, definition, execution, currentState);
this.nextAction = nextAction;
Expand All @@ -480,11 +479,11 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object... args) {

private abstract class StateHandler {
protected final WorkflowInstance instance;
protected final AbstractWorkflowDefinition<?> definition;
protected final AbstractWorkflowDefinition definition;
protected final StateExecutionImpl execution;
protected final WorkflowState currentState;

public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution,
WorkflowState currentState) {
this.instance = instance;
this.definition = definition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public boolean isHistoryCleaningForced() {
return historyCleaningForced;
}

public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?> definition) {
public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition definition) {
if (getRetries() >= definition.getSettings().maxRetries) {
isRetryCountExceeded = true;
handleFailure(definition, "Max retry count exceeded");
Expand All @@ -287,7 +287,7 @@ public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?>
}
}

public void handleFailure(AbstractWorkflowDefinition<?> definition, String failureReason) {
public void handleFailure(AbstractWorkflowDefinition definition, String failureReason) {
setRetry(false);
String currentStateName = getCurrentStateName();
WorkflowState failureState = definition.getFailureTransitions().get(currentStateName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static java.util.stream.Collectors.toCollection;
import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.ReflectionUtils.doWithFields;
import static org.springframework.util.ReflectionUtils.doWithMethods;
import static org.springframework.util.ReflectionUtils.findMethod;
import static org.springframework.util.ReflectionUtils.invokeMethod;
Expand All @@ -20,6 +21,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -36,20 +38,21 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.definition.StateVar;
import io.nflow.engine.workflow.definition.WorkflowState;

public class WorkflowDefinitionScanner {

private static final Logger logger = getLogger(WorkflowDefinitionScanner.class);

private static final Set<Class<?>> boxedPrimitiveTypes = Stream
.of(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));
.of(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));

private static final Set<Type> knownImmutableTypes = Stream
.of(Boolean.TYPE, Boolean.class, Byte.TYPE, Byte.class, Character.TYPE, Character.class, Short.TYPE, Short.class,
Integer.TYPE, Integer.class, Long.TYPE, Long.class, Float.TYPE, Float.class, Double.TYPE, Double.class, String.class,
BigDecimal.class, BigInteger.class, Enum.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));
.of(Boolean.TYPE, Boolean.class, Byte.TYPE, Byte.class, Character.TYPE, Character.class, Short.TYPE, Short.class,
Integer.TYPE, Integer.class, Long.TYPE, Long.class, Float.TYPE, Float.class, Double.TYPE, Double.class, String.class,
BigDecimal.class, BigInteger.class, Enum.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));

public Map<String, WorkflowStateMethod> getStateMethods(Class<?> definition) {
final Map<String, WorkflowStateMethod> methods = new LinkedHashMap<>();
Expand Down Expand Up @@ -91,6 +94,19 @@ public Map<String, WorkflowStateMethod> getStateMethods(Class<?> definition) {
return methods;
}

public Set<WorkflowState> getStaticWorkflowStates(Class<?> definition) {
final Set<WorkflowState> states = new HashSet<>();
doWithFields(definition, field -> {
try {
field.setAccessible(true);
states.add((WorkflowState) field.get(null));
} catch (Exception e) {
logger.warn("Failed to access state field {}", field, e);
}
}, field -> isStatic(field.getModifiers()) && WorkflowState.class.isAssignableFrom(field.getType()));
return states;
}

boolean isReadOnly(Type type) {
return knownImmutableTypes.contains(type);
}
Expand All @@ -108,7 +124,8 @@ Object defaultValue(StateVar stateInfo, Class<?> clazz) {
Constructor<?> ctr = clazz.getConstructor();
ctr.newInstance();
return ctr;
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
logger.warn("Could not instantiate {} using empty constructor", clazz, e);
}
}
Expand All @@ -120,8 +137,8 @@ static final class WorkflowTransitionMethod implements MethodFilter {
public boolean matches(Method method) {
int mod = method.getModifiers();
Class<?>[] parameterTypes = method.getParameterTypes();
return isPublic(mod) && !isStatic(mod) && hasStateExecutionParameter(parameterTypes) &&
hasValidReturnType(method.getReturnType());
return isPublic(mod) && !isStatic(mod) && hasStateExecutionParameter(parameterTypes)
&& hasValidReturnType(method.getReturnType());
}

private boolean hasValidReturnType(Class<?> returnType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinition
}

public WorkflowInstance process(WorkflowInstance instance) {
AbstractWorkflowDefinition<?> def = workflowDefinitionService.getWorkflowDefinition(instance.type);
AbstractWorkflowDefinition def = workflowDefinitionService.getWorkflowDefinition(instance.type);
if (def == null) {
throw new IllegalArgumentException("No workflow definition found for type [" + instance.type + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ListenerContext extends ModelObject {
/**
* The definition of the workflow.
*/
public final AbstractWorkflowDefinition<?> definition;
public final AbstractWorkflowDefinition definition;

/**
* The name of the state of the workflow instance before processing.
Expand Down Expand Up @@ -70,7 +70,7 @@ public class ListenerContext extends ModelObject {
*/
public final Map<Object, Object> data = new LinkedHashMap<>();

public ListenerContext(AbstractWorkflowDefinition<?> definition, WorkflowInstance instance, StateExecution stateExecution) {
public ListenerContext(AbstractWorkflowDefinition definition, WorkflowInstance instance, StateExecution stateExecution) {
this.definition = definition;
this.instance = instance;
this.stateExecution = stateExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.nflow.engine.config.NFlow;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Register workflow definitions defined in the class name listing resource.
Expand All @@ -37,12 +36,10 @@ public WorkflowDefinitionClassNameScanner(WorkflowDefinitionService workflowDefi
while ((row = br.readLine()) != null) {
logger.info("Preparing workflow {}", row);
@SuppressWarnings("unchecked")
Class<AbstractWorkflowDefinition<? extends WorkflowState>> clazz = (Class<AbstractWorkflowDefinition<? extends WorkflowState>>) Class
.forName(row);
Class<AbstractWorkflowDefinition> clazz = (Class<AbstractWorkflowDefinition>) Class.forName(row);
workflowDefinitionService.addWorkflowDefinition(clazz.getDeclaredConstructor().newInstance());
}
}
}
}

}
Loading

0 comments on commit 891c719

Please sign in to comment.