diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a6914586..603b09723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,12 @@ -## 5.6.1-SNAPSHOT (future release) +## 5.7.0-SNAPSHOT (future release) **Highlights** +- Added `started` timestamp to workflow instance table (requires database update) **Details** +- `nflow-engine` + - Add started timestamp to workflow instance table. This makes the instance queries much faster when instances have lots of actions, as there is no need to join the nflow_workflow_action table to the query anymore. + - Deprecated WorkflowInstanceInclude.STARTED enum value. This is not needed anymore, since the started timestamp is always read from the database when the instance is loaded. ## 5.6.0 (2019-05-21) @@ -16,10 +20,10 @@ - jetty 9.4.17.v20190418 - `nflow-engine` - Retry workflow state processing until all steps in nFlow-side are executed successfully. This will prevent workflow instances from being locked in `executing` status, if e.g. database connection fails after locking the instance and before querying the full workflow instance information (`WorkflowStateProcessor`). - - Fix #306: create empty ArrayList with default initial size - - Log more executor details on startup + - Fix #306: create empty ArrayList with default initial size. + - Log more executor details on startup. - Fix #311: Replace references to WorkflowDefinition with AbstractWorkflowDefinition to support non-enum WorkflowStates - - Use name() instead of toString() when getting workflow instance initial state name + - Use name() instead of toString() when getting workflow instance initial state name. ## 5.5.0 (2019-04-04) diff --git a/DECISION_LOG.md b/DECISION_LOG.md index c9557d5bb..ab219be2c 100644 --- a/DECISION_LOG.md +++ b/DECISION_LOG.md @@ -2,28 +2,34 @@ (newest first) -2015-04-19 efonsell ---------------------------- +6: 2019-05-22 efonsell +---------------------- +Add WorkflowInstance.started value (time when processing of the first state of the workflow was started) to the database as nflow_workflow.started. Reading the value from nflow_workflow_action table was causing performance issues when the amount of actions in the database was growing. + +This decision overrides the decision number 2. + +5: 2015-04-19 efonsell +---------------------- Define default property-values in properties-files only and make them required when there is a static default value. Do not define default values in properties-files for properties that have a dynamic default value. For example, nflow.dispatcher.sleep.ms default value 1000 is defined in nflow-engine.properties, but nflow.executor.thread.count default value which is based on number of processors is defined in the Java code. -2015-01-10 gmokki, efonsell ---------------------------- +4: 2015-01-10 gmokki, efonsell +------------------------------ When polling for next workflow instances in WorkflowInstanceDao, the modified field in OptimisticLockKey is handled as String instead of Timestamp to avoid problems caused by losing millisecond precision from timestamps in some cases (for example with some older versions of MySQL). -2014-12-10 eputtone -------------------- +3: 2014-12-10 eputtone +---------------------- Internal nFlow functionalities can access DAO layer (package com.nitorcreations.nflow.engine.internal.dao) directly instead of going through service layer (package com.nitorcreations.nflow.engine.service). Rationale: service layer is currently part of public API that we wish to keep as simple as possible. Example: WorkflowDefinitionResource in nflow-rest-api uses WorkflowDefinitionDao directly for retrieving StoredWorkflowDefinitions, because we don't want to confuse public API users with multiple workflow definition representations. -2014-11-26 jsyrjala -------------------- +2: 2014-11-26 jsyrjala +---------------------- WorkflowInstance.started value (time when processing of the workflow is first started) is fetched from the earliest nflow_workflow_action.start_execution value. This way no changes to database is needed. If needed because of performance, the value may be later added as nflow_workflow.started. Started value is fetched with a subselect. This requires that nflow_workflow_action.workflow_id is indexed. Postgres has explicit index, h2 and mysql have implicit index via foreign key. -2014-11-26 efonsell -------------------- +1: 2014-11-26 efonsell +---------------------- Swagger-UI static resources are downloaded from GitHub based on version defined in pom.xml and extracted to nflow-jetty target directory, except index.html which is customized and thus updated manually in nFlow repository. diff --git a/nflow-engine/pom.xml b/nflow-engine/pom.xml index 46cb3cfdf..bee0efe2d 100644 --- a/nflow-engine/pom.xml +++ b/nflow-engine/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java index 153f6a1c9..ea04e7c44 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java @@ -146,10 +146,10 @@ public PreparedStatement createPreparedStatement(Connection con) throws SQLExcep return p; } }, keyHolder); - int executorId = keyHolder.getKey().intValue(); + int allocatedExecutorId = keyHolder.getKey().intValue(); logger.info("Joined executor group {} as executor {} running on host {} with process id {}.", - executorGroup, executorId, host, pid); - return executorId; + executorGroup, allocatedExecutorId, host, pid); + return allocatedExecutorId; } public void updateActiveTimestamp() { diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index f03acb485..611d1921f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -2,7 +2,6 @@ import static io.nflow.engine.internal.dao.DaoUtil.firstColumnLengthExtractor; import static io.nflow.engine.internal.dao.DaoUtil.getInt; -import static io.nflow.engine.internal.dao.DaoUtil.toDateTime; import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; @@ -332,7 +331,8 @@ public int updateWorkflowInstance(WorkflowInstance instance) { Object nextActivation = sqlVariants.toTimestampObject(instance.nextActivation); return jdbc.update(updateWorkflowInstanceSql(), instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), nextActivation, nextActivation, nextActivation, - instance.status == executing ? executorInfo.getExecutorId() : null, instance.retries, instance.id); + instance.status == executing ? executorInfo.getExecutorId() : null, instance.retries, toTimestamp(instance.started), + instance.id); } private void updateWorkflowInstanceWithTransaction(final WorkflowInstance instance, final WorkflowInstanceAction action, @@ -368,8 +368,8 @@ public void recoverWorkflowInstancesFromDeadNodes() { private List getRecoverableInstances() { String sql = "select id, state from nflow_workflow where executor_id in (select id from nflow_executor where " - + executorInfo.getExecutorGroupCondition() + " and id <> " + executorInfo.getExecutorId() - + " and " + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")"; + + executorInfo.getExecutorGroupCondition() + " and id <> " + executorInfo.getExecutorId() + " and " + + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")"; return jdbc.query(sql, new RowMapper() { @Override public InstanceInfo mapRow(ResultSet rs, int rowNum) throws SQLException { @@ -386,12 +386,10 @@ private void recoverWorkflowInstance(final int instanceId, final WorkflowInstanc @Override protected void doInTransactionWithoutResult(TransactionStatus status) { int executorId = executorInfo.getExecutorId(); - int updated = jdbc.update( - "update nflow_workflow set executor_id = null, status = " + sqlVariants.workflowStatus(inProgress) - + " where id = ? and executor_id in (select id from nflow_executor where " - + executorInfo.getExecutorGroupCondition() + " and id <> " + executorId + " and " - + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")", - instanceId); + int updated = jdbc.update("update nflow_workflow set executor_id = null, status = " + + sqlVariants.workflowStatus(inProgress) + " where id = ? and executor_id in (select id from nflow_executor where " + + executorInfo.getExecutorGroupCondition() + " and id <> " + executorId + " and " + + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")", instanceId); if (updated > 0) { insertWorkflowInstanceAction(action); } @@ -411,10 +409,9 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work Timestamp nextActivation = toTimestamp(instance.nextActivation); Object[] fixedValues = new Object[] { instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), nextActivation, nextActivation, nextActivation, - instance.status == executing ? executorId : null, instance.retries, instance.id, executorId, action.type.name(), - action.state, abbreviate(action.stateText, getActionStateTextLength()), action.retryNo, - toTimestamp(action.executionStart), - toTimestamp(action.executionEnd) }; + instance.status == executing ? executorId : null, instance.retries, toTimestamp(action.executionStart), instance.id, + executorId, action.type.name(), action.state, abbreviate(action.stateText, getActionStateTextLength()), action.retryNo, + toTimestamp(action.executionStart), toTimestamp(action.executionEnd) }; int pos = fixedValues.length; Object[] args = Arrays.copyOf(fixedValues, pos + changedStateVariables.size() * 2); for (Entry var : changedStateVariables.entrySet()) { @@ -434,7 +431,8 @@ String insertWorkflowActionSql() { private String updateWorkflowInstanceSql() { return "update nflow_workflow set status = " + sqlVariants.workflowStatus() + ", state = ?, state_text = ?, " + "next_activation = " + sqlVariants.nextActivationUpdate() - + ", external_next_activation = null, executor_id = ?, retries = ? where id = ? and executor_id = " + + ", external_next_activation = null, executor_id = ?, retries = ?, " + + "started = (case when started is null then ? else started end) where id = ? and executor_id = " + executorInfo.getExecutorId(); } @@ -465,7 +463,8 @@ public boolean updateNotRunningWorkflowInstance(WorkflowInstance instance) { @Transactional public boolean wakeUpWorkflowExternally(int workflowInstanceId, List expectedStates) { StringBuilder sql = new StringBuilder("update nflow_workflow set next_activation = (case when executor_id is null then ") - .append("case when " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + " then next_activation else current_timestamp end else next_activation end), ") + .append("case when " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + + " then next_activation else current_timestamp end else next_activation end), ") .append("external_next_activation = current_timestamp where ").append(executorInfo.getExecutorGroupCondition()) .append(" and id = ? and next_activation is not null"); return addExpectedStatesToQueryAndUpdate(sql, workflowInstanceId, expectedStates); @@ -494,13 +493,8 @@ private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workfl } public WorkflowInstance getWorkflowInstance(int id, Set includes, Long maxActions) { - String sql = "select * from nflow_workflow w where w.id = ?"; - WorkflowInstance.Builder builder = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id); - if (includes.contains(WorkflowInstanceInclude.STARTED)) { - builder.setStarted(toDateTime(jdbc - .queryForObject("select min(execution_start) from nflow_workflow_action where workflow_id = ?", Timestamp.class, id))); - } - WorkflowInstance instance = builder.build(); + String sql = "select * from nflow_workflow where id = ?"; + WorkflowInstance instance = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id).build(); if (includes.contains(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES)) { fillState(instance); } @@ -514,10 +508,9 @@ public WorkflowInstance getWorkflowInstance(int id, Set } private void fillState(final WorkflowInstance instance) { - jdbc.query( - "select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " - + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " - + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", + jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " + + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " + + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", new RowCallbackHandler() { @Override public void processRow(ResultSet rs) throws SQLException { @@ -541,9 +534,8 @@ String updateInstanceForExecutionQuery() { String whereConditionForInstanceUpdate() { return "where executor_id is null and status in (" + sqlVariants.workflowStatus(created) + ", " - + sqlVariants.workflowStatus(inProgress) + ") and " - + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + " and " - + executorInfo.getExecutorGroupCondition() + " order by next_activation asc"; + + sqlVariants.workflowStatus(inProgress) + ") and " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + + " and " + executorInfo.getExecutorGroupCondition() + " order by next_activation asc"; } private List pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) { @@ -558,8 +550,8 @@ private List pollNextWorkflowInstanceIdsWithTransaction(final int batch @Override public List doInTransaction(TransactionStatus transactionStatus) { String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize); - List instances = jdbc.query(sql, (rs, rowNum) -> - new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified"))); + List instances = jdbc.query(sql, + (rs, rowNum) -> new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified"))); if (instances.isEmpty()) { return emptyList(); } @@ -635,31 +627,28 @@ public int compareTo(OptimisticLockKey other) { } public List queryWorkflowInstances(QueryWorkflowInstances query) { - String sql = "select w.*, " - + "(select min(execution_start) from nflow_workflow_action a where a.workflow_id = w.id) as started " - + "from nflow_workflow w "; - + String sql = "select * from nflow_workflow "; List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); conditions.add(executorInfo.getExecutorGroupCondition()); if (!isEmpty(query.ids)) { - conditions.add("w.id in (:ids)"); + conditions.add("id in (:ids)"); params.addValue("ids", query.ids); } if (!isEmpty(query.types)) { - conditions.add("w.type in (:types)"); + conditions.add("type in (:types)"); params.addValue("types", query.types); } if (query.parentWorkflowId != null) { - conditions.add("w.parent_workflow_id = :parent_workflow_id"); + conditions.add("parent_workflow_id = :parent_workflow_id"); params.addValue("parent_workflow_id", query.parentWorkflowId); } if (query.parentActionId != null) { - conditions.add("w.parent_action_id = :parent_action_id"); + conditions.add("parent_action_id = :parent_action_id"); params.addValue("parent_action_id", query.parentActionId); } if (!isEmpty(query.states)) { - conditions.add("w.state in (:states)"); + conditions.add("state in (:states)"); params.addValue("states", query.states); } if (!isEmpty(query.statuses)) { @@ -667,20 +656,20 @@ public List queryWorkflowInstances(QueryWorkflowInstances quer for (WorkflowInstanceStatus s : query.statuses) { convertedStatuses.add(s.name()); } - conditions.add("w.status" + sqlVariants.castToText() + " in (:statuses)"); + conditions.add("status" + sqlVariants.castToText() + " in (:statuses)"); params.addValue("statuses", convertedStatuses); } if (query.businessKey != null) { - conditions.add("w.business_key = :business_key"); + conditions.add("business_key = :business_key"); params.addValue("business_key", query.businessKey); } if (query.externalId != null) { - conditions.add("w.external_id = :external_id"); + conditions.add("external_id = :external_id"); params.addValue("external_id", query.externalId); } - conditions.add("w.executor_group = :executor_group"); + conditions.add("executor_group = :executor_group"); params.addValue("executor_group", executorInfo.getExecutorGroup()); - sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by w.created desc"; + sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by created desc"; sql = sqlVariants.limit(sql, getMaxResults(query.maxResults)); List ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() .map(WorkflowInstance.Builder::build).collect(toList()); @@ -722,8 +711,9 @@ private long getMaxResults(Long maxResults) { private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) { Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance) : EMPTY_ACTION_STATE_MAP; - String sql = sqlVariants.limit("select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", - getMaxActions(maxActions)); + String sql = sqlVariants.limit( + "select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", + getMaxActions(maxActions)); instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id)); } @@ -790,10 +780,11 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep .setState(rs.getString("state")) // .setStateText(rs.getString("state_text")) // .setActions(new ArrayList()) // - .setNextActivation(sqlVariants.getDateTime(rs,"next_activation")) // + .setNextActivation(sqlVariants.getDateTime(rs, "next_activation")) // .setRetries(rs.getInt("retries")) // - .setCreated(sqlVariants.getDateTime(rs,"created")) // - .setModified(sqlVariants.getDateTime(rs,"modified")) // + .setCreated(sqlVariants.getDateTime(rs, "created")) // + .setModified(sqlVariants.getDateTime(rs, "modified")) // + .setStartedIfNotSet(sqlVariants.getDateTime(rs, "started")) // .setExecutorGroup(rs.getString("executor_group")) // .setSignal(ofNullable(getInt(rs, "workflow_signal"))); } @@ -877,9 +868,9 @@ public int deleteWorkflowInstanceHistory(Integer workflowInstanceId, Integer his MapSqlParameterSource params = new MapSqlParameterSource(); params.addValue("workflowId", workflowInstanceId); params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours))); - Integer maxActionId = namedJdbc.queryForObject( - "select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + - sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class); + Integer maxActionId = namedJdbc + .queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class); int deletedActions = 0; if (maxActionId != null) { params.addValue("maxActionId", maxActionId); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 80e5b3e24..360c5db41 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -1,5 +1,7 @@ package io.nflow.engine.internal.executor; +import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; +import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.definition.NextAction.moveToState; import static io.nflow.engine.workflow.definition.NextAction.stopInState; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; @@ -8,6 +10,7 @@ import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; import static org.joda.time.DateTime.now; @@ -17,7 +20,6 @@ import static org.springframework.util.ReflectionUtils.invokeMethod; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.Iterator; import java.util.List; @@ -41,7 +43,6 @@ import io.nflow.engine.listener.WorkflowExecutorListener; import io.nflow.engine.listener.WorkflowExecutorListener.ListenerContext; import io.nflow.engine.service.WorkflowDefinitionService; -import io.nflow.engine.service.WorkflowInstanceInclude; import io.nflow.engine.service.WorkflowInstanceService; import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition; import io.nflow.engine.workflow.definition.NextAction; @@ -119,9 +120,7 @@ public void run() { private void runImpl() { logger.debug("Starting."); WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, - EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS, WorkflowInstanceInclude.CURRENT_STATE_VARIABLES, - WorkflowInstanceInclude.STARTED), - null); + EnumSet.of(CHILD_WORKFLOW_IDS, CURRENT_STATE_VARIABLES), null); logIfLagging(instance); AbstractWorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type); if (definition == null) { @@ -220,7 +219,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, execution.wakeUpParentWorkflow(); } } - WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) // + WorkflowInstance.Builder instanceBuilder = new WorkflowInstance.Builder(instance) // .setNextActivation(execution.getNextActivation()) // .setStatus(getStatus(execution, nextState)) // .setStateText(getStateText(instance, execution)) // @@ -228,7 +227,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, .setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0); do { try { - return persistWorkflowInstanceState(execution, instance, actionBuilder, builder); + return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder); } catch (Exception ex) { logger.error("Failed to save workflow instance new state, retrying after {} seconds", stateSaveRetryDelay, ex); sleepIgnoreInterrupted(stateSaveRetryDelay); @@ -244,22 +243,23 @@ void setInternalRetryEnabled(boolean internalRetryEnabled) { this.internalRetryEnabled = internalRetryEnabled; } - private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance, - WorkflowInstanceAction.Builder actionBuilder, WorkflowInstance.Builder builder) { + private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, Map originalStateVars, + WorkflowInstanceAction.Builder actionBuilder, WorkflowInstance.Builder instanceBuilder) { + WorkflowInstanceAction action = actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)) + .setStateText(execution.getNextStateReason()).build(); if (execution.isStateProcessInvoked()) { - actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)).setStateText(execution.getNextStateReason()); + WorkflowInstance instance = instanceBuilder.setStartedIfNotSet(action.executionStart).build(); if (execution.isFailed()) { - workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), actionBuilder.build(), - Collections. emptyList(), Collections. emptyList(), true); + workflowInstanceDao.updateWorkflowInstanceAfterExecution(instance, action, emptyList(), emptyList(), true); } else { - workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), actionBuilder.build(), - execution.getNewChildWorkflows(), execution.getNewWorkflows(), execution.createAction()); + workflowInstanceDao.updateWorkflowInstanceAfterExecution(instance, action, execution.getNewChildWorkflows(), + execution.getNewWorkflows(), execution.createAction()); processSuccess(execution, instance); } } else { - workflowInstanceDao.updateWorkflowInstance(builder.build()); + workflowInstanceDao.updateWorkflowInstance(instanceBuilder.build()); } - return builder.setOriginalStateVariables(instance.stateVariables).build(); + return instanceBuilder.setOriginalStateVariables(originalStateVars).build(); } private void processSuccess(StateExecutionImpl execution, WorkflowInstance instance) { diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java index 883dcdd1a..88dc589fc 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java @@ -7,7 +7,9 @@ public enum WorkflowInstanceInclude { /** * The execution start date of the first action of the workflow instance (WorkflowInstance.started). + * @deprecated This is not needed anymore, since the started timestamp is always read from the database when the instance is loaded. */ + @Deprecated STARTED, /** diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java index ee79cb6cc..353358504 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java @@ -12,6 +12,8 @@ import java.util.Optional; import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.nflow.engine.internal.workflow.ObjectStringMapper; @@ -229,6 +231,8 @@ public String getStateVariable(String name, String defaultValue) { */ public static class Builder { + private static final Logger LOG = LoggerFactory.getLogger(WorkflowInstance.Builder.class); + Integer id; Integer executorId; Integer rootWorkflowId; @@ -291,6 +295,7 @@ public Builder(WorkflowInstance copy) { this.retries = copy.retries; this.created = copy.created; this.modified = copy.modified; + this.started = copy.started; this.executorGroup = copy.executorGroup; this.signal = copy.signal; this.mapper = copy.mapper; @@ -525,12 +530,30 @@ public Builder setModified(DateTime modified) { } /** - * Set the start timestamp. + * Set the started timestamp. Log a warning if the value was already set and ignore the new value. * @param started Start time. * @return this. + * @deprecated Use setStartedIfNotSet instead. */ + @Deprecated public Builder setStarted(DateTime started) { - this.started = started; + if (this.started == null) { + this.started = started; + } else { + LOG.warn("Started is already set to {}, ignoring new value {}."); + } + return this; + } + + /** + * Set the started timestamp if it has not already been set. + * @param started Start time. + * @return this. + */ + public Builder setStartedIfNotSet(DateTime started) { + if (this.started == null) { + this.started = started; + } return this; } diff --git a/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql index c7e3f04a0..0fed1dc0a 100644 --- a/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql @@ -17,6 +17,7 @@ create table nflow_workflow ( retries int not null default 0, created timestamp(3) not null default current_timestamp, modified timestamp(3) not null default current_timestamp, + started timestamp(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -112,6 +113,7 @@ create table nflow_archive_workflow ( retries int not null default 0, created timestamp(3) not null, modified timestamp(3) not null, + started timestamp(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql index 5120ed186..38a3202cd 100644 --- a/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql @@ -16,6 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp not null default current_timestamp, modified timestamp not null default current_timestamp, + started timestamp, executor_group varchar(64) not null, workflow_signal int ); @@ -99,6 +100,7 @@ create table if not exists nflow_archive_workflow ( retries int not null, created timestamp not null, modified timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int ); diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql index c35a9aa84..d070930dd 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql @@ -16,6 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp(3) default current_timestamp(3), modified timestamp(3) default current_timestamp(3) on update current_timestamp(3), + started timestamp(3) null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -98,6 +99,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamp(3) default current_timestamp(3), modified timestamp(3) default current_timestamp(3), + started timestamp(3) null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql index e37a982ec..9427681a7 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql @@ -16,6 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp not null, modified timestamp not null default current_timestamp on update current_timestamp, + started timestamp null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -108,6 +109,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamp not null, modified timestamp not null, + started timestamp null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql index e726f3fc4..9215412d0 100644 --- a/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql @@ -15,6 +15,7 @@ create table nflow_workflow ( retries int default 0 not null, created timestamp default current_timestamp not null, modified timestamp default current_timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group), @@ -157,6 +158,7 @@ create table nflow_archive_workflow ( retries int not null, created timestamp not null, modified timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql index 614f2434d..f2278a7f5 100644 --- a/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql @@ -18,6 +18,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamptz not null default current_timestamp, modified timestamptz not null default current_timestamp, + started timestamptz, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -118,6 +119,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamptz not null, modified timestamptz not null, + started timestamptz, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql index 8562a37b5..19b8ac667 100644 --- a/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql @@ -17,6 +17,7 @@ create table nflow_workflow ( retries int not null default 0, created datetimeoffset(3) not null default SYSDATETIMEOFFSET(), modified datetimeoffset(3) not null default SYSDATETIMEOFFSET(), + started datetimeoffset(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -113,6 +114,7 @@ create table nflow_archive_workflow ( retries int not null default 0, created datetimeoffset(3) not null, modified datetimeoffset(3) not null, + started datetimeoffset(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/db2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/db2.update.ddl.sql new file mode 100644 index 000000000..131e58538 --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/db2.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started timestamp(3); +alter table nflow_archive_workflow add started timestamp(3); + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/h2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/h2.update.ddl.sql new file mode 100644 index 000000000..d8662c53b --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/h2.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started timestamp; +alter table nflow_archive_workflow add started timestamp; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.legacy.update.ddl.sql new file mode 100644 index 000000000..82cd6ad3a --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.legacy.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started timestamp null; +alter table nflow_archive_workflow add started timestamp null; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.update.ddl.sql new file mode 100644 index 000000000..2f188455f --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started timestamp(3) null; +alter table nflow_archive_workflow add started timestamp(3) null; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/oracle.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/oracle.update.ddl.sql new file mode 100644 index 000000000..d8662c53b --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/oracle.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started timestamp; +alter table nflow_archive_workflow add started timestamp; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/postgresql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/postgresql.update.ddl.sql new file mode 100644 index 000000000..fac22a528 --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/postgresql.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started timestamptz; +alter table nflow_archive_workflow add started timestamptz; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/sqlserver.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/sqlserver.update.ddl.sql new file mode 100644 index 000000000..5c674bdf2 --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/sqlserver.update.ddl.sql @@ -0,0 +1,7 @@ +alter table nflow_workflow add started datetimeoffset(3); +alter table nflow_archive_workflow add started datetimeoffset(3); + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 07ebf0e1d..6b7914d6b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -1,5 +1,7 @@ package io.nflow.engine.internal.dao; +import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; +import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; @@ -17,13 +19,13 @@ import static org.apache.commons.lang3.StringUtils.countMatches; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; import static org.joda.time.DateTime.now; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -48,7 +50,6 @@ import javax.inject.Inject; -import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -153,9 +154,10 @@ public void updateWorkflowInstance() throws InterruptedException { int id = dao.insertWorkflowInstance(i1); List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); + DateTime started = now(); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) - .setState("updateState").setStateText("update text").build(); + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") + .setStateText("update text").setStartedIfNotSet(started).build(); final WorkflowInstance polledInstance = dao.getWorkflowInstance(id, emptySet(), null); assertThat(polledInstance.status, equalTo(executing)); final DateTime originalModifiedTime = polledInstance.modified; @@ -174,6 +176,7 @@ public void processRow(ResultSet rs) throws SQLException { assertThat(rs.getTimestamp("next_activation").getTime(), equalTo(i2.nextActivation.toDate().getTime())); assertThat(rs.getInt("executor_id") != 0, equalTo(i2.status == executing)); assertThat(rs.getTimestamp("modified").getTime(), greaterThan(originalModifiedTime.getMillis())); + assertThat(rs.getTimestamp("started").getTime(), is(equalTo(started.getMillis()))); } }); QueryWorkflowInstances query = new QueryWorkflowInstances.Builder().setBusinessKey("newKey").build(); @@ -446,14 +449,15 @@ public void updatingNextActivationToNullWhileExternalNextActivationIsNotNull() { int workflowId = dao.insertWorkflowInstance(instance); assertTrue(workflowId > -1); jdbc.update("update nflow_workflow set executor_id = ? where id = ?", executorDao.getExecutorId(), workflowId); - assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(now().getMillis()), workflowId), is(1)); - dao.updateWorkflowInstance(new WorkflowInstance.Builder( - dao.getWorkflowInstance(workflowId, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)) - .setNextActivation(null).build()); + WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); + i = new WorkflowInstance.Builder(i).setNextActivation(null).build(); + + dao.updateWorkflowInstance(i); + WorkflowInstance updated = dao.getWorkflowInstance(workflowId, emptySet(), null); - assertThat(updated.nextActivation, is(CoreMatchers.nullValue())); + assertThat(updated.nextActivation, is(nullValue())); } @Test @@ -469,10 +473,10 @@ public void updatingNextActivationWhenExternalNextActivationIsEarlier() { assertTrue(workflowId > -1); assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(now.getMillis()), workflowId), is(1)); - assertThat( - dao.updateWorkflowInstance(new WorkflowInstance.Builder( - dao.getWorkflowInstance(workflowId, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).build()), - is(1)); + + WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); + + assertThat(dao.updateWorkflowInstance(i), is(1)); assertThat(dao.getWorkflowInstance(workflowId, emptySet(), null).nextActivation, is(now)); } @@ -488,9 +492,8 @@ public void updatingNextActivationWhenExternalNextActivationIsLater() { assertTrue(workflowId > -1); assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(future.getMillis()), workflowId), is(1)); - assertThat(dao.updateWorkflowInstance(new WorkflowInstance.Builder( - dao.getWorkflowInstance(workflowId, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).build()), - is(1)); + WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); + assertThat(dao.updateWorkflowInstance(i), is(1)); assertThat(dao.getWorkflowInstance(workflowId, emptySet(), null).nextActivation, is(now)); } @@ -501,7 +504,7 @@ public void postgreSQLUpdateWithoutActionIsNotAllowed() throws InterruptedExcept List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) .setState("updateState").setStateText("update text").build(); sleep(1); assertThrows(IllegalArgumentException.class, @@ -528,10 +531,11 @@ public void fakePostgreSQLupdateWorkflowInstance() { assertEquals("with wf as (update nflow_workflow set status = ?::workflow_status, state = ?, state_text = ?, " + "next_activation = (case when ?::timestamptz is null then null when external_next_activation is null then " + "?::timestamptz else least(?::timestamptz, external_next_activation) end), external_next_activation = null, " - + "executor_id = ?, retries = ? where id = ? and executor_id = 42 returning id), " + + "executor_id = ?, retries = ?, started = (case when started is null then ? else started end) where id = ? " + + "and executor_id = 42 returning id), " + "act as (insert into nflow_workflow_action(workflow_id, executor_id, type, state, state_text, retry_no, " + "execution_start, execution_end) select wf.id, ?, ?::action_type, ?, ?, ?, ?, ? from wf returning id), " - + "ins16 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + + "ins17 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + "select wf.id,act.id,?,? from wf,act) select act.id from act", sql.getValue()); assertThat(args.getAllValues().size(), is(countMatches(sql.getValue(), "?"))); @@ -544,6 +548,7 @@ public void fakePostgreSQLupdateWorkflowInstance() { assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(i2.nextActivation.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) 42)); assertThat(args.getAllValues().get(i++), is((Object) i2.retries)); + assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(a1.executionStart.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) i2.id)); assertThat(args.getAllValues().get(i++), is((Object) 42)); assertThat(args.getAllValues().get(i++), is((Object) a1.type.name())); @@ -567,7 +572,8 @@ public void fakePostgreSQLinsertWorkflowInstance() { DateTime started = now(); WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text") .setRootWorkflowId(9283).setParentWorkflowId(110).setParentActionId(421).setNextActivation(started.plusSeconds(1)) - .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)).build(); + .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)) + .setStartedIfNotSet(started).build(); d.insertWorkflowInstance(wf); assertEquals( @@ -602,7 +608,7 @@ public void fakePostgreSQLinsertWorkflowInstance() { @Test public void insertWorkflowInstanceActionWorks() { DateTime started = now(); - final WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStarted(started).build(); + final WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("a", "1"); int id = dao.insertWorkflowInstance(i1); final WorkflowInstanceAction a1 = new WorkflowInstanceAction.Builder().setExecutionStart(started).setExecutorId(42) @@ -720,7 +726,7 @@ public void getWorkflowInstanceStateWorks() { @Test public void insertingSubWorkflowWorks() { DateTime started = now(); - final WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStarted(started).build(); + final WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("b", "2"); int parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); @@ -740,8 +746,7 @@ public void insertingSubWorkflowWorks() { assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId)); assertThat(i2.parentActionId, equalTo(parentActionId)); - WorkflowInstance parent = dao.getWorkflowInstance(parentWorkflowId, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), - null); + WorkflowInstance parent = dao.getWorkflowInstance(parentWorkflowId, EnumSet.of(CHILD_WORKFLOW_IDS), null); assertThat(parent.childWorkflows.get(parentActionId), containsInAnyOrder(subWorkflowId1, subWorkflowId2)); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index a1fcd302e..30fc16c39 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -2,6 +2,8 @@ import static edu.umd.cs.mtc.TestFramework.runOnce; import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyInt; @@ -206,7 +208,7 @@ class ShutdownWithoutStart extends MultithreadedTestCase { public void threadShutdown() { dispatcher.shutdown(); - assertFalse(dispatcher.isRunning()); + assertThat(dispatcher.isRunning(), is(false)); } } runOnce(new ShutdownWithoutStart()); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index c021ee15a..fe7816672 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -1,5 +1,7 @@ package io.nflow.engine.internal.executor; +import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; +import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.definition.NextAction.moveToState; import static io.nflow.engine.workflow.definition.NextAction.moveToStateAfter; import static io.nflow.engine.workflow.definition.NextAction.retryAfter; @@ -28,7 +30,18 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; import java.util.ArrayList; import java.util.EnumSet; @@ -153,8 +166,7 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { private final DateTime tomorrow = now().plusDays(1); - private final Set INCLUDES = EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS, - WorkflowInstanceInclude.CURRENT_STATE_VARIABLES, WorkflowInstanceInclude.STARTED); + private final Set INCLUDES = EnumSet.of(CHILD_WORKFLOW_IDS, CURRENT_STATE_VARIABLES); @BeforeEach public void setup() { @@ -290,8 +302,8 @@ public void instanceWithUnsupportedStateIsRescheduled() { executor.run(); verify(workflowInstanceDao).updateWorkflowInstance( - MockitoHamcrest.argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.invalid, 0, - is("Unsupported workflow state"), greaterThanOrEqualTo(oneHourInFuture)))); + argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.invalid, 0, is("Unsupported workflow state"), + greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } @Test @@ -301,9 +313,9 @@ public void workflowStatusIsSetToManualForManualStates() { when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); executor.run(); verify(workflowInstanceDao).updateWorkflowInstanceAfterExecution( - MockitoHamcrest.argThat(matchesWorkflowInstance(manual, SimpleTestWorkflow.State.manualState, 0, is("Stopped in state manualState"), + argThat(matchesWorkflowInstance(manual, SimpleTestWorkflow.State.manualState, 0, is("Stopped in state manualState"), nullValue(DateTime.class))), - MockitoHamcrest.argThat(matchesWorkflowInstanceAction(SimpleTestWorkflow.State.beforeManual, is("Move to manual state."), + argThat(matchesWorkflowInstanceAction(SimpleTestWorkflow.State.beforeManual, is("Move to manual state."), simpleWf.getSettings().maxRetries, stateExecution)), argThat(isEmptyWorkflowList()), argThat(isEmptyWorkflowList()), eq(true)); } @@ -353,8 +365,8 @@ public NextAction answer(InvocationOnMock invocation) { executor.run(); verify(workflowInstanceDao).updateWorkflowInstance( - MockitoHamcrest.argThat(matchesWorkflowInstance(inProgress, SimpleTestWorkflow.State.start, 0, is("Scheduled by previous state start"), - is(skipped)))); + argThat(matchesWorkflowInstance(inProgress, SimpleTestWorkflow.State.start, 0, is("Scheduled by previous state start"), + is(skipped), is(nullValue())))); } @Test @@ -614,14 +626,19 @@ public void runWorkflowWithParameters() { assertThat(state.get("hello"), is("[1,2,3]")); } - private Matcher matchesWorkflowInstance(WorkflowInstanceStatus status, WorkflowState state, - int retries, Matcher stateTextMatcher) { + private Matcher matchesWorkflowInstance(WorkflowInstanceStatus status, WorkflowState state, int retries, + Matcher stateTextMatcher) { return matchesWorkflowInstance(status, state, retries, stateTextMatcher, Matchers.any(DateTime.class)); } + private Matcher matchesWorkflowInstance(WorkflowInstanceStatus status, WorkflowState state, int retries, + Matcher stateTextMatcher, Matcher nextActivationMatcher) { + return matchesWorkflowInstance(status, state, retries, stateTextMatcher, nextActivationMatcher, Matchers.any(DateTime.class)); + } + private Matcher matchesWorkflowInstance(final WorkflowInstanceStatus status, final WorkflowState state, final int retries, final Matcher stateTextMatcher, - final Matcher nextActivationMatcher) { + final Matcher nextActivationMatcher, final Matcher startedMatcher) { return new TypeSafeMatcher() { @Override public void describeTo(Description description) { @@ -636,6 +653,7 @@ protected boolean matchesSafely(WorkflowInstance i) { assertThat(i.stateText, stateTextMatcher); assertThat(i.retries, is(retries)); assertThat(i.nextActivation, nextActivationMatcher); + assertThat(i.started, startedMatcher); return true; } }; @@ -732,8 +750,8 @@ public void instanceWithUnsupportedTypeIsRescheduled() { executor.run(); verify(workflowInstanceDao).updateWorkflowInstance( - MockitoHamcrest.argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, - is("Unsupported workflow type"), greaterThanOrEqualTo(oneHourInFuture)))); + argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, is("Unsupported workflow type"), + greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } @Test diff --git a/nflow-explorer/pom.xml b/nflow-explorer/pom.xml index c05fdb393..a9025977b 100644 --- a/nflow-explorer/pom.xml +++ b/nflow-explorer/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT .. diff --git a/nflow-jetty/pom.xml b/nflow-jetty/pom.xml index 080964334..c31df1ec0 100644 --- a/nflow-jetty/pom.xml +++ b/nflow-jetty/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT UTF-8 diff --git a/nflow-metrics/pom.xml b/nflow-metrics/pom.xml index 9d570c87d..09a3bd21f 100644 --- a/nflow-metrics/pom.xml +++ b/nflow-metrics/pom.xml @@ -12,7 +12,7 @@ io.nflow nflow-root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-netty/pom.xml b/nflow-netty/pom.xml index 4d1d27bbc..741b7b301 100644 --- a/nflow-netty/pom.xml +++ b/nflow-netty/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT UTF-8 diff --git a/nflow-perf-test/pom.xml b/nflow-perf-test/pom.xml index 72aa21d0e..9181467bb 100644 --- a/nflow-perf-test/pom.xml +++ b/nflow-perf-test/pom.xml @@ -13,7 +13,7 @@ io.nflow nflow-root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java b/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java index 04a39090c..f86127e67 100644 --- a/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java +++ b/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java @@ -81,7 +81,7 @@ private WorkflowInstance generateWorkflowInstance(List nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 59300d38d..718c79c69 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -152,8 +152,6 @@ public Collection listWorkflowInstances(final List Collection instances = workflowInstances.listWorkflowInstances(q); List resp = new ArrayList<>(); Set parseIncludeEnums = parseIncludeEnums(include); - // TODO: move to include parameters in next major version - parseIncludeEnums.add(WorkflowInstanceInclude.STARTED); for (WorkflowInstance instance : instances) { resp.add(listWorkflowConverter.convert(instance, parseIncludeEnums)); } @@ -173,8 +171,6 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance(final int id, final St final WorkflowInstanceService workflowInstances, final ListWorkflowInstanceConverter listWorkflowConverter) throws EmptyResultDataAccessException { Set includes = parseIncludeEnums(include); - // TODO: move to include parameters in next major version - includes.add(WorkflowInstanceInclude.STARTED); WorkflowInstance instance = workflowInstances.getWorkflowInstance(id, includes, maxActions); return listWorkflowConverter.convert(instance, includes); } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java index 54da7cc1b..0702d0a4d 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java @@ -46,9 +46,7 @@ public ListWorkflowInstanceResponse convert(WorkflowInstance instance, Set stateVariables = new LinkedHashMap<>(); stateVariables.put("foo", "1"); stateVariables.put("bar", "quux"); + DateTime started = now(); WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setStatus(inProgress).setType("dummy") .setBusinessKey("businessKey").setParentWorkflowId(942).setParentActionId(842).setExternalId("externalId") .setState("cState").setStateText("cState desc").setNextActivation(now()).setActions(asList(a)) .setCreated(now().minusMinutes(1)).setCreated(now().minusHours(2)).setModified(now().minusHours(1)).setRetries(42) - .setStateVariables(stateVariables).setSignal(Optional.of(42)).build(); + .setStateVariables(stateVariables).setSignal(Optional.of(42)).setStartedIfNotSet(started).build(); JsonNode node1 = mock(JsonNode.class); JsonNode nodeQuux = mock(JsonNode.class); diff --git a/nflow-rest-api-jax-rs/pom.xml b/nflow-rest-api-jax-rs/pom.xml index fe9ca2c1e..7c0c322e7 100644 --- a/nflow-rest-api-jax-rs/pom.xml +++ b/nflow-rest-api-jax-rs/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index 0dde236a5..73f957846 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -3,6 +3,7 @@ import static com.nitorcreations.Matchers.hasField; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.externalChange; import static java.util.Arrays.asList; +import static java.util.Collections.emptySet; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.contains; @@ -217,7 +218,7 @@ public void listWorkflowInstancesWorksWithAllIncludes() { @Test public void fetchingNonExistingWorkflowThrowsNotFoundException() { - when(workflowInstances.getWorkflowInstance(42, EnumSet.of(WorkflowInstanceInclude.STARTED), null)) + when(workflowInstances.getWorkflowInstance(42, emptySet(), null)) .thenThrow(EmptyResultDataAccessException.class); assertThrows(NotFoundException.class, () -> resource.fetchWorkflowInstance(42, null, null)); } @@ -226,24 +227,26 @@ public void fetchingNonExistingWorkflowThrowsNotFoundException() { @Test public void fetchingExistingWorkflowWorks() { WorkflowInstance instance = mock(WorkflowInstance.class); - when(workflowInstances.getWorkflowInstance(42, EnumSet.of(WorkflowInstanceInclude.STARTED), null)).thenReturn(instance); + when(workflowInstances.getWorkflowInstance(42, emptySet(), null)).thenReturn(instance); ListWorkflowInstanceResponse resp = mock(ListWorkflowInstanceResponse.class); when(listWorkflowConverter.convert(eq(instance), any(Set.class))).thenReturn(resp); ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, null, null); - verify(workflowInstances).getWorkflowInstance(42, EnumSet.of(WorkflowInstanceInclude.STARTED), null); + verify(workflowInstances).getWorkflowInstance(42, emptySet(), null); assertEquals(resp, result); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void fetchingExistingWorkflowWorksWithAllIncludes() { WorkflowInstance instance = mock(WorkflowInstance.class); - when(workflowInstances.getWorkflowInstance(42, EnumSet.allOf(WorkflowInstanceInclude.class), 10L)).thenReturn(instance); + EnumSet includes = EnumSet.allOf(WorkflowInstanceInclude.class); + includes.remove(WorkflowInstanceInclude.STARTED); + when(workflowInstances.getWorkflowInstance(42, includes, 10L)).thenReturn(instance); ListWorkflowInstanceResponse resp = mock(ListWorkflowInstanceResponse.class); when(listWorkflowConverter.convert(eq(instance), any(Set.class))).thenReturn(resp); ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, "actions,currentStateVariables,actionStateVariables,childWorkflows", 10L); - verify(workflowInstances).getWorkflowInstance(42, EnumSet.allOf(WorkflowInstanceInclude.class), 10L); + verify(workflowInstances).getWorkflowInstance(42, includes, 10L); assertEquals(resp, result); } diff --git a/nflow-rest-api-spring-web/pom.xml b/nflow-rest-api-spring-web/pom.xml index 1fdd0e02f..b80cc7ad8 100644 --- a/nflow-rest-api-spring-web/pom.xml +++ b/nflow-rest-api-spring-web/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-server-common/pom.xml b/nflow-server-common/pom.xml index 1d3f6c520..25d50f8b7 100644 --- a/nflow-server-common/pom.xml +++ b/nflow-server-common/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT UTF-8 diff --git a/nflow-tests/pom.xml b/nflow-tests/pom.xml index 1e5e96026..5926dac76 100644 --- a/nflow-tests/pom.xml +++ b/nflow-tests/pom.xml @@ -12,7 +12,7 @@ io.nflow nflow-root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 72ca8032c..4fd0a2921 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ nflow-root pom nFlow Root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT http://nflow.io