Skip to content

Commit

Permalink
Merge pull request #360 from NitorCreations/state-value-len
Browse files Browse the repository at this point in the history
Check state variable value length when setting variable.
  • Loading branch information
efonsell committed Dec 13, 2019
2 parents d40dabc + baf02e3 commit 7d09f6f
Show file tree
Hide file tree
Showing 14 changed files with 196 additions and 57 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
## 6.0.1-SNAPSHOT (future release)

**Highlights**
- `nflow-engine`
- Check that state variable value fits into the database column

**Details**
- `nflow-engine`
- Throw `StateVariableValueTooLongException` if a state variable value that does not fit into the database column is detected. Checked in `StateExecution.setVariable`, `StateExecution.addWorkflows`, `StateExecution.addChildWorkflows`, `WorkflowInstanceService.insertWorkflowInstance` and when creating a new instance via REST API. If the exception is thrown during state processing and not handled by the state implementation, nFlow engine will catch the exception and retry state processing after delay configured by property `nflow.executor.stateVariableValueTooLongRetryDelay.minutes` (default is 60).
- Dependency updates:
- jetty 9.4.24.v20191120
- junit4 4.13-rc-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.StringUtils.abbreviate;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.commons.lang3.StringUtils.length;
import static org.joda.time.DateTime.now;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.transaction.annotation.Propagation.MANDATORY;
Expand Down Expand Up @@ -70,6 +71,7 @@
import io.nflow.engine.internal.storage.db.SQLVariants;
import io.nflow.engine.model.ModelObject;
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.workflow.executor.StateVariableValueTooLongException;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
Expand Down Expand Up @@ -101,16 +103,13 @@ public class WorkflowInstanceDao {
private final boolean disableBatchUpdates;
int instanceStateTextLength;
int actionStateTextLength;
int stateVariableValueMaxLength;

@Inject
public WorkflowInstanceDao(SQLVariants sqlVariants,
@NFlow JdbcTemplate nflowJdbcTemplate,
@NFlow TransactionTemplate transactionTemplate,
@NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate,
ExecutorDao executorDao,
WorkflowInstanceExecutor workflowInstanceExecutor,
WorkflowInstanceFactory workflowInstanceFactory,
Environment env) {
public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdbcTemplate,
@NFlow TransactionTemplate transactionTemplate, @NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate,
ExecutorDao executorDao, WorkflowInstanceExecutor workflowInstanceExecutor, WorkflowInstanceFactory workflowInstanceFactory,
Environment env) {

this.sqlVariants = sqlVariants;
this.jdbc = nflowJdbcTemplate;
Expand All @@ -122,17 +121,18 @@ public WorkflowInstanceDao(SQLVariants sqlVariants,

workflowInstanceQueryMaxResults = env.getRequiredProperty("nflow.workflow.instance.query.max.results", Long.class);
workflowInstanceQueryMaxResultsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.results.default",
Long.class);
Long.class);
workflowInstanceQueryMaxActions = env.getRequiredProperty("nflow.workflow.instance.query.max.actions", Long.class);
workflowInstanceQueryMaxActionsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.actions.default",
Long.class);
Long.class);
disableBatchUpdates = env.getRequiredProperty("nflow.db.disable_batch_updates", Boolean.class);
if (disableBatchUpdates) {
logger.info("nFlow DB batch updates are disabled (system property nflow.db.disable_batch_updates=true)");
}
// In one deployment, FirstColumnLengthExtractor returned 0 column length (H2), so allow explicit length setting.
instanceStateTextLength = env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1);
actionStateTextLength = env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1);
stateVariableValueMaxLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1);
}

private int getInstanceStateTextLength() {
Expand All @@ -149,6 +149,14 @@ int getActionStateTextLength() {
return actionStateTextLength;
}

int getStateVariableValueMaxLength() {
if (stateVariableValueMaxLength == -1) {
stateVariableValueMaxLength = jdbc.query("select state_value from nflow_workflow_state where 1 = 0",
firstColumnLengthExtractor);
}
return stateVariableValueMaxLength;
}

public long insertWorkflowInstance(WorkflowInstance instance) {
long id;
if (sqlVariants.hasUpdateableCTE()) {
Expand Down Expand Up @@ -416,6 +424,13 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work
jdbc.queryForObject(sqlb.toString(), Long.class, args);
}

public void checkStateVariableValueLength(String name, String value) {
if (length(value) > getStateVariableValueMaxLength()) {
throw new StateVariableValueTooLongException("Too long value (length = " + length(value) + ") for state variable " + name
+ ": maximum allowed length is " + getStateVariableValueMaxLength());
}
}

String insertWorkflowActionSql() {
return "insert into nflow_workflow_action(workflow_id, executor_id, type, state, state_text, retry_no, execution_start, execution_end)";
}
Expand Down Expand Up @@ -858,9 +873,8 @@ public int deleteWorkflowInstanceHistory(long workflowInstanceId, Integer histor
MapSqlParameterSource params = new MapSqlParameterSource();
params.addValue("workflowId", workflowInstanceId);
params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours)));
Long maxActionId = namedJdbc
.queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and "
+ sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class);
Long maxActionId = namedJdbc.queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and "
+ sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class);
int deletedActions = 0;
if (maxActionId != null) {
params.addValue("maxActionId", maxActionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;
import io.nflow.engine.workflow.executor.StateVariableValueTooLongException;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
Expand All @@ -74,6 +75,7 @@ class WorkflowStateProcessor implements Runnable {
private final int unknownWorkflowStateRetryDelay;
private final int stateProcessingRetryDelay;
private final int stateSaveRetryDelay;
private final int stateVariableValueTooLongRetryDelay;
private boolean internalRetryEnabled = true;
private final Map<Long, WorkflowStateProcessor> processingInstances;
private long startTimeSeconds;
Expand All @@ -96,6 +98,8 @@ class WorkflowStateProcessor implements Runnable {
unknownWorkflowStateRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.state.retry.delay.minutes", Integer.class);
stateProcessingRetryDelay = env.getRequiredProperty("nflow.executor.stateProcessingRetryDelay.seconds", Integer.class);
stateSaveRetryDelay = env.getRequiredProperty("nflow.executor.stateSaveRetryDelay.seconds", Integer.class);
stateVariableValueTooLongRetryDelay = env.getRequiredProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes",
Integer.class);
}

@Override
Expand Down Expand Up @@ -142,10 +146,13 @@ private void runImpl() {
rescheduleUnknownWorkflowState(instance);
return;
}

boolean saveInstanceState = true;
try {
processBeforeListeners(listenerContext);
listenerContext.nextAction = processWithListeners(listenerContext, instance, definition, execution, state);
} catch (StateVariableValueTooLongException e) {
instance = rescheduleStateVariableValueTooLong(e, instance);
saveInstanceState = false;
} catch (Throwable t) {
execution.setFailed(t);
logger.error("Handler threw exception, trying again later.", t);
Expand All @@ -154,14 +161,16 @@ private void runImpl() {
execution.setNextStateReason(getStackTrace(t));
execution.handleRetryAfter(definition.getSettings().getErrorTransitionActivation(execution.getRetries()), definition);
} finally {
if (execution.isFailed()) {
processAfterFailureListeners(listenerContext, execution.getThrown());
} else {
processAfterListeners(listenerContext);
optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution);
if (saveInstanceState) {
if (execution.isFailed()) {
processAfterFailureListeners(listenerContext, execution.getThrown());
} else {
processAfterListeners(listenerContext);
optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution);
}
subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution);
instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder);
}
subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution);
instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder);
}
}
logger.debug("Finished.");
Expand Down Expand Up @@ -191,6 +200,14 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) {
logger.debug("Finished.");
}

private WorkflowInstance rescheduleStateVariableValueTooLong(StateVariableValueTooLongException e, WorkflowInstance instance) {
logger.warn("Failed to process workflow instance {}: {} - rescheduling workflow instance", instance.id, e.getMessage());
instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(stateVariableValueTooLongRetryDelay))
.setStatus(inProgress).setStateText(e.getMessage()).build();
workflowInstanceDao.updateWorkflowInstance(instance);
return instance;
}

private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, int subsequentStateExecutions,
StateExecutionImpl execution) {
DateTime nextActivation = execution.getNextActivation();
Expand Down Expand Up @@ -230,8 +247,8 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
try {
return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder);
} catch (Exception ex) {
logger.error("Failed to save workflow instance {} new state, retrying after {} seconds",
instance.id, stateSaveRetryDelay, ex);
logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", instance.id, stateSaveRetryDelay,
ex);
sleepIgnoreInterrupted(stateSaveRetryDelay);
}
} while (internalRetryEnabled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public String getVariable(String name, String defaultValue) {

@Override
public void setVariable(String name, String value) {
workflowDao.checkStateVariableValueLength(name, value);
instance.stateVariables.put(name, value);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import org.springframework.stereotype.Component;

import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.instance.WorkflowInstance;
Expand All @@ -17,9 +18,13 @@ public class WorkflowInstancePreProcessor {

private final WorkflowDefinitionService workflowDefinitionService;

private final WorkflowInstanceDao workflowInstanceDao;

@Inject
public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinitionService) {
public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinitionService,
WorkflowInstanceDao workflowInstanceDao) {
this.workflowDefinitionService = workflowDefinitionService;
this.workflowInstanceDao = workflowInstanceDao;
}

// TODO should this set next_activation for child workflows?
Expand All @@ -45,6 +50,7 @@ public WorkflowInstance process(WorkflowInstance instance) {
if (instance.priority == null) {
builder.setPriority(def.getSettings().getDefaultPriority());
}
instance.getChangedStateVariables().forEach(workflowInstanceDao::checkStateVariableValueLength);
return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.nflow.engine.workflow.executor;

public class StateVariableValueTooLongException extends RuntimeException {

private static final long serialVersionUID = 1L;

public StateVariableValueTooLongException(String message) {
super(message);
}
}
1 change: 1 addition & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ nflow.executor.keepalive.seconds=60
nflow.executor.stuckThreadThreshold.seconds=60
nflow.executor.stateProcessingRetryDelay.seconds=60
nflow.executor.stateSaveRetryDelay.seconds=60
nflow.executor.stateVariableValueTooLongRetryDelay.minutes=60

nflow.dispatcher.sleep.ms=1000
nflow.dispatcher.await.termination.seconds=60
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.emptySet;
import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.apache.commons.lang3.StringUtils.repeat;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
Expand Down Expand Up @@ -65,6 +66,7 @@
import io.nflow.engine.internal.executor.WorkflowInstanceExecutor;
import io.nflow.engine.internal.storage.db.SQLVariants;
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.workflow.executor.StateVariableValueTooLongException;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
Expand Down Expand Up @@ -316,8 +318,8 @@ public void updateWorkflowInstanceWithChildWorkflowHavingExistinExternalIdWorks(

dao.updateWorkflowInstanceAfterExecution(i2, a1, asList(childWorkflow1, childWorkflow2), emptyWorkflows, false);

Map<Long, List<Long>> childWorkflows = dao.getWorkflowInstance(id,
EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows;
Map<Long, List<Long>> childWorkflows = dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS),
null).childWorkflows;
assertThat(childWorkflows.size(), is(1));
for (List<Long> childIds : childWorkflows.values()) {
assertThat(childIds.size(), is(1));
Expand Down Expand Up @@ -509,11 +511,11 @@ public void postgreSQLUpdateWithoutActionIsNotAllowed() throws InterruptedExcept
List<Long> ids = dao.pollNextWorkflowInstanceIds(1);
assertThat(ids, contains(id));
final WorkflowInstance i2 = new WorkflowInstance.Builder(
dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress)
.setState("updateState").setStateText("update text").build();
dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState")
.setStateText("update text").build();
sleep(1);
assertThrows(IllegalArgumentException.class,
() -> dao.updateWorkflowInstanceAfterExecution(i2, null, noChildWorkflows, emptyWorkflows, false));
() -> dao.updateWorkflowInstanceAfterExecution(i2, null, noChildWorkflows, emptyWorkflows, false));
}

@Test
Expand Down Expand Up @@ -578,7 +580,7 @@ public void fakePostgreSQLinsertWorkflowInstance() {
WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text")
.setRootWorkflowId(9283L).setParentWorkflowId(110L).setParentActionId(421L).setNextActivation(started.plusSeconds(1))
.setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1))
.setStartedIfNotSet(started).setPriority((short)10).build();
.setStartedIfNotSet(started).setPriority((short) 10).build();

d.insertWorkflowInstance(wf);
assertEquals(
Expand Down Expand Up @@ -646,9 +648,9 @@ public void pollNextWorkflowInstances() {

@Test
public void pollNextWorkflowInstancesReturnInstancesInCorrectOrder() {
long olderLowPrio = createInstance(2, (short)1);
long newerLowPrio = createInstance(1, (short)1);
long newerHighPrio = createInstance(1, (short)2);
long olderLowPrio = createInstance(2, (short) 1);
long newerLowPrio = createInstance(1, (short) 1);
long newerHighPrio = createInstance(1, (short) 2);

// high priority comes first
List<Long> ids = dao.pollNextWorkflowInstanceIds(1);
Expand Down Expand Up @@ -686,18 +688,12 @@ private WorkflowInstanceDao preparePostgreSQLDao(JdbcTemplate jdbcTemplate) {
lenient().when(eDao.getExecutorId()).thenReturn(42);
NamedParameterJdbcTemplate namedJdbc = mock(NamedParameterJdbcTemplate.class);
TransactionTemplate transactionTemplate = mock(TransactionTemplate.class);
WorkflowInstanceDao d = new WorkflowInstanceDao(new PostgreSQLVariants(),
jdbcTemplate,
transactionTemplate,
namedJdbc,
eDao,
workflowInstanceExecutor,
workflowInstanceFactory,
env
);
WorkflowInstanceDao d = new WorkflowInstanceDao(new PostgreSQLVariants(), jdbcTemplate, transactionTemplate, namedJdbc, eDao,
workflowInstanceExecutor, workflowInstanceFactory, env);

d.instanceStateTextLength = 128;
d.actionStateTextLength = 128;
d.stateVariableValueMaxLength = 128;
return d;
}

Expand Down Expand Up @@ -866,9 +862,9 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() {
public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusToInProgressAndInsertsAction() {
int crashedExecutorId = 999;
insertCrashedExecutor(crashedExecutorId, executorDao.getExecutorGroup());
long id = dao.insertWorkflowInstance(new WorkflowInstance.Builder().setType("test").setExternalId("extId")
.setExecutorGroup(executorDao.getExecutorGroup()).setStatus(executing).setState("processing").setPriority((short) 0)
.build());
long id = dao.insertWorkflowInstance(
new WorkflowInstance.Builder().setType("test").setExternalId("extId").setExecutorGroup(executorDao.getExecutorGroup())
.setStatus(executing).setState("processing").setPriority((short) 0).build());
int updated = jdbc.update("update nflow_workflow set executor_id = ? where id = ?", crashedExecutorId, id);
assertThat(updated, is(1));

Expand Down Expand Up @@ -1026,4 +1022,15 @@ public void run() {
}
}
}

@Test
public void checkStateVariableValueWorks() {
dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength()));
}

@Test
public void checkStateVariableValueThrowsExceptionWhenValueIsTooLong() {
assertThrows(StateVariableValueTooLongException.class,
() -> dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1)));
}
}
Loading

0 comments on commit 7d09f6f

Please sign in to comment.