Skip to content

Commit

Permalink
move unfinished children check to workflow instance service, add test…
Browse files Browse the repository at this point in the history
…s and fix a bug in the query
  • Loading branch information
efonsell committed Feb 19, 2021
1 parent d2e4a1a commit a014f84
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
- Control whether the stack trace of the exception is logged or not.
- Control how long the `WorkflowStateProcessor` should sleep before retrying.
- Support in `CronWorkflow` to wait for child workflow instances created in `doWork` state method to finish before scheduling the next work. Return `NextAction.moveToStateAfter(waitForWorkToFinish, ...)` with some fail-safe waiting time instead of `NextAction.moveToState(schedule, ...)` to avoid immediate re-scheduling. When child workflows finish, they will wake up the parent workflow automatically, if it is still in the waiting state. Default implementation will check if any child workflows are still running, and keep waiting until they are all finished. Override `CronWorkflow.waitForWorkToFinishImpl` for custom logic.
- Add `StateExecution.hasUnfinishedChildren` helper to check if the workflow instance has any child workflow instances with any other status than `WorkflowInstanceStatus.finished`.
- Add `hasUnfinishedChildWorkflows` helper in `StateExecution` and `WorkflowInstanceService` to check if the workflow instance has any child workflow instances with any other status than `WorkflowInstanceStatus.finished`.
- `nflow-rest-api-common`, `nflow-rest-api-jax-rs`, `nflow-rest-api-spring-web`
- `UpdateWorkflowInstanceRequest.businessKey` field was added to support updating workflow instance business key via REST API.
- Added support for new query parameters `stateVariableKey` and `stateVariableValue` to `GET /v1/workflow-instance` to limit search query by state variable name and key. Only the latest value of the state variable of the workflow instance is used.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.nflow.engine.internal.workflow;

import static java.util.Collections.unmodifiableList;
import static java.util.EnumSet.complementOf;
import static org.joda.time.DateTime.now;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.Assert.notNull;

import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
Expand All @@ -24,14 +22,11 @@
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType;

public class StateExecutionImpl extends ModelObject implements StateExecution {

private static final Logger LOG = getLogger(StateExecutionImpl.class);
private static final WorkflowInstanceStatus[] UNFINISHED_STATUSES = complementOf(EnumSet.of(WorkflowInstanceStatus.finished))
.toArray(new WorkflowInstanceStatus[0]);
private final WorkflowInstance instance;
private final ObjectStringMapper objectMapper;
private final WorkflowInstanceDao workflowDao;
Expand Down Expand Up @@ -315,9 +310,7 @@ public void handleFailure(AbstractWorkflowDefinition<?> definition, String failu
}

@Override
public boolean hasUnfinishedChildren() {
QueryWorkflowInstances unfinishedChildren = new QueryWorkflowInstances.Builder().addStatuses(UNFINISHED_STATUSES)
.setParentActionId(instance.id).build();
return !workflowDao.queryWorkflowInstances(unfinishedChildren).isEmpty();
public boolean hasUnfinishedChildWorkflows() {
return workflowInstanceService.hasUnfinishedChildWorkflows(instance.id);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package io.nflow.engine.service;

import static java.util.Collections.emptySet;
import static java.util.EnumSet.complementOf;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.StringUtils.isEmpty;

import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -24,6 +26,7 @@
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType;

Expand All @@ -34,6 +37,8 @@
public class WorkflowInstanceService {

private static final Logger logger = getLogger(WorkflowInstanceService.class);
private static final WorkflowInstanceStatus[] UNFINISHED_STATUSES = complementOf(EnumSet.of(WorkflowInstanceStatus.finished))
.toArray(new WorkflowInstanceStatus[0]);

private final WorkflowDefinitionService workflowDefinitionService;
private final WorkflowInstanceDao workflowInstanceDao;
Expand Down Expand Up @@ -181,4 +186,17 @@ private AbstractWorkflowDefinition<?> getDefinition(Long workflowInstanceId) {
return workflowDefinitionService.getWorkflowDefinition(workflowInstanceDao.getWorkflowInstanceType(workflowInstanceId));
}

/**
* Return true if this workflow instance has unfinished child workflow instances.
*
* @param workflowInstanceId
* The parent workflow instance id.
*
* @return True if the workflow instance has unfinished child workflow instances, false otherwise.
*/
public boolean hasUnfinishedChildWorkflows(long workflowInstanceId) {
QueryWorkflowInstances unfinishedChildren = new QueryWorkflowInstances.Builder().addStatuses(UNFINISHED_STATUSES)
.setParentWorkflowId(workflowInstanceId).build();
return !listWorkflowInstances(unfinishedChildren).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public NextAction waitForWorkToFinish(StateExecution execution) {
* @return Time when check should be retried. Null to go to schedule state immediately.
*/
protected DateTime waitForWorkToFinishImpl(StateExecution execution) {
if (execution.hasUnfinishedChildren()) {
if (execution.hasUnfinishedChildWorkflows()) {
logger.info("Unfinished child workflow found, waiting before scheduling next work.");
return now().plusHours(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,5 @@ public interface StateExecution {
*
* @return True if unfinished child workflow instances are found, false otherwise.
*/
boolean hasUnfinishedChildren();
boolean hasUnfinishedChildWorkflows();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress;
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.externalChange;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand All @@ -12,7 +13,9 @@
import static org.joda.time.DateTimeUtils.setCurrentMillisFixed;
import static org.joda.time.DateTimeUtils.setCurrentMillisSystem;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
Expand All @@ -23,6 +26,7 @@
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -42,6 +46,7 @@
import io.nflow.engine.workflow.definition.WorkflowDefinition;
import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType;

Expand Down Expand Up @@ -157,7 +162,7 @@ public void wakeUpWorkflowInstance() {

@Test
public void listWorkflowInstances() {
List<WorkflowInstance> result = asList(constructWorkflowInstanceBuilder().build());
List<WorkflowInstance> result = asList(constructWorkflowInstanceBuilder().build());
QueryWorkflowInstances query = mock(QueryWorkflowInstances.class);
when(workflowInstanceDao.queryWorkflowInstances(query)).thenReturn(result);
assertEquals(result, service.listWorkflowInstances(query));
Expand Down Expand Up @@ -207,4 +212,30 @@ public void resetSignalDoesNotQueryWorkflowDefinition() {
verify(workflowDefinitions, never()).getWorkflowDefinition(anyString());
}

@Test
public void hasUnfinishedChildWorkflowsReturnsFalseWhenInstanceHasNoUnfinishedChildren() {
List<WorkflowInstance> result = emptyList();
when(workflowInstanceDao.queryWorkflowInstances(queryCapture.capture())).thenReturn(result);

assertFalse(service.hasUnfinishedChildWorkflows(42));
QueryWorkflowInstances query = queryCapture.getValue();
assertThat(query.parentWorkflowId, is(42L));
EnumSet.complementOf(EnumSet.of(WorkflowInstanceStatus.finished)).stream()
.forEach(status -> assertTrue(query.statuses.contains(status)));
assertFalse(query.statuses.contains(WorkflowInstanceStatus.finished));
}

@Test
public void hasUnfinishedChildWorkflowsReturnsTrueWhenInstanceHasUnfinishedChild() {
WorkflowInstance unfinished = constructWorkflowInstanceBuilder().build();
List<WorkflowInstance> result = asList(unfinished);
when(workflowInstanceDao.queryWorkflowInstances(queryCapture.capture())).thenReturn(result);

assertTrue(service.hasUnfinishedChildWorkflows(42));
QueryWorkflowInstances query = queryCapture.getValue();
assertThat(query.parentWorkflowId, is(42L));
EnumSet.complementOf(EnumSet.of(WorkflowInstanceStatus.finished)).stream()
.forEach(status -> assertTrue(query.statuses.contains(status)));
assertFalse(query.statuses.contains(WorkflowInstanceStatus.finished));
}
}

0 comments on commit a014f84

Please sign in to comment.