From 5edbb88d7367a9bb805c25be41f919aacae1c154 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 14 May 2019 21:20:49 +0300 Subject: [PATCH 01/20] add started timestamp to workflow instance table --- CHANGELOG.md | 2 + .../engine/internal/dao/ExecutorDao.java | 6 +- .../internal/dao/WorkflowInstanceDao.java | 91 +++++++++---------- .../executor/WorkflowStateProcessor.java | 24 +++-- .../service/WorkflowInstanceInclude.java | 5 - .../resources/scripts/db/db2.create.ddl.sql | 2 + .../resources/scripts/db/h2.create.ddl.sql | 2 + .../resources/scripts/db/mysql.create.ddl.sql | 2 + .../scripts/db/mysql.legacy.create.ddl.sql | 2 + .../scripts/db/oracle.create.ddl.sql | 2 + .../scripts/db/postgresql.create.ddl.sql | 2 + .../scripts/db/sqlserver.create.ddl.sql | 2 + .../db/update-5.5.0-x/db2.update.ddl.sql | 1 + .../db/update-5.5.0-x/h2.update.ddl.sql | 1 + .../mysql.legacy.update.ddl.sql | 1 + .../db/update-5.5.0-x/mysql.update.ddl.sql | 1 + .../db/update-5.5.0-x/oracle.update.ddl.sql | 1 + .../update-5.5.0-x/postgresql.update.ddl.sql | 1 + .../update-5.5.0-x/sqlserver.update.ddl.sql | 1 + .../internal/dao/WorkflowInstanceDaoTest.java | 55 ++++++----- .../executor/WorkflowDispatcherTest.java | 4 +- .../executor/WorkflowStateProcessorTest.java | 37 +++++--- .../java/io/nflow/rest/v1/ResourceBase.java | 4 - .../ListWorkflowInstanceConverter.java | 4 +- .../jaxrs/WorkflowInstanceResourceTest.java | 7 +- 25 files changed, 142 insertions(+), 118 deletions(-) create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql create mode 100644 nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a6914586..1f55032ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ **Highlights** - Support non-enum WorkflowStates to enable, for example, Kotlin workflow definitions by extending AbstractWorkflowDefinition. +- Added `started` timestamp to workflow instance table (requires database update) **Details** - Dependency and plugin updates: @@ -20,6 +21,7 @@ - Log more executor details on startup - Fix #311: Replace references to WorkflowDefinition with AbstractWorkflowDefinition to support non-enum WorkflowStates - Use name() instead of toString() when getting workflow instance initial state name + - Deprecated WorkflowInstanceInclude.STARTED enum value ## 5.5.0 (2019-04-04) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java index 153f6a1c9..ea04e7c44 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java @@ -146,10 +146,10 @@ public PreparedStatement createPreparedStatement(Connection con) throws SQLExcep return p; } }, keyHolder); - int executorId = keyHolder.getKey().intValue(); + int allocatedExecutorId = keyHolder.getKey().intValue(); logger.info("Joined executor group {} as executor {} running on host {} with process id {}.", - executorGroup, executorId, host, pid); - return executorId; + executorGroup, allocatedExecutorId, host, pid); + return allocatedExecutorId; } public void updateActiveTimestamp() { diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index f03acb485..9994baee9 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -2,7 +2,6 @@ import static io.nflow.engine.internal.dao.DaoUtil.firstColumnLengthExtractor; import static io.nflow.engine.internal.dao.DaoUtil.getInt; -import static io.nflow.engine.internal.dao.DaoUtil.toDateTime; import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; @@ -184,7 +183,7 @@ private int insertWorkflowInstanceWithCte(WorkflowInstance instance) { Object[] instanceValues = new Object[] { instance.type, instance.rootWorkflowId, instance.parentWorkflowId, instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(), instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), - toTimestamp(instance.nextActivation), instance.signal.orElse(null) }; + toTimestamp(instance.nextActivation), instance.signal.orElse(null), toTimestamp(instance.started) }; int pos = instanceValues.length; Object[] args = Arrays.copyOf(instanceValues, pos + instance.stateVariables.size() * 2); for (Entry var : instance.stateVariables.entrySet()) { @@ -203,8 +202,8 @@ private int insertWorkflowInstanceWithCte(WorkflowInstance instance) { String insertWorkflowInstanceSql() { return "insert into nflow_workflow(type, root_workflow_id, parent_workflow_id, parent_action_id, business_key, external_id, " - + "executor_group, status, state, state_text, next_activation, workflow_signal) values (?, ?, ?, ?, ?, ?, ?, " - + sqlVariants.workflowStatus() + ", ?, ?, ?, ?)"; + + "executor_group, status, state, state_text, next_activation, workflow_signal, started) values (?, ?, ?, ?, ?, ?, ?, " + + sqlVariants.workflowStatus() + ", ?, ?, ?, ?, ?)"; } String insertWorkflowInstanceStateSql() { @@ -236,6 +235,7 @@ private int insertWorkflowInstanceWithTransaction(final WorkflowInstance instanc } else { ps.setNull(p++, Types.INTEGER); } + sqlVariants.setDateTime(ps, p++, instance.started); return ps; }, keyHolder); } catch (DuplicateKeyException e) { @@ -323,16 +323,16 @@ public void updateWorkflowInstanceAfterExecution(WorkflowInstance instance, Work updateWorkflowInstanceWithTransaction(instance, action, childWorkflows, workflows, changedStateVariables); } } else { - updateWorkflowInstance(instance); + updateWorkflowInstance(instance, action.executionStart); } } - public int updateWorkflowInstance(WorkflowInstance instance) { + public int updateWorkflowInstance(WorkflowInstance instance, DateTime started) { // using sqlVariants.nextActivationUpdate() requires that nextActivation is used 3 times Object nextActivation = sqlVariants.toTimestampObject(instance.nextActivation); return jdbc.update(updateWorkflowInstanceSql(), instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), nextActivation, nextActivation, nextActivation, - instance.status == executing ? executorInfo.getExecutorId() : null, instance.retries, instance.id); + instance.status == executing ? executorInfo.getExecutorId() : null, instance.retries, toTimestamp(started), instance.id); } private void updateWorkflowInstanceWithTransaction(final WorkflowInstance instance, final WorkflowInstanceAction action, @@ -341,7 +341,7 @@ private void updateWorkflowInstanceWithTransaction(final WorkflowInstance instan transaction.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - updateWorkflowInstance(instance); + updateWorkflowInstance(instance, action.executionStart); int parentActionId = insertWorkflowInstanceAction(action); insertVariables(action.workflowInstanceId, parentActionId, changedStateVariables); for (WorkflowInstance childTemplate : childWorkflows) { @@ -368,8 +368,8 @@ public void recoverWorkflowInstancesFromDeadNodes() { private List getRecoverableInstances() { String sql = "select id, state from nflow_workflow where executor_id in (select id from nflow_executor where " - + executorInfo.getExecutorGroupCondition() + " and id <> " + executorInfo.getExecutorId() - + " and " + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")"; + + executorInfo.getExecutorGroupCondition() + " and id <> " + executorInfo.getExecutorId() + " and " + + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")"; return jdbc.query(sql, new RowMapper() { @Override public InstanceInfo mapRow(ResultSet rs, int rowNum) throws SQLException { @@ -386,12 +386,10 @@ private void recoverWorkflowInstance(final int instanceId, final WorkflowInstanc @Override protected void doInTransactionWithoutResult(TransactionStatus status) { int executorId = executorInfo.getExecutorId(); - int updated = jdbc.update( - "update nflow_workflow set executor_id = null, status = " + sqlVariants.workflowStatus(inProgress) - + " where id = ? and executor_id in (select id from nflow_executor where " - + executorInfo.getExecutorGroupCondition() + " and id <> " + executorId + " and " - + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")", - instanceId); + int updated = jdbc.update("update nflow_workflow set executor_id = null, status = " + + sqlVariants.workflowStatus(inProgress) + " where id = ? and executor_id in (select id from nflow_executor where " + + executorInfo.getExecutorGroupCondition() + " and id <> " + executorId + " and " + + sqlVariants.dateLtEqDiff("expires", "current_timestamp") + ")", instanceId); if (updated > 0) { insertWorkflowInstanceAction(action); } @@ -411,10 +409,9 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work Timestamp nextActivation = toTimestamp(instance.nextActivation); Object[] fixedValues = new Object[] { instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), nextActivation, nextActivation, nextActivation, - instance.status == executing ? executorId : null, instance.retries, instance.id, executorId, action.type.name(), - action.state, abbreviate(action.stateText, getActionStateTextLength()), action.retryNo, - toTimestamp(action.executionStart), - toTimestamp(action.executionEnd) }; + instance.status == executing ? executorId : null, instance.retries, toTimestamp(action.executionStart), instance.id, + executorId, action.type.name(), action.state, abbreviate(action.stateText, getActionStateTextLength()), action.retryNo, + toTimestamp(action.executionStart), toTimestamp(action.executionEnd) }; int pos = fixedValues.length; Object[] args = Arrays.copyOf(fixedValues, pos + changedStateVariables.size() * 2); for (Entry var : changedStateVariables.entrySet()) { @@ -434,7 +431,8 @@ String insertWorkflowActionSql() { private String updateWorkflowInstanceSql() { return "update nflow_workflow set status = " + sqlVariants.workflowStatus() + ", state = ?, state_text = ?, " + "next_activation = " + sqlVariants.nextActivationUpdate() - + ", external_next_activation = null, executor_id = ?, retries = ? where id = ? and executor_id = " + + ", external_next_activation = null, executor_id = ?, retries = ?, " + + "started = (case when started is null then ? else started end) where id = ? and executor_id = " + executorInfo.getExecutorId(); } @@ -465,7 +463,8 @@ public boolean updateNotRunningWorkflowInstance(WorkflowInstance instance) { @Transactional public boolean wakeUpWorkflowExternally(int workflowInstanceId, List expectedStates) { StringBuilder sql = new StringBuilder("update nflow_workflow set next_activation = (case when executor_id is null then ") - .append("case when " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + " then next_activation else current_timestamp end else next_activation end), ") + .append("case when " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + + " then next_activation else current_timestamp end else next_activation end), ") .append("external_next_activation = current_timestamp where ").append(executorInfo.getExecutorGroupCondition()) .append(" and id = ? and next_activation is not null"); return addExpectedStatesToQueryAndUpdate(sql, workflowInstanceId, expectedStates); @@ -494,13 +493,8 @@ private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workfl } public WorkflowInstance getWorkflowInstance(int id, Set includes, Long maxActions) { - String sql = "select * from nflow_workflow w where w.id = ?"; - WorkflowInstance.Builder builder = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id); - if (includes.contains(WorkflowInstanceInclude.STARTED)) { - builder.setStarted(toDateTime(jdbc - .queryForObject("select min(execution_start) from nflow_workflow_action where workflow_id = ?", Timestamp.class, id))); - } - WorkflowInstance instance = builder.build(); + String sql = "select * from nflow_workflow where id = ?"; + WorkflowInstance instance = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id).build(); if (includes.contains(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES)) { fillState(instance); } @@ -514,10 +508,9 @@ public WorkflowInstance getWorkflowInstance(int id, Set } private void fillState(final WorkflowInstance instance) { - jdbc.query( - "select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " - + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " - + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", + jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " + + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " + + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", new RowCallbackHandler() { @Override public void processRow(ResultSet rs) throws SQLException { @@ -541,9 +534,8 @@ String updateInstanceForExecutionQuery() { String whereConditionForInstanceUpdate() { return "where executor_id is null and status in (" + sqlVariants.workflowStatus(created) + ", " - + sqlVariants.workflowStatus(inProgress) + ") and " - + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + " and " - + executorInfo.getExecutorGroupCondition() + " order by next_activation asc"; + + sqlVariants.workflowStatus(inProgress) + ") and " + sqlVariants.dateLtEqDiff("next_activation", "current_timestamp") + + " and " + executorInfo.getExecutorGroupCondition() + " order by next_activation asc"; } private List pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) { @@ -558,8 +550,8 @@ private List pollNextWorkflowInstanceIdsWithTransaction(final int batch @Override public List doInTransaction(TransactionStatus transactionStatus) { String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize); - List instances = jdbc.query(sql, (rs, rowNum) -> - new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified"))); + List instances = jdbc.query(sql, + (rs, rowNum) -> new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified"))); if (instances.isEmpty()) { return emptyList(); } @@ -635,10 +627,7 @@ public int compareTo(OptimisticLockKey other) { } public List queryWorkflowInstances(QueryWorkflowInstances query) { - String sql = "select w.*, " - + "(select min(execution_start) from nflow_workflow_action a where a.workflow_id = w.id) as started " - + "from nflow_workflow w "; - + String sql = "select w.* from nflow_workflow w "; List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); conditions.add(executorInfo.getExecutorGroupCondition()); @@ -722,8 +711,9 @@ private long getMaxResults(Long maxResults) { private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) { Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance) : EMPTY_ACTION_STATE_MAP; - String sql = sqlVariants.limit("select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", - getMaxActions(maxActions)); + String sql = sqlVariants.limit( + "select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", + getMaxActions(maxActions)); instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id)); } @@ -790,10 +780,11 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep .setState(rs.getString("state")) // .setStateText(rs.getString("state_text")) // .setActions(new ArrayList()) // - .setNextActivation(sqlVariants.getDateTime(rs,"next_activation")) // + .setNextActivation(sqlVariants.getDateTime(rs, "next_activation")) // .setRetries(rs.getInt("retries")) // - .setCreated(sqlVariants.getDateTime(rs,"created")) // - .setModified(sqlVariants.getDateTime(rs,"modified")) // + .setCreated(sqlVariants.getDateTime(rs, "created")) // + .setModified(sqlVariants.getDateTime(rs, "modified")) // + .setStarted(sqlVariants.getDateTime(rs, "started")) // .setExecutorGroup(rs.getString("executor_group")) // .setSignal(ofNullable(getInt(rs, "workflow_signal"))); } @@ -877,9 +868,9 @@ public int deleteWorkflowInstanceHistory(Integer workflowInstanceId, Integer his MapSqlParameterSource params = new MapSqlParameterSource(); params.addValue("workflowId", workflowInstanceId); params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours))); - Integer maxActionId = namedJdbc.queryForObject( - "select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + - sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class); + Integer maxActionId = namedJdbc + .queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " + + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class); int deletedActions = 0; if (maxActionId != null) { params.addValue("maxActionId", maxActionId); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 80e5b3e24..a7cf1c79f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -1,5 +1,7 @@ package io.nflow.engine.internal.executor; +import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; +import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.definition.NextAction.moveToState; import static io.nflow.engine.workflow.definition.NextAction.stopInState; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; @@ -8,6 +10,7 @@ import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; import static org.joda.time.DateTime.now; @@ -17,7 +20,6 @@ import static org.springframework.util.ReflectionUtils.invokeMethod; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.Iterator; import java.util.List; @@ -41,7 +43,6 @@ import io.nflow.engine.listener.WorkflowExecutorListener; import io.nflow.engine.listener.WorkflowExecutorListener.ListenerContext; import io.nflow.engine.service.WorkflowDefinitionService; -import io.nflow.engine.service.WorkflowInstanceInclude; import io.nflow.engine.service.WorkflowInstanceService; import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition; import io.nflow.engine.workflow.definition.NextAction; @@ -119,9 +120,7 @@ public void run() { private void runImpl() { logger.debug("Starting."); WorkflowInstance instance = workflowInstances.getWorkflowInstance(instanceId, - EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS, WorkflowInstanceInclude.CURRENT_STATE_VARIABLES, - WorkflowInstanceInclude.STARTED), - null); + EnumSet.of(CHILD_WORKFLOW_IDS, CURRENT_STATE_VARIABLES), null); logIfLagging(instance); AbstractWorkflowDefinition definition = workflowDefinitions.getWorkflowDefinition(instance.type); if (definition == null) { @@ -178,7 +177,7 @@ private void rescheduleUnknownWorkflowType(WorkflowInstance instance) { logger.warn("Workflow type {} not configured to this nFlow instance - rescheduling workflow instance", instance.type); instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowTypeRetryDelay)) .setStatus(inProgress).setStateText("Unsupported workflow type").build(); - workflowInstanceDao.updateWorkflowInstance(instance); + workflowInstanceDao.updateWorkflowInstance(instance, now()); logger.debug("Finished."); } @@ -187,7 +186,7 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) { instance.type); instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowStateRetryDelay)) .setStatus(inProgress).setStateText("Unsupported workflow state").build(); - workflowInstanceDao.updateWorkflowInstance(instance); + workflowInstanceDao.updateWorkflowInstance(instance, now()); logger.debug("Finished."); } @@ -246,18 +245,17 @@ void setInternalRetryEnabled(boolean internalRetryEnabled) { private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance, WorkflowInstanceAction.Builder actionBuilder, WorkflowInstance.Builder builder) { + WorkflowInstanceAction action = actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)) + .setStateText(execution.getNextStateReason()).build(); if (execution.isStateProcessInvoked()) { - actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)).setStateText(execution.getNextStateReason()); if (execution.isFailed()) { - workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), actionBuilder.build(), - Collections. emptyList(), Collections. emptyList(), true); + workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), action, emptyList(), emptyList(), true); } else { - workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), actionBuilder.build(), - execution.getNewChildWorkflows(), execution.getNewWorkflows(), execution.createAction()); + workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), action, execution.getNewChildWorkflows(), execution.getNewWorkflows(), execution.createAction()); processSuccess(execution, instance); } } else { - workflowInstanceDao.updateWorkflowInstance(builder.build()); + workflowInstanceDao.updateWorkflowInstance(builder.build(), action.executionStart); } return builder.setOriginalStateVariables(instance.stateVariables).build(); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java index 883dcdd1a..afcb33849 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java @@ -5,11 +5,6 @@ */ public enum WorkflowInstanceInclude { - /** - * The execution start date of the first action of the workflow instance (WorkflowInstance.started). - */ - STARTED, - /** * The most recent values of all state variables (WorkflowInstance.stateVariables). */ diff --git a/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql index c7e3f04a0..0fed1dc0a 100644 --- a/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/db2.create.ddl.sql @@ -17,6 +17,7 @@ create table nflow_workflow ( retries int not null default 0, created timestamp(3) not null default current_timestamp, modified timestamp(3) not null default current_timestamp, + started timestamp(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -112,6 +113,7 @@ create table nflow_archive_workflow ( retries int not null default 0, created timestamp(3) not null, modified timestamp(3) not null, + started timestamp(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql index 5120ed186..38a3202cd 100644 --- a/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/h2.create.ddl.sql @@ -16,6 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp not null default current_timestamp, modified timestamp not null default current_timestamp, + started timestamp, executor_group varchar(64) not null, workflow_signal int ); @@ -99,6 +100,7 @@ create table if not exists nflow_archive_workflow ( retries int not null, created timestamp not null, modified timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int ); diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql index c35a9aa84..8dfb2eec8 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql @@ -16,6 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp(3) default current_timestamp(3), modified timestamp(3) default current_timestamp(3) on update current_timestamp(3), + started timestamp(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -98,6 +99,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamp(3) default current_timestamp(3), modified timestamp(3) default current_timestamp(3), + started timestamp(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql index e37a982ec..6e984a3c8 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql @@ -16,6 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp not null, modified timestamp not null default current_timestamp on update current_timestamp, + started timestamp, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -108,6 +109,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamp not null, modified timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql index e726f3fc4..9215412d0 100644 --- a/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/oracle.create.ddl.sql @@ -15,6 +15,7 @@ create table nflow_workflow ( retries int default 0 not null, created timestamp default current_timestamp not null, modified timestamp default current_timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group), @@ -157,6 +158,7 @@ create table nflow_archive_workflow ( retries int not null, created timestamp not null, modified timestamp not null, + started timestamp, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql index 614f2434d..f2278a7f5 100644 --- a/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/postgresql.create.ddl.sql @@ -18,6 +18,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamptz not null default current_timestamp, modified timestamptz not null default current_timestamp, + started timestamptz, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -118,6 +119,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamptz not null, modified timestamptz not null, + started timestamptz, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql index 8562a37b5..19b8ac667 100644 --- a/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/sqlserver.create.ddl.sql @@ -17,6 +17,7 @@ create table nflow_workflow ( retries int not null default 0, created datetimeoffset(3) not null default SYSDATETIMEOFFSET(), modified datetimeoffset(3) not null default SYSDATETIMEOFFSET(), + started datetimeoffset(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -113,6 +114,7 @@ create table nflow_archive_workflow ( retries int not null default 0, created datetimeoffset(3) not null, modified datetimeoffset(3) not null, + started datetimeoffset(3), executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql new file mode 100644 index 000000000..495339bd8 --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started timestamp(3); diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql new file mode 100644 index 000000000..32d7caf6b --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started timestamp; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql new file mode 100644 index 000000000..32d7caf6b --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started timestamp; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql new file mode 100644 index 000000000..495339bd8 --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started timestamp(3); diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql new file mode 100644 index 000000000..32d7caf6b --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started timestamp; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql new file mode 100644 index 000000000..88a32213c --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started timestamptz; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql new file mode 100644 index 000000000..862c12b73 --- /dev/null +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql @@ -0,0 +1 @@ +alter table nflow_workflow add started datetimeoffset(3); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 07ebf0e1d..e4f62b096 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -1,5 +1,7 @@ package io.nflow.engine.internal.dao; +import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; +import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; @@ -17,13 +19,13 @@ import static org.apache.commons.lang3.StringUtils.countMatches; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; import static org.joda.time.DateTime.now; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -48,7 +50,6 @@ import javax.inject.Inject; -import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -154,8 +155,8 @@ public void updateWorkflowInstance() throws InterruptedException { List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) - .setState("updateState").setStateText("update text").build(); + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") + .setStateText("update text").build(); final WorkflowInstance polledInstance = dao.getWorkflowInstance(id, emptySet(), null); assertThat(polledInstance.status, equalTo(executing)); final DateTime originalModifiedTime = polledInstance.modified; @@ -442,18 +443,20 @@ public void updateNotRunningWorkflowInstanceDoesNotUpdateRunningInstance() { @Test public void updatingNextActivationToNullWhileExternalNextActivationIsNotNull() { + DateTime started = now(); WorkflowInstance instance = constructWorkflowInstanceBuilder().setNextActivation(null).build(); int workflowId = dao.insertWorkflowInstance(instance); assertTrue(workflowId > -1); jdbc.update("update nflow_workflow set executor_id = ? where id = ?", executorDao.getExecutorId(), workflowId); - assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(now().getMillis()), workflowId), is(1)); - dao.updateWorkflowInstance(new WorkflowInstance.Builder( - dao.getWorkflowInstance(workflowId, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)) - .setNextActivation(null).build()); + WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); + i = new WorkflowInstance.Builder(i).setNextActivation(null).build(); + + dao.updateWorkflowInstance(i, started); + WorkflowInstance updated = dao.getWorkflowInstance(workflowId, emptySet(), null); - assertThat(updated.nextActivation, is(CoreMatchers.nullValue())); + assertThat(updated.nextActivation, is(nullValue())); } @Test @@ -469,10 +472,10 @@ public void updatingNextActivationWhenExternalNextActivationIsEarlier() { assertTrue(workflowId > -1); assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(now.getMillis()), workflowId), is(1)); - assertThat( - dao.updateWorkflowInstance(new WorkflowInstance.Builder( - dao.getWorkflowInstance(workflowId, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).build()), - is(1)); + + WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); + + assertThat(dao.updateWorkflowInstance(i, now), is(1)); assertThat(dao.getWorkflowInstance(workflowId, emptySet(), null).nextActivation, is(now)); } @@ -488,9 +491,8 @@ public void updatingNextActivationWhenExternalNextActivationIsLater() { assertTrue(workflowId > -1); assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(future.getMillis()), workflowId), is(1)); - assertThat(dao.updateWorkflowInstance(new WorkflowInstance.Builder( - dao.getWorkflowInstance(workflowId, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).build()), - is(1)); + WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); + assertThat(dao.updateWorkflowInstance(i, now), is(1)); assertThat(dao.getWorkflowInstance(workflowId, emptySet(), null).nextActivation, is(now)); } @@ -501,7 +503,7 @@ public void postgreSQLUpdateWithoutActionIsNotAllowed() throws InterruptedExcept List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) .setState("updateState").setStateText("update text").build(); sleep(1); assertThrows(IllegalArgumentException.class, @@ -528,10 +530,11 @@ public void fakePostgreSQLupdateWorkflowInstance() { assertEquals("with wf as (update nflow_workflow set status = ?::workflow_status, state = ?, state_text = ?, " + "next_activation = (case when ?::timestamptz is null then null when external_next_activation is null then " + "?::timestamptz else least(?::timestamptz, external_next_activation) end), external_next_activation = null, " - + "executor_id = ?, retries = ? where id = ? and executor_id = 42 returning id), " + + "executor_id = ?, retries = ?, started = (case when started is null then ? else started end) where id = ? " + + "and executor_id = 42 returning id), " + "act as (insert into nflow_workflow_action(workflow_id, executor_id, type, state, state_text, retry_no, " + "execution_start, execution_end) select wf.id, ?, ?::action_type, ?, ?, ?, ?, ? from wf returning id), " - + "ins16 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + + "ins17 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + "select wf.id,act.id,?,? from wf,act) select act.id from act", sql.getValue()); assertThat(args.getAllValues().size(), is(countMatches(sql.getValue(), "?"))); @@ -544,6 +547,7 @@ public void fakePostgreSQLupdateWorkflowInstance() { assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(i2.nextActivation.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) 42)); assertThat(args.getAllValues().get(i++), is((Object) i2.retries)); + assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(a1.executionStart.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) i2.id)); assertThat(args.getAllValues().get(i++), is((Object) 42)); assertThat(args.getAllValues().get(i++), is((Object) a1.type.name())); @@ -567,15 +571,16 @@ public void fakePostgreSQLinsertWorkflowInstance() { DateTime started = now(); WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text") .setRootWorkflowId(9283).setParentWorkflowId(110).setParentActionId(421).setNextActivation(started.plusSeconds(1)) - .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)).build(); + .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)) + .setStarted(started).build(); d.insertWorkflowInstance(wf); assertEquals( "with wf as (insert into nflow_workflow(type, root_workflow_id, parent_workflow_id, parent_action_id, business_key, " - + "external_id, executor_group, status, state, state_text, next_activation, workflow_signal) values " - + "(?, ?, ?, ?, ?, ?, ?, ?::workflow_status, ?, ?, ?, ?) returning id), ins12 as " + + "external_id, executor_group, status, state, state_text, next_activation, workflow_signal, started) values " + + "(?, ?, ?, ?, ?, ?, ?, ?::workflow_status, ?, ?, ?, ?, ?) returning id), ins13 as " + "(insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) select wf.id,0,?,? from wf), " - + "ins14 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + + "ins15 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + "select wf.id,0,?,? from wf) select wf.id from wf", sql.getValue()); assertThat(args.getAllValues().size(), is(countMatches(sql.getValue(), "?"))); @@ -593,6 +598,7 @@ public void fakePostgreSQLinsertWorkflowInstance() { assertThat(args.getAllValues().get(i++), is((Object) wf.stateText)); assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(wf.nextActivation.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) wf.signal.get())); + assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(wf.started.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) "A")); assertThat(args.getAllValues().get(i++), is((Object) "B")); assertThat(args.getAllValues().get(i++), is((Object) "C")); @@ -740,8 +746,7 @@ public void insertingSubWorkflowWorks() { assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId)); assertThat(i2.parentActionId, equalTo(parentActionId)); - WorkflowInstance parent = dao.getWorkflowInstance(parentWorkflowId, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), - null); + WorkflowInstance parent = dao.getWorkflowInstance(parentWorkflowId, EnumSet.of(CHILD_WORKFLOW_IDS), null); assertThat(parent.childWorkflows.get(parentActionId), containsInAnyOrder(subWorkflowId1, subWorkflowId2)); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index a1fcd302e..30fc16c39 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -2,6 +2,8 @@ import static edu.umd.cs.mtc.TestFramework.runOnce; import static java.util.Arrays.asList; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.anyInt; @@ -206,7 +208,7 @@ class ShutdownWithoutStart extends MultithreadedTestCase { public void threadShutdown() { dispatcher.shutdown(); - assertFalse(dispatcher.isRunning()); + assertThat(dispatcher.isRunning(), is(false)); } } runOnce(new ShutdownWithoutStart()); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index c021ee15a..30763d5f6 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -1,5 +1,7 @@ package io.nflow.engine.internal.executor; +import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; +import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.definition.NextAction.moveToState; import static io.nflow.engine.workflow.definition.NextAction.moveToStateAfter; import static io.nflow.engine.workflow.definition.NextAction.retryAfter; @@ -28,7 +30,18 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.mockito.hamcrest.MockitoHamcrest.argThat; import java.util.ArrayList; import java.util.EnumSet; @@ -153,8 +166,7 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { private final DateTime tomorrow = now().plusDays(1); - private final Set INCLUDES = EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS, - WorkflowInstanceInclude.CURRENT_STATE_VARIABLES, WorkflowInstanceInclude.STARTED); + private final Set INCLUDES = EnumSet.of(CHILD_WORKFLOW_IDS, CURRENT_STATE_VARIABLES); @BeforeEach public void setup() { @@ -290,8 +302,9 @@ public void instanceWithUnsupportedStateIsRescheduled() { executor.run(); verify(workflowInstanceDao).updateWorkflowInstance( - MockitoHamcrest.argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.invalid, 0, - is("Unsupported workflow state"), greaterThanOrEqualTo(oneHourInFuture)))); + argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.invalid, 0, is("Unsupported workflow state"), + greaterThanOrEqualTo(oneHourInFuture))), + any(DateTime.class)); } @Test @@ -301,9 +314,9 @@ public void workflowStatusIsSetToManualForManualStates() { when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); executor.run(); verify(workflowInstanceDao).updateWorkflowInstanceAfterExecution( - MockitoHamcrest.argThat(matchesWorkflowInstance(manual, SimpleTestWorkflow.State.manualState, 0, is("Stopped in state manualState"), + argThat(matchesWorkflowInstance(manual, SimpleTestWorkflow.State.manualState, 0, is("Stopped in state manualState"), nullValue(DateTime.class))), - MockitoHamcrest.argThat(matchesWorkflowInstanceAction(SimpleTestWorkflow.State.beforeManual, is("Move to manual state."), + argThat(matchesWorkflowInstanceAction(SimpleTestWorkflow.State.beforeManual, is("Move to manual state."), simpleWf.getSettings().maxRetries, stateExecution)), argThat(isEmptyWorkflowList()), argThat(isEmptyWorkflowList()), eq(true)); } @@ -353,8 +366,9 @@ public NextAction answer(InvocationOnMock invocation) { executor.run(); verify(workflowInstanceDao).updateWorkflowInstance( - MockitoHamcrest.argThat(matchesWorkflowInstance(inProgress, SimpleTestWorkflow.State.start, 0, is("Scheduled by previous state start"), - is(skipped)))); + argThat(matchesWorkflowInstance(inProgress, SimpleTestWorkflow.State.start, 0, is("Scheduled by previous state start"), + is(skipped))), + any(DateTime.class)); } @Test @@ -732,8 +746,9 @@ public void instanceWithUnsupportedTypeIsRescheduled() { executor.run(); verify(workflowInstanceDao).updateWorkflowInstance( - MockitoHamcrest.argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, - is("Unsupported workflow type"), greaterThanOrEqualTo(oneHourInFuture)))); + argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, is("Unsupported workflow type"), + greaterThanOrEqualTo(oneHourInFuture))), + any(DateTime.class)); } @Test diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 59300d38d..718c79c69 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -152,8 +152,6 @@ public Collection listWorkflowInstances(final List Collection instances = workflowInstances.listWorkflowInstances(q); List resp = new ArrayList<>(); Set parseIncludeEnums = parseIncludeEnums(include); - // TODO: move to include parameters in next major version - parseIncludeEnums.add(WorkflowInstanceInclude.STARTED); for (WorkflowInstance instance : instances) { resp.add(listWorkflowConverter.convert(instance, parseIncludeEnums)); } @@ -173,8 +171,6 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance(final int id, final St final WorkflowInstanceService workflowInstances, final ListWorkflowInstanceConverter listWorkflowConverter) throws EmptyResultDataAccessException { Set includes = parseIncludeEnums(include); - // TODO: move to include parameters in next major version - includes.add(WorkflowInstanceInclude.STARTED); WorkflowInstance instance = workflowInstances.getWorkflowInstance(id, includes, maxActions); return listWorkflowConverter.convert(instance, includes); } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java index 54da7cc1b..0702d0a4d 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java @@ -46,9 +46,7 @@ public ListWorkflowInstanceResponse convert(WorkflowInstance instance, Set resource.fetchWorkflowInstance(42, null, null)); } @@ -226,11 +227,11 @@ public void fetchingNonExistingWorkflowThrowsNotFoundException() { @Test public void fetchingExistingWorkflowWorks() { WorkflowInstance instance = mock(WorkflowInstance.class); - when(workflowInstances.getWorkflowInstance(42, EnumSet.of(WorkflowInstanceInclude.STARTED), null)).thenReturn(instance); + when(workflowInstances.getWorkflowInstance(42, emptySet(), null)).thenReturn(instance); ListWorkflowInstanceResponse resp = mock(ListWorkflowInstanceResponse.class); when(listWorkflowConverter.convert(eq(instance), any(Set.class))).thenReturn(resp); ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, null, null); - verify(workflowInstances).getWorkflowInstance(42, EnumSet.of(WorkflowInstanceInclude.STARTED), null); + verify(workflowInstances).getWorkflowInstance(42, emptySet(), null); assertEquals(resp, result); } From 1a074097a632df26700968158b46f72f88f671a9 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 14 May 2019 21:26:04 +0300 Subject: [PATCH 02/20] do not set instance started when rescheduling unknown --- .../engine/internal/executor/WorkflowStateProcessor.java | 4 ++-- .../engine/internal/executor/WorkflowStateProcessorTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index a7cf1c79f..a9db9aa87 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -177,7 +177,7 @@ private void rescheduleUnknownWorkflowType(WorkflowInstance instance) { logger.warn("Workflow type {} not configured to this nFlow instance - rescheduling workflow instance", instance.type); instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowTypeRetryDelay)) .setStatus(inProgress).setStateText("Unsupported workflow type").build(); - workflowInstanceDao.updateWorkflowInstance(instance, now()); + workflowInstanceDao.updateWorkflowInstance(instance, null); logger.debug("Finished."); } @@ -186,7 +186,7 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) { instance.type); instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowStateRetryDelay)) .setStatus(inProgress).setStateText("Unsupported workflow state").build(); - workflowInstanceDao.updateWorkflowInstance(instance, now()); + workflowInstanceDao.updateWorkflowInstance(instance, null); logger.debug("Finished."); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index 30763d5f6..a85372fa0 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -304,7 +304,7 @@ public void instanceWithUnsupportedStateIsRescheduled() { verify(workflowInstanceDao).updateWorkflowInstance( argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.invalid, 0, is("Unsupported workflow state"), greaterThanOrEqualTo(oneHourInFuture))), - any(DateTime.class)); + eq(null)); } @Test @@ -748,7 +748,7 @@ public void instanceWithUnsupportedTypeIsRescheduled() { verify(workflowInstanceDao).updateWorkflowInstance( argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, is("Unsupported workflow type"), greaterThanOrEqualTo(oneHourInFuture))), - any(DateTime.class)); + eq(null)); } @Test From e75b379ecacfcef8c1ea1976144f8b50ea84bfd4 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 00:21:56 +0300 Subject: [PATCH 03/20] do not set started timestamp when inserting instance --- .../engine/internal/dao/WorkflowInstanceDao.java | 7 +++---- .../engine/internal/dao/WorkflowInstanceDaoTest.java | 11 +++++------ 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 9994baee9..ede849813 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -183,7 +183,7 @@ private int insertWorkflowInstanceWithCte(WorkflowInstance instance) { Object[] instanceValues = new Object[] { instance.type, instance.rootWorkflowId, instance.parentWorkflowId, instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(), instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), - toTimestamp(instance.nextActivation), instance.signal.orElse(null), toTimestamp(instance.started) }; + toTimestamp(instance.nextActivation), instance.signal.orElse(null) }; int pos = instanceValues.length; Object[] args = Arrays.copyOf(instanceValues, pos + instance.stateVariables.size() * 2); for (Entry var : instance.stateVariables.entrySet()) { @@ -202,8 +202,8 @@ private int insertWorkflowInstanceWithCte(WorkflowInstance instance) { String insertWorkflowInstanceSql() { return "insert into nflow_workflow(type, root_workflow_id, parent_workflow_id, parent_action_id, business_key, external_id, " - + "executor_group, status, state, state_text, next_activation, workflow_signal, started) values (?, ?, ?, ?, ?, ?, ?, " - + sqlVariants.workflowStatus() + ", ?, ?, ?, ?, ?)"; + + "executor_group, status, state, state_text, next_activation, workflow_signal) values (?, ?, ?, ?, ?, ?, ?, " + + sqlVariants.workflowStatus() + ", ?, ?, ?, ?)"; } String insertWorkflowInstanceStateSql() { @@ -235,7 +235,6 @@ private int insertWorkflowInstanceWithTransaction(final WorkflowInstance instanc } else { ps.setNull(p++, Types.INTEGER); } - sqlVariants.setDateTime(ps, p++, instance.started); return ps; }, keyHolder); } catch (DuplicateKeyException e) { diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index e4f62b096..465498394 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -577,10 +577,10 @@ public void fakePostgreSQLinsertWorkflowInstance() { d.insertWorkflowInstance(wf); assertEquals( "with wf as (insert into nflow_workflow(type, root_workflow_id, parent_workflow_id, parent_action_id, business_key, " - + "external_id, executor_group, status, state, state_text, next_activation, workflow_signal, started) values " - + "(?, ?, ?, ?, ?, ?, ?, ?::workflow_status, ?, ?, ?, ?, ?) returning id), ins13 as " + + "external_id, executor_group, status, state, state_text, next_activation, workflow_signal) values " + + "(?, ?, ?, ?, ?, ?, ?, ?::workflow_status, ?, ?, ?, ?) returning id), ins12 as " + "(insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) select wf.id,0,?,? from wf), " - + "ins15 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + + "ins14 as (insert into nflow_workflow_state(workflow_id, action_id, state_key, state_value) " + "select wf.id,0,?,? from wf) select wf.id from wf", sql.getValue()); assertThat(args.getAllValues().size(), is(countMatches(sql.getValue(), "?"))); @@ -598,7 +598,6 @@ public void fakePostgreSQLinsertWorkflowInstance() { assertThat(args.getAllValues().get(i++), is((Object) wf.stateText)); assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(wf.nextActivation.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) wf.signal.get())); - assertThat(args.getAllValues().get(i++), is((Object) new Timestamp(wf.started.getMillis()))); assertThat(args.getAllValues().get(i++), is((Object) "A")); assertThat(args.getAllValues().get(i++), is((Object) "B")); assertThat(args.getAllValues().get(i++), is((Object) "C")); @@ -608,7 +607,7 @@ public void fakePostgreSQLinsertWorkflowInstance() { @Test public void insertWorkflowInstanceActionWorks() { DateTime started = now(); - final WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStarted(started).build(); + final WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("a", "1"); int id = dao.insertWorkflowInstance(i1); final WorkflowInstanceAction a1 = new WorkflowInstanceAction.Builder().setExecutionStart(started).setExecutorId(42) @@ -726,7 +725,7 @@ public void getWorkflowInstanceStateWorks() { @Test public void insertingSubWorkflowWorks() { DateTime started = now(); - final WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStarted(started).build(); + final WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("b", "2"); int parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); From f18b1fded8a90e2cb85effb0d8c45215324542a7 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 01:19:10 +0300 Subject: [PATCH 04/20] try zeroDateTimeBehavior=convertToNull --- nflow-engine/src/main/resources/nflow-engine.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index ea497dc51..09c6f3b6e 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -30,7 +30,7 @@ nflow.db.h2.tcp.port=8043 nflow.db.h2.console.port=8044 nflow.db.mysql.driver=com.mysql.cj.jdbc.Driver -nflow.db.mysql.url=jdbc:mysql://localhost/nflow +nflow.db.mysql.url=jdbc:mysql://localhost/nflow?zeroDateTimeBehavior=convertToNull nflow.db.mysql.user=nflow nflow.db.mysql.password=nflow From 0514e15fb54f8ded6bac55b1c1968436fb4259b7 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 02:09:42 +0300 Subject: [PATCH 05/20] try zeroDateTimeBehavior=convertToNull --- nflow-tests/src/main/resources/local.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nflow-tests/src/main/resources/local.properties b/nflow-tests/src/main/resources/local.properties index 3d8357378..8a67b9769 100644 --- a/nflow-tests/src/main/resources/local.properties +++ b/nflow-tests/src/main/resources/local.properties @@ -15,6 +15,6 @@ nflow.db.postgresql.password=nflow ## Example configuration for MySQL / MariaDB. ## To enable postgres you also need set VM parameter -Dspring.profiles.active=nflow.db.mysql nflow.db.mysql.driver=com.mysql.cj.jdbc.Driver -nflow.db.mysql.url=jdbc:mysql://localhost/nflow +nflow.db.mysql.url=jdbc:mysql://localhost/nflow?zeroDateTimeBehavior=convertToNull nflow.db.mysql.user=nflow nflow.db.mysql.password=nflow From 31e22ec592ba12cc83bdff5690dd28633c73d5aa Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 02:59:26 +0300 Subject: [PATCH 06/20] set instance started in builder --- .../java/io/nflow/engine/workflow/instance/WorkflowInstance.java | 1 + 1 file changed, 1 insertion(+) diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java index ee79cb6cc..79b06c800 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java @@ -291,6 +291,7 @@ public Builder(WorkflowInstance copy) { this.retries = copy.retries; this.created = copy.created; this.modified = copy.modified; + this.started = copy.started; this.executorGroup = copy.executorGroup; this.signal = copy.signal; this.mapper = copy.mapper; From 8b94f63e9cf24a66ad90d90e79a485753323d711 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 04:07:45 +0300 Subject: [PATCH 07/20] wip --- nflow-engine/src/main/resources/nflow-engine.properties | 2 +- .../src/main/resources/scripts/db/mysql.create.ddl.sql | 4 ++-- .../main/resources/scripts/db/mysql.legacy.create.ddl.sql | 2 +- .../scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql | 2 +- .../resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql | 2 +- .../nflow/engine/internal/dao/WorkflowInstanceDaoTest.java | 1 + .../rest/v1/converter/ListWorkflowInstanceConverterTest.java | 5 +++-- nflow-tests/src/main/resources/local.properties | 2 +- 8 files changed, 11 insertions(+), 9 deletions(-) diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 09c6f3b6e..ea497dc51 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -30,7 +30,7 @@ nflow.db.h2.tcp.port=8043 nflow.db.h2.console.port=8044 nflow.db.mysql.driver=com.mysql.cj.jdbc.Driver -nflow.db.mysql.url=jdbc:mysql://localhost/nflow?zeroDateTimeBehavior=convertToNull +nflow.db.mysql.url=jdbc:mysql://localhost/nflow nflow.db.mysql.user=nflow nflow.db.mysql.password=nflow diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql index 8dfb2eec8..d070930dd 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.create.ddl.sql @@ -16,7 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp(3) default current_timestamp(3), modified timestamp(3) default current_timestamp(3) on update current_timestamp(3), - started timestamp(3), + started timestamp(3) null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) @@ -99,7 +99,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamp(3) default current_timestamp(3), modified timestamp(3) default current_timestamp(3), - started timestamp(3), + started timestamp(3) null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql index 6e984a3c8..4ab113660 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql @@ -16,7 +16,7 @@ create table if not exists nflow_workflow ( retries int not null default 0, created timestamp not null, modified timestamp not null default current_timestamp on update current_timestamp, - started timestamp, + started timestamp null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql index 32d7caf6b..3ff835212 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql @@ -1 +1 @@ -alter table nflow_workflow add started timestamp; +alter table nflow_workflow add started timestamp null; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql index 495339bd8..264cc4869 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql @@ -1 +1 @@ -alter table nflow_workflow add started timestamp(3); +alter table nflow_workflow add started timestamp(3) null; diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 465498394..53ff3730a 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -175,6 +175,7 @@ public void processRow(ResultSet rs) throws SQLException { assertThat(rs.getTimestamp("next_activation").getTime(), equalTo(i2.nextActivation.toDate().getTime())); assertThat(rs.getInt("executor_id") != 0, equalTo(i2.status == executing)); assertThat(rs.getTimestamp("modified").getTime(), greaterThan(originalModifiedTime.getMillis())); + assertThat(rs.getTimestamp("started"), is(notNullValue())); } }); QueryWorkflowInstances query = new QueryWorkflowInstances.Builder().setBusinessKey("newKey").build(); diff --git a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java index 2c0c42f98..eb2a649ac 100644 --- a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java +++ b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; @@ -39,7 +40,6 @@ import io.nflow.engine.workflow.instance.WorkflowInstanceAction; import io.nflow.rest.v1.msg.Action; import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse; -import org.mockito.junit.jupiter.MockitoExtension; @ExtendWith(MockitoExtension.class) public class ListWorkflowInstanceConverterTest { @@ -56,12 +56,13 @@ public void convertWithActionsWorks() throws IOException { Map stateVariables = new LinkedHashMap<>(); stateVariables.put("foo", "1"); stateVariables.put("bar", "quux"); + DateTime started = now(); WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setStatus(inProgress).setType("dummy") .setBusinessKey("businessKey").setParentWorkflowId(942).setParentActionId(842).setExternalId("externalId") .setState("cState").setStateText("cState desc").setNextActivation(now()).setActions(asList(a)) .setCreated(now().minusMinutes(1)).setCreated(now().minusHours(2)).setModified(now().minusHours(1)).setRetries(42) - .setStateVariables(stateVariables).setSignal(Optional.of(42)).build(); + .setStateVariables(stateVariables).setSignal(Optional.of(42)).setStarted(started).build(); JsonNode node1 = mock(JsonNode.class); JsonNode nodeQuux = mock(JsonNode.class); diff --git a/nflow-tests/src/main/resources/local.properties b/nflow-tests/src/main/resources/local.properties index 8a67b9769..3d8357378 100644 --- a/nflow-tests/src/main/resources/local.properties +++ b/nflow-tests/src/main/resources/local.properties @@ -15,6 +15,6 @@ nflow.db.postgresql.password=nflow ## Example configuration for MySQL / MariaDB. ## To enable postgres you also need set VM parameter -Dspring.profiles.active=nflow.db.mysql nflow.db.mysql.driver=com.mysql.cj.jdbc.Driver -nflow.db.mysql.url=jdbc:mysql://localhost/nflow?zeroDateTimeBehavior=convertToNull +nflow.db.mysql.url=jdbc:mysql://localhost/nflow nflow.db.mysql.user=nflow nflow.db.mysql.password=nflow From a1710308b2744027ff20eeacf377d450da4a3219 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 11:31:54 +0300 Subject: [PATCH 08/20] populate started timestamp in instance table based on actions --- .../resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql | 3 +++ .../main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql | 3 +++ .../scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql | 3 +++ .../resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql | 3 +++ .../resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql | 3 +++ .../scripts/db/update-5.5.0-x/postgresql.update.ddl.sql | 3 +++ .../scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql | 3 +++ 7 files changed, 21 insertions(+) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql index 495339bd8..7df68f7c7 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started timestamp(3); + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql index 32d7caf6b..387613abd 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started timestamp; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql index 3ff835212..dd5935a40 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started timestamp null; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql index 264cc4869..b84e9359d 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started timestamp(3) null; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql index 32d7caf6b..387613abd 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started timestamp; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql index 88a32213c..4b832760b 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started timestamptz; + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql index 862c12b73..3a8c973f7 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql @@ -1 +1,4 @@ alter table nflow_workflow add started datetimeoffset(3); + +update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id From ed449104ec2a31777a7b7cc9503e644a07a30da1 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 11:38:49 +0300 Subject: [PATCH 09/20] update changelog --- CHANGELOG.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f55032ff..750310bf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,11 +17,12 @@ - jetty 9.4.17.v20190418 - `nflow-engine` - Retry workflow state processing until all steps in nFlow-side are executed successfully. This will prevent workflow instances from being locked in `executing` status, if e.g. database connection fails after locking the instance and before querying the full workflow instance information (`WorkflowStateProcessor`). - - Fix #306: create empty ArrayList with default initial size - - Log more executor details on startup + - Fix #306: create empty ArrayList with default initial size. + - Log more executor details on startup. - Fix #311: Replace references to WorkflowDefinition with AbstractWorkflowDefinition to support non-enum WorkflowStates - - Use name() instead of toString() when getting workflow instance initial state name - - Deprecated WorkflowInstanceInclude.STARTED enum value + - Use name() instead of toString() when getting workflow instance initial state name. + - Deprecated WorkflowInstanceInclude.STARTED enum value. This is not needed anymore, since the started timestamp is always read from the database when the instance is loaded. + - Add started timestamp to workflow instance table. This makes the instance queries much faster when instances have lots of actions, as there is no need to join the nflow_workflow_action table to the query anymore. ## 5.5.0 (2019-04-04) From 79609f74192641d977355aaad61cd4958205f2a5 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 15 May 2019 21:45:22 +0300 Subject: [PATCH 10/20] add missing semicolons --- .../main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql | 2 +- .../main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql | 2 +- .../scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql | 2 +- .../resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql | 2 +- .../resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql | 2 +- .../scripts/db/update-5.5.0-x/postgresql.update.ddl.sql | 2 +- .../scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql index 7df68f7c7..33d72716f 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started timestamp(3); update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql index 387613abd..2c51dc842 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started timestamp; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql index dd5935a40..e32a391f7 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started timestamp null; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql index b84e9359d..d2877fef5 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started timestamp(3) null; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql index 387613abd..2c51dc842 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started timestamp; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql index 4b832760b..278f42d7f 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started timestamptz; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql index 3a8c973f7..dee93ada8 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql @@ -1,4 +1,4 @@ alter table nflow_workflow add started datetimeoffset(3); update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a - set w.started = a.started where w.id = a.workflow_id + set w.started = a.started where w.id = a.workflow_id; From 496008468bc95409d115d3609592580fca132cb5 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Thu, 16 May 2019 08:37:36 +0300 Subject: [PATCH 11/20] update db scripts --- .../src/main/resources/scripts/db/mysql.legacy.create.ddl.sql | 2 +- .../resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql | 3 +++ .../main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql | 3 +++ .../scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql | 3 +++ .../resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql | 3 +++ .../resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql | 3 +++ .../scripts/db/update-5.5.0-x/postgresql.update.ddl.sql | 3 +++ .../scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql | 3 +++ 8 files changed, 22 insertions(+), 1 deletion(-) diff --git a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql index 4ab113660..9427681a7 100644 --- a/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/mysql.legacy.create.ddl.sql @@ -109,7 +109,7 @@ create table if not exists nflow_archive_workflow ( retries int not null default 0, created timestamp not null, modified timestamp not null, - started timestamp, + started timestamp null, executor_group varchar(64) not null, workflow_signal int, constraint nflow_archive_workflow_uniq unique (type, external_id, executor_group) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql index 33d72716f..131e58538 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started timestamp(3); +alter table nflow_archive_workflow add started timestamp(3); update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql index 2c51dc842..d8662c53b 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started timestamp; +alter table nflow_archive_workflow add started timestamp; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql index e32a391f7..82cd6ad3a 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started timestamp null; +alter table nflow_archive_workflow add started timestamp null; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql index d2877fef5..2f188455f 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started timestamp(3) null; +alter table nflow_archive_workflow add started timestamp(3) null; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql index 2c51dc842..d8662c53b 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started timestamp; +alter table nflow_archive_workflow add started timestamp; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql index 278f42d7f..fac22a528 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started timestamptz; +alter table nflow_archive_workflow add started timestamptz; update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql index dee93ada8..5c674bdf2 100644 --- a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql +++ b/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql @@ -1,4 +1,7 @@ alter table nflow_workflow add started datetimeoffset(3); +alter table nflow_archive_workflow add started datetimeoffset(3); update nflow_workflow w, (select workflow_id, min(execution_start) as started from nflow_workflow_action group by workflow_id) a set w.started = a.started where w.id = a.workflow_id; +update nflow_archive_workflow w, (select workflow_id, min(execution_start) as started from nflow_archive_workflow_action group by workflow_id) a + set w.started = a.started where w.id = a.workflow_id; From 16934e091621565af43646cd79c20d999529f46b Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Fri, 17 May 2019 15:43:46 +0300 Subject: [PATCH 12/20] restore WorkflowInstanceInclude.STARTED, mark as deprecated --- .../io/nflow/engine/service/WorkflowInstanceInclude.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java index afcb33849..88dc589fc 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceInclude.java @@ -5,6 +5,13 @@ */ public enum WorkflowInstanceInclude { + /** + * The execution start date of the first action of the workflow instance (WorkflowInstance.started). + * @deprecated This is not needed anymore, since the started timestamp is always read from the database when the instance is loaded. + */ + @Deprecated + STARTED, + /** * The most recent values of all state variables (WorkflowInstance.stateVariables). */ From 7130edd70a952edc91566537f0928b363b4bc257 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 21 May 2019 23:04:48 +0300 Subject: [PATCH 13/20] fix test --- .../nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index 9776bedad..73f957846 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -235,16 +235,18 @@ public void fetchingExistingWorkflowWorks() { assertEquals(resp, result); } - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "deprecation" }) @Test public void fetchingExistingWorkflowWorksWithAllIncludes() { WorkflowInstance instance = mock(WorkflowInstance.class); - when(workflowInstances.getWorkflowInstance(42, EnumSet.allOf(WorkflowInstanceInclude.class), 10L)).thenReturn(instance); + EnumSet includes = EnumSet.allOf(WorkflowInstanceInclude.class); + includes.remove(WorkflowInstanceInclude.STARTED); + when(workflowInstances.getWorkflowInstance(42, includes, 10L)).thenReturn(instance); ListWorkflowInstanceResponse resp = mock(ListWorkflowInstanceResponse.class); when(listWorkflowConverter.convert(eq(instance), any(Set.class))).thenReturn(resp); ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, "actions,currentStateVariables,actionStateVariables,childWorkflows", 10L); - verify(workflowInstances).getWorkflowInstance(42, EnumSet.allOf(WorkflowInstanceInclude.class), 10L); + verify(workflowInstances).getWorkflowInstance(42, includes, 10L); assertEquals(resp, result); } From 047831cd8f0d02d04a65243c23ca842f0862eb15 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 21 May 2019 23:06:58 +0300 Subject: [PATCH 14/20] update changelog --- CHANGELOG.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 750310bf3..603b09723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,14 +1,17 @@ -## 5.6.1-SNAPSHOT (future release) +## 5.7.0-SNAPSHOT (future release) **Highlights** +- Added `started` timestamp to workflow instance table (requires database update) **Details** +- `nflow-engine` + - Add started timestamp to workflow instance table. This makes the instance queries much faster when instances have lots of actions, as there is no need to join the nflow_workflow_action table to the query anymore. + - Deprecated WorkflowInstanceInclude.STARTED enum value. This is not needed anymore, since the started timestamp is always read from the database when the instance is loaded. ## 5.6.0 (2019-05-21) **Highlights** - Support non-enum WorkflowStates to enable, for example, Kotlin workflow definitions by extending AbstractWorkflowDefinition. -- Added `started` timestamp to workflow instance table (requires database update) **Details** - Dependency and plugin updates: @@ -21,8 +24,6 @@ - Log more executor details on startup. - Fix #311: Replace references to WorkflowDefinition with AbstractWorkflowDefinition to support non-enum WorkflowStates - Use name() instead of toString() when getting workflow instance initial state name. - - Deprecated WorkflowInstanceInclude.STARTED enum value. This is not needed anymore, since the started timestamp is always read from the database when the instance is loaded. - - Add started timestamp to workflow instance table. This makes the instance queries much faster when instances have lots of actions, as there is no need to join the nflow_workflow_action table to the query anymore. ## 5.5.0 (2019-04-04) From 4db5f31e2316529c713f9d0c85e43ac41eaf5ce4 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 21 May 2019 23:08:00 +0300 Subject: [PATCH 15/20] update db update script folder --- .../db/{update-5.5.0-x => update-5.6.0-x}/db2.update.ddl.sql | 0 .../db/{update-5.5.0-x => update-5.6.0-x}/h2.update.ddl.sql | 0 .../mysql.legacy.update.ddl.sql | 0 .../db/{update-5.5.0-x => update-5.6.0-x}/mysql.update.ddl.sql | 0 .../db/{update-5.5.0-x => update-5.6.0-x}/oracle.update.ddl.sql | 0 .../{update-5.5.0-x => update-5.6.0-x}/postgresql.update.ddl.sql | 0 .../{update-5.5.0-x => update-5.6.0-x}/sqlserver.update.ddl.sql | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/db2.update.ddl.sql (100%) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/h2.update.ddl.sql (100%) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/mysql.legacy.update.ddl.sql (100%) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/mysql.update.ddl.sql (100%) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/oracle.update.ddl.sql (100%) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/postgresql.update.ddl.sql (100%) rename nflow-engine/src/main/resources/scripts/db/{update-5.5.0-x => update-5.6.0-x}/sqlserver.update.ddl.sql (100%) diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/db2.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/db2.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/db2.update.ddl.sql diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/h2.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/h2.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/h2.update.ddl.sql diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.legacy.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.legacy.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.legacy.update.ddl.sql diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/mysql.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/mysql.update.ddl.sql diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/oracle.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/oracle.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/oracle.update.ddl.sql diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/postgresql.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/postgresql.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/postgresql.update.ddl.sql diff --git a/nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql b/nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/sqlserver.update.ddl.sql similarity index 100% rename from nflow-engine/src/main/resources/scripts/db/update-5.5.0-x/sqlserver.update.ddl.sql rename to nflow-engine/src/main/resources/scripts/db/update-5.6.0-x/sqlserver.update.ddl.sql From 5aaac7ae1e6212cbec0efe838141d8ddaab65d6a Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 22 May 2019 07:45:36 +0300 Subject: [PATCH 16/20] set started timestamp to instance instead of passing the value as separate parameter --- .../internal/dao/WorkflowInstanceDao.java | 9 ++++---- .../executor/WorkflowStateProcessor.java | 22 ++++++++++--------- .../internal/dao/WorkflowInstanceDaoTest.java | 12 +++++----- .../executor/WorkflowStateProcessorTest.java | 21 ++++++++++-------- 4 files changed, 35 insertions(+), 29 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index ede849813..80a5e6675 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -322,16 +322,17 @@ public void updateWorkflowInstanceAfterExecution(WorkflowInstance instance, Work updateWorkflowInstanceWithTransaction(instance, action, childWorkflows, workflows, changedStateVariables); } } else { - updateWorkflowInstance(instance, action.executionStart); + updateWorkflowInstance(instance); } } - public int updateWorkflowInstance(WorkflowInstance instance, DateTime started) { + public int updateWorkflowInstance(WorkflowInstance instance) { // using sqlVariants.nextActivationUpdate() requires that nextActivation is used 3 times Object nextActivation = sqlVariants.toTimestampObject(instance.nextActivation); return jdbc.update(updateWorkflowInstanceSql(), instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), nextActivation, nextActivation, nextActivation, - instance.status == executing ? executorInfo.getExecutorId() : null, instance.retries, toTimestamp(started), instance.id); + instance.status == executing ? executorInfo.getExecutorId() : null, instance.retries, toTimestamp(instance.started), + instance.id); } private void updateWorkflowInstanceWithTransaction(final WorkflowInstance instance, final WorkflowInstanceAction action, @@ -340,7 +341,7 @@ private void updateWorkflowInstanceWithTransaction(final WorkflowInstance instan transaction.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { - updateWorkflowInstance(instance, action.executionStart); + updateWorkflowInstance(instance); int parentActionId = insertWorkflowInstanceAction(action); insertVariables(action.workflowInstanceId, parentActionId, changedStateVariables); for (WorkflowInstance childTemplate : childWorkflows) { diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index a9db9aa87..d9731b2df 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -177,7 +177,7 @@ private void rescheduleUnknownWorkflowType(WorkflowInstance instance) { logger.warn("Workflow type {} not configured to this nFlow instance - rescheduling workflow instance", instance.type); instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowTypeRetryDelay)) .setStatus(inProgress).setStateText("Unsupported workflow type").build(); - workflowInstanceDao.updateWorkflowInstance(instance, null); + workflowInstanceDao.updateWorkflowInstance(instance); logger.debug("Finished."); } @@ -186,7 +186,7 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) { instance.type); instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowStateRetryDelay)) .setStatus(inProgress).setStateText("Unsupported workflow state").build(); - workflowInstanceDao.updateWorkflowInstance(instance, null); + workflowInstanceDao.updateWorkflowInstance(instance); logger.debug("Finished."); } @@ -219,7 +219,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, execution.wakeUpParentWorkflow(); } } - WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) // + WorkflowInstance.Builder instanceBuilder = new WorkflowInstance.Builder(instance) // .setNextActivation(execution.getNextActivation()) // .setStatus(getStatus(execution, nextState)) // .setStateText(getStateText(instance, execution)) // @@ -227,7 +227,7 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, .setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0); do { try { - return persistWorkflowInstanceState(execution, instance, actionBuilder, builder); + return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder); } catch (Exception ex) { logger.error("Failed to save workflow instance new state, retrying after {} seconds", stateSaveRetryDelay, ex); sleepIgnoreInterrupted(stateSaveRetryDelay); @@ -243,21 +243,23 @@ void setInternalRetryEnabled(boolean internalRetryEnabled) { this.internalRetryEnabled = internalRetryEnabled; } - private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance, - WorkflowInstanceAction.Builder actionBuilder, WorkflowInstance.Builder builder) { + private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, Map originalStateVars, + WorkflowInstanceAction.Builder actionBuilder, WorkflowInstance.Builder instanceBuilder) { WorkflowInstanceAction action = actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)) .setStateText(execution.getNextStateReason()).build(); if (execution.isStateProcessInvoked()) { + WorkflowInstance instance = instanceBuilder.setStarted(action.executionStart).build(); if (execution.isFailed()) { - workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), action, emptyList(), emptyList(), true); + workflowInstanceDao.updateWorkflowInstanceAfterExecution(instance, action, emptyList(), emptyList(), true); } else { - workflowInstanceDao.updateWorkflowInstanceAfterExecution(builder.build(), action, execution.getNewChildWorkflows(), execution.getNewWorkflows(), execution.createAction()); + workflowInstanceDao.updateWorkflowInstanceAfterExecution(instance, action, execution.getNewChildWorkflows(), + execution.getNewWorkflows(), execution.createAction()); processSuccess(execution, instance); } } else { - workflowInstanceDao.updateWorkflowInstance(builder.build(), action.executionStart); + workflowInstanceDao.updateWorkflowInstance(instanceBuilder.build()); } - return builder.setOriginalStateVariables(instance.stateVariables).build(); + return instanceBuilder.setOriginalStateVariables(originalStateVars).build(); } private void processSuccess(StateExecutionImpl execution, WorkflowInstance instance) { diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 53ff3730a..134c4fe0f 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -154,9 +154,10 @@ public void updateWorkflowInstance() throws InterruptedException { int id = dao.insertWorkflowInstance(i1); List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); + DateTime started = now(); final WorkflowInstance i2 = new WorkflowInstance.Builder( dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") - .setStateText("update text").build(); + .setStateText("update text").setStarted(started).build(); final WorkflowInstance polledInstance = dao.getWorkflowInstance(id, emptySet(), null); assertThat(polledInstance.status, equalTo(executing)); final DateTime originalModifiedTime = polledInstance.modified; @@ -175,7 +176,7 @@ public void processRow(ResultSet rs) throws SQLException { assertThat(rs.getTimestamp("next_activation").getTime(), equalTo(i2.nextActivation.toDate().getTime())); assertThat(rs.getInt("executor_id") != 0, equalTo(i2.status == executing)); assertThat(rs.getTimestamp("modified").getTime(), greaterThan(originalModifiedTime.getMillis())); - assertThat(rs.getTimestamp("started"), is(notNullValue())); + assertThat(rs.getTimestamp("started").getTime(), is(equalTo(started.getMillis()))); } }); QueryWorkflowInstances query = new QueryWorkflowInstances.Builder().setBusinessKey("newKey").build(); @@ -444,7 +445,6 @@ public void updateNotRunningWorkflowInstanceDoesNotUpdateRunningInstance() { @Test public void updatingNextActivationToNullWhileExternalNextActivationIsNotNull() { - DateTime started = now(); WorkflowInstance instance = constructWorkflowInstanceBuilder().setNextActivation(null).build(); int workflowId = dao.insertWorkflowInstance(instance); assertTrue(workflowId > -1); @@ -454,7 +454,7 @@ public void updatingNextActivationToNullWhileExternalNextActivationIsNotNull() { WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); i = new WorkflowInstance.Builder(i).setNextActivation(null).build(); - dao.updateWorkflowInstance(i, started); + dao.updateWorkflowInstance(i); WorkflowInstance updated = dao.getWorkflowInstance(workflowId, emptySet(), null); assertThat(updated.nextActivation, is(nullValue())); @@ -476,7 +476,7 @@ public void updatingNextActivationWhenExternalNextActivationIsEarlier() { WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); - assertThat(dao.updateWorkflowInstance(i, now), is(1)); + assertThat(dao.updateWorkflowInstance(i), is(1)); assertThat(dao.getWorkflowInstance(workflowId, emptySet(), null).nextActivation, is(now)); } @@ -493,7 +493,7 @@ public void updatingNextActivationWhenExternalNextActivationIsLater() { assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", new Timestamp(future.getMillis()), workflowId), is(1)); WorkflowInstance i = dao.getWorkflowInstance(workflowId, EnumSet.of(CURRENT_STATE_VARIABLES), null); - assertThat(dao.updateWorkflowInstance(i, now), is(1)); + assertThat(dao.updateWorkflowInstance(i), is(1)); assertThat(dao.getWorkflowInstance(workflowId, emptySet(), null).nextActivation, is(now)); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index a85372fa0..fe7816672 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -303,8 +303,7 @@ public void instanceWithUnsupportedStateIsRescheduled() { verify(workflowInstanceDao).updateWorkflowInstance( argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.invalid, 0, is("Unsupported workflow state"), - greaterThanOrEqualTo(oneHourInFuture))), - eq(null)); + greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } @Test @@ -367,8 +366,7 @@ public NextAction answer(InvocationOnMock invocation) { verify(workflowInstanceDao).updateWorkflowInstance( argThat(matchesWorkflowInstance(inProgress, SimpleTestWorkflow.State.start, 0, is("Scheduled by previous state start"), - is(skipped))), - any(DateTime.class)); + is(skipped), is(nullValue())))); } @Test @@ -628,14 +626,19 @@ public void runWorkflowWithParameters() { assertThat(state.get("hello"), is("[1,2,3]")); } - private Matcher matchesWorkflowInstance(WorkflowInstanceStatus status, WorkflowState state, - int retries, Matcher stateTextMatcher) { + private Matcher matchesWorkflowInstance(WorkflowInstanceStatus status, WorkflowState state, int retries, + Matcher stateTextMatcher) { return matchesWorkflowInstance(status, state, retries, stateTextMatcher, Matchers.any(DateTime.class)); } + private Matcher matchesWorkflowInstance(WorkflowInstanceStatus status, WorkflowState state, int retries, + Matcher stateTextMatcher, Matcher nextActivationMatcher) { + return matchesWorkflowInstance(status, state, retries, stateTextMatcher, nextActivationMatcher, Matchers.any(DateTime.class)); + } + private Matcher matchesWorkflowInstance(final WorkflowInstanceStatus status, final WorkflowState state, final int retries, final Matcher stateTextMatcher, - final Matcher nextActivationMatcher) { + final Matcher nextActivationMatcher, final Matcher startedMatcher) { return new TypeSafeMatcher() { @Override public void describeTo(Description description) { @@ -650,6 +653,7 @@ protected boolean matchesSafely(WorkflowInstance i) { assertThat(i.stateText, stateTextMatcher); assertThat(i.retries, is(retries)); assertThat(i.nextActivation, nextActivationMatcher); + assertThat(i.started, startedMatcher); return true; } }; @@ -747,8 +751,7 @@ public void instanceWithUnsupportedTypeIsRescheduled() { verify(workflowInstanceDao).updateWorkflowInstance( argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, is("Unsupported workflow type"), - greaterThanOrEqualTo(oneHourInFuture))), - eq(null)); + greaterThanOrEqualTo(oneHourInFuture), is(nullValue())))); } @Test From aac5649407152d95a05f9acd9c8db95c1200cc5f Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 22 May 2019 07:53:30 +0300 Subject: [PATCH 17/20] update version --- nflow-engine/pom.xml | 2 +- nflow-explorer/pom.xml | 2 +- nflow-jetty/pom.xml | 2 +- nflow-metrics/pom.xml | 2 +- nflow-netty/pom.xml | 2 +- nflow-perf-test/pom.xml | 2 +- nflow-rest-api-common/pom.xml | 2 +- nflow-rest-api-jax-rs/pom.xml | 2 +- nflow-rest-api-spring-web/pom.xml | 2 +- nflow-server-common/pom.xml | 2 +- nflow-tests/pom.xml | 2 +- pom.xml | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/nflow-engine/pom.xml b/nflow-engine/pom.xml index 46cb3cfdf..bee0efe2d 100644 --- a/nflow-engine/pom.xml +++ b/nflow-engine/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-explorer/pom.xml b/nflow-explorer/pom.xml index c05fdb393..a9025977b 100644 --- a/nflow-explorer/pom.xml +++ b/nflow-explorer/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT .. diff --git a/nflow-jetty/pom.xml b/nflow-jetty/pom.xml index 080964334..c31df1ec0 100644 --- a/nflow-jetty/pom.xml +++ b/nflow-jetty/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT UTF-8 diff --git a/nflow-metrics/pom.xml b/nflow-metrics/pom.xml index 9d570c87d..09a3bd21f 100644 --- a/nflow-metrics/pom.xml +++ b/nflow-metrics/pom.xml @@ -12,7 +12,7 @@ io.nflow nflow-root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-netty/pom.xml b/nflow-netty/pom.xml index 4d1d27bbc..741b7b301 100644 --- a/nflow-netty/pom.xml +++ b/nflow-netty/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT UTF-8 diff --git a/nflow-perf-test/pom.xml b/nflow-perf-test/pom.xml index 72aa21d0e..9181467bb 100644 --- a/nflow-perf-test/pom.xml +++ b/nflow-perf-test/pom.xml @@ -13,7 +13,7 @@ io.nflow nflow-root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-rest-api-common/pom.xml b/nflow-rest-api-common/pom.xml index 092d23037..717327e9b 100644 --- a/nflow-rest-api-common/pom.xml +++ b/nflow-rest-api-common/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-rest-api-jax-rs/pom.xml b/nflow-rest-api-jax-rs/pom.xml index fe9ca2c1e..7c0c322e7 100644 --- a/nflow-rest-api-jax-rs/pom.xml +++ b/nflow-rest-api-jax-rs/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-rest-api-spring-web/pom.xml b/nflow-rest-api-spring-web/pom.xml index 1fdd0e02f..b80cc7ad8 100644 --- a/nflow-rest-api-spring-web/pom.xml +++ b/nflow-rest-api-spring-web/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/nflow-server-common/pom.xml b/nflow-server-common/pom.xml index 1d3f6c520..25d50f8b7 100644 --- a/nflow-server-common/pom.xml +++ b/nflow-server-common/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT UTF-8 diff --git a/nflow-tests/pom.xml b/nflow-tests/pom.xml index 1e5e96026..5926dac76 100644 --- a/nflow-tests/pom.xml +++ b/nflow-tests/pom.xml @@ -12,7 +12,7 @@ io.nflow nflow-root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 72ca8032c..4fd0a2921 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ nflow-root pom nFlow Root - 5.6.1-SNAPSHOT + 5.7.0-SNAPSHOT http://nflow.io From 6eceea47917fa96b744d0aaa3904490af1e3f2e8 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 22 May 2019 08:33:55 +0300 Subject: [PATCH 18/20] remove unnecessary table alias from instance query --- .../internal/dao/WorkflowInstanceDao.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 80a5e6675..389e94eef 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -627,28 +627,28 @@ public int compareTo(OptimisticLockKey other) { } public List queryWorkflowInstances(QueryWorkflowInstances query) { - String sql = "select w.* from nflow_workflow w "; + String sql = "select * from nflow_workflow "; List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); conditions.add(executorInfo.getExecutorGroupCondition()); if (!isEmpty(query.ids)) { - conditions.add("w.id in (:ids)"); + conditions.add("id in (:ids)"); params.addValue("ids", query.ids); } if (!isEmpty(query.types)) { - conditions.add("w.type in (:types)"); + conditions.add("type in (:types)"); params.addValue("types", query.types); } if (query.parentWorkflowId != null) { - conditions.add("w.parent_workflow_id = :parent_workflow_id"); + conditions.add("parent_workflow_id = :parent_workflow_id"); params.addValue("parent_workflow_id", query.parentWorkflowId); } if (query.parentActionId != null) { - conditions.add("w.parent_action_id = :parent_action_id"); + conditions.add("parent_action_id = :parent_action_id"); params.addValue("parent_action_id", query.parentActionId); } if (!isEmpty(query.states)) { - conditions.add("w.state in (:states)"); + conditions.add("state in (:states)"); params.addValue("states", query.states); } if (!isEmpty(query.statuses)) { @@ -656,20 +656,20 @@ public List queryWorkflowInstances(QueryWorkflowInstances quer for (WorkflowInstanceStatus s : query.statuses) { convertedStatuses.add(s.name()); } - conditions.add("w.status" + sqlVariants.castToText() + " in (:statuses)"); + conditions.add("status" + sqlVariants.castToText() + " in (:statuses)"); params.addValue("statuses", convertedStatuses); } if (query.businessKey != null) { - conditions.add("w.business_key = :business_key"); + conditions.add("business_key = :business_key"); params.addValue("business_key", query.businessKey); } if (query.externalId != null) { - conditions.add("w.external_id = :external_id"); + conditions.add("external_id = :external_id"); params.addValue("external_id", query.externalId); } - conditions.add("w.executor_group = :executor_group"); + conditions.add("executor_group = :executor_group"); params.addValue("executor_group", executorInfo.getExecutorGroup()); - sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by w.created desc"; + sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by created desc"; sql = sqlVariants.limit(sql, getMaxResults(query.maxResults)); List ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() .map(WorkflowInstance.Builder::build).collect(toList()); From 55cc89fa31d12fee3d5eb6fc5fb8e9cb13d5aa0a Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 22 May 2019 09:40:06 +0300 Subject: [PATCH 19/20] add WorkflowInstance.setStartedIfNotSet and deprecate WorkflowInstance.setStarted --- .../internal/dao/WorkflowInstanceDao.java | 2 +- .../executor/WorkflowStateProcessor.java | 2 +- .../workflow/instance/WorkflowInstance.java | 26 +++++++++++++++++-- .../internal/dao/WorkflowInstanceDaoTest.java | 4 +-- .../testdata/TestDataGenerator.java | 2 +- .../ListWorkflowInstanceConverterTest.java | 2 +- 6 files changed, 30 insertions(+), 8 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 389e94eef..611d1921f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -784,7 +784,7 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep .setRetries(rs.getInt("retries")) // .setCreated(sqlVariants.getDateTime(rs, "created")) // .setModified(sqlVariants.getDateTime(rs, "modified")) // - .setStarted(sqlVariants.getDateTime(rs, "started")) // + .setStartedIfNotSet(sqlVariants.getDateTime(rs, "started")) // .setExecutorGroup(rs.getString("executor_group")) // .setSignal(ofNullable(getInt(rs, "workflow_signal"))); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index d9731b2df..360c5db41 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -248,7 +248,7 @@ private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl executi WorkflowInstanceAction action = actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)) .setStateText(execution.getNextStateReason()).build(); if (execution.isStateProcessInvoked()) { - WorkflowInstance instance = instanceBuilder.setStarted(action.executionStart).build(); + WorkflowInstance instance = instanceBuilder.setStartedIfNotSet(action.executionStart).build(); if (execution.isFailed()) { workflowInstanceDao.updateWorkflowInstanceAfterExecution(instance, action, emptyList(), emptyList(), true); } else { diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java index 79b06c800..353358504 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java @@ -12,6 +12,8 @@ import java.util.Optional; import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.nflow.engine.internal.workflow.ObjectStringMapper; @@ -229,6 +231,8 @@ public String getStateVariable(String name, String defaultValue) { */ public static class Builder { + private static final Logger LOG = LoggerFactory.getLogger(WorkflowInstance.Builder.class); + Integer id; Integer executorId; Integer rootWorkflowId; @@ -526,12 +530,30 @@ public Builder setModified(DateTime modified) { } /** - * Set the start timestamp. + * Set the started timestamp. Log a warning if the value was already set and ignore the new value. * @param started Start time. * @return this. + * @deprecated Use setStartedIfNotSet instead. */ + @Deprecated public Builder setStarted(DateTime started) { - this.started = started; + if (this.started == null) { + this.started = started; + } else { + LOG.warn("Started is already set to {}, ignoring new value {}."); + } + return this; + } + + /** + * Set the started timestamp if it has not already been set. + * @param started Start time. + * @return this. + */ + public Builder setStartedIfNotSet(DateTime started) { + if (this.started == null) { + this.started = started; + } return this; } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 134c4fe0f..6b7914d6b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -157,7 +157,7 @@ public void updateWorkflowInstance() throws InterruptedException { DateTime started = now(); final WorkflowInstance i2 = new WorkflowInstance.Builder( dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") - .setStateText("update text").setStarted(started).build(); + .setStateText("update text").setStartedIfNotSet(started).build(); final WorkflowInstance polledInstance = dao.getWorkflowInstance(id, emptySet(), null); assertThat(polledInstance.status, equalTo(executing)); final DateTime originalModifiedTime = polledInstance.modified; @@ -573,7 +573,7 @@ public void fakePostgreSQLinsertWorkflowInstance() { WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text") .setRootWorkflowId(9283).setParentWorkflowId(110).setParentActionId(421).setNextActivation(started.plusSeconds(1)) .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)) - .setStarted(started).build(); + .setStartedIfNotSet(started).build(); d.insertWorkflowInstance(wf); assertEquals( diff --git a/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java b/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java index 04a39090c..f86127e67 100644 --- a/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java +++ b/nflow-perf-test/src/main/java/io/nflow/performance/testdata/TestDataGenerator.java @@ -81,7 +81,7 @@ private WorkflowInstance generateWorkflowInstance(List Date: Wed, 22 May 2019 09:50:44 +0300 Subject: [PATCH 20/20] update decision log --- DECISION_LOG.md | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/DECISION_LOG.md b/DECISION_LOG.md index c9557d5bb..ab219be2c 100644 --- a/DECISION_LOG.md +++ b/DECISION_LOG.md @@ -2,28 +2,34 @@ (newest first) -2015-04-19 efonsell ---------------------------- +6: 2019-05-22 efonsell +---------------------- +Add WorkflowInstance.started value (time when processing of the first state of the workflow was started) to the database as nflow_workflow.started. Reading the value from nflow_workflow_action table was causing performance issues when the amount of actions in the database was growing. + +This decision overrides the decision number 2. + +5: 2015-04-19 efonsell +---------------------- Define default property-values in properties-files only and make them required when there is a static default value. Do not define default values in properties-files for properties that have a dynamic default value. For example, nflow.dispatcher.sleep.ms default value 1000 is defined in nflow-engine.properties, but nflow.executor.thread.count default value which is based on number of processors is defined in the Java code. -2015-01-10 gmokki, efonsell ---------------------------- +4: 2015-01-10 gmokki, efonsell +------------------------------ When polling for next workflow instances in WorkflowInstanceDao, the modified field in OptimisticLockKey is handled as String instead of Timestamp to avoid problems caused by losing millisecond precision from timestamps in some cases (for example with some older versions of MySQL). -2014-12-10 eputtone -------------------- +3: 2014-12-10 eputtone +---------------------- Internal nFlow functionalities can access DAO layer (package com.nitorcreations.nflow.engine.internal.dao) directly instead of going through service layer (package com.nitorcreations.nflow.engine.service). Rationale: service layer is currently part of public API that we wish to keep as simple as possible. Example: WorkflowDefinitionResource in nflow-rest-api uses WorkflowDefinitionDao directly for retrieving StoredWorkflowDefinitions, because we don't want to confuse public API users with multiple workflow definition representations. -2014-11-26 jsyrjala -------------------- +2: 2014-11-26 jsyrjala +---------------------- WorkflowInstance.started value (time when processing of the workflow is first started) is fetched from the earliest nflow_workflow_action.start_execution value. This way no changes to database is needed. If needed because of performance, the value may be later added as nflow_workflow.started. Started value is fetched with a subselect. This requires that nflow_workflow_action.workflow_id is indexed. Postgres has explicit index, h2 and mysql have implicit index via foreign key. -2014-11-26 efonsell -------------------- +1: 2014-11-26 efonsell +---------------------- Swagger-UI static resources are downloaded from GitHub based on version defined in pom.xml and extracted to nflow-jetty target directory, except index.html which is customized and thus updated manually in nFlow repository.