Skip to content

Commit

Permalink
Merge 3e194eb into b5a46a5
Browse files Browse the repository at this point in the history
  • Loading branch information
gmokki committed Jan 3, 2020
2 parents b5a46a5 + 3e194eb commit ade0efc
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
**Details**
- `nflow-engine`
- Throw `StateVariableValueTooLongException` if a state variable value that does not fit into the database column is detected. Checked in `StateExecution.setVariable`, `StateExecution.addWorkflows`, `StateExecution.addChildWorkflows`, `WorkflowInstanceService.insertWorkflowInstance` and when creating a new instance via REST API. If the exception is thrown during state processing and not handled by the state implementation, nFlow engine will catch the exception and retry state processing after delay configured by property `nflow.executor.stateVariableValueTooLongRetryDelay.minutes` (default is 60).
- Fix honoring of `includeCurrentStateVariables` flag in `WorkflowInstanceService.listWorkflowInstances`. This caused major slowness when using bulk workflows.
To preserve the existing (incorrect) default behaviour in backwards compatible way the default value in `QueryWorkflowInstances.Builder` is changed to `true`. The REST API is unaffected.
Especially in workflows with many children that use the `StateExecution.getAllChildWorkflows` method the performance impact can be high. Before 7.0.0 release, it is recommended to use `StateExecution.queryChildWorkflows(new QueryWorkflowInstances.Builder().setIncludeCurrentStateVariables(false).build())` if state variables are not needed.
- Dependency updates:
- jetty 9.4.24.v20191120
- junit4 4.13-rc-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,10 @@ public List<WorkflowInstance> queryWorkflowInstances(QueryWorkflowInstances quer
sql = sqlVariants.limit(sql, getMaxResults(query.maxResults));
List<WorkflowInstance> ret = namedJdbc.query(sql, params, new WorkflowInstanceRowMapper()).stream()
.map(WorkflowInstance.Builder::build).collect(toList());
for (WorkflowInstance instance : ret) {
fillState(instance);
if (query.includeCurrentStateVariables) {
for (WorkflowInstance instance : ret) {
fillState(instance);
}
}
if (query.includeActions) {
for (WorkflowInstance instance : ret) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import javax.inject.Inject;

import io.nflow.engine.workflow.instance.QueryWorkflowInstances;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -90,7 +91,8 @@ public NextAction splitWork(StateExecution execution, @StateVar(value = VAR_CHIL
}

protected boolean splitWorkImpl(StateExecution execution, @SuppressWarnings("unused") JsonNode data) {
if (execution.getAllChildWorkflows().isEmpty()) {
// TODO: change back to getAllChildWorkflows after it no longer returns state variables
if (execution.queryChildWorkflows(new QueryWorkflowInstances.Builder().setIncludeCurrentStateVariables(false).build()).isEmpty()) {
throw new RuntimeException("No child workflows found for workflow instance " + execution.getWorkflowInstanceId()
+ " - either add them before starting the parent or implement splitWorkflowImpl");
}
Expand All @@ -103,7 +105,8 @@ protected DateTime waitForChildrenUntil() {

public NextAction waitForChildrenToFinish(StateExecution execution,
@StateVar(value = VAR_CONCURRENCY, readOnly = true) int concurrency) {
List<WorkflowInstance> childWorkflows = execution.getAllChildWorkflows();
// TODO: change back to getAllChildWorkflows after it no longer returns state variables
List<WorkflowInstance> childWorkflows = execution.queryChildWorkflows(new QueryWorkflowInstances.Builder().setIncludeCurrentStateVariables(false).build());
long completed = 0;
long running = 0;
for (WorkflowInstance child : childWorkflows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public interface StateExecution {
List<WorkflowInstance> queryChildWorkflows(QueryWorkflowInstances query);

/**
* Return all child workflows for current workflow.
* Return all child workflows with state variables for current workflow.
* TODO: Starting from 7.0.0 release, the state variables of child workflows will not be returned anymore. Use {@link #queryChildWorkflows(QueryWorkflowInstances)} instead to get the child workflows with state variables.
* @return List of all child workflows.
*/
List<WorkflowInstance> getAllChildWorkflows();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public static class Builder {
String businessKey;
String externalId;
boolean includeActions;
boolean includeCurrentStateVariables;
/** TODO: remove setting the default value to true in 7.0.0 release. */
boolean includeCurrentStateVariables = true;
boolean includeActionStateVariables;
boolean includeChildWorkflows;
Long maxResults;
Expand Down Expand Up @@ -235,6 +236,7 @@ public Builder setIncludeActions(boolean includeActions) {

/**
* Set whether current workflow state variables should be included in the results.
* The default is `true`. TODO: Change default to `false` in 7.0.0 release.
* @param includeCurrentStateVariables True to include state variables, false otherwise.
* @return this.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static java.lang.Thread.sleep;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.apache.commons.lang3.StringUtils.countMatches;
import static org.apache.commons.lang3.StringUtils.repeat;
Expand Down Expand Up @@ -147,11 +148,13 @@ public void queryWorkflowInstanceWithMinimalConditions() {
WorkflowInstance i1 = constructWorkflowInstanceBuilder().build();
long id = dao.insertWorkflowInstance(i1);
assertThat(id, not(equalTo(-1)));
QueryWorkflowInstances q = new QueryWorkflowInstances.Builder().build();
// TODO: remove setIncludeCurrentStateVariables(false) in 7.0.0 release
QueryWorkflowInstances q = new QueryWorkflowInstances.Builder().setIncludeCurrentStateVariables(false).build();
List<WorkflowInstance> createdInstances = dao.queryWorkflowInstances(q);
assertThat(createdInstances.size(), is(1));
WorkflowInstance instance = createdInstances.get(0);
checkSameWorkflowInfo(i1, instance);
WorkflowInstance originalWithoutStateVariables = new WorkflowInstance.Builder(i1).setStateVariables(emptyMap()).build();
checkSameWorkflowInfo(originalWithoutStateVariables, instance);
assertNull(instance.started);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyCollectionOf;
import static org.hamcrest.Matchers.hasEntry;
import static org.joda.time.DateTime.now;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -132,6 +134,8 @@ public void queryChildWorkflowsIsRestrictedToChildsOfCurrentWorkflow() {
assertThat(actualQuery.parentWorkflowId, is(99L));
assertThat(actualQuery.types, is(asList("a", "b")));
assertThat(actualQuery.businessKey, is("123"));
// TODO: change to `false` in 7.0.0 release
assertThat(actualQuery.includeCurrentStateVariables, is(true));
}

@Test
Expand All @@ -143,8 +147,10 @@ public void getAllChildWorkflowsQueriesAllChildWorkflows() {
QueryWorkflowInstances actualQuery = queryCaptor.getValue();

assertThat(actualQuery.parentWorkflowId, is(99L));
assertThat(actualQuery.types, is(Collections.<String>emptyList()));
assertThat(actualQuery.types, emptyCollectionOf(String.class));
assertThat(actualQuery.businessKey, is(nullValue()));
// TODO: change to `false` in 7.0.0 release
assertThat(actualQuery.includeCurrentStateVariables, is(true));
}

@Test
Expand All @@ -165,7 +171,7 @@ public void workflowInstanceBuilder() {
builder.putStateVariable("foo", data);
WorkflowInstance i = builder.build();
assertThat(i.nextActivation, is(notNullValue()));
assertThat(i.stateVariables.get("foo"), is(serializedData));
assertThat(i.stateVariables, hasEntry("foo", serializedData));
verify(objectStringMapper).convertFromObject("foo", data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private NextAction nextStep(StateExecution execution, int nextN, int offset, Sta
public NextAction poll(StateExecution execution) {
// get finished and failed child workflows
QueryWorkflowInstances query = new QueryWorkflowInstances.Builder().addStatuses(WorkflowInstanceStatus.manual,
WorkflowInstanceStatus.finished).build();
WorkflowInstanceStatus.finished).setIncludeCurrentStateVariables(true).build();
List<WorkflowInstance> finishedChildren = execution.queryChildWorkflows(query);

if (finishedChildren.size() < execution.getAllChildWorkflows().size()) {
Expand Down
27 changes: 12 additions & 15 deletions nflow-tests/src/test/java/io/nflow/tests/ChildWorkflowTest.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
package io.nflow.tests;

import static java.time.Duration.ofSeconds;
import static org.apache.cxf.jaxrs.client.WebClient.fromClient;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;

import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest;
import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse;
import io.nflow.rest.v1.msg.ListWorkflowInstanceResponse;
import io.nflow.tests.demo.workflow.FibonacciWorkflow;
import io.nflow.tests.extension.NflowServerConfig;
import io.nflow.tests.extension.NflowServerExtension;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;

import static java.time.Duration.ofSeconds;
import static org.apache.cxf.jaxrs.client.WebClient.fromClient;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.notNullValue;

@ExtendWith(NflowServerExtension.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
Expand Down Expand Up @@ -48,8 +46,7 @@ public void startFibonacciWorkflow() {
public void checkFibonacciWorkflowComputesCorrectResult() {
ListWorkflowInstanceResponse response = getWorkflowInstanceWithTimeout(workflowId, FibonacciWorkflow.State.done.name(),
ofSeconds(30));
assertTrue(response.stateVariables.containsKey("result"));
assertEquals(8, response.stateVariables.get("result"));
assertThat(response.stateVariables, hasEntry("result", 8));
}

}

0 comments on commit ade0efc

Please sign in to comment.