Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream results out of rest api. #382

Merged
merged 2 commits into from
Feb 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand Down Expand Up @@ -522,12 +521,9 @@ private void fillState(final WorkflowInstance instance) {
jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside "
+ "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key",
new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}
}, instance.id);
rs -> {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}, instance.id);
instance.originalStateVariables.putAll(instance.stateVariables);
}

Expand Down Expand Up @@ -623,6 +619,10 @@ public int compareTo(OptimisticLockKey other) {
}

public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances query) {
return queryWorkflowInstancesAsStream(query).collect(toList());
}

public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInstances query) {
String sql = "select * from nflow_workflow ";
List<String> conditions = new ArrayList<>();
MapSqlParameterSource params = new MapSqlParameterSource();
Expand Down Expand Up @@ -664,43 +664,35 @@ public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances quer
params.addValue("executor_group", executorInfo.getExecutorGroup());
sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";
sql = sqlVariants.limit(sql, getMaxResults(query.maxResults));
List<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build).collect(toList());

Stream<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build);
if (query.includeCurrentStateVariables) {
for (WorkflowInstance instance : ret) {
fillState(instance);
}
ret = ret.peek(instance -> fillState(instance));
}
if (query.includeActions) {
for (WorkflowInstance instance : ret) {
fillActions(instance, query.includeActionStateVariables, query.maxActions);
}
ret = ret.peek(instance -> fillActions(instance, query.includeActionStateVariables, query.maxActions));
}
if (query.includeChildWorkflows) {
for (final WorkflowInstance instance : ret) {
fillChildWorkflowIds(instance);
}
ret = ret.peek(instance -> fillChildWorkflowIds(instance));
}
return ret;
}

private void fillChildWorkflowIds(final WorkflowInstance instance) {
jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}
jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", rs -> {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}, instance.id);
}

private long getMaxResults(Long maxResults) {
if (maxResults == null) {
return workflowInstanceQueryMaxResultsDefault;
}
return min(maxResults.longValue(), workflowInstanceQueryMaxResults);
return min(maxResults, workflowInstanceQueryMaxResults);
}

private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) {
Expand All @@ -716,7 +708,7 @@ private long getMaxActions(Long maxActions) {
if (maxActions == null) {
return workflowInstanceQueryMaxActionsDefault;
}
return min(maxActions.longValue(), workflowInstanceQueryMaxActions);
return min(maxActions, workflowInstanceQueryMaxActions);
}

private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -49,7 +51,8 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow
* @param id Workflow instance id.
* @param includes Set of properties to be loaded.
* @param maxActions Maximum number of actions to be loaded.
* @return The workflow instance, or null if not found.
* @return The workflow instance.
* @throws EmptyResultDataAccessException If workflow instance is not found.
*/
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions);
Expand Down Expand Up @@ -126,6 +129,15 @@ public Collection<WorkflowInstance> listWorkflowInstances(QueryWorkflowInstances
return workflowInstanceDao.queryWorkflowInstances(query);
}

/**
* Return workflow instances matching the given query.
* @param query The query parameters.
* @return Matching workflow instances as Stream. The stream does not need to be closed.
*/
public Stream<WorkflowInstance> listWorkflowInstancesAsStream(QueryWorkflowInstances query) {
return workflowInstanceDao.queryWorkflowInstancesAsStream(query);
}

/**
* Return current signal value for given workflow instance.
* @param workflowInstanceId Workflow instance id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.core.env.Environment;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand Down Expand Up @@ -106,6 +107,13 @@ public void roundTripTest() {
checkSameWorkflowInfo(i1, i2);
}

@Test
public void queryNonExistingWorkflowThrowsException() {
assertThrows(EmptyResultDataAccessException.class, () ->
dao.getWorkflowInstance(-42, emptySet(), null)
);
}

@Test
public void queryWorkflowInstanceWithAllConditions() {
WorkflowInstance i1 = constructWorkflowInstanceBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean updateWorkflowInstance(final long id,
return workflowInstances.updateWorkflowInstance(instance, action);
}

public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(final List<Long> ids, final List<String> types,
public Stream<ListWorkflowInstanceResponse> listWorkflowInstances(final List<Long> ids, final List<String> types,
final Long parentWorkflowId, final Long parentActionId, final List<String> states,
final List<WorkflowInstanceStatus> statuses, final String businessKey, final String externalId, final String include,
final Long maxResults, final Long maxActions, final WorkflowInstanceService workflowInstances,
Expand All @@ -151,13 +151,9 @@ public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(final List
.setMaxResults(maxResults) //
.setMaxActions(maxActions) //
.setIncludeChildWorkflows(includeStrings.contains(childWorkflows)).build();
Collection<WorkflowInstance> instances = workflowInstances.listWorkflowInstances(q);
List<ListWorkflowInstanceResponse> resp = new ArrayList<>();
Stream<WorkflowInstance> instances = workflowInstances.listWorkflowInstancesAsStream(q);
Set<WorkflowInstanceInclude> parseIncludeEnums = parseIncludeEnums(include);
for (WorkflowInstance instance : instances) {
resp.add(listWorkflowConverter.convert(instance, parseIncludeEnums));
}
return resp;
return instances.map(instance -> listWorkflowConverter.convert(instance, parseIncludeEnums));
}

private Set<WorkflowInstanceInclude> parseIncludeEnums(String include) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import static javax.ws.rs.core.Response.Status.CONFLICT;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;

import javax.inject.Inject;
Expand Down Expand Up @@ -129,7 +129,7 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance(

@GET
@ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List")
public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
public Iterator<ListWorkflowInstanceResponse> listWorkflowInstances(
@QueryParam("id") @ApiParam("Internal id of workflow instance") List<Long> ids,
@QueryParam("type") @ApiParam("Workflow definition type of workflow instance") List<String> types,
@QueryParam("parentWorkflowId") @ApiParam("Id of parent workflow instance") Long parentWorkflowId,
Expand All @@ -143,7 +143,7 @@ public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
@QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) {
return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states,
statuses, businessKey, externalId, include, maxResults, maxActions,
workflowInstances, listWorkflowConverter);
workflowInstances, listWorkflowConverter).iterator();
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void whenUpdatingStateVariablesUpdateWorkflowInstanceWorks() {
public void listWorkflowInstancesWorks() {
resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"),
asList(WorkflowInstanceStatus.created), "businessKey", "externalId", "", null, null);
verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf(
verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf(
hasField("ids", contains(42L)),
hasField("types", contains("type")),
hasField("parentWorkflowId", is(99L)),
Expand All @@ -204,7 +204,7 @@ public void listWorkflowInstancesWorksWithAllIncludes() {
resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"),
asList(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing),
"businessKey", "externalId", "actions,currentStateVariables,actionStateVariables,childWorkflows", 1L, 1L);
verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf(
verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf(
hasField("ids", contains(42L)),
hasField("types", contains("type")),
hasField("parentWorkflowId", is(99L)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import static org.springframework.http.ResponseEntity.status;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;

import javax.inject.Inject;
Expand Down Expand Up @@ -115,7 +115,7 @@ public ResponseEntity<ListWorkflowInstanceResponse> fetchWorkflowInstance(

@GetMapping
@ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List")
public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
public Iterator<ListWorkflowInstanceResponse> listWorkflowInstances(
@RequestParam(value = "id", defaultValue = "") @ApiParam("Internal id of workflow instance") List<Long> ids,
@RequestParam(value = "type", defaultValue = "") @ApiParam("Workflow definition type of workflow instance") List<String> types,
@RequestParam(value = "parentWorkflowId", required = false) @ApiParam("Id of parent workflow instance") Long parentWorkflowId,
Expand All @@ -128,7 +128,7 @@ public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
@RequestParam(value = "maxResults", required = false) @ApiParam("Maximum number of workflow instances to be returned") Long maxResults,
@RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) {
return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId,
include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter);
include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter).iterator();
}

@PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE)
Expand Down