Skip to content

Commit

Permalink
Inform workflow dispatcher about pending shutdown to allw faster clea…
Browse files Browse the repository at this point in the history
…n shutdown (#393)

* Pass information to workflow disptcher about pending shutdown to allow faster clean shutdown

* Add test for expedited shutdown for workflow processor. Remove unneeded internalRetryEnabled variable. Tests seem to pass nicely without it

* Tune retry loops log properly on shutdown and avoid extra sleeps

* Shut down the executor forcibly to interrupt any stuck workflows

* Make dispatcher shutdown block until ongoing shutdown is finished when the shutdown was already in progress, tune logging

Co-authored-by: Edvard Fonsell <edvard.fonsell@nitorcreations.com>
  • Loading branch information
gmokki and Edvard Fonsell committed Apr 18, 2020
1 parent 91700e1 commit 3157162
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 62 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
## 7.1.1-SNAPSHOT (future release)

**Highlights**
- `nflow-engine`
- Expedited clean shutdown for workflows that run many steps without delays.

**Details**

- `nflow-engine`
- When shutdown is requested, stop processing workflows immediately after the current state has been executed.
- Dependency updates:
- spring 5.2.5
- jackson 2.10.3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ public class WorkflowDispatcher implements Runnable {
private static final Logger logger = getLogger(WorkflowDispatcher.class);
private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60);

private volatile boolean started;
private volatile boolean shutdownRequested;
private volatile boolean running = false;
private volatile boolean paused = false;
private volatile boolean running;
private volatile boolean paused;
private final CountDownLatch shutdownDone = new CountDownLatch(1);

private final WorkflowInstanceExecutor executor;
Expand Down Expand Up @@ -65,8 +66,9 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao

@Override
public void run() {
logger.info("Starting.");
logger.info("Dispacther started.");
try {
started = true;
workflowDefinitions.postProcessWorkflowDefinitions();
running = true;
while (!shutdownRequested) {
Expand Down Expand Up @@ -102,33 +104,33 @@ public void run() {
running = false;
shutdownPool();
executorDao.markShutdown();
logger.info("Shutdown finished.");
shutdownDone.countDown();
}
}

public void shutdown() {
shutdownRequested = true;
if (running) {
logger.info("Shutdown requested.");
if (started && shutdownDone.getCount() > 0) {
logger.info("Shutdown initiated.");
try {
shutdownDone.await();
logger.info("Shutdown completed.");
} catch (@SuppressWarnings("unused") InterruptedException e) {
logger.warn("Shutdown interrupted.");
}
} else {
logger.info("Shutdown requested, but executor not running, exiting.");
return;
}
try {
// TODO use timeout?
shutdownDone.await();
} catch (@SuppressWarnings("unused") InterruptedException e) {
logger.info("Shutdown interrupted.");
logger.info("Dispatcher was not started or was already shut down.");
}
}

public void pause() {
paused = true;
logger.info("Dispatcher paused.");
}

public void resume() {
paused = false;
logger.info("Dispatcher resumed.");
}

public boolean isPaused() {
Expand All @@ -155,7 +157,7 @@ private void dispatch(List<Long> nextInstanceIds) {
}
logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size());
for (Long instanceId : nextInstanceIds) {
executor.execute(stateProcessorFactory.createProcessor(instanceId));
executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> shutdownRequested));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress;
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution;
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed;
import static java.lang.String.format;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
Expand All @@ -25,6 +24,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand Down Expand Up @@ -68,6 +68,7 @@ class WorkflowStateProcessor implements Runnable {
private final WorkflowDefinitionService workflowDefinitions;
private final WorkflowInstanceService workflowInstances;
private final WorkflowInstancePreProcessor workflowInstancePreProcessor;
private final Supplier<Boolean> shutdownRequested;
final ObjectStringMapper objectMapper;
private final WorkflowInstanceDao workflowInstanceDao;
private final MaintenanceDao maintenanceDao;
Expand All @@ -78,16 +79,16 @@ class WorkflowStateProcessor implements Runnable {
private final int stateProcessingRetryDelay;
private final int stateSaveRetryDelay;
private final int stateVariableValueTooLongRetryDelay;
private boolean internalRetryEnabled = true;
private final Map<Long, WorkflowStateProcessor> processingInstances;
private long startTimeSeconds;
private Thread thread;

WorkflowStateProcessor(long instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowStateProcessor(long instanceId, Supplier<Boolean> shutdownRequested, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env,
Map<Long, WorkflowStateProcessor> processingInstances, WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
this.shutdownRequested = shutdownRequested;
this.objectMapper = objectMapper;
this.workflowDefinitions = workflowDefinitions;
this.workflowInstances = workflowInstances;
Expand All @@ -111,16 +112,19 @@ public void run() {
startTimeSeconds = currentTimeMillis() / 1000;
thread = currentThread();
processingInstances.put(instanceId, this);
boolean stateProcessingFinished = false;
do {
while (true) {
try {
runImpl();
stateProcessingFinished = true;
break;
} catch (Throwable ex) {
if (shutdownRequested.get()) {
logger.error("Failed to process workflow instance and shutdown requested", ex);
break;
}
logger.error("Failed to process workflow instance, retrying after {} seconds", stateProcessingRetryDelay, ex);
sleepIgnoreInterrupted(stateProcessingRetryDelay);
}
} while (!stateProcessingFinished && internalRetryEnabled);
}
processingInstances.remove(instanceId);
MDC.remove(MDC_KEY);
}
Expand All @@ -136,7 +140,7 @@ private void runImpl() {
}
WorkflowSettings settings = definition.getSettings();
int subsequentStateExecutions = 0;
while (instance.status == executing) {
while (instance.status == executing && !shutdownRequested.get()) {
startTimeSeconds = currentTimeMillis() / 1000;
StateExecutionImpl execution = new StateExecutionImpl(instance, objectMapper, workflowInstanceDao,
workflowInstancePreProcessor, workflowInstances);
Expand Down Expand Up @@ -259,23 +263,21 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
.setStateText(getStateText(instance, execution)) //
.setState(execution.getNextState()) //
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);
do {
while (true) {
try {
return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder);
} catch (Exception ex) {
if (shutdownRequested.get()) {
logger.error("Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.",
instance.id, ex);
// return the original instance since persisting failed
return instance;
}
logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", instance.id, stateSaveRetryDelay,
ex);
sleepIgnoreInterrupted(stateSaveRetryDelay);
}
} while (internalRetryEnabled);
throw new IllegalStateException(format("Failed to save workflow instance %s new state", instance.id));
}

/**
* For unit testing only
*/
void setInternalRetryEnabled(boolean internalRetryEnabled) {
this.internalRetryEnabled = internalRetryEnabled;
}
}

private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, Map<String, String> originalStateVars,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import javax.inject.Inject;

Expand Down Expand Up @@ -47,8 +48,8 @@ public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitio
this.env = env;
}

public WorkflowStateProcessor createProcessor(long instanceId) {
return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
public WorkflowStateProcessor createProcessor(long instanceId, Supplier<Boolean> shutdownRequested) {
return new WorkflowStateProcessor(instanceId, shutdownRequested, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listeners);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package io.nflow.engine.internal.executor;

import static edu.umd.cs.mtc.TestFramework.runOnce;
import static java.lang.Boolean.FALSE;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -53,7 +57,6 @@

@ExtendWith(MockitoExtension.class)
public class WorkflowDispatcherTest {

WorkflowDispatcher dispatcher;
WorkflowInstanceExecutor executor;
MockEnvironment env = new MockEnvironment();
Expand Down Expand Up @@ -120,9 +123,9 @@ public void threadDispatcher() {
.thenThrow(new RuntimeException("Expected: exception during dispatcher execution"))
.thenAnswer(waitForTickAndAnswer(2, ids(2L), this));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable());
when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor);
when(executorFactory.createProcessor(eq(1L), any())).thenReturn(fakeWorkflowExecutor);
WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable());
when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor2);
when(executorFactory.createProcessor(eq(2L), any())).thenReturn(fakeWorkflowExecutor2);
dispatcher.run();
}

Expand All @@ -135,8 +138,8 @@ public void threadShutdown() {
public void finish() {
verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt());
InOrder inOrder = inOrder(executorFactory);
inOrder.verify(executorFactory).createProcessor(1);
inOrder.verify(executorFactory).createProcessor(2);
inOrder.verify(executorFactory).createProcessor(eq(1L), any());
inOrder.verify(executorFactory).createProcessor(eq(2L), any());
}
}
runOnce(new ExceptionDuringDispatcherExecutionCausesRetry());
Expand All @@ -159,7 +162,7 @@ public void threadDispatcher() {
@Override
public void finish() {
verify(workflowInstances).pollNextWorkflowInstanceIds(anyInt());
verify(executorFactory, never()).createProcessor(anyInt());
verify(executorFactory, never()).createProcessor(anyLong(), any());
}
}
runOnce(new ErrorDuringDispatcherExecutionStopsDispatcher());
Expand All @@ -184,7 +187,7 @@ public void threadShutdown() {
@Override
public void finish() {
verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt());
verify(executorFactory, never()).createProcessor(anyInt());
verify(executorFactory, never()).createProcessor(anyLong(), any());
}
}
runOnce(new EmptyPollResultCausesNoTasksToBeScheduled());
Expand All @@ -197,7 +200,7 @@ class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(1L), this));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this));
when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor);
when(executorFactory.createProcessor(anyLong(), any())).thenReturn(fakeWorkflowExecutor);
dispatcher.run();
}

Expand Down Expand Up @@ -238,7 +241,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
});
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this));
when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor);
when(executorFactory.createProcessor(anyLong(), any())).thenReturn(fakeWorkflowExecutor);
dispatcher.run();
}

Expand Down Expand Up @@ -369,7 +372,7 @@ public void run() {
}

WorkflowStateProcessor fakeWorkflowExecutor(long instanceId, final Runnable fakeCommand) {
return new WorkflowStateProcessor(instanceId, null, null, null, null, null, null, env, new ConcurrentHashMap<>(),
return new WorkflowStateProcessor(instanceId, FALSE::booleanValue, null, null, null, null, null, null, env, new ConcurrentHashMap<>(),
(WorkflowExecutorListener) null) {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.nflow.engine.internal.executor;

import static java.lang.Boolean.FALSE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.joda.time.DateTimeUtils.currentTimeMillis;
Expand Down Expand Up @@ -61,14 +62,14 @@ public void setup() {

@Test
public void factoryCreatesExecutorsWithoutListeners() {
WorkflowStateProcessor executor = factory.createProcessor(12);
WorkflowStateProcessor executor = factory.createProcessor(12, FALSE::booleanValue);
assertNotNull(executor);
}

@Test
public void factoryCreatesExecutorsWithListeners() {
factory.listeners = listeners;
WorkflowStateProcessor executor = factory.createProcessor(122);
WorkflowStateProcessor executor = factory.createProcessor(122, FALSE::booleanValue);
assertNotNull(executor);
}

Expand Down
Loading

0 comments on commit 3157162

Please sign in to comment.