Skip to content

Commit

Permalink
Merge 6d373f3 into 833d0c0
Browse files Browse the repository at this point in the history
  • Loading branch information
efonsell committed Dec 28, 2021
2 parents 833d0c0 + 6d373f3 commit cbfd072
Show file tree
Hide file tree
Showing 87 changed files with 1,163 additions and 1,786 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
## 7.4.1-SNAPSHOT (future release)
## 8.0.0-SNAPSHOT (future release)

**Highlights**
- `nflow-engine`
- `WorkflowDefinition` is replaced by `AbstractWorkflowDefinition`
- Remove deprecated `WorkflowState.isRetryAllowed`

**Details**
- `nflow-engine`
- Workflow definitions that used to extend `WorkflowDefinition` should now extend `AbstractWorkflowDefinition` instead.
- It is not necessary to define the workflow states as an enum anymore. You can define the states as instances of `SimpleState` or anything else that implements the required `WorkflowState` interface.
- `WorkflowState.isRetryAllowed` was removed, use `new WorkflowSettings.Builder().setExceptionAnalyzer(...)` instead.

## 7.4.0 (2021-12-27)

Expand Down
4 changes: 1 addition & 3 deletions nflow-engine/src/main/java/io/nflow/engine/NflowEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.nflow.engine.service.WorkflowExecutorService;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* NflowEngine starts up nflow-engine with given database and workflow definitions.
Expand Down Expand Up @@ -53,8 +52,7 @@ public class NflowEngine implements AutoCloseable {
* @param workflowDefinitions
* The registered workflow definitions.
*/
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants,
Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants, Collection<AbstractWorkflowDefinition> workflowDefinitions) {
ctx = new AnnotationConfigApplicationContext();

ctx.registerBean("nflowDatasource", DataSource.class, () -> dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,31 @@

@Singleton
public class NflowController {
private final WorkflowLifecycle lifecycle;
private final WorkflowDefinitionService workflowDefinitionService;
private final MaintenanceWorkflowStarter maintenanceWorkflowStarter;
private final Set<AbstractWorkflowDefinition<?>> workflows;
private final WorkflowLifecycle lifecycle;
private final WorkflowDefinitionService workflowDefinitionService;
private final MaintenanceWorkflowStarter maintenanceWorkflowStarter;
private final Set<AbstractWorkflowDefinition> workflows;

@Inject
public NflowController(WorkflowLifecycle lifecycle,
WorkflowDefinitionService workflowDefinitionService,
MaintenanceWorkflowStarter maintenanceWorkflowStarter,
Set<AbstractWorkflowDefinition<?>> workflowDefinitions
) {
this.lifecycle = lifecycle;
this.workflowDefinitionService = workflowDefinitionService;
this.maintenanceWorkflowStarter = maintenanceWorkflowStarter;
this.workflows = workflowDefinitions;
}
@Inject
public NflowController(WorkflowLifecycle lifecycle, WorkflowDefinitionService workflowDefinitionService,
MaintenanceWorkflowStarter maintenanceWorkflowStarter, Set<AbstractWorkflowDefinition> workflowDefinitions) {
this.lifecycle = lifecycle;
this.workflowDefinitionService = workflowDefinitionService;
this.maintenanceWorkflowStarter = maintenanceWorkflowStarter;
this.workflows = workflowDefinitions;
}

public void start() {
try {
workflows.forEach(workflowDefinitionService::addWorkflowDefinition);
maintenanceWorkflowStarter.start();
} catch (Exception e) {
throw new RuntimeException("Failed to register workflows", e);
}
lifecycle.start();
public void start() {
try {
workflows.forEach(workflowDefinitionService::addWorkflowDefinition);
maintenanceWorkflowStarter.start();
} catch (Exception e) {
throw new RuntimeException("Failed to register workflows", e);
}
lifecycle.start();
}

public void stop() {
lifecycle.stop();
}
public void stop() {
lifecycle.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public WorkflowDefinitionDao(SQLVariants sqlVariants,
this.executorInfo = executorDao;
}

public void storeWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
public void storeWorkflowDefinition(AbstractWorkflowDefinition definition) {
StoredWorkflowDefinition storedDefinition = convert(definition);
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("type", definition.getType());
Expand Down Expand Up @@ -111,7 +111,7 @@ public StoredWorkflowDefinition mapRow(ResultSet rs, int rowNum) throws SQLExcep
});
}

StoredWorkflowDefinition convert(AbstractWorkflowDefinition<? extends WorkflowState> definition) {
StoredWorkflowDefinition convert(AbstractWorkflowDefinition definition) {
StoredWorkflowDefinition resp = new StoredWorkflowDefinition();
resp.type = definition.getType();
resp.description = definition.getDescription();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void runImpl() {
logger.debug("Starting.");
WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, EnumSet.of(CURRENT_STATE_VARIABLES), null);
logIfLagging(instance);
AbstractWorkflowDefinition<? extends WorkflowState> definition = workflowDefinitions.getWorkflowDefinition(instance.type);
AbstractWorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type);
if (definition == null) {
rescheduleUnknownWorkflowType(instance);
return;
Expand Down Expand Up @@ -263,7 +263,7 @@ private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, i
}

private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance,
AbstractWorkflowDefinition<?> definition, WorkflowInstanceAction.Builder actionBuilder) {
AbstractWorkflowDefinition definition, WorkflowInstanceAction.Builder actionBuilder) {
if (definition.getMethod(execution.getNextState()) == null && execution.getNextActivation() != null) {
logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
Expand All @@ -272,8 +272,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) {
try {
String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId);
AbstractWorkflowDefinition<? extends WorkflowState> parentDefinition = workflowDefinitions
.getWorkflowDefinition(parentType);
AbstractWorkflowDefinition parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType);
String[] waitStates = parentDefinition.getStates().stream() //
.filter(state -> state.getType() == WorkflowStateType.wait) //
.map(WorkflowState::name) //
Expand Down Expand Up @@ -381,8 +380,8 @@ private boolean isNextActivationImmediately(StateExecutionImpl execution) {
&& !execution.getNextActivation().isAfterNow();
}

private NextAction processWithListeners(WorkflowInstance instance,
AbstractWorkflowDefinition<? extends WorkflowState> definition, StateExecutionImpl execution, WorkflowState state) {
private NextAction processWithListeners(WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState state) {
ProcessingExecutorListener processingListener = new ProcessingExecutorListener(instance, definition, execution, state);
List<WorkflowExecutorListener> chain = new ArrayList<>(executorListeners.size() + 1);
chain.addAll(executorListeners);
Expand Down Expand Up @@ -431,11 +430,11 @@ public NextAction next(ListenerContext context) {

private class ProcessingExecutorListener implements WorkflowExecutorListener {
private final WorkflowInstance instance;
private final AbstractWorkflowDefinition<? extends WorkflowState> definition;
private final AbstractWorkflowDefinition definition;
private final StateExecutionImpl execution;
private final WorkflowState state;

public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition<? extends WorkflowState> definition,
public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState state) {
this.instance = instance;
this.definition = definition;
Expand All @@ -451,7 +450,7 @@ public NextAction process(ListenerContext context, ListenerChain chain) {

private class NormalStateHandler extends StateHandler {

public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution,
WorkflowState currentState) {
super(instance, definition, execution, currentState);
}
Expand All @@ -466,7 +465,7 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object... args) {
private class SkippedStateHandler extends StateHandler {
private final NextAction nextAction;

public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition<?> definition,
public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition definition,
StateExecutionImpl execution, WorkflowState currentState) {
super(instance, definition, execution, currentState);
this.nextAction = nextAction;
Expand All @@ -480,11 +479,11 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object... args) {

private abstract class StateHandler {
protected final WorkflowInstance instance;
protected final AbstractWorkflowDefinition<?> definition;
protected final AbstractWorkflowDefinition definition;
protected final StateExecutionImpl execution;
protected final WorkflowState currentState;

public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution,
WorkflowState currentState) {
this.instance = instance;
this.definition = definition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ public boolean isHistoryCleaningForced() {
return historyCleaningForced;
}

public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?> definition) {
public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition definition) {
if (getRetries() >= definition.getSettings().maxRetries) {
isRetryCountExceeded = true;
handleFailure(definition, "Max retry count exceeded");
Expand All @@ -287,7 +287,7 @@ public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?>
}
}

public void handleFailure(AbstractWorkflowDefinition<?> definition, String failureReason) {
public void handleFailure(AbstractWorkflowDefinition definition, String failureReason) {
setRetry(false);
String currentStateName = getCurrentStateName();
WorkflowState failureState = definition.getFailureTransitions().get(currentStateName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinition
}

public WorkflowInstance process(WorkflowInstance instance) {
AbstractWorkflowDefinition<?> def = workflowDefinitionService.getWorkflowDefinition(instance.type);
AbstractWorkflowDefinition def = workflowDefinitionService.getWorkflowDefinition(instance.type);
if (def == null) {
throw new IllegalArgumentException("No workflow definition found for type [" + instance.type + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class ListenerContext extends ModelObject {
/**
* The definition of the workflow.
*/
public final AbstractWorkflowDefinition<?> definition;
public final AbstractWorkflowDefinition definition;

/**
* The name of the state of the workflow instance before processing.
Expand Down Expand Up @@ -70,7 +70,7 @@ public class ListenerContext extends ModelObject {
*/
public final Map<Object, Object> data = new LinkedHashMap<>();

public ListenerContext(AbstractWorkflowDefinition<?> definition, WorkflowInstance instance, StateExecution stateExecution) {
public ListenerContext(AbstractWorkflowDefinition definition, WorkflowInstance instance, StateExecution stateExecution) {
this.definition = definition;
this.instance = instance;
this.stateExecution = stateExecution;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.nflow.engine.config.NFlow;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Register workflow definitions defined in the class name listing resource.
Expand All @@ -37,12 +36,10 @@ public WorkflowDefinitionClassNameScanner(WorkflowDefinitionService workflowDefi
while ((row = br.readLine()) != null) {
logger.info("Preparing workflow {}", row);
@SuppressWarnings("unchecked")
Class<AbstractWorkflowDefinition<? extends WorkflowState>> clazz = (Class<AbstractWorkflowDefinition<? extends WorkflowState>>) Class
.forName(row);
Class<AbstractWorkflowDefinition> clazz = (Class<AbstractWorkflowDefinition>) Class.forName(row);
workflowDefinitionService.addWorkflowDefinition(clazz.getDeclaredConstructor().newInstance());
}
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.nflow.engine.internal.dao.WorkflowDefinitionDao;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Service for managing workflow definitions.
Expand All @@ -29,9 +28,8 @@ public class WorkflowDefinitionService {

private static final Logger logger = getLogger(WorkflowDefinitionService.class);

private final Map<String, AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions = synchronizedMap(
new LinkedHashMap<>());
private List<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitionValues = emptyList();
private final Map<String, AbstractWorkflowDefinition> workflowDefinitions = synchronizedMap(new LinkedHashMap<>());
private List<AbstractWorkflowDefinition> workflowDefinitionValues = emptyList();
private final WorkflowDefinitionDao workflowDefinitionDao;
private final boolean persistWorkflowDefinitions;
private final boolean autoInit;
Expand All @@ -50,7 +48,7 @@ public WorkflowDefinitionService(WorkflowDefinitionDao workflowDefinitionDao, En
* Workflow definition type.
* @return The workflow definition or null if not found.
*/
public AbstractWorkflowDefinition<?> getWorkflowDefinition(String type) {
public AbstractWorkflowDefinition getWorkflowDefinition(String type) {
return workflowDefinitions.get(type);
}

Expand All @@ -59,7 +57,7 @@ public AbstractWorkflowDefinition<?> getWorkflowDefinition(String type) {
*
* @return List of workflow definitions.
*/
public List<AbstractWorkflowDefinition<? extends WorkflowState>> getWorkflowDefinitions() {
public List<AbstractWorkflowDefinition> getWorkflowDefinitions() {
return workflowDefinitionValues;
}

Expand All @@ -84,8 +82,8 @@ public void postProcessWorkflowDefinitions() {
* @throws IllegalStateException
* When a definition with the same type has already been added.
*/
public void addWorkflowDefinition(AbstractWorkflowDefinition<? extends WorkflowState> wd) {
AbstractWorkflowDefinition<? extends WorkflowState> conflict = workflowDefinitions.put(wd.getType(), wd);
public void addWorkflowDefinition(AbstractWorkflowDefinition wd) {
AbstractWorkflowDefinition conflict = workflowDefinitions.put(wd.getType(), wd);
if (conflict != null) {
throw new IllegalStateException("Both " + wd.getClass().getName() + " and " + conflict.getClass().getName()
+ " define same workflow type: " + wd.getType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.springframework.stereotype.Component;

import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.WorkflowState;

/**
* Register workflow definitions defined as Spring beans.
Expand All @@ -17,8 +16,7 @@ public class WorkflowDefinitionSpringBeanScanner {

@Inject
public WorkflowDefinitionSpringBeanScanner(WorkflowDefinitionService workflowDefinitionService,
Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
Collection<AbstractWorkflowDefinition> workflowDefinitions) {
workflowDefinitions.forEach(workflowDefinitionService::addWorkflowDefinition);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public boolean updateWorkflowInstance(WorkflowInstance instance, WorkflowInstanc
builder.setStatus(null);
} else {
String type = workflowInstanceDao.getWorkflowInstanceType(instance.id);
AbstractWorkflowDefinition<?> definition = workflowDefinitionService.getWorkflowDefinition(type);
AbstractWorkflowDefinition definition = workflowDefinitionService.getWorkflowDefinition(type);
builder.setStatus(definition.getState(instance.state).getType().getStatus(instance.nextActivation));
}
WorkflowInstance updatedInstance = builder.build();
Expand Down Expand Up @@ -174,15 +174,15 @@ public Optional<Integer> getSignal(long workflowInstanceId) {
public boolean setSignal(long workflowInstanceId, Optional<Integer> signal, String reason, WorkflowActionType actionType) {
Assert.notNull(workflowDefinitionService, "workflowDefinitionService cannot be null");
signal.ifPresent(signalValue -> {
AbstractWorkflowDefinition<?> definition = getDefinition(workflowInstanceId);
AbstractWorkflowDefinition definition = getDefinition(workflowInstanceId);
if (!definition.getSupportedSignals().containsKey(signalValue)) {
logger.warn("Setting unsupported signal value {} to instance {}.", signalValue, workflowInstanceId);
}
});
return workflowInstanceDao.setSignal(workflowInstanceId, signal, reason, actionType);
}

private AbstractWorkflowDefinition<?> getDefinition(Long workflowInstanceId) {
private AbstractWorkflowDefinition getDefinition(Long workflowInstanceId) {
return workflowDefinitionService.getWorkflowDefinition(workflowInstanceDao.getWorkflowInstanceType(workflowInstanceId));
}

Expand Down
Loading

0 comments on commit cbfd072

Please sign in to comment.