Skip to content

Commit

Permalink
Merge 7d98e9f into 6f0e253
Browse files Browse the repository at this point in the history
  • Loading branch information
gmokki committed Mar 10, 2019
2 parents 6f0e253 + 7d98e9f commit 7c5aacc
Show file tree
Hide file tree
Showing 26 changed files with 440 additions and 21 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
## 5.3.4-SNAPSHOT (future release)
## 5.4.0-SNAPSHOT (future release)

**Highlights**
- Introduce BulkWorkflow which can be used or extended to handle mass of child workflows without overloading the system.
- Introduce new workflow instance state type `wait`. Child workflow instances automatically wake up the parent when the parent is in a `wait` state and the child enters an `end` state.
- Allow creating workflows via REST API with null activation time (by setting `activate = false`).
- Allow creating child workflows via REST API (by setting `parentWorkflowId`).

**Details**
- See `BulkWorkflowTest` and `DemoBulkWorkflow` for examples on how to use bulk workflows
- Support boxed primitives (Integer, Float etc) with @StateVar
- nFlow Explorer: Library updates to `lodash` 4.17.11, `moment` 2.24.0 and `extend` 3.0.2
Earlier lodash versions had this security vulnerability: https://nvd.nist.gov/vuln/detail/CVE-2018-16487
Expand Down
2 changes: 1 addition & 1 deletion nflow-engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
import io.nflow.engine.workflow.definition.WorkflowStateType;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;
import io.nflow.engine.workflow.instance.WorkflowInstanceAction;
Expand Down Expand Up @@ -204,9 +205,18 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
logger.debug("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
}
WorkflowState nextState = definition.getState(execution.getNextState());
if (instance.parentWorkflowId != null && nextState.getType() == WorkflowStateType.end) {
String parentType = workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId);
AbstractWorkflowDefinition<?> parentDefinition = workflowDefinitions.getWorkflowDefinition(parentType);
String parentCurrentState = workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId);
if (parentDefinition.getState(parentCurrentState).getType() == WorkflowStateType.wait) {
execution.wakeUpParentWorkflow();
}
}
WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) //
.setNextActivation(execution.getNextActivation()) //
.setStatus(getStatus(execution, definition.getState(execution.getNextState()))) //
.setStatus(getStatus(execution, nextState)) //
.setStateText(getStateText(instance, execution)) //
.setState(execution.getNextState()) //
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package io.nflow.engine.workflow.definition;

import static io.nflow.engine.workflow.definition.BulkWorkflow.State.done;
import static io.nflow.engine.workflow.definition.BulkWorkflow.State.error;
import static io.nflow.engine.workflow.definition.BulkWorkflow.State.splitWork;
import static io.nflow.engine.workflow.definition.BulkWorkflow.State.waitForChildrenToFinish;
import static io.nflow.engine.workflow.definition.NextAction.moveToState;
import static io.nflow.engine.workflow.definition.NextAction.retryAfter;
import static io.nflow.engine.workflow.definition.WorkflowStateType.end;
import static io.nflow.engine.workflow.definition.WorkflowStateType.manual;
import static io.nflow.engine.workflow.definition.WorkflowStateType.start;
import static io.nflow.engine.workflow.definition.WorkflowStateType.wait;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.created;
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.finished;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Collections.emptyList;
import static java.util.EnumSet.complementOf;
import static org.joda.time.DateTime.now;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.EnumSet;
import java.util.List;

import javax.inject.Inject;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;

import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.instance.WorkflowInstance;
import io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus;

/**
* Bulk child workflow executor that does not overflow the system.
*/
@Component
public class BulkWorkflow extends WorkflowDefinition<BulkWorkflow.State> {

public static final String BULK_WORKFLOW_TYPE = "bulk";
public static final String VAR_CHILD_DATA = "childData";
public static final String VAR_CONCURRENCY = "concurrency";

private static final EnumSet<WorkflowInstanceStatus> RUNNING_STATES = complementOf(EnumSet.of(finished, created));
private static final Logger logger = getLogger(BulkWorkflow.class);

@Inject
WorkflowInstanceService instanceService;

public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
splitWork(start), waitForChildrenToFinish(wait), done(end), error(manual);

private WorkflowStateType type;

State(WorkflowStateType type) {
this.type = type;
}

@Override
public WorkflowStateType getType() {
return type;
}

@Override
public String getDescription() {
return name();
}
}

protected BulkWorkflow(String type) {
super(type, splitWork, error, new WorkflowSettings.Builder().setMaxRetries(Integer.MAX_VALUE).build());
setDescription("Executes child workflows in bulk but gracefully without effecting non-bulk tasks.");
permit(splitWork, waitForChildrenToFinish);
permit(waitForChildrenToFinish, done);
}

public BulkWorkflow() {
this(BULK_WORKFLOW_TYPE);
}

public NextAction splitWork(StateExecution execution, @StateVar(value = VAR_CHILD_DATA, readOnly = true) JsonNode data) {
boolean childrenFound = splitWorkImpl(execution, data);
if (childrenFound) {
return moveToState(waitForChildrenToFinish, "Running");
}
return retryAfter(waitForChildrenUntil(), "Waiting for child workflows");
}

protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unused") JsonNode data) {
if (execution.getAllChildWorkflows().isEmpty()) {
throw new RuntimeException(
"No child workflows found - either add them before starting the parent or implement splitWorkflowImpl");
}
return true;
}

protected DateTime waitForChildrenUntil() {
return now().plusHours(1);
}

public NextAction waitForChildrenToFinish(StateExecution execution,
@StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) {
List<WorkflowInstance> childWorkflows = execution.getAllChildWorkflows();
long completed = 0;
long running = 0;
for (WorkflowInstance child : childWorkflows) {
if (child.status == finished) {
completed++;
} else if (isRunning(child)) {
running++;
}
}
if (completed == childWorkflows.size()) {
return moveToState(done, "All children completed");
}
long toStart = min(max(1, concurrency) - running, childWorkflows.size() - completed);
if (toStart > 0) {
childWorkflows.stream().filter(this::isInInitialState).limit(toStart).forEach(this::wakeup);
logger.info("Started " + toStart + " child workflows");
}
long progress = completed * 100 / childWorkflows.size();
return retryAfter(waitForChildrenToCompleteUntil(), "Waiting for child workflows to complete - " + progress + "% done");
}

private void wakeup(WorkflowInstance instance) {
instanceService.wakeupWorkflowInstance(instance.id, emptyList());
}

protected boolean isRunning(WorkflowInstance instance) {
return RUNNING_STATES.contains(instance.status);
}

private boolean isInInitialState(WorkflowInstance instance) {
return instance.status == created;
}

protected DateTime waitForChildrenToCompleteUntil() {
return now().plusMinutes(15);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ public enum WorkflowStateType {
*/
normal(false, WorkflowInstanceStatus.inProgress),

/**
* State for waiting something outside this instance to happen, including (but not limited to) child workflow instances to go to
* end state. When a child workflow finishes and parent is in wait state, parent is automatically woken up.
*/
wait(false, WorkflowInstanceStatus.inProgress),

/**
* Final state of the workflow. Workflow execution is stopped.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.BulkWorkflow;
import io.nflow.engine.workflow.definition.Mutable;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
Expand Down Expand Up @@ -433,26 +434,60 @@ public void goToErrorStateWhenNextStateIsNull() {
public void doNothingWhenNotifyingParentWithoutParentWorkflowId() {
WorkflowInstance instance = executingInstanceBuilder().setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);

executor.run();

verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class), any(List.class));
}

@Test
public void whenWakingUpParentWorkflowSucceeds() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType");
when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState");
TestDefinition parentDefinition = mock(TestDefinition.class);
doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType");
when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList<String>())).thenReturn(true);

executor.run();

verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList<String>());
}

@Test
public void whenWakingUpParentWorkflowFails() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn("parentType");
when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId)).thenReturn("parentState");
TestDefinition parentDefinition = mock(TestDefinition.class);
doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition("parentType");
when(parentDefinition.getState("parentState")).thenReturn(TestDefinition.TestState.start1);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList<String>())).thenReturn(false);

executor.run();

verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList<String>());
}

@Test
public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("simple-test").setState("processing")
.build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
when(workflowInstanceDao.getWorkflowInstanceType(instance.parentWorkflowId)).thenReturn(BulkWorkflow.BULK_WORKFLOW_TYPE);
when(workflowInstanceDao.getWorkflowInstanceState(instance.parentWorkflowId))
.thenReturn(BulkWorkflow.State.waitForChildrenToFinish.name());
BulkWorkflow parentDefinition = new BulkWorkflow();
doReturn(parentDefinition).when(workflowDefinitions).getWorkflowDefinition(BulkWorkflow.BULK_WORKFLOW_TYPE);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new ArrayList<String>())).thenReturn(true);

executor.run();

verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new ArrayList<String>());

}

@Test
Expand Down
2 changes: 1 addition & 1 deletion nflow-explorer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<properties>
Expand Down
2 changes: 1 addition & 1 deletion nflow-explorer/src/app/components/graph.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
};

function resolveStyleClass() {
var cssClass = 'node-' + (_.includes(['start', 'manual', 'end', 'error'], state.type) ? state.type : 'normal');
var cssClass = 'node-' + (_.includes(['start', 'manual', 'end', 'error', 'wait'], state.type) ? state.type : 'normal');
if (workflow && isPassiveNode()) {
cssClass += ' node-passive';
}
Expand Down
11 changes: 8 additions & 3 deletions nflow-explorer/src/styles/data/graph.css
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,19 @@ Generated svg looks like this
</g>
*/
/* changing font or font size may require code changes to have right size boxes */
.node-normal, .node-manual, .node-start, .node-end, .node-error {
.node-normal, .node-manual, .node-start, .node-end, .node-error, .node-wait {
font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
font-size: 14px;
fill: black;
opacity: 1; cursor: pointer;
}

.node-start > rect, .node-manual > rect, .node-normal > rect, .node-end > rect, .node-error > rect {
.node-start > rect, .node-manual > rect, .node-normal > rect, .node-end > rect, .node-error > rect, .node-wait > rect {
stroke-width: 1.5px;
stroke: black;
fill: white;
}

.node-start > rect {
fill: LightBlue;
}
Expand All @@ -56,7 +57,11 @@ Generated svg looks like this
fill: LightGreen;
}

.node-normal.selected > rect, .node-manual.selected > rect, .node-error.selected > rect, .node-start.selected > rect, .node-end.selected > rect {
.node-wait > rect {
fill: LightSkyBlue;
}

.node-normal.selected > rect, .node-manual.selected > rect, .node-error.selected > rect, .node-start.selected > rect, .node-end.selected > rect, .node-wait.selected > rect {
stroke-width: 3px;
}

Expand Down
2 changes: 1 addition & 1 deletion nflow-jetty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
2 changes: 1 addition & 1 deletion nflow-metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<groupId>io.nflow</groupId>
<artifactId>nflow-root</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion nflow-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
2 changes: 1 addition & 1 deletion nflow-perf-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>io.nflow</groupId>
<artifactId>nflow-root</artifactId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
</parent>
<build>
<plugins>
Expand Down
2 changes: 1 addition & 1 deletion nflow-rest-api-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<parent>
<artifactId>nflow-root</artifactId>
<groupId>io.nflow</groupId>
<version>5.3.4-SNAPSHOT</version>
<version>5.4.0-SNAPSHOT</version>
</parent>
<properties>
<EMPTY></EMPTY>
Expand Down
Loading

0 comments on commit 7c5aacc

Please sign in to comment.