From 6f6eb6983b0b0946fad30f9673d4ab0799aafd62 Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Tue, 12 Feb 2019 00:01:24 +0200 Subject: [PATCH 01/22] Bulk worklfow demo + test --- .../tests/demo/workflow/BulkWorkflow.java | 128 ++++++++++++++++++ .../tests/demo/workflow/DemoBulkWorkflow.java | 33 +++++ .../tests/demo/workflow/DemoWorkflow.java | 1 + .../java/io/nflow/tests/BulkWorkflowTest.java | 51 +++++++ 4 files changed, 213 insertions(+) create mode 100644 nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java create mode 100644 nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java create mode 100644 nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java new file mode 100644 index 000000000..8d211bbae --- /dev/null +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -0,0 +1,128 @@ +package io.nflow.tests.demo.workflow; + +import com.fasterxml.jackson.databind.JsonNode; +import io.nflow.engine.service.WorkflowInstanceService; +import io.nflow.engine.workflow.definition.NextAction; +import io.nflow.engine.workflow.definition.StateExecution; +import io.nflow.engine.workflow.definition.StateVar; +import io.nflow.engine.workflow.definition.WorkflowDefinition; +import io.nflow.engine.workflow.definition.WorkflowState; +import io.nflow.engine.workflow.definition.WorkflowStateType; +import io.nflow.engine.workflow.instance.WorkflowInstance; +import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; +import org.slf4j.Logger; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.util.EnumSet; +import java.util.List; + +import static io.nflow.engine.workflow.definition.NextAction.moveToState; +import static io.nflow.engine.workflow.definition.NextAction.retryAfter; +import static io.nflow.engine.workflow.definition.WorkflowStateType.end; +import static io.nflow.engine.workflow.definition.WorkflowStateType.manual; +import static io.nflow.engine.workflow.definition.WorkflowStateType.normal; +import static io.nflow.engine.workflow.definition.WorkflowStateType.start; +import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; +import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.finished; +import static io.nflow.tests.demo.workflow.BulkWorkflow.State.done; +import static io.nflow.tests.demo.workflow.BulkWorkflow.State.error; +import static io.nflow.tests.demo.workflow.BulkWorkflow.State.splitWork; +import static io.nflow.tests.demo.workflow.BulkWorkflow.State.waitForChildrenToFinish; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.util.Collections.emptyList; +import static java.util.EnumSet.complementOf; +import static org.joda.time.DateTime.now; +import static org.slf4j.LoggerFactory.getLogger; + +/** + * Bulk child workflow executor that does not overflow the system. + */ +@Component +public class BulkWorkflow extends WorkflowDefinition { + public static final String BULK_WORKFLOW_TYPE = "bulk"; + private static final Logger logger = getLogger(BulkWorkflow.class); + + @Inject + WorkflowInstanceService instanceService; + + public enum State implements WorkflowState { + splitWork(start), waitForChildrenToFinish(normal), done(end), error(manual); + + private WorkflowStateType type; + + State(WorkflowStateType type) { + this.type = type; + } + + @Override + public WorkflowStateType getType() { + return type; + } + + @Override + public String getDescription() { + return name(); + } + } + + protected BulkWorkflow(String type) { + super(type, splitWork, error); + setDescription("Executes child workflows in bulk but gracefully without effecting non-bulk tasks."); + permit(splitWork, waitForChildrenToFinish); + permit(waitForChildrenToFinish, done); + } + + public BulkWorkflow() { + this(BULK_WORKFLOW_TYPE); + } + + public NextAction splitWork(StateExecution execution, @StateVar(value = "requestData", readOnly = true) JsonNode data) { + boolean childrenFound = splitWorkImpl(execution, data); + if (childrenFound) { + return moveToState(waitForChildrenToFinish, "Running"); + } + return retryAfter(now().plusMinutes(1), "Waiting for child workflows"); + } + + protected boolean splitWorkImpl(StateExecution execution, JsonNode data) { + if (execution.getAllChildWorkflows().isEmpty()) { + // if (DateTime.now() - execution.getStartTime() < MINUTES.toMillis(5)) { return false; } + throw new RuntimeException("No child workflows found - either add them before starting the parent or implement splitWorkflowImpl"); + } + return true; + } + + private final static EnumSet RUNNING_STATES = complementOf(EnumSet.of(finished, created)); + + protected boolean isRunning(WorkflowInstance child) { + return RUNNING_STATES.contains(child.status); + } + + public NextAction waitForChildrenToFinish(StateExecution execution, @StateVar(value = "concurrency", readOnly = true) int concurrency) { + List childWorkflows = execution.getAllChildWorkflows(); + int running = 0, completed = 0; + for (WorkflowInstance child : childWorkflows) { + if (child.status == finished) { + completed++; + } else if (isRunning(child)) { + running++; + } + } + if (completed == childWorkflows.size()) { + return moveToState(done, "All children completed"); + } + int toStart = min(max(1, concurrency) - running, childWorkflows.size() - completed); + if (toStart > 0) { + childWorkflows.stream() + .filter(child -> child.status == created) + .limit(toStart) + .map(child -> child.id) + .forEach(id -> instanceService.wakeupWorkflowInstance(id, emptyList())); + logger.info("Started " + toStart + " child workflows"); + } + int progress = completed * 100 / childWorkflows.size(); + return retryAfter(now().plusMinutes(15), "Waiting for child workflows to complete - " + progress + "% done"); + } +} diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java new file mode 100644 index 000000000..6c24316db --- /dev/null +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java @@ -0,0 +1,33 @@ +package io.nflow.tests.demo.workflow; + +import com.fasterxml.jackson.databind.JsonNode; +import io.nflow.engine.workflow.definition.StateExecution; +import io.nflow.engine.workflow.instance.WorkflowInstance; +import org.springframework.stereotype.Component; + +import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; +import static java.util.Collections.singletonMap; + +/** + * Bulk child workflow executor that does not overflow the system. + */ +@Component +public class DemoBulkWorkflow extends BulkWorkflow { + public static final String DEMO_BULK_WORKFLOW_TYPE = "demoBulk"; + + public DemoBulkWorkflow() { + super(DEMO_BULK_WORKFLOW_TYPE); + } + + protected boolean splitWorkImpl(StateExecution execution, JsonNode data) { + data.forEach(childData -> { + WorkflowInstance child = new WorkflowInstance.Builder() + .setType(DEMO_WORKFLOW_TYPE) + .setNextActivation(null) + .setStateVariables(singletonMap("requestData", childData.asText())) + .build(); + execution.addChildWorkflows(child); + }); + return true; + } +} diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java index 9066ba9b0..04d9355ef 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java @@ -52,6 +52,7 @@ public NextAction begin(@SuppressWarnings("unused") StateExecution execution) { } public NextAction process(@SuppressWarnings("unused") StateExecution execution) { + execution.wakeUpParentWorkflow(); return stopInState(State.done, "Go to done state"); } } diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java new file mode 100644 index 000000000..d6e7ca1a6 --- /dev/null +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -0,0 +1,51 @@ +package io.nflow.tests; + +import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; +import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; +import io.nflow.tests.demo.workflow.BulkWorkflow; +import io.nflow.tests.runner.NflowServerRule; +import org.junit.ClassRule; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.springframework.context.annotation.ComponentScan; + +import static io.nflow.tests.demo.workflow.BulkWorkflow.State.done; +import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; +import static java.util.Arrays.asList; +import static org.apache.cxf.jaxrs.client.WebClient.fromClient; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.runners.MethodSorters.NAME_ASCENDING; + +@FixMethodOrder(NAME_ASCENDING) +public class BulkWorkflowTest extends AbstractNflowTest { + @ClassRule + public static NflowServerRule server = new NflowServerRule.Builder().springContextClass(Configuration.class).build(); + + private static int workflowId; + + public BulkWorkflowTest() { + super(server); + } + + @ComponentScan(basePackageClasses = BulkWorkflow.class) + static class Configuration { + // for component scanning only + } + + @Test + public void t01_startBulkWorkflow() { + CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); + req.type = DEMO_BULK_WORKFLOW_TYPE; + req.stateVariables.put("concurrency", 2); + req.stateVariables.put("requestData", nflowObjectMapper().valueToTree(asList(1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10))); + CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, CreateWorkflowInstanceResponse.class); + assertThat(resp.id, notNullValue()); + workflowId = resp.id; + } + + @Test(timeout = 30000) + public void t02_waitForBulkToFinish() throws InterruptedException { + getWorkflowInstance(workflowId, done.name()); + } +} From a8aaeeb0d7c12830bf3d56f60f7c6537cf9d63c4 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 11:14:08 +0200 Subject: [PATCH 02/22] refactor and format --- .../tests/demo/workflow/BulkWorkflow.java | 93 ++++++++++--------- .../tests/demo/workflow/DemoBulkWorkflow.java | 35 ++++--- .../java/io/nflow/tests/BulkWorkflowTest.java | 80 ++++++++-------- 3 files changed, 116 insertions(+), 92 deletions(-) diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java index 8d211bbae..a0506118a 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -1,22 +1,5 @@ package io.nflow.tests.demo.workflow; -import com.fasterxml.jackson.databind.JsonNode; -import io.nflow.engine.service.WorkflowInstanceService; -import io.nflow.engine.workflow.definition.NextAction; -import io.nflow.engine.workflow.definition.StateExecution; -import io.nflow.engine.workflow.definition.StateVar; -import io.nflow.engine.workflow.definition.WorkflowDefinition; -import io.nflow.engine.workflow.definition.WorkflowState; -import io.nflow.engine.workflow.definition.WorkflowStateType; -import io.nflow.engine.workflow.instance.WorkflowInstance; -import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; -import org.slf4j.Logger; -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.util.EnumSet; -import java.util.List; - import static io.nflow.engine.workflow.definition.NextAction.moveToState; import static io.nflow.engine.workflow.definition.NextAction.retryAfter; import static io.nflow.engine.workflow.definition.WorkflowStateType.end; @@ -36,12 +19,36 @@ import static org.joda.time.DateTime.now; import static org.slf4j.LoggerFactory.getLogger; +import java.util.EnumSet; +import java.util.List; + +import javax.inject.Inject; + +import org.slf4j.Logger; +import org.springframework.stereotype.Component; + +import com.fasterxml.jackson.databind.JsonNode; + +import io.nflow.engine.service.WorkflowInstanceService; +import io.nflow.engine.workflow.definition.NextAction; +import io.nflow.engine.workflow.definition.StateExecution; +import io.nflow.engine.workflow.definition.StateVar; +import io.nflow.engine.workflow.definition.WorkflowDefinition; +import io.nflow.engine.workflow.definition.WorkflowState; +import io.nflow.engine.workflow.definition.WorkflowStateType; +import io.nflow.engine.workflow.instance.WorkflowInstance; +import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; + /** * Bulk child workflow executor that does not overflow the system. */ @Component public class BulkWorkflow extends WorkflowDefinition { + public static final String BULK_WORKFLOW_TYPE = "bulk"; + + private final static EnumSet RUNNING_STATES = complementOf(EnumSet.of(finished, created)); + private static final Logger logger = getLogger(BulkWorkflow.class); @Inject @@ -86,43 +93,45 @@ public NextAction splitWork(StateExecution execution, @StateVar(value = "request return retryAfter(now().plusMinutes(1), "Waiting for child workflows"); } - protected boolean splitWorkImpl(StateExecution execution, JsonNode data) { + protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unused") JsonNode data) { if (execution.getAllChildWorkflows().isEmpty()) { - // if (DateTime.now() - execution.getStartTime() < MINUTES.toMillis(5)) { return false; } - throw new RuntimeException("No child workflows found - either add them before starting the parent or implement splitWorkflowImpl"); + throw new RuntimeException( + "No child workflows found - either add them before starting the parent or implement splitWorkflowImpl"); } return true; } - private final static EnumSet RUNNING_STATES = complementOf(EnumSet.of(finished, created)); - - protected boolean isRunning(WorkflowInstance child) { - return RUNNING_STATES.contains(child.status); - } - - public NextAction waitForChildrenToFinish(StateExecution execution, @StateVar(value = "concurrency", readOnly = true) int concurrency) { + public NextAction waitForChildrenToFinish(StateExecution execution, + @StateVar(value = "concurrency", readOnly = true) int concurrency) { List childWorkflows = execution.getAllChildWorkflows(); - int running = 0, completed = 0; - for (WorkflowInstance child : childWorkflows) { - if (child.status == finished) { - completed++; - } else if (isRunning(child)) { - running++; - } - } + long running = childWorkflows.stream().filter(this::isRunning).count(); + long completed = childWorkflows.stream().filter(this::isFinished).count(); if (completed == childWorkflows.size()) { return moveToState(done, "All children completed"); } - int toStart = min(max(1, concurrency) - running, childWorkflows.size() - completed); + long toStart = min(max(1, concurrency) - running, childWorkflows.size() - completed); if (toStart > 0) { - childWorkflows.stream() - .filter(child -> child.status == created) - .limit(toStart) - .map(child -> child.id) - .forEach(id -> instanceService.wakeupWorkflowInstance(id, emptyList())); + childWorkflows.stream().filter(this::isInInitialState).limit(toStart).forEach(this::wakeup); logger.info("Started " + toStart + " child workflows"); } - int progress = completed * 100 / childWorkflows.size(); + long progress = completed * 100 / childWorkflows.size(); return retryAfter(now().plusMinutes(15), "Waiting for child workflows to complete - " + progress + "% done"); } + + private void wakeup(WorkflowInstance instance) { + instanceService.wakeupWorkflowInstance(instance.id, emptyList()); + } + + private boolean isRunning(WorkflowInstance instance) { + return RUNNING_STATES.contains(instance.status); + } + + private boolean isFinished(WorkflowInstance instance) { + return instance.status == finished; + } + + private boolean isInInitialState(WorkflowInstance instance) { + return instance.status == created; + } + } diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java index 6c24316db..f092a7027 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java @@ -1,33 +1,44 @@ package io.nflow.tests.demo.workflow; +import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; +import static java.util.Collections.singletonMap; + +import java.util.stream.Stream; + +import org.springframework.stereotype.Component; + import com.fasterxml.jackson.databind.JsonNode; + import io.nflow.engine.workflow.definition.StateExecution; import io.nflow.engine.workflow.instance.WorkflowInstance; -import org.springframework.stereotype.Component; - -import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; -import static java.util.Collections.singletonMap; /** * Bulk child workflow executor that does not overflow the system. */ @Component public class DemoBulkWorkflow extends BulkWorkflow { + public static final String DEMO_BULK_WORKFLOW_TYPE = "demoBulk"; public DemoBulkWorkflow() { super(DEMO_BULK_WORKFLOW_TYPE); } + @Override protected boolean splitWorkImpl(StateExecution execution, JsonNode data) { - data.forEach(childData -> { - WorkflowInstance child = new WorkflowInstance.Builder() - .setType(DEMO_WORKFLOW_TYPE) - .setNextActivation(null) - .setStateVariables(singletonMap("requestData", childData.asText())) - .build(); - execution.addChildWorkflows(child); - }); + if (data.size() == 0) { + return false; + } + execution.addChildWorkflows(Stream.of(data).map(this::createInstance).toArray(WorkflowInstance[]::new)); return true; } + + private WorkflowInstance createInstance(JsonNode childData) { + return new WorkflowInstance.Builder() // + .setType(DEMO_WORKFLOW_TYPE) // + .setNextActivation(null) // + .setStateVariables(singletonMap("requestData", childData.asText())) // + .build(); + } + } diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index d6e7ca1a6..ccd4ccc30 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -1,14 +1,5 @@ package io.nflow.tests; -import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; -import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; -import io.nflow.tests.demo.workflow.BulkWorkflow; -import io.nflow.tests.runner.NflowServerRule; -import org.junit.ClassRule; -import org.junit.FixMethodOrder; -import org.junit.Test; -import org.springframework.context.annotation.ComponentScan; - import static io.nflow.tests.demo.workflow.BulkWorkflow.State.done; import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; import static java.util.Arrays.asList; @@ -17,35 +8,48 @@ import static org.junit.Assert.assertThat; import static org.junit.runners.MethodSorters.NAME_ASCENDING; +import org.junit.ClassRule; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.springframework.context.annotation.ComponentScan; + +import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; +import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; +import io.nflow.tests.demo.workflow.BulkWorkflow; +import io.nflow.tests.runner.NflowServerRule; + @FixMethodOrder(NAME_ASCENDING) public class BulkWorkflowTest extends AbstractNflowTest { - @ClassRule - public static NflowServerRule server = new NflowServerRule.Builder().springContextClass(Configuration.class).build(); - - private static int workflowId; - - public BulkWorkflowTest() { - super(server); - } - - @ComponentScan(basePackageClasses = BulkWorkflow.class) - static class Configuration { - // for component scanning only - } - - @Test - public void t01_startBulkWorkflow() { - CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); - req.type = DEMO_BULK_WORKFLOW_TYPE; - req.stateVariables.put("concurrency", 2); - req.stateVariables.put("requestData", nflowObjectMapper().valueToTree(asList(1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10))); - CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, CreateWorkflowInstanceResponse.class); - assertThat(resp.id, notNullValue()); - workflowId = resp.id; - } - - @Test(timeout = 30000) - public void t02_waitForBulkToFinish() throws InterruptedException { - getWorkflowInstance(workflowId, done.name()); - } + + @ClassRule + public static NflowServerRule server = new NflowServerRule.Builder().springContextClass(Configuration.class).build(); + + private static int workflowId; + + public BulkWorkflowTest() { + super(server); + } + + @ComponentScan(basePackageClasses = BulkWorkflow.class) + static class Configuration { + // for component scanning only + } + + @Test + public void t01_startBulkWorkflow() { + CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); + req.type = DEMO_BULK_WORKFLOW_TYPE; + req.stateVariables.put("concurrency", 2); + req.stateVariables.put("requestData", nflowObjectMapper().valueToTree(asList(1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10))); + CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, + CreateWorkflowInstanceResponse.class); + assertThat(resp.id, notNullValue()); + workflowId = resp.id; + } + + @Test(timeout = 30000) + public void t02_waitForBulkToFinish() throws InterruptedException { + getWorkflowInstance(workflowId, done.name()); + } + } From 6557abc39ed5fa91abfca556b7f3252976a033a3 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 11:36:52 +0200 Subject: [PATCH 03/22] allow creating child instances via rest api with nextActivation=null --- .../rest/v1/converter/CreateWorkflowConverter.java | 8 ++++++-- .../rest/v1/msg/CreateWorkflowInstanceRequest.java | 5 ++++- .../v1/converter/CreateWorkflowConverterTest.java | 14 ++++++++++++++ 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java index b45fa7aac..a7ffadf79 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java @@ -25,8 +25,12 @@ public CreateWorkflowConverter(WorkflowInstanceFactory factory) { public WorkflowInstance convert(CreateWorkflowInstanceRequest req) { WorkflowInstance.Builder builder = factory.newWorkflowInstanceBuilder().setType(req.type).setBusinessKey(req.businessKey) .setExternalId(req.externalId); - if (req.activationTime != null) { - builder.setNextActivation(req.activationTime); + if (req.activate) { + if (req.activationTime != null) { + builder.setNextActivation(req.activationTime); + } + } else { + builder.setNextActivation(null); } if (isNotEmpty(req.startState)) { builder.setState(req.startState); diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java index a5c80177c..5385240d2 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java @@ -36,9 +36,12 @@ public class CreateWorkflowInstanceRequest extends ModelObject { @ApiModelProperty("Unique external identifier within the workflow type. Generated by nflow if not given.") public String externalId; - @ApiModelProperty("Start time for workflow execution. If null, defaults to now.") + @ApiModelProperty("Start time for workflow execution. If null, defaults to now, unless activate is set to false, in which case activationTime is ignored.") public DateTime activationTime; + @ApiModelProperty("Set to false to force activationTime to null. Default is true.") + public boolean activate = true; + @ApiModelProperty("State variables to be set for the new workflow instance.") public Map stateVariables = new HashMap<>(); diff --git a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java index 7fb3f28f4..5f48f5bd9 100644 --- a/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java +++ b/nflow-rest-api-common/src/test/java/io/nflow/rest/v1/converter/CreateWorkflowConverterTest.java @@ -4,6 +4,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.joda.time.DateTime.now; import static org.junit.Assert.assertThat; import org.joda.time.DateTime; @@ -66,6 +67,19 @@ public void convertAndValidateWorksWithMinimalData() { assertThat(i.type, equalTo(req.type)); } + @Test + public void nextActivationIsSetToNullWhenInstanceIsNotActivated() { + CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); + req.type = "wfType"; + req.activationTime = now(); + req.activate = false; + WorkflowInstance i = converter.convert(req); + assertThat(i.nextActivation, nullValue(DateTime.class)); + assertThat(i.businessKey, nullValue(String.class)); + assertThat(i.externalId, nullValue(String.class)); + assertThat(i.type, equalTo(req.type)); + } + @Test public void convertWorks() { WorkflowInstance i = new WorkflowInstance.Builder().setId(1).setType("dummy").setBusinessKey("businessKey") From 51b524987f7bda9d636c87152902dd5492f9b46b Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 11:50:40 +0200 Subject: [PATCH 04/22] allow bulk workflow sub-classes to override next activation times of the bulk states --- .../io/nflow/tests/demo/workflow/BulkWorkflow.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java index a0506118a..c47646f21 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -24,6 +24,7 @@ import javax.inject.Inject; +import org.joda.time.DateTime; import org.slf4j.Logger; import org.springframework.stereotype.Component; @@ -90,7 +91,7 @@ public NextAction splitWork(StateExecution execution, @StateVar(value = "request if (childrenFound) { return moveToState(waitForChildrenToFinish, "Running"); } - return retryAfter(now().plusMinutes(1), "Waiting for child workflows"); + return retryAfter(waitForChildrenUntil(), "Waiting for child workflows"); } protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unused") JsonNode data) { @@ -101,6 +102,10 @@ protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unu return true; } + protected DateTime waitForChildrenUntil() { + return now().plusMinutes(1); + } + public NextAction waitForChildrenToFinish(StateExecution execution, @StateVar(value = "concurrency", readOnly = true) int concurrency) { List childWorkflows = execution.getAllChildWorkflows(); @@ -115,7 +120,7 @@ public NextAction waitForChildrenToFinish(StateExecution execution, logger.info("Started " + toStart + " child workflows"); } long progress = completed * 100 / childWorkflows.size(); - return retryAfter(now().plusMinutes(15), "Waiting for child workflows to complete - " + progress + "% done"); + return retryAfter(waitForChildrenToCompleteUntil(), "Waiting for child workflows to complete - " + progress + "% done"); } private void wakeup(WorkflowInstance instance) { @@ -134,4 +139,8 @@ private boolean isInInitialState(WorkflowInstance instance) { return instance.status == created; } + protected DateTime waitForChildrenToCompleteUntil() { + return now().plusMinutes(15); + } + } From 7bd5cba9a252060786f6e9d60b00e83c8856d1d1 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 13:09:56 +0200 Subject: [PATCH 05/22] extract bulk workflow variable names to constants, fix DemoBulkWorkflow --- .../tests/demo/workflow/BulkWorkflow.java | 9 +++++---- .../tests/demo/workflow/DemoBulkWorkflow.java | 20 ++++++++++++++----- .../tests/demo/workflow/DemoWorkflow.java | 3 ++- .../java/io/nflow/tests/BulkWorkflowTest.java | 10 +++++++--- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java index c47646f21..8b2ca53a4 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -47,9 +47,10 @@ public class BulkWorkflow extends WorkflowDefinition { public static final String BULK_WORKFLOW_TYPE = "bulk"; + public static final String VAR_CHILD_DATA = "childData"; + public static final String VAR_CONCURRENCY = "concurrency"; - private final static EnumSet RUNNING_STATES = complementOf(EnumSet.of(finished, created)); - + private static final EnumSet RUNNING_STATES = complementOf(EnumSet.of(finished, created)); private static final Logger logger = getLogger(BulkWorkflow.class); @Inject @@ -86,7 +87,7 @@ public BulkWorkflow() { this(BULK_WORKFLOW_TYPE); } - public NextAction splitWork(StateExecution execution, @StateVar(value = "requestData", readOnly = true) JsonNode data) { + public NextAction splitWork(StateExecution execution, @StateVar(value = VAR_CHILD_DATA, readOnly = true) JsonNode data) { boolean childrenFound = splitWorkImpl(execution, data); if (childrenFound) { return moveToState(waitForChildrenToFinish, "Running"); @@ -107,7 +108,7 @@ protected DateTime waitForChildrenUntil() { } public NextAction waitForChildrenToFinish(StateExecution execution, - @StateVar(value = "concurrency", readOnly = true) int concurrency) { + @StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) { List childWorkflows = execution.getAllChildWorkflows(); long running = childWorkflows.stream().filter(this::isRunning).count(); long completed = childWorkflows.stream().filter(this::isFinished).count(); diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java index f092a7027..6e783f40e 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java @@ -1,10 +1,10 @@ package io.nflow.tests.demo.workflow; import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; -import static java.util.Collections.singletonMap; - -import java.util.stream.Stream; +import static java.util.stream.StreamSupport.stream; +import static org.joda.time.DateTime.now; +import org.joda.time.DateTime; import org.springframework.stereotype.Component; import com.fasterxml.jackson.databind.JsonNode; @@ -29,7 +29,7 @@ protected boolean splitWorkImpl(StateExecution execution, JsonNode data) { if (data.size() == 0) { return false; } - execution.addChildWorkflows(Stream.of(data).map(this::createInstance).toArray(WorkflowInstance[]::new)); + execution.addChildWorkflows(stream(data.spliterator(), false).map(this::createInstance).toArray(WorkflowInstance[]::new)); return true; } @@ -37,8 +37,18 @@ private WorkflowInstance createInstance(JsonNode childData) { return new WorkflowInstance.Builder() // .setType(DEMO_WORKFLOW_TYPE) // .setNextActivation(null) // - .setStateVariables(singletonMap("requestData", childData.asText())) // + .putStateVariable("requestData", childData.asText()) // .build(); } + @Override + protected DateTime waitForChildrenUntil() { + return now().plusSeconds(10); + } + + @Override + protected DateTime waitForChildrenToCompleteUntil() { + return now().plusSeconds(10); + } + } diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java index 04d9355ef..48303efe6 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java @@ -51,8 +51,9 @@ public NextAction begin(@SuppressWarnings("unused") StateExecution execution) { return moveToState(State.process, "Go to process state"); } - public NextAction process(@SuppressWarnings("unused") StateExecution execution) { + public NextAction process(StateExecution execution) { execution.wakeUpParentWorkflow(); return stopInState(State.done, "Go to done state"); } + } diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index ccd4ccc30..6fece4426 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -4,6 +4,7 @@ import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; import static java.util.Arrays.asList; import static org.apache.cxf.jaxrs.client.WebClient.fromClient; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThat; import static org.junit.runners.MethodSorters.NAME_ASCENDING; @@ -15,6 +16,7 @@ import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; +import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse; import io.nflow.tests.demo.workflow.BulkWorkflow; import io.nflow.tests.runner.NflowServerRule; @@ -39,8 +41,8 @@ static class Configuration { public void t01_startBulkWorkflow() { CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); req.type = DEMO_BULK_WORKFLOW_TYPE; - req.stateVariables.put("concurrency", 2); - req.stateVariables.put("requestData", nflowObjectMapper().valueToTree(asList(1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10))); + req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3); + req.stateVariables.put(BulkWorkflow.VAR_CHILD_DATA, nflowObjectMapper().valueToTree(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))); CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, CreateWorkflowInstanceResponse.class); assertThat(resp.id, notNullValue()); @@ -49,7 +51,9 @@ public void t01_startBulkWorkflow() { @Test(timeout = 30000) public void t02_waitForBulkToFinish() throws InterruptedException { - getWorkflowInstance(workflowId, done.name()); + ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name()); + assertThat(instance.childWorkflows.size(), equalTo(1)); + assertThat(instance.childWorkflows.get(1).size(), equalTo(10)); } } From 5fd635343cde7306d7e879141a3dd26e1f687cdb Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 13:30:52 +0200 Subject: [PATCH 06/22] revert some changes as per review comments --- .../nflow/tests/demo/workflow/BulkWorkflow.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java index 8b2ca53a4..1a18c7f8b 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -110,8 +110,15 @@ protected DateTime waitForChildrenUntil() { public NextAction waitForChildrenToFinish(StateExecution execution, @StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) { List childWorkflows = execution.getAllChildWorkflows(); - long running = childWorkflows.stream().filter(this::isRunning).count(); - long completed = childWorkflows.stream().filter(this::isFinished).count(); + long completed = 0; + long running = 0; + for (WorkflowInstance child : childWorkflows) { + if (child.status == finished) { + completed++; + } else if (isRunning(child)) { + running++; + } + } if (completed == childWorkflows.size()) { return moveToState(done, "All children completed"); } @@ -128,14 +135,10 @@ private void wakeup(WorkflowInstance instance) { instanceService.wakeupWorkflowInstance(instance.id, emptyList()); } - private boolean isRunning(WorkflowInstance instance) { + protected boolean isRunning(WorkflowInstance instance) { return RUNNING_STATES.contains(instance.status); } - private boolean isFinished(WorkflowInstance instance) { - return instance.status == finished; - } - private boolean isInInitialState(WorkflowInstance instance) { return instance.status == created; } From 4613974b5f2ed72aee13a4ed9d31f765f3499aaf Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 13:44:07 +0200 Subject: [PATCH 07/22] check that last child instance is not started before some other child is finished --- .../test/java/io/nflow/tests/BulkWorkflowTest.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index 6fece4426..2a03c8449 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -3,12 +3,18 @@ import static io.nflow.tests.demo.workflow.BulkWorkflow.State.done; import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; import static java.util.Arrays.asList; +import static java.util.Comparator.naturalOrder; +import static java.util.stream.Collectors.toList; import static org.apache.cxf.jaxrs.client.WebClient.fromClient; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThat; import static org.junit.runners.MethodSorters.NAME_ASCENDING; +import java.util.List; + +import org.joda.time.DateTime; import org.junit.ClassRule; import org.junit.FixMethodOrder; import org.junit.Test; @@ -53,7 +59,12 @@ public void t01_startBulkWorkflow() { public void t02_waitForBulkToFinish() throws InterruptedException { ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name()); assertThat(instance.childWorkflows.size(), equalTo(1)); - assertThat(instance.childWorkflows.get(1).size(), equalTo(10)); + List childWorkflowIds = instance.childWorkflows.get(1); + assertThat(childWorkflowIds.size(), equalTo(10)); + List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList()); + DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get(); + DateTime maxStarted = children.stream().map(child -> child.started).max(naturalOrder()).get(); + assertThat(minFinished, lessThan(maxStarted)); } } From 9df568cbbdc955380ecdaf8ecd2bb5ae63b4f9c3 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 14:39:44 +0200 Subject: [PATCH 08/22] wake up parent when finished can be set via workflow settings --- .../executor/WorkflowStateProcessor.java | 8 +++++++- .../workflow/definition/WorkflowSettings.java | 17 +++++++++++++++++ .../ListWorkflowDefinitionConverter.java | 1 + .../v1/msg/ListWorkflowDefinitionResponse.java | 3 +++ .../nflow/tests/demo/workflow/BulkWorkflow.java | 3 +-- .../nflow/tests/demo/workflow/DemoWorkflow.java | 6 +++--- 6 files changed, 32 insertions(+), 6 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index f4396caeb..1bea4a6e4 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -47,6 +47,7 @@ import io.nflow.engine.workflow.definition.NextAction; import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; +import io.nflow.engine.workflow.definition.WorkflowStateType; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; @@ -204,9 +205,14 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState()); execution.setNextActivation(null); } + WorkflowState nextState = definition.getState(execution.getNextState()); + if (definition.getSettings().wakeupParentWhenFinished && nextState.getType() == WorkflowStateType.end + && instance.parentWorkflowId != null) { + execution.wakeUpParentWorkflow(); + } WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) // .setNextActivation(execution.getNextActivation()) // - .setStatus(getStatus(execution, definition.getState(execution.getNextState()))) // + .setStatus(getStatus(execution, nextState)) // .setStateText(getStateText(instance, execution)) // .setState(execution.getNextState()) // .setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0); diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java index 4a65c706b..d3d4a8a28 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java @@ -61,6 +61,10 @@ public class WorkflowSettings extends ModelObject { * By default, returns true roughly every tenth time. */ public final BooleanSupplier deleteHistoryCondition; + /** + * If true, automatically wake up parent workflow instance when an instance goes to end state. + */ + public final boolean wakeupParentWhenFinished; WorkflowSettings(Builder builder) { this.minErrorTransitionDelay = builder.minErrorTransitionDelay; @@ -72,6 +76,7 @@ public class WorkflowSettings extends ModelObject { this.maxSubsequentStateExecutionsPerState = new HashMap<>(builder.maxSubsequentStateExecutionsPerState); this.historyDeletableAfterHours = builder.historyDeletableAfterHours; this.deleteHistoryCondition = builder.deleteHistoryCondition; + this.wakeupParentWhenFinished = builder.wakeupParentWhenFinished; } /** @@ -87,6 +92,7 @@ public static class Builder { int maxSubsequentStateExecutions = 100; Map maxSubsequentStateExecutionsPerState = new HashMap<>(); Integer historyDeletableAfterHours; + boolean wakeupParentWhenFinished; Random rnd = new Random(); BooleanSupplier deleteHistoryCondition = new BooleanSupplier() { @@ -212,6 +218,17 @@ public Builder setDeleteHistoryCondition(BooleanSupplier deleteHistoryCondition) return this; } + /** + * Set to true to automatically wake up parent workflow instance when an instance goes to end state. + * + * @param wakeupParentWhenFinished True to wake up parent automatically + * @return this. + */ + public Builder setWakeupParentWhenFinished(boolean wakeupParentWhenFinished) { + this.wakeupParentWhenFinished = wakeupParentWhenFinished; + return this; + } + /** * Create workflow settings object. * diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java index 4b92bf130..52c2849f5 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java @@ -58,6 +58,7 @@ public ListWorkflowDefinitionResponse convert(AbstractWorkflowDefinition { diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java index c7db2cfc6..c8c590f29 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java @@ -41,6 +41,9 @@ public static class Settings extends ModelObject { @ApiModelProperty(value = "Delay after which workflow instance history (actions, states) can be deleted from database", required = false) public Integer historyDeletableAfterHours; + @ApiModelProperty(value = "True if parent workflow instance is automatically woken up when instance goes to end state", required = false) + public boolean wakeupParentWhenFinished; + } public static class TransitionDelays extends ModelObject { diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java index 1a18c7f8b..e13fcb12f 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -35,7 +35,6 @@ import io.nflow.engine.workflow.definition.StateExecution; import io.nflow.engine.workflow.definition.StateVar; import io.nflow.engine.workflow.definition.WorkflowDefinition; -import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; @@ -56,7 +55,7 @@ public class BulkWorkflow extends WorkflowDefinition { @Inject WorkflowInstanceService instanceService; - public enum State implements WorkflowState { + public enum State implements io.nflow.engine.workflow.definition.WorkflowState { splitWork(start), waitForChildrenToFinish(normal), done(end), error(manual); private WorkflowStateType type; diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java index 48303efe6..a57421651 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java @@ -12,6 +12,7 @@ import io.nflow.engine.workflow.definition.NextAction; import io.nflow.engine.workflow.definition.StateExecution; import io.nflow.engine.workflow.definition.WorkflowDefinition; +import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; @@ -41,7 +42,7 @@ public String getDescription() { } public DemoWorkflow() { - super(DEMO_WORKFLOW_TYPE, State.begin, State.error); + super(DEMO_WORKFLOW_TYPE, State.begin, State.error, new WorkflowSettings.Builder().setWakeupParentWhenFinished(true).build()); setDescription("Simple demo workflow: start -> process -> end"); permit(State.begin, State.process); permit(State.process, State.done); @@ -51,8 +52,7 @@ public NextAction begin(@SuppressWarnings("unused") StateExecution execution) { return moveToState(State.process, "Go to process state"); } - public NextAction process(StateExecution execution) { - execution.wakeUpParentWorkflow(); + public NextAction process(@SuppressWarnings("unused") StateExecution execution) { return stopInState(State.done, "Go to done state"); } From 1c3aa56c64ea71c3b09c7e6f9eb95b075131694c Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 14:52:33 +0200 Subject: [PATCH 09/22] fix test --- nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index 2a03c8449..debfc437c 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -59,7 +59,7 @@ public void t01_startBulkWorkflow() { public void t02_waitForBulkToFinish() throws InterruptedException { ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name()); assertThat(instance.childWorkflows.size(), equalTo(1)); - List childWorkflowIds = instance.childWorkflows.get(1); + List childWorkflowIds = instance.childWorkflows.values().iterator().next(); assertThat(childWorkflowIds.size(), equalTo(10)); List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList()); DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get(); From 7d5c16c9d3fe7f8a523fd8d1fc5029eab55a051b Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Tue, 12 Feb 2019 16:16:24 +0200 Subject: [PATCH 10/22] update changelog --- CHANGELOG.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f0510179..56efacfb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,9 @@ -## 5.3.4-SNAPSHOT (future release) +## 5.4.0-SNAPSHOT (future release) **Highlights** +- Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system +- Allow child workflows to automatically wake up parent workflow instance when they enter end state (can be enabled via workflow settings) +- Allow creating workflows via REST API with null activation time (by setting activate = false) **Details** - Support boxed primitives (Integer, Float etc) with @StateVar From 4298c50e96b83090305a3da2156d8871e551c661 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Mon, 25 Feb 2019 08:50:06 +0200 Subject: [PATCH 11/22] unlimited retry on bulk workflow, children ready check delay from 1 minute to 1 hour --- .../main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java index e13fcb12f..5be22718d 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java @@ -35,6 +35,7 @@ import io.nflow.engine.workflow.definition.StateExecution; import io.nflow.engine.workflow.definition.StateVar; import io.nflow.engine.workflow.definition.WorkflowDefinition; +import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowStateType; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; @@ -76,7 +77,7 @@ public String getDescription() { } protected BulkWorkflow(String type) { - super(type, splitWork, error); + super(type, splitWork, error, new WorkflowSettings.Builder().setMaxRetries(Integer.MAX_VALUE).build()); setDescription("Executes child workflows in bulk but gracefully without effecting non-bulk tasks."); permit(splitWork, waitForChildrenToFinish); permit(waitForChildrenToFinish, done); @@ -103,7 +104,7 @@ protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unu } protected DateTime waitForChildrenUntil() { - return now().plusMinutes(1); + return now().plusHours(1); } public NextAction waitForChildrenToFinish(StateExecution execution, From 0d95466218e136cc4eb280b1890c2b28adb07358 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Mon, 25 Feb 2019 08:54:15 +0200 Subject: [PATCH 12/22] minor optimization --- .../engine/internal/executor/WorkflowStateProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 1bea4a6e4..a63a98127 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -206,8 +206,8 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, execution.setNextActivation(null); } WorkflowState nextState = definition.getState(execution.getNextState()); - if (definition.getSettings().wakeupParentWhenFinished && nextState.getType() == WorkflowStateType.end - && instance.parentWorkflowId != null) { + if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end + && definition.getSettings().wakeupParentWhenFinished) { execution.wakeUpParentWorkflow(); } WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) // From 886f4118badc95d5eed19bfa1f03df3e6a41f1d9 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Fri, 1 Mar 2019 15:57:42 +0200 Subject: [PATCH 13/22] remove wakeupParentWhenFinished from settings, add wait state, wake up parent if in wait state when child finishes --- .../executor/WorkflowStateProcessor.java | 10 ++++-- .../workflow/definition}/BulkWorkflow.java | 20 ++++------- .../workflow/definition/WorkflowSettings.java | 17 --------- .../definition/WorkflowStateType.java | 6 ++++ .../executor/WorkflowStateProcessorTest.java | 35 +++++++++++++++++++ .../ListWorkflowDefinitionConverter.java | 1 - .../msg/ListWorkflowDefinitionResponse.java | 3 -- .../tests/demo/workflow/DemoBulkWorkflow.java | 1 + .../tests/demo/workflow/DemoWorkflow.java | 3 +- .../java/io/nflow/tests/BulkWorkflowTest.java | 7 ++-- 10 files changed, 61 insertions(+), 42 deletions(-) rename {nflow-tests/src/main/java/io/nflow/tests/demo/workflow => nflow-engine/src/main/java/io/nflow/engine/workflow/definition}/BulkWorkflow.java (86%) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index a63a98127..a66b38a80 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -206,9 +206,13 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, execution.setNextActivation(null); } WorkflowState nextState = definition.getState(execution.getNextState()); - if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end - && definition.getSettings().wakeupParentWhenFinished) { - execution.wakeUpParentWorkflow(); + if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) { + String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId); + AbstractWorkflowDefinition parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType); + String parentCurrentState = workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId); + if (parentDefinition.getState(parentCurrentState).getType() == WorkflowStateType.wait) { + execution.wakeUpParentWorkflow(); + } } WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) // .setNextActivation(execution.getNextActivation()) // diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/BulkWorkflow.java similarity index 86% rename from nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java rename to nflow-engine/src/main/java/io/nflow/engine/workflow/definition/BulkWorkflow.java index 5be22718d..1cb1a25bb 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/BulkWorkflow.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/BulkWorkflow.java @@ -1,17 +1,17 @@ -package io.nflow.tests.demo.workflow; +package io.nflow.engine.workflow.definition; +import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done; +import static io.nflow.engine.workflow.definition.BulkWorkflow.State.error; +import static io.nflow.engine.workflow.definition.BulkWorkflow.State.splitWork; +import static io.nflow.engine.workflow.definition.BulkWorkflow.State.waitForChildrenToFinish; import static io.nflow.engine.workflow.definition.NextAction.moveToState; import static io.nflow.engine.workflow.definition.NextAction.retryAfter; import static io.nflow.engine.workflow.definition.WorkflowStateType.end; import static io.nflow.engine.workflow.definition.WorkflowStateType.manual; -import static io.nflow.engine.workflow.definition.WorkflowStateType.normal; import static io.nflow.engine.workflow.definition.WorkflowStateType.start; +import static io.nflow.engine.workflow.definition.WorkflowStateType.wait; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created; import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.finished; -import static io.nflow.tests.demo.workflow.BulkWorkflow.State.done; -import static io.nflow.tests.demo.workflow.BulkWorkflow.State.error; -import static io.nflow.tests.demo.workflow.BulkWorkflow.State.splitWork; -import static io.nflow.tests.demo.workflow.BulkWorkflow.State.waitForChildrenToFinish; import static java.lang.Math.max; import static java.lang.Math.min; import static java.util.Collections.emptyList; @@ -31,12 +31,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.nflow.engine.service.WorkflowInstanceService; -import io.nflow.engine.workflow.definition.NextAction; -import io.nflow.engine.workflow.definition.StateExecution; -import io.nflow.engine.workflow.definition.StateVar; -import io.nflow.engine.workflow.definition.WorkflowDefinition; -import io.nflow.engine.workflow.definition.WorkflowSettings; -import io.nflow.engine.workflow.definition.WorkflowStateType; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus; @@ -57,7 +51,7 @@ public class BulkWorkflow extends WorkflowDefinition { WorkflowInstanceService instanceService; public enum State implements io.nflow.engine.workflow.definition.WorkflowState { - splitWork(start), waitForChildrenToFinish(normal), done(end), error(manual); + splitWork(start), waitForChildrenToFinish(wait), done(end), error(manual); private WorkflowStateType type; diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java index d3d4a8a28..4a65c706b 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowSettings.java @@ -61,10 +61,6 @@ public class WorkflowSettings extends ModelObject { * By default, returns true roughly every tenth time. */ public final BooleanSupplier deleteHistoryCondition; - /** - * If true, automatically wake up parent workflow instance when an instance goes to end state. - */ - public final boolean wakeupParentWhenFinished; WorkflowSettings(Builder builder) { this.minErrorTransitionDelay = builder.minErrorTransitionDelay; @@ -76,7 +72,6 @@ public class WorkflowSettings extends ModelObject { this.maxSubsequentStateExecutionsPerState = new HashMap<>(builder.maxSubsequentStateExecutionsPerState); this.historyDeletableAfterHours = builder.historyDeletableAfterHours; this.deleteHistoryCondition = builder.deleteHistoryCondition; - this.wakeupParentWhenFinished = builder.wakeupParentWhenFinished; } /** @@ -92,7 +87,6 @@ public static class Builder { int maxSubsequentStateExecutions = 100; Map maxSubsequentStateExecutionsPerState = new HashMap<>(); Integer historyDeletableAfterHours; - boolean wakeupParentWhenFinished; Random rnd = new Random(); BooleanSupplier deleteHistoryCondition = new BooleanSupplier() { @@ -218,17 +212,6 @@ public Builder setDeleteHistoryCondition(BooleanSupplier deleteHistoryCondition) return this; } - /** - * Set to true to automatically wake up parent workflow instance when an instance goes to end state. - * - * @param wakeupParentWhenFinished True to wake up parent automatically - * @return this. - */ - public Builder setWakeupParentWhenFinished(boolean wakeupParentWhenFinished) { - this.wakeupParentWhenFinished = wakeupParentWhenFinished; - return this; - } - /** * Create workflow settings object. * diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java index 709573a4e..d4b3f5de0 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/definition/WorkflowStateType.java @@ -26,6 +26,12 @@ public enum WorkflowStateType { */ normal(false, WorkflowInstanceStatus.inProgress), + /** + * State for waiting something outside this instance to happen, including (but not limited to) child workflow instances to go to + * end state. When a child workflow finishes and parent is in wait state, parent is automatically woken up. + */ + wait(false, WorkflowInstanceStatus.inProgress), + /** * Final state of the workflow. Workflow execution is stopped. */ diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index e7c3830f4..b197cf62b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -78,6 +78,7 @@ import io.nflow.engine.service.WorkflowDefinitionService; import io.nflow.engine.service.WorkflowInstanceInclude; import io.nflow.engine.service.WorkflowInstanceService; +import io.nflow.engine.workflow.definition.BulkWorkflow; import io.nflow.engine.workflow.definition.Mutable; import io.nflow.engine.workflow.definition.NextAction; import io.nflow.engine.workflow.definition.StateExecution; @@ -433,7 +434,9 @@ public void goToErrorStateWhenNextStateIsNull() { public void doNothingWhenNotifyingParentWithoutParentWorkflowId() { WorkflowInstance instance = executingInstanceBuilder().setType("wake-test").setState("wakeParent").build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + executor.run(); + verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class), any(List.class)); } @@ -441,8 +444,15 @@ public void doNothingWhenNotifyingParentWithoutParentWorkflowId() { public void whenWakingUpParentWorkflowSucceeds() { WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType"); + when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState"); + TestDefinition parentDefinition = mock(TestDefinition.class); + doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType"); + when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1); when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList())).thenReturn(true); + executor.run(); + verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList()); } @@ -450,9 +460,34 @@ public void whenWakingUpParentWorkflowSucceeds() { public void whenWakingUpParentWorkflowFails() { WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType"); + when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState"); + TestDefinition parentDefinition = mock(TestDefinition.class); + doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType"); + when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1); when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList())).thenReturn(false); + + executor.run(); + + verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList()); + } + + @Test + public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() { + WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("simple-test").setState("processing") + .build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn(BulkWorkflow.BULK_WORKFLOW_TYPE); + when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)) + .thenReturn(BulkWorkflow.State.waitForChildrenToFinish.name()); + BulkWorkflow parentDefinition = new BulkWorkflow(); + doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition(BulkWorkflow.BULK_WORKFLOW_TYPE); + when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList())).thenReturn(true); + executor.run(); + verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList()); + } @Test diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java index 52c2849f5..4b92bf130 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/ListWorkflowDefinitionConverter.java @@ -58,7 +58,6 @@ public ListWorkflowDefinitionResponse convert(AbstractWorkflowDefinition { diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java index c8c590f29..c7db2cfc6 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/ListWorkflowDefinitionResponse.java @@ -41,9 +41,6 @@ public static class Settings extends ModelObject { @ApiModelProperty(value = "Delay after which workflow instance history (actions, states) can be deleted from database", required = false) public Integer historyDeletableAfterHours; - @ApiModelProperty(value = "True if parent workflow instance is automatically woken up when instance goes to end state", required = false) - public boolean wakeupParentWhenFinished; - } public static class TransitionDelays extends ModelObject { diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java index 6e783f40e..7cf7b6460 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoBulkWorkflow.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; +import io.nflow.engine.workflow.definition.BulkWorkflow; import io.nflow.engine.workflow.definition.StateExecution; import io.nflow.engine.workflow.instance.WorkflowInstance; diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java index a57421651..141feed83 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/workflow/DemoWorkflow.java @@ -12,7 +12,6 @@ import io.nflow.engine.workflow.definition.NextAction; import io.nflow.engine.workflow.definition.StateExecution; import io.nflow.engine.workflow.definition.WorkflowDefinition; -import io.nflow.engine.workflow.definition.WorkflowSettings; import io.nflow.engine.workflow.definition.WorkflowState; import io.nflow.engine.workflow.definition.WorkflowStateType; @@ -42,7 +41,7 @@ public String getDescription() { } public DemoWorkflow() { - super(DEMO_WORKFLOW_TYPE, State.begin, State.error, new WorkflowSettings.Builder().setWakeupParentWhenFinished(true).build()); + super(DEMO_WORKFLOW_TYPE, State.begin, State.error); setDescription("Simple demo workflow: start -> process -> end"); permit(State.begin, State.process); permit(State.process, State.done); diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index debfc437c..ad22fcb92 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -1,6 +1,6 @@ package io.nflow.tests; -import static io.nflow.tests.demo.workflow.BulkWorkflow.State.done; +import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done; import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; import static java.util.Arrays.asList; import static java.util.Comparator.naturalOrder; @@ -20,10 +20,11 @@ import org.junit.Test; import org.springframework.context.annotation.ComponentScan; +import io.nflow.engine.workflow.definition.BulkWorkflow; import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse; -import io.nflow.tests.demo.workflow.BulkWorkflow; +import io.nflow.tests.demo.workflow.DemoBulkWorkflow; import io.nflow.tests.runner.NflowServerRule; @FixMethodOrder(NAME_ASCENDING) @@ -38,7 +39,7 @@ public BulkWorkflowTest() { super(server); } - @ComponentScan(basePackageClasses = BulkWorkflow.class) + @ComponentScan(basePackageClasses = DemoBulkWorkflow.class) static class Configuration { // for component scanning only } From df65aac7edfdc94aebf8c4d741029810a4ce0f6b Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sun, 3 Mar 2019 22:20:48 +0200 Subject: [PATCH 14/22] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56efacfb9..2eac2b66c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ **Highlights** - Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system -- Allow child workflows to automatically wake up parent workflow instance when they enter end state (can be enabled via workflow settings) +- Child workflow instances automatically wake up the parent when the parent is in a wait state and the child enters an end state - Allow creating workflows via REST API with null activation time (by setting activate = false) **Details** From 44c103b8786f3bb082bd67ed943638e7619f5023 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sun, 3 Mar 2019 22:21:52 +0200 Subject: [PATCH 15/22] update version to 5.4.0-SNAPSHOT --- nflow-engine/pom.xml | 2 +- nflow-explorer/pom.xml | 2 +- nflow-jetty/pom.xml | 2 +- nflow-metrics/pom.xml | 2 +- nflow-netty/pom.xml | 2 +- nflow-perf-test/pom.xml | 2 +- nflow-rest-api-common/pom.xml | 2 +- nflow-rest-api-jax-rs/pom.xml | 2 +- nflow-rest-api-spring-web/pom.xml | 2 +- nflow-server-common/pom.xml | 2 +- nflow-tests/pom.xml | 2 +- pom.xml | 2 +- 12 files changed, 12 insertions(+), 12 deletions(-) diff --git a/nflow-engine/pom.xml b/nflow-engine/pom.xml index cbb3a0306..f97226e82 100644 --- a/nflow-engine/pom.xml +++ b/nflow-engine/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/nflow-explorer/pom.xml b/nflow-explorer/pom.xml index 4afa82c56..4b7efa46f 100644 --- a/nflow-explorer/pom.xml +++ b/nflow-explorer/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT .. diff --git a/nflow-jetty/pom.xml b/nflow-jetty/pom.xml index 79b00baf9..35aefc347 100644 --- a/nflow-jetty/pom.xml +++ b/nflow-jetty/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT UTF-8 diff --git a/nflow-metrics/pom.xml b/nflow-metrics/pom.xml index 219089c98..cf96749e0 100644 --- a/nflow-metrics/pom.xml +++ b/nflow-metrics/pom.xml @@ -12,7 +12,7 @@ io.nflow nflow-root - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/nflow-netty/pom.xml b/nflow-netty/pom.xml index df14adabc..f40ec4ca5 100644 --- a/nflow-netty/pom.xml +++ b/nflow-netty/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT UTF-8 diff --git a/nflow-perf-test/pom.xml b/nflow-perf-test/pom.xml index ca5099d6f..3600145ce 100644 --- a/nflow-perf-test/pom.xml +++ b/nflow-perf-test/pom.xml @@ -13,7 +13,7 @@ io.nflow nflow-root - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/nflow-rest-api-common/pom.xml b/nflow-rest-api-common/pom.xml index 01f8cf011..c2e96391b 100644 --- a/nflow-rest-api-common/pom.xml +++ b/nflow-rest-api-common/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/nflow-rest-api-jax-rs/pom.xml b/nflow-rest-api-jax-rs/pom.xml index 2cf83e7dd..91dc6bab0 100644 --- a/nflow-rest-api-jax-rs/pom.xml +++ b/nflow-rest-api-jax-rs/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/nflow-rest-api-spring-web/pom.xml b/nflow-rest-api-spring-web/pom.xml index 10943874a..9f930d403 100644 --- a/nflow-rest-api-spring-web/pom.xml +++ b/nflow-rest-api-spring-web/pom.xml @@ -12,7 +12,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/nflow-server-common/pom.xml b/nflow-server-common/pom.xml index 4bc7146cd..cd4710d6a 100644 --- a/nflow-server-common/pom.xml +++ b/nflow-server-common/pom.xml @@ -13,7 +13,7 @@ nflow-root io.nflow - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT UTF-8 diff --git a/nflow-tests/pom.xml b/nflow-tests/pom.xml index 1c8cd9a53..c08a4d5d7 100644 --- a/nflow-tests/pom.xml +++ b/nflow-tests/pom.xml @@ -12,7 +12,7 @@ io.nflow nflow-root - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index f8b9441ea..ae86a1963 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ nflow-root pom nFlow Root - 5.3.4-SNAPSHOT + 5.4.0-SNAPSHOT http://nflow.io From 1b7710adfed1a79450ad3802233eb4e31f0d27ad Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Tue, 5 Mar 2019 17:01:56 +0200 Subject: [PATCH 16/22] Fix backwards compatibility for new activate flag. Add example usage of BulkWorkflow without extending it --- .../v1/converter/CreateWorkflowConverter.java | 4 +- .../v1/msg/CreateWorkflowInstanceRequest.java | 5 ++- .../java/io/nflow/tests/BulkWorkflowTest.java | 42 ++++++++++++++++++- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java index a7ffadf79..2fd3eb229 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/converter/CreateWorkflowConverter.java @@ -1,5 +1,6 @@ package io.nflow.rest.v1.converter; +import static java.lang.Boolean.FALSE; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import java.util.Map.Entry; @@ -25,13 +26,14 @@ public CreateWorkflowConverter(WorkflowInstanceFactory factory) { public WorkflowInstance convert(CreateWorkflowInstanceRequest req) { WorkflowInstance.Builder builder = factory.newWorkflowInstanceBuilder().setType(req.type).setBusinessKey(req.businessKey) .setExternalId(req.externalId); - if (req.activate) { + if (!FALSE.equals(req.activate)) { if (req.activationTime != null) { builder.setNextActivation(req.activationTime); } } else { builder.setNextActivation(null); } + builder.setParentWorkflowId(req.parentWorkflowId); if (isNotEmpty(req.startState)) { builder.setState(req.startState); } diff --git a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java index 5385240d2..94ec341ca 100644 --- a/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java +++ b/nflow-rest-api-common/src/main/java/io/nflow/rest/v1/msg/CreateWorkflowInstanceRequest.java @@ -40,7 +40,10 @@ public class CreateWorkflowInstanceRequest extends ModelObject { public DateTime activationTime; @ApiModelProperty("Set to false to force activationTime to null. Default is true.") - public boolean activate = true; + public Boolean activate; + + @ApiModelProperty("Create the workflow as a child of the given parent workflow.") + public Integer parentWorkflowId; @ApiModelProperty("State variables to be set for the new workflow instance.") public Map stateVariables = new HashMap<>(); diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index ad22fcb92..f1ca98359 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -1,7 +1,9 @@ package io.nflow.tests; +import static io.nflow.engine.workflow.definition.BulkWorkflow.BULK_WORKFLOW_TYPE; import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done; import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; +import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; import static java.util.Arrays.asList; import static java.util.Comparator.naturalOrder; import static java.util.stream.Collectors.toList; @@ -9,11 +11,13 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; +import static org.joda.time.DateTime.now; import static org.junit.Assert.assertThat; import static org.junit.runners.MethodSorters.NAME_ASCENDING; import java.util.List; +import io.nflow.rest.v1.msg.UpdateWorkflowInstanceRequest; import org.joda.time.DateTime; import org.junit.ClassRule; import org.junit.FixMethodOrder; @@ -27,6 +31,8 @@ import io.nflow.tests.demo.workflow.DemoBulkWorkflow; import io.nflow.tests.runner.NflowServerRule; +import javax.ws.rs.core.Response; + @FixMethodOrder(NAME_ASCENDING) public class BulkWorkflowTest extends AbstractNflowTest { @@ -45,7 +51,7 @@ static class Configuration { } @Test - public void t01_startBulkWorkflow() { + public void t01_startDemoBulkWorkflow() { CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); req.type = DEMO_BULK_WORKFLOW_TYPE; req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3); @@ -68,4 +74,38 @@ public void t02_waitForBulkToFinish() throws InterruptedException { assertThat(minFinished, lessThan(maxStarted)); } + @Test + public void t11_createBulkWorkflow() { + CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); + req.type = BULK_WORKFLOW_TYPE; + req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3); + req.activate = false; + CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, + CreateWorkflowInstanceResponse.class); + assertThat(resp.id, notNullValue()); + workflowId = resp.id; + + for (int i=0; i<10; ++i) { + CreateWorkflowInstanceRequest child = new CreateWorkflowInstanceRequest(); + child.type = DEMO_WORKFLOW_TYPE; + child.activate = false; + child.parentWorkflowId = workflowId; + resp = fromClient(workflowInstanceResource, true).put(child, CreateWorkflowInstanceResponse.class); + assertThat(resp.id, notNullValue()); + } + } + + @Test + public void t12_startBulkWorkflow() { + UpdateWorkflowInstanceRequest req = new UpdateWorkflowInstanceRequest(); + req.nextActivationTime = now(); + Response resp = fromClient(workflowInstanceResource, true).path("id").path(workflowId).put(req, Response.class); + assertThat(resp.getStatus(), equalTo(204)); + } + + @Test(timeout = 30000) + public void t13_waitForBulkToFinish() throws InterruptedException { + t02_waitForBulkToFinish(); + } + } From c7115c32ff41384598f70aeae1a7b7c2793421e1 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 6 Mar 2019 09:29:57 +0200 Subject: [PATCH 17/22] close response stream in test --- .../test/java/io/nflow/tests/BulkWorkflowTest.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index f1ca98359..e0be3dafa 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -17,7 +17,8 @@ import java.util.List; -import io.nflow.rest.v1.msg.UpdateWorkflowInstanceRequest; +import javax.ws.rs.core.Response; + import org.joda.time.DateTime; import org.junit.ClassRule; import org.junit.FixMethodOrder; @@ -28,11 +29,10 @@ import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse; +import io.nflow.rest.v1.msg.UpdateWorkflowInstanceRequest; import io.nflow.tests.demo.workflow.DemoBulkWorkflow; import io.nflow.tests.runner.NflowServerRule; -import javax.ws.rs.core.Response; - @FixMethodOrder(NAME_ASCENDING) public class BulkWorkflowTest extends AbstractNflowTest { @@ -99,8 +99,9 @@ public void t11_createBulkWorkflow() { public void t12_startBulkWorkflow() { UpdateWorkflowInstanceRequest req = new UpdateWorkflowInstanceRequest(); req.nextActivationTime = now(); - Response resp = fromClient(workflowInstanceResource, true).path("id").path(workflowId).put(req, Response.class); - assertThat(resp.getStatus(), equalTo(204)); + try (Response resp = fromClient(workflowInstanceResource, true).path("id").path(workflowId).put(req, Response.class)) { + assertThat(resp.getStatus(), equalTo(204)); + } } @Test(timeout = 30000) From 93bfe876883757838d27e456572cca6fbce7a4ec Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 6 Mar 2019 09:34:43 +0200 Subject: [PATCH 18/22] refactor test code --- .../java/io/nflow/tests/BulkWorkflowTest.java | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index e0be3dafa..2ba34afbc 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -41,6 +41,8 @@ public class BulkWorkflowTest extends AbstractNflowTest { private static int workflowId; + private static int childrenCount; + public BulkWorkflowTest() { super(server); } @@ -55,7 +57,9 @@ public void t01_startDemoBulkWorkflow() { CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); req.type = DEMO_BULK_WORKFLOW_TYPE; req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3); - req.stateVariables.put(BulkWorkflow.VAR_CHILD_DATA, nflowObjectMapper().valueToTree(asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))); + List childData = asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + childrenCount = childData.size(); + req.stateVariables.put(BulkWorkflow.VAR_CHILD_DATA, nflowObjectMapper().valueToTree(childData)); CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, CreateWorkflowInstanceResponse.class); assertThat(resp.id, notNullValue()); @@ -64,14 +68,7 @@ public void t01_startDemoBulkWorkflow() { @Test(timeout = 30000) public void t02_waitForBulkToFinish() throws InterruptedException { - ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name()); - assertThat(instance.childWorkflows.size(), equalTo(1)); - List childWorkflowIds = instance.childWorkflows.values().iterator().next(); - assertThat(childWorkflowIds.size(), equalTo(10)); - List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList()); - DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get(); - DateTime maxStarted = children.stream().map(child -> child.started).max(naturalOrder()).get(); - assertThat(minFinished, lessThan(maxStarted)); + waitForBulkToFinish(); } @Test @@ -81,11 +78,11 @@ public void t11_createBulkWorkflow() { req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3); req.activate = false; CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, - CreateWorkflowInstanceResponse.class); + CreateWorkflowInstanceResponse.class); assertThat(resp.id, notNullValue()); workflowId = resp.id; - for (int i=0; i<10; ++i) { + for (int i = 0; i < childrenCount; ++i) { CreateWorkflowInstanceRequest child = new CreateWorkflowInstanceRequest(); child.type = DEMO_WORKFLOW_TYPE; child.activate = false; @@ -106,7 +103,18 @@ public void t12_startBulkWorkflow() { @Test(timeout = 30000) public void t13_waitForBulkToFinish() throws InterruptedException { - t02_waitForBulkToFinish(); + waitForBulkToFinish(); + } + + private void waitForBulkToFinish() throws InterruptedException { + ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name()); + assertThat(instance.childWorkflows.size(), equalTo(1)); + List childWorkflowIds = instance.childWorkflows.values().iterator().next(); + assertThat(childWorkflowIds.size(), equalTo(childrenCount)); + List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList()); + DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get(); + DateTime maxStarted = children.stream().map(child -> child.started).max(naturalOrder()).get(); + assertThat(minFinished, lessThan(maxStarted)); } } From 0c2c3ab90bd8f5156f8fabb8d76eb55d42862d8b Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 6 Mar 2019 13:49:27 +0200 Subject: [PATCH 19/22] refactor based on review comments --- .../test/java/io/nflow/tests/BulkWorkflowTest.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java index 2ba34afbc..b4b6da2dd 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/BulkWorkflowTest.java @@ -4,7 +4,6 @@ import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done; import static io.nflow.tests.demo.workflow.DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE; import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; -import static java.util.Arrays.asList; import static java.util.Comparator.naturalOrder; import static java.util.stream.Collectors.toList; import static org.apache.cxf.jaxrs.client.WebClient.fromClient; @@ -16,6 +15,7 @@ import static org.junit.runners.MethodSorters.NAME_ASCENDING; import java.util.List; +import java.util.stream.IntStream; import javax.ws.rs.core.Response; @@ -41,7 +41,7 @@ public class BulkWorkflowTest extends AbstractNflowTest { private static int workflowId; - private static int childrenCount; + private static final int CHILDREN_COUNT = 10; public BulkWorkflowTest() { super(server); @@ -57,8 +57,7 @@ public void t01_startDemoBulkWorkflow() { CreateWorkflowInstanceRequest req = new CreateWorkflowInstanceRequest(); req.type = DEMO_BULK_WORKFLOW_TYPE; req.stateVariables.put(BulkWorkflow.VAR_CONCURRENCY, 3); - List childData = asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); - childrenCount = childData.size(); + List childData = IntStream.rangeClosed(1, CHILDREN_COUNT).boxed().collect(toList()); req.stateVariables.put(BulkWorkflow.VAR_CHILD_DATA, nflowObjectMapper().valueToTree(childData)); CreateWorkflowInstanceResponse resp = fromClient(workflowInstanceResource, true).put(req, CreateWorkflowInstanceResponse.class); @@ -82,7 +81,7 @@ public void t11_createBulkWorkflow() { assertThat(resp.id, notNullValue()); workflowId = resp.id; - for (int i = 0; i < childrenCount; ++i) { + for (int i = 0; i < CHILDREN_COUNT; ++i) { CreateWorkflowInstanceRequest child = new CreateWorkflowInstanceRequest(); child.type = DEMO_WORKFLOW_TYPE; child.activate = false; @@ -110,7 +109,7 @@ private void waitForBulkToFinish() throws InterruptedException { ListWorkflowInstanceResponse instance = getWorkflowInstance(workflowId, done.name()); assertThat(instance.childWorkflows.size(), equalTo(1)); List childWorkflowIds = instance.childWorkflows.values().iterator().next(); - assertThat(childWorkflowIds.size(), equalTo(childrenCount)); + assertThat(childWorkflowIds.size(), equalTo(CHILDREN_COUNT)); List children = childWorkflowIds.stream().map(this::getWorkflowInstance).collect(toList()); DateTime minFinished = children.stream().map(child -> child.modified).min(naturalOrder()).get(); DateTime maxStarted = children.stream().map(child -> child.started).max(naturalOrder()).get(); From fe71a25005d9746fae70a2152a87b329ffcdad5f Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 6 Mar 2019 15:09:27 +0200 Subject: [PATCH 20/22] Update CHANGELOG.md --- CHANGELOG.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eac2b66c..4e723ea71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,10 @@ ## 5.4.0-SNAPSHOT (future release) **Highlights** -- Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system -- Child workflow instances automatically wake up the parent when the parent is in a wait state and the child enters an end state -- Allow creating workflows via REST API with null activation time (by setting activate = false) +- Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system. +- Introduce new workflow instance state type `wait`. Child workflow instances automatically wake up the parent when the parent is in a `wait` state and the child enters an `end` state. +- Allow creating workflows via REST API with null activation time (by setting `activate = false`). +- Allow creating child workflows via REST API (by setting `parentWorkflowId`). **Details** - Support boxed primitives (Integer, Float etc) with @StateVar From 6331a19228e9ac0fc089f76354dd94679fcd37cd Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Wed, 6 Mar 2019 15:12:05 +0200 Subject: [PATCH 21/22] Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e723ea71..02a7d13db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Allow creating child workflows via REST API (by setting `parentWorkflowId`). **Details** +- See `BulkWorkflowTest` and `DemoBulkWorkflow` for examples on how to use bulk workflows - Support boxed primitives (Integer, Float etc) with @StateVar - nFlow Explorer: Library updates to `lodash` 4.17.11 and `moment` 2.24.0 Earlier lodash versions had this security vulnerability: https://nvd.nist.gov/vuln/detail/CVE-2018-16487 From 7d98e9ffc9d4ff6d6d69eb6eca06238add9a2481 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sun, 10 Mar 2019 15:37:05 +0200 Subject: [PATCH 22/22] show wait states with light sky blue color in explorer, add demo bulk instance in demo server startup --- nflow-explorer/src/app/components/graph.js | 2 +- nflow-explorer/src/styles/data/graph.css | 11 ++++++++--- .../main/java/io/nflow/tests/demo/DemoServer.java | 13 +++++++++++++ 3 files changed, 22 insertions(+), 4 deletions(-) diff --git a/nflow-explorer/src/app/components/graph.js b/nflow-explorer/src/app/components/graph.js index 22f5e186c..c007d8534 100644 --- a/nflow-explorer/src/app/components/graph.js +++ b/nflow-explorer/src/app/components/graph.js @@ -109,7 +109,7 @@ }; function resolveStyleClass() { - var cssClass = 'node-' + (_.includes(['start', 'manual', 'end', 'error'], state.type) ? state.type : 'normal'); + var cssClass = 'node-' + (_.includes(['start', 'manual', 'end', 'error', 'wait'], state.type) ? state.type : 'normal'); if (workflow && isPassiveNode()) { cssClass += ' node-passive'; } diff --git a/nflow-explorer/src/styles/data/graph.css b/nflow-explorer/src/styles/data/graph.css index 32db9809e..3194ed6e5 100644 --- a/nflow-explorer/src/styles/data/graph.css +++ b/nflow-explorer/src/styles/data/graph.css @@ -28,18 +28,19 @@ Generated svg looks like this */ /* changing font or font size may require code changes to have right size boxes */ -.node-normal, .node-manual, .node-start, .node-end, .node-error { +.node-normal, .node-manual, .node-start, .node-end, .node-error, .node-wait { font-family: "Helvetica Neue", Helvetica, Arial, sans-serif; font-size: 14px; fill: black; opacity: 1; cursor: pointer; } -.node-start > rect, .node-manual > rect, .node-normal > rect, .node-end > rect, .node-error > rect { +.node-start > rect, .node-manual > rect, .node-normal > rect, .node-end > rect, .node-error > rect, .node-wait > rect { stroke-width: 1.5px; stroke: black; fill: white; } + .node-start > rect { fill: LightBlue; } @@ -56,7 +57,11 @@ Generated svg looks like this fill: LightGreen; } -.node-normal.selected > rect, .node-manual.selected > rect, .node-error.selected > rect, .node-start.selected > rect, .node-end.selected > rect { +.node-wait > rect { + fill: LightSkyBlue; +} + +.node-normal.selected > rect, .node-manual.selected > rect, .node-error.selected > rect, .node-start.selected > rect, .node-end.selected > rect, .node-wait.selected > rect { stroke-width: 3px; } diff --git a/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java b/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java index 180b2bf05..a6acdfbe1 100644 --- a/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java +++ b/nflow-tests/src/main/java/io/nflow/tests/demo/DemoServer.java @@ -5,6 +5,7 @@ import static io.nflow.tests.demo.SpringApplicationContext.applicationContext; import static io.nflow.tests.demo.workflow.DemoWorkflow.DEMO_WORKFLOW_TYPE; import static io.nflow.tests.demo.workflow.SlowWorkflow.SLOW_WORKFLOW_TYPE; +import static java.util.Arrays.asList; import static java.util.Collections.emptySet; import static org.joda.time.DateTime.now; @@ -16,10 +17,13 @@ import io.nflow.engine.service.WorkflowInstanceInclude; import io.nflow.engine.service.WorkflowInstanceService; +import io.nflow.engine.workflow.definition.BulkWorkflow; import io.nflow.engine.workflow.instance.WorkflowInstance; import io.nflow.engine.workflow.instance.WorkflowInstanceAction; +import io.nflow.engine.workflow.instance.WorkflowInstanceFactory; import io.nflow.jetty.StartNflow; import io.nflow.metrics.NflowMetricsContext; +import io.nflow.tests.demo.workflow.DemoBulkWorkflow; import io.nflow.tests.demo.workflow.DemoWorkflow; public class DemoServer { @@ -38,6 +42,7 @@ static class DemoServerWorkflowsConfiguration { private static void insertDemoWorkflows() { WorkflowInstanceService workflowInstanceService = applicationContext.getBean(WorkflowInstanceService.class); + WorkflowInstanceFactory workflowInstanceFactory = applicationContext.getBean(WorkflowInstanceFactory.class); WorkflowInstance instance = new WorkflowInstance.Builder().setType(DEMO_WORKFLOW_TYPE) .setState(DemoWorkflow.State.begin.name()).build(); int id = workflowInstanceService.insertWorkflowInstance(instance); @@ -53,5 +58,13 @@ private static void insertDemoWorkflows() { instance = new WorkflowInstance.Builder().setType(SLOW_WORKFLOW_TYPE).setSignal(Optional.of(1)).setNextActivation(null) .build(); workflowInstanceService.insertWorkflowInstance(instance); + // insert demo bulk workflow with couple of children + instance = workflowInstanceFactory.newWorkflowInstanceBuilder() // + .setType(DemoBulkWorkflow.DEMO_BULK_WORKFLOW_TYPE) // + .setState(BulkWorkflow.State.splitWork.name()) // + .putStateVariable(BulkWorkflow.VAR_CONCURRENCY, 2) // + .putStateVariable(BulkWorkflow.VAR_CHILD_DATA, asList(1, 2, 3, 4, 5)) // + .build(); + workflowInstanceService.insertWorkflowInstance(instance); } }