diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index a04652fc5..d3d0d7a44 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -53,7 +53,6 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementCreator; import org.springframework.jdbc.core.ResultSetExtractor; -import org.springframework.jdbc.core.RowCallbackHandler; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -522,12 +521,9 @@ private void fillState(final WorkflowInstance instance) { jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join " + "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside " + "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key", - new RowCallbackHandler() { - @Override - public void processRow(ResultSet rs) throws SQLException { - instance.stateVariables.put(rs.getString(1), rs.getString(2)); - } - }, instance.id); + rs -> { + instance.stateVariables.put(rs.getString(1), rs.getString(2)); + }, instance.id); instance.originalStateVariables.putAll(instance.stateVariables); } @@ -623,6 +619,10 @@ public int compareTo(OptimisticLockKey other) { } public List queryWorkflowInstances(QueryWorkflowInstances query) { + return queryWorkflowInstancesAsStream(query).collect(toList()); + } + + public Stream queryWorkflowInstancesAsStream(QueryWorkflowInstances query) { String sql = "select * from nflow_workflow "; List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); @@ -664,35 +664,27 @@ public List queryWorkflowInstances(QueryWorkflowInstances quer params.addValue("executor_group", executorInfo.getExecutorGroup()); sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc"; sql = sqlVariants.limit(sql, getMaxResults(query.maxResults)); - List ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() - .map(WorkflowInstance.Builder::build).collect(toList()); + + Stream ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream() + .map(WorkflowInstance.Builder::build); if (query.includeCurrentStateVariables) { - for (WorkflowInstance instance : ret) { - fillState(instance); - } + ret = ret.peek(instance -> fillState(instance)); } if (query.includeActions) { - for (WorkflowInstance instance : ret) { - fillActions(instance, query.includeActionStateVariables, query.maxActions); - } + ret = ret.peek(instance -> fillActions(instance, query.includeActionStateVariables, query.maxActions)); } if (query.includeChildWorkflows) { - for (final WorkflowInstance instance : ret) { - fillChildWorkflowIds(instance); - } + ret = ret.peek(instance -> fillChildWorkflowIds(instance)); } return ret; } private void fillChildWorkflowIds(final WorkflowInstance instance) { - jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", new RowCallbackHandler() { - @Override - public void processRow(ResultSet rs) throws SQLException { - long parentActionId = rs.getLong(1); - long childWorkflowInstanceId = rs.getLong(2); - List children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>()); - children.add(childWorkflowInstanceId); - } + jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", rs -> { + long parentActionId = rs.getLong(1); + long childWorkflowInstanceId = rs.getLong(2); + List children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>()); + children.add(childWorkflowInstanceId); }, instance.id); } @@ -700,7 +692,7 @@ private long getMaxResults(Long maxResults) { if (maxResults == null) { return workflowInstanceQueryMaxResultsDefault; } - return min(maxResults.longValue(), workflowInstanceQueryMaxResults); + return min(maxResults, workflowInstanceQueryMaxResults); } private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) { @@ -716,7 +708,7 @@ private long getMaxActions(Long maxActions) { if (maxActions == null) { return workflowInstanceQueryMaxActionsDefault; } - return min(maxActions.longValue(), workflowInstanceQueryMaxActions); + return min(maxActions, workflowInstanceQueryMaxActions); } private Map> fetchActionStateVariables(WorkflowInstance instance) { diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java index 2b9c1caab..3906a26c6 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowInstanceService.java @@ -7,10 +7,12 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Stream; import javax.inject.Inject; import org.slf4j.Logger; +import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Assert; @@ -49,7 +51,8 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow * @param id Workflow instance id. * @param includes Set of properties to be loaded. * @param maxActions Maximum number of actions to be loaded. - * @return The workflow instance, or null if not found. + * @return The workflow instance. + * @throws EmptyResultDataAccessException If workflow instance is not found. */ public WorkflowInstance getWorkflowInstance(long id, Set includes, Long maxActions) { return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions); @@ -126,6 +129,15 @@ public Collection listWorkflowInstances(QueryWorkflowInstances return workflowInstanceDao.queryWorkflowInstances(query); } + /** + * Return workflow instances matching the given query. + * @param query The query parameters. + * @return Matching workflow instances as Stream. The stream does not need to be closed. + */ + public Stream listWorkflowInstancesAsStream(QueryWorkflowInstances query) { + return workflowInstanceDao.queryWorkflowInstancesAsStream(query); + } + /** * Return current signal value for given workflow instance. * @param workflowInstanceId Workflow instance id. diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index 60e147f40..871be823e 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -56,6 +56,7 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.core.env.Environment; +import org.springframework.dao.EmptyResultDataAccessException; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.RowCallbackHandler; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; @@ -106,6 +107,13 @@ public void roundTripTest() { checkSameWorkflowInfo(i1, i2); } + @Test + public void queryNonExistingWorkflowThrowsException() { + assertThrows(EmptyResultDataAccessException.class, () -> + dao.getWorkflowInstance(-42, emptySet(), null) + ); + } + @Test public void queryWorkflowInstanceWithAllConditions() { WorkflowInstance i1 = constructWorkflowInstanceBuilder().build(); diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java index 657e396cd..5051fd7e3 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/ResourceBase.java @@ -130,7 +130,7 @@ public boolean updateWorkflowInstance(final long id, return workflowInstances.updateWorkflowInstance(instance, action); } - public Collection listWorkflowInstances(final List ids, final List types, + public Stream listWorkflowInstances(final List ids, final List types, final Long parentWorkflowId, final Long parentActionId, final List states, final List statuses, final String businessKey, final String externalId, final String include, final Long maxResults, final Long maxActions, final WorkflowInstanceService workflowInstances, @@ -151,13 +151,9 @@ public Collection listWorkflowInstances(final List .setMaxResults(maxResults) // .setMaxActions(maxActions) // .setIncludeChildWorkflows(includeStrings.contains(childWorkflows)).build(); - Collection instances = workflowInstances.listWorkflowInstances(q); - List resp = new ArrayList<>(); + Stream instances = workflowInstances.listWorkflowInstancesAsStream(q); Set parseIncludeEnums = parseIncludeEnums(include); - for (WorkflowInstance instance : instances) { - resp.add(listWorkflowConverter.convert(instance, parseIncludeEnums)); - } - return resp; + return instances.map(instance -> listWorkflowConverter.convert(instance, parseIncludeEnums)); } private Set parseIncludeEnums(String include) { diff --git a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java index e9d10296d..f507a017b 100644 --- a/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java +++ b/nflow-rest-api-jax-rs/src/main/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResource.java @@ -12,9 +12,9 @@ import static javax.ws.rs.core.Response.Status.CONFLICT; import java.net.URI; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import javax.inject.Inject; @@ -129,7 +129,7 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance( @GET @ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List") - public Collection listWorkflowInstances( + public Iterator listWorkflowInstances( @QueryParam("id") @ApiParam("Internal id of workflow instance") List ids, @QueryParam("type") @ApiParam("Workflow definition type of workflow instance") List types, @QueryParam("parentWorkflowId") @ApiParam("Id of parent workflow instance") Long parentWorkflowId, @@ -143,7 +143,7 @@ public Collection listWorkflowInstances( @QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, include, maxResults, maxActions, - workflowInstances, listWorkflowConverter); + workflowInstances, listWorkflowConverter).iterator(); } @PUT diff --git a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java index 790a3d998..1ecd1eb24 100644 --- a/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java +++ b/nflow-rest-api-jax-rs/src/test/java/io/nflow/rest/v1/jaxrs/WorkflowInstanceResourceTest.java @@ -182,7 +182,7 @@ public void whenUpdatingStateVariablesUpdateWorkflowInstanceWorks() { public void listWorkflowInstancesWorks() { resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"), asList(WorkflowInstanceStatus.created), "businessKey", "externalId", "", null, null); - verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf( + verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf( hasField("ids", contains(42L)), hasField("types", contains("type")), hasField("parentWorkflowId", is(99L)), @@ -204,7 +204,7 @@ public void listWorkflowInstancesWorksWithAllIncludes() { resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"), asList(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing), "businessKey", "externalId", "actions,currentStateVariables,actionStateVariables,childWorkflows", 1L, 1L); - verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf( + verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf( hasField("ids", contains(42L)), hasField("types", contains("type")), hasField("parentWorkflowId", is(99L)), diff --git a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java index e086604c1..f9d2b06ec 100644 --- a/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java +++ b/nflow-rest-api-spring-web/src/main/java/io/nflow/rest/v1/springweb/WorkflowInstanceResource.java @@ -12,9 +12,9 @@ import static org.springframework.http.ResponseEntity.status; import java.net.URI; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; import javax.inject.Inject; @@ -115,7 +115,7 @@ public ResponseEntity fetchWorkflowInstance( @GetMapping @ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List") - public Collection listWorkflowInstances( + public Iterator listWorkflowInstances( @RequestParam(value = "id", defaultValue = "") @ApiParam("Internal id of workflow instance") List ids, @RequestParam(value = "type", defaultValue = "") @ApiParam("Workflow definition type of workflow instance") List types, @RequestParam(value = "parentWorkflowId", required = false) @ApiParam("Id of parent workflow instance") Long parentWorkflowId, @@ -128,7 +128,7 @@ public Collection listWorkflowInstances( @RequestParam(value = "maxResults", required = false) @ApiParam("Maximum number of workflow instances to be returned") Long maxResults, @RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) { return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId, - include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter); + include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter).iterator(); } @PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE)