From 2a7a185f9aefbfdbfd52c86413809a1109fe600a Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Fri, 22 Nov 2019 10:39:23 +0200 Subject: [PATCH 1/2] Refactor workflow and workflow action id from int to long --- .../nflow/engine/internal/dao/ArchiveDao.java | 12 +- .../io/nflow/engine/internal/dao/DaoUtil.java | 5 + .../internal/dao/WorkflowInstanceDao.java | 136 +++++++++-------- .../internal/executor/InstanceInfo.java | 2 +- .../internal/executor/WorkflowDispatcher.java | 6 +- .../executor/WorkflowStateProcessor.java | 8 +- .../WorkflowStateProcessorFactory.java | 4 +- .../internal/workflow/StateExecutionImpl.java | 4 +- .../nflow/engine/service/ArchiveService.java | 2 +- .../service/WorkflowInstanceService.java | 12 +- .../workflow/definition/StateExecution.java | 4 +- .../instance/QueryWorkflowInstances.java | 18 +-- .../workflow/instance/WorkflowInstance.java | 28 ++-- .../instance/WorkflowInstanceAction.java | 12 +- .../engine/internal/dao/ArchiveDaoTest.java | 104 ++++++------- .../internal/dao/StatisticsDaoTest.java | 6 +- .../internal/dao/WorkflowInstanceDaoTest.java | 139 +++++++++--------- .../internal/executor/BaseNflowTest.java | 2 +- .../executor/WorkflowDispatcherTest.java | 20 +-- .../WorkflowStateProcessorFactoryTest.java | 4 +- .../executor/WorkflowStateProcessorTest.java | 15 +- .../workflow/StateExecutionImplTest.java | 12 +- .../engine/service/ArchiveServiceTest.java | 4 +- .../service/WorkflowInstanceServiceTest.java | 6 +- .../performance/client/LoadGenerator.java | 8 +- .../java/io/nflow/rest/v1/ResourceBase.java | 10 +- .../java/io/nflow/rest/v1/msg/Action.java | 6 +- .../v1/msg/CreateWorkflowInstanceRequest.java | 2 +- .../msg/CreateWorkflowInstanceResponse.java | 2 +- .../v1/msg/ListWorkflowInstanceResponse.java | 8 +- .../ListWorkflowInstanceConverterTest.java | 4 +- .../v1/jaxrs/WorkflowInstanceResource.java | 16 +- .../jaxrs/WorkflowInstanceResourceTest.java | 18 +-- .../springweb/WorkflowInstanceResource.java | 16 +- .../java/io/nflow/tests/demo/DemoServer.java | 4 +- .../CreateCreditApplicationRequest.java | 2 +- .../io/nflow/tests/AbstractNflowTest.java | 22 +-- .../test/java/io/nflow/tests/ArchiveTest.java | 10 +- .../java/io/nflow/tests/BulkWorkflowTest.java | 4 +- .../io/nflow/tests/ChildWorkflowTest.java | 2 +- .../java/io/nflow/tests/DemoWorkflowTest.java | 2 +- .../test/java/io/nflow/tests/WakeupTest.java | 2 +- 42 files changed, 351 insertions(+), 352 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java index 377e63373..800904a04 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ArchiveDao.java @@ -40,7 +40,7 @@ public void ensureValidArchiveTablesExist() { tableMetadataChecker.ensureCopyingPossible("nflow_workflow_state", "nflow_archive_workflow_state"); } - public List listArchivableWorkflows(DateTime before, int maxRows) { + public List listArchivableWorkflows(DateTime before, int maxRows) { return jdbc.query( "select w.id id from nflow_workflow w, " + "(" + sqlVariants.limit( @@ -58,7 +58,7 @@ public List listArchivableWorkflows(DateTime before, int maxRows) { } @Transactional - public int archiveWorkflows(Collection workflowIds) { + public int archiveWorkflows(Collection workflowIds) { String workflowIdParams = params(workflowIds); int archivedWorkflows = archiveWorkflowTable(workflowIdParams); @@ -99,14 +99,14 @@ private String columnsFromMetadata(String tableName) { return join(columnNames, ","); } - private String params(Collection workflowIds) { + private String params(Collection workflowIds) { return "(" + join(workflowIds, ",") + ")"; } - static class ArchivableWorkflowsRowMapper implements RowMapper { + static class ArchivableWorkflowsRowMapper implements RowMapper { @Override - public Integer mapRow(ResultSet rs, int rowNum) throws SQLException { - return rs.getInt("id"); + public Long mapRow(ResultSet rs, int rowNum) throws SQLException { + return rs.getLong("id"); } } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/DaoUtil.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/DaoUtil.java index 32a318dc0..05b7eeb39 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/DaoUtil.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/DaoUtil.java @@ -39,6 +39,11 @@ public static Integer getInt(ResultSet rs, String columnLabel) throws SQLExcepti return rs.wasNull() ? null : value; } + public static Long getLong(ResultSet rs, String columnLabel) throws SQLException { + long value = rs.getLong(columnLabel); + return rs.wasNull() ? null : value; + } + public static final class ColumnNamesExtractor implements ResultSetExtractor> { static final ColumnNamesExtractor columnNamesExtractor = new ColumnNamesExtractor(); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index 3854dd3a2..f7eeb0446 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -1,8 +1,6 @@ package io.nflow.engine.internal.dao; -import static io.nflow.engine.internal.dao.DaoUtil.firstColumnLengthExtractor; -import static io.nflow.engine.internal.dao.DaoUtil.getInt; -import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp; +import static io.nflow.engine.internal.dao.DaoUtil.*; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; @@ -85,7 +83,7 @@ public class WorkflowInstanceDao { private static final Logger logger = getLogger(WorkflowInstanceDao.class); - static final Map> EMPTY_ACTION_STATE_MAP = Collections.> emptyMap(); + static final Map> EMPTY_ACTION_STATE_MAP = Collections.emptyMap(); final JdbcTemplate jdbc; private final NamedParameterJdbcTemplate namedJdbc; @@ -149,8 +147,8 @@ int getActionStateTextLength() { return actionStateTextLength; } - public int insertWorkflowInstance(WorkflowInstance instance) { - int id; + public long insertWorkflowInstance(WorkflowInstance instance) { + long id; if (sqlVariants.hasUpdateableCTE()) { id = insertWorkflowInstanceWithCte(instance); } else { @@ -162,7 +160,7 @@ public int insertWorkflowInstance(WorkflowInstance instance) { return id; } - private int insertWorkflowInstanceWithCte(WorkflowInstance instance) { + private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { try { StringBuilder sqlb = new StringBuilder(256); sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)"); @@ -179,7 +177,7 @@ private int insertWorkflowInstanceWithCte(WorkflowInstance instance) { args[pos++] = variable.getValue(); } sqlb.append(" select wf.id from wf"); - return jdbc.queryForObject(sqlb.toString(), Integer.class, args); + return jdbc.queryForObject(sqlb.toString(), Long.class, args); } catch (DuplicateKeyException e) { logger.warn("Failed to insert workflow instance", e); return -1; @@ -202,7 +200,7 @@ String insertWorkflowInstanceStateSql() { @SuppressFBWarnings(value = { "OBL_UNSATISFIED_OBLIGATION_EXCEPTION_EDGE", "SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING" }, justification = "findbugs does not trust jdbctemplate, sql string is practically constant") - private int insertWorkflowInstanceWithTransaction(final WorkflowInstance instance) { + private long insertWorkflowInstanceWithTransaction(final WorkflowInstance instance) { return transaction.execute(status -> { KeyHolder keyHolder = new GeneratedKeyHolder(); try { @@ -229,15 +227,15 @@ private int insertWorkflowInstanceWithTransaction(final WorkflowInstance instanc }, keyHolder); } catch (DuplicateKeyException e) { logger.warn("Failed to insert workflow instance", e); - return -1; + return -1L; } - int id = keyHolder.getKey().intValue(); + long id = keyHolder.getKey().longValue(); insertVariables(id, 0, instance.stateVariables); return id; }); } - void insertVariables(final int id, final int actionId, Map changedStateVariables) { + void insertVariables(final long id, final long actionId, Map changedStateVariables) { if (changedStateVariables.isEmpty()) { return; } @@ -248,7 +246,7 @@ void insertVariables(final int id, final int actionId, Map chang } } - private void insertVariablesWithMultipleUpdates(final int id, final int actionId, Map changedStateVariables) { + private void insertVariablesWithMultipleUpdates(final long id, final long actionId, Map changedStateVariables) { for (Entry entry : changedStateVariables.entrySet()) { int updated = jdbc.update(insertWorkflowInstanceStateSql() + " values (?,?,?,?)", id, actionId, entry.getKey(), entry.getValue()); @@ -258,7 +256,7 @@ private void insertVariablesWithMultipleUpdates(final int id, final int actionId } } - private void insertVariablesWithBatchUpdate(final int id, final int actionId, Map changedStateVariables) { + private void insertVariablesWithBatchUpdate(final long id, final long actionId, Map changedStateVariables) { final Iterator> variables = changedStateVariables.entrySet().iterator(); int[] updateStatus = jdbc.batchUpdate(insertWorkflowInstanceStateSql() + " values (?,?,?,?)", new AbstractInterruptibleBatchPreparedStatementSetter() { @@ -268,8 +266,8 @@ protected boolean setValuesIfAvailable(PreparedStatement ps, int i) throws SQLEx return false; } Entry variable = variables.next(); - ps.setInt(1, id); - ps.setInt(2, actionId); + ps.setLong(1, id); + ps.setLong(2, actionId); ps.setString(3, variable.getKey()); ps.setString(4, variable.getValue()); return true; @@ -332,10 +330,10 @@ private void updateWorkflowInstanceWithTransaction(final WorkflowInstance instan @Override protected void doInTransactionWithoutResult(TransactionStatus status) { updateWorkflowInstance(instance); - int parentActionId = insertWorkflowInstanceAction(action); + long parentActionId = insertWorkflowInstanceAction(action); insertVariables(action.workflowInstanceId, parentActionId, changedStateVariables); for (WorkflowInstance childTemplate : childWorkflows) { - Integer rootWorkflowId = instance.rootWorkflowId == null ? instance.id : instance.rootWorkflowId; + Long rootWorkflowId = instance.rootWorkflowId == null ? instance.id : instance.rootWorkflowId; WorkflowInstance childWorkflow = new WorkflowInstance.Builder(childTemplate).setRootWorkflowId(rootWorkflowId) .setParentWorkflowId(instance.id).setParentActionId(parentActionId).build(); insertWorkflowInstance(childWorkflow); @@ -364,14 +362,14 @@ private List getRecoverableInstances() { @Override public InstanceInfo mapRow(ResultSet rs, int rowNum) throws SQLException { InstanceInfo instance = new InstanceInfo(); - instance.id = rs.getInt("id"); + instance.id = rs.getLong("id"); instance.state = rs.getString("state"); return instance; } }); } - private void recoverWorkflowInstance(final int instanceId, final WorkflowInstanceAction action) { + private void recoverWorkflowInstance(final long instanceId, final WorkflowInstanceAction action) { transaction.execute(new TransactionCallbackWithoutResult() { @Override protected void doInTransactionWithoutResult(TransactionStatus status) { @@ -411,7 +409,7 @@ private void updateWorkflowInstanceWithCTE(WorkflowInstance instance, final Work args[pos++] = variable.getValue(); } sqlb.append(" select act.id from act"); - jdbc.queryForObject(sqlb.toString(), Integer.class, args); + jdbc.queryForObject(sqlb.toString(), Long.class, args); } String insertWorkflowActionSql() { @@ -451,7 +449,7 @@ public boolean updateNotRunningWorkflowInstance(WorkflowInstance instance) { } @Transactional - public boolean wakeUpWorkflowExternally(int workflowInstanceId, List expectedStates) { + public boolean wakeUpWorkflowExternally(long workflowInstanceId, List expectedStates) { StringBuilder sql = new StringBuilder("update nflow_workflow set next_activation = (case when executor_id is null then ") .append("case when ").append(sqlVariants.dateLtEqDiff("next_activation", "current_timestamp")) .append(" then next_activation else current_timestamp end else next_activation end), ") @@ -482,7 +480,7 @@ private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workfl return jdbc.update(sql.toString(), args) == 1; } - public WorkflowInstance getWorkflowInstance(int id, Set includes, Long maxActions) { + public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions) { String sql = "select * from nflow_workflow where id = ?"; WorkflowInstance instance = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id).build(); if (includes.contains(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES)) { @@ -510,7 +508,7 @@ public void processRow(ResultSet rs) throws SQLException { instance.originalStateVariables.putAll(instance.stateVariables); } - public List pollNextWorkflowInstanceIds(final int batchSize) { + public List pollNextWorkflowInstanceIds(final int batchSize) { if (sqlVariants.hasUpdateReturning()) { return pollNextWorkflowInstanceIdsWithUpdateReturning(batchSize); } @@ -528,25 +526,25 @@ String whereConditionForInstanceUpdate() { + " and " + executorInfo.getExecutorGroupCondition() + " order by next_activation asc"; } - private List pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) { + private List pollNextWorkflowInstanceIdsWithUpdateReturning(int batchSize) { String sql = updateInstanceForExecutionQuery() + " where id in (" + sqlVariants.limit("select id from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize) + ") and executor_id is null returning id"; - return jdbc.queryForList(sql, Integer.class); + return jdbc.queryForList(sql, Long.class); } - private List pollNextWorkflowInstanceIdsWithTransaction(final int batchSize) { - return transaction.execute(new TransactionCallback>() { + private List pollNextWorkflowInstanceIdsWithTransaction(final int batchSize) { + return transaction.execute(new TransactionCallback>() { @Override - public List doInTransaction(TransactionStatus transactionStatus) { + public List doInTransaction(TransactionStatus transactionStatus) { String sql = sqlVariants.limit("select id, modified from nflow_workflow " + whereConditionForInstanceUpdate(), batchSize); List instances = jdbc.query(sql, - (rs, rowNum) -> new OptimisticLockKey(rs.getInt("id"), sqlVariants.getTimestamp(rs, "modified"))); + (rs, rowNum) -> new OptimisticLockKey(rs.getLong("id"), sqlVariants.getTimestamp(rs, "modified"))); if (instances.isEmpty()) { return emptyList(); } sort(instances); - List ids = new ArrayList<>(instances.size()); + List ids = new ArrayList<>(instances.size()); if (useBatchUpdate()) { updateNextWorkflowInstancesWithBatchUpdate(instances, ids); } else { @@ -555,7 +553,7 @@ public List doInTransaction(TransactionStatus transactionStatus) { return ids; } - private void updateNextWorkflowInstancesWithMultipleUpdates(List instances, List ids) { + private void updateNextWorkflowInstancesWithMultipleUpdates(List instances, List ids) { boolean raceConditionDetected = false; for (OptimisticLockKey instance : instances) { int updated = jdbc.update(updateInstanceForExecutionQuery() + " where id = ? and modified = ? and executor_id is null", @@ -572,7 +570,7 @@ private void updateNextWorkflowInstancesWithMultipleUpdates(List instances, List ids) { + private void updateNextWorkflowInstancesWithBatchUpdate(List instances, List ids) { List batchArgs = new ArrayList<>(instances.size()); for (OptimisticLockKey instance : instances) { batchArgs.add(new Object[] { instance.id, sqlVariants.tuneTimestampForDb(instance.modified) }); @@ -580,7 +578,7 @@ private void updateNextWorkflowInstancesWithBatchUpdate(List } int[] updateStatuses = jdbc .batchUpdate(updateInstanceForExecutionQuery() + " where id = ? and modified = ? and executor_id is null", batchArgs); - Iterator idIt = ids.iterator(); + Iterator idIt = ids.iterator(); for (int status : updateStatuses) { idIt.next(); if (status == 0) { @@ -601,10 +599,10 @@ private void updateNextWorkflowInstancesWithBatchUpdate(List } private static class OptimisticLockKey extends ModelObject implements Comparable { - public final int id; + public final long id; public final Object modified; - public OptimisticLockKey(int id, Object modified) { + public OptimisticLockKey(long id, Object modified) { this.id = id; this.modified = modified; } @@ -612,7 +610,7 @@ public OptimisticLockKey(int id, Object modified) { @Override @SuppressFBWarnings(value = "EQ_COMPARETO_USE_OBJECT_EQUALS", justification = "This class has a natural ordering that is inconsistent with equals") public int compareTo(OptimisticLockKey other) { - return this.id - other.id; + return Long.compare(this.id, other.id); } } @@ -680,9 +678,9 @@ private void fillChildWorkflowIds(final WorkflowInstance instance) { jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", new RowCallbackHandler() { @Override public void processRow(ResultSet rs) throws SQLException { - int parentActionId = rs.getInt(1); - int childWorkflowInstanceId = rs.getInt(2); - List children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>()); + long parentActionId = rs.getLong(1); + long childWorkflowInstanceId = rs.getLong(2); + List children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>()); children.add(childWorkflowInstanceId); } }, instance.id); @@ -696,7 +694,7 @@ private long getMaxResults(Long maxResults) { } private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) { - Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance) + Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance) : EMPTY_ACTION_STATE_MAP; String sql = sqlVariants.limit( "select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", @@ -711,19 +709,19 @@ private long getMaxActions(Long maxActions) { return min(maxActions.longValue(), workflowInstanceQueryMaxActions); } - private Map> fetchActionStateVariables(WorkflowInstance instance) { + private Map> fetchActionStateVariables(WorkflowInstance instance) { return jdbc.query("select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc", new WorkflowActionStateRowMapper(), instance.id); } @Transactional(propagation = MANDATORY) - public int insertWorkflowInstanceAction(final WorkflowInstance instance, final WorkflowInstanceAction action) { - int actionId = insertWorkflowInstanceAction(action); + public long insertWorkflowInstanceAction(final WorkflowInstance instance, final WorkflowInstanceAction action) { + long actionId = insertWorkflowInstanceAction(action); insertVariables(action.workflowInstanceId, actionId, instance.getChangedStateVariables()); return actionId; } - public int insertWorkflowInstanceAction(final WorkflowInstanceAction action) { + public long insertWorkflowInstanceAction(final WorkflowInstanceAction action) { KeyHolder keyHolder = new GeneratedKeyHolder(); jdbc.update(new PreparedStatementCreator() { @Override @@ -733,7 +731,7 @@ public PreparedStatement createPreparedStatement(Connection con) throws SQLExcep PreparedStatement p = con.prepareStatement( insertWorkflowActionSql() + " values (?, ?, " + sqlVariants.actionType() + ", ?, ?, ?, ?, ?)", new String[] { "id" }); int field = 1; - p.setInt(field++, action.workflowInstanceId); + p.setLong(field++, action.workflowInstanceId); p.setInt(field++, executorInfo.getExecutorId()); p.setString(field++, action.type.name()); p.setString(field++, action.state); @@ -744,10 +742,10 @@ public PreparedStatement createPreparedStatement(Connection con) throws SQLExcep return p; } }, keyHolder); - return keyHolder.getKey().intValue(); + return keyHolder.getKey().longValue(); } - public String getWorkflowInstanceState(int workflowInstanceId) { + public String getWorkflowInstanceState(long workflowInstanceId) { return jdbc.queryForObject("select state from nflow_workflow where id = ?", String.class, workflowInstanceId); } @@ -755,11 +753,11 @@ class WorkflowInstanceRowMapper implements RowMapper { @Override public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLException { return workflowInstanceFactory.newWorkflowInstanceBuilder() // - .setId(rs.getInt("id")) // + .setId(rs.getLong("id")) // .setExecutorId(getInt(rs, "executor_id")) // - .setRootWorkflowId(getInt(rs, "root_workflow_id")) // - .setParentWorkflowId(getInt(rs, "parent_workflow_id")) // - .setParentActionId(getInt(rs, "parent_action_id")) // + .setRootWorkflowId(getLong(rs, "root_workflow_id")) // + .setParentWorkflowId(getLong(rs, "parent_workflow_id")) // + .setParentActionId(getLong(rs, "parent_action_id")) // .setStatus(WorkflowInstanceStatus.valueOf(rs.getString("status"))) // .setType(rs.getString("type")) // .setBusinessKey(rs.getString("business_key")) // @@ -779,20 +777,20 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep static class WorkflowInstanceActionRowMapper implements RowMapper { private final SQLVariants sqlVariants; - private final Map> actionStates; + private final Map> actionStates; - public WorkflowInstanceActionRowMapper(SQLVariants sqlVariants, Map> actionStates) { + public WorkflowInstanceActionRowMapper(SQLVariants sqlVariants, Map> actionStates) { this.sqlVariants = sqlVariants; this.actionStates = actionStates; } @Override public WorkflowInstanceAction mapRow(ResultSet rs, int rowNum) throws SQLException { - int actionId = rs.getInt("id"); + long actionId = rs.getLong("id"); Map actionState = actionStates.getOrDefault(actionId, emptyMap()); return new WorkflowInstanceAction.Builder() // .setId(actionId) // - .setWorkflowInstanceId(rs.getInt("workflow_id")) // + .setWorkflowInstanceId(rs.getLong("workflow_id")) // .setExecutorId(rs.getInt("executor_id")) // .setType(WorkflowActionType.valueOf(rs.getString("type"))) // .setState(rs.getString("state")) // @@ -804,13 +802,13 @@ public WorkflowInstanceAction mapRow(ResultSet rs, int rowNum) throws SQLExcepti } } - static class WorkflowActionStateRowMapper implements ResultSetExtractor>> { - private final Map> actionStates = new LinkedHashMap<>(); + static class WorkflowActionStateRowMapper implements ResultSetExtractor>> { + private final Map> actionStates = new LinkedHashMap<>(); @Override - public Map> extractData(ResultSet rs) throws SQLException { + public Map> extractData(ResultSet rs) throws SQLException { while (rs.next()) { - int actionId = rs.getInt("action_id"); + long actionId = rs.getLong("action_id"); String stateKey = rs.getString("state_key"); String stateValue = rs.getString("state_value"); if (!actionStates.containsKey(actionId)) { @@ -823,13 +821,13 @@ public Map> extractData(ResultSet rs) throws SQLExc } } - public Optional getSignal(Integer workflowInstanceId) { + public Optional getSignal(long workflowInstanceId) { return ofNullable( jdbc.queryForObject("select workflow_signal from nflow_workflow where id = ?", Integer.class, workflowInstanceId)); } @Transactional - public boolean setSignal(Integer workflowInstanceId, Optional signal, String reason, WorkflowActionType actionType) { + public boolean setSignal(long workflowInstanceId, Optional signal, String reason, WorkflowActionType actionType) { boolean updated = jdbc.update("update nflow_workflow set workflow_signal = ? where id = ?", signal.orElse(null), workflowInstanceId) > 0; if (updated) { @@ -846,24 +844,24 @@ public boolean setSignal(Integer workflowInstanceId, Optional signal, S return updated; } - public String getWorkflowInstanceType(Integer workflowInstanceId) { + public String getWorkflowInstanceType(long workflowInstanceId) { return jdbc.queryForObject("select type from nflow_workflow where id = ?", String.class, workflowInstanceId); } @Transactional - public int deleteWorkflowInstanceHistory(Integer workflowInstanceId, Integer historyDeletableAfterHours) { + public int deleteWorkflowInstanceHistory(long workflowInstanceId, Integer historyDeletableAfterHours) { MapSqlParameterSource params = new MapSqlParameterSource(); params.addValue("workflowId", workflowInstanceId); params.addValue("deleteUpToTime", sqlVariants.toTimestampObject(now().minusHours(historyDeletableAfterHours))); - Integer maxActionId = namedJdbc + Long maxActionId = namedJdbc .queryForObject("select max(id) from nflow_workflow_action where workflow_id = :workflowId and " - + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Integer.class); + + sqlVariants.dateLtEqDiff("execution_end", ":deleteUpToTime"), params, Long.class); int deletedActions = 0; if (maxActionId != null) { params.addValue("maxActionId", maxActionId); - List referredActionIds = namedJdbc.queryForList( + List referredActionIds = namedJdbc.queryForList( "select distinct(max(action_id)) from nflow_workflow_state where workflow_id = :workflowId group by state_key", params, - Integer.class); + Long.class); if (referredActionIds.isEmpty()) { namedJdbc.update("delete from nflow_workflow_state where workflow_id = :workflowId and action_id <= :maxActionId", params); @@ -874,7 +872,7 @@ public int deleteWorkflowInstanceHistory(Integer workflowInstanceId, Integer his params); } referredActionIds.addAll(namedJdbc.queryForList( - "select distinct parent_action_id from nflow_workflow where parent_workflow_id = :workflowId", params, Integer.class)); + "select distinct parent_action_id from nflow_workflow where parent_workflow_id = :workflowId", params, Long.class)); if (referredActionIds.isEmpty()) { deletedActions = namedJdbc .update("delete from nflow_workflow_action where workflow_id = :workflowId and id <= :maxActionId", params); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/InstanceInfo.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/InstanceInfo.java index a8eda7ec2..df48bf449 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/InstanceInfo.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/InstanceInfo.java @@ -1,6 +1,6 @@ package io.nflow.engine.internal.executor; public class InstanceInfo { - public int id; + public long id; public String state; } \ No newline at end of file diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index fc187c78c..10cbb1803 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -141,19 +141,19 @@ private void shutdownPool() { } } - private void dispatch(List nextInstanceIds) { + private void dispatch(List nextInstanceIds) { if (nextInstanceIds.isEmpty()) { logger.debug("Found no workflow instances, sleeping."); sleep(false); return; } logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size()); - for (Integer instanceId : nextInstanceIds) { + for (Long instanceId : nextInstanceIds) { executor.execute(stateProcessorFactory.createProcessor(instanceId)); } } - private List getNextInstanceIds() { + private List getNextInstanceIds() { int nextBatchSize = executor.getQueueRemainingCapacity(); logger.debug("Polling next {} workflow instances.", nextBatchSize); return workflowInstances.pollNextWorkflowInstanceIds(nextBatchSize); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 41e1aa095..2fdd7c27c 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -62,7 +62,7 @@ class WorkflowStateProcessor implements Runnable { private static final PeriodicLogger threadStuckLogger = new PeriodicLogger(logger, 60); private static final String MDC_KEY = "workflowInstanceId"; - private final int instanceId; + private final long instanceId; private final WorkflowDefinitionService workflowDefinitions; private final WorkflowInstanceService workflowInstances; private final WorkflowInstancePreProcessor workflowInstancePreProcessor; @@ -75,14 +75,14 @@ class WorkflowStateProcessor implements Runnable { private final int stateProcessingRetryDelay; private final int stateSaveRetryDelay; private boolean internalRetryEnabled = true; - private final Map processingInstances; + private final Map processingInstances; private long startTimeSeconds; private Thread thread; - WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, + WorkflowStateProcessor(long instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env, - Map processingInstances, WorkflowExecutorListener... executorListeners) { + Map processingInstances, WorkflowExecutorListener... executorListeners) { this.instanceId = instanceId; this.objectMapper = objectMapper; this.workflowDefinitions = workflowDefinitions; diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java index d20c19ffd..11e3013ae 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java @@ -28,7 +28,7 @@ public class WorkflowStateProcessorFactory { private final Environment env; @Autowired(required = false) protected WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[0]; - final Map processingInstances = new ConcurrentHashMap<>(); + final Map processingInstances = new ConcurrentHashMap<>(); private final int stuckThreadThresholdSeconds; @Inject @@ -44,7 +44,7 @@ public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitio this.env = env; } - public WorkflowStateProcessor createProcessor(int instanceId) { + public WorkflowStateProcessor createProcessor(long instanceId) { return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, workflowInstancePreProcessor, env, processingInstances, listeners); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index e3bec8d10..d1bb88a58 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -72,7 +72,7 @@ public String getCurrentStateName() { } @Override - public int getWorkflowInstanceId() { + public long getWorkflowInstanceId() { return instance.id; } @@ -254,7 +254,7 @@ public void setSignal(Optional signal, String reason) { } @Override - public Optional getParentId() { + public Optional getParentId() { return Optional.ofNullable(instance.parentWorkflowId); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/ArchiveService.java b/nflow-engine/src/main/java/io/nflow/engine/service/ArchiveService.java index 63e6c5c1d..02ecf13c0 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/ArchiveService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/ArchiveService.java @@ -49,7 +49,7 @@ public int archiveWorkflows(DateTime olderThan, int batchSize) { log.info("Archiving starting. Archiving passive workflows older than {}, in batches of {}.", olderThan, batchSize); StopWatch stopWatch = new StopWatch(); stopWatch.start(); - List workflowIds; + List workflowIds; PeriodicLogger periodicLogger = new PeriodicLogger(log, 60); int archivedWorkflowsTotal = 0; do { diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java index 225b23eec..2b9c1caab 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java @@ -51,7 +51,7 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow * @param maxActions Maximum number of actions to be loaded. * @return The workflow instance, or null if not found. */ - public WorkflowInstance getWorkflowInstance(int id, Set includes, Long maxActions) { + public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions) { return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions); } @@ -63,10 +63,10 @@ public WorkflowInstance getWorkflowInstance(int id, Set * @return The id of the inserted or existing workflow instance. */ @SuppressFBWarnings(value = "BC_UNCONFIRMED_CAST_OF_RETURN_VALUE", justification = "getInitialState().toString() has no cast") - public int insertWorkflowInstance(WorkflowInstance instance) { + public long insertWorkflowInstance(WorkflowInstance instance) { Assert.notNull(workflowInstancePreProcessor, "workflowInstancePreProcessor can not be null"); WorkflowInstance processedInstance = workflowInstancePreProcessor.process(instance); - int id = workflowInstanceDao.insertWorkflowInstance(processedInstance); + long id = workflowInstanceDao.insertWorkflowInstance(processedInstance); if (id == -1 && !isEmpty(instance.externalId)) { QueryWorkflowInstances query = new QueryWorkflowInstances.Builder().addTypes(instance.type).setExternalId(instance.externalId).build(); id = workflowInstanceDao.queryWorkflowInstances(query).get(0).id; @@ -131,7 +131,7 @@ public Collection listWorkflowInstances(QueryWorkflowInstances * @param workflowInstanceId Workflow instance id. * @return Current signal value. */ - public Optional getSignal(Integer workflowInstanceId) { + public Optional getSignal(long workflowInstanceId) { return workflowInstanceDao.getSignal(workflowInstanceId); } @@ -143,7 +143,7 @@ public Optional getSignal(Integer workflowInstanceId) { * @param actionType The type of workflow action that is stored to instance actions. * @return True when signal was set, false otherwise. */ - public boolean setSignal(Integer workflowInstanceId, Optional signal, String reason, WorkflowActionType actionType) { + public boolean setSignal(long workflowInstanceId, Optional signal, String reason, WorkflowActionType actionType) { Assert.notNull(workflowDefinitionService, "workflowDefinitionService cannot be null"); signal.ifPresent(signalValue -> { AbstractWorkflowDefinition definition = getDefinition(workflowInstanceId); @@ -154,7 +154,7 @@ public boolean setSignal(Integer workflowInstanceId, Optional signal, S return workflowInstanceDao.setSignal(workflowInstanceId, signal, reason, actionType); } - private AbstractWorkflowDefinition getDefinition(Integer workflowInstanceId) { + private AbstractWorkflowDefinition getDefinition(Long workflowInstanceId) { return workflowDefinitionService.getWorkflowDefinition(workflowInstanceDao.getWorkflowInstanceType(workflowInstanceId)); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/StateExecution.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/StateExecution.java index d72694a80..72d0b59c4 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/StateExecution.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/StateExecution.java @@ -18,7 +18,7 @@ public interface StateExecution { * * @return The workflow instance id. */ - int getWorkflowInstanceId(); + long getWorkflowInstanceId(); /** * Return the business key associated to the workflow instance. @@ -175,6 +175,6 @@ public interface StateExecution { * * @return The parent workflow instance id or empty. */ - Optional getParentId(); + Optional getParentId(); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java index 3f7934013..e4711ba14 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java @@ -16,7 +16,7 @@ public class QueryWorkflowInstances extends ModelObject { /** * Workflow instance identifiers. */ - public final List ids; + public final List ids; /** * Workflow instance definition type. @@ -26,12 +26,12 @@ public class QueryWorkflowInstances extends ModelObject { /** * Parent workflow instance id. */ - public Integer parentWorkflowId; + public Long parentWorkflowId; /** * Parent workflow action id. */ - public Integer parentActionId; + public Long parentActionId; /** * Workflow instance states. @@ -106,10 +106,10 @@ public class QueryWorkflowInstances extends ModelObject { * Builder for workflow instance queries. */ public static class Builder { - List ids = new ArrayList<>(); + List ids = new ArrayList<>(); List types = new ArrayList<>(); - Integer parentWorkflowId; - Integer parentActionId; + Long parentWorkflowId; + Long parentActionId; List states = new ArrayList<>(); List statuses = new ArrayList<>(); String businessKey; @@ -148,7 +148,7 @@ public Builder(QueryWorkflowInstances copy) { * @param newIds The identifiers. * @return this. */ - public Builder addIds(Integer ... newIds) { + public Builder addIds(Long ... newIds) { this.ids.addAll(asList(newIds)); return this; } @@ -168,7 +168,7 @@ public Builder addTypes(String ... newTypes) { * @param parentWorkflowId The parent workflow instance id. * @return this. */ - public Builder setParentWorkflowId(Integer parentWorkflowId) { + public Builder setParentWorkflowId(Long parentWorkflowId) { this.parentWorkflowId = parentWorkflowId; return this; } @@ -178,7 +178,7 @@ public Builder setParentWorkflowId(Integer parentWorkflowId) { * @param parentActionId The parent action id. * @return this. */ - public Builder setParentActionId(Integer parentActionId) { + public Builder setParentActionId(Long parentActionId) { this.parentActionId = parentActionId; return this; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java index b76d94eb9..224f99145 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java @@ -42,7 +42,7 @@ public enum WorkflowInstanceStatus { /** * The workflow instance identifier. */ - public final Integer id; + public final Long id; /** * The id of executor that is currently processing this workflow. May be null. @@ -53,17 +53,17 @@ public enum WorkflowInstanceStatus { * The id of the workflow that created the hierarchy of workflow where this sub workflow belongs to. * Null for workflows that are the root of hierarchy. */ - public final Integer rootWorkflowId; + public final Long rootWorkflowId; /** * The id of the workflow that created this sub workflow. Is null for root workflows. */ - public final Integer parentWorkflowId; + public final Long parentWorkflowId; /** * The id of the workflow action that created this sub workflow. Is null for root workflows. */ - public final Integer parentActionId; + public final Long parentActionId; /** * The current status of the workflow instance. @@ -149,7 +149,7 @@ public enum WorkflowInstanceStatus { /** * Child workflow instance IDs created by this workflow instance, grouped by instance action ID. */ - public Map> childWorkflows; + public Map> childWorkflows; ObjectStringMapper mapper; @@ -228,11 +228,11 @@ public String getStateVariable(String name, String defaultValue) { */ public static class Builder { - Integer id; + Long id; Integer executorId; - Integer rootWorkflowId; - Integer parentWorkflowId; - Integer parentActionId; + Long rootWorkflowId; + Long parentWorkflowId; + Long parentActionId; WorkflowInstanceStatus status; String type; String businessKey; @@ -243,7 +243,7 @@ public static class Builder { final Map originalStateVariables = new LinkedHashMap<>(); final Map stateVariables = new LinkedHashMap<>(); List actions = new ArrayList<>(); - final Map> childWorkflows = new LinkedHashMap<>(); + final Map> childWorkflows = new LinkedHashMap<>(); int retries; DateTime created; DateTime started; @@ -301,7 +301,7 @@ public Builder(WorkflowInstance copy) { * @param id The identifier. * @return this. */ - public Builder setId(Integer id) { + public Builder setId(long id) { this.id = id; return this; } @@ -321,7 +321,7 @@ public Builder setExecutorId(Integer executorId) { * @param rootWorkflowId The identifier. * @return this */ - public Builder setRootWorkflowId(Integer rootWorkflowId) { + public Builder setRootWorkflowId(Long rootWorkflowId) { this.rootWorkflowId = rootWorkflowId; return this; } @@ -331,7 +331,7 @@ public Builder setRootWorkflowId(Integer rootWorkflowId) { * @param parentWorkflowId The identifier. * @return this. */ - public Builder setParentWorkflowId(Integer parentWorkflowId) { + public Builder setParentWorkflowId(Long parentWorkflowId) { this.parentWorkflowId = parentWorkflowId; return this; } @@ -341,7 +341,7 @@ public Builder setParentWorkflowId(Integer parentWorkflowId) { * @param parentActionId The identifier. * @return this. */ - public Builder setParentActionId(Integer parentActionId) { + public Builder setParentActionId(Long parentActionId) { this.parentActionId = parentActionId; return this; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java index a1f673b10..0177fd533 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstanceAction.java @@ -41,12 +41,12 @@ public enum WorkflowActionType { /** * The action id (generated by database). */ - public int id; + public long id; /** * The workflow instance identifier. */ - public final int workflowInstanceId; + public final long workflowInstanceId; /** * The id for executor that processed this state. @@ -106,8 +106,8 @@ public enum WorkflowActionType { */ public static class Builder { - int id; - int workflowInstanceId; + long id; + long workflowInstanceId; int executorId; WorkflowActionType type; String state; @@ -156,7 +156,7 @@ public Builder(WorkflowInstance instance) { * @param id Action id * @return this. */ - public Builder setId(int id) { + public Builder setId(long id) { this.id = id; return this; } @@ -166,7 +166,7 @@ public Builder setId(int id) { * @param workflowInstanceId The workflow instance identifier. * @return this. */ - public Builder setWorkflowInstanceId(int workflowInstanceId) { + public Builder setWorkflowInstanceId(long workflowInstanceId) { this.workflowInstanceId = workflowInstanceId; return this; } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/ArchiveDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/ArchiveDaoTest.java index cb799faea..70e3243c0 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/ArchiveDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/ArchiveDaoTest.java @@ -44,7 +44,7 @@ public class ArchiveDaoTest extends BaseDaoTest { @Test public void listingArchivableWorkflows() { - List expectedArchive = new ArrayList<>(); + List expectedArchive = new ArrayList<>(); storeActiveWorkflow(archiveTime1); storeActiveWorkflow(prodTime1); @@ -53,15 +53,15 @@ public void listingArchivableWorkflows() { expectedArchive.add(storePassiveWorkflow(archiveTime1)); expectedArchive.add(storePassiveWorkflow(archiveTime2)); - List archivableIds = archiveDao.listArchivableWorkflows(archiveTimeLimit, 10); + List archivableIds = archiveDao.listArchivableWorkflows(archiveTimeLimit, 10); assertEqualsInAnyOrder(expectedArchive, archivableIds); } @Test public void listingReturnsOldestRowsAndMaxBatchSizeRows() { - List expectedArchive = new ArrayList<>(); + List expectedArchive = new ArrayList<>(); - int eleventh = storePassiveWorkflow(archiveTime2); + long eleventh = storePassiveWorkflow(archiveTime2); for (int i = 0; i < 9; i++) { expectedArchive.add(storePassiveWorkflow(archiveTime4)); @@ -72,7 +72,7 @@ public void listingReturnsOldestRowsAndMaxBatchSizeRows() { storeActiveWorkflow(prodTime3); storePassiveWorkflow(prodTime4); - List archivableIds = archiveDao.listArchivableWorkflows(archiveTimeLimit, 10); + List archivableIds = archiveDao.listArchivableWorkflows(archiveTimeLimit, 10); Collections.sort(archivableIds); assertEquals(expectedArchive, archivableIds); @@ -83,7 +83,7 @@ public void listingReturnsOldestRowsAndMaxBatchSizeRows() { @Test public void archivingSimpleWorkflowsWorks() { - List archivableWorkflows = new ArrayList<>(); + List archivableWorkflows = new ArrayList<>(); storeActiveWorkflow(archiveTime1); storeActiveWorkflow(prodTime1); @@ -105,15 +105,15 @@ public void archivingSimpleWorkflowsWorks() { @Test public void archivingWorkflowsWithActionsWorks() { - List archivableWorkflows = new ArrayList<>(); - List archivableActions = new ArrayList<>(); + List archivableWorkflows = new ArrayList<>(); + List archivableActions = new ArrayList<>(); storeActions(storeActiveWorkflow(archiveTime1), 3); storeActions(storeActiveWorkflow(prodTime1), 1); storeActions(storePassiveWorkflow(prodTime1), 2); - int archivable1 = storePassiveWorkflow(archiveTime1); - int archivable2 = storePassiveWorkflow(archiveTime2); + long archivable1 = storePassiveWorkflow(archiveTime1); + long archivable2 = storePassiveWorkflow(archiveTime2); archivableActions.addAll(storeActions(archivable1, 1)); archivableActions.addAll(storeActions(archivable2, 3)); @@ -135,23 +135,23 @@ public void archivingWorkflowsWithActionsWorks() { @Test public void archivingWorkflowsWithActionsAndStatesWorks() { - List archivableWorkflows = new ArrayList<>(); - List archivableActions = new ArrayList<>(); + List archivableWorkflows = new ArrayList<>(); + List archivableActions = new ArrayList<>(); List archivableStates = new ArrayList<>(); - int nonArchivableWorkflow1 = storeActiveWorkflow(archiveTime1); + long nonArchivableWorkflow1 = storeActiveWorkflow(archiveTime1); storeStateVariables(nonArchivableWorkflow1, storeActions(nonArchivableWorkflow1, 3), 1); - int nonArchivableWorkflow2 = storeActiveWorkflow(prodTime1); + long nonArchivableWorkflow2 = storeActiveWorkflow(prodTime1); storeStateVariables(nonArchivableWorkflow2, storeActions(nonArchivableWorkflow2, 1), 3); - int nonArchivableWorkflow3 = storePassiveWorkflow(prodTime1); + long nonArchivableWorkflow3 = storePassiveWorkflow(prodTime1); storeStateVariables(nonArchivableWorkflow3, storeActions(nonArchivableWorkflow3, 2), 2); - int archivable1 = storePassiveWorkflow(archiveTime1); - int archivable2 = storePassiveWorkflow(archiveTime2); - List actions1 = storeActions(archivable1, 1); - List actions2 = storeActions(archivable2, 2); + long archivable1 = storePassiveWorkflow(archiveTime1); + long archivable2 = storePassiveWorkflow(archiveTime2); + List actions1 = storeActions(archivable1, 1); + List actions2 = storeActions(archivable2, 2); archivableActions.addAll(actions1); archivableActions.addAll(actions2); @@ -181,8 +181,8 @@ public void archivingWorkflowsWithActionsAndStatesWorks() { assertEquals(variablesCountAfter, variablesCountBefore - archivableStates.size() - requestDataVariableCount); } - private void assertActiveWorkflowsRemoved(List workflowIds) { - for (int id : workflowIds) { + private void assertActiveWorkflowsRemoved(List workflowIds) { + for (long id : workflowIds) { try { workflowInstanceDao.getWorkflowInstance(id, emptySet(), null); fail("Expected workflow " + id + " to be removed"); @@ -192,22 +192,22 @@ private void assertActiveWorkflowsRemoved(List workflowIds) { } } - private void assertArchiveWorkflowsExist(List workflowIds) { - for (int workflowId : workflowIds) { + private void assertArchiveWorkflowsExist(List workflowIds) { + for (long workflowId : workflowIds) { Map archived = getArchivedWorkflow(workflowId); - assertEquals(workflowId, archived.get("id")); + assertEquals(workflowId, ((Number) archived.get("id")).longValue()); } } - private void assertActiveActionsRemoved(List actionIds) { - for (int actionId : actionIds) { + private void assertActiveActionsRemoved(List actionIds) { + for (long actionId : actionIds) { int found = rowCount("select 1 from nflow_workflow_action where id = ?", actionId); assertEquals(0, found, "Found unexpected action " + actionId + " in nflow_workflow_action"); } } - private void assertArchiveActionsExist(List actionIds) { - for (int actionId : actionIds) { + private void assertArchiveActionsExist(List actionIds) { + for (long actionId : actionIds) { int found = rowCount("select 1 from nflow_archive_workflow_action where id = ?", actionId); assertEquals(1, found, "Action " + actionId + " not found in nflow_archive_workflow_action"); } @@ -234,40 +234,40 @@ private int rowCount(String sql, Object... params) { return jdbc.queryForList(sql, params).size(); } - private Map getArchivedWorkflow(int workflowId) { - return jdbc.queryForMap("select * from nflow_archive_workflow where id = ?", new Object[] { workflowId }); + private Map getArchivedWorkflow(long workflowId) { + return jdbc.queryForMap("select * from nflow_archive_workflow where id = ?", workflowId); } - private int storePassiveWorkflow(DateTime modified) { + private long storePassiveWorkflow(DateTime modified) { WorkflowInstance instance = constructWorkflowInstanceBuilder().setStatus(created).setNextActivation(null) .setModified(modified).build(); - int id = insert(instance); + long id = insert(instance); return id; } - private int storeActiveWorkflow(DateTime modified) { + private long storeActiveWorkflow(DateTime modified) { WorkflowInstance instance = constructWorkflowInstanceBuilder().setStatus(created).setModified(modified).build(); - int id = insert(instance); + long id = insert(instance); return id; } - private List storeActions(int workflowId, int actionCount) { - List actionIds = new ArrayList<>(); + private List storeActions(long workflowId, int actionCount) { + List actionIds = new ArrayList<>(); for (int i = 0; i < actionCount; i++) { actionIds.add(storeAction(workflowId)); } return actionIds; } - private List storeStateVariables(int workflowId, List actionIds, int count) { + private List storeStateVariables(long workflowId, List actionIds, int count) { List stateKeys = new ArrayList<>(); - for (int actionId : actionIds) { + for (long actionId : actionIds) { stateKeys.addAll(storeStateVariables(workflowId, actionId, count)); } return stateKeys; } - private List storeStateVariables(int workflowId, int actionId, int stateCount) { + private List storeStateVariables(long workflowId, long actionId, int stateCount) { List stateKeys = new ArrayList<>(); int index = 1; for (int i = 0; i < stateCount; i++) { @@ -276,7 +276,7 @@ private List storeStateVariables(int workflowId, int actionId, int sta return stateKeys; } - private StateKey storeStateVariable(int workflowId, int actionId, String key) { + private StateKey storeStateVariable(long workflowId, long actionId, String key) { String value = key + "_value"; int updated = jdbc.update( "insert into nflow_workflow_state (workflow_id, action_id, state_key, state_value) values (?, ?, ?, ?)", workflowId, @@ -285,19 +285,19 @@ private StateKey storeStateVariable(int workflowId, int actionId, String key) { return new StateKey(workflowId, actionId, key); } - private int storeAction(int workflowId) { + private long storeAction(long workflowId) { WorkflowInstanceAction action = actionBuilder(workflowId).build(); return workflowInstanceDao.insertWorkflowInstanceAction(action); } - private WorkflowInstanceAction.Builder actionBuilder(int workflowId) { + private WorkflowInstanceAction.Builder actionBuilder(long workflowId) { return new WorkflowInstanceAction.Builder().setState("dummyState") .setType(WorkflowInstanceAction.WorkflowActionType.stateExecution).setExecutionStart(DateTime.now()) .setExecutionEnd(DateTime.now()).setWorkflowInstanceId(workflowId); } - private int insert(WorkflowInstance instance) { - int id = workflowInstanceDao.insertWorkflowInstance(instance); + private long insert(WorkflowInstance instance) { + long id = workflowInstanceDao.insertWorkflowInstance(instance); assertTrue(id > 0); DateTime modified = instance.modified; updateModified(id, modified); @@ -306,26 +306,26 @@ private int insert(WorkflowInstance instance) { return id; } - private void updateModified(int workflowId, DateTime modified) { + private void updateModified(long workflowId, DateTime modified) { int updateCount = jdbc.update("update nflow_workflow set modified = ? where id = ?", - new Object[] { DaoUtil.toTimestamp(modified), workflowId }); + DaoUtil.toTimestamp(modified), workflowId); assertEquals(1, updateCount); } - private void assertEqualsInAnyOrder(List expected, List actual) { - List expectedCopy = new ArrayList<>(expected); - List actualCopy = new ArrayList<>(actual); + private > void assertEqualsInAnyOrder(List expected, List actual) { + List expectedCopy = new ArrayList<>(expected); + List actualCopy = new ArrayList<>(actual); Collections.sort(expectedCopy); Collections.sort(actualCopy); assertEquals(expectedCopy, actualCopy); } private static class StateKey extends ModelObject { - public final int workflowId; - public final int actionId; + public final long workflowId; + public final long actionId; public final String stateKey; - public StateKey(int workflowId, int actionId, String stateKey) { + public StateKey(long workflowId, long actionId, String stateKey) { this.workflowId = workflowId; this.actionId = actionId; this.stateKey = stateKey; diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/StatisticsDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/StatisticsDaoTest.java index faf774ec2..289017098 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/StatisticsDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/StatisticsDaoTest.java @@ -50,17 +50,17 @@ public void getQueueStatisticsReasonableResults() { assertThat(queued.minAgeMillis, greaterThanOrEqualTo(0L)); } - private int createInstance() { + private long createInstance() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("a", "1"); - int id = instanceDao.insertWorkflowInstance(i1); + long id = instanceDao.insertWorkflowInstance(i1); return id; } @Test public void getWorkflowDefinitionStatisticsWorks() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(null).setStatus(created).build(); - int id = instanceDao.insertWorkflowInstance(i1); + long id = instanceDao.insertWorkflowInstance(i1); Map> stats = statisticsDao.getWorkflowDefinitionStatistics(i1.type, null, null, null, null); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index b67616cf5..bd87f6765 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -75,7 +75,7 @@ public class WorkflowInstanceDaoTest extends BaseDaoTest { public void roundTripTest() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("a", "1"); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); WorkflowInstance i2 = dao.getWorkflowInstance(id, EnumSet.allOf(WorkflowInstanceInclude.class), null); assertThat(i2.id, notNullValue()); assertThat(i2.created, notNullValue()); @@ -87,15 +87,15 @@ public void roundTripTest() { public void queryWorkflowInstanceWithAllConditions() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("b", "2"); - int workflowId = dao.insertWorkflowInstance(i1); + long workflowId = dao.insertWorkflowInstance(i1); assertThat(workflowId, not(equalTo(-1))); WorkflowInstanceAction action = constructActionBuilder(workflowId).build(); - int actionId = dao.insertWorkflowInstanceAction(action); + long actionId = dao.insertWorkflowInstanceAction(action); WorkflowInstance child = constructWorkflowInstanceBuilder().setParentWorkflowId(workflowId).setParentActionId(actionId) .build(); - int childId = dao.insertWorkflowInstance(child); + long childId = dao.insertWorkflowInstance(child); assertThat(childId, not(equalTo(-1))); dao.insertWorkflowInstanceAction(constructActionBuilder(childId).build()); dao.insertWorkflowInstanceAction(constructActionBuilder(childId).build()); @@ -124,7 +124,7 @@ public void queryWorkflowInstanceWithAllConditions() { @Test public void queryWorkflowInstanceWithMinimalConditions() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); assertThat(id, not(equalTo(-1))); QueryWorkflowInstances q = new QueryWorkflowInstances.Builder().build(); List createdInstances = dao.queryWorkflowInstances(q); @@ -137,8 +137,8 @@ public void queryWorkflowInstanceWithMinimalConditions() { @Test public void updateWorkflowInstance() throws InterruptedException { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStatus(created).build(); - int id = dao.insertWorkflowInstance(i1); - List ids = dao.pollNextWorkflowInstanceIds(1); + long id = dao.insertWorkflowInstance(i1); + List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); DateTime started = now(); final WorkflowInstance i2 = new WorkflowInstance.Builder( @@ -229,7 +229,7 @@ public void updateWorkflowInstanceCreatesActionWhenStateVariablesAreModified() { private WorkflowInstance.Builder updateInstanceBuilder() { WorkflowInstance instance = constructWorkflowInstanceBuilder().setStatus(created).setBusinessKey("updatedKey").build(); - int id = dao.insertWorkflowInstance(instance); + long id = dao.insertWorkflowInstance(instance); return new WorkflowInstance.Builder( dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)) // .setStatus(inProgress) // @@ -254,10 +254,10 @@ public void updateWorkflowInstanceWithRootWorkflowAndChildWorkflowsWorks() { dao.updateWorkflowInstanceAfterExecution(instance, action, asList(childWorkflow), emptyWorkflows, false); - Map> childWorkflows = dao.getWorkflowInstance(instance.id, + Map> childWorkflows = dao.getWorkflowInstance(instance.id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows; assertThat(childWorkflows.size(), is(1)); - for (List childIds : childWorkflows.values()) { + for (List childIds : childWorkflows.values()) { assertThat(childIds.size(), is(1)); WorkflowInstance childInstance = dao.getWorkflowInstance(childIds.get(0), emptySet(), null); assertThat(childInstance.rootWorkflowId, is(instance.id)); @@ -269,7 +269,7 @@ public void updateWorkflowInstanceWithRootWorkflowAndChildWorkflowsWorks() { @Test public void updateWorkflowInstanceWithChildWorkflowHavingExistinExternalIdWorks() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStatus(created).build(); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); WorkflowInstance i2 = new WorkflowInstance.Builder( dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)) // .setStatus(inProgress) // @@ -297,10 +297,10 @@ public void updateWorkflowInstanceWithChildWorkflowHavingExistinExternalIdWorks( dao.updateWorkflowInstanceAfterExecution(i2, a1, asList(childWorkflow1, childWorkflow2), emptyWorkflows, false); - Map> childWorkflows = dao.getWorkflowInstance(id, + Map> childWorkflows = dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows; assertThat(childWorkflows.size(), is(1)); - for (List childIds : childWorkflows.values()) { + for (List childIds : childWorkflows.values()) { assertThat(childIds.size(), is(1)); WorkflowInstance childInstance = dao.getWorkflowInstance(childIds.get(0), EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null); @@ -317,7 +317,7 @@ public void updateWorkflowInstanceWithChildWorkflowHavingExistinExternalIdWorks( public void updateWorkflowInstanceWithNonRootWorkflowAndChildWorkflowsWorks() { // create 3 level hierarchy of workflows WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStatus(created).build(); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); WorkflowInstance i2 = new WorkflowInstance.Builder( dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) .setState("updateState").setStateText("update text").build(); @@ -330,8 +330,8 @@ public void updateWorkflowInstanceWithNonRootWorkflowAndChildWorkflowsWorks() { dao.updateWorkflowInstanceAfterExecution(i2, a1, asList(middleWorkflow), emptyWorkflows, false); - int middleWorkflowId = -1; - for (List childIds : dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), + long middleWorkflowId = -1; + for (List childIds : dao.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows.values()) { middleWorkflowId = childIds.get(0); } @@ -347,10 +347,10 @@ public void updateWorkflowInstanceWithNonRootWorkflowAndChildWorkflowsWorks() { WorkflowInstance childWorkflow = constructWorkflowInstanceBuilder().setBusinessKey("childKey").build(); dao.updateWorkflowInstanceAfterExecution(middleWorkflow, middleAction, asList(childWorkflow), emptyWorkflows, false); - Map> childWorkflows = dao.getWorkflowInstance(middleWorkflowId, + Map> childWorkflows = dao.getWorkflowInstance(middleWorkflowId, EnumSet.of(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS), null).childWorkflows; assertThat(childWorkflows.size(), is(1)); - for (List childIds : childWorkflows.values()) { + for (List childIds : childWorkflows.values()) { assertThat(childIds.size(), is(1)); WorkflowInstance childInstance = dao.getWorkflowInstance(childIds.get(0), emptySet(), null); assertThat(childInstance.rootWorkflowId, is(id)); @@ -362,7 +362,7 @@ public void updateWorkflowInstanceWithNonRootWorkflowAndChildWorkflowsWorks() { @Test public void updateNotRunningWorkflowInstanceUpdatesStatusForNotRunningInstance() { final WorkflowInstance instance = constructWorkflowInstanceBuilder().build(); - int id = dao.insertWorkflowInstance(instance); + long id = dao.insertWorkflowInstance(instance); WorkflowInstance modifiedInstance = new WorkflowInstance.Builder(instance).setId(id).setState(null).setNextActivation(null) .setStatus(manual).setStateText("modified").build(); boolean updated = dao.updateNotRunningWorkflowInstance(modifiedInstance); @@ -381,7 +381,7 @@ public void processRow(ResultSet rs) throws SQLException { @Test public void updateNotRunningWorkflowInstanceUpdatesStateForNotRunningInstance() { final WorkflowInstance instance = constructWorkflowInstanceBuilder().build(); - int id = dao.insertWorkflowInstance(instance); + long id = dao.insertWorkflowInstance(instance); WorkflowInstance modifiedInstance = new WorkflowInstance.Builder(instance).setId(id).setState("manualState") .setNextActivation(null).setStatus(null).setStateText("modified").build(); boolean updated = dao.updateNotRunningWorkflowInstance(modifiedInstance); @@ -400,7 +400,7 @@ public void processRow(ResultSet rs) throws SQLException { @Test public void updateNotRunningWorkflowInstanceUpdatesNextActivationForNotRunningInstance() { final WorkflowInstance instance = constructWorkflowInstanceBuilder().build(); - int id = dao.insertWorkflowInstance(instance); + long id = dao.insertWorkflowInstance(instance); final DateTime tomorrow = now().plusDays(1); WorkflowInstance modifiedInstance = new WorkflowInstance.Builder(instance).setId(id).setState(null) .setNextActivation(tomorrow).setStatus(null).setStateText("modified").build(); @@ -420,7 +420,7 @@ public void processRow(ResultSet rs) throws SQLException { @Test public void updateNotRunningWorkflowInstanceDoesNotUpdateRunningInstance() { WorkflowInstance instance = constructWorkflowInstanceBuilder().build(); - int id = dao.insertWorkflowInstance(instance); + long id = dao.insertWorkflowInstance(instance); assertThat(jdbc.update("update nflow_workflow set executor_id = ? where id = ?", executorDao.getExecutorId(), id), is(1)); final DateTime tomorrow = now().plusDays(1); WorkflowInstance modifiedInstance = new WorkflowInstance.Builder(instance).setId(id).setState("manualState") @@ -432,7 +432,7 @@ public void updateNotRunningWorkflowInstanceDoesNotUpdateRunningInstance() { @Test public void updatingNextActivationToNullWhileExternalNextActivationIsNotNull() { WorkflowInstance instance = constructWorkflowInstanceBuilder().setNextActivation(null).build(); - int workflowId = dao.insertWorkflowInstance(instance); + long workflowId = dao.insertWorkflowInstance(instance); assertTrue(workflowId > -1); jdbc.update("update nflow_workflow set executor_id = ? where id = ?", executorDao.getExecutorId(), workflowId); assertThat(jdbc.update("update nflow_workflow set external_next_activation = ? where id = ?", @@ -453,7 +453,7 @@ public void updatingNextActivationWhenExternalNextActivationIsEarlier() { WorkflowInstance instance = constructWorkflowInstanceBuilder().setNextActivation(future).build(); - final int workflowId = dao.insertWorkflowInstance(instance); + final long workflowId = dao.insertWorkflowInstance(instance); jdbc.update("update nflow_workflow set executor_id = ? where id = ?", executorDao.getExecutorId(), workflowId); assertTrue(workflowId > -1); @@ -472,7 +472,7 @@ public void updatingNextActivationWhenExternalNextActivationIsLater() { DateTime future = now.plusDays(1); WorkflowInstance instance = constructWorkflowInstanceBuilder().setNextActivation(now).build(); - int workflowId = dao.insertWorkflowInstance(instance); + long workflowId = dao.insertWorkflowInstance(instance); jdbc.update("update nflow_workflow set executor_id = ? where id = ?", executorDao.getExecutorId(), workflowId); assertTrue(workflowId > -1); @@ -486,8 +486,8 @@ public void updatingNextActivationWhenExternalNextActivationIsLater() { @Test public void postgreSQLUpdateWithoutActionIsNotAllowed() throws InterruptedException { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setStatus(created).build(); - int id = dao.insertWorkflowInstance(i1); - List ids = dao.pollNextWorkflowInstanceIds(1); + long id = dao.insertWorkflowInstance(i1); + List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); final WorkflowInstance i2 = new WorkflowInstance.Builder( dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress) @@ -503,7 +503,7 @@ public void fakePostgreSQLupdateWorkflowInstance() { WorkflowInstanceDao d = preparePostgreSQLDao(j); ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); ArgumentCaptor args = ArgumentCaptor.forClass(Object.class); - when(j.queryForObject(sql.capture(), eq(Integer.class), args.capture())).thenReturn(42); + when(j.queryForObject(sql.capture(), eq(Long.class), args.capture())).thenReturn(42L); DateTime started = now(); WorkflowInstance i2 = new WorkflowInstance.Builder().setState("updateState").setStateText("update text") @@ -553,11 +553,11 @@ public void fakePostgreSQLinsertWorkflowInstance() { WorkflowInstanceDao d = preparePostgreSQLDao(j); ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); ArgumentCaptor args = ArgumentCaptor.forClass(Object.class); - when(j.queryForObject(sql.capture(), eq(Integer.class), args.capture())).thenReturn(42); + when(j.queryForObject(sql.capture(), eq(Long.class), args.capture())).thenReturn(42L); DateTime started = now(); WorkflowInstance wf = new WorkflowInstance.Builder().setStatus(inProgress).setState("updateState").setStateText("update text") - .setRootWorkflowId(9283).setParentWorkflowId(110).setParentActionId(421).setNextActivation(started.plusSeconds(1)) + .setRootWorkflowId(9283L).setParentWorkflowId(110L).setParentActionId(421L).setNextActivation(started.plusSeconds(1)) .setRetries(3).setId(43).putStateVariable("A", "B").putStateVariable("C", "D").setSignal(Optional.of(1)) .setStartedIfNotSet(started).build(); @@ -596,7 +596,7 @@ public void insertWorkflowInstanceActionWorks() { DateTime started = now(); final WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("a", "1"); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); final WorkflowInstanceAction a1 = new WorkflowInstanceAction.Builder().setExecutionStart(started).setExecutorId(42) .setExecutionEnd(started.plusMillis(100)).setRetryNo(1).setType(stateExecution).setState("test") .setStateText("state text").setWorkflowInstanceId(id).build(); @@ -616,9 +616,9 @@ public Void doInTransaction(TransactionStatus status) { public void pollNextWorkflowInstances() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)).setExecutorGroup("junit") .build(); - int id = dao.insertWorkflowInstance(i1); - List firstBatch = dao.pollNextWorkflowInstanceIds(100); - List secondBatch = dao.pollNextWorkflowInstanceIds(100); + long id = dao.insertWorkflowInstance(i1); + List firstBatch = dao.pollNextWorkflowInstanceIds(100); + List secondBatch = dao.pollNextWorkflowInstanceIds(100); assertThat(firstBatch.size(), equalTo(1)); assertThat(firstBatch.get(0), equalTo(id)); assertThat(secondBatch.size(), equalTo(0)); @@ -629,8 +629,8 @@ public void fakePostgreSQLpollNextWorkflowInstances() { JdbcTemplate j = mock(JdbcTemplate.class); WorkflowInstanceDao d = preparePostgreSQLDao(j); ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); - when(j.queryForList(sql.capture(), eq(Integer.class))).thenReturn(asList(1, 2, 3)); - assertThat(d.pollNextWorkflowInstanceIds(5), is(asList(1, 2, 3))); + when(j.queryForList(sql.capture(), eq(Long.class))).thenReturn(asList(1L, 2L, 3L)); + assertThat(d.pollNextWorkflowInstanceIds(5), is(asList(1L, 2L, 3L))); assertEquals( "update nflow_workflow set executor_id = 42, status = 'executing'::workflow_status, external_next_activation = null where id in (select id from nflow_workflow where executor_id is null and status in ('created'::workflow_status, 'inProgress'::workflow_status) and next_activation <= current_timestamp and group matches order by next_activation asc limit 5) and executor_id is null returning id", sql.getValue()); @@ -679,7 +679,7 @@ public void pollNextWorkflowInstancesWithPartialRaceCondition() throws Interrupt @Test public void wakesUpSleepingWorkflow() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(null).build(); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); assertThat(dao.getWorkflowInstance(id, emptySet(), null).nextActivation, nullValue()); dao.wakeupWorkflowInstanceIfNotExecuting(id, new ArrayList()); assertThat(dao.getWorkflowInstance(id, emptySet(), null).nextActivation, notNullValue()); @@ -689,8 +689,8 @@ public void wakesUpSleepingWorkflow() { public void doesNotWakeUpRunningWorkflow() { DateTime past = now().minusDays(1); WorkflowInstance i1 = constructWorkflowInstanceBuilder().setExecutorGroup("junit").setNextActivation(past).build(); - int id = dao.insertWorkflowInstance(i1); - List ids = dao.pollNextWorkflowInstanceIds(1); + long id = dao.insertWorkflowInstance(i1); + List ids = dao.pollNextWorkflowInstanceIds(1); assertThat(ids, contains(id)); assertThat(dao.getWorkflowInstance(id, emptySet(), null).nextActivation, is(past)); dao.wakeupWorkflowInstanceIfNotExecuting(id, asList(i1.state)); @@ -700,7 +700,7 @@ public void doesNotWakeUpRunningWorkflow() { @Test public void wakesUpWorkflowInMatchingState() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(null).build(); - int id = dao.insertWorkflowInstance(i1); + long id = dao.insertWorkflowInstance(i1); assertThat(dao.getWorkflowInstance(id, emptySet(), null).nextActivation, nullValue()); dao.wakeupWorkflowInstanceIfNotExecuting(id, asList("otherState", i1.state)); assertThat(dao.getWorkflowInstance(id, emptySet(), null).nextActivation, notNullValue()); @@ -709,7 +709,7 @@ public void wakesUpWorkflowInMatchingState() { @Test public void getWorkflowInstanceStateWorks() { WorkflowInstance instance = constructWorkflowInstanceBuilder().build(); - int workflowInstanceId = dao.insertWorkflowInstance(instance); + long workflowInstanceId = dao.insertWorkflowInstance(instance); String state = dao.getWorkflowInstanceState(workflowInstanceId); @@ -721,18 +721,18 @@ public void insertingSubWorkflowWorks() { DateTime started = now(); final WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("b", "2"); - int parentWorkflowId = dao.insertWorkflowInstance(i1); + long parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, started, started.plusMillis(100)); + long parentActionId = addWorkflowAction(parentWorkflowId, i1, started, started.plusMillis(100)); WorkflowInstance createdInstance = dao.getWorkflowInstance(parentWorkflowId, EnumSet.allOf(WorkflowInstanceInclude.class), null); checkSameWorkflowInfo(i1, createdInstance); - int subWorkflowId1 = addSubWorkflow(parentWorkflowId, parentActionId); + long subWorkflowId1 = addSubWorkflow(parentWorkflowId, parentActionId); assertThat(subWorkflowId1, not(equalTo(-1))); - int subWorkflowId2 = addSubWorkflow(parentWorkflowId, parentActionId); + long subWorkflowId2 = addSubWorkflow(parentWorkflowId, parentActionId); assertThat(subWorkflowId2, not(equalTo(-1))); WorkflowInstance i2 = dao.getWorkflowInstance(subWorkflowId1, emptySet(), null); @@ -748,16 +748,16 @@ public void wakeUpWorkflowExternallyWorksWithEmptyExpectedStates() { DateTime now = now(); DateTime scheduled = now.plusDays(1); WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(scheduled).build(); - int parentWorkflowId = dao.insertWorkflowInstance(i1); + long parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); WorkflowInstance createdWorkflow = dao.getWorkflowInstance(parentWorkflowId, emptySet(), null); assertThat(createdWorkflow.nextActivation, equalTo(scheduled)); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); + long parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); assertThat(parentActionId, not(equalTo(-1))); - int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); + long subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); WorkflowInstance i2 = dao.getWorkflowInstance(subWorkflowId, emptySet(), null); assertThat(subWorkflowId, not(equalTo(-1))); assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId)); @@ -773,16 +773,16 @@ public void wakeUpWorkflowExternallyWorksWithExpectedStates() { DateTime now = now(); DateTime scheduled = now.plusDays(1); WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(scheduled).build(); - int parentWorkflowId = dao.insertWorkflowInstance(i1); + long parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); WorkflowInstance createdWorkflow = dao.getWorkflowInstance(parentWorkflowId, emptySet(), null); assertThat(createdWorkflow.nextActivation, equalTo(scheduled)); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); + long parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); assertThat(parentActionId, not(equalTo(-1))); - int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); + long subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); WorkflowInstance i2 = dao.getWorkflowInstance(subWorkflowId, emptySet(), null); assertThat(subWorkflowId, not(equalTo(-1))); assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId)); @@ -798,16 +798,16 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() { DateTime now = now(); DateTime scheduled = now.plusDays(1); WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(scheduled).setState("unexpected").build(); - int parentWorkflowId = dao.insertWorkflowInstance(i1); + long parentWorkflowId = dao.insertWorkflowInstance(i1); assertThat(parentWorkflowId, not(equalTo(-1))); WorkflowInstance createdWorkflow = dao.getWorkflowInstance(parentWorkflowId, emptySet(), null); assertThat(createdWorkflow.nextActivation, equalTo(scheduled)); - int parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); + long parentActionId = addWorkflowAction(parentWorkflowId, i1, now, now.plusMillis(100)); assertThat(parentActionId, not(equalTo(-1))); - int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); + long subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId); WorkflowInstance i2 = dao.getWorkflowInstance(subWorkflowId, emptySet(), null); assertThat(subWorkflowId, not(equalTo(-1))); assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId)); @@ -822,7 +822,7 @@ public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() { public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusToInProgressAndInsertsAction() { int crashedExecutorId = 999; insertCrashedExecutor(crashedExecutorId, executorDao.getExecutorGroup()); - int id = dao.insertWorkflowInstance(new WorkflowInstance.Builder().setType("test").setExternalId("extId") + long id = dao.insertWorkflowInstance(new WorkflowInstance.Builder().setType("test").setExternalId("extId") .setExecutorGroup(executorDao.getExecutorGroup()).setStatus(executing).setState("processing").build()); int updated = jdbc.update("update nflow_workflow set executor_id = ? where id = ?", crashedExecutorId, id); assertThat(updated, is(1)); @@ -835,7 +835,7 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo assertThat(status, is(inProgress.name())); List actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?", - new WorkflowInstanceActionRowMapper(sqlVariant, Collections.> emptyMap()), id); + new WorkflowInstanceActionRowMapper(sqlVariant, Collections.emptyMap()), id); assertThat(actions.size(), is(1)); WorkflowInstanceAction workflowInstanceAction = actions.get(0); assertThat(workflowInstanceAction.executorId, is(executorDao.getExecutorId())); @@ -848,7 +848,7 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo assertThat(executorId, is(nullValue())); actions = jdbc.query("select * from nflow_workflow_action where workflow_id = ?", - new WorkflowInstanceActionRowMapper(sqlVariant, Collections.> emptyMap()), id); + new WorkflowInstanceActionRowMapper(sqlVariant, Collections.emptyMap()), id); assertThat(actions.size(), is(1)); assertThat(workflowInstanceAction.executorId, is(executorDao.getExecutorId())); assertThat(workflowInstanceAction.type, is(recovery)); @@ -858,7 +858,7 @@ public void recoverWorkflowInstancesFromDeadNodesSetsExecutorIdToNullAndStatusTo @Test public void settingSignalInsertsAction() { WorkflowInstance i = constructWorkflowInstanceBuilder().setBusinessKey("setSignalTest").build(); - int instanceId = dao.insertWorkflowInstance(i); + long instanceId = dao.insertWorkflowInstance(i); dao.setSignal(instanceId, Optional.of(42), "testing", WorkflowActionType.externalChange); @@ -876,7 +876,7 @@ public void settingSignalInsertsAction() { @Test public void clearingSignalInsertsAction() { WorkflowInstance i = constructWorkflowInstanceBuilder().setBusinessKey("clearSignalTest").build(); - int instanceId = dao.insertWorkflowInstance(i); + long instanceId = dao.insertWorkflowInstance(i); dao.setSignal(instanceId, Optional.of(42), "testing", WorkflowActionType.externalChange); @@ -902,12 +902,12 @@ public void clearingSignalInsertsAction() { @Test public void deleteExpiredWorkflowHistory() { WorkflowInstance parentWorkflow = constructWorkflowInstanceBuilder().build(); - int parentWorkflowId = dao.insertWorkflowInstance(parentWorkflow); - int addChildActionId = addWorkflowAction(parentWorkflowId, new WorkflowInstance.Builder(parentWorkflow).build(), now(), + long parentWorkflowId = dao.insertWorkflowInstance(parentWorkflow); + long addChildActionId = addWorkflowAction(parentWorkflowId, new WorkflowInstance.Builder(parentWorkflow).build(), now(), now()); WorkflowInstance childWorkflow = constructWorkflowInstanceBuilder().setParentWorkflowId(parentWorkflowId) .setParentActionId(addChildActionId).build(); - int childWorkflowId = dao.insertWorkflowInstance(childWorkflow); + long childWorkflowId = dao.insertWorkflowInstance(childWorkflow); addWorkflowAction(parentWorkflowId, new WorkflowInstance.Builder(parentWorkflow).putStateVariable("variable", "deletedValue").build(), now(), now().minusDays(1)); @@ -936,8 +936,8 @@ private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance assertTrue(i2.stateVariables.containsKey(entry.getKey())); assertThat(i2.stateVariables.get(entry.getKey()), equalTo(entry.getValue())); } - for (Entry> entry : i1.childWorkflows.entrySet()) { - Integer key = entry.getKey(); + for (Entry> entry : i1.childWorkflows.entrySet()) { + Long key = entry.getKey(); assertTrue(i2.childWorkflows.containsKey(key)); assertThat(i2.childWorkflows.get(key), is(entry.getValue())); } @@ -945,20 +945,15 @@ private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance assertThat(i1.started, equalTo(i2.started)); } - private int addWorkflowAction(int workflowId, final WorkflowInstance instance, DateTime started, DateTime ended) { + private long addWorkflowAction(long workflowId, final WorkflowInstance instance, DateTime started, DateTime ended) { final WorkflowInstanceAction action = new WorkflowInstanceAction.Builder().setExecutionStart(started).setExecutorId(42) .setExecutionEnd(ended).setRetryNo(1).setType(stateExecution).setState("test").setStateText("state text") .setWorkflowInstanceId(workflowId).build(); - int actionId = transaction.execute(new TransactionCallback() { - @Override - public Integer doInTransaction(TransactionStatus status) { - return dao.insertWorkflowInstanceAction(instance, action); - } - }); + long actionId = transaction.execute((TransactionCallback) status -> dao.insertWorkflowInstanceAction(instance, action)); return actionId; } - private int addSubWorkflow(int parentWorkflowId, int parentActionId) { + private long addSubWorkflow(long parentWorkflowId, long parentActionId) { final WorkflowInstance subWorkflow = constructWorkflowInstanceBuilder().setParentWorkflowId(parentWorkflowId) .setParentActionId(parentActionId).build(); return dao.insertWorkflowInstance(subWorkflow); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java index 1c3d6d0a7..6563d1d1f 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java @@ -37,7 +37,7 @@ protected WorkflowInstance.Builder constructWorkflowInstanceBuilder() { .setSignal(Optional.of(42)); } - protected WorkflowInstanceAction.Builder constructActionBuilder(int workflowInstanceID) { + protected WorkflowInstanceAction.Builder constructActionBuilder(long workflowInstanceID) { return new WorkflowInstanceAction.Builder() // .setExecutionStart(DateTime.now()) // .setExecutorId(42) // diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index ec14d1bc1..9eb85ad7a 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -106,9 +106,9 @@ public void exceptionDuringDispatcherExecutionCausesRetry() throws Throwable { @SuppressWarnings("unused") class ExceptionDuringDispatcherExecutionCausesRetry extends MultithreadedTestCase { public void threadDispatcher() { - when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenReturn(ids(1)) + when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenReturn(ids(1L)) .thenThrow(new RuntimeException("Expected: exception during dispatcher execution")) - .thenAnswer(waitForTickAndAnswer(2, ids(2), this)); + .thenAnswer(waitForTickAndAnswer(2, ids(2L), this)); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable()); when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor); WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable()); @@ -137,7 +137,7 @@ public void errorDuringDispatcherExecutionStopsDispatcher() throws Throwable { @SuppressWarnings("unused") class ErrorDuringDispatcherExecutionStopsDispatcher extends MultithreadedTestCase { public void threadDispatcher() { - when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenThrow(new AssertionError()).thenReturn(ids(1)); + when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenThrow(new AssertionError()).thenReturn(ids(1L)); try { dispatcher.run(); Assertions.fail("Error should stop the dispatcher"); @@ -185,7 +185,7 @@ public void shutdownBlocksUntilPoolShutdown() throws Throwable { @SuppressWarnings("unused") class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase { public void threadDispatcher() { - when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(1), this)); + when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(1L), this)); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); @@ -224,7 +224,7 @@ public void threadDispatcher() { public Object answer(InvocationOnMock invocation) throws Throwable { waitForTick(2); getThreadByName("threadShutdown").interrupt(); - return ids(1); + return ids(1L); } }); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); @@ -303,7 +303,7 @@ public void dispatcherLogsWarningWhenAllThreadsArePotentiallyStuck() throws Thro class DispatcherLogsWarning extends MultithreadedTestCase { public void threadDispatcher() throws InterruptedException { when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())) - .thenAnswer(waitForTickAndAnswer(2, Collections. emptyList(), this)); + .thenAnswer(waitForTickAndAnswer(2, Collections.emptyList(), this)); when(executorFactory.getPotentiallyStuckProcessors()).thenReturn(executor.getThreadCount()); dispatcher.run(); } @@ -358,9 +358,9 @@ public void run() { }; } - WorkflowStateProcessor fakeWorkflowExecutor(int instanceId, final Runnable fakeCommand) { + WorkflowStateProcessor fakeWorkflowExecutor(long instanceId, final Runnable fakeCommand) { return new WorkflowStateProcessor(instanceId, null, null, null, null, null, env, - new ConcurrentHashMap(), (WorkflowExecutorListener) null) { + new ConcurrentHashMap<>(), (WorkflowExecutorListener) null) { @Override public void run() { fakeCommand.run(); @@ -368,7 +368,7 @@ public void run() { }; } - Answer> waitForTickAndAnswer(final int tick, final List answer, final MultithreadedTestCase mtc) { + Answer> waitForTickAndAnswer(final int tick, final List answer, final MultithreadedTestCase mtc) { return invocation -> { mtc.waitForTick(tick); return answer; @@ -383,7 +383,7 @@ protected BlockingQueue createQueue(int queueCapacity) { } } - static List ids(Integer... ids) { + static List ids(Long... ids) { return asList(ids); } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index c54c00a3d..e48af1268 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -73,8 +73,8 @@ public void checkIfStateProcessorsAreStuckLogsLongRunningInstance() { WorkflowStateProcessor executor2 = mock(WorkflowStateProcessor.class); when(executor1.getStartTimeSeconds()).thenReturn(currentTimeMillis() / 1000 - STUCK_THREAD_THRESHOLD - 1); when(executor2.getStartTimeSeconds()).thenReturn(currentTimeMillis() / 1000 - STUCK_THREAD_THRESHOLD); - factory.processingInstances.put(111, executor1); - factory.processingInstances.put(222, executor2); + factory.processingInstances.put(111L, executor1); + factory.processingInstances.put(222L, executor2); int potentiallyStuckProcessors = factory.getPotentiallyStuckProcessors(); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index 191d3f3c3..d5d6b7234 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -27,6 +27,7 @@ import static org.joda.time.DateTimeUtils.setCurrentMillisSystem; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; @@ -158,7 +159,7 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { static WorkflowInstance newWorkflow = mock(WorkflowInstance.class); - static Map processingInstances; + static Map processingInstances; private final TestWorkflow testWorkflowDef = new TestWorkflow(); @@ -421,12 +422,12 @@ public void doNothingWhenNotifyingParentWithoutParentWorkflowId() { executor.run(); - verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class), any(List.class)); + verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Long.class), any(List.class)); } @Test public void whenWakingUpParentWorkflowSucceeds() { - WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent") + WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999L).setType("wake-test").setState("wakeParent") .build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType"); @@ -443,7 +444,7 @@ public void whenWakingUpParentWorkflowSucceeds() { @Test public void whenWakingUpParentWorkflowFails() { - WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent") + WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999L).setType("wake-test").setState("wakeParent") .build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType"); @@ -460,7 +461,7 @@ public void whenWakingUpParentWorkflowFails() { @Test public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() { - WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("simple-test").setState("processing") + WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999L).setType("simple-test").setState("processing") .build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn(BulkWorkflow.BULK_WORKFLOW_TYPE); @@ -819,7 +820,7 @@ public void deleteWorkflowInstanceHistoryNotExecutedWithDefaultSettings() { when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); executor.run(); - verify(workflowInstanceDao, never()).deleteWorkflowInstanceHistory(any(), any()); + verify(workflowInstanceDao, never()).deleteWorkflowInstanceHistory(anyLong(), any()); } @Test @@ -897,7 +898,7 @@ public NextAction process(StateExecution execution, @StateVar("string") String s @StateVar("pojo") Pojo pojo, @StateVar(value = "nullPojo", instantiateIfNotExists = true) Pojo pojo2, @StateVar(value = "immutablePojo", readOnly = true) Pojo unmodifiablePojo, @StateVar("nullInt") int zero, @StateVar("mutableString") Mutable mutableString) { - assertThat(execution.getWorkflowInstanceId(), is(1)); + assertThat(execution.getWorkflowInstanceId(), is(1L)); assertThat(execution.getWorkflowInstanceExternalId(), is(notNullValue())); Pojo pojo1 = execution.getVariable("pojo", Pojo.class); assertThat(pojo.field, is(pojo1.field)); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java index fa1ea3364..dfd1da74d 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/workflow/StateExecutionImplTest.java @@ -102,7 +102,7 @@ public void addWorkflows() { @Test public void wakeUpParentWorkflowSetsWakeUpStates() { instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState") - .setBusinessKey("business").setParentWorkflowId(123).build(); + .setBusinessKey("business").setParentWorkflowId(123L).build(); execution = createExecution(instance); assertThat(execution.getWakeUpParentWorkflowStates().isPresent(), is(false)); execution.wakeUpParentWorkflow(); @@ -120,7 +120,7 @@ public void nonChildWorkflowCannotWakeUpParent() { @Test public void queryChildWorkflowsIsRestrictedToChildsOfCurrentWorkflow() { QueryWorkflowInstances query = new QueryWorkflowInstances.Builder() - .setParentActionId(42).addTypes("a","b") + .setParentActionId(42L).addTypes("a","b") .setBusinessKey("123").build(); List result = singletonList(mock(WorkflowInstance.class)); @@ -129,7 +129,7 @@ public void queryChildWorkflowsIsRestrictedToChildsOfCurrentWorkflow() { assertThat(execution.queryChildWorkflows(query), is(result)); QueryWorkflowInstances actualQuery = queryCaptor.getValue(); - assertThat(actualQuery.parentWorkflowId, is(99)); + assertThat(actualQuery.parentWorkflowId, is(99L)); assertThat(actualQuery.types, is(asList("a", "b"))); assertThat(actualQuery.businessKey, is("123")); } @@ -142,7 +142,7 @@ public void getAllChildWorkflowsQueriesAllChildWorkflows() { assertThat(execution.getAllChildWorkflows(), is(result)); QueryWorkflowInstances actualQuery = queryCaptor.getValue(); - assertThat(actualQuery.parentWorkflowId, is(99)); + assertThat(actualQuery.parentWorkflowId, is(99L)); assertThat(actualQuery.types, is(Collections.emptyList())); assertThat(actualQuery.businessKey, is(nullValue())); } @@ -254,10 +254,10 @@ public void getParentIdReturnsEmptyWhenParentWorkflowIdIsNull() { @Test public void getParentIdReturnsParentWorkflowId() { - instance = new WorkflowInstance.Builder().setParentWorkflowId(42).build(); + instance = new WorkflowInstance.Builder().setParentWorkflowId(42L).build(); execution = createExecution(instance); - assertThat(execution.getParentId(), is(Optional.of(42))); + assertThat(execution.getParentId(), is(Optional.of(42L))); } @Test diff --git a/nflow-engine/src/test/java/io/nflow/engine/service/ArchiveServiceTest.java b/nflow-engine/src/test/java/io/nflow/engine/service/ArchiveServiceTest.java index 98fbd054c..a0734ca83 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/service/ArchiveServiceTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/service/ArchiveServiceTest.java @@ -29,8 +29,8 @@ public class ArchiveServiceTest { @Mock private ArchiveDao dao; private final DateTime limit = new DateTime(2015, 7, 10, 19, 57, 0, 0); - private final List emptyList = Collections.emptyList(); - private final List dataList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + private final List emptyList = Collections.emptyList(); + private final List dataList = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L); @BeforeEach public void setup() { diff --git a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java index d0c76a505..998338e41 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java @@ -88,8 +88,8 @@ public void getWorkflowInstance() { public void insertWorkflowInstanceWorks() { WorkflowInstance i = constructWorkflowInstanceBuilder().setStatus(created).setExternalId("123").setState(null).build(); when(workflowInstancePreProcessor.process(i)).thenReturn(i); - when(workflowInstanceDao.insertWorkflowInstance(stored.capture())).thenReturn(42); - assertThat(service.insertWorkflowInstance(i), is(42)); + when(workflowInstanceDao.insertWorkflowInstance(stored.capture())).thenReturn(42L); + assertThat(service.insertWorkflowInstance(i), is(42L)); assertThat(stored.getValue().externalId, is("123")); assertThat(stored.getValue().status, is(created)); } @@ -152,7 +152,7 @@ public void updateRunningWorkflowInstanceFails() { public void wakeUpWorkflowInstance() { List states = asList("abc", "xyz"); service.wakeupWorkflowInstance(99, states); - verify(workflowInstanceDao).wakeupWorkflowInstanceIfNotExecuting(99L, states); + verify(workflowInstanceDao).wakeupWorkflowInstanceIfNotExecuting(99, states); } @Test diff --git a/nflow-perf-test/src/main/java/io/nflow/performance/client/LoadGenerator.java b/nflow-perf-test/src/main/java/io/nflow/performance/client/LoadGenerator.java index bd747f623..5254b23d0 100644 --- a/nflow-perf-test/src/main/java/io/nflow/performance/client/LoadGenerator.java +++ b/nflow-perf-test/src/main/java/io/nflow/performance/client/LoadGenerator.java @@ -33,8 +33,8 @@ public class LoadGenerator { private static final StopWatch elapsedTime = new StopWatch(); - private List generateSomeLoad(int threadCount, int loadCount) throws InterruptedException { - List allInstanceIds = new LinkedList<>(); + private List generateSomeLoad(int threadCount, int loadCount) throws InterruptedException { + List allInstanceIds = new LinkedList<>(); List threads = new LinkedList<>(); for (int i = 0; i < threadCount; i++) { LoadGeneratorThread t = new LoadGeneratorThread(i, client, loadCount); @@ -55,7 +55,7 @@ private static final class LoadGeneratorThread extends Thread { private final PerfTestClient c; private final int loadCount; private final StopWatch stopWatch = new StopWatch(); - private final List instanceIds = new LinkedList<>(); + private final List instanceIds = new LinkedList<>(); public LoadGeneratorThread(int id, PerfTestClient c, int loadCount) { this.id = id; @@ -74,7 +74,7 @@ public void run() { logger.info("Generated {} items took {} msec for {}", loadCount, stopWatch.getTime(), id); } - public List getInstanceIds() { + public List getInstanceIds() { return instanceIds; } } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 718c79c69..434fc8cd1 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -91,7 +91,7 @@ public List listWorkflowDefinitions(final List listWorkflowInstances(final List ids, final List types, - final Integer parentWorkflowId, final Integer parentActionId, final List states, + public Collection listWorkflowInstances(final List ids, final List types, + final Long parentWorkflowId, final Long parentActionId, final List states, final List statuses, final String businessKey, final String externalId, final String include, final Long maxResults, final Long maxActions, final WorkflowInstanceService workflowInstances, final ListWorkflowInstanceConverter listWorkflowConverter) { Set includeStrings = parseIncludeStrings(include).collect(toSet()); QueryWorkflowInstances q = new QueryWorkflowInstances.Builder() // - .addIds(ids.toArray(new Integer[ids.size()])) // + .addIds(ids.toArray(new Long[ids.size()])) // .addTypes(types.toArray(new String[types.size()])) // .setParentWorkflowId(parentWorkflowId) // .setParentActionId(parentActionId) // @@ -167,7 +167,7 @@ private Stream parseIncludeStrings(String include) { return Stream.of(trimToEmpty(include).split(",")); } - public ListWorkflowInstanceResponse fetchWorkflowInstance(final int id, final String include, final Long maxActions, + public ListWorkflowInstanceResponse fetchWorkflowInstance(final long id, final String include, final Long maxActions, final WorkflowInstanceService workflowInstances, final ListWorkflowInstanceConverter listWorkflowConverter) throws EmptyResultDataAccessException { Set includes = parseIncludeEnums(include); diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/Action.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/Action.java index 6f0b1d180..6d0c4975b 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/Action.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/Action.java @@ -14,7 +14,7 @@ public class Action extends ModelObject { @ApiModelProperty(value = "Identifier of the workflow instance action") - public int id; + public long id; @ApiModelProperty(value = "Type of state (stateExecution, stateExecutionFailed, externalChange, recovery)") public String type; @ApiModelProperty(value = "Name of state") @@ -35,12 +35,12 @@ public class Action extends ModelObject { public Action() { } - public Action(int id, String type, String state, String stateText, int retryNo, DateTime executionStartTime, DateTime executionEndTime, + public Action(long id, String type, String state, String stateText, int retryNo, DateTime executionStartTime, DateTime executionEndTime, int executorId) { this(id, type, state, stateText, retryNo, executionStartTime, executionEndTime, executorId, null); } - public Action(int id, String type, String state, String stateText, int retryNo, DateTime executionStartTime, DateTime executionEndTime, + public Action(long id, String type, String state, String stateText, int retryNo, DateTime executionStartTime, DateTime executionEndTime, int executorId, Map updatedStateVariables) { this(); this.id = id; diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java index 94ec341ca..832dbc7f0 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java @@ -43,7 +43,7 @@ public class CreateWorkflowInstanceRequest extends ModelObject { public Boolean activate; @ApiModelProperty("Create the workflow as a child of the given parent workflow.") - public Integer parentWorkflowId; + public Long parentWorkflowId; @ApiModelProperty("State variables to be set for the new workflow instance.") public Map stateVariables = new HashMap<>(); diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceResponse.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceResponse.java index a8e26e185..fb2798d44 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceResponse.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceResponse.java @@ -12,7 +12,7 @@ public class CreateWorkflowInstanceResponse extends ModelObject { @ApiModelProperty(value = "Idenfier of the new workflow instance", required = true) - public int id; + public long id; @ApiModelProperty(value = "Workflow definition type", required = true) public String type; diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowInstanceResponse.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowInstanceResponse.java index 94b573029..6d259cf56 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowInstanceResponse.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowInstanceResponse.java @@ -19,7 +19,7 @@ public class ListWorkflowInstanceResponse extends ModelObject { @ApiModelProperty(value = "Identifier of the workflow instance", required = true) - public int id; + public long id; @ApiModelProperty(value = "Workflow instance status (created, executing, inProgress, finished, manual)", required = true) public String status; @@ -28,10 +28,10 @@ public class ListWorkflowInstanceResponse extends ModelObject { public String type; @ApiModelProperty("Parent workflow instance id for child workflows") - public Integer parentWorkflowId; + public Long parentWorkflowId; @ApiModelProperty("Parent workflow instance action id for child workflows (action that created the child workflow)") - public Integer parentActionId; + public Long parentActionId; @ApiModelProperty("Main business key or identifier for the workflow instance") public String businessKey; @@ -68,7 +68,7 @@ public class ListWorkflowInstanceResponse extends ModelObject { public DateTime started; @ApiModelProperty("Child workflow instance IDs created by this instance, grouped by instance action ID") - public Map> childWorkflows; + public Map> childWorkflows; @ApiModelProperty("Current signal value") public Integer signal; diff --git a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java index 09ca484dc..3f273344c 100644 --- a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java +++ b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverterTest.java @@ -59,7 +59,7 @@ public void convertWithActionsWorks() throws IOException { DateTime started = now(); WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setStatus(inProgress).setType("dummy") - .setBusinessKey("businessKey").setParentWorkflowId(942).setParentActionId(842).setExternalId("externalId") + .setBusinessKey("businessKey").setParentWorkflowId(942L).setParentActionId(842L).setExternalId("externalId") .setState("cState").setStateText("cState desc").setNextActivation(now()).setActions(asList(a)) .setCreated(now().minusMinutes(1)).setCreated(now().minusHours(2)).setModified(now().minusHours(1)).setRetries(42) .setStateVariables(stateVariables).setSignal(Optional.of(42)).setStartedIfNotSet(started).build(); @@ -109,7 +109,7 @@ public void convertWithActionStateVariablesWorks() throws IOException { .setExecutorId(999).setUpdatedStateVariables(stateVariables).build(); WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setStatus(inProgress).setType("dummy") - .setBusinessKey("businessKey").setParentWorkflowId(942).setParentActionId(842).setExternalId("externalId") + .setBusinessKey("businessKey").setParentWorkflowId(942L).setParentActionId(842L).setExternalId("externalId") .setState("cState").setStateText("cState desc").setNextActivation(now()).setActions(asList(a)) .setCreated(now().minusMinutes(1)).setCreated(now().minusHours(2)).setModified(now().minusHours(1)).setRetries(42) .setSignal(Optional.empty()).build(); diff --git a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java index 6c7662838..f8b4fa741 100644 --- a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java +++ b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java @@ -90,7 +90,7 @@ public Response corsPreflight() { public Response createWorkflowInstance( @Valid @ApiParam(value = "Submitted workflow instance information", required = true) CreateWorkflowInstanceRequest req) { WorkflowInstance instance = createWorkflowConverter.convert(req); - int id = workflowInstances.insertWorkflowInstance(instance); + long id = workflowInstances.insertWorkflowInstance(instance); instance = workflowInstances.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null); return Response.created(URI.create(String.valueOf(id))).entity(createWorkflowConverter.convert(instance)).build(); } @@ -101,7 +101,7 @@ public Response createWorkflowInstance( + "transition via nFlow Explorer or a business UI.") @ApiResponses({ @ApiResponse(code = 204, message = "If update was successful"), @ApiResponse(code = 409, message = "If workflow was executing and no update was done") }) - public Response updateWorkflowInstance(@ApiParam("Internal id for workflow instance") @PathParam("id") int id, + public Response updateWorkflowInstance(@ApiParam("Internal id for workflow instance") @PathParam("id") long id, @ApiParam("Submitted workflow instance information") UpdateWorkflowInstanceRequest req) { boolean updated = super.updateWorkflowInstance(id, req, workflowInstanceFactory, workflowInstances); return (updated ? noContent() : status(CONFLICT)).build(); @@ -111,7 +111,7 @@ public Response updateWorkflowInstance(@ApiParam("Internal id for workflow insta @Path("/id/{id}") @ApiOperation(value = "Fetch a workflow instance", notes = "Fetch full state and action history of a single workflow instance.") public ListWorkflowInstanceResponse fetchWorkflowInstance( - @ApiParam("Internal id for workflow instance") @PathParam("id") int id, + @ApiParam("Internal id for workflow instance") @PathParam("id") long id, @QueryParam("include") @ApiParam(value = INCLUDE_PARAM_DESC, allowableValues = INCLUDE_PARAM_VALUES, allowMultiple = true) String include, @QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { try { @@ -124,10 +124,10 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance( @GET @ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List") public Collection listWorkflowInstances( - @QueryParam("id") @ApiParam("Internal id of workflow instance") List ids, + @QueryParam("id") @ApiParam("Internal id of workflow instance") List ids, @QueryParam("type") @ApiParam("Workflow definition type of workflow instance") List types, - @QueryParam("parentWorkflowId") @ApiParam("Id of parent workflow instance") Integer parentWorkflowId, - @QueryParam("parentActionId") @ApiParam("Id of parent workflow instance action") Integer parentActionId, + @QueryParam("parentWorkflowId") @ApiParam("Id of parent workflow instance") Long parentWorkflowId, + @QueryParam("parentActionId") @ApiParam("Id of parent workflow instance action") Long parentActionId, @QueryParam("state") @ApiParam("Current state of workflow instance") List states, @QueryParam("status") @ApiParam("Current status of workflow instance") List statuses, @QueryParam("businessKey") @ApiParam("Business key for workflow instance") String businessKey, @@ -144,7 +144,7 @@ public Collection listWorkflowInstances( @Path("/{id}/signal") @ApiOperation(value = "Set workflow instance signal value", notes = "The service may be used for example to interrupt executing workflow instance.") @ApiResponses({ @ApiResponse(code = 200, message = "When operation was successful") }) - public Response setSignal(@ApiParam("Internal id for workflow instance") @PathParam("id") int id, + public Response setSignal(@ApiParam("Internal id for workflow instance") @PathParam("id") long id, @Valid @ApiParam("New signal value") SetSignalRequest req) { boolean updated = workflowInstances.setSignal(id, ofNullable(req.signal), req.reason, WorkflowActionType.externalChange); return (updated ? ok("Signal was set successfully") : ok("Signal was not set")).build(); @@ -154,7 +154,7 @@ public Response setSignal(@ApiParam("Internal id for workflow instance") @PathPa @Path("/{id}/wakeup") @ApiOperation(value = "Wake up sleeping workflow instance. If expected states are given, only wake up if the instance is in one of the expected states.") @ApiResponses({ @ApiResponse(code = 200, message = "When workflow wakeup was attempted")}) - public WakeupResponse wakeup(@ApiParam("Internal id for workflow instance") @PathParam("id") int id, + public WakeupResponse wakeup(@ApiParam("Internal id for workflow instance") @PathParam("id") long id, @Valid @ApiParam("Expected states") WakeupRequest req) { WakeupResponse response = new WakeupResponse(); List expectedStates = ofNullable(req.expectedStates).orElseGet(Collections::emptyList); diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index b1b6b67b6..5b0ea0285 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -89,7 +89,7 @@ public void createWorkflowInstanceWorks() { CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); WorkflowInstance inst = mock(WorkflowInstance.class); when(createWorkflowConverter.convert(req)).thenReturn(inst); - when(workflowInstances.insertWorkflowInstance(inst)).thenReturn(1); + when(workflowInstances.insertWorkflowInstance(inst)).thenReturn(1L); try (Response r = resource.createWorkflowInstance(req)) { assertThat(r.getStatus(), is(201)); assertThat(r.getHeaderString("Location"), is("1")); @@ -175,13 +175,13 @@ public void whenUpdatingStateVariablesUpdateWorkflowInstanceWorks() { @Test public void listWorkflowInstancesWorks() { - resource.listWorkflowInstances(asList(42), asList("type"), 99, 88, asList("state"), + resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"), asList(WorkflowInstanceStatus.created), "businessKey", "externalId", "", null, null); verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf( - hasField("ids", contains(42)), + hasField("ids", contains(42L)), hasField("types", contains("type")), - hasField("parentWorkflowId", is(99)), - hasField("parentActionId", is(88)), + hasField("parentWorkflowId", is(99L)), + hasField("parentActionId", is(88L)), hasField("states", contains("state")), hasField("statuses", contains(WorkflowInstanceStatus.created)), hasField("businessKey", equalTo("businessKey")), @@ -196,14 +196,14 @@ public void listWorkflowInstancesWorks() { @Test public void listWorkflowInstancesWorksWithAllIncludes() { - resource.listWorkflowInstances(asList(42), asList("type"), 99, 88, asList("state"), + resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"), asList(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing), "businessKey", "externalId", "actions,currentStateVariables,actionStateVariables,childWorkflows", 1L, 1L); verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf( - hasField("ids", contains(42)), + hasField("ids", contains(42L)), hasField("types", contains("type")), - hasField("parentWorkflowId", is(99)), - hasField("parentActionId", is(88)), + hasField("parentWorkflowId", is(99L)), + hasField("parentActionId", is(88L)), hasField("states", contains("state")), hasField("statuses", contains(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing)), hasField("businessKey", equalTo("businessKey")), diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java index 31f7519d2..f3e67a237 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java @@ -73,7 +73,7 @@ public WorkflowInstanceResource(WorkflowInstanceService workflowInstances, Creat public ResponseEntity createWorkflowInstance( @RequestBody @ApiParam(value = "Submitted workflow instance information", required = true) CreateWorkflowInstanceRequest req) { WorkflowInstance instance = createWorkflowConverter.convert(req); - int id = workflowInstances.insertWorkflowInstance(instance); + long id = workflowInstances.insertWorkflowInstance(instance); instance = workflowInstances.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES), null); return ResponseEntity.created(URI.create(String.valueOf(id))).body(createWorkflowConverter.convert(instance)); } @@ -83,7 +83,7 @@ public ResponseEntity createWorkflowInstance( + "transition via nFlow Explorer or a business UI.") @ApiResponses({ @ApiResponse(code = 204, message = "If update was successful"), @ApiResponse(code = 409, message = "If workflow was executing and no update was done") }) - public ResponseEntity updateWorkflowInstance(@ApiParam("Internal id for workflow instance") @PathVariable("id") int id, + public ResponseEntity updateWorkflowInstance(@ApiParam("Internal id for workflow instance") @PathVariable("id") long id, @RequestBody @ApiParam("Submitted workflow instance information") UpdateWorkflowInstanceRequest req) { boolean updated = super.updateWorkflowInstance(id, req, workflowInstanceFactory, workflowInstances); return (updated ? ResponseEntity.noContent() : ResponseEntity.status(HttpStatus.CONFLICT)).build(); @@ -92,7 +92,7 @@ public ResponseEntity updateWorkflowInstance(@ApiParam("Internal id for workf @GetMapping(path = "/id/{id}") @ApiOperation(value = "Fetch a workflow instance", notes = "Fetch full state and action history of a single workflow instance.") public ResponseEntity fetchWorkflowInstance( - @ApiParam("Internal id for workflow instance") @PathVariable("id") int id, + @ApiParam("Internal id for workflow instance") @PathVariable("id") long id, @RequestParam(value = "include", required = false) @ApiParam(value = INCLUDE_PARAM_DESC, allowableValues = INCLUDE_PARAM_VALUES, allowMultiple = true) String include, @RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { try { @@ -106,10 +106,10 @@ public ResponseEntity fetchWorkflowInstance( @GetMapping @ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List") public Collection listWorkflowInstances( - @RequestParam(value = "id", defaultValue = "") @ApiParam("Internal id of workflow instance") List ids, + @RequestParam(value = "id", defaultValue = "") @ApiParam("Internal id of workflow instance") List ids, @RequestParam(value = "type", defaultValue = "") @ApiParam("Workflow definition type of workflow instance") List types, - @RequestParam(value = "parentWorkflowId", required = false) @ApiParam("Id of parent workflow instance") Integer parentWorkflowId, - @RequestParam(value = "parentActionId", required = false) @ApiParam("Id of parent workflow instance action") Integer parentActionId, + @RequestParam(value = "parentWorkflowId", required = false) @ApiParam("Id of parent workflow instance") Long parentWorkflowId, + @RequestParam(value = "parentActionId", required = false) @ApiParam("Id of parent workflow instance action") Long parentActionId, @RequestParam(value = "state", defaultValue = "") @ApiParam("Current state of workflow instance") List states, @RequestParam(value = "status", defaultValue = "") @ApiParam("Current status of workflow instance") List statuses, @RequestParam(value = "businessKey", required = false) @ApiParam("Business key for workflow instance") String businessKey, @@ -125,7 +125,7 @@ public Collection listWorkflowInstances( @PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE) @ApiOperation(value = "Set workflow instance signal value", notes = "The service may be used for example to interrupt executing workflow instance.") @ApiResponses({ @ApiResponse(code = 200, message = "When operation was successful") }) - public ResponseEntity setSignal(@ApiParam("Internal id for workflow instance") @PathVariable("id") int id, + public ResponseEntity setSignal(@ApiParam("Internal id for workflow instance") @PathVariable("id") long id, @RequestBody @Valid @ApiParam("New signal value") SetSignalRequest req) { boolean updated = workflowInstances.setSignal(id, ofNullable(req.signal), req.reason, WorkflowActionType.externalChange); return (updated ? ResponseEntity.ok("Signal was set successfully") : ResponseEntity.ok("Signal was not set")); @@ -134,7 +134,7 @@ public ResponseEntity setSignal(@ApiParam("Internal id for workflow instance" @PutMapping(path = "/{id}/wakeup", consumes = APPLICATION_JSON_VALUE) @ApiOperation(value = "Wake up sleeping workflow instance. If expected states are given, only wake up if the instance is in one of the expected states.") @ApiResponses({ @ApiResponse(code = 200, message = "When workflow wakeup was attempted")}) - public WakeupResponse wakeup(@ApiParam("Internal id for workflow instance") @PathVariable("id") int id, + public WakeupResponse wakeup(@ApiParam("Internal id for workflow instance") @PathVariable("id") long id, @RequestBody @Valid @ApiParam("Expected states") WakeupRequest req) { WakeupResponse response = new WakeupResponse(); List expectedStates = ofNullable(req.expectedStates).orElseGet(Collections::emptyList); diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java b/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java index 5fd95db03..12b13aa2b 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java @@ -52,13 +52,13 @@ private static void insertDemoWorkflows() { WorkflowInstanceFactory workflowInstanceFactory = applicationContext.getBean(WorkflowInstanceFactory.class); WorkflowInstance instance = new WorkflowInstance.Builder().setType(DEMO_WORKFLOW_TYPE) .setState(DemoWorkflow.State.begin.name()).build(); - int id = workflowInstanceService.insertWorkflowInstance(instance); + long id = workflowInstanceService.insertWorkflowInstance(instance); instance = workflowInstanceService.getWorkflowInstance(id, emptySet(), null); WorkflowInstanceAction action = new WorkflowInstanceAction.Builder(instance).setType(externalChange).setExecutionEnd(now()) .build(); workflowInstanceService.updateWorkflowInstance(instance, action); instance = workflowInstanceService.getWorkflowInstance(id, EnumSet.of(WorkflowInstanceInclude.ACTIONS), 1L); - int actionId = instance.actions.get(0).id; + long actionId = instance.actions.get(0).id; WorkflowInstance child = new WorkflowInstance.Builder().setType(DEMO_WORKFLOW_TYPE).setState(DemoWorkflow.State.begin.name()) .setParentActionId(actionId).setParentWorkflowId(id).build(); workflowInstanceService.insertWorkflowInstance(child); diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/domain/CreateCreditApplicationRequest.java b/nflow-tests/src/main/java/io/nflow/tests/demo/domain/CreateCreditApplicationRequest.java index 492613a6c..d8d668bde 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/domain/CreateCreditApplicationRequest.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/domain/CreateCreditApplicationRequest.java @@ -6,6 +6,6 @@ public class CreateCreditApplicationRequest extends AbstractRequest { public int amount; public String clientId; public String productId; - public int processWorkflowId; + public long processWorkflowId; } diff --git a/nflow-tests/src/test/java/io/nflow/tests/AbstractNflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/AbstractNflowTest.java index baf826019..59e5d97be 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/AbstractNflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/AbstractNflowTest.java @@ -76,31 +76,31 @@ public void setStatisticsResource(@Named("statistics") WebClient client) { this.statisticsResource = fromClient(client, true).to(newUri, false); } - protected ListWorkflowInstanceResponse getWorkflowInstance(int instanceId) { + protected ListWorkflowInstanceResponse getWorkflowInstance(long instanceId) { return getInstanceIdResource(instanceId).query("include", "currentStateVariables,actions,actionStateVariables,childWorkflows") .get(ListWorkflowInstanceResponse.class); } - protected WakeupResponse wakeup(int instanceId, List expectedStates) { + protected WakeupResponse wakeup(long instanceId, List expectedStates) { WakeupRequest request = new WakeupRequest(); request.expectedStates = expectedStates; return getInstanceResource(instanceId).path("wakeup").put(request, WakeupResponse.class); } - protected String setSignal(int instanceId, int signal, String reason) { + protected String setSignal(long instanceId, int signal, String reason) { SetSignalRequest request = new SetSignalRequest(); request.signal = signal; request.reason = reason; return getInstanceResource(instanceId).path("signal").put(request, String.class); } - private WebClient getInstanceResource(int instanceId) { - WebClient client = fromClient(workflowInstanceResource, true).path(Integer.toString(instanceId)); + private WebClient getInstanceResource(long instanceId) { + WebClient client = fromClient(workflowInstanceResource, true).path(Long.toString(instanceId)); return client; } - private WebClient getInstanceIdResource(int instanceId) { - WebClient client = fromClient(workflowInstanceIdResource, true).path(Integer.toString(instanceId)); + private WebClient getInstanceIdResource(long instanceId) { + WebClient client = fromClient(workflowInstanceIdResource, true).path(Long.toString(instanceId)); return client; } @@ -119,7 +119,7 @@ public WorkflowDefinitionStatisticsResponse getDefinitionStatistics(String defin return client.get(WorkflowDefinitionStatisticsResponse.class); } - protected ListWorkflowInstanceResponse getWorkflowInstance(int id, String expectedState) throws InterruptedException { + protected ListWorkflowInstanceResponse getWorkflowInstance(long id, String expectedState) throws InterruptedException { ListWorkflowInstanceResponse wf = null; do { sleep(200); @@ -128,7 +128,7 @@ protected ListWorkflowInstanceResponse getWorkflowInstance(int id, String expect return wf; } - protected ListWorkflowInstanceResponse getWorkflowInstanceWithTimeout(int id, String expectedState, Duration timeout) { + protected ListWorkflowInstanceResponse getWorkflowInstanceWithTimeout(long id, String expectedState, Duration timeout) { return assertTimeoutPreemptively(timeout, () -> { ListWorkflowInstanceResponse resp; do { @@ -138,7 +138,7 @@ protected ListWorkflowInstanceResponse getWorkflowInstanceWithTimeout(int id, St }); } - protected void assertWorkflowInstance(int instanceId, WorkflowInstanceValidator... validators) { + protected void assertWorkflowInstance(long instanceId, WorkflowInstanceValidator... validators) { ListWorkflowInstanceResponse instance = getWorkflowInstance(instanceId); for (WorkflowInstanceValidator validator : validators) { validator.validate(instance); @@ -168,7 +168,7 @@ protected ObjectMapper nflowObjectMapper() { return mapper; } - protected String updateWorkflowInstance(int instanceId, UpdateWorkflowInstanceRequest request) { + protected String updateWorkflowInstance(long instanceId, UpdateWorkflowInstanceRequest request) { return getInstanceIdResource(instanceId).put(request, String.class); } diff --git a/nflow-tests/src/test/java/io/nflow/tests/ArchiveTest.java b/nflow-tests/src/test/java/io/nflow/tests/ArchiveTest.java index e9213a4b3..7c6e52596 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/ArchiveTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/ArchiveTest.java @@ -124,15 +124,15 @@ public void archiveAgainBeforeTime2DoesNotArchiveAnything() { assertEquals(0, archived); } - private List createWorkflows(int count) { - List ids = new ArrayList<>(); + private List createWorkflows(int count) { + List ids = new ArrayList<>(); for (int i = 0; i < count; i++) { ids.add(createWorkflow()); } return ids; } - private int createWorkflow() { + private long createWorkflow() { CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); req.type = FibonacciWorkflow.WORKFLOW_TYPE; req.stateVariables.put("requestData", nflowObjectMapper().valueToTree(new FibonacciWorkflow.FiboData(3))); @@ -142,9 +142,9 @@ private int createWorkflow() { return resp.id; } - private void waitUntilWorkflowsFinished(List workflowIds) { + private void waitUntilWorkflowsFinished(List workflowIds) { assertTimeoutPreemptively(ofSeconds(15), () -> { - for (int workflowId : workflowIds) { + for (long workflowId : workflowIds) { try { getWorkflowInstance(workflowId, "done"); } catch (@SuppressWarnings("unused") InterruptedException e) { diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index 7485620ab..4018a0ef3 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -43,7 +43,7 @@ public class BulkWorkflowTest extends AbstractNflowTest { public static NflowServerConfig server = new NflowServerConfig.Builder().prop("nflow.dispatcher.sleep.ms", 25) .springContextClass(Configuration.class).build(); - private static int workflowId; + private static long workflowId; private static final int CHILDREN_COUNT = 10; @@ -117,7 +117,7 @@ public void t13_waitForBulkToFinish() { private void waitForBulkToFinish() { ListWorkflowInstanceResponse instance = getWorkflowInstanceWithTimeout(workflowId, done.name(), ofSeconds(30)); assertThat(instance.childWorkflows.size(), equalTo(1)); - List childWorkflowIds = instance.childWorkflows.values().iterator().next(); + List childWorkflowIds = instance.childWorkflows.values().iterator().next(); assertThat(childWorkflowIds.size(), equalTo(CHILDREN_COUNT)); List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList()); DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get(); diff --git a/nflow-tests/src/test/java/io/nflow/tests/ChildWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/ChildWorkflowTest.java index 788eba7fa..9f631501e 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/ChildWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/ChildWorkflowTest.java @@ -25,7 +25,7 @@ public class ChildWorkflowTest extends AbstractNflowTest { public static NflowServerConfig server = new NflowServerConfig.Builder().build(); - private static int workflowId; + private static long workflowId; public ChildWorkflowTest() { super(server); diff --git a/nflow-tests/src/test/java/io/nflow/tests/DemoWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/DemoWorkflowTest.java index 072a2b53a..a3184108c 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/DemoWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/DemoWorkflowTest.java @@ -98,7 +98,7 @@ public void queryWorkflowWithActionsReturnsEmptyActions() { @Test @Order(5) public void queryWorkflowWithoutActionsReturnsNullActions() { - ListWorkflowInstanceResponse instance = fromClient(workflowInstanceIdResource, true).path(Integer.toString(resp.id)) + ListWorkflowInstanceResponse instance = fromClient(workflowInstanceIdResource, true).path(Long.toString(resp.id)) .get(ListWorkflowInstanceResponse.class); assertThat(instance.actions, is(nullValue())); diff --git a/nflow-tests/src/test/java/io/nflow/tests/WakeupTest.java b/nflow-tests/src/test/java/io/nflow/tests/WakeupTest.java index f84e5c15f..d6af30c27 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/WakeupTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/WakeupTest.java @@ -89,7 +89,7 @@ public void wakeupAgainInAnyState() throws InterruptedException { waitUntilActionCount(createdWorkflow.id, 4, 10 * 1000); } - private void waitUntilActionCount(int workflowId, int expectedActionCount, long maxWaitTime) throws InterruptedException { + private void waitUntilActionCount(long workflowId, int expectedActionCount, long maxWaitTime) throws InterruptedException { long start = currentTimeMillis(); while ((currentTimeMillis() - start) < maxWaitTime) { Thread.sleep(500); From 5256f72db2bb821e16c2af671e973dc1f81b37e2 Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Sat, 23 Nov 2019 20:33:11 +0200 Subject: [PATCH 2/2] Fix review comments --- .../io/nflow/engine/internal/dao/WorkflowInstanceDao.java | 8 +++++--- .../engine/internal/executor/WorkflowDispatcherTest.java | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index f7eeb0446..b399ddb20 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -1,6 +1,9 @@ package io.nflow.engine.internal.dao; -import static io.nflow.engine.internal.dao.DaoUtil.*; +import static io.nflow.engine.internal.dao.DaoUtil.firstColumnLengthExtractor; +import static io.nflow.engine.internal.dao.DaoUtil.getInt; +import static io.nflow.engine.internal.dao.DaoUtil.getLong; +import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; @@ -28,7 +31,6 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -83,7 +85,7 @@ public class WorkflowInstanceDao { private static final Logger logger = getLogger(WorkflowInstanceDao.class); - static final Map> EMPTY_ACTION_STATE_MAP = Collections.emptyMap(); + static final Map> EMPTY_ACTION_STATE_MAP = emptyMap(); final JdbcTemplate jdbc; private final NamedParameterJdbcTemplate namedJdbc; diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index 9eb85ad7a..f0bff98d1 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -2,6 +2,7 @@ import static edu.umd.cs.mtc.TestFramework.runOnce; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -17,7 +18,6 @@ import static org.slf4j.Logger.ROOT_LOGGER_NAME; import static org.slf4j.LoggerFactory.getLogger; -import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -303,7 +303,7 @@ public void dispatcherLogsWarningWhenAllThreadsArePotentiallyStuck() throws Thro class DispatcherLogsWarning extends MultithreadedTestCase { public void threadDispatcher() throws InterruptedException { when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())) - .thenAnswer(waitForTickAndAnswer(2, Collections.emptyList(), this)); + .thenAnswer(waitForTickAndAnswer(2, emptyList(), this)); when(executorFactory.getPotentiallyStuckProcessors()).thenReturn(executor.getThreadCount()); dispatcher.run(); }