diff --git a/CHANGELOG.md b/CHANGELOG.md index d308c4a09..1ec7f2f90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,12 @@ ## 6.0.1-SNAPSHOT (future release) **Highlights** +- `nflow-engine` + - Check that state variable value fits into the database column **Details** - `nflow-engine` + - Throw `StateVariableValueTooLongException` if a state variable value that does not fit into the database column is detected. Checked in `StateExecution.setVariable`, `StateExecution.addWorkflows`, `StateExecution.addChildWorkflows`, `WorkflowInstanceService.insertWorkflowInstance` and when creating a new instance via REST API. If the exception is thrown during state processing and not handled by the state implementation, nFlow engine will catch the exception and retry state processing after delay configured by property `nflow.executor.stateVariableValueTooLongRetryDelay.minutes` (default is 60). - Dependency updates: - jetty 9.4.24.v20191120 - junit4 4.13-rc-1 diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 340e2558e..78fb31dfc 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -16,6 +16,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.StringUtils.abbreviate; import static org.apache.commons.lang3.StringUtils.join; +import static org.apache.commons.lang3.StringUtils.length; import static org.joda.time.DateTime.now; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.transaction.annotation.Propagation.MANDATORY; @@ -70,6 +71,7 @@ import io.nflow.engine.internal.storage.db.SQLVariants; import io.nflow.engine.model.ModelObject; import io.nflow.engine.service.WorkflowInstanceInclude; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.QueryWorkflowInstances; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; @@ -101,16 +103,13 @@ public class WorkflowInstanceDao { private final boolean disableBatchUpdates; int instanceStateTextLength; int actionStateTextLength; + int stateVariableValueMaxLength; @Inject - public WorkflowInstanceDao(SQLVariants sqlVariants, - @NFlow JdbcTemplate nflowJdbcTemplate, - @NFlow TransactionTemplate transactionTemplate, - @NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate, - ExecutorDao executorDao, - WorkflowInstanceExecutor workflowInstanceExecutor, - WorkflowInstanceFactory workflowInstanceFactory, - Environment env) { + public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdbcTemplate, + @NFlow TransactionTemplate transactionTemplate, @NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate, + ExecutorDao executorDao, WorkflowInstanceExecutor workflowInstanceExecutor, WorkflowInstanceFactory workflowInstanceFactory, + Environment env) { this.sqlVariants = sqlVariants; this.jdbc = nflowJdbcTemplate; @@ -122,10 +121,10 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, workflowInstanceQueryMaxResults = env.getRequiredProperty("nflow.workflow.instance.query.max.results", Long.class); workflowInstanceQueryMaxResultsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.results.default", - Long.class); + Long.class); workflowInstanceQueryMaxActions = env.getRequiredProperty("nflow.workflow.instance.query.max.actions", Long.class); workflowInstanceQueryMaxActionsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.actions.default", - Long.class); + Long.class); disableBatchUpdates = env.getRequiredProperty("nflow.db.disable_batch_updates", Boolean.class); if (disableBatchUpdates) { logger.info("nFlow DB batch updates are disabled (system property nflow.db.disable_batch_updates=true)"); @@ -133,6 +132,7 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, // In one deployment, FirstColumnLengthExtractor returned 0 column length (H2), so allow explicit length setting. instanceStateTextLength = env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1); actionStateTextLength = env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1); + stateVariableValueMaxLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1); } private int getInstanceStateTextLength() { @@ -149,6 +149,14 @@ int getActionStateTextLength() { return actionStateTextLength; } + int getStateVariableValueMaxLength() { + if (stateVariableValueMaxLength == -1) { + stateVariableValueMaxLength = jdbc.query("select state_value from nflow_workflow_state where 1 = 0", + firstColumnLengthExtractor); + } + return stateVariableValueMaxLength; + } + public long insertWorkflowInstance(WorkflowInstance instance) { long id; if (sqlVariants.hasUpdateableCTE()) { @@ -416,6 +424,13 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work jdbc.queryForObject(sqlb.toString(), Long.class, args); } + public void checkStateVariableValueLength(String name, String value) { + if (length(value) > getStateVariableValueMaxLength()) { + throw new StateVariableValueTooLongException("Too long value (length = " + length(value) + ") for state variable " + name + + ": maximum allowed length is " + getStateVariableValueMaxLength()); + } + } + String insertWorkflowActionSql() { return "insert into nflow_workflow_action(workflow_id, executor_id, type, state, state_text, retry_no, execution_start, execution_end)"; } @@ -858,9 +873,8 @@ public int deleteWorkflowInstanceHistory(long workflowInstanceId, Integer histor MapSqlParameterSource params = new MapSqlParameterSource(); params.addValue("workflowId", workflowInstanceId); params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours))); - Long maxActionId = namedJdbc - .queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " - + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class); + Long maxActionId = namedJdbc.queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class); int deletedActions = 0; if (maxActionId != null) { params.addValue("maxActionId", maxActionId); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 2fdd7c27c..07fb63fc5 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -50,6 +50,7 @@ import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -74,6 +75,7 @@ class WorkflowStateProcessor implements Runnable { private final int unknownWorkflowStateRetryDelay; private final int stateProcessingRetryDelay; private final int stateSaveRetryDelay; + private final int stateVariableValueTooLongRetryDelay; private boolean internalRetryEnabled = true; private final Map processingInstances; private long startTimeSeconds; @@ -96,6 +98,8 @@ class WorkflowStateProcessor implements Runnable { unknownWorkflowStateRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.state.retry.delay.minutes", Integer.class); stateProcessingRetryDelay = env.getRequiredProperty("nflow.executor.stateProcessingRetryDelay.seconds", Integer.class); stateSaveRetryDelay = env.getRequiredProperty("nflow.executor.stateSaveRetryDelay.seconds", Integer.class); + stateVariableValueTooLongRetryDelay = env.getRequiredProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", + Integer.class); } @Override @@ -142,10 +146,13 @@ private void runImpl() { rescheduleUnknownWorkflowState(instance); return; } - + boolean saveInstanceState = true; try { processBeforeListeners(listenerContext); listenerContext.nextAction = processWithListeners(listenerContext, instance, definition, execution, state); + } catch (StateVariableValueTooLongException e) { + instance = rescheduleStateVariableValueTooLong(e, instance); + saveInstanceState = false; } catch (Throwable t) { execution.setFailed(t); logger.error("Handler threw exception, trying again later.", t); @@ -154,14 +161,16 @@ private void runImpl() { execution.setNextStateReason(getStackTrace(t)); execution.handleRetryAfter(definition.getSettings().getErrorTransitionActivation(execution.getRetries()), definition); } finally { - if (execution.isFailed()) { - processAfterFailureListeners(listenerContext, execution.getThrown()); - } else { - processAfterListeners(listenerContext); - optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution); + if (saveInstanceState) { + if (execution.isFailed()) { + processAfterFailureListeners(listenerContext, execution.getThrown()); + } else { + processAfterListeners(listenerContext); + optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution); + } + subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution); + instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder); } - subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution); - instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder); } } logger.debug("Finished."); @@ -191,6 +200,14 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) { logger.debug("Finished."); } + private WorkflowInstance rescheduleStateVariableValueTooLong(StateVariableValueTooLongException e, WorkflowInstance instance) { + logger.warn("Failed to process workflow instance {}: {} - rescheduling workflow instance", instance.id, e.getMessage()); + instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(stateVariableValueTooLongRetryDelay)) + .setStatus(inProgress).setStateText(e.getMessage()).build(); + workflowInstanceDao.updateWorkflowInstance(instance); + return instance; + } + private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, int subsequentStateExecutions, StateExecutionImpl execution) { DateTime nextActivation = execution.getNextActivation(); @@ -230,8 +247,8 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, try { return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder); } catch (Exception ex) { - logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", - instance.id, stateSaveRetryDelay, ex); + logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", instance.id, stateSaveRetryDelay, + ex); sleepIgnoreInterrupted(stateSaveRetryDelay); } } while (internalRetryEnabled); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index d1bb88a58..628769eca 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -118,6 +118,7 @@ public String getVariable(String name, String defaultValue) { @Override public void setVariable(String name, String value) { + workflowDao.checkStateVariableValueLength(name, value); instance.stateVariables.put(name, value); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java index d08473315..c94c4ff01 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java @@ -8,6 +8,7 @@ import org.springframework.stereotype.Component; +import io.nflow.engine.internal.dao.WorkflowInstanceDao; import io.nflow.engine.service.WorkflowDefinitionService; import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition; import io.nflow.engine.workflow.instance.WorkflowInstance; @@ -17,9 +18,13 @@ public class WorkflowInstancePreProcessor { private final WorkflowDefinitionService workflowDefinitionService; + private final WorkflowInstanceDao workflowInstanceDao; + @Inject - public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinitionService) { + public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinitionService, + WorkflowInstanceDao workflowInstanceDao) { this.workflowDefinitionService = workflowDefinitionService; + this.workflowInstanceDao = workflowInstanceDao; } // TODO should this set next_activation for child workflows? @@ -45,6 +50,7 @@ public WorkflowInstance process(WorkflowInstance instance) { if (instance.priority == null) { builder.setPriority(def.getSettings().getDefaultPriority()); } + instance.getChangedStateVariables().forEach(workflowInstanceDao::checkStateVariableValueLength); return builder.build(); } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java new file mode 100644 index 000000000..a04182c47 --- /dev/null +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java @@ -0,0 +1,10 @@ +package io.nflow.engine.workflow.executor; + +public class StateVariableValueTooLongException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public StateVariableValueTooLongException(String message) { + super(message); + } +} diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 53df407ca..2c988becf 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -6,6 +6,7 @@ nflow.executor.keepalive.seconds=60 nflow.executor.stuckThreadThreshold.seconds=60 nflow.executor.stateProcessingRetryDelay.seconds=60 nflow.executor.stateSaveRetryDelay.seconds=60 +nflow.executor.stateVariableValueTooLongRetryDelay.minutes=60 nflow.dispatcher.sleep.ms=1000 nflow.dispatcher.await.termination.seconds=60 diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 37b7c1286..61a2f976e 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -16,6 +16,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static org.apache.commons.lang3.StringUtils.countMatches; +import static org.apache.commons.lang3.StringUtils.repeat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; @@ -65,6 +66,7 @@ import io.nflow.engine.internal.executor.WorkflowInstanceExecutor; import io.nflow.engine.internal.storage.db.SQLVariants; import io.nflow.engine.service.WorkflowInstanceInclude; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.QueryWorkflowInstances; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -316,8 +318,8 @@ public void updateWorkflowInstanceWithChildWorkflowHavingExistinExternalIdWorks( dao.updateWorkflowInstanceAfterExecution(i2, a1, asList(childWorkflow1, childWorkflow2), emptyWorkflows, false); - Map> childWorkflows = dao.getWorkflowInstance(id, - EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows; + Map> childWorkflows = dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), + null).childWorkflows; assertThat(childWorkflows.size(), is(1)); for (List childIds : childWorkflows.values()) { assertThat(childIds.size(), is(1)); @@ -509,11 +511,11 @@ public void postgreSQLUpdateWithoutActionIsNotAllowed() throws InterruptedExcept List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) - .setState("updateState").setStateText("update text").build(); + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") + .setStateText("update text").build(); sleep(1); assertThrows(IllegalArgumentException.class, - () -> dao.updateWorkflowInstanceAfterExecution(i2, null, noChildWorkflows, emptyWorkflows, false)); + () -> dao.updateWorkflowInstanceAfterExecution(i2, null, noChildWorkflows, emptyWorkflows, false)); } @Test @@ -578,7 +580,7 @@ public void fakePostgreSQLinsertWorkflowInstance() { WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text") .setRootWorkflowId(9283L).setParentWorkflowId(110L).setParentActionId(421L).setNextActivation(started.plusSeconds(1)) .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)) - .setStartedIfNotSet(started).setPriority((short)10).build(); + .setStartedIfNotSet(started).setPriority((short) 10).build(); d.insertWorkflowInstance(wf); assertEquals( @@ -646,9 +648,9 @@ public void pollNextWorkflowInstances() { @Test public void pollNextWorkflowInstancesReturnInstancesInCorrectOrder() { - long olderLowPrio = createInstance(2, (short)1); - long newerLowPrio = createInstance(1, (short)1); - long newerHighPrio = createInstance(1, (short)2); + long olderLowPrio = createInstance(2, (short) 1); + long newerLowPrio = createInstance(1, (short) 1); + long newerHighPrio = createInstance(1, (short) 2); // high priority comes first List ids = dao.pollNextWorkflowInstanceIds(1); @@ -686,18 +688,12 @@ private WorkflowInstanceDao preparePostgreSQLDao(JdbcTemplate jdbcTemplate) { lenient().when(eDao.getExecutorId()).thenReturn(42); NamedParameterJdbcTemplate namedJdbc = mock(NamedParameterJdbcTemplate.class); TransactionTemplate transactionTemplate = mock(TransactionTemplate.class); - WorkflowInstanceDao d = new WorkflowInstanceDao(new PostgreSQLVariants(), - jdbcTemplate, - transactionTemplate, - namedJdbc, - eDao, - workflowInstanceExecutor, - workflowInstanceFactory, - env - ); + WorkflowInstanceDao d = new WorkflowInstanceDao(new PostgreSQLVariants(), jdbcTemplate, transactionTemplate, namedJdbc, eDao, + workflowInstanceExecutor, workflowInstanceFactory, env); d.instanceStateTextLength = 128; d.actionStateTextLength = 128; + d.stateVariableValueMaxLength = 128; return d; } @@ -866,9 +862,9 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() { public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusToInProgressAndInsertsAction() { int crashedExecutorId = 999; insertCrashedExecutor(crashedExecutorId, executorDao.getExecutorGroup()); - long id = dao.insertWorkflowInstance(new WorkflowInstance.Builder().setType("test").setExternalId("extId") - .setExecutorGroup(executorDao.getExecutorGroup()).setStatus(executing).setState("processing").setPriority((short) 0) - .build()); + long id = dao.insertWorkflowInstance( + new WorkflowInstance.Builder().setType("test").setExternalId("extId").setExecutorGroup(executorDao.getExecutorGroup()) + .setStatus(executing).setState("processing").setPriority((short) 0).build()); int updated = jdbc.update("update nflow_workflow set executor_id = ? where id = ?", crashedExecutorId, id); assertThat(updated, is(1)); @@ -1026,4 +1022,15 @@ public void run() { } } } + + @Test + public void checkStateVariableValueWorks() { + dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength())); + } + + @Test + public void checkStateVariableValueThrowsExceptionWhenValueIsTooLong() { + assertThrows(StateVariableValueTooLongException.class, + () -> dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1))); + } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index d6befa876..e9a483722 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -80,6 +80,7 @@ public void setup() { env.setProperty("nflow.executor.stuckThreadThreshold.seconds", "60"); env.setProperty("nflow.executor.stateProcessingRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60"); + env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); when(executorDao.isTransactionSupportEnabled()).thenReturn(true); executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-")); dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index e48af1268..485df74e2 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -50,6 +50,7 @@ public void setup() { env.setProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.toString(STUCK_THREAD_THRESHOLD)); env.setProperty("nflow.executor.stateProcessingRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60"); + env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao, workflowInstancePreProcessor, env); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index d5d6b7234..70dd4893c 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -28,6 +28,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -92,6 +93,7 @@ import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -155,6 +157,8 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { WorkflowDefinition wakeWf = new NotifyTestWorkflow(); + WorkflowDefinition stateVariableWf = new StateVariableWorkflow(); + static WorkflowInstance newChildWorkflow = mock(WorkflowInstance.class); static WorkflowInstance newWorkflow = mock(WorkflowInstance.class); @@ -173,7 +177,7 @@ public void setup() { env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60"); env.setProperty("nflow.executor.stateProcessingRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "1"); - + env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); setCurrentMillisFixed(currentTimeMillis()); @@ -183,6 +187,7 @@ public void setup() { lenient().doReturn(simpleWf).when(workflowDefinitions).getWorkflowDefinition("simple-test"); lenient().doReturn(failingWf).when(workflowDefinitions).getWorkflowDefinition("failing-test"); lenient().doReturn(wakeWf).when(workflowDefinitions).getWorkflowDefinition("wake-test"); + lenient().doReturn(stateVariableWf).when(workflowDefinitions).getWorkflowDefinition("state-variable"); filterChain(listener1); filterChain(listener2); lenient().when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries); @@ -299,6 +304,22 @@ public void instanceWithUnsupportedStateIsRescheduled() { is("Unsupported workflow state"), greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } + @Test + public void stateIsRescheduledWhenStateVariableValueIsTooLong() { + WorkflowInstance instance = executingInstanceBuilder().setType("state-variable").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + String errorMessage = "Too long state variable value"; + doThrow(new StateVariableValueTooLongException(errorMessage)).when(workflowInstanceDao) + .checkStateVariableValueLength(anyString(), anyString()); + DateTime oneHourInFuture = now().plusHours(1); + + executor.run(); + + verify(workflowInstanceDao) + .updateWorkflowInstance(argThat(matchesWorkflowInstance(inProgress, StateVariableWorkflow.State.start, 0, + is(errorMessage), greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); + } + @Test public void workflowStatusIsSetToManualForManualStates() { WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("beforeManual") @@ -1146,4 +1167,38 @@ public NextAction start(@SuppressWarnings("unused") StateExecution execution) { } + public static class StateVariableWorkflow extends WorkflowDefinition { + + protected StateVariableWorkflow() { + super("state-variable", State.start, State.end); + permit(State.start, State.end); + } + + public static enum State implements WorkflowState { + start(WorkflowStateType.start), end(WorkflowStateType.end); + + private final WorkflowStateType stateType; + + private State(WorkflowStateType stateType) { + this.stateType = stateType; + } + + @Override + public WorkflowStateType getType() { + return stateType; + } + + @Override + public String getDescription() { + return name(); + } + } + + public NextAction start(StateExecution execution) { + execution.setVariable("foo", "bar"); + return moveToState(State.end, "Done."); + } + + } + } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java index dfd1da74d..f67044555 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java @@ -10,6 +10,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.joda.time.DateTime.now; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,7 +20,6 @@ import java.util.Optional; import org.joda.time.DateTime; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -59,13 +59,13 @@ public class StateExecutionImplTest { @BeforeEach public void setup() { instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") - .setBusinessKey("business").build(); - execution = createExecution(instance); + .setBusinessKey("business").build(); + createExecution(); executionInterface = execution; } - private StateExecutionImpl createExecution(WorkflowInstance workflowInstance) { - return new StateExecutionImpl(workflowInstance, objectStringMapper, workflowDao, workflowInstancePreProcessor, + private void createExecution() { + execution = new StateExecutionImpl(instance, objectStringMapper, workflowDao, workflowInstancePreProcessor, workflowInstanceService); } @@ -103,7 +103,7 @@ public void addWorkflows() { public void wakeUpParentWorkflowSetsWakeUpStates() { instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") .setBusinessKey("business").setParentWorkflowId(123L).build(); - execution = createExecution(instance); + createExecution(); assertThat(execution.getWakeUpParentWorkflowStates().isPresent(), is(false)); execution.wakeUpParentWorkflow(); assertThat(execution.getWakeUpParentWorkflowStates().get(), is(empty())); @@ -228,6 +228,13 @@ public void getMissingVariableWithDefaultReturnsDefaultValue() { assertThat(execution.getVariable("foo", Data.class, defaultData), is(defaultData)); } + @Test + public void setVariableChecksValueLength() { + execution.setVariable("foo", "bar"); + + verify(workflowDao).checkStateVariableValueLength("foo", "bar"); + } + @Test public void getSignalWorks() { when(workflowDao.getSignal(instance.id)).thenReturn(Optional.of(42)); @@ -244,7 +251,7 @@ public void setSignalWorks() { @Test public void setSignalRejectsNull() { - Assertions.assertThrows(IllegalArgumentException.class, () -> execution.setSignal(null, "testing")); + assertThrows(IllegalArgumentException.class, () -> execution.setSignal(null, "testing")); } @Test @@ -255,7 +262,7 @@ public void getParentIdReturnsEmptyWhenParentWorkflowIdIsNull() { @Test public void getParentIdReturnsParentWorkflowId() { instance = new WorkflowInstance.Builder().setParentWorkflowId(42L).build(); - execution = createExecution(instance); + createExecution(); assertThat(execution.getParentId(), is(Optional.of(42L))); } @@ -293,7 +300,7 @@ public void handleRetryAfterSetsActivationWhenMaxRetriesIsNotExceeded() { TestWorkflow def = new TestWorkflow(); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext") .setState(TestWorkflow.State.startWithoutFailure.name()).setBusinessKey("business").build(); - execution = createExecution(instance); + createExecution(); execution.handleRetryAfter(tomorrow, def); @@ -305,7 +312,7 @@ private void handleRetryMaxRetriesExceeded(TestDefinition.TestState initialState TestDefinition def = new TestDefinition("x", initialState); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState(currentState.name()) .setBusinessKey("business").build(); - execution = createExecution(instance); + createExecution(); execution.handleRetryAfter(tomorrow, def); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java index ab219342c..9cac2e043 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java @@ -6,11 +6,15 @@ import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import io.nflow.engine.internal.dao.WorkflowInstanceDao; import io.nflow.engine.internal.executor.BaseNflowTest; import io.nflow.engine.internal.workflow.WorkflowInstancePreProcessor; import io.nflow.engine.service.DummyTestWorkflow; @@ -19,11 +23,15 @@ import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.instance.WorkflowInstance; +@RunWith(MockitoJUnitRunner.class) public class WorkflowInstancePreProcessorTest extends BaseNflowTest { @Mock private WorkflowDefinitionService workflowDefinitionService; + @Mock + private WorkflowInstanceDao workflowInstanceDao; + private WorkflowInstancePreProcessor preProcessor; private WorkflowDefinition dummyWorkflow; @@ -34,7 +42,7 @@ public class WorkflowInstancePreProcessorTest extends BaseNflowTest { public void setup() { dummyWorkflow = new DummyTestWorkflow(new WorkflowSettings.Builder().setDefaultPriority(DEFAULT_PRIORITY).build()); lenient().doReturn(dummyWorkflow).when(workflowDefinitionService).getWorkflowDefinition("dummy"); - preProcessor = new WorkflowInstancePreProcessor(workflowDefinitionService); + preProcessor = new WorkflowInstancePreProcessor(workflowDefinitionService, workflowInstanceDao); } @Test @@ -58,6 +66,13 @@ public void createsMissingState() { assertThat(processed.state, is("CreateLoan")); } + @Test + public void checksStateVariableValues() { + WorkflowInstance i = constructWorkflowInstanceBuilder().putStateVariable("foo", "bar").build(); + preProcessor.process(i); + verify(workflowInstanceDao).checkStateVariableValueLength("foo", "bar"); + } + @Test public void setsStatusToCreatedWhenStatusIsNotSpecified() { WorkflowInstance i = constructWorkflowInstanceBuilder().setStatus(null).build(); diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index 6e9d02012..3c4202e4c 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -2,6 +2,7 @@ nflow.executor.group=junit nflow.executor.timeout.seconds=900 nflow.executor.keepalive.seconds=60 nflow.executor.stateSaveRetryDelay.seconds=60 +nflow.executor.stateVariableValueTooLongRetryDelay.minutes=60 nflow.workflow.instance.query.max.results=10000 nflow.workflow.instance.query.max.results.default=100