Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk worklfow demo + test #286

Merged
merged 22 commits into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## 5.3.4-SNAPSHOT (future release)
## 5.4.0-SNAPSHOT (future release)

**Highlights**
- Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system
- Allow child workflows to automatically wake up parent workflow instance when they enter end state (can be enabled via workflow settings)
efonsell marked this conversation as resolved.
Show resolved Hide resolved
- Allow creating workflows via REST API with null activation time (by setting activate = false)

**Details**
- Support boxed primitives (Integer, Float etc) with @StateVar
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
Expand Down Expand Up @@ -204,9 +205,18 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
}
WorkflowState nextState = definition.getState(execution.getNextState());
if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) {
String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId);
AbstractWorkflowDefinition<?> parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType);
String parentCurrentState = workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId);
if (parentDefinition.getState(parentCurrentState).getType() == WorkflowStateType.wait) {
execution.wakeUpParentWorkflow();
}
}
WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) //
.setNextActivation(execution.getNextActivation()) //
.setStatus(getStatus(execution, definition.getState(execution.getNextState()))) //
.setStatus(getStatus(execution, nextState)) //
.setStateText(getStateText(instance, execution)) //
.setState(execution.getNextState()) //
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package io.nflow.engine.workflow.definition;

import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done;
import static io.nflow.engine.workflow.definition.BulkWorkflow.State.error;
import static io.nflow.engine.workflow.definition.BulkWorkflow.State.splitWork;
import static io.nflow.engine.workflow.definition.BulkWorkflow.State.waitForChildrenToFinish;
import static io.nflow.engine.workflow.definition.NextAction.moveToState;
import static io.nflow.engine.workflow.definition.NextAction.retryAfter;
import static io.nflow.engine.workflow.definition.WorkflowStateType.end;
import static io.nflow.engine.workflow.definition.WorkflowStateType.manual;
import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static io.nflow.engine.workflow.definition.WorkflowStateType.wait;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.finished;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static java.util.EnumSet.complementOf;
import static org.joda.time.DateTime.now;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.EnumSet;
import java.util.List;

import javax.inject.Inject;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;

import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;

/**
* Bulk child workflow executor that does not overflow the system.
*/
@Component
public class BulkWorkflow extends WorkflowDefinition<BulkWorkflow.State> {

public static final String BULK_WORKFLOW_TYPE = "bulk";
public static final String VAR_CHILD_DATA = "childData";
public static final String VAR_CONCURRENCY = "concurrency";

private static final EnumSet<WorkflowInstanceStatus> RUNNING_STATES = complementOf(EnumSet.of(finished, created));
private static final Logger logger = getLogger(BulkWorkflow.class);

@Inject
WorkflowInstanceService instanceService;

public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
splitWork(start), waitForChildrenToFinish(wait), done(end), error(manual);

private WorkflowStateType type;

State(WorkflowStateType type) {
this.type = type;
}

@Override
public WorkflowStateType getType() {
return type;
}

@Override
public String getDescription() {
return name();
}
}

protected BulkWorkflow(String type) {
super(type, splitWork, error, new WorkflowSettings.Builder().setMaxRetries(Integer.MAX_VALUE).build());
setDescription("Executes child workflows in bulk but gracefully without effecting non-bulk tasks.");
permit(splitWork, waitForChildrenToFinish);
permit(waitForChildrenToFinish, done);
}

public BulkWorkflow() {
this(BULK_WORKFLOW_TYPE);
}

public NextAction splitWork(StateExecution execution, @StateVar(value = VAR_CHILD_DATA, readOnly = true) JsonNode data) {
boolean childrenFound = splitWorkImpl(execution, data);
if (childrenFound) {
return moveToState(waitForChildrenToFinish, "Running");
}
return retryAfter(waitForChildrenUntil(), "Waiting for child workflows");
}

protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unused") JsonNode data) {
if (execution.getAllChildWorkflows().isEmpty()) {
throw new RuntimeException(
"No child workflows found - either add them before starting the parent or implement splitWorkflowImpl");
}
return true;
}

protected DateTime waitForChildrenUntil() {
return now().plusHours(1);
}

public NextAction waitForChildrenToFinish(StateExecution execution,
@StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) {
List<WorkflowInstance> childWorkflows = execution.getAllChildWorkflows();
long completed = 0;
long running = 0;
for (WorkflowInstance child : childWorkflows) {
if (child.status == finished) {
completed++;
} else if (isRunning(child)) {
running++;
}
}
if (completed == childWorkflows.size()) {
return moveToState(done, "All children completed");
}
long toStart = min(max(1, concurrency) - running, childWorkflows.size() - completed);
if (toStart > 0) {
childWorkflows.stream().filter(this::isInInitialState).limit(toStart).forEach(this::wakeup);
logger.info("Started " + toStart + " child workflows");
}
long progress = completed * 100 / childWorkflows.size();
return retryAfter(waitForChildrenToCompleteUntil(), "Waiting for child workflows to complete - " + progress + "% done");
}

private void wakeup(WorkflowInstance instance) {
instanceService.wakeupWorkflowInstance(instance.id, emptyList());
}

protected boolean isRunning(WorkflowInstance instance) {
return RUNNING_STATES.contains(instance.status);
}

private boolean isInInitialState(WorkflowInstance instance) {
return instance.status == created;
}

protected DateTime waitForChildrenToCompleteUntil() {
return now().plusMinutes(15);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public enum WorkflowStateType {
*/
normal(false, WorkflowInstanceStatus.inProgress),

/**
* State for waiting something outside this instance to happen, including (but not limited to) child workflow instances to go to
* end state. When a child workflow finishes and parent is in wait state, parent is automatically woken up.
*/
wait(false, WorkflowInstanceStatus.inProgress),
efonsell marked this conversation as resolved.
Show resolved Hide resolved

/**
* Final state of the workflow. Workflow execution is stopped.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.BulkWorkflow;
import io.nflow.engine.workflow.definition.Mutable;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
Expand Down Expand Up @@ -433,26 +434,60 @@ public void goToErrorStateWhenNextStateIsNull() {
public void doNothingWhenNotifyingParentWithoutParentWorkflowId() {
WorkflowInstance instance = executingInstanceBuilder().setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);

executor.run();

verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class), any(List.class));
}

@Test
public void whenWakingUpParentWorkflowSucceeds() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType");
when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState");
TestDefinition parentDefinition = mock(TestDefinition.class);
doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType");
when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList<String>())).thenReturn(true);

executor.run();

verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList<String>());
}

@Test
public void whenWakingUpParentWorkflowFails() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType");
when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState");
TestDefinition parentDefinition = mock(TestDefinition.class);
doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType");
when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList<String>())).thenReturn(false);

executor.run();

verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList<String>());
}

@Test
public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("simple-test").setState("processing")
.build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn(BulkWorkflow.BULK_WORKFLOW_TYPE);
when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId))
.thenReturn(BulkWorkflow.State.waitForChildrenToFinish.name());
BulkWorkflow parentDefinition = new BulkWorkflow();
doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition(BulkWorkflow.BULK_WORKFLOW_TYPE);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList<String>())).thenReturn(true);

executor.run();

verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList<String>());

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ public CreateWorkflowConverter(WorkflowInstanceFactory factory) {
public WorkflowInstance convert(CreateWorkflowInstanceRequest req) {
WorkflowInstance.Builder builder = factory.newWorkflowInstanceBuilder().setType(req.type).setBusinessKey(req.businessKey)
.setExternalId(req.externalId);
if (req.activationTime != null) {
builder.setNextActivation(req.activationTime);
if (req.activate) {
if (req.activationTime != null) {
builder.setNextActivation(req.activationTime);
}
} else {
builder.setNextActivation(null);
}
if (isNotEmpty(req.startState)) {
builder.setState(req.startState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ public class CreateWorkflowInstanceRequest extends ModelObject {
@ApiModelProperty("Unique external identifier within the workflow type. Generated by nflow if not given.")
public String externalId;

@ApiModelProperty("Start time for workflow execution. If null, defaults to now.")
@ApiModelProperty("Start time for workflow execution. If null, defaults to now, unless activate is set to false, in which case activationTime is ignored.")
public DateTime activationTime;

@ApiModelProperty("Set to false to force activationTime to null. Default is true.")
public boolean activate = true;

@ApiModelProperty("State variables to be set for the new workflow instance.")
public Map<String, Object> stateVariables = new HashMap<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.joda.time.DateTime.now;
import static org.junit.Assert.assertThat;

import org.joda.time.DateTime;
Expand Down Expand Up @@ -66,6 +67,19 @@ public void convertAndValidateWorksWithMinimalData() {
assertThat(i.type, equalTo(req.type));
}

@Test
public void nextActivationIsSetToNullWhenInstanceIsNotActivated() {
CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest();
req.type = "wfType";
req.activationTime = now();
req.activate = false;
WorkflowInstance i = converter.convert(req);
assertThat(i.nextActivation, nullValue(DateTime.class));
assertThat(i.businessKey, nullValue(String.class));
assertThat(i.externalId, nullValue(String.class));
assertThat(i.type, equalTo(req.type));
}

@Test
public void convertWorks() {
WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setType("dummy").setBusinessKey("businessKey")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.nflow.tests.demo.workflow;

import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE;
import static java.util.stream.StreamSupport.stream;
import static org.joda.time.DateTime.now;

import org.joda.time.DateTime;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;

import io.nflow.engine.workflow.definition.BulkWorkflow;
import io.nflow.engine.workflow.definition.StateExecution;
import io.nflow.engine.workflow.instance.WorkflowInstance;

/**
* Bulk child workflow executor that does not overflow the system.
*/
@Component
public class DemoBulkWorkflow extends BulkWorkflow {

public static final String DEMO_BULK_WORKFLOW_TYPE = "demoBulk";

public DemoBulkWorkflow() {
super(DEMO_BULK_WORKFLOW_TYPE);
}

@Override
protected boolean splitWorkImpl(StateExecution execution, JsonNode data) {
if (data.size() == 0) {
return false;
}
execution.addChildWorkflows(stream(data.spliterator(), false).map(this::createInstance).toArray(WorkflowInstance[]::new));
return true;
}

private WorkflowInstance createInstance(JsonNode childData) {
return new WorkflowInstance.Builder() //
.setType(DEMO_WORKFLOW_TYPE) //
.setNextActivation(null) //
.putStateVariable("requestData", childData.asText()) //
.build();
}

@Override
protected DateTime waitForChildrenUntil() {
return now().plusSeconds(10);
}

@Override
protected DateTime waitForChildrenToCompleteUntil() {
return now().plusSeconds(10);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ public NextAction begin(@SuppressWarnings("unused") StateExecution execution) {
public NextAction process(@SuppressWarnings("unused") StateExecution execution) {
return stopInState(State.done, "Go to done state");
}

}
Loading