Skip to content

Commit

Permalink
log warnings when state processor threads are stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Mar 26, 2016
1 parent 4b3ffe2 commit 0e7e9e4
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public void run() {

if (!shutdownRequested) {
executorRecovery.tick();
if (stateProcessorFactory.getPotentiallyStuckProcessors() == executor.getThreadCount()) {
logger.warn("All state processor threads are potentially stuck.");
}
dispatch(getNextInstanceIds());
}
} catch (PollingRaceConditionException pex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class WorkflowInstanceExecutor {
private static final Logger logger = getLogger(WorkflowInstanceExecutor.class);

private final int awaitTerminationSeconds;
private final int threadCount;
final ThreadPoolExecutor executor;
final ThresholdBlockingQueue<Runnable> queue;

Expand All @@ -24,6 +25,11 @@ public WorkflowInstanceExecutor(int maxQueueSize, int threadCount, int notifyThr
executor = new ThreadPoolExecutor(threadCount, threadCount, keepAliveSeconds, SECONDS, queue, threadFactory);
executor.allowCoreThreadTimeOut(keepAliveSeconds > 0);
this.awaitTerminationSeconds = awaitTerminationSeconds;
this.threadCount = threadCount;
}

public int getThreadCount() {
return threadCount;
}

public void waitUntilQueueSizeLowerThanThreshold(DateTime waitUntil) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.joda.time.DateTime;
import org.joda.time.Duration;
Expand Down Expand Up @@ -64,16 +65,18 @@ class WorkflowStateProcessor implements Runnable {
DateTime lastLogged = now();
private final int unknownWorkflowTypeRetryDelay;
private final int unknownWorkflowStateRetryDelay;
private final Map<Integer, DateTime> processingInstances;

WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env, Map<Integer, DateTime> processingInstances,
WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
this.objectMapper = objectMapper;
this.workflowDefinitions = workflowDefinitions;
this.workflowInstances = workflowInstances;
this.workflowInstanceDao = workflowInstanceDao;
this.processingInstances = processingInstances;
this.executorListeners = asList(executorListeners);
this.workflowInstancePreProcessor = workflowInstancePreProcessor;
illegalStateChangeAction = env.getRequiredProperty("nflow.illegal.state.change.action");
Expand All @@ -85,10 +88,12 @@ class WorkflowStateProcessor implements Runnable {
public void run() {
try {
MDC.put(MDC_KEY, String.valueOf(instanceId));
processingInstances.put(instanceId, now());
runImpl();
} catch (Throwable ex) {
logger.error("Unexpected failure occurred", ex);
} finally {
processingInstances.remove(instanceId);
MDC.remove(MDC_KEY);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package com.nitorcreations.nflow.engine.internal.executor;

import static java.util.Collections.synchronizedMap;
import static org.joda.time.DateTime.now;
import static org.joda.time.Minutes.minutesBetween;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import javax.inject.Inject;

import com.nitorcreations.nflow.engine.internal.workflow.WorkflowInstancePreProcessor;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
import com.nitorcreations.nflow.engine.internal.workflow.WorkflowInstancePreProcessor;
import com.nitorcreations.nflow.engine.listener.WorkflowExecutorListener;
import com.nitorcreations.nflow.engine.service.WorkflowDefinitionService;
import com.nitorcreations.nflow.engine.service.WorkflowInstanceService;
Expand All @@ -23,6 +34,8 @@ public class WorkflowStateProcessorFactory {
private final Environment env;
@Autowired(required = false)
protected WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[0];
final Map<Integer, DateTime> processingInstances = synchronizedMap(new HashMap<Integer, DateTime>());
private static final Logger logger = getLogger(WorkflowStateProcessorFactory.class);

@Inject
public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances,
Expand All @@ -38,7 +51,23 @@ public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitio

public WorkflowStateProcessor createProcessor(int instanceId) {
return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
workflowInstancePreProcessor, env, listeners);
workflowInstancePreProcessor, env, processingInstances, listeners);
}

public int getPotentiallyStuckProcessors() {
DateTime now = now();
int potentiallyStuck = 0;
synchronized (processingInstances) {
for (Entry<Integer, DateTime> entry : processingInstances.entrySet()) {
int processingTimeMinutes = minutesBetween(entry.getValue(), now).getMinutes();
if (processingTimeMinutes > 5) {
potentiallyStuck++;
logger.warn("Workflow instance {} has been processed for {} minutes, it may be stuck.", entry.getKey(),
processingTimeMinutes);
}
}
}
return potentiallyStuck;
}

}
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package com.nitorcreations.nflow.engine.internal.executor;

import static edu.umd.cs.mtc.TestFramework.runOnce;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.slf4j.Logger.ROOT_LOGGER_NAME;
import static org.slf4j.LoggerFactory.getLogger;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
Expand All @@ -32,19 +42,28 @@
import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.listener.WorkflowExecutorListener;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import edu.umd.cs.mtc.MultithreadedTestCase;
import edu.umd.cs.mtc.TestFramework;

@RunWith(MockitoJUnitRunner.class)
public class WorkflowDispatcherTest {

WorkflowDispatcher dispatcher;
WorkflowInstanceExecutor executor;

@Mock WorkflowInstanceDao workflowInstances;
@Mock ExecutorDao recovery;
@Mock WorkflowStateProcessorFactory executorFactory;

MockEnvironment env = new MockEnvironment();
@Mock
WorkflowInstanceDao workflowInstances;
@Mock
ExecutorDao recovery;
@Mock
WorkflowStateProcessorFactory executorFactory;
@Mock
Appender<ILoggingEvent> mockAppender;
@Captor
ArgumentCaptor<ILoggingEvent> loggingEventCaptor;

@Before
public void setup() {
Expand All @@ -56,6 +75,14 @@ public void setup() {
when(recovery.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env);
Logger logger = (Logger) getLogger(ROOT_LOGGER_NAME);
logger.addAppender(mockAppender);
}

@After
public void teardown() {
Logger logger = (Logger) getLogger(ROOT_LOGGER_NAME);
logger.detachAppender(mockAppender);
}

@Test(expected = BeanCreationException.class)
Expand All @@ -69,15 +96,13 @@ public void exceptionDuringDispatcherExecutionCausesRetry() throws Throwable {
@SuppressWarnings("unused")
class ExceptionDuringDispatcherExecutionCausesRetry extends MultithreadedTestCase {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenReturn(ids(1))
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenReturn(ids(1))
.thenThrow(new RuntimeException("Expected: exception during dispatcher execution"))
.thenAnswer(waitForTickAndAnswer(2, ids(2), this));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable());
when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor);
WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable());
when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor2);

dispatcher.run();
}

Expand All @@ -94,18 +119,15 @@ public void finish() {
inOrder.verify(executorFactory).createProcessor(2);
}
}
TestFramework.runOnce(new ExceptionDuringDispatcherExecutionCausesRetry());
runOnce(new ExceptionDuringDispatcherExecutionCausesRetry());
}

@Test
public void errorDuringDispatcherExecutionStopsDispatcher() throws Throwable {
@SuppressWarnings("unused")
class ErrorDuringDispatcherExecutionStopsDispatcher extends MultithreadedTestCase {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenThrow(new AssertionError())
.thenReturn(ids(1));

when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenThrow(new AssertionError()).thenReturn(ids(1));
try {
dispatcher.run();
Assert.fail("Error should stop the dispatcher");
Expand All @@ -120,7 +142,7 @@ public void finish() {
verify(executorFactory, never()).createProcessor(anyInt());
}
}
TestFramework.runOnce(new ErrorDuringDispatcherExecutionStopsDispatcher());
runOnce(new ErrorDuringDispatcherExecutionStopsDispatcher());
}

@Test
Expand All @@ -129,8 +151,7 @@ public void emptyPollResultCausesNoTasksToBeScheduled() throws Throwable {
class EmptyPollResultCausesNoTasksToBeScheduled extends MultithreadedTestCase {
@SuppressWarnings("unchecked")
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenReturn(ids(), ids())
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenReturn(ids(), ids())
.thenAnswer(waitForTickAndAnswer(2, ids(), this));
dispatcher.run();
}
Expand All @@ -146,19 +167,17 @@ public void finish() {
verify(executorFactory, never()).createProcessor(anyInt());
}
}
TestFramework.runOnce(new EmptyPollResultCausesNoTasksToBeScheduled());
runOnce(new EmptyPollResultCausesNoTasksToBeScheduled());
}

@Test
public void shutdownBlocksUntilPoolShutdown() throws Throwable {
@SuppressWarnings("unused")
class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenAnswer(waitForTickAndAnswer(2, ids(1), this));
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(1), this));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this));
when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor);

dispatcher.run();
}

Expand All @@ -168,7 +187,7 @@ public void threadShutdown() {
assertPoolIsShutdown(true);
}
}
TestFramework.runOnce(new ShutdownBlocksUntilPoolShutdown());
runOnce(new ShutdownBlocksUntilPoolShutdown());
}

@Test
Expand All @@ -186,7 +205,6 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
});
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this));
when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor);

dispatcher.run();
}

Expand All @@ -198,7 +216,7 @@ public void threadShutdown() {
assertPoolIsShutdown(true);
}
}
TestFramework.runOnce(new ShutdownCanBeInterrupted());
runOnce(new ShutdownCanBeInterrupted());
}

@Test
Expand All @@ -216,7 +234,6 @@ public void initialize() {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(), this));
doThrow(new RuntimeException("Expected: exception on pool shutdown")).when(poolSpy).shutdown();

dispatcher.run();
}

Expand All @@ -230,7 +247,7 @@ public void finish() {
verify(poolSpy).shutdown();
}
}
TestFramework.runOnce(new ExceptionOnPoolShutdownIsNotPropagated());
runOnce(new ExceptionOnPoolShutdownIsNotPropagated());
}

@Test
Expand All @@ -253,7 +270,38 @@ public void finish() {
dispatcher.shutdown();
}
}
TestFramework.runOnce(new ShutdownCanBeCalledMultipleTimes());
runOnce(new ShutdownCanBeCalledMultipleTimes());
}

@Test
public void dispatcherLogsWarningWhenAllThreadsArePotentiallyStuck() throws Throwable {
@SuppressWarnings("unused")
class DispatcherLogsWarning extends MultithreadedTestCase {
public void threadDispatcher() throws InterruptedException {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenAnswer(waitForTickAndAnswer(2, Collections.<Integer> emptyList(), this));
when(executorFactory.getPotentiallyStuckProcessors()).thenReturn(executor.getThreadCount());
dispatcher.run();
}

public void threadShutdown() {
waitForTick(1);
dispatcher.shutdown();
}

@Override
public void finish() {
verify(mockAppender, atLeast(1)).doAppend(loggingEventCaptor.capture());
for (ILoggingEvent event : loggingEventCaptor.getAllValues()) {
if (event.getLevel().equals(Level.WARN)
&& event.getFormattedMessage().equals("All state processor threads are potentially stuck.")) {
return;
}
}
Assert.fail("Expected warning was not logged");
}
}
runOnce(new DispatcherLogsWarning());
}

void assertPoolIsShutdown(boolean isTrue) {
Expand All @@ -278,7 +326,8 @@ public void run() {
}

WorkflowStateProcessor fakeWorkflowExecutor(int instanceId, final Runnable fakeCommand) {
return new WorkflowStateProcessor(instanceId, null, null, null, null, null, env, (WorkflowExecutorListener) null) {
return new WorkflowStateProcessor(instanceId, null, null, null, null, null, env, new HashMap<Integer, DateTime>(),
(WorkflowExecutorListener) null) {
@Override
public void run() {
fakeCommand.run();
Expand Down
Loading

0 comments on commit 0e7e9e4

Please sign in to comment.