diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index a04652fc5..bc186042a 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -4,6 +4,8 @@ import static io.nflow.engine.internal.dao.DaoUtil.getInt; import static io.nflow.engine.internal.dao.DaoUtil.getLong; import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp; +import static io.nflow.engine.internal.dao.TablePrefix.ARCHIVE; +import static io.nflow.engine.internal.dao.TablePrefix.MAIN; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; @@ -14,6 +16,7 @@ import static java.util.Collections.sort; import static java.util.Optional.ofNullable; import static java.util.stream.Collectors.toList; +import static java.util.stream.Stream.concat; import static java.util.stream.Stream.empty; import static org.apache.commons.lang3.StringUtils.abbreviate; import static org.apache.commons.lang3.StringUtils.join; @@ -53,7 +56,6 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCreator; import org.springframework.jdbc.core.ResultSetExtractor; -import org.springframework.jdbc.core.RowCallbackHandler; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -504,30 +506,31 @@ private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workfl } public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions) { - String sql = "select * from nflow_workflow where id = ?"; + return getWorkflowInstance(id, includes, maxActions, MAIN); + } + + public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions, TablePrefix tablePrefix) { + String sql = "select * from " + tablePrefix.nameOf("workflow") + " where id = ?"; WorkflowInstance instance = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id).build(); if (includes.contains(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES)) { - fillState(instance); + fillState(instance, tablePrefix); } if (includes.contains(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS)) { - fillChildWorkflowIds(instance); + fillChildWorkflowIds(instance, tablePrefix); } if (includes.contains(WorkflowInstanceInclude.ACTIONS)) { - fillActions(instance, includes.contains(WorkflowInstanceInclude.ACTION_STATE_VARIABLES), maxActions); + fillActions(instance, includes.contains(WorkflowInstanceInclude.ACTION_STATE_VARIABLES), maxActions, tablePrefix); } return instance; } - private void fillState(final WorkflowInstance instance) { - jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " - + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " + private void fillState(final WorkflowInstance instance, TablePrefix tablePrefix) { + jdbc.query("select outside.state_key, outside.state_value from " + tablePrefix.nameOf("workflow_state") + " outside inner join " + + "(select workflow_id, max(action_id) action_id, state_key from " + tablePrefix.nameOf("workflow_state") + " where workflow_id = ? group by workflow_id, state_key) inside " + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", - new RowCallbackHandler() { - @Override - public void processRow(ResultSet rs) throws SQLException { - instance.stateVariables.put(rs.getString(1), rs.getString(2)); - } - }, instance.id); + rs -> { + instance.stateVariables.put(rs.getString(1), rs.getString(2)); + }, instance.id); instance.originalStateVariables.putAll(instance.stateVariables); } @@ -623,7 +626,10 @@ public int compareTo(OptimisticLockKey other) { } public List queryWorkflowInstances(QueryWorkflowInstances query) { - String sql = "select * from nflow_workflow "; + return queryWorkflowInstancesAsStream(query).collect(toList()); + } + + public Stream queryWorkflowInstancesAsStream(QueryWorkflowInstances query) { List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); conditions.add(executorInfo.getExecutorGroupCondition()); @@ -662,37 +668,39 @@ public List queryWorkflowInstances(QueryWorkflowInstances quer } conditions.add("executor_group = :executor_group"); params.addValue("executor_group", executorInfo.getExecutorGroup()); - sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc"; - sql = sqlVariants.limit(sql, getMaxResults(query.maxResults)); - List ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() - .map(WorkflowInstance.Builder::build).collect(toList()); + String sqlQuery = " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc"; + + long maxResults = getMaxResults(query.maxResults); + String sql = sqlVariants.limit("select * from " + MAIN.nameOf("workflow") + sqlQuery, maxResults); + List results = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()); + maxResults -= results.size(); + Stream resultStream = results.stream(); + + if (query.queryArchive && maxResults > 0) { + sql = sqlVariants.limit("select * from " + ARCHIVE.nameOf("workflow") + sqlQuery, maxResults); + resultStream = concat(resultStream, namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() + .peek(builder -> builder.setArchived(true))); + } + + Stream ret = resultStream.map(WorkflowInstance.Builder::build); if (query.includeCurrentStateVariables) { - for (WorkflowInstance instance : ret) { - fillState(instance); - } + ret = ret.peek(instance -> fillState(instance, instance.isArchived ? ARCHIVE : MAIN)); } if (query.includeActions) { - for (WorkflowInstance instance : ret) { - fillActions(instance, query.includeActionStateVariables, query.maxActions); - } + ret = ret.peek(instance -> fillActions(instance, query.includeActionStateVariables, query.maxActions, instance.isArchived ? ARCHIVE : MAIN)); } if (query.includeChildWorkflows) { - for (final WorkflowInstance instance : ret) { - fillChildWorkflowIds(instance); - } + ret = ret.peek(instance -> fillChildWorkflowIds(instance, instance.isArchived ? ARCHIVE : MAIN)); } return ret; } - private void fillChildWorkflowIds(final WorkflowInstance instance) { - jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", new RowCallbackHandler() { - @Override - public void processRow(ResultSet rs) throws SQLException { - long parentActionId = rs.getLong(1); - long childWorkflowInstanceId = rs.getLong(2); - List children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>()); - children.add(childWorkflowInstanceId); - } + private void fillChildWorkflowIds(final WorkflowInstance instance, TablePrefix tablePrefix) { + jdbc.query("select parent_action_id, id from " + tablePrefix.nameOf("workflow") + " where parent_workflow_id = ?", rs -> { + long parentActionId = rs.getLong(1); + long childWorkflowInstanceId = rs.getLong(2); + List children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>()); + children.add(childWorkflowInstanceId); }, instance.id); } @@ -700,14 +708,14 @@ private long getMaxResults(Long maxResults) { if (maxResults == null) { return workflowInstanceQueryMaxResultsDefault; } - return min(maxResults.longValue(), workflowInstanceQueryMaxResults); + return min(maxResults, workflowInstanceQueryMaxResults); } - private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) { - Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance) + private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions, TablePrefix tablePrefix) { + Map> actionStates = includeStateVariables ? fetchActionStateVariables(instance, tablePrefix) : EMPTY_ACTION_STATE_MAP; String sql = sqlVariants.limit( - "select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc", + "select nflow_workflow_action.* from " + tablePrefix.nameOf("workflow_action") + " where workflow_id = ? order by id desc", getMaxActions(maxActions)); instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id)); } @@ -716,11 +724,11 @@ private long getMaxActions(Long maxActions) { if (maxActions == null) { return workflowInstanceQueryMaxActionsDefault; } - return min(maxActions.longValue(), workflowInstanceQueryMaxActions); + return min(maxActions, workflowInstanceQueryMaxActions); } - private Map> fetchActionStateVariables(WorkflowInstance instance) { - return jdbc.query("select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc", + private Map> fetchActionStateVariables(WorkflowInstance instance, TablePrefix tablePrefix) { + return jdbc.query("select * from " + tablePrefix.nameOf("workflow_state") + " where workflow_id = ? order by action_id, state_key asc", new WorkflowActionStateRowMapper(), instance.id); } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java index 628769eca..0c5a747f2 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/workflow/StateExecutionImpl.java @@ -1,6 +1,7 @@ package io.nflow.engine.internal.workflow; import static java.util.Collections.unmodifiableList; +import static java.util.stream.Collectors.toList; import static org.joda.time.DateTime.now; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.util.Assert.notNull; diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java index 2b9c1caab..709c131dd 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java @@ -1,5 +1,8 @@ package io.nflow.engine.service; +import static io.nflow.engine.internal.dao.TablePrefix.ARCHIVE; +import static io.nflow.engine.internal.dao.TablePrefix.MAIN; +import static java.util.stream.Collectors.toList; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.util.StringUtils.isEmpty; @@ -7,10 +10,13 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import javax.inject.Inject; +import io.nflow.engine.internal.dao.TablePrefix; import org.slf4j.Logger; +import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -52,7 +58,27 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow * @return The workflow instance, or null if not found. */ public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions) { - return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions); + return getWorkflowInstance(id, includes, maxActions, false); + } + + /** + * Return the workflow instance matching the given id. + * @param id Workflow instance id. + * @param includes Set of properties to be loaded. + * @param maxActions Maximum number of actions to be loaded. + * @param queryArchive Query archive if not found from main tables. + * @return The workflow instance + * @throws EmptyResultDataAccessException if not found + */ + public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions, boolean queryArchive) { + try { + return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions, MAIN); + } catch (EmptyResultDataAccessException ex) { + if (queryArchive) { + return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions, ARCHIVE); + } + throw ex; + } } /** @@ -126,6 +152,15 @@ public Collection listWorkflowInstances(QueryWorkflowInstances return workflowInstanceDao.queryWorkflowInstances(query); } + /** + * Return workflow instances matching the given query. + * @param query The query parameters. + * @return Matching workflow instances as Stream. The stream does not need to be closed. + */ + public Stream listWorkflowInstancesAsStream(QueryWorkflowInstances query) { + return workflowInstanceDao.queryWorkflowInstancesAsStream(query); + } + /** * Return current signal value for given workflow instance. * @param workflowInstanceId Workflow instance id. diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java index c298c81f7..874af6691 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java @@ -85,6 +85,11 @@ public class QueryWorkflowInstances extends ModelObject { */ public final Long maxActions; + /** + * When set also the workflow archive will be queried. + */ + public final boolean queryArchive; + QueryWorkflowInstances(Builder builder) { this.ids = new ArrayList<>(builder.ids); this.types = new ArrayList<>(builder.types); @@ -100,6 +105,7 @@ public class QueryWorkflowInstances extends ModelObject { this.includeChildWorkflows = builder.includeChildWorkflows; this.maxResults = builder.maxResults; this.maxActions = builder.maxActions; + this.queryArchive = builder.queryArchive; } /** @@ -120,6 +126,7 @@ public static class Builder { boolean includeChildWorkflows; Long maxResults; Long maxActions; + boolean queryArchive; /** * Create a workflow instance query builder. @@ -142,6 +149,7 @@ public Builder(QueryWorkflowInstances copy) { this.includeChildWorkflows = copy.includeChildWorkflows; this.maxResults = copy.maxResults; this.maxActions = copy.maxActions; + this.queryArchive = copy.queryArchive; } /** * Add identifiers to query parameters. @@ -287,6 +295,17 @@ public Builder setMaxActions(Long maxActions) { return this; } + /** + * If true the workflow instance archive is also searched. + * + * @param queryArchive True if archive should also be queried. + * @return this. + */ + public Builder setQueryArchive(boolean queryArchive) { + this.queryArchive = queryArchive; + return this; + } + /** * Create the workflow instance query object. * diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java index 7344c28b1..2e13c06a5 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/WorkflowInstance.java @@ -148,6 +148,11 @@ public enum WorkflowInstanceStatus { */ public final Optional signal; + /** + * True if this instance is archived. + */ + public final boolean isArchived; + /** * Child workflow instance IDs created by this workflow instance, grouped by instance action ID. */ @@ -178,6 +183,7 @@ public enum WorkflowInstanceStatus { this.started = builder.started; this.executorGroup = builder.executorGroup; this.signal = builder.signal; + this.isArchived = builder.isArchived; this.mapper = builder.mapper; } @@ -251,6 +257,7 @@ public static class Builder { DateTime started; DateTime modified; String executorGroup; + boolean isArchived; Optional signal = Optional.empty(); ObjectStringMapper mapper; @@ -295,6 +302,7 @@ public Builder(WorkflowInstance copy) { this.started = copy.started; this.executorGroup = copy.executorGroup; this.signal = copy.signal; + this.isArchived = copy.isArchived; this.mapper = copy.mapper; } @@ -561,6 +569,16 @@ public Builder setSignal(Optional signal) { return this; } + /** + * Set the workflow source. + * @param isArchived True if this workflow is archived. + * @return this. + */ + public Builder setArchived(boolean isArchived) { + this.isArchived = isArchived; + return this; + } + /** * Create the workflow instance object. * @return The workflow instance. diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/MaintenanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/MaintenanceDaoTest.java index 28f9ab60e..9fa5641b4 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/MaintenanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/MaintenanceDaoTest.java @@ -417,11 +417,11 @@ public void deleteExpiredWorkflowHistory() { maintenanceDao.deleteActionAndStateHistory(parentWorkflowId, now()); parentWorkflow = workflowInstanceDao.getWorkflowInstance(parentWorkflowId, EnumSet.allOf(WorkflowInstanceInclude.class), - null); + null, MAIN); assertThat(parentWorkflow.getStateVariable("requestData"), equalTo("{ \"parameter\": \"abc\" }")); assertThat(parentWorkflow.getStateVariable("variable"), equalTo("preservedValue")); assertThat(parentWorkflow.actions.size(), equalTo(2)); - childWorkflow = workflowInstanceDao.getWorkflowInstance(childWorkflowId, emptySet(), null); + childWorkflow = workflowInstanceDao.getWorkflowInstance(childWorkflowId, emptySet(), null, MAIN); assertThat(childWorkflow.parentWorkflowId, equalTo(parentWorkflowId)); } @@ -459,7 +459,7 @@ private long addWorkflowAction(long workflowId, final WorkflowInstance instance, private void assertActiveWorkflowsRemoved(List workflowIds) { for (long id : workflowIds) { - assertThrows(EmptyResultDataAccessException.class, () -> workflowInstanceDao.getWorkflowInstance(id, emptySet(), null)); + assertThrows(EmptyResultDataAccessException.class, () -> workflowInstanceDao.getWorkflowInstance(id, emptySet(), null, MAIN)); } } @@ -594,7 +594,7 @@ private long insert(WorkflowInstance instance) { assertTrue(id > 0); DateTime modified = instance.modified; updateModified(id, modified); - WorkflowInstance dbInstance = workflowInstanceDao.getWorkflowInstance(id, emptySet(), null); + WorkflowInstance dbInstance = workflowInstanceDao.getWorkflowInstance(id, emptySet(), null, MAIN); assertEquals(modified, dbInstance.modified); return id; } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 60e147f40..f69af1857 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -1,5 +1,6 @@ package io.nflow.engine.internal.dao; +import static io.nflow.engine.internal.dao.TablePrefix.MAIN; import static io.nflow.engine.service.WorkflowInstanceInclude.CHILD_WORKFLOW_IDS; import static io.nflow.engine.service.WorkflowInstanceInclude.CURRENT_STATE_VARIABLES; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; @@ -56,6 +57,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.core.env.Environment; +import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowCallbackHandler; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -99,13 +101,20 @@ public void roundTripTest() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); i1.stateVariables.put("a", "1"); long id = dao.insertWorkflowInstance(i1); - WorkflowInstance i2 = dao.getWorkflowInstance(id, EnumSet.allOf(WorkflowInstanceInclude.class), null); + WorkflowInstance i2 = dao.getWorkflowInstance(id, EnumSet.allOf(WorkflowInstanceInclude.class), null, MAIN); assertThat(i2.id, notNullValue()); assertThat(i2.created, notNullValue()); assertThat(i2.modified, notNullValue()); checkSameWorkflowInfo(i1, i2); } + @Test + public void queryNonExistingWorkflowThrowsException() { + assertThrows(EmptyResultDataAccessException.class, () -> + dao.getWorkflowInstance(-42, emptySet(), null, MAIN) + ); + } + @Test public void queryWorkflowInstanceWithAllConditions() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); @@ -137,7 +146,9 @@ public void queryWorkflowInstanceWithAllConditions() { .setIncludeCurrentStateVariables(true) // .setIncludeChildWorkflows(true) // .setMaxResults(1L) // - .setMaxActions(1L).build(); + .setMaxActions(1L) // + .setQueryArchive(true) // + .build(); List l = dao.queryWorkflowInstances(q); assertThat(l.size(), is(1)); checkSameWorkflowInfo(child, l.get(0)); @@ -166,9 +177,9 @@ public void updateWorkflowInstance() throws InterruptedException { assertThat(ids, contains(id)); DateTime started = now(); final WorkflowInstance i2 = new WorkflowInstance.Builder( - dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null)).setStatus(inProgress).setState("updateState") + dao.getWorkflowInstance(id, EnumSet.of(CURRENT_STATE_VARIABLES), null, MAIN)).setStatus(inProgress).setState("updateState") .setStateText("update text").setStartedIfNotSet(started).build(); - final WorkflowInstance polledInstance = dao.getWorkflowInstance(id, emptySet(), null); + final WorkflowInstance polledInstance = dao.getWorkflowInstance(id, emptySet(), null, MAIN); assertThat(polledInstance.status, equalTo(executing)); final DateTime originalModifiedTime = polledInstance.modified; sleep(1); diff --git a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java index 998338e41..4b0575aa7 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/service/WorkflowInstanceServiceTest.java @@ -1,5 +1,6 @@ package io.nflow.engine.service; +import static io.nflow.engine.internal.dao.TablePrefix.MAIN; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.externalChange; @@ -27,6 +28,7 @@ import java.util.Optional; import java.util.Set; +import io.nflow.engine.internal.dao.TablePrefix; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,7 +82,7 @@ public void getWorkflowInstance() { WorkflowInstance instance = Mockito.mock(WorkflowInstance.class); @SuppressWarnings("unchecked") Set includes = Mockito.mock(Set.class); - when(workflowInstanceDao.getWorkflowInstance(42, includes, 10L)).thenReturn(instance); + when(workflowInstanceDao.getWorkflowInstance(42, includes, 10L, MAIN)).thenReturn(instance); assertEquals(instance, service.getWorkflowInstance(42, includes, 10L)); } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/config/RestConfiguration.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/config/RestConfiguration.java index cf683fab7..067804023 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/config/RestConfiguration.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/config/RestConfiguration.java @@ -27,6 +27,7 @@ public class RestConfiguration { @Named(REST_OBJECT_MAPPER) public ObjectMapper nflowRestObjectMapper(@NFlow ObjectMapper nflowObjectMapper) { ObjectMapper restObjectMapper = nflowObjectMapper.copy(); + //restObjectMapper.registerModule(new StreamModule()); restObjectMapper.configure(WRITE_DATES_AS_TIMESTAMPS, false); return restObjectMapper; } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 657e396cd..03acfec8d 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -130,10 +130,10 @@ public boolean updateWorkflowInstance(final long id, return workflowInstances.updateWorkflowInstance(instance, action); } - public Collection listWorkflowInstances(final List ids, final List types, + public Stream listWorkflowInstances(final List ids, final List types, final Long parentWorkflowId, final Long parentActionId, final List states, final List statuses, final String businessKey, final String externalId, final String include, - final Long maxResults, final Long maxActions, final WorkflowInstanceService workflowInstances, + final Long maxResults, final Long maxActions, boolean queryArchive, final WorkflowInstanceService workflowInstances, final ListWorkflowInstanceConverter listWorkflowConverter) { Set includeStrings = parseIncludeStrings(include).collect(toSet()); QueryWorkflowInstances q = new QueryWorkflowInstances.Builder() // @@ -150,14 +150,11 @@ public Collection listWorkflowInstances(final List .setIncludeActionStateVariables(includeStrings.contains(actionStateVariables)) // .setMaxResults(maxResults) // .setMaxActions(maxActions) // + .setQueryArchive(queryArchive) // .setIncludeChildWorkflows(includeStrings.contains(childWorkflows)).build(); - Collection instances = workflowInstances.listWorkflowInstances(q); - List resp = new ArrayList<>(); + Stream instances = workflowInstances.listWorkflowInstancesAsStream(q); Set parseIncludeEnums = parseIncludeEnums(include); - for (WorkflowInstance instance : instances) { - resp.add(listWorkflowConverter.convert(instance, parseIncludeEnums)); - } - return resp; + return instances.map(instance -> listWorkflowConverter.convert(instance, parseIncludeEnums)); } private Set parseIncludeEnums(String include) { @@ -169,11 +166,11 @@ private Stream parseIncludeStrings(String include) { return Stream.of(trimToEmpty(include).split(",")); } - public ListWorkflowInstanceResponse fetchWorkflowInstance(final long id, final String include, final Long maxActions, + public ListWorkflowInstanceResponse fetchWorkflowInstance(final long id, final String include, final Long maxActions, boolean queryArchive, final WorkflowInstanceService workflowInstances, final ListWorkflowInstanceConverter listWorkflowConverter) throws EmptyResultDataAccessException { Set includes = parseIncludeEnums(include); - WorkflowInstance instance = workflowInstances.getWorkflowInstance(id, includes, maxActions); + WorkflowInstance instance = workflowInstances.getWorkflowInstance(id, includes, maxActions, queryArchive); return listWorkflowConverter.convert(instance, includes); } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java index e642c226c..c7550e382 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowInstanceConverter.java @@ -11,6 +11,7 @@ import javax.inject.Inject; +import io.nflow.engine.config.NFlow; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --git a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java index e9d10296d..2efaa98b9 100644 --- a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java +++ b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java @@ -15,7 +15,9 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; +import java.util.stream.Stream; import javax.inject.Inject; import javax.validation.Valid; @@ -119,9 +121,11 @@ public Response updateWorkflowInstance(@ApiParam("Internal id for workflow insta public ListWorkflowInstanceResponse fetchWorkflowInstance( @ApiParam("Internal id for workflow instance") @PathParam("id") long id, @QueryParam("include") @ApiParam(value = INCLUDE_PARAM_DESC, allowableValues = INCLUDE_PARAM_VALUES, allowMultiple = true) String include, - @QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { + @QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions, + @QueryParam("queryArchive") @ApiParam("Query also the archive") Boolean queryArchive + ) { try { - return super.fetchWorkflowInstance(id, include, maxActions, workflowInstances, listWorkflowConverter); + return super.fetchWorkflowInstance(id, include, maxActions, ofNullable(queryArchive).orElse(true), workflowInstances, listWorkflowConverter); } catch (@SuppressWarnings("unused") EmptyResultDataAccessException e) { throw new NotFoundException(format("Workflow instance %s not found", id)); } @@ -129,7 +133,7 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance( @GET @ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List") - public Collection listWorkflowInstances( + public Iterator listWorkflowInstances( @QueryParam("id") @ApiParam("Internal id of workflow instance") List ids, @QueryParam("type") @ApiParam("Workflow definition type of workflow instance") List types, @QueryParam("parentWorkflowId") @ApiParam("Id of parent workflow instance") Long parentWorkflowId, @@ -140,10 +144,12 @@ public Collection listWorkflowInstances( @QueryParam("externalId") @ApiParam("External id for workflow instance") String externalId, @QueryParam("include") @ApiParam(value = INCLUDE_PARAM_DESC, allowableValues = INCLUDE_PARAM_VALUES, allowMultiple = true) String include, @QueryParam("maxResults") @ApiParam("Maximum number of workflow instances to be returned") Long maxResults, - @QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { + @QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions, + @QueryParam("queryArchive") @ApiParam("Query also the archive") Boolean queryArchive + ) { return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, - statuses, businessKey, externalId, include, maxResults, maxActions, - workflowInstances, listWorkflowConverter); + statuses, businessKey, externalId, include, maxResults, maxActions, ofNullable(queryArchive).orElse(true), + workflowInstances, listWorkflowConverter).iterator(); } @PUT diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index 790a3d998..6de17f8d2 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -1,6 +1,7 @@ package io.nflow.rest.v1.jaxrs; import static com.nitorcreations.Matchers.hasField; +import static io.nflow.engine.internal.dao.TablePrefix.MAIN; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.externalChange; import static java.util.Arrays.asList; import static java.util.Collections.emptySet; @@ -181,8 +182,8 @@ public void whenUpdatingStateVariablesUpdateWorkflowInstanceWorks() { @Test public void listWorkflowInstancesWorks() { resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"), - asList(WorkflowInstanceStatus.created), "businessKey", "externalId", "", null, null); - verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf( + asList(WorkflowInstanceStatus.created), "businessKey", "externalId", "", null, null, true); + verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf( hasField("ids", contains(42L)), hasField("types", contains("type")), hasField("parentWorkflowId", is(99L)), @@ -196,15 +197,17 @@ public void listWorkflowInstancesWorks() { hasField("includeActionStateVariables", equalTo(false)), hasField("includeChildWorkflows", equalTo(false)), hasField("maxResults", equalTo(null)), - hasField("maxActions", equalTo(null))))); + hasField("maxActions", equalTo(null)), + hasField("queryArchive", equalTo(true)) + ))); } @Test public void listWorkflowInstancesWorksWithAllIncludes() { resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"), asList(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing), - "businessKey", "externalId", "actions,currentStateVariables,actionStateVariables,childWorkflows", 1L, 1L); - verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf( + "businessKey", "externalId", "actions,currentStateVariables,actionStateVariables,childWorkflows", 1L, 1L, false); + verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf( hasField("ids", contains(42L)), hasField("types", contains("type")), hasField("parentWorkflowId", is(99L)), @@ -218,25 +221,27 @@ public void listWorkflowInstancesWorksWithAllIncludes() { hasField("includeActionStateVariables", equalTo(true)), hasField("includeChildWorkflows", equalTo(true)), hasField("maxResults", equalTo(1L)), - hasField("maxActions", equalTo(1L))))); + hasField("maxActions", equalTo(1L)), + hasField("queryArchive", equalTo(false)) + ))); } @Test public void fetchingNonExistingWorkflowThrowsNotFoundException() { - when(workflowInstances.getWorkflowInstance(42, emptySet(), null)) + when(workflowInstances.getWorkflowInstance(42, emptySet(), null, true)) .thenThrow(EmptyResultDataAccessException.class); - assertThrows(NotFoundException.class, () -> resource.fetchWorkflowInstance(42, null, null)); + assertThrows(NotFoundException.class, () -> resource.fetchWorkflowInstance(42, null, null, true)); } @SuppressWarnings("unchecked") @Test public void fetchingExistingWorkflowWorks() { WorkflowInstance instance = mock(WorkflowInstance.class); - when(workflowInstances.getWorkflowInstance(42, emptySet(), null)).thenReturn(instance); + when(workflowInstances.getWorkflowInstance(42, emptySet(), null, false)).thenReturn(instance); ListWorkflowInstanceResponse resp = mock(ListWorkflowInstanceResponse.class); when(listWorkflowConverter.convert(eq(instance), any(Set.class))).thenReturn(resp); - ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, null, null); - verify(workflowInstances).getWorkflowInstance(42, emptySet(), null); + ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, null, null, false); + verify(workflowInstances).getWorkflowInstance(42, emptySet(), null, false); assertEquals(resp, result); } @@ -245,12 +250,12 @@ public void fetchingExistingWorkflowWorks() { public void fetchingExistingWorkflowWorksWithAllIncludes() { WorkflowInstance instance = mock(WorkflowInstance.class); EnumSet includes = EnumSet.allOf(WorkflowInstanceInclude.class); - when(workflowInstances.getWorkflowInstance(42, includes, 10L)).thenReturn(instance); + when(workflowInstances.getWorkflowInstance(42, includes, 10L, false)).thenReturn(instance); ListWorkflowInstanceResponse resp = mock(ListWorkflowInstanceResponse.class); when(listWorkflowConverter.convert(eq(instance), any(Set.class))).thenReturn(resp); ListWorkflowInstanceResponse result = resource.fetchWorkflowInstance(42, - "actions,currentStateVariables,actionStateVariables,childWorkflows", 10L); - verify(workflowInstances).getWorkflowInstance(42, includes, 10L); + "actions,currentStateVariables,actionStateVariables,childWorkflows", 10L, false); + verify(workflowInstances).getWorkflowInstance(42, includes, 10L, false); assertEquals(resp, result); } diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java index e086604c1..07e3940ef 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java @@ -15,7 +15,9 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; +import java.util.stream.Stream; import javax.inject.Inject; import javax.validation.Valid; @@ -105,9 +107,11 @@ public ResponseEntity updateWorkflowInstance(@ApiParam("Internal id for workf public ResponseEntity fetchWorkflowInstance( @ApiParam("Internal id for workflow instance") @PathVariable("id") long id, @RequestParam(value = "include", required = false) @ApiParam(value = INCLUDE_PARAM_DESC, allowableValues = INCLUDE_PARAM_VALUES, allowMultiple = true) String include, - @RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { + @RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions, + @RequestParam(value = "queryArchive", required = false, defaultValue = "true") @ApiParam("Query also the archive") boolean queryArchive + ) { try { - return ok().body(super.fetchWorkflowInstance(id, include, maxActions, this.workflowInstances, this.listWorkflowConverter)); + return ok().body(super.fetchWorkflowInstance(id, include, maxActions, queryArchive, this.workflowInstances, this.listWorkflowConverter)); } catch (@SuppressWarnings("unused") EmptyResultDataAccessException e) { return notFound().build(); } @@ -115,7 +119,7 @@ public ResponseEntity fetchWorkflowInstance( @GetMapping @ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List") - public Collection listWorkflowInstances( + public Iterator listWorkflowInstances( @RequestParam(value = "id", defaultValue = "") @ApiParam("Internal id of workflow instance") List ids, @RequestParam(value = "type", defaultValue = "") @ApiParam("Workflow definition type of workflow instance") List types, @RequestParam(value = "parentWorkflowId", required = false) @ApiParam("Id of parent workflow instance") Long parentWorkflowId, @@ -126,9 +130,11 @@ public Collection listWorkflowInstances( @RequestParam(value = "externalId", required = false) @ApiParam("External id for workflow instance") String externalId, @RequestParam(value = "include", required = false) @ApiParam(value = INCLUDE_PARAM_DESC, allowableValues = INCLUDE_PARAM_VALUES, allowMultiple = true) String include, @RequestParam(value = "maxResults", required = false) @ApiParam("Maximum number of workflow instances to be returned") Long maxResults, - @RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { + @RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions, + @RequestParam(value = "queryArchive", required = false, defaultValue = "true") @ApiParam("Query also the archive") boolean queryArchive + ) { return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, - include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter); + include, maxResults, maxActions, queryArchive, this.workflowInstances, this.listWorkflowConverter).iterator(); } @PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE)