Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow definition refactoring #432

Merged
merged 35 commits into from
Jan 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b478cd7
extend AbstractWorkflowDefinition and deprecate enum-based WorkflowDe…
Feb 11, 2021
22bef8e
remove WorkflowDefinition
Feb 11, 2021
e535913
fix state method name
Feb 11, 2021
49551d0
fix
Feb 11, 2021
77b9679
format
Feb 11, 2021
5f4c612
use common test states
Feb 11, 2021
07f4f1d
cleanup
Feb 11, 2021
a349a43
restore backward compatibility
efonsell Feb 17, 2021
a837623
Revert "restore backward compatibility"
efonsell Feb 17, 2021
7991ac3
fixes after rebase
efonsell Feb 17, 2021
47874cc
fix
efonsell Feb 17, 2021
f886b9e
Merge branch 'master' into state-refactor
efonsell Dec 27, 2021
f789626
fix cronworkflow test
efonsell Dec 27, 2021
6c3b93c
update changelog
efonsell Dec 27, 2021
399ee33
remove deprecated stuff, remove some enum-based states from tests
efonsell Dec 28, 2021
6d373f3
update changelog
efonsell Dec 28, 2021
97a6b2b
Merge branch 'master' into state-refactor
efonsell Jan 15, 2022
0ee4c37
rename SimpleState -> State and add javadocs
efonsell Jan 15, 2022
0eed6c0
update javadocs regarding registering workflow states
efonsell Jan 15, 2022
ae8a47c
restore required maven version to 3.6
efonsell Jan 15, 2022
013f835
update changelog
efonsell Jan 15, 2022
3ad71ed
add junit-platform-commons to test deps to enable running tests in ec…
efonsell Jan 15, 2022
da77197
auto-register workflow states defined as static fields in workflow de…
efonsell Jan 15, 2022
2a577e2
update javadoc
efonsell Jan 15, 2022
6865f6a
Merge branch 'master' into state-refactor
efonsell Jan 15, 2022
1ffd88c
update changelog
efonsell Jan 15, 2022
1282b98
add test
efonsell Jan 15, 2022
49505e7
add test to check states are registered as expected in AbstractWorkfl…
efonsell Jan 15, 2022
c52a6dd
add tests to ensure states cannot be registered with invalid state me…
efonsell Jan 15, 2022
4bad2d9
update changelog
efonsell Jan 15, 2022
0ced4e9
Merge branch 'master' into state-refactor
efonsell Jan 15, 2022
3c172ee
update module pom versions to 8.0.0-SNAPSHOT
efonsell Jan 15, 2022
0e90218
add try-catch around setAccessible
efonsell Jan 20, 2022
b6eedcf
fix bare-minimum and full-stack examples
efonsell Jan 20, 2022
e52ac7f
improve readme
efonsell Jan 20, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
## 7.4.1-SNAPSHOT (future release)
## 8.0.0-SNAPSHOT (future release)

**Highlights**
- `nflow-engine`
- BREAKING CHANGE: Remove `WorkflowDefinition`, workflow definitions should extend `AbstractWorkflowDefinition` instead.
- BREAKING CHANGE: Remove deprecated `WorkflowState.isRetryAllowed`, set exception analyzer for workflow definition instead (if needed).

**Details**
- `nflow-engine`
- Dependency updates:
- minimum supported maven version for buliding is 3.8
- Workflow definitions that used to extend `WorkflowDefinition` should now extend `AbstractWorkflowDefinition` instead.
- It is not necessary to define the workflow states as an enum anymore, which should make it easier to extend and reuse states across different workflow definitions.
- You can define the states as instances of `io.nflow.engine.workflow.curated.State` or anything else that implements the required `io.nflow.engine.workflow.definition.WorkflowState` interface.
- The workflow definitions must now register all possible states as described in `io.nflow.engine.workflow.definition.AbstractWorkflowDefinition`.
- `WorkflowState.isRetryAllowed` was removed. If it was overridden, you can use `new WorkflowSettings.Builder().setExceptionAnalyzer(...)` to change the behavior. The default behavior was not changed.
- Dependency updates
- logback-classic update to version 1.2.10
- http://mailman.qos.ch/pipermail/announce/2021/000164.html
- https://jira.qos.ch/browse/LOGBACK-1591
Expand All @@ -19,6 +26,7 @@
- jodatime 2.10.3
- slf4j 1.7.32
- `nflow-rest-api`
- Dependency updates
- swagger 1.6.4
- `nflow-jetty`
- Dependency updates
Expand All @@ -31,12 +39,14 @@
- Dependency updates
- metrics 4.2.7
- `nflow-tests`
- Dependency updates
- h2 2.0.206
- Note: If you have persisted any h2 databases you must take a backup and restore. Also the nflow h2 schema changed to work with 2.x release of h2.
- Note: If you have persisted any h2 databases you must take a backup and restore. Also the nFlow h2 schema changed to work with 2.x release of h2.
- mssql 9.4.1
- mysql 8.0.27
- mariadb 2.7.4
- postgresql 42.3.1
- Minimum supported Maven version for building is 3.6

## 7.4.0 (2021-12-27)

Expand Down
2 changes: 1 addition & 1 deletion nflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>7.4.1-SNAPSHOT</version>
<version>8.0.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down
4 changes: 1 addition & 3 deletions nflow-engine/src/main/java/io/nflow/engine/NflowEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.nflow.engine.service.WorkflowExecutorService;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* NflowEngine starts up nflow-engine with given database and workflow definitions.
Expand Down Expand Up @@ -53,8 +52,7 @@ public class NflowEngine implements AutoCloseable {
* @param workflowDefinitions
* The registered workflow definitions.
*/
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants,
Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants, Collection<AbstractWorkflowDefinition> workflowDefinitions) {
ctx = new AnnotationConfigApplicationContext();

ctx.registerBean("nflowDatasource", DataSource.class, () -> dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,31 @@

@Singleton
public class NflowController {
private final WorkflowLifecycle lifecycle;
private final WorkflowDefinitionService workflowDefinitionService;
private final MaintenanceWorkflowStarter maintenanceWorkflowStarter;
private final Set<AbstractWorkflowDefinition<?>> workflows;
private final WorkflowLifecycle lifecycle;
private final WorkflowDefinitionService workflowDefinitionService;
private final MaintenanceWorkflowStarter maintenanceWorkflowStarter;
private final Set<AbstractWorkflowDefinition> workflows;

@Inject
public NflowController(WorkflowLifecycle lifecycle,
WorkflowDefinitionService workflowDefinitionService,
MaintenanceWorkflowStarter maintenanceWorkflowStarter,
Set<AbstractWorkflowDefinition<?>> workflowDefinitions
) {
this.lifecycle = lifecycle;
this.workflowDefinitionService = workflowDefinitionService;
this.maintenanceWorkflowStarter = maintenanceWorkflowStarter;
this.workflows = workflowDefinitions;
}
@Inject
public NflowController(WorkflowLifecycle lifecycle, WorkflowDefinitionService workflowDefinitionService,
MaintenanceWorkflowStarter maintenanceWorkflowStarter, Set<AbstractWorkflowDefinition> workflowDefinitions) {
this.lifecycle = lifecycle;
this.workflowDefinitionService = workflowDefinitionService;
this.maintenanceWorkflowStarter = maintenanceWorkflowStarter;
this.workflows = workflowDefinitions;
}

public void start() {
try {
workflows.forEach(workflowDefinitionService::addWorkflowDefinition);
maintenanceWorkflowStarter.start();
} catch (Exception e) {
throw new RuntimeException("Failed to register workflows", e);
}
lifecycle.start();
public void start() {
try {
workflows.forEach(workflowDefinitionService::addWorkflowDefinition);
maintenanceWorkflowStarter.start();
} catch (Exception e) {
throw new RuntimeException("Failed to register workflows", e);
}
lifecycle.start();
}

public void stop() {
lifecycle.stop();
}
public void stop() {
lifecycle.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public WorkflowDefinitionDao(SQLVariants sqlVariants,
this.executorInfo = executorDao;
}

public void storeWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
public void storeWorkflowDefinition(AbstractWorkflowDefinition definition) {
StoredWorkflowDefinition storedDefinition = convert(definition);
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("type", definition.getType());
Expand Down Expand Up @@ -111,7 +111,7 @@ public StoredWorkflowDefinition mapRow(ResultSet rs, int rowNum) throws SQLExcep
});
}

StoredWorkflowDefinition convert(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
StoredWorkflowDefinition convert(AbstractWorkflowDefinition definition) {
StoredWorkflowDefinition resp = new StoredWorkflowDefinition();
resp.type = definition.getType();
resp.description = definition.getDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void runImpl() {
logger.debug("Starting.");
WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, EnumSet.of(CURRENT_STATE_VARIABLES), null);
logIfLagging(instance);
AbstractWorkflowDefinition<? extends WorkflowState> definition = workflowDefinitions.getWorkflowDefinition(instance.type);
AbstractWorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type);
if (definition == null) {
rescheduleUnknownWorkflowType(instance);
return;
Expand Down Expand Up @@ -263,7 +263,7 @@ private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, i
}

private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance,
AbstractWorkflowDefinition<?> definition, WorkflowInstanceAction.Builder actionBuilder) {
AbstractWorkflowDefinition definition, WorkflowInstanceAction.Builder actionBuilder) {
if (definition.getMethod(execution.getNextState()) == null && execution.getNextActivation() != null) {
logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
Expand All @@ -272,8 +272,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) {
try {
String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId);
AbstractWorkflowDefinition<? extends WorkflowState> parentDefinition = workflowDefinitions
.getWorkflowDefinition(parentType);
AbstractWorkflowDefinition parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType);
String[] waitStates = parentDefinition.getStates().stream() //
.filter(state -> state.getType() == WorkflowStateType.wait) //
.map(WorkflowState::name) //
Expand Down Expand Up @@ -381,8 +380,8 @@ private boolean isNextActivationImmediately(StateExecutionImpl execution) {
&& !execution.getNextActivation().isAfterNow();
}

private NextAction processWithListeners(WorkflowInstance instance,
AbstractWorkflowDefinition<? extends WorkflowState> definition, StateExecutionImpl execution, WorkflowState state) {
private NextAction processWithListeners(WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState state) {
ProcessingExecutorListener processingListener = new ProcessingExecutorListener(instance, definition, execution, state);
List<WorkflowExecutorListener> chain = new ArrayList<>(executorListeners.size() + 1);
chain.addAll(executorListeners);
Expand Down Expand Up @@ -431,11 +430,11 @@ public NextAction next(ListenerContext context) {

private class ProcessingExecutorListener implements WorkflowExecutorListener {
private final WorkflowInstance instance;
private final AbstractWorkflowDefinition<? extends WorkflowState> definition;
private final AbstractWorkflowDefinition definition;
private final StateExecutionImpl execution;
private final WorkflowState state;

public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition<? extends WorkflowState> definition,
public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState state) {
this.instance = instance;
this.definition = definition;
Expand All @@ -451,7 +450,7 @@ public NextAction process(ListenerContext context, ListenerChain chain) {

private class NormalStateHandler extends StateHandler {

public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution,
WorkflowState currentState) {
super(instance, definition, execution, currentState);
}
Expand All @@ -466,7 +465,7 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object... args) {
private class SkippedStateHandler extends StateHandler {
private final NextAction nextAction;

public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition<?> definition,
public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState currentState) {
super(instance, definition, execution, currentState);
this.nextAction = nextAction;
Expand All @@ -480,11 +479,11 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object... args) {

private abstract class StateHandler {
protected final WorkflowInstance instance;
protected final AbstractWorkflowDefinition<?> definition;
protected final AbstractWorkflowDefinition definition;
protected final StateExecutionImpl execution;
protected final WorkflowState currentState;

public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution,
WorkflowState currentState) {
this.instance = instance;
this.definition = definition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public boolean isHistoryCleaningForced() {
return historyCleaningForced;
}

public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?> definition) {
public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition definition) {
if (getRetries() >= definition.getSettings().maxRetries) {
isRetryCountExceeded = true;
handleFailure(definition, "Max retry count exceeded");
Expand All @@ -287,7 +287,7 @@ public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?>
}
}

public void handleFailure(AbstractWorkflowDefinition<?> definition, String failureReason) {
public void handleFailure(AbstractWorkflowDefinition definition, String failureReason) {
setRetry(false);
String currentStateName = getCurrentStateName();
WorkflowState failureState = definition.getFailureTransitions().get(currentStateName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static java.util.stream.Collectors.toCollection;
import static org.apache.commons.lang3.ClassUtils.primitiveToWrapper;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.ReflectionUtils.doWithFields;
import static org.springframework.util.ReflectionUtils.doWithMethods;
import static org.springframework.util.ReflectionUtils.findMethod;
import static org.springframework.util.ReflectionUtils.invokeMethod;
Expand All @@ -20,6 +21,7 @@
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand All @@ -36,20 +38,21 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.definition.StateVar;
import io.nflow.engine.workflow.definition.WorkflowState;

public class WorkflowDefinitionScanner {

private static final Logger logger = getLogger(WorkflowDefinitionScanner.class);

private static final Set<Class<?>> boxedPrimitiveTypes = Stream
.of(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));
.of(Boolean.class, Byte.class, Short.class, Integer.class, Long.class, Float.class, Double.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));

private static final Set<Type> knownImmutableTypes = Stream
.of(Boolean.TYPE, Boolean.class, Byte.TYPE, Byte.class, Character.TYPE, Character.class, Short.TYPE, Short.class,
Integer.TYPE, Integer.class, Long.TYPE, Long.class, Float.TYPE, Float.class, Double.TYPE, Double.class, String.class,
BigDecimal.class, BigInteger.class, Enum.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));
.of(Boolean.TYPE, Boolean.class, Byte.TYPE, Byte.class, Character.TYPE, Character.class, Short.TYPE, Short.class,
Integer.TYPE, Integer.class, Long.TYPE, Long.class, Float.TYPE, Float.class, Double.TYPE, Double.class, String.class,
BigDecimal.class, BigInteger.class, Enum.class)
.collect(collectingAndThen(toCollection(LinkedHashSet::new), Collections::unmodifiableSet));

public Map<String, WorkflowStateMethod> getStateMethods(Class<?> definition) {
final Map<String, WorkflowStateMethod> methods = new LinkedHashMap<>();
Expand Down Expand Up @@ -91,6 +94,19 @@ public Map<String, WorkflowStateMethod> getStateMethods(Class<?> definition) {
return methods;
}

public Set<WorkflowState> getStaticWorkflowStates(Class<?> definition) {
final Set<WorkflowState> states = new HashSet<>();
doWithFields(definition, field -> {
try {
field.setAccessible(true);
states.add((WorkflowState) field.get(null));
} catch (Exception e) {
logger.warn("Failed to access state field {}", field, e);
}
}, field -> isStatic(field.getModifiers()) && WorkflowState.class.isAssignableFrom(field.getType()));
return states;
}

boolean isReadOnly(Type type) {
return knownImmutableTypes.contains(type);
}
Expand All @@ -108,7 +124,8 @@ Object defaultValue(StateVar stateInfo, Class<?> clazz) {
Constructor<?> ctr = clazz.getConstructor();
ctr.newInstance();
return ctr;
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
} catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
logger.warn("Could not instantiate {} using empty constructor", clazz, e);
}
}
Expand All @@ -120,8 +137,8 @@ static final class WorkflowTransitionMethod implements MethodFilter {
public boolean matches(Method method) {
int mod = method.getModifiers();
Class<?>[] parameterTypes = method.getParameterTypes();
return isPublic(mod) && !isStatic(mod) && hasStateExecutionParameter(parameterTypes) &&
hasValidReturnType(method.getReturnType());
return isPublic(mod) && !isStatic(mod) && hasStateExecutionParameter(parameterTypes)
&& hasValidReturnType(method.getReturnType());
}

private boolean hasValidReturnType(Class<?> returnType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinition
}

public WorkflowInstance process(WorkflowInstance instance) {
AbstractWorkflowDefinition<?> def = workflowDefinitionService.getWorkflowDefinition(instance.type);
AbstractWorkflowDefinition def = workflowDefinitionService.getWorkflowDefinition(instance.type);
if (def == null) {
throw new IllegalArgumentException("No workflow definition found for type [" + instance.type + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ListenerContext extends ModelObject {
/**
* The definition of the workflow.
*/
public final AbstractWorkflowDefinition<?> definition;
public final AbstractWorkflowDefinition definition;

/**
* The name of the state of the workflow instance before processing.
Expand Down Expand Up @@ -70,7 +70,7 @@ public class ListenerContext extends ModelObject {
*/
public final Map<Object, Object> data = new LinkedHashMap<>();

public ListenerContext(AbstractWorkflowDefinition<?> definition, WorkflowInstance instance, StateExecution stateExecution) {
public ListenerContext(AbstractWorkflowDefinition definition, WorkflowInstance instance, StateExecution stateExecution) {
this.definition = definition;
this.instance = instance;
this.stateExecution = stateExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.nflow.engine.config.NFlow;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Register workflow definitions defined in the class name listing resource.
Expand All @@ -37,12 +36,10 @@ public WorkflowDefinitionClassNameScanner(WorkflowDefinitionService workflowDefi
while ((row = br.readLine()) != null) {
logger.info("Preparing workflow {}", row);
@SuppressWarnings("unchecked")
Class<AbstractWorkflowDefinition<? extends WorkflowState>> clazz = (Class<AbstractWorkflowDefinition<? extends WorkflowState>>) Class
.forName(row);
Class<AbstractWorkflowDefinition> clazz = (Class<AbstractWorkflowDefinition>) Class.forName(row);
workflowDefinitionService.addWorkflowDefinition(clazz.getDeclaredConstructor().newInstance());
}
}
}
}

}
Loading