diff --git a/CHANGELOG.md b/CHANGELOG.md
index b59c30895..2e6f551a0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,13 @@
-## 5.3.4-SNAPSHOT (future release)
+## 5.4.0-SNAPSHOT (future release)
**Highlights**
+- Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system.
+- Introduce new workflow instance state type `wait`. Child workflow instances automatically wake up the parent when the parent is in a `wait` state and the child enters an `end` state.
+- Allow creating workflows via REST API with null activation time (by setting `activate = false`).
+- Allow creating child workflows via REST API (by setting `parentWorkflowId`).
**Details**
+- See `BulkWorkflowTest` and `DemoBulkWorkflow` for examples on how to use bulk workflows
- Support boxed primitives (Integer, Float etc) with @StateVar
- nFlow Explorer: Library updates to `lodash` 4.17.11, `moment` 2.24.0 and `extend` 3.0.2
Earlier lodash versions had this security vulnerability: https://nvd.nist.gov/vuln/detail/CVE-2018-16487
diff --git a/nflow-engine/pom.xml b/nflow-engine/pom.xml
index cbb3a0306..f97226e82 100644
--- a/nflow-engine/pom.xml
+++ b/nflow-engine/pom.xml
@@ -13,7 +13,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java
index f4396caeb..a66b38a80 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java
@@ -47,6 +47,7 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
+import io.nflow.engine.workflow.definition.WorkflowStateType;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
@@ -204,9 +205,18 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
}
+ WorkflowState nextState = definition.getState(execution.getNextState());
+ if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) {
+ String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId);
+ AbstractWorkflowDefinition> parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType);
+ String parentCurrentState = workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId);
+ if (parentDefinition.getState(parentCurrentState).getType() == WorkflowStateType.wait) {
+ execution.wakeUpParentWorkflow();
+ }
+ }
WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) //
.setNextActivation(execution.getNextActivation()) //
- .setStatus(getStatus(execution, definition.getState(execution.getNextState()))) //
+ .setStatus(getStatus(execution, nextState)) //
.setStateText(getStateText(instance, execution)) //
.setState(execution.getNextState()) //
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);
diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/BulkWorkflow.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/BulkWorkflow.java
new file mode 100644
index 000000000..1cb1a25bb
--- /dev/null
+++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/BulkWorkflow.java
@@ -0,0 +1,144 @@
+package io.nflow.engine.workflow.definition;
+
+import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done;
+import static io.nflow.engine.workflow.definition.BulkWorkflow.State.error;
+import static io.nflow.engine.workflow.definition.BulkWorkflow.State.splitWork;
+import static io.nflow.engine.workflow.definition.BulkWorkflow.State.waitForChildrenToFinish;
+import static io.nflow.engine.workflow.definition.NextAction.moveToState;
+import static io.nflow.engine.workflow.definition.NextAction.retryAfter;
+import static io.nflow.engine.workflow.definition.WorkflowStateType.end;
+import static io.nflow.engine.workflow.definition.WorkflowStateType.manual;
+import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
+import static io.nflow.engine.workflow.definition.WorkflowStateType.wait;
+import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created;
+import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.finished;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.Collections.emptyList;
+import static java.util.EnumSet.complementOf;
+import static org.joda.time.DateTime.now;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.EnumSet;
+import java.util.List;
+
+import javax.inject.Inject;
+
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import io.nflow.engine.service.WorkflowInstanceService;
+import io.nflow.engine.workflow.instance.WorkflowInstance;
+import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
+
+/**
+ * Bulk child workflow executor that does not overflow the system.
+ */
+@Component
+public class BulkWorkflow extends WorkflowDefinition {
+
+ public static final String BULK_WORKFLOW_TYPE = "bulk";
+ public static final String VAR_CHILD_DATA = "childData";
+ public static final String VAR_CONCURRENCY = "concurrency";
+
+ private static final EnumSet RUNNING_STATES = complementOf(EnumSet.of(finished, created));
+ private static final Logger logger = getLogger(BulkWorkflow.class);
+
+ @Inject
+ WorkflowInstanceService instanceService;
+
+ public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
+ splitWork(start), waitForChildrenToFinish(wait), done(end), error(manual);
+
+ private WorkflowStateType type;
+
+ State(WorkflowStateType type) {
+ this.type = type;
+ }
+
+ @Override
+ public WorkflowStateType getType() {
+ return type;
+ }
+
+ @Override
+ public String getDescription() {
+ return name();
+ }
+ }
+
+ protected BulkWorkflow(String type) {
+ super(type, splitWork, error, new WorkflowSettings.Builder().setMaxRetries(Integer.MAX_VALUE).build());
+ setDescription("Executes child workflows in bulk but gracefully without effecting non-bulk tasks.");
+ permit(splitWork, waitForChildrenToFinish);
+ permit(waitForChildrenToFinish, done);
+ }
+
+ public BulkWorkflow() {
+ this(BULK_WORKFLOW_TYPE);
+ }
+
+ public NextAction splitWork(StateExecution execution, @StateVar(value = VAR_CHILD_DATA, readOnly = true) JsonNode data) {
+ boolean childrenFound = splitWorkImpl(execution, data);
+ if (childrenFound) {
+ return moveToState(waitForChildrenToFinish, "Running");
+ }
+ return retryAfter(waitForChildrenUntil(), "Waiting for child workflows");
+ }
+
+ protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unused") JsonNode data) {
+ if (execution.getAllChildWorkflows().isEmpty()) {
+ throw new RuntimeException(
+ "No child workflows found - either add them before starting the parent or implement splitWorkflowImpl");
+ }
+ return true;
+ }
+
+ protected DateTime waitForChildrenUntil() {
+ return now().plusHours(1);
+ }
+
+ public NextAction waitForChildrenToFinish(StateExecution execution,
+ @StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) {
+ List childWorkflows = execution.getAllChildWorkflows();
+ long completed = 0;
+ long running = 0;
+ for (WorkflowInstance child : childWorkflows) {
+ if (child.status == finished) {
+ completed++;
+ } else if (isRunning(child)) {
+ running++;
+ }
+ }
+ if (completed == childWorkflows.size()) {
+ return moveToState(done, "All children completed");
+ }
+ long toStart = min(max(1, concurrency) - running, childWorkflows.size() - completed);
+ if (toStart > 0) {
+ childWorkflows.stream().filter(this::isInInitialState).limit(toStart).forEach(this::wakeup);
+ logger.info("Started " + toStart + " child workflows");
+ }
+ long progress = completed * 100 / childWorkflows.size();
+ return retryAfter(waitForChildrenToCompleteUntil(), "Waiting for child workflows to complete - " + progress + "% done");
+ }
+
+ private void wakeup(WorkflowInstance instance) {
+ instanceService.wakeupWorkflowInstance(instance.id, emptyList());
+ }
+
+ protected boolean isRunning(WorkflowInstance instance) {
+ return RUNNING_STATES.contains(instance.status);
+ }
+
+ private boolean isInInitialState(WorkflowInstance instance) {
+ return instance.status == created;
+ }
+
+ protected DateTime waitForChildrenToCompleteUntil() {
+ return now().plusMinutes(15);
+ }
+
+}
diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java
index 709573a4e..d4b3f5de0 100644
--- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java
+++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java
@@ -26,6 +26,12 @@ public enum WorkflowStateType {
*/
normal(false, WorkflowInstanceStatus.inProgress),
+ /**
+ * State for waiting something outside this instance to happen, including (but not limited to) child workflow instances to go to
+ * end state. When a child workflow finishes and parent is in wait state, parent is automatically woken up.
+ */
+ wait(false, WorkflowInstanceStatus.inProgress),
+
/**
* Final state of the workflow. Workflow execution is stopped.
*/
diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java
index e7c3830f4..b197cf62b 100644
--- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java
+++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java
@@ -78,6 +78,7 @@
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.service.WorkflowInstanceService;
+import io.nflow.engine.workflow.definition.BulkWorkflow;
import io.nflow.engine.workflow.definition.Mutable;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
@@ -433,7 +434,9 @@ public void goToErrorStateWhenNextStateIsNull() {
public void doNothingWhenNotifyingParentWithoutParentWorkflowId() {
WorkflowInstance instance = executingInstanceBuilder().setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
+
executor.run();
+
verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class), any(List.class));
}
@@ -441,8 +444,15 @@ public void doNothingWhenNotifyingParentWithoutParentWorkflowId() {
public void whenWakingUpParentWorkflowSucceeds() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
+ when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType");
+ when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState");
+ TestDefinition parentDefinition = mock(TestDefinition.class);
+ doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType");
+ when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList())).thenReturn(true);
+
executor.run();
+
verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList());
}
@@ -450,9 +460,34 @@ public void whenWakingUpParentWorkflowSucceeds() {
public void whenWakingUpParentWorkflowFails() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
+ when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType");
+ when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState");
+ TestDefinition parentDefinition = mock(TestDefinition.class);
+ doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType");
+ when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList())).thenReturn(false);
+
+ executor.run();
+
+ verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList());
+ }
+
+ @Test
+ public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() {
+ WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("simple-test").setState("processing")
+ .build();
+ when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
+ when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn(BulkWorkflow.BULK_WORKFLOW_TYPE);
+ when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId))
+ .thenReturn(BulkWorkflow.State.waitForChildrenToFinish.name());
+ BulkWorkflow parentDefinition = new BulkWorkflow();
+ doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition(BulkWorkflow.BULK_WORKFLOW_TYPE);
+ when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList())).thenReturn(true);
+
executor.run();
+
verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList());
+
}
@Test
diff --git a/nflow-explorer/pom.xml b/nflow-explorer/pom.xml
index 4afa82c56..4b7efa46f 100644
--- a/nflow-explorer/pom.xml
+++ b/nflow-explorer/pom.xml
@@ -13,7 +13,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
..
diff --git a/nflow-explorer/src/app/components/graph.js b/nflow-explorer/src/app/components/graph.js
index 22f5e186c..c007d8534 100644
--- a/nflow-explorer/src/app/components/graph.js
+++ b/nflow-explorer/src/app/components/graph.js
@@ -109,7 +109,7 @@
};
function resolveStyleClass() {
- var cssClass = 'node-' + (_.includes(['start', 'manual', 'end', 'error'], state.type) ? state.type : 'normal');
+ var cssClass = 'node-' + (_.includes(['start', 'manual', 'end', 'error', 'wait'], state.type) ? state.type : 'normal');
if (workflow && isPassiveNode()) {
cssClass += ' node-passive';
}
diff --git a/nflow-explorer/src/styles/data/graph.css b/nflow-explorer/src/styles/data/graph.css
index 32db9809e..3194ed6e5 100644
--- a/nflow-explorer/src/styles/data/graph.css
+++ b/nflow-explorer/src/styles/data/graph.css
@@ -28,18 +28,19 @@ Generated svg looks like this
*/
/* changing font or font size may require code changes to have right size boxes */
-.node-normal, .node-manual, .node-start, .node-end, .node-error {
+.node-normal, .node-manual, .node-start, .node-end, .node-error, .node-wait {
font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
font-size: 14px;
fill: black;
opacity: 1; cursor: pointer;
}
-.node-start > rect, .node-manual > rect, .node-normal > rect, .node-end > rect, .node-error > rect {
+.node-start > rect, .node-manual > rect, .node-normal > rect, .node-end > rect, .node-error > rect, .node-wait > rect {
stroke-width: 1.5px;
stroke: black;
fill: white;
}
+
.node-start > rect {
fill: LightBlue;
}
@@ -56,7 +57,11 @@ Generated svg looks like this
fill: LightGreen;
}
-.node-normal.selected > rect, .node-manual.selected > rect, .node-error.selected > rect, .node-start.selected > rect, .node-end.selected > rect {
+.node-wait > rect {
+ fill: LightSkyBlue;
+}
+
+.node-normal.selected > rect, .node-manual.selected > rect, .node-error.selected > rect, .node-start.selected > rect, .node-end.selected > rect, .node-wait.selected > rect {
stroke-width: 3px;
}
diff --git a/nflow-jetty/pom.xml b/nflow-jetty/pom.xml
index 79b00baf9..35aefc347 100644
--- a/nflow-jetty/pom.xml
+++ b/nflow-jetty/pom.xml
@@ -13,7 +13,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
UTF-8
diff --git a/nflow-metrics/pom.xml b/nflow-metrics/pom.xml
index 219089c98..cf96749e0 100644
--- a/nflow-metrics/pom.xml
+++ b/nflow-metrics/pom.xml
@@ -12,7 +12,7 @@
io.nflow
nflow-root
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-netty/pom.xml b/nflow-netty/pom.xml
index df14adabc..f40ec4ca5 100644
--- a/nflow-netty/pom.xml
+++ b/nflow-netty/pom.xml
@@ -13,7 +13,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
UTF-8
diff --git a/nflow-perf-test/pom.xml b/nflow-perf-test/pom.xml
index ca5099d6f..3600145ce 100644
--- a/nflow-perf-test/pom.xml
+++ b/nflow-perf-test/pom.xml
@@ -13,7 +13,7 @@
io.nflow
nflow-root
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-rest-api-common/pom.xml b/nflow-rest-api-common/pom.xml
index 01f8cf011..c2e96391b 100644
--- a/nflow-rest-api-common/pom.xml
+++ b/nflow-rest-api-common/pom.xml
@@ -12,7 +12,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java
index b45fa7aac..2fd3eb229 100644
--- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java
+++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java
@@ -1,5 +1,6 @@
package io.nflow.rest.v1.converter;
+import static java.lang.Boolean.FALSE;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import java.util.Map.Entry;
@@ -25,9 +26,14 @@ public CreateWorkflowConverter(WorkflowInstanceFactory factory) {
public WorkflowInstance convert(CreateWorkflowInstanceRequest req) {
WorkflowInstance.Builder builder = factory.newWorkflowInstanceBuilder().setType(req.type).setBusinessKey(req.businessKey)
.setExternalId(req.externalId);
- if (req.activationTime != null) {
- builder.setNextActivation(req.activationTime);
+ if (!FALSE.equals(req.activate)) {
+ if (req.activationTime != null) {
+ builder.setNextActivation(req.activationTime);
+ }
+ } else {
+ builder.setNextActivation(null);
}
+ builder.setParentWorkflowId(req.parentWorkflowId);
if (isNotEmpty(req.startState)) {
builder.setState(req.startState);
}
diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java
index a5c80177c..94ec341ca 100644
--- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java
+++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java
@@ -36,9 +36,15 @@ public class CreateWorkflowInstanceRequest extends ModelObject {
@ApiModelProperty("Unique external identifier within the workflow type. Generated by nflow if not given.")
public String externalId;
- @ApiModelProperty("Start time for workflow execution. If null, defaults to now.")
+ @ApiModelProperty("Start time for workflow execution. If null, defaults to now, unless activate is set to false, in which case activationTime is ignored.")
public DateTime activationTime;
+ @ApiModelProperty("Set to false to force activationTime to null. Default is true.")
+ public Boolean activate;
+
+ @ApiModelProperty("Create the workflow as a child of the given parent workflow.")
+ public Integer parentWorkflowId;
+
@ApiModelProperty("State variables to be set for the new workflow instance.")
public Map stateVariables = new HashMap<>();
diff --git a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java
index 7fb3f28f4..5f48f5bd9 100644
--- a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java
+++ b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java
@@ -4,6 +4,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.joda.time.DateTime.now;
import static org.junit.Assert.assertThat;
import org.joda.time.DateTime;
@@ -66,6 +67,19 @@ public void convertAndValidateWorksWithMinimalData() {
assertThat(i.type, equalTo(req.type));
}
+ @Test
+ public void nextActivationIsSetToNullWhenInstanceIsNotActivated() {
+ CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest();
+ req.type = "wfType";
+ req.activationTime = now();
+ req.activate = false;
+ WorkflowInstance i = converter.convert(req);
+ assertThat(i.nextActivation, nullValue(DateTime.class));
+ assertThat(i.businessKey, nullValue(String.class));
+ assertThat(i.externalId, nullValue(String.class));
+ assertThat(i.type, equalTo(req.type));
+ }
+
@Test
public void convertWorks() {
WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setType("dummy").setBusinessKey("businessKey")
diff --git a/nflow-rest-api-jax-rs/pom.xml b/nflow-rest-api-jax-rs/pom.xml
index 2cf83e7dd..91dc6bab0 100644
--- a/nflow-rest-api-jax-rs/pom.xml
+++ b/nflow-rest-api-jax-rs/pom.xml
@@ -12,7 +12,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-rest-api-spring-web/pom.xml b/nflow-rest-api-spring-web/pom.xml
index 10943874a..9f930d403 100644
--- a/nflow-rest-api-spring-web/pom.xml
+++ b/nflow-rest-api-spring-web/pom.xml
@@ -12,7 +12,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-server-common/pom.xml b/nflow-server-common/pom.xml
index 4bc7146cd..cd4710d6a 100644
--- a/nflow-server-common/pom.xml
+++ b/nflow-server-common/pom.xml
@@ -13,7 +13,7 @@
nflow-root
io.nflow
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
UTF-8
diff --git a/nflow-tests/pom.xml b/nflow-tests/pom.xml
index 1c8cd9a53..c08a4d5d7 100644
--- a/nflow-tests/pom.xml
+++ b/nflow-tests/pom.xml
@@ -12,7 +12,7 @@
io.nflow
nflow-root
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java b/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java
index 180b2bf05..a6acdfbe1 100644
--- a/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java
+++ b/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java
@@ -5,6 +5,7 @@
import static io.nflow.tests.demo.SpringApplicationContext.applicationContext;
import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE;
import static io.nflow.tests.demo.workflow.SlowWorkflow.SLOW_WORKFLOW_TYPE;
+import static java.util.Arrays.asList;
import static java.util.Collections.emptySet;
import static org.joda.time.DateTime.now;
@@ -16,10 +17,13 @@
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.service.WorkflowInstanceService;
+import io.nflow.engine.workflow.definition.BulkWorkflow;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
+import io.nflow.engine.workflow.instance.WorkflowInstanceFactory;
import io.nflow.jetty.StartNflow;
import io.nflow.metrics.NflowMetricsContext;
+import io.nflow.tests.demo.workflow.DemoBulkWorkflow;
import io.nflow.tests.demo.workflow.DemoWorkflow;
public class DemoServer {
@@ -38,6 +42,7 @@ static class DemoServerWorkflowsConfiguration {
private static void insertDemoWorkflows() {
WorkflowInstanceService workflowInstanceService = applicationContext.getBean(WorkflowInstanceService.class);
+ WorkflowInstanceFactory workflowInstanceFactory = applicationContext.getBean(WorkflowInstanceFactory.class);
WorkflowInstance instance = new WorkflowInstance.Builder().setType(DEMO_WORKFLOW_TYPE)
.setState(DemoWorkflow.State.begin.name()).build();
int id = workflowInstanceService.insertWorkflowInstance(instance);
@@ -53,5 +58,13 @@ private static void insertDemoWorkflows() {
instance = new WorkflowInstance.Builder().setType(SLOW_WORKFLOW_TYPE).setSignal(Optional.of(1)).setNextActivation(null)
.build();
workflowInstanceService.insertWorkflowInstance(instance);
+ // insert demo bulk workflow with couple of children
+ instance = workflowInstanceFactory.newWorkflowInstanceBuilder() //
+ .setType(DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE) //
+ .setState(BulkWorkflow.State.splitWork.name()) //
+ .putStateVariable(BulkWorkflow.VAR_CONCURRENCY, 2) //
+ .putStateVariable(BulkWorkflow.VAR_CHILD_DATA, asList(1, 2, 3, 4, 5)) //
+ .build();
+ workflowInstanceService.insertWorkflowInstance(instance);
}
}
diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java
new file mode 100644
index 000000000..7cf7b6460
--- /dev/null
+++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java
@@ -0,0 +1,55 @@
+package io.nflow.tests.demo.workflow;
+
+import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE;
+import static java.util.stream.StreamSupport.stream;
+import static org.joda.time.DateTime.now;
+
+import org.joda.time.DateTime;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import io.nflow.engine.workflow.definition.BulkWorkflow;
+import io.nflow.engine.workflow.definition.StateExecution;
+import io.nflow.engine.workflow.instance.WorkflowInstance;
+
+/**
+ * Bulk child workflow executor that does not overflow the system.
+ */
+@Component
+public class DemoBulkWorkflow extends BulkWorkflow {
+
+ public static final String DEMO_BULK_WORKFLOW_TYPE = "demoBulk";
+
+ public DemoBulkWorkflow() {
+ super(DEMO_BULK_WORKFLOW_TYPE);
+ }
+
+ @Override
+ protected boolean splitWorkImpl(StateExecution execution, JsonNode data) {
+ if (data.size() == 0) {
+ return false;
+ }
+ execution.addChildWorkflows(stream(data.spliterator(), false).map(this::createInstance).toArray(WorkflowInstance[]::new));
+ return true;
+ }
+
+ private WorkflowInstance createInstance(JsonNode childData) {
+ return new WorkflowInstance.Builder() //
+ .setType(DEMO_WORKFLOW_TYPE) //
+ .setNextActivation(null) //
+ .putStateVariable("requestData", childData.asText()) //
+ .build();
+ }
+
+ @Override
+ protected DateTime waitForChildrenUntil() {
+ return now().plusSeconds(10);
+ }
+
+ @Override
+ protected DateTime waitForChildrenToCompleteUntil() {
+ return now().plusSeconds(10);
+ }
+
+}
diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java
index 9066ba9b0..141feed83 100644
--- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java
+++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java
@@ -54,4 +54,5 @@ public NextAction begin(@SuppressWarnings("unused") StateExecution execution) {
public NextAction process(@SuppressWarnings("unused") StateExecution execution) {
return stopInState(State.done, "Go to done state");
}
+
}
diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java
new file mode 100644
index 000000000..b4b6da2dd
--- /dev/null
+++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java
@@ -0,0 +1,119 @@
+package io.nflow.tests;
+
+import static io.nflow.engine.workflow.definition.BulkWorkflow.BULK_WORKFLOW_TYPE;
+import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done;
+import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE;
+import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE;
+import static java.util.Comparator.naturalOrder;
+import static java.util.stream.Collectors.toList;
+import static org.apache.cxf.jaxrs.client.WebClient.fromClient;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.joda.time.DateTime.now;
+import static org.junit.Assert.assertThat;
+import static org.junit.runners.MethodSorters.NAME_ASCENDING;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+import javax.ws.rs.core.Response;
+
+import org.joda.time.DateTime;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.springframework.context.annotation.ComponentScan;
+
+import io.nflow.engine.workflow.definition.BulkWorkflow;
+import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest;
+import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse;
+import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse;
+import io.nflow.rest.v1.msg.UpdateWorkflowInstanceRequest;
+import io.nflow.tests.demo.workflow.DemoBulkWorkflow;
+import io.nflow.tests.runner.NflowServerRule;
+
+@FixMethodOrder(NAME_ASCENDING)
+public class BulkWorkflowTest extends AbstractNflowTest {
+
+ @ClassRule
+ public static NflowServerRule server = new NflowServerRule.Builder().springContextClass(Configuration.class).build();
+
+ private static int workflowId;
+
+ private static final int CHILDREN_COUNT = 10;
+
+ public BulkWorkflowTest() {
+ super(server);
+ }
+
+ @ComponentScan(basePackageClasses = DemoBulkWorkflow.class)
+ static class Configuration {
+ // for component scanning only
+ }
+
+ @Test
+ public void t01_startDemoBulkWorkflow() {
+ CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest();
+ req.type = DEMO_BULK_WORKFLOW_TYPE;
+ req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3);
+ List childData = IntStream.rangeClosed(1, CHILDREN_COUNT).boxed().collect(toList());
+ req.stateVariables.put(BulkWorkflow.VAR_CHILD_DATA, nflowObjectMapper().valueToTree(childData));
+ CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req,
+ CreateWorkflowInstanceResponse.class);
+ assertThat(resp.id, notNullValue());
+ workflowId = resp.id;
+ }
+
+ @Test(timeout = 30000)
+ public void t02_waitForBulkToFinish() throws InterruptedException {
+ waitForBulkToFinish();
+ }
+
+ @Test
+ public void t11_createBulkWorkflow() {
+ CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest();
+ req.type = BULK_WORKFLOW_TYPE;
+ req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3);
+ req.activate = false;
+ CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req,
+ CreateWorkflowInstanceResponse.class);
+ assertThat(resp.id, notNullValue());
+ workflowId = resp.id;
+
+ for (int i = 0; i < CHILDREN_COUNT; ++i) {
+ CreateWorkflowInstanceRequest child = new CreateWorkflowInstanceRequest();
+ child.type = DEMO_WORKFLOW_TYPE;
+ child.activate = false;
+ child.parentWorkflowId = workflowId;
+ resp = fromClient(workflowInstanceResource, true).put(child, CreateWorkflowInstanceResponse.class);
+ assertThat(resp.id, notNullValue());
+ }
+ }
+
+ @Test
+ public void t12_startBulkWorkflow() {
+ UpdateWorkflowInstanceRequest req = new UpdateWorkflowInstanceRequest();
+ req.nextActivationTime = now();
+ try (Response resp = fromClient(workflowInstanceResource, true).path("id").path(workflowId).put(req, Response.class)) {
+ assertThat(resp.getStatus(), equalTo(204));
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void t13_waitForBulkToFinish() throws InterruptedException {
+ waitForBulkToFinish();
+ }
+
+ private void waitForBulkToFinish() throws InterruptedException {
+ ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name());
+ assertThat(instance.childWorkflows.size(), equalTo(1));
+ List childWorkflowIds = instance.childWorkflows.values().iterator().next();
+ assertThat(childWorkflowIds.size(), equalTo(CHILDREN_COUNT));
+ List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList());
+ DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get();
+ DateTime maxStarted = children.stream().map(child -> child.started).max(naturalOrder()).get();
+ assertThat(minFinished, lessThan(maxStarted));
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index b2b6e7c64..d3b571fcf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
nflow-root
pom
nFlow Root
- 5.3.4-SNAPSHOT
+ 5.4.0-SNAPSHOT
http://nflow.io