Skip to content

Commit

Permalink
Support querying workflows from both main tables and archive. Stream …
Browse files Browse the repository at this point in the history
…workflow query results out of rest api, which makes it faster
  • Loading branch information
gmokki committed Feb 19, 2020
1 parent 9713b0a commit 698a65f
Show file tree
Hide file tree
Showing 14 changed files with 199 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static io.nflow.engine.internal.dao.DaoUtil.getInt;
import static io.nflow.engine.internal.dao.DaoUtil.getLong;
import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp;
import static io.nflow.engine.internal.dao.TablePrefix.ARCHIVE;
import static io.nflow.engine.internal.dao.TablePrefix.MAIN;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress;
Expand All @@ -14,6 +16,7 @@
import static java.util.Collections.sort;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.empty;
import static org.apache.commons.lang3.StringUtils.abbreviate;
import static org.apache.commons.lang3.StringUtils.join;
Expand Down Expand Up @@ -53,7 +56,6 @@
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementCreator;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowCallbackHandler;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
Expand Down Expand Up @@ -504,30 +506,31 @@ private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workfl
}

public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
String sql = "select * from nflow_workflow where id = ?";
return getWorkflowInstance(id, includes, maxActions, MAIN);
}

public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions, TablePrefix tablePrefix) {
String sql = "select * from " + tablePrefix.nameOf("workflow") + " where id = ?";
WorkflowInstance instance = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id).build();
if (includes.contains(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES)) {
fillState(instance);
fillState(instance, tablePrefix);
}
if (includes.contains(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS)) {
fillChildWorkflowIds(instance);
fillChildWorkflowIds(instance, tablePrefix);
}
if (includes.contains(WorkflowInstanceInclude.ACTIONS)) {
fillActions(instance, includes.contains(WorkflowInstanceInclude.ACTION_STATE_VARIABLES), maxActions);
fillActions(instance, includes.contains(WorkflowInstanceInclude.ACTION_STATE_VARIABLES), maxActions, tablePrefix);
}
return instance;
}

private void fillState(final WorkflowInstance instance) {
jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside "
private void fillState(final WorkflowInstance instance, TablePrefix tablePrefix) {
jdbc.query("select outside.state_key, outside.state_value from " + tablePrefix.nameOf("workflow_state") + " outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from " + tablePrefix.nameOf("workflow_state") + " where workflow_id = ? group by workflow_id, state_key) inside "
+ "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key",
new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}
}, instance.id);
rs -> {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
}, instance.id);
instance.originalStateVariables.putAll(instance.stateVariables);
}

Expand Down Expand Up @@ -623,7 +626,10 @@ public int compareTo(OptimisticLockKey other) {
}

public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances query) {
String sql = "select * from nflow_workflow ";
return queryWorkflowInstancesAsStream(query).collect(toList());
}

public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInstances query) {
List<String> conditions = new ArrayList<>();
MapSqlParameterSource params = new MapSqlParameterSource();
conditions.add(executorInfo.getExecutorGroupCondition());
Expand Down Expand Up @@ -662,52 +668,54 @@ public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances quer
}
conditions.add("executor_group = :executor_group");
params.addValue("executor_group", executorInfo.getExecutorGroup());
sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";
sql = sqlVariants.limit(sql, getMaxResults(query.maxResults));
List<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build).collect(toList());
String sqlQuery = " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";

long maxResults = getMaxResults(query.maxResults);
String sql = sqlVariants.limit("select * from " + MAIN.nameOf("workflow") + sqlQuery, maxResults);
List<WorkflowInstance.Builder> results = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper());
maxResults -= results.size();
Stream<WorkflowInstance.Builder> resultStream = results.stream();

if (query.queryArchive && maxResults > 0) {
sql = sqlVariants.limit("select * from " + ARCHIVE.nameOf("workflow") + sqlQuery, maxResults);
resultStream = concat(resultStream, namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.peek(builder -> builder.setArchived(true)));
}

Stream<WorkflowInstance> ret = resultStream.map(WorkflowInstance.Builder::build);
if (query.includeCurrentStateVariables) {
for (WorkflowInstance instance : ret) {
fillState(instance);
}
ret = ret.peek(instance -> fillState(instance, instance.isArchived ? ARCHIVE : MAIN));
}
if (query.includeActions) {
for (WorkflowInstance instance : ret) {
fillActions(instance, query.includeActionStateVariables, query.maxActions);
}
ret = ret.peek(instance -> fillActions(instance, query.includeActionStateVariables, query.maxActions, instance.isArchived ? ARCHIVE : MAIN));
}
if (query.includeChildWorkflows) {
for (final WorkflowInstance instance : ret) {
fillChildWorkflowIds(instance);
}
ret = ret.peek(instance -> fillChildWorkflowIds(instance, instance.isArchived ? ARCHIVE : MAIN));
}
return ret;
}

private void fillChildWorkflowIds(final WorkflowInstance instance) {
jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", new RowCallbackHandler() {
@Override
public void processRow(ResultSet rs) throws SQLException {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}
private void fillChildWorkflowIds(final WorkflowInstance instance, TablePrefix tablePrefix) {
jdbc.query("select parent_action_id, id from " + tablePrefix.nameOf("workflow") + " where parent_workflow_id = ?", rs -> {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}, instance.id);
}

private long getMaxResults(Long maxResults) {
if (maxResults == null) {
return workflowInstanceQueryMaxResultsDefault;
}
return min(maxResults.longValue(), workflowInstanceQueryMaxResults);
return min(maxResults, workflowInstanceQueryMaxResults);
}

private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) {
Map<Long, Map<String, String>> actionStates = includeStateVariables ? fetchActionStateVariables(instance)
private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions, TablePrefix tablePrefix) {
Map<Long, Map<String, String>> actionStates = includeStateVariables ? fetchActionStateVariables(instance, tablePrefix)
: EMPTY_ACTION_STATE_MAP;
String sql = sqlVariants.limit(
"select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc",
"select nflow_workflow_action.* from " + tablePrefix.nameOf("workflow_action") + " where workflow_id = ? order by id desc",
getMaxActions(maxActions));
instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id));
}
Expand All @@ -716,11 +724,11 @@ private long getMaxActions(Long maxActions) {
if (maxActions == null) {
return workflowInstanceQueryMaxActionsDefault;
}
return min(maxActions.longValue(), workflowInstanceQueryMaxActions);
return min(maxActions, workflowInstanceQueryMaxActions);
}

private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance) {
return jdbc.query("select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc",
private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance, TablePrefix tablePrefix) {
return jdbc.query("select * from " + tablePrefix.nameOf("workflow_state") + " where workflow_id = ? order by action_id, state_key asc",
new WorkflowActionStateRowMapper(), instance.id);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.nflow.engine.internal.workflow;

import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.toList;
import static org.joda.time.DateTime.now;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.Assert.notNull;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package io.nflow.engine.service;

import static io.nflow.engine.internal.dao.TablePrefix.ARCHIVE;
import static io.nflow.engine.internal.dao.TablePrefix.MAIN;
import static java.util.stream.Collectors.toList;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.StringUtils.isEmpty;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import javax.inject.Inject;

import io.nflow.engine.internal.dao.TablePrefix;
import org.slf4j.Logger;
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -52,7 +58,27 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow
* @return The workflow instance, or null if not found.
*/
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions);
return getWorkflowInstance(id, includes, maxActions, false);
}

/**
* Return the workflow instance matching the given id.
* @param id Workflow instance id.
* @param includes Set of properties to be loaded.
* @param maxActions Maximum number of actions to be loaded.
* @param queryArchive Query archive if not found from main tables.
* @return The workflow instance
* @throws EmptyResultDataAccessException if not found
*/
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions, boolean queryArchive) {
try {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions, MAIN);
} catch (EmptyResultDataAccessException ex) {
if (queryArchive) {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions, ARCHIVE);
}
throw ex;
}
}

/**
Expand Down Expand Up @@ -126,6 +152,15 @@ public Collection<WorkflowInstance> listWorkflowInstances(QueryWorkflowInstances
return workflowInstanceDao.queryWorkflowInstances(query);
}

/**
* Return workflow instances matching the given query.
* @param query The query parameters.
* @return Matching workflow instances as Stream. The stream does not need to be closed.
*/
public Stream<WorkflowInstance> listWorkflowInstancesAsStream(QueryWorkflowInstances query) {
return workflowInstanceDao.queryWorkflowInstancesAsStream(query);
}

/**
* Return current signal value for given workflow instance.
* @param workflowInstanceId Workflow instance id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public class QueryWorkflowInstances extends ModelObject {
*/
public final Long maxActions;

/**
* When set also the workflow archive will be queried.
*/
public final boolean queryArchive;

QueryWorkflowInstances(Builder builder) {
this.ids = new ArrayList<>(builder.ids);
this.types = new ArrayList<>(builder.types);
Expand All @@ -100,6 +105,7 @@ public class QueryWorkflowInstances extends ModelObject {
this.includeChildWorkflows = builder.includeChildWorkflows;
this.maxResults = builder.maxResults;
this.maxActions = builder.maxActions;
this.queryArchive = builder.queryArchive;
}

/**
Expand All @@ -120,6 +126,7 @@ public static class Builder {
boolean includeChildWorkflows;
Long maxResults;
Long maxActions;
boolean queryArchive;

/**
* Create a workflow instance query builder.
Expand All @@ -142,6 +149,7 @@ public Builder(QueryWorkflowInstances copy) {
this.includeChildWorkflows = copy.includeChildWorkflows;
this.maxResults = copy.maxResults;
this.maxActions = copy.maxActions;
this.queryArchive = copy.queryArchive;
}
/**
* Add identifiers to query parameters.
Expand Down Expand Up @@ -287,6 +295,17 @@ public Builder setMaxActions(Long maxActions) {
return this;
}

/**
* If true the workflow instance archive is also searched.
*
* @param queryArchive True if archive should also be queried.
* @return this.
*/
public Builder setQueryArchive(boolean queryArchive) {
this.queryArchive = queryArchive;
return this;
}

/**
* Create the workflow instance query object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public enum WorkflowInstanceStatus {
*/
public final Optional<Integer> signal;

/**
* True if this instance is archived.
*/
public final boolean isArchived;

/**
* Child workflow instance IDs created by this workflow instance, grouped by instance action ID.
*/
Expand Down Expand Up @@ -178,6 +183,7 @@ public enum WorkflowInstanceStatus {
this.started = builder.started;
this.executorGroup = builder.executorGroup;
this.signal = builder.signal;
this.isArchived = builder.isArchived;
this.mapper = builder.mapper;
}

Expand Down Expand Up @@ -251,6 +257,7 @@ public static class Builder {
DateTime started;
DateTime modified;
String executorGroup;
boolean isArchived;
Optional<Integer> signal = Optional.empty();
ObjectStringMapper mapper;

Expand Down Expand Up @@ -295,6 +302,7 @@ public Builder(WorkflowInstance copy) {
this.started = copy.started;
this.executorGroup = copy.executorGroup;
this.signal = copy.signal;
this.isArchived = copy.isArchived;
this.mapper = copy.mapper;
}

Expand Down Expand Up @@ -561,6 +569,16 @@ public Builder setSignal(Optional<Integer> signal) {
return this;
}

/**
* Set the workflow source.
* @param isArchived True if this workflow is archived.
* @return this.
*/
public Builder setArchived(boolean isArchived) {
this.isArchived = isArchived;
return this;
}

/**
* Create the workflow instance object.
* @return The workflow instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,11 @@ public void deleteExpiredWorkflowHistory() {
maintenanceDao.deleteActionAndStateHistory(parentWorkflowId, now());

parentWorkflow = workflowInstanceDao.getWorkflowInstance(parentWorkflowId, EnumSet.allOf(WorkflowInstanceInclude.class),
null);
null, MAIN);
assertThat(parentWorkflow.getStateVariable("requestData"), equalTo("{ \"parameter\": \"abc\" }"));
assertThat(parentWorkflow.getStateVariable("variable"), equalTo("preservedValue"));
assertThat(parentWorkflow.actions.size(), equalTo(2));
childWorkflow = workflowInstanceDao.getWorkflowInstance(childWorkflowId, emptySet(), null);
childWorkflow = workflowInstanceDao.getWorkflowInstance(childWorkflowId, emptySet(), null, MAIN);
assertThat(childWorkflow.parentWorkflowId, equalTo(parentWorkflowId));
}

Expand Down Expand Up @@ -459,7 +459,7 @@ private long addWorkflowAction(long workflowId, final WorkflowInstance instance,

private void assertActiveWorkflowsRemoved(List<Long> workflowIds) {
for (long id : workflowIds) {
assertThrows(EmptyResultDataAccessException.class, () -> workflowInstanceDao.getWorkflowInstance(id, emptySet(), null));
assertThrows(EmptyResultDataAccessException.class, () -> workflowInstanceDao.getWorkflowInstance(id, emptySet(), null, MAIN));
}
}

Expand Down Expand Up @@ -594,7 +594,7 @@ private long insert(WorkflowInstance instance) {
assertTrue(id > 0);
DateTime modified = instance.modified;
updateModified(id, modified);
WorkflowInstance dbInstance = workflowInstanceDao.getWorkflowInstance(id, emptySet(), null);
WorkflowInstance dbInstance = workflowInstanceDao.getWorkflowInstance(id, emptySet(), null, MAIN);
assertEquals(modified, dbInstance.modified);
return id;
}
Expand Down
Loading

0 comments on commit 698a65f

Please sign in to comment.