From 2bbd23d453d87ea73d9985f2a423c82a00b8845a Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Fri, 29 Nov 2019 13:59:08 +0200 Subject: [PATCH 1/7] Check state variable value length when setting variable. Abbreviate or throw exception based on config. --- .../internal/dao/WorkflowInstanceDao.java | 10 ++++ .../internal/workflow/StateExecutionImpl.java | 26 +++++++++- .../main/resources/nflow-engine.properties | 2 + .../internal/dao/WorkflowInstanceDaoTest.java | 7 +++ .../workflow/StateExecutionImplTest.java | 52 +++++++++++++++---- 5 files changed, 85 insertions(+), 12 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 340e2558e..44e6bb6a6 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -101,6 +101,7 @@ public class WorkflowInstanceDao { private final boolean disableBatchUpdates; int instanceStateTextLength; int actionStateTextLength; + int stateVariableValueLength; @Inject public WorkflowInstanceDao(SQLVariants sqlVariants, @@ -133,6 +134,7 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, // In one deployment, FirstColumnLengthExtractor returned 0 column length (H2), so allow explicit length setting. instanceStateTextLength = env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1); actionStateTextLength = env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1); + stateVariableValueLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1); } private int getInstanceStateTextLength() { @@ -149,6 +151,14 @@ int getActionStateTextLength() { return actionStateTextLength; } + public int getStateVariableValueLength() { + if (stateVariableValueLength == -1) { + stateVariableValueLength = jdbc.query("select state_value from nflow_workflow_state where 1 = 0", + firstColumnLengthExtractor); + } + return stateVariableValueLength; + } + public long insertWorkflowInstance(WorkflowInstance instance) { long id; if (sqlVariants.hasUpdateableCTE()) { diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index d1bb88a58..d47398fbc 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -1,6 +1,8 @@ package io.nflow.engine.internal.workflow; import static java.util.Collections.unmodifiableList; +import static org.apache.commons.lang3.StringUtils.abbreviate; +import static org.apache.commons.lang3.StringUtils.length; import static org.joda.time.DateTime.now; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.util.Assert.notNull; @@ -12,6 +14,7 @@ import org.joda.time.DateTime; import org.slf4j.Logger; +import org.springframework.core.env.Environment; import org.springframework.util.Assert; import io.nflow.engine.internal.dao.WorkflowInstanceDao; @@ -45,14 +48,20 @@ public class StateExecutionImpl extends ModelObject implements StateExecution { private boolean createAction = true; private String[] wakeUpParentStates; private boolean historyCleaningForced = false; + private final boolean abbreviateTooLongStateVariableValues; + private final int stateVariableValueLength; public StateExecutionImpl(WorkflowInstance instance, ObjectStringMapper objectMapper, WorkflowInstanceDao workflowDao, - WorkflowInstancePreProcessor workflowInstancePreProcessor, WorkflowInstanceService workflowInstanceService) { + WorkflowInstancePreProcessor workflowInstancePreProcessor, WorkflowInstanceService workflowInstanceService, + Environment env) { this.instance = instance; this.objectMapper = objectMapper; this.workflowDao = workflowDao; this.workflowInstancePreProcessor = workflowInstancePreProcessor; this.workflowInstanceService = workflowInstanceService; + stateVariableValueLength = workflowDao.getStateVariableValueLength(); + abbreviateTooLongStateVariableValues = env.getRequiredProperty("nflow.workflow.state.variable.value.abbreviated", + Boolean.class); } public DateTime getNextActivation() { @@ -118,7 +127,20 @@ public String getVariable(String name, String defaultValue) { @Override public void setVariable(String name, String value) { - instance.stateVariables.put(name, value); + instance.stateVariables.put(name, abbreviateTooLongValueIfNeeded(name, value)); + } + + private String abbreviateTooLongValueIfNeeded(String name, String value) { + if (length(value) > stateVariableValueLength) { + if (abbreviateTooLongStateVariableValues) { + LOG.warn("Too long value (length = {}) for state variable {}: abbreviated to {} characters.", length(value), name, + stateVariableValueLength); + return abbreviate(value, stateVariableValueLength); + } + throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name + + ", maximum allowed length is " + stateVariableValueLength); + } + return value; } @Override diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 53df407ca..0b39ba426 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -66,3 +66,5 @@ nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false nflow.definition.persist=true + +nflow.workflow.state.variable.value.abbreviated=false diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 37b7c1286..e29efc84d 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -1026,4 +1026,11 @@ public void run() { } } } + + @Test + public void getStateVariableValueMaxLengthWorks() { + int length = dao.getStateVariableValueLength(); + + assertThat(length, is(10240)); + } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java index dfd1da74d..b85025e87 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java @@ -2,6 +2,8 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.StringUtils.repeat; +import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -10,6 +12,7 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.joda.time.DateTime.now; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -19,7 +22,6 @@ import java.util.Optional; import org.joda.time.DateTime; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -27,6 +29,8 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.core.env.Environment; +import org.springframework.mock.env.MockEnvironment; import io.nflow.engine.internal.dao.WorkflowInstanceDao; import io.nflow.engine.service.WorkflowInstanceService; @@ -54,19 +58,22 @@ public class StateExecutionImplTest { StateExecutionImpl execution; StateExecution executionInterface; WorkflowInstance instance; + Environment env; private final DateTime tomorrow = now().plusDays(1); @BeforeEach public void setup() { + when(workflowDao.getStateVariableValueLength()).thenReturn(100); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") .setBusinessKey("business").build(); - execution = createExecution(instance); + env = new MockEnvironment().withProperty("nflow.workflow.state.variable.value.abbreviated", "false"); + createExecution(); executionInterface = execution; } - private StateExecutionImpl createExecution(WorkflowInstance workflowInstance) { - return new StateExecutionImpl(workflowInstance, objectStringMapper, workflowDao, workflowInstancePreProcessor, - workflowInstanceService); + private void createExecution() { + execution = new StateExecutionImpl(instance, objectStringMapper, workflowDao, workflowInstancePreProcessor, + workflowInstanceService, env); } @Test @@ -103,7 +110,7 @@ public void addWorkflows() { public void wakeUpParentWorkflowSetsWakeUpStates() { instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") .setBusinessKey("business").setParentWorkflowId(123L).build(); - execution = createExecution(instance); + createExecution(); assertThat(execution.getWakeUpParentWorkflowStates().isPresent(), is(false)); execution.wakeUpParentWorkflow(); assertThat(execution.getWakeUpParentWorkflowStates().get(), is(empty())); @@ -228,6 +235,31 @@ public void getMissingVariableWithDefaultReturnsDefaultValue() { assertThat(execution.getVariable("foo", Data.class, defaultData), is(defaultData)); } + @Test + public void setVariableWithTooLongValueIsAbbreviated() { + env = new MockEnvironment().withProperty("nflow.workflow.state.variable.value.abbreviated", "true"); + createExecution(); + + execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueLength() + 1)); + + String value = execution.getVariable("foo"); + assertThat(value.length(), is(workflowDao.getStateVariableValueLength())); + assertThat(value, endsWith("...")); + } + + @Test + public void setVariableWithTooLongValueThrowsException() { + assertThrows(IllegalArgumentException.class, + () -> execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueLength() + 1))); + } + + @Test + public void setVariableWorks() { + execution.setVariable("foo", "bar"); + + assertThat(execution.getVariable("foo"), is("bar")); + } + @Test public void getSignalWorks() { when(workflowDao.getSignal(instance.id)).thenReturn(Optional.of(42)); @@ -244,7 +276,7 @@ public void setSignalWorks() { @Test public void setSignalRejectsNull() { - Assertions.assertThrows(IllegalArgumentException.class, () -> execution.setSignal(null, "testing")); + assertThrows(IllegalArgumentException.class, () -> execution.setSignal(null, "testing")); } @Test @@ -255,7 +287,7 @@ public void getParentIdReturnsEmptyWhenParentWorkflowIdIsNull() { @Test public void getParentIdReturnsParentWorkflowId() { instance = new WorkflowInstance.Builder().setParentWorkflowId(42L).build(); - execution = createExecution(instance); + createExecution(); assertThat(execution.getParentId(), is(Optional.of(42L))); } @@ -293,7 +325,7 @@ public void handleRetryAfterSetsActivationWhenMaxRetriesIsNotExceeded() { TestWorkflow def = new TestWorkflow(); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext") .setState(TestWorkflow.State.startWithoutFailure.name()).setBusinessKey("business").build(); - execution = createExecution(instance); + createExecution(); execution.handleRetryAfter(tomorrow, def); @@ -305,7 +337,7 @@ private void handleRetryMaxRetriesExceeded(TestDefinition.TestState initialState TestDefinition def = new TestDefinition("x", initialState); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState(currentState.name()) .setBusinessKey("business").build(); - execution = createExecution(instance); + createExecution(); execution.handleRetryAfter(tomorrow, def); } From 23713783d71efc24d6dc00db0c9aa9a06365be28 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Fri, 29 Nov 2019 14:09:50 +0200 Subject: [PATCH 2/7] refactoring --- .../internal/dao/WorkflowInstanceDao.java | 33 ++++++++----------- .../internal/workflow/StateExecutionImpl.java | 12 +++---- .../internal/dao/WorkflowInstanceDaoTest.java | 2 +- .../workflow/StateExecutionImplTest.java | 8 ++--- 4 files changed, 25 insertions(+), 30 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 44e6bb6a6..b05f2c132 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -101,17 +101,13 @@ public class WorkflowInstanceDao { private final boolean disableBatchUpdates; int instanceStateTextLength; int actionStateTextLength; - int stateVariableValueLength; + int stateVariableValueMaxLength; @Inject - public WorkflowInstanceDao(SQLVariants sqlVariants, - @NFlow JdbcTemplate nflowJdbcTemplate, - @NFlow TransactionTemplate transactionTemplate, - @NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate, - ExecutorDao executorDao, - WorkflowInstanceExecutor workflowInstanceExecutor, - WorkflowInstanceFactory workflowInstanceFactory, - Environment env) { + public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdbcTemplate, + @NFlow TransactionTemplate transactionTemplate, @NFlow NamedParameterJdbcTemplate nflowNamedParameterJdbcTemplate, + ExecutorDao executorDao, WorkflowInstanceExecutor workflowInstanceExecutor, WorkflowInstanceFactory workflowInstanceFactory, + Environment env) { this.sqlVariants = sqlVariants; this.jdbc = nflowJdbcTemplate; @@ -123,10 +119,10 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, workflowInstanceQueryMaxResults = env.getRequiredProperty("nflow.workflow.instance.query.max.results", Long.class); workflowInstanceQueryMaxResultsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.results.default", - Long.class); + Long.class); workflowInstanceQueryMaxActions = env.getRequiredProperty("nflow.workflow.instance.query.max.actions", Long.class); workflowInstanceQueryMaxActionsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.actions.default", - Long.class); + Long.class); disableBatchUpdates = env.getRequiredProperty("nflow.db.disable_batch_updates", Boolean.class); if (disableBatchUpdates) { logger.info("nFlow DB batch updates are disabled (system property nflow.db.disable_batch_updates=true)"); @@ -134,7 +130,7 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, // In one deployment, FirstColumnLengthExtractor returned 0 column length (H2), so allow explicit length setting. instanceStateTextLength = env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1); actionStateTextLength = env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1); - stateVariableValueLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1); + stateVariableValueMaxLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1); } private int getInstanceStateTextLength() { @@ -151,12 +147,12 @@ int getActionStateTextLength() { return actionStateTextLength; } - public int getStateVariableValueLength() { - if (stateVariableValueLength == -1) { - stateVariableValueLength = jdbc.query("select state_value from nflow_workflow_state where 1 = 0", + public int getStateVariableValueMaxLength() { + if (stateVariableValueMaxLength == -1) { + stateVariableValueMaxLength = jdbc.query("select state_value from nflow_workflow_state where 1 = 0", firstColumnLengthExtractor); } - return stateVariableValueLength; + return stateVariableValueMaxLength; } public long insertWorkflowInstance(WorkflowInstance instance) { @@ -868,9 +864,8 @@ public int deleteWorkflowInstanceHistory(long workflowInstanceId, Integer histor MapSqlParameterSource params = new MapSqlParameterSource(); params.addValue("workflowId", workflowInstanceId); params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours))); - Long maxActionId = namedJdbc - .queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " - + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class); + Long maxActionId = namedJdbc.queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class); int deletedActions = 0; if (maxActionId != null) { params.addValue("maxActionId", maxActionId); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index d47398fbc..38e62798c 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -49,7 +49,7 @@ public class StateExecutionImpl extends ModelObject implements StateExecution { private String[] wakeUpParentStates; private boolean historyCleaningForced = false; private final boolean abbreviateTooLongStateVariableValues; - private final int stateVariableValueLength; + private final int stateVariableValueMaxLength; public StateExecutionImpl(WorkflowInstance instance, ObjectStringMapper objectMapper, WorkflowInstanceDao workflowDao, WorkflowInstancePreProcessor workflowInstancePreProcessor, WorkflowInstanceService workflowInstanceService, @@ -59,7 +59,7 @@ public StateExecutionImpl(WorkflowInstance instance, ObjectStringMapper objectMa this.workflowDao = workflowDao; this.workflowInstancePreProcessor = workflowInstancePreProcessor; this.workflowInstanceService = workflowInstanceService; - stateVariableValueLength = workflowDao.getStateVariableValueLength(); + stateVariableValueMaxLength = workflowDao.getStateVariableValueMaxLength(); abbreviateTooLongStateVariableValues = env.getRequiredProperty("nflow.workflow.state.variable.value.abbreviated", Boolean.class); } @@ -131,14 +131,14 @@ public void setVariable(String name, String value) { } private String abbreviateTooLongValueIfNeeded(String name, String value) { - if (length(value) > stateVariableValueLength) { + if (length(value) > stateVariableValueMaxLength) { if (abbreviateTooLongStateVariableValues) { LOG.warn("Too long value (length = {}) for state variable {}: abbreviated to {} characters.", length(value), name, - stateVariableValueLength); - return abbreviate(value, stateVariableValueLength); + stateVariableValueMaxLength); + return abbreviate(value, stateVariableValueMaxLength); } throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name - + ", maximum allowed length is " + stateVariableValueLength); + + ", maximum allowed length is " + stateVariableValueMaxLength); } return value; } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index e29efc84d..2d8ca83e7 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -1029,7 +1029,7 @@ public void run() { @Test public void getStateVariableValueMaxLengthWorks() { - int length = dao.getStateVariableValueLength(); + int length = dao.getStateVariableValueMaxLength(); assertThat(length, is(10240)); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java index b85025e87..1cdfa9177 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java @@ -63,7 +63,7 @@ public class StateExecutionImplTest { @BeforeEach public void setup() { - when(workflowDao.getStateVariableValueLength()).thenReturn(100); + when(workflowDao.getStateVariableValueMaxLength()).thenReturn(100); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") .setBusinessKey("business").build(); env = new MockEnvironment().withProperty("nflow.workflow.state.variable.value.abbreviated", "false"); @@ -240,17 +240,17 @@ public void setVariableWithTooLongValueIsAbbreviated() { env = new MockEnvironment().withProperty("nflow.workflow.state.variable.value.abbreviated", "true"); createExecution(); - execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueLength() + 1)); + execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueMaxLength() + 1)); String value = execution.getVariable("foo"); - assertThat(value.length(), is(workflowDao.getStateVariableValueLength())); + assertThat(value.length(), is(workflowDao.getStateVariableValueMaxLength())); assertThat(value, endsWith("...")); } @Test public void setVariableWithTooLongValueThrowsException() { assertThrows(IllegalArgumentException.class, - () -> execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueLength() + 1))); + () -> execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueMaxLength() + 1))); } @Test From d7c87c909e441e6f033385e378c45b0f711c966e Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Fri, 29 Nov 2019 17:22:58 +0200 Subject: [PATCH 3/7] move checking to dao --- .../internal/dao/WorkflowInstanceDao.java | 31 +++++++-- .../internal/workflow/StateExecutionImpl.java | 26 +------- .../internal/dao/WorkflowInstanceDaoTest.java | 66 ++++++++++++------- .../workflow/StateExecutionImplTest.java | 33 ++-------- .../src/test/resources/junit.properties | 2 + 5 files changed, 77 insertions(+), 81 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index b05f2c132..eec1372c5 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -16,6 +16,7 @@ import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.StringUtils.abbreviate; import static org.apache.commons.lang3.StringUtils.join; +import static org.apache.commons.lang3.StringUtils.length; import static org.joda.time.DateTime.now; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.transaction.annotation.Propagation.MANDATORY; @@ -102,6 +103,7 @@ public class WorkflowInstanceDao { int instanceStateTextLength; int actionStateTextLength; int stateVariableValueMaxLength; + boolean abbreviateTooLongStateVariableValues; @Inject public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdbcTemplate, @@ -131,6 +133,8 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb instanceStateTextLength = env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1); actionStateTextLength = env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1); stateVariableValueMaxLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1); + abbreviateTooLongStateVariableValues = env.getRequiredProperty("nflow.workflow.state.variable.value.abbreviated", + Boolean.class); } private int getInstanceStateTextLength() { @@ -147,7 +151,7 @@ int getActionStateTextLength() { return actionStateTextLength; } - public int getStateVariableValueMaxLength() { + int getStateVariableValueMaxLength() { if (stateVariableValueMaxLength == -1) { stateVariableValueMaxLength = jdbc.query("select state_value from nflow_workflow_state where 1 = 0", firstColumnLengthExtractor); @@ -183,7 +187,7 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { sqlb.append(", ins").append(pos).append(" as (").append(insertWorkflowInstanceStateSql()) .append(" select wf.id,0,?,? from wf)"); args[pos++] = variable.getKey(); - args[pos++] = variable.getValue(); + args[pos++] = getStateVariableValue(variable); } sqlb.append(" select wf.id from wf"); return jdbc.queryForObject(sqlb.toString(), Long.class, args); @@ -193,6 +197,10 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { } } + String getStateVariableValue(Entry variable) { + return abbreviateTooLongStateVariableValueIfNeeded(variable.getKey(), variable.getValue()); + } + boolean useBatchUpdate() { return !disableBatchUpdates && sqlVariants.useBatchUpdate(); } @@ -259,7 +267,7 @@ void insertVariables(final long id, final long actionId, Map cha private void insertVariablesWithMultipleUpdates(final long id, final long actionId, Map changedStateVariables) { for (Entry entry : changedStateVariables.entrySet()) { int updated = jdbc.update(insertWorkflowInstanceStateSql() + " values (?,?,?,?)", id, actionId, entry.getKey(), - entry.getValue()); + getStateVariableValue(entry)); if (updated != 1) { throw new IllegalStateException("Failed to insert state variable " + entry.getKey()); } @@ -279,7 +287,7 @@ protected boolean setValuesIfAvailable(PreparedStatement ps, int i) throws SQLEx ps.setLong(1, id); ps.setLong(2, actionId); ps.setString(3, variable.getKey()); - ps.setString(4, variable.getValue()); + ps.setString(4, getStateVariableValue(variable)); return true; } }); @@ -416,12 +424,25 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work sqlb.append(", ins").append(pos).append(" as (").append(insertWorkflowInstanceStateSql()) .append(" select wf.id,act.id,?,? from wf,act)"); args[pos++] = variable.getKey(); - args[pos++] = variable.getValue(); + args[pos++] = getStateVariableValue(variable); } sqlb.append(" select act.id from act"); jdbc.queryForObject(sqlb.toString(), Long.class, args); } + public String abbreviateTooLongStateVariableValueIfNeeded(String name, String value) { + if (length(value) > getStateVariableValueMaxLength()) { + if (abbreviateTooLongStateVariableValues) { + logger.warn("Too long value (length = {}) for state variable {}: abbreviated to {} characters.", length(value), name, + getStateVariableValueMaxLength()); + return abbreviate(value, getStateVariableValueMaxLength()); + } + throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name + + ": maximum allowed length is " + getStateVariableValueMaxLength()); + } + return value; + } + String insertWorkflowActionSql() { return "insert into nflow_workflow_action(workflow_id, executor_id, type, state, state_text, retry_no, execution_start, execution_end)"; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index 38e62798c..ac3f94995 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -1,8 +1,6 @@ package io.nflow.engine.internal.workflow; import static java.util.Collections.unmodifiableList; -import static org.apache.commons.lang3.StringUtils.abbreviate; -import static org.apache.commons.lang3.StringUtils.length; import static org.joda.time.DateTime.now; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.util.Assert.notNull; @@ -14,7 +12,6 @@ import org.joda.time.DateTime; import org.slf4j.Logger; -import org.springframework.core.env.Environment; import org.springframework.util.Assert; import io.nflow.engine.internal.dao.WorkflowInstanceDao; @@ -48,20 +45,14 @@ public class StateExecutionImpl extends ModelObject implements StateExecution { private boolean createAction = true; private String[] wakeUpParentStates; private boolean historyCleaningForced = false; - private final boolean abbreviateTooLongStateVariableValues; - private final int stateVariableValueMaxLength; public StateExecutionImpl(WorkflowInstance instance, ObjectStringMapper objectMapper, WorkflowInstanceDao workflowDao, - WorkflowInstancePreProcessor workflowInstancePreProcessor, WorkflowInstanceService workflowInstanceService, - Environment env) { + WorkflowInstancePreProcessor workflowInstancePreProcessor, WorkflowInstanceService workflowInstanceService) { this.instance = instance; this.objectMapper = objectMapper; this.workflowDao = workflowDao; this.workflowInstancePreProcessor = workflowInstancePreProcessor; this.workflowInstanceService = workflowInstanceService; - stateVariableValueMaxLength = workflowDao.getStateVariableValueMaxLength(); - abbreviateTooLongStateVariableValues = env.getRequiredProperty("nflow.workflow.state.variable.value.abbreviated", - Boolean.class); } public DateTime getNextActivation() { @@ -127,20 +118,7 @@ public String getVariable(String name, String defaultValue) { @Override public void setVariable(String name, String value) { - instance.stateVariables.put(name, abbreviateTooLongValueIfNeeded(name, value)); - } - - private String abbreviateTooLongValueIfNeeded(String name, String value) { - if (length(value) > stateVariableValueMaxLength) { - if (abbreviateTooLongStateVariableValues) { - LOG.warn("Too long value (length = {}) for state variable {}: abbreviated to {} characters.", length(value), name, - stateVariableValueMaxLength); - return abbreviate(value, stateVariableValueMaxLength); - } - throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name - + ", maximum allowed length is " + stateVariableValueMaxLength); - } - return value; + instance.stateVariables.put(name, workflowDao.abbreviateTooLongStateVariableValueIfNeeded(name, value)); } @Override diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 2d8ca83e7..41b6ad7b1 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -16,6 +16,7 @@ import static java.util.Collections.emptyList; import static java.util.Collections.emptySet; import static org.apache.commons.lang3.StringUtils.countMatches; +import static org.apache.commons.lang3.StringUtils.repeat; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; @@ -38,6 +39,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -316,8 +318,8 @@ public void updateWorkflowInstanceWithChildWorkflowHavingExistinExternalIdWorks( dao.updateWorkflowInstanceAfterExecution(i2, a1, asList(childWorkflow1, childWorkflow2), emptyWorkflows, false); - Map> childWorkflows = dao.getWorkflowInstance(id, - EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows; + Map> childWorkflows = dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), + null).childWorkflows; assertThat(childWorkflows.size(), is(1)); for (List childIds : childWorkflows.values()) { assertThat(childIds.size(), is(1)); @@ -509,11 +511,11 @@ public void postgreSQLUpdateWithoutActionIsNotAllowed() throws InterruptedExcept List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) - .setState("updateState").setStateText("update text").build(); + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") + .setStateText("update text").build(); sleep(1); assertThrows(IllegalArgumentException.class, - () -> dao.updateWorkflowInstanceAfterExecution(i2, null, noChildWorkflows, emptyWorkflows, false)); + () -> dao.updateWorkflowInstanceAfterExecution(i2, null, noChildWorkflows, emptyWorkflows, false)); } @Test @@ -578,7 +580,7 @@ public void fakePostgreSQLinsertWorkflowInstance() { WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text") .setRootWorkflowId(9283L).setParentWorkflowId(110L).setParentActionId(421L).setNextActivation(started.plusSeconds(1)) .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)) - .setStartedIfNotSet(started).setPriority((short)10).build(); + .setStartedIfNotSet(started).setPriority((short) 10).build(); d.insertWorkflowInstance(wf); assertEquals( @@ -646,9 +648,9 @@ public void pollNextWorkflowInstances() { @Test public void pollNextWorkflowInstancesReturnInstancesInCorrectOrder() { - long olderLowPrio = createInstance(2, (short)1); - long newerLowPrio = createInstance(1, (short)1); - long newerHighPrio = createInstance(1, (short)2); + long olderLowPrio = createInstance(2, (short) 1); + long newerLowPrio = createInstance(1, (short) 1); + long newerHighPrio = createInstance(1, (short) 2); // high priority comes first List ids = dao.pollNextWorkflowInstanceIds(1); @@ -686,18 +688,12 @@ private WorkflowInstanceDao preparePostgreSQLDao(JdbcTemplate jdbcTemplate) { lenient().when(eDao.getExecutorId()).thenReturn(42); NamedParameterJdbcTemplate namedJdbc = mock(NamedParameterJdbcTemplate.class); TransactionTemplate transactionTemplate = mock(TransactionTemplate.class); - WorkflowInstanceDao d = new WorkflowInstanceDao(new PostgreSQLVariants(), - jdbcTemplate, - transactionTemplate, - namedJdbc, - eDao, - workflowInstanceExecutor, - workflowInstanceFactory, - env - ); + WorkflowInstanceDao d = new WorkflowInstanceDao(new PostgreSQLVariants(), jdbcTemplate, transactionTemplate, namedJdbc, eDao, + workflowInstanceExecutor, workflowInstanceFactory, env); d.instanceStateTextLength = 128; d.actionStateTextLength = 128; + d.stateVariableValueMaxLength = 128; return d; } @@ -866,9 +862,9 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() { public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusToInProgressAndInsertsAction() { int crashedExecutorId = 999; insertCrashedExecutor(crashedExecutorId, executorDao.getExecutorGroup()); - long id = dao.insertWorkflowInstance(new WorkflowInstance.Builder().setType("test").setExternalId("extId") - .setExecutorGroup(executorDao.getExecutorGroup()).setStatus(executing).setState("processing").setPriority((short) 0) - .build()); + long id = dao.insertWorkflowInstance( + new WorkflowInstance.Builder().setType("test").setExternalId("extId").setExecutorGroup(executorDao.getExecutorGroup()) + .setStatus(executing).setState("processing").setPriority((short) 0).build()); int updated = jdbc.update("update nflow_workflow set executor_id = ? where id = ?", crashedExecutorId, id); assertThat(updated, is(1)); @@ -1028,9 +1024,31 @@ public void run() { } @Test - public void getStateVariableValueMaxLengthWorks() { - int length = dao.getStateVariableValueMaxLength(); + public void getStateVariableValueWorks() { + String value = dao.getStateVariableValue(new SimpleEntry<>("foo", "bar")); + + assertThat(value, is("bar")); + } + + @Test + public void getStateVariableValueThrowsExceptionWhenValueIsTooLong() { + assertThrows(IllegalArgumentException.class, + () -> dao.getStateVariableValue(new SimpleEntry<>("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1)))); + } + + @Test + public void getStateVariableValueAbbreviatesTooLongValue() { + WorkflowInstanceDao d = prepareMockDao(); + d.stateVariableValueMaxLength = 10; + d.abbreviateTooLongStateVariableValues = true; + + String value = d.getStateVariableValue(new SimpleEntry<>("foo", repeat('a', d.getStateVariableValueMaxLength() + 1))); + + assertThat(value.length(), is(d.getStateVariableValueMaxLength())); + } - assertThat(length, is(10240)); + private WorkflowInstanceDao prepareMockDao() { + return new WorkflowInstanceDao(mock(SQLVariants.class), mock(JdbcTemplate.class), mock(TransactionTemplate.class), + mock(NamedParameterJdbcTemplate.class), mock(ExecutorDao.class), workflowInstanceExecutor, workflowInstanceFactory, env); } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java index 1cdfa9177..efe5a6e36 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java @@ -2,8 +2,6 @@ import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -import static org.apache.commons.lang3.StringUtils.repeat; -import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -29,8 +27,6 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.core.env.Environment; -import org.springframework.mock.env.MockEnvironment; import io.nflow.engine.internal.dao.WorkflowInstanceDao; import io.nflow.engine.service.WorkflowInstanceService; @@ -58,22 +54,19 @@ public class StateExecutionImplTest { StateExecutionImpl execution; StateExecution executionInterface; WorkflowInstance instance; - Environment env; private final DateTime tomorrow = now().plusDays(1); @BeforeEach public void setup() { - when(workflowDao.getStateVariableValueMaxLength()).thenReturn(100); instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") - .setBusinessKey("business").build(); - env = new MockEnvironment().withProperty("nflow.workflow.state.variable.value.abbreviated", "false"); + .setBusinessKey("business").build(); createExecution(); executionInterface = execution; } private void createExecution() { execution = new StateExecutionImpl(instance, objectStringMapper, workflowDao, workflowInstancePreProcessor, - workflowInstanceService, env); + workflowInstanceService); } @Test @@ -236,28 +229,12 @@ public void getMissingVariableWithDefaultReturnsDefaultValue() { } @Test - public void setVariableWithTooLongValueIsAbbreviated() { - env = new MockEnvironment().withProperty("nflow.workflow.state.variable.value.abbreviated", "true"); - createExecution(); - - execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueMaxLength() + 1)); - - String value = execution.getVariable("foo"); - assertThat(value.length(), is(workflowDao.getStateVariableValueMaxLength())); - assertThat(value, endsWith("...")); - } - - @Test - public void setVariableWithTooLongValueThrowsException() { - assertThrows(IllegalArgumentException.class, - () -> execution.setVariable("foo", repeat('a', workflowDao.getStateVariableValueMaxLength() + 1))); - } + public void setVariableChecksValueLength() { + when(workflowDao.abbreviateTooLongStateVariableValueIfNeeded("foo", "bar")).thenReturn("baz"); - @Test - public void setVariableWorks() { execution.setVariable("foo", "bar"); - assertThat(execution.getVariable("foo"), is("bar")); + assertThat(execution.getVariable("foo"), is("baz")); } @Test diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index 6e9d02012..b8c511572 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -17,3 +17,5 @@ nflow.db.max_pool_size=20 nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false + +nflow.workflow.state.variable.value.abbreviated=false From 6f8e1d330fdd5e86e6547b664ca0c2fc40a11a71 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Fri, 29 Nov 2019 17:48:15 +0200 Subject: [PATCH 4/7] check state variable values when preprocessing workflow instances --- .../internal/dao/WorkflowInstanceDao.java | 14 ++++++++++++-- .../workflow/WorkflowInstancePreProcessor.java | 8 +++++++- .../internal/dao/WorkflowInstanceDaoTest.java | 11 +++++++++++ .../WorkflowInstancePreProcessorTest.java | 17 ++++++++++++++++- 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index eec1372c5..f224a3d6d 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -437,12 +437,22 @@ public String abbreviateTooLongStateVariableValueIfNeeded(String name, String va getStateVariableValueMaxLength()); return abbreviate(value, getStateVariableValueMaxLength()); } - throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name - + ": maximum allowed length is " + getStateVariableValueMaxLength()); + throw new IllegalArgumentException(getTooLongValueMessage(name, value)); } return value; } + private String getTooLongValueMessage(String name, String value) { + return "Too long value (length = " + length(value) + ") for state variable " + name + ": maximum allowed length is " + + getStateVariableValueMaxLength(); + } + + public void checkStateVariableValue(String name, String value) { + if (!abbreviateTooLongStateVariableValues && length(value) > getStateVariableValueMaxLength()) { + throw new IllegalArgumentException(getTooLongValueMessage(name, value)); + } + } + String insertWorkflowActionSql() { return "insert into nflow_workflow_action(workflow_id, executor_id, type, state, state_text, retry_no, execution_start, execution_end)"; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java index d08473315..9c75ad613 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java @@ -8,6 +8,7 @@ import org.springframework.stereotype.Component; +import io.nflow.engine.internal.dao.WorkflowInstanceDao; import io.nflow.engine.service.WorkflowDefinitionService; import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition; import io.nflow.engine.workflow.instance.WorkflowInstance; @@ -17,9 +18,13 @@ public class WorkflowInstancePreProcessor { private final WorkflowDefinitionService workflowDefinitionService; + private final WorkflowInstanceDao workflowInstanceDao; + @Inject - public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinitionService) { + public WorkflowInstancePreProcessor(WorkflowDefinitionService workflowDefinitionService, + WorkflowInstanceDao workflowInstanceDao) { this.workflowDefinitionService = workflowDefinitionService; + this.workflowInstanceDao = workflowInstanceDao; } // TODO should this set next_activation for child workflows? @@ -45,6 +50,7 @@ public WorkflowInstance process(WorkflowInstance instance) { if (instance.priority == null) { builder.setPriority(def.getSettings().getDefaultPriority()); } + instance.getChangedStateVariables().forEach(workflowInstanceDao::checkStateVariableValue); return builder.build(); } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 41b6ad7b1..0825fd804 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -1023,6 +1023,17 @@ public void run() { } } + @Test + public void checkStateVariableValueWorks() { + dao.checkStateVariableValue("foo", repeat('a', dao.getStateVariableValueMaxLength())); + } + + @Test + public void checkStateVariableValueThrowsExceptionWhenValueIsTooLong() { + assertThrows(IllegalArgumentException.class, + () -> dao.checkStateVariableValue("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1))); + } + @Test public void getStateVariableValueWorks() { String value = dao.getStateVariableValue(new SimpleEntry<>("foo", "bar")); diff --git a/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java index ab219342c..0b4a1df46 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java @@ -6,11 +6,15 @@ import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.verify; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; +import io.nflow.engine.internal.dao.WorkflowInstanceDao; import io.nflow.engine.internal.executor.BaseNflowTest; import io.nflow.engine.internal.workflow.WorkflowInstancePreProcessor; import io.nflow.engine.service.DummyTestWorkflow; @@ -19,11 +23,15 @@ import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.instance.WorkflowInstance; +@RunWith(MockitoJUnitRunner.class) public class WorkflowInstancePreProcessorTest extends BaseNflowTest { @Mock private WorkflowDefinitionService workflowDefinitionService; + @Mock + private WorkflowInstanceDao workflowInstanceDao; + private WorkflowInstancePreProcessor preProcessor; private WorkflowDefinition dummyWorkflow; @@ -34,7 +42,7 @@ public class WorkflowInstancePreProcessorTest extends BaseNflowTest { public void setup() { dummyWorkflow = new DummyTestWorkflow(new WorkflowSettings.Builder().setDefaultPriority(DEFAULT_PRIORITY).build()); lenient().doReturn(dummyWorkflow).when(workflowDefinitionService).getWorkflowDefinition("dummy"); - preProcessor = new WorkflowInstancePreProcessor(workflowDefinitionService); + preProcessor = new WorkflowInstancePreProcessor(workflowDefinitionService, workflowInstanceDao); } @Test @@ -58,6 +66,13 @@ public void createsMissingState() { assertThat(processed.state, is("CreateLoan")); } + @Test + public void checksStateVariableValues() { + WorkflowInstance i = constructWorkflowInstanceBuilder().putStateVariable("foo", "bar").build(); + preProcessor.process(i); + verify(workflowInstanceDao).checkStateVariableValue("foo", "bar"); + } + @Test public void setsStatusToCreatedWhenStatusIsNotSpecified() { WorkflowInstance i = constructWorkflowInstanceBuilder().setStatus(null).build(); From 6e5ccb9a0aac42a419e4182b9b9e7ccb4261dfe0 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 4 Dec 2019 08:50:44 +0200 Subject: [PATCH 5/7] remove state variable value abbreviation --- .../internal/dao/WorkflowInstanceDao.java | 37 ++++--------------- .../internal/workflow/StateExecutionImpl.java | 3 +- .../WorkflowInstancePreProcessor.java | 2 +- .../main/resources/nflow-engine.properties | 2 - .../internal/dao/WorkflowInstanceDaoTest.java | 34 +---------------- .../workflow/StateExecutionImplTest.java | 4 +- .../WorkflowInstancePreProcessorTest.java | 2 +- .../src/test/resources/junit.properties | 2 - 8 files changed, 14 insertions(+), 72 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index f224a3d6d..6207244dc 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -103,7 +103,6 @@ public class WorkflowInstanceDao { int instanceStateTextLength; int actionStateTextLength; int stateVariableValueMaxLength; - boolean abbreviateTooLongStateVariableValues; @Inject public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdbcTemplate, @@ -133,8 +132,6 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb instanceStateTextLength = env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1); actionStateTextLength = env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1); stateVariableValueMaxLength = env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1); - abbreviateTooLongStateVariableValues = env.getRequiredProperty("nflow.workflow.state.variable.value.abbreviated", - Boolean.class); } private int getInstanceStateTextLength() { @@ -187,7 +184,7 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { sqlb.append(", ins").append(pos).append(" as (").append(insertWorkflowInstanceStateSql()) .append(" select wf.id,0,?,? from wf)"); args[pos++] = variable.getKey(); - args[pos++] = getStateVariableValue(variable); + args[pos++] = variable.getValue(); } sqlb.append(" select wf.id from wf"); return jdbc.queryForObject(sqlb.toString(), Long.class, args); @@ -197,10 +194,6 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { } } - String getStateVariableValue(Entry variable) { - return abbreviateTooLongStateVariableValueIfNeeded(variable.getKey(), variable.getValue()); - } - boolean useBatchUpdate() { return !disableBatchUpdates && sqlVariants.useBatchUpdate(); } @@ -267,7 +260,7 @@ void insertVariables(final long id, final long actionId, Map cha private void insertVariablesWithMultipleUpdates(final long id, final long actionId, Map changedStateVariables) { for (Entry entry : changedStateVariables.entrySet()) { int updated = jdbc.update(insertWorkflowInstanceStateSql() + " values (?,?,?,?)", id, actionId, entry.getKey(), - getStateVariableValue(entry)); + entry.getValue()); if (updated != 1) { throw new IllegalStateException("Failed to insert state variable " + entry.getKey()); } @@ -287,7 +280,7 @@ protected boolean setValuesIfAvailable(PreparedStatement ps, int i) throws SQLEx ps.setLong(1, id); ps.setLong(2, actionId); ps.setString(3, variable.getKey()); - ps.setString(4, getStateVariableValue(variable)); + ps.setString(4, variable.getValue()); return true; } }); @@ -424,32 +417,16 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work sqlb.append(", ins").append(pos).append(" as (").append(insertWorkflowInstanceStateSql()) .append(" select wf.id,act.id,?,? from wf,act)"); args[pos++] = variable.getKey(); - args[pos++] = getStateVariableValue(variable); + args[pos++] = variable.getValue(); } sqlb.append(" select act.id from act"); jdbc.queryForObject(sqlb.toString(), Long.class, args); } - public String abbreviateTooLongStateVariableValueIfNeeded(String name, String value) { + public void checkStateVariableValueLength(String name, String value) { if (length(value) > getStateVariableValueMaxLength()) { - if (abbreviateTooLongStateVariableValues) { - logger.warn("Too long value (length = {}) for state variable {}: abbreviated to {} characters.", length(value), name, - getStateVariableValueMaxLength()); - return abbreviate(value, getStateVariableValueMaxLength()); - } - throw new IllegalArgumentException(getTooLongValueMessage(name, value)); - } - return value; - } - - private String getTooLongValueMessage(String name, String value) { - return "Too long value (length = " + length(value) + ") for state variable " + name + ": maximum allowed length is " - + getStateVariableValueMaxLength(); - } - - public void checkStateVariableValue(String name, String value) { - if (!abbreviateTooLongStateVariableValues && length(value) > getStateVariableValueMaxLength()) { - throw new IllegalArgumentException(getTooLongValueMessage(name, value)); + throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name + + ": maximum allowed length is " + getStateVariableValueMaxLength()); } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index ac3f94995..628769eca 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -118,7 +118,8 @@ public String getVariable(String name, String defaultValue) { @Override public void setVariable(String name, String value) { - instance.stateVariables.put(name, workflowDao.abbreviateTooLongStateVariableValueIfNeeded(name, value)); + workflowDao.checkStateVariableValueLength(name, value); + instance.stateVariables.put(name, value); } @Override diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java index 9c75ad613..c94c4ff01 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/WorkflowInstancePreProcessor.java @@ -50,7 +50,7 @@ public WorkflowInstance process(WorkflowInstance instance) { if (instance.priority == null) { builder.setPriority(def.getSettings().getDefaultPriority()); } - instance.getChangedStateVariables().forEach(workflowInstanceDao::checkStateVariableValue); + instance.getChangedStateVariables().forEach(workflowInstanceDao::checkStateVariableValueLength); return builder.build(); } } diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 0b39ba426..53df407ca 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -66,5 +66,3 @@ nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false nflow.definition.persist=true - -nflow.workflow.state.variable.value.abbreviated=false diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 0825fd804..3d23b131b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -39,7 +39,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; -import java.util.AbstractMap.SimpleEntry; import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; @@ -1025,41 +1024,12 @@ public void run() { @Test public void checkStateVariableValueWorks() { - dao.checkStateVariableValue("foo", repeat('a', dao.getStateVariableValueMaxLength())); + dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength())); } @Test public void checkStateVariableValueThrowsExceptionWhenValueIsTooLong() { assertThrows(IllegalArgumentException.class, - () -> dao.checkStateVariableValue("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1))); - } - - @Test - public void getStateVariableValueWorks() { - String value = dao.getStateVariableValue(new SimpleEntry<>("foo", "bar")); - - assertThat(value, is("bar")); - } - - @Test - public void getStateVariableValueThrowsExceptionWhenValueIsTooLong() { - assertThrows(IllegalArgumentException.class, - () -> dao.getStateVariableValue(new SimpleEntry<>("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1)))); - } - - @Test - public void getStateVariableValueAbbreviatesTooLongValue() { - WorkflowInstanceDao d = prepareMockDao(); - d.stateVariableValueMaxLength = 10; - d.abbreviateTooLongStateVariableValues = true; - - String value = d.getStateVariableValue(new SimpleEntry<>("foo", repeat('a', d.getStateVariableValueMaxLength() + 1))); - - assertThat(value.length(), is(d.getStateVariableValueMaxLength())); - } - - private WorkflowInstanceDao prepareMockDao() { - return new WorkflowInstanceDao(mock(SQLVariants.class), mock(JdbcTemplate.class), mock(TransactionTemplate.class), - mock(NamedParameterJdbcTemplate.class), mock(ExecutorDao.class), workflowInstanceExecutor, workflowInstanceFactory, env); + () -> dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1))); } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java index efe5a6e36..f67044555 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java @@ -230,11 +230,9 @@ public void getMissingVariableWithDefaultReturnsDefaultValue() { @Test public void setVariableChecksValueLength() { - when(workflowDao.abbreviateTooLongStateVariableValueIfNeeded("foo", "bar")).thenReturn("baz"); - execution.setVariable("foo", "bar"); - assertThat(execution.getVariable("foo"), is("baz")); + verify(workflowDao).checkStateVariableValueLength("foo", "bar"); } @Test diff --git a/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java index 0b4a1df46..9cac2e043 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/workflow/WorkflowInstancePreProcessorTest.java @@ -70,7 +70,7 @@ public void createsMissingState() { public void checksStateVariableValues() { WorkflowInstance i = constructWorkflowInstanceBuilder().putStateVariable("foo", "bar").build(); preProcessor.process(i); - verify(workflowInstanceDao).checkStateVariableValue("foo", "bar"); + verify(workflowInstanceDao).checkStateVariableValueLength("foo", "bar"); } @Test diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index b8c511572..6e9d02012 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -17,5 +17,3 @@ nflow.db.max_pool_size=20 nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false - -nflow.workflow.state.variable.value.abbreviated=false From a4ab45a41e66e67a83d87d200ab5237b6bd46c07 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 4 Dec 2019 11:55:27 +0200 Subject: [PATCH 6/7] use different retry mechanism for too long state variable value case --- CHANGELOG.md | 3 + .../internal/dao/WorkflowInstanceDao.java | 3 +- .../executor/WorkflowStateProcessor.java | 37 ++++++++---- .../StateVariableValueTooLongException.java | 10 ++++ .../main/resources/nflow-engine.properties | 1 + .../internal/dao/WorkflowInstanceDaoTest.java | 3 +- .../executor/WorkflowDispatcherTest.java | 1 + .../WorkflowStateProcessorFactoryTest.java | 1 + .../executor/WorkflowStateProcessorTest.java | 57 ++++++++++++++++++- .../src/test/resources/junit.properties | 1 + 10 files changed, 104 insertions(+), 13 deletions(-) create mode 100644 nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d308c4a09..96be56d74 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,12 @@ ## 6.0.1-SNAPSHOT (future release) **Highlights** +- `nflow-engine` + - Check that state variable value fits into the database column **Details** - `nflow-engine` + - Throw exception if a state variable value that does not fit into the database column is detected. Checked in `StateExecution.setVariable`, `StateExecution.addWorkflows`, `StateExecution.addChildWorkflows`, `WorkflowInstanceService.insertWorkflowInstance` and when creating a new instance via REST API. If the exception is thrown during state processing, nFlow engine will catch the exception and retry state processing after delay configured by property `nflow.executor.stateVariableValueTooLongRetryDelay.minutes` (default is 60). - Dependency updates: - jetty 9.4.24.v20191120 - junit4 4.13-rc-1 diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 6207244dc..78fb31dfc 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -71,6 +71,7 @@ import io.nflow.engine.internal.storage.db.SQLVariants; import io.nflow.engine.model.ModelObject; import io.nflow.engine.service.WorkflowInstanceInclude; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.QueryWorkflowInstances; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; @@ -425,7 +426,7 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work public void checkStateVariableValueLength(String name, String value) { if (length(value) > getStateVariableValueMaxLength()) { - throw new IllegalArgumentException("Too long value (length = " + length(value) + ") for state variable " + name + throw new StateVariableValueTooLongException("Too long value (length = " + length(value) + ") for state variable " + name + ": maximum allowed length is " + getStateVariableValueMaxLength()); } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 2fdd7c27c..07fb63fc5 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -50,6 +50,7 @@ import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -74,6 +75,7 @@ class WorkflowStateProcessor implements Runnable { private final int unknownWorkflowStateRetryDelay; private final int stateProcessingRetryDelay; private final int stateSaveRetryDelay; + private final int stateVariableValueTooLongRetryDelay; private boolean internalRetryEnabled = true; private final Map processingInstances; private long startTimeSeconds; @@ -96,6 +98,8 @@ class WorkflowStateProcessor implements Runnable { unknownWorkflowStateRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.state.retry.delay.minutes", Integer.class); stateProcessingRetryDelay = env.getRequiredProperty("nflow.executor.stateProcessingRetryDelay.seconds", Integer.class); stateSaveRetryDelay = env.getRequiredProperty("nflow.executor.stateSaveRetryDelay.seconds", Integer.class); + stateVariableValueTooLongRetryDelay = env.getRequiredProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", + Integer.class); } @Override @@ -142,10 +146,13 @@ private void runImpl() { rescheduleUnknownWorkflowState(instance); return; } - + boolean saveInstanceState = true; try { processBeforeListeners(listenerContext); listenerContext.nextAction = processWithListeners(listenerContext, instance, definition, execution, state); + } catch (StateVariableValueTooLongException e) { + instance = rescheduleStateVariableValueTooLong(e, instance); + saveInstanceState = false; } catch (Throwable t) { execution.setFailed(t); logger.error("Handler threw exception, trying again later.", t); @@ -154,14 +161,16 @@ private void runImpl() { execution.setNextStateReason(getStackTrace(t)); execution.handleRetryAfter(definition.getSettings().getErrorTransitionActivation(execution.getRetries()), definition); } finally { - if (execution.isFailed()) { - processAfterFailureListeners(listenerContext, execution.getThrown()); - } else { - processAfterListeners(listenerContext); - optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution); + if (saveInstanceState) { + if (execution.isFailed()) { + processAfterFailureListeners(listenerContext, execution.getThrown()); + } else { + processAfterListeners(listenerContext); + optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution); + } + subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution); + instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder); } - subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution); - instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder); } } logger.debug("Finished."); @@ -191,6 +200,14 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) { logger.debug("Finished."); } + private WorkflowInstance rescheduleStateVariableValueTooLong(StateVariableValueTooLongException e, WorkflowInstance instance) { + logger.warn("Failed to process workflow instance {}: {} - rescheduling workflow instance", instance.id, e.getMessage()); + instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(stateVariableValueTooLongRetryDelay)) + .setStatus(inProgress).setStateText(e.getMessage()).build(); + workflowInstanceDao.updateWorkflowInstance(instance); + return instance; + } + private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, int subsequentStateExecutions, StateExecutionImpl execution) { DateTime nextActivation = execution.getNextActivation(); @@ -230,8 +247,8 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, try { return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder); } catch (Exception ex) { - logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", - instance.id, stateSaveRetryDelay, ex); + logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", instance.id, stateSaveRetryDelay, + ex); sleepIgnoreInterrupted(stateSaveRetryDelay); } } while (internalRetryEnabled); diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java new file mode 100644 index 000000000..a04182c47 --- /dev/null +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/executor/StateVariableValueTooLongException.java @@ -0,0 +1,10 @@ +package io.nflow.engine.workflow.executor; + +public class StateVariableValueTooLongException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public StateVariableValueTooLongException(String message) { + super(message); + } +} diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 53df407ca..2c988becf 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -6,6 +6,7 @@ nflow.executor.keepalive.seconds=60 nflow.executor.stuckThreadThreshold.seconds=60 nflow.executor.stateProcessingRetryDelay.seconds=60 nflow.executor.stateSaveRetryDelay.seconds=60 +nflow.executor.stateVariableValueTooLongRetryDelay.minutes=60 nflow.dispatcher.sleep.ms=1000 nflow.dispatcher.await.termination.seconds=60 diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 3d23b131b..61a2f976e 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -66,6 +66,7 @@ import io.nflow.engine.internal.executor.WorkflowInstanceExecutor; import io.nflow.engine.internal.storage.db.SQLVariants; import io.nflow.engine.service.WorkflowInstanceInclude; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.QueryWorkflowInstances; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -1029,7 +1030,7 @@ public void checkStateVariableValueWorks() { @Test public void checkStateVariableValueThrowsExceptionWhenValueIsTooLong() { - assertThrows(IllegalArgumentException.class, + assertThrows(StateVariableValueTooLongException.class, () -> dao.checkStateVariableValueLength("foo", repeat('a', dao.getStateVariableValueMaxLength() + 1))); } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index d6befa876..e9a483722 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -80,6 +80,7 @@ public void setup() { env.setProperty("nflow.executor.stuckThreadThreshold.seconds", "60"); env.setProperty("nflow.executor.stateProcessingRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60"); + env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); when(executorDao.isTransactionSupportEnabled()).thenReturn(true); executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-")); dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index e48af1268..485df74e2 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -50,6 +50,7 @@ public void setup() { env.setProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.toString(STUCK_THREAD_THRESHOLD)); env.setProperty("nflow.executor.stateProcessingRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60"); + env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao, workflowInstancePreProcessor, env); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index d5d6b7234..70dd4893c 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -28,6 +28,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -92,6 +93,7 @@ import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; +import io.nflow.engine.workflow.executor.StateVariableValueTooLongException; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -155,6 +157,8 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { WorkflowDefinition wakeWf = new NotifyTestWorkflow(); + WorkflowDefinition stateVariableWf = new StateVariableWorkflow(); + static WorkflowInstance newChildWorkflow = mock(WorkflowInstance.class); static WorkflowInstance newWorkflow = mock(WorkflowInstance.class); @@ -173,7 +177,7 @@ public void setup() { env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60"); env.setProperty("nflow.executor.stateProcessingRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "1"); - + env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); setCurrentMillisFixed(currentTimeMillis()); @@ -183,6 +187,7 @@ public void setup() { lenient().doReturn(simpleWf).when(workflowDefinitions).getWorkflowDefinition("simple-test"); lenient().doReturn(failingWf).when(workflowDefinitions).getWorkflowDefinition("failing-test"); lenient().doReturn(wakeWf).when(workflowDefinitions).getWorkflowDefinition("wake-test"); + lenient().doReturn(stateVariableWf).when(workflowDefinitions).getWorkflowDefinition("state-variable"); filterChain(listener1); filterChain(listener2); lenient().when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries); @@ -299,6 +304,22 @@ public void instanceWithUnsupportedStateIsRescheduled() { is("Unsupported workflow state"), greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } + @Test + public void stateIsRescheduledWhenStateVariableValueIsTooLong() { + WorkflowInstance instance = executingInstanceBuilder().setType("state-variable").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + String errorMessage = "Too long state variable value"; + doThrow(new StateVariableValueTooLongException(errorMessage)).when(workflowInstanceDao) + .checkStateVariableValueLength(anyString(), anyString()); + DateTime oneHourInFuture = now().plusHours(1); + + executor.run(); + + verify(workflowInstanceDao) + .updateWorkflowInstance(argThat(matchesWorkflowInstance(inProgress, StateVariableWorkflow.State.start, 0, + is(errorMessage), greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); + } + @Test public void workflowStatusIsSetToManualForManualStates() { WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("beforeManual") @@ -1146,4 +1167,38 @@ public NextAction start(@SuppressWarnings("unused") StateExecution execution) { } + public static class StateVariableWorkflow extends WorkflowDefinition { + + protected StateVariableWorkflow() { + super("state-variable", State.start, State.end); + permit(State.start, State.end); + } + + public static enum State implements WorkflowState { + start(WorkflowStateType.start), end(WorkflowStateType.end); + + private final WorkflowStateType stateType; + + private State(WorkflowStateType stateType) { + this.stateType = stateType; + } + + @Override + public WorkflowStateType getType() { + return stateType; + } + + @Override + public String getDescription() { + return name(); + } + } + + public NextAction start(StateExecution execution) { + execution.setVariable("foo", "bar"); + return moveToState(State.end, "Done."); + } + + } + } diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index 6e9d02012..3c4202e4c 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -2,6 +2,7 @@ nflow.executor.group=junit nflow.executor.timeout.seconds=900 nflow.executor.keepalive.seconds=60 nflow.executor.stateSaveRetryDelay.seconds=60 +nflow.executor.stateVariableValueTooLongRetryDelay.minutes=60 nflow.workflow.instance.query.max.results=10000 nflow.workflow.instance.query.max.results.default=100 From baf02e378721c3e4e19a94eedebfe4930fd617b2 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 4 Dec 2019 12:26:19 +0200 Subject: [PATCH 7/7] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96be56d74..1ec7f2f90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ **Details** - `nflow-engine` - - Throw exception if a state variable value that does not fit into the database column is detected. Checked in `StateExecution.setVariable`, `StateExecution.addWorkflows`, `StateExecution.addChildWorkflows`, `WorkflowInstanceService.insertWorkflowInstance` and when creating a new instance via REST API. If the exception is thrown during state processing, nFlow engine will catch the exception and retry state processing after delay configured by property `nflow.executor.stateVariableValueTooLongRetryDelay.minutes` (default is 60). + - Throw `StateVariableValueTooLongException` if a state variable value that does not fit into the database column is detected. Checked in `StateExecution.setVariable`, `StateExecution.addWorkflows`, `StateExecution.addChildWorkflows`, `WorkflowInstanceService.insertWorkflowInstance` and when creating a new instance via REST API. If the exception is thrown during state processing and not handled by the state implementation, nFlow engine will catch the exception and retry state processing after delay configured by property `nflow.executor.stateVariableValueTooLongRetryDelay.minutes` (default is 60). - Dependency updates: - jetty 9.4.24.v20191120 - junit4 4.13-rc-1