Skip to content

Commit

Permalink
Inform workflow dispatcher about pending shutdown to allw faster clea…
Browse files Browse the repository at this point in the history
…n shutdown (#393)

* Pass information to workflow disptcher about pending shutdown to allow faster clean shutdown

* Add test for expedited shutdown for workflow processor. Remove unneeded internalRetryEnabled variable. Tests seem to pass nicely without it

* Tune retry loops log properly on shutdown and avoid extra sleeps

* Shut down the executor forcibly to interrupt any stuck workflows

* Make dispatcher shutdown block until ongoing shutdown is finished when the shutdown was already in progress, tune logging

Co-authored-by: Edvard Fonsell <edvard.fonsell@nitorcreations.com>
  • Loading branch information
gmokki and Edvard Fonsell committed Apr 19, 2020
1 parent 8d40ca5 commit 7a0b4a1
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class WorkflowDispatcher implements Runnable {
private static final Logger logger = getLogger(WorkflowDispatcher.class);
private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60);

private volatile boolean started;
private volatile boolean shutdownRequested;
private volatile boolean running;
private volatile boolean paused;
Expand Down Expand Up @@ -67,6 +68,7 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
public void run() {
logger.info("Dispacther started.");
try {
started = true;
workflowDefinitions.postProcessWorkflowDefinitions();
running = true;
while (!shutdownRequested) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Map;
import java.util.function.Supplier;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
Expand All @@ -35,6 +34,7 @@
import org.springframework.dao.EmptyResultDataAccessException;
import org.springframework.util.Assert;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.nflow.engine.internal.dao.MaintenanceDao;
import io.nflow.engine.internal.dao.WorkflowInstanceDao;
import io.nflow.engine.internal.util.PeriodicLogger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import static org.joda.time.DateTimeUtils.setCurrentMillisSystem;
import static org.joda.time.Duration.standardHours;
import static org.joda.time.Period.hours;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -948,7 +948,7 @@ public void handlePotentiallyStuckInterruptsThreadWhenListenerReturnsTrue() thro
executor.handlePotentiallyStuck(processingTime);

thread.join(1000);
assertFalse("Processing thread did not die after interruption", thread.isAlive());
assertFalse(thread.isAlive(), "Processing thread did not die after interruption");

verify(listener1).handlePotentiallyStuck(any(ListenerContext.class), eq(processingTime));
verify(listener2).handlePotentiallyStuck(any(ListenerContext.class), eq(processingTime));
Expand Down

0 comments on commit 7a0b4a1

Please sign in to comment.