Skip to content

Commit

Permalink
Merge 6083e1d into 9713b0a
Browse files Browse the repository at this point in the history
  • Loading branch information
gmokki committed Feb 20, 2020
2 parents 9713b0a + 6083e1d commit b63996b
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand Down Expand Up @@ -522,12 +521,9 @@ private void fillState(final WorkflowInstance instance) {
jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside "
+ "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key",
new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}
}, instance.id);
rs -> {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}, instance.id);
instance.originalStateVariables.putAll(instance.stateVariables);
}

Expand Down Expand Up @@ -623,6 +619,10 @@ public int compareTo(OptimisticLockKey other) {
}

public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances query) {
return queryWorkflowInstancesAsStream(query).collect(toList());
}

public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInstances query) {
String sql = "select * from nflow_workflow ";
List<String> conditions = new ArrayList<>();
MapSqlParameterSource params = new MapSqlParameterSource();
Expand Down Expand Up @@ -664,43 +664,35 @@ public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances quer
params.addValue("executor_group", executorInfo.getExecutorGroup());
sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";
sql = sqlVariants.limit(sql, getMaxResults(query.maxResults));
List<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build).collect(toList());

Stream<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build);
if (query.includeCurrentStateVariables) {
for (WorkflowInstance instance : ret) {
fillState(instance);
}
ret = ret.peek(instance -> fillState(instance));
}
if (query.includeActions) {
for (WorkflowInstance instance : ret) {
fillActions(instance, query.includeActionStateVariables, query.maxActions);
}
ret = ret.peek(instance -> fillActions(instance, query.includeActionStateVariables, query.maxActions));
}
if (query.includeChildWorkflows) {
for (final WorkflowInstance instance : ret) {
fillChildWorkflowIds(instance);
}
ret = ret.peek(instance -> fillChildWorkflowIds(instance));
}
return ret;
}

private void fillChildWorkflowIds(final WorkflowInstance instance) {
jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}
jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", rs -> {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}, instance.id);
}

private long getMaxResults(Long maxResults) {
if (maxResults == null) {
return workflowInstanceQueryMaxResultsDefault;
}
return min(maxResults.longValue(), workflowInstanceQueryMaxResults);
return min(maxResults, workflowInstanceQueryMaxResults);
}

private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) {
Expand All @@ -716,7 +708,7 @@ private long getMaxActions(Long maxActions) {
if (maxActions == null) {
return workflowInstanceQueryMaxActionsDefault;
}
return min(maxActions.longValue(), workflowInstanceQueryMaxActions);
return min(maxActions, workflowInstanceQueryMaxActions);
}

private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import javax.inject.Inject;

import org.slf4j.Logger;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -49,7 +51,8 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow
* @param id Workflow instance id.
* @param includes Set of properties to be loaded.
* @param maxActions Maximum number of actions to be loaded.
* @return The workflow instance, or null if not found.
* @return The workflow instance.
* @throws EmptyResultDataAccessException If workflow instance is not found.
*/
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions);
Expand Down Expand Up @@ -126,6 +129,15 @@ public Collection<WorkflowInstance> listWorkflowInstances(QueryWorkflowInstances
return workflowInstanceDao.queryWorkflowInstances(query);
}

/**
* Return workflow instances matching the given query.
* @param query The query parameters.
* @return Matching workflow instances as Stream. The stream does not need to be closed.
*/
public Stream<WorkflowInstance> listWorkflowInstancesAsStream(QueryWorkflowInstances query) {
return workflowInstanceDao.queryWorkflowInstancesAsStream(query);
}

/**
* Return current signal value for given workflow instance.
* @param workflowInstanceId Workflow instance id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.core.env.Environment;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand Down Expand Up @@ -106,6 +107,13 @@ public void roundTripTest() {
checkSameWorkflowInfo(i1, i2);
}

@Test
public void queryNonExistingWorkflowThrowsException() {
assertThrows(EmptyResultDataAccessException.class, () ->
dao.getWorkflowInstance(-42, emptySet(), null)
);
}

@Test
public void queryWorkflowInstanceWithAllConditions() {
WorkflowInstance i1 = constructWorkflowInstanceBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public boolean updateWorkflowInstance(final long id,
return workflowInstances.updateWorkflowInstance(instance, action);
}

public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(final List<Long> ids, final List<String> types,
public Stream<ListWorkflowInstanceResponse> listWorkflowInstances(final List<Long> ids, final List<String> types,
final Long parentWorkflowId, final Long parentActionId, final List<String> states,
final List<WorkflowInstanceStatus> statuses, final String businessKey, final String externalId, final String include,
final Long maxResults, final Long maxActions, final WorkflowInstanceService workflowInstances,
Expand All @@ -151,13 +151,9 @@ public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(final List
.setMaxResults(maxResults) //
.setMaxActions(maxActions) //
.setIncludeChildWorkflows(includeStrings.contains(childWorkflows)).build();
Collection<WorkflowInstance> instances = workflowInstances.listWorkflowInstances(q);
List<ListWorkflowInstanceResponse> resp = new ArrayList<>();
Stream<WorkflowInstance> instances = workflowInstances.listWorkflowInstancesAsStream(q);
Set<WorkflowInstanceInclude> parseIncludeEnums = parseIncludeEnums(include);
for (WorkflowInstance instance : instances) {
resp.add(listWorkflowConverter.convert(instance, parseIncludeEnums));
}
return resp;
return instances.map(instance -> listWorkflowConverter.convert(instance, parseIncludeEnums));
}

private Set<WorkflowInstanceInclude> parseIncludeEnums(String include) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import static javax.ws.rs.core.Response.Status.CONFLICT;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;

import javax.inject.Inject;
Expand Down Expand Up @@ -129,7 +129,7 @@ public ListWorkflowInstanceResponse fetchWorkflowInstance(

@GET
@ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List")
public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
public Iterator<ListWorkflowInstanceResponse> listWorkflowInstances(
@QueryParam("id") @ApiParam("Internal id of workflow instance") List<Long> ids,
@QueryParam("type") @ApiParam("Workflow definition type of workflow instance") List<String> types,
@QueryParam("parentWorkflowId") @ApiParam("Id of parent workflow instance") Long parentWorkflowId,
Expand All @@ -143,7 +143,7 @@ public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
@QueryParam("maxActions") @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) {
return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states,
statuses, businessKey, externalId, include, maxResults, maxActions,
workflowInstances, listWorkflowConverter);
workflowInstances, listWorkflowConverter).iterator();
}

@PUT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public void whenUpdatingStateVariablesUpdateWorkflowInstanceWorks() {
public void listWorkflowInstancesWorks() {
resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"),
asList(WorkflowInstanceStatus.created), "businessKey", "externalId", "", null, null);
verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf(
verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf(
hasField("ids", contains(42L)),
hasField("types", contains("type")),
hasField("parentWorkflowId", is(99L)),
Expand All @@ -204,7 +204,7 @@ public void listWorkflowInstancesWorksWithAllIncludes() {
resource.listWorkflowInstances(asList(42L), asList("type"), 99L, 88L, asList("state"),
asList(WorkflowInstanceStatus.created, WorkflowInstanceStatus.executing),
"businessKey", "externalId", "actions,currentStateVariables,actionStateVariables,childWorkflows", 1L, 1L);
verify(workflowInstances).listWorkflowInstances((QueryWorkflowInstances) argThat(allOf(
verify(workflowInstances).listWorkflowInstancesAsStream((QueryWorkflowInstances) argThat(allOf(
hasField("ids", contains(42L)),
hasField("types", contains("type")),
hasField("parentWorkflowId", is(99L)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
import static org.springframework.http.ResponseEntity.status;

import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;

import javax.inject.Inject;
Expand Down Expand Up @@ -115,7 +115,7 @@ public ResponseEntity<ListWorkflowInstanceResponse> fetchWorkflowInstance(

@GetMapping
@ApiOperation(value = "List workflow instances", response = ListWorkflowInstanceResponse.class, responseContainer = "List")
public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
public Iterator<ListWorkflowInstanceResponse> listWorkflowInstances(
@RequestParam(value = "id", defaultValue = "") @ApiParam("Internal id of workflow instance") List<Long> ids,
@RequestParam(value = "type", defaultValue = "") @ApiParam("Workflow definition type of workflow instance") List<String> types,
@RequestParam(value = "parentWorkflowId", required = false) @ApiParam("Id of parent workflow instance") Long parentWorkflowId,
Expand All @@ -128,7 +128,7 @@ public Collection<ListWorkflowInstanceResponse> listWorkflowInstances(
@RequestParam(value = "maxResults", required = false) @ApiParam("Maximum number of workflow instances to be returned") Long maxResults,
@RequestParam(value = "maxActions", required = false) @ApiParam("Maximum number of actions returned for each workflow instance") Long maxActions) {
return super.listWorkflowInstances(ids, types, parentWorkflowId, parentActionId, states, statuses, businessKey, externalId,
include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter);
include, maxResults, maxActions, this.workflowInstances, this.listWorkflowConverter).iterator();
}

@PutMapping(path = "/{id}/signal", consumes = APPLICATION_JSON_VALUE)
Expand Down

0 comments on commit b63996b

Please sign in to comment.