Skip to content

Commit

Permalink
wakeup parent in expected states only
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Jun 5, 2016
1 parent c5f2262 commit cd8c084
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -405,21 +405,25 @@ public boolean updateNotRunningWorkflowInstance(WorkflowInstance instance) {
}

@Transactional
public boolean wakeUpWorkflowExternally(int workflowInstanceId) {
String sql = "update nflow_workflow set next_activation = (case when executor_id is null then "
public boolean wakeUpWorkflowExternally(int workflowInstanceId, String[] expectedStates) {
StringBuilder sql = new StringBuilder("update nflow_workflow set next_activation = (case when executor_id is null then "
+ "least(current_timestamp, coalesce(next_activation, current_timestamp)) else next_activation end), "
+ "external_next_activation = current_timestamp where " + executorInfo.getExecutorGroupCondition()
+ " and id = ? and next_activation is not null";
return jdbc.update(sql, workflowInstanceId) == 1;
+ " and id = ? and next_activation is not null");
return addExpectedStatesToQueryAndUpdate(sql, workflowInstanceId, expectedStates);
}

public boolean wakeupWorkflowInstanceIfNotExecuting(long id, String[] expectedStates) {
public boolean wakeupWorkflowInstanceIfNotExecuting(long workflowInstanceId, String[] expectedStates) {
StringBuilder sql = new StringBuilder("update nflow_workflow set next_activation = current_timestamp")
.append(" where id = ? and executor_id is null and status in (").append(sqlVariants.workflowStatus(inProgress))
.append(", ").append(sqlVariants.workflowStatus(created))
.append(") and (next_activation is null or next_activation > current_timestamp)");
return addExpectedStatesToQueryAndUpdate(sql, workflowInstanceId, expectedStates);
}

private boolean addExpectedStatesToQueryAndUpdate(StringBuilder sql, long workflowInstanceId, String[] expectedStates) {
Object[] args = new Object[1 + expectedStates.length];
args[0] = id;
args[0] = workflowInstanceId;
if (expectedStates.length > 0) {
sql.append(" and state in (");
for (int i = 0; i < expectedStates.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,14 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
}

private void processSuccess(StateExecutionImpl execution, WorkflowInstance instance) {
if (execution.isWakeUpParentWorkflowSet()) {
if (instance.parentWorkflowId != null) {
logger.debug("wake up {}", instance.parentWorkflowId);
boolean notified = workflowInstanceDao.wakeUpWorkflowExternally(instance.parentWorkflowId);
if (notified) {
logger.info("Woke up parent workflow instance {}", instance.parentWorkflowId);
} else {
logger.warn("Failed to wake up parent workflow instance {}", instance.parentWorkflowId);
}
String[] expectedStates = execution.getWakeUpParentWorkflowStates();
if (expectedStates != null) {
logger.debug("Possibly waking up parent workflow instance {}", instance.parentWorkflowId);
boolean notified = workflowInstanceDao.wakeUpWorkflowExternally(instance.parentWorkflowId, expectedStates);
if (notified) {
logger.info("Woke up parent workflow instance {}", instance.parentWorkflowId);
} else {
logger.warn("Workflow {} trying to wake up non existing parent workflow", instance.type);
logger.info("Did not woke up parent workflow instance {}", instance.parentWorkflowId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import static java.util.Collections.unmodifiableList;
import static org.joda.time.DateTime.now;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.Assert.notNull;

import java.util.LinkedList;
import java.util.List;

import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.springframework.util.Assert;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
Expand All @@ -18,6 +20,7 @@

public class StateExecutionImpl implements StateExecution {

private static final Logger LOG = getLogger(StateExecutionImpl.class);
private final WorkflowInstance instance;
private final ObjectStringMapper objectMapper;
private final WorkflowInstanceDao workflowDao;
Expand All @@ -29,11 +32,11 @@ public class StateExecutionImpl implements StateExecution {
private Throwable thrown;
private boolean isFailed;
private boolean isRetryCountExceeded;
private boolean wakeUpParentWorkflow = false;
private boolean isStateProcessInvoked = false;
private final List<WorkflowInstance> newChildWorkflows = new LinkedList<>();
private final List<WorkflowInstance> newWorkflows = new LinkedList<>();
private boolean createAction = true;
private String[] wakeUpParentStates;

public StateExecutionImpl(WorkflowInstance instance, ObjectStringMapper objectMapper, WorkflowInstanceDao workflowDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor) {
Expand Down Expand Up @@ -202,12 +205,16 @@ public List<WorkflowInstance> getAllChildWorkflows() {
}

@Override
public void wakeUpParentWorkflow() {
wakeUpParentWorkflow = true;
public void wakeUpParentWorkflow(String... expectedStates) {
if (instance.parentWorkflowId == null) {
LOG.warn("wakeUpParentWorkflow called on non-child workflow");
return;
}
wakeUpParentStates = expectedStates;
}

public boolean isWakeUpParentWorkflowSet() {
return wakeUpParentWorkflow;
public String[] getWakeUpParentWorkflowStates() {
return wakeUpParentStates;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ public interface StateExecution {
/**
* Notify parent workflow that it may start processing again. Calling this schedules parent workflow for immediate
* execution. Scheduling is performed when current state method processing completes successfully.
*
* @param expectedStates If parent state is not one of the expected states, it is not woken up. If no expected states are
* given, parent workflow is woken up regardless of the state.
*/
void wakeUpParentWorkflow();
void wakeUpParentWorkflow(String... expectedStates);

/**
* Create a builder for creating child workflows. Created builder has nextActivation set to current time.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ public void insertingSubWorkflowWorks() {
}

@Test
public void wakeUpWorkflowExternallyWorks() {
public void wakeUpWorkflowExternallyWorksWithEmptyExpectedStates() {
DateTime now = DateTime.now();
DateTime scheduled = now.plusDays(1);
WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(scheduled).build();
Expand All @@ -725,11 +725,61 @@ public void wakeUpWorkflowExternallyWorks() {
assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId));
assertThat(i2.parentActionId, equalTo(parentActionId));

dao.wakeUpWorkflowExternally(parentWorkflowId);
dao.wakeUpWorkflowExternally(parentWorkflowId, new String[0]);
WorkflowInstance wakenWorkflow = dao.getWorkflowInstance(parentWorkflowId);
assertTrue(wakenWorkflow.nextActivation.isBefore(now.plusMinutes(1)));
}

@Test
public void wakeUpWorkflowExternallyWorksWithExpectedStates() {
DateTime now = DateTime.now();
DateTime scheduled = now.plusDays(1);
WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(scheduled).build();
int parentWorkflowId = dao.insertWorkflowInstance(i1);
assertThat(parentWorkflowId, not(equalTo(-1)));
WorkflowInstance createdWorkflow = dao.getWorkflowInstance(parentWorkflowId);

assertThat(createdWorkflow.nextActivation, equalTo(scheduled));

int parentActionId = addWorkflowAction(parentWorkflowId, i1);
assertThat(parentActionId, not(equalTo(-1)));

int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId);
WorkflowInstance i2 = dao.getWorkflowInstance(subWorkflowId);
assertThat(subWorkflowId, not(equalTo(-1)));
assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId));
assertThat(i2.parentActionId, equalTo(parentActionId));

dao.wakeUpWorkflowExternally(parentWorkflowId, new String[] { "CreateLoan" });
WorkflowInstance wakenWorkflow = dao.getWorkflowInstance(parentWorkflowId);
assertTrue(wakenWorkflow.nextActivation.isBefore(now.plusMinutes(1)));
}

@Test
public void wakeUpWorkflowExternallyDoesNotWakeUpWorkflowInUnexpectedState() {
DateTime now = DateTime.now();
DateTime scheduled = now.plusDays(1);
WorkflowInstance i1 = constructWorkflowInstanceBuilder().setNextActivation(scheduled).setState("unexpected").build();
int parentWorkflowId = dao.insertWorkflowInstance(i1);
assertThat(parentWorkflowId, not(equalTo(-1)));
WorkflowInstance createdWorkflow = dao.getWorkflowInstance(parentWorkflowId);

assertThat(createdWorkflow.nextActivation, equalTo(scheduled));

int parentActionId = addWorkflowAction(parentWorkflowId, i1);
assertThat(parentActionId, not(equalTo(-1)));

int subWorkflowId = addSubWorkflow(parentWorkflowId, parentActionId);
WorkflowInstance i2 = dao.getWorkflowInstance(subWorkflowId);
assertThat(subWorkflowId, not(equalTo(-1)));
assertThat(i2.parentWorkflowId, equalTo(parentWorkflowId));
assertThat(i2.parentActionId, equalTo(parentActionId));

dao.wakeUpWorkflowExternally(parentWorkflowId, new String[] { "CreateLoan" });
WorkflowInstance wakenWorkflow = dao.getWorkflowInstance(parentWorkflowId);
assertThat(wakenWorkflow.nextActivation, is(scheduled));
}

private static void checkSameWorkflowInfo(WorkflowInstance i1, WorkflowInstance i2) {
assertThat(i1.type, equalTo(i2.type));
assertThat(i1.executorId, equalTo(i2.executorId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,25 +397,25 @@ public void doNothingWhenNotifyingParentWithoutParentWorkflowId() {
WorkflowInstance instance = executingInstanceBuilder().setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance);
executor.run();
verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class));
verify(workflowInstanceDao, never()).wakeUpWorkflowExternally(any(Integer.class), any(String[].class));
}

@Test
public void whenWakingUpParentWorkflowSucceeds() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance);
when(workflowInstanceDao.wakeUpWorkflowExternally(999)).thenReturn(true);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new String[0])).thenReturn(true);
executor.run();
verify(workflowInstanceDao).wakeUpWorkflowExternally(999);
verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new String[0]);
}

@Test
public void whenWakingUpParentWorkflowFails() {
WorkflowInstance instance = executingInstanceBuilder().setParentWorkflowId(999).setType("wake-test").setState("wakeParent").build();
when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance);
when(workflowInstanceDao.wakeUpWorkflowExternally(999)).thenReturn(false);
when(workflowInstanceDao.wakeUpWorkflowExternally(999, new String[0])).thenReturn(false);
executor.run();
verify(workflowInstanceDao).wakeUpWorkflowExternally(999);
verify(workflowInstanceDao).wakeUpWorkflowExternally(999, new String[0]);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.emptyArray;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -79,12 +81,21 @@ public void addWorkflows() {
}

@Test
public void wakeUpParentWorkflowSetsWakeupFlag() {
assertThat(execution.isWakeUpParentWorkflowSet(), is(false));
public void wakeUpParentWorkflowSetsWakeUpStates() {
instance = new WorkflowInstance.Builder().setId(99).setExternalId("ext").setRetries(88).setState("myState")
.setBusinessKey("business").setParentWorkflowId(123).build();
execution = new StateExecutionImpl(instance, objectStringMapper, workflowDao, workflowInstancePreProcessor);
assertThat(execution.getWakeUpParentWorkflowStates(), is(nullValue()));
execution.wakeUpParentWorkflow();
assertThat(execution.isWakeUpParentWorkflowSet(), is(true));
assertThat(execution.getWakeUpParentWorkflowStates(), is(emptyArray()));
execution.wakeUpParentWorkflow("state1", "state2");
assertThat(execution.getWakeUpParentWorkflowStates(), is(arrayContaining("state1", "state2")));
}

@Test
public void nonChildWorkflowCannotWakeUpParent() {
execution.wakeUpParentWorkflow();
assertThat(execution.isWakeUpParentWorkflowSet(), is(true));
assertThat(execution.getWakeUpParentWorkflowStates(), is(nullValue()));
}

@Test
Expand Down

0 comments on commit cd8c084

Please sign in to comment.