Skip to content

Commit

Permalink
Merge 4fa6a7d into 5a1f294
Browse files Browse the repository at this point in the history
  • Loading branch information
gmokki committed Apr 27, 2020
2 parents 5a1f294 + 4fa6a7d commit 873b724
Show file tree
Hide file tree
Showing 21 changed files with 181 additions and 59 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
## 7.2.1-SNAPSHOT (future release)

**Highlights**
- Support for querying archived workflow instances.

**Details**
- `nflow-engine`
- Query interfaces allow to request searching of archived workflow instances if not enough matches found from main tables.
- `nflow-rest-api-jax-rs` and `nflow-rest-api-spring-web`
- Support for querying archived workflow instances when passing `queryArchive=true` query parameter if not enough matches found from main tables.
- `nflow-explorer`
- Query and show archived workflow instances by default if not enough matches found from main tables. Configurable in `config.js`.

## 7.2.0 (2020-04-27)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static io.nflow.engine.internal.dao.DaoUtil.getInt;
import static io.nflow.engine.internal.dao.DaoUtil.getLong;
import static io.nflow.engine.internal.dao.DaoUtil.toTimestamp;
import static io.nflow.engine.internal.dao.TablePrefix.ARCHIVE;
import static io.nflow.engine.internal.dao.TablePrefix.MAIN;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.executing;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress;
Expand All @@ -13,7 +15,9 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.sort;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Stream.concat;
import static java.util.stream.Stream.empty;
import static org.apache.commons.lang3.StringUtils.abbreviate;
import static org.apache.commons.lang3.StringUtils.join;
Expand Down Expand Up @@ -505,23 +509,37 @@ private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workfl
}

public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
String sql = "select * from nflow_workflow where id = ?";
WorkflowInstance instance = jdbc.queryForObject(sql, new WorkflowInstanceRowMapper(), id).build();
return getWorkflowInstance(id, includes, maxActions, false);
}

public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions, boolean queryArchive) {
String sql = "select *, 0 as archived from " + MAIN.nameOf("workflow") + " where id = ?";
Object[] args = new Object[]{ id };
if (queryArchive) {
sql += " union all select *, 1 as archived from " + ARCHIVE.nameOf("workflow") + " where id = ?";
args = new Object[]{ id, id };
}
WorkflowInstance instance = jdbc.queryForObject(sql, args, new WorkflowInstanceRowMapper()).build();
if (includes.contains(WorkflowInstanceInclude.CURRENT_STATE_VARIABLES)) {
fillState(instance);
}
if (includes.contains(WorkflowInstanceInclude.CHILD_WORKFLOW_IDS)) {
fillChildWorkflowIds(instance);
fillChildWorkflowIds(instance, queryArchive);
}
if (includes.contains(WorkflowInstanceInclude.ACTIONS)) {
fillActions(instance, includes.contains(WorkflowInstanceInclude.ACTION_STATE_VARIABLES), maxActions);
}
return instance;
}

private TablePrefix sourceTable(WorkflowInstance instance) {
return instance.isArchived ? ARCHIVE : MAIN;
}

private void fillState(final WorkflowInstance instance) {
jdbc.query("select outside.state_key, outside.state_value from nflow_workflow_state outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from nflow_workflow_state where workflow_id = ? group by workflow_id, state_key) inside "
TablePrefix tablePrefix = sourceTable(instance);
jdbc.query("select outside.state_key, outside.state_value from " + tablePrefix.nameOf("workflow_state") + " outside inner join "
+ "(select workflow_id, max(action_id) action_id, state_key from " + tablePrefix.nameOf("workflow_state") + " where workflow_id = ? group by workflow_id, state_key) inside "
+ "on outside.workflow_id = inside.workflow_id and outside.action_id = inside.action_id and outside.state_key = inside.state_key",
rs -> {
instance.stateVariables.put(rs.getString(1), rs.getString(2));
Expand Down Expand Up @@ -626,7 +644,6 @@ public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances quer
}

public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInstances query) {
String sql = "select * from nflow_workflow ";
List<String> conditions = new ArrayList<>();
MapSqlParameterSource params = new MapSqlParameterSource();
conditions.add(executorInfo.getExecutorGroupCondition());
Expand Down Expand Up @@ -665,30 +682,43 @@ public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInst
}
conditions.add("executor_group = :executor_group");
params.addValue("executor_group", executorInfo.getExecutorGroup());
sql += " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";
sql = sqlVariants.limit(sql, getMaxResults(query.maxResults));
String whereCondition = " where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";

Stream<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build);
long maxResults = getMaxResults(query.maxResults);
String sql = sqlVariants.limit("select *, 0 as archived from " + MAIN.nameOf("workflow") + whereCondition, maxResults);
List<WorkflowInstance.Builder> results = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper());
maxResults -= results.size();
Stream<WorkflowInstance.Builder> resultStream = results.stream();

if (query.queryArchive && maxResults > 0) {
sql = sqlVariants.limit("select *, 1 as archived from " + ARCHIVE.nameOf("workflow") + whereCondition, maxResults);
resultStream = concat(resultStream, namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream());
}

Stream<WorkflowInstance> ret = resultStream.map(WorkflowInstance.Builder::build);
if (query.includeCurrentStateVariables) {
ret = ret.peek(instance -> fillState(instance));
}
if (query.includeActions) {
ret = ret.peek(instance -> fillActions(instance, query.includeActionStateVariables, query.maxActions));
}
if (query.includeChildWorkflows) {
ret = ret.peek(instance -> fillChildWorkflowIds(instance));
ret = ret.peek(instance -> fillChildWorkflowIds(instance, query.queryArchive));
}
return ret;
}

private void fillChildWorkflowIds(final WorkflowInstance instance) {
jdbc.query("select parent_action_id, id from nflow_workflow where parent_workflow_id = ?", rs -> {
private void fillChildWorkflowIds(final WorkflowInstance instance, boolean queryArchive) {
Stream<TablePrefix> tables = queryArchive ? Stream.of(MAIN, ARCHIVE) : Stream.of(MAIN);
String sql = tables.map(tablePrefix -> "select parent_action_id, id from " + tablePrefix.nameOf("workflow") + " where parent_workflow_id = ?")
.collect(joining(" union all "));
Object[] args = queryArchive ? new Object[]{instance.id, instance.id} : new Object[]{instance.id};
jdbc.query(sql, args, rs -> {
long parentActionId = rs.getLong(1);
long childWorkflowInstanceId = rs.getLong(2);
List<Long> children = instance.childWorkflows.computeIfAbsent(parentActionId, k -> new ArrayList<>());
children.add(childWorkflowInstanceId);
}, instance.id);
});
}

private long getMaxResults(Long maxResults) {
Expand All @@ -701,8 +731,9 @@ private long getMaxResults(Long maxResults) {
private void fillActions(WorkflowInstance instance, boolean includeStateVariables, Long maxActions) {
Map<Long, Map<String, String>> actionStates = includeStateVariables ? fetchActionStateVariables(instance)
: EMPTY_ACTION_STATE_MAP;
TablePrefix tablePrefix = sourceTable(instance);
String sql = sqlVariants.limit(
"select nflow_workflow_action.* from nflow_workflow_action where workflow_id = ? order by id desc",
"select nflow_workflow_action.* from " + tablePrefix.nameOf("workflow_action") + " where workflow_id = ? order by id desc",
getMaxActions(maxActions));
instance.actions.addAll(jdbc.query(sql, new WorkflowInstanceActionRowMapper(sqlVariants, actionStates), instance.id));
}
Expand All @@ -715,7 +746,8 @@ private long getMaxActions(Long maxActions) {
}

private Map<Long, Map<String, String>> fetchActionStateVariables(WorkflowInstance instance) {
return jdbc.query("select * from nflow_workflow_state where workflow_id = ? order by action_id, state_key asc",
TablePrefix tablePrefix = sourceTable(instance);
return jdbc.query("select * from " + tablePrefix.nameOf("workflow_state") + " where workflow_id = ? order by action_id, state_key asc",
new WorkflowActionStateRowMapper(), instance.id);
}

Expand Down Expand Up @@ -776,7 +808,8 @@ public WorkflowInstance.Builder mapRow(ResultSet rs, int rowNum) throws SQLExcep
.setModified(sqlVariants.getDateTime(rs, "modified")) //
.setStartedIfNotSet(sqlVariants.getDateTime(rs, "started")) //
.setExecutorGroup(rs.getString("executor_group")) //
.setSignal(ofNullable(getInt(rs, "workflow_signal")));
.setSignal(ofNullable(getInt(rs, "workflow_signal"))) //
.setArchived(rs.getBoolean("archived"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,21 @@ public WorkflowInstanceService(WorkflowInstanceDao workflowInstanceDao, Workflow
* @throws NflowNotFoundException If workflow instance is not found.
*/
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions) {
return getWorkflowInstance(id, includes, maxActions, false);
}

/**
* Return the workflow instance matching the given id.
* @param id Workflow instance id.
* @param includes Set of properties to be loaded.
* @param maxActions Maximum number of actions to be loaded.
* @param queryArchive Query archive if not found from main tables.
* @return The workflow instance
* @throws EmptyResultDataAccessException if not found
*/
public WorkflowInstance getWorkflowInstance(long id, Set<WorkflowInstanceInclude> includes, Long maxActions, boolean queryArchive) {
try {
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions);
return workflowInstanceDao.getWorkflowInstance(id, includes, maxActions, queryArchive);
} catch (EmptyResultDataAccessException e) {
throw new NflowNotFoundException("Workflow instance", id, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public class QueryWorkflowInstances extends ModelObject {
*/
public final Long maxActions;

/**
* When set also the workflow archive will be queried.
*/
public final boolean queryArchive;

QueryWorkflowInstances(Builder builder) {
this.ids = new ArrayList<>(builder.ids);
this.types = new ArrayList<>(builder.types);
Expand All @@ -100,6 +105,7 @@ public class QueryWorkflowInstances extends ModelObject {
this.includeChildWorkflows = builder.includeChildWorkflows;
this.maxResults = builder.maxResults;
this.maxActions = builder.maxActions;
this.queryArchive = builder.queryArchive;
}

/**
Expand All @@ -120,6 +126,7 @@ public static class Builder {
boolean includeChildWorkflows;
Long maxResults;
Long maxActions;
boolean queryArchive;

/**
* Create a workflow instance query builder.
Expand All @@ -142,6 +149,7 @@ public Builder(QueryWorkflowInstances copy) {
this.includeChildWorkflows = copy.includeChildWorkflows;
this.maxResults = copy.maxResults;
this.maxActions = copy.maxActions;
this.queryArchive = copy.queryArchive;
}
/**
* Add identifiers to query parameters.
Expand Down Expand Up @@ -287,6 +295,17 @@ public Builder setMaxActions(Long maxActions) {
return this;
}

/**
* If true the workflow instance archive is also searched.
*
* @param queryArchive True if archive should also be queried.
* @return this.
*/
public Builder setQueryArchive(boolean queryArchive) {
this.queryArchive = queryArchive;
return this;
}

/**
* Create the workflow instance query object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,11 @@ public enum WorkflowInstanceStatus {
*/
public final Optional<Integer> signal;

/**
* True if this instance is archived.
*/
public final boolean isArchived;

/**
* Child workflow instance IDs created by this workflow instance, grouped by instance action ID.
*/
Expand Down Expand Up @@ -178,6 +183,7 @@ public enum WorkflowInstanceStatus {
this.started = builder.started;
this.executorGroup = builder.executorGroup;
this.signal = builder.signal;
this.isArchived = builder.isArchived;
this.mapper = builder.mapper;
}

Expand Down Expand Up @@ -251,6 +257,7 @@ public static class Builder {
DateTime started;
DateTime modified;
String executorGroup;
boolean isArchived;
Optional<Integer> signal = Optional.empty();
ObjectStringMapper mapper;

Expand Down Expand Up @@ -295,6 +302,7 @@ public Builder(WorkflowInstance copy) {
this.started = copy.started;
this.executorGroup = copy.executorGroup;
this.signal = copy.signal;
this.isArchived = copy.isArchived;
this.mapper = copy.mapper;
}

Expand Down Expand Up @@ -561,6 +569,16 @@ public Builder setSignal(Optional<Integer> signal) {
return this;
}

/**
* Set the workflow source.
* @param isArchived True if this workflow is archived.
* @return this.
*/
public Builder setArchived(boolean isArchived) {
this.isArchived = isArchived;
return this;
}

/**
* Create the workflow instance object.
* @return The workflow instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ public void queryWorkflowInstanceWithAllConditions() {
.setIncludeCurrentStateVariables(true) //
.setIncludeChildWorkflows(true) //
.setMaxResults(1L) //
.setMaxActions(1L).build();
.setMaxActions(1L) //
.setQueryArchive(true) //
.build();
List<WorkflowInstance> l = dao.queryWorkflowInstances(q);
assertThat(l.size(), is(1));
checkSameWorkflowInfo(child, l.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void getWorkflowInstance() {
WorkflowInstance instance = Mockito.mock(WorkflowInstance.class);
@SuppressWarnings("unchecked")
Set<WorkflowInstanceInclude> includes = Mockito.mock(Set.class);
when(workflowInstanceDao.getWorkflowInstance(42, includes, 10L)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstance(42, includes, 10L, false)).thenReturn(instance);
assertEquals(instance, service.getWorkflowInstance(42, includes, 10L));
}

Expand Down
1 change: 1 addition & 0 deletions nflow-explorer/src/app/search/criteriaModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

q.type = _.result(self.model.definition, 'type');
q.state = _.result(self.model.state, 'id');
q.queryArchive = true;
_.defaults(q, _.omit(self.model, ['definition', 'state']));
return omitNonValues(q);
}
Expand Down
2 changes: 1 addition & 1 deletion nflow-explorer/src/app/services/WorkflowService.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

function get(workflowId) {
return RestHelper.get({
path: '/v1/workflow-instance/id/' + workflowId + '?include=actions,currentStateVariables,actionStateVariables'
path: '/v1/workflow-instance/id/' + workflowId + '?include=actions,currentStateVariables,actionStateVariables&queryArchive=true'
});
}

Expand Down
16 changes: 8 additions & 8 deletions nflow-explorer/test/spec/search/criteriaModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,35 @@ describe('Service: CriteriaModel', function () {
describe('toQuery', function () {
it('sets definition type when available', function () {
actualModel.definition = definitions[0];
expect(CriteriaModel.toQuery()).toEqual({type: 'foo'});
expect(CriteriaModel.toQuery()).toEqual({type: 'foo', queryArchive: true});

delete actualModel.definition.type;
expect(CriteriaModel.toQuery()).toEqual({});
expect(CriteriaModel.toQuery()).toEqual({queryArchive: true});

delete actualModel.definition;
expect(CriteriaModel.toQuery()).toEqual({});
expect(CriteriaModel.toQuery()).toEqual({queryArchive: true});

});

it('sets state id', function () {
actualModel.state = definitions[0].states[0];
expect(CriteriaModel.toQuery()).toEqual({state: 'bar'});
expect(CriteriaModel.toQuery()).toEqual({state: 'bar', queryArchive: true});

delete actualModel.state.id;
expect(CriteriaModel.toQuery()).toEqual({});
expect(CriteriaModel.toQuery()).toEqual({queryArchive: true});

delete actualModel.state;
expect(CriteriaModel.toQuery()).toEqual({});
expect(CriteriaModel.toQuery()).toEqual({queryArchive: true});
});

it('null values are omitted', function () {
actualModel.foo = null;
expect(CriteriaModel.toQuery()).toEqual({});
expect(CriteriaModel.toQuery()).toEqual({queryArchive: true});
});

it('undefined values are omitted', function () {
actualModel.foo = undefined;
expect(CriteriaModel.toQuery()).toEqual({});
expect(CriteriaModel.toQuery()).toEqual({queryArchive: true});
});
});

Expand Down
2 changes: 1 addition & 1 deletion nflow-explorer/test/spec/search/searchForm.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ describe('Directive: searchForm', function () {
$httpBackend = _$httpBackend_;

CriteriaModel.model = { foo: 'bar' };
url = config.nflowUrl + '/v1/workflow-instance?foo=bar';
url = config.nflowUrl + '/v1/workflow-instance?foo=bar&queryArchive=true';
}));

afterEach(function() {
Expand Down
Loading

0 comments on commit 873b724

Please sign in to comment.