Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Mar 28, 2016
1 parent 1305bbb commit bc3980a
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import com.nitorcreations.nflow.engine.internal.dao.ExecutorDao;
import com.nitorcreations.nflow.engine.internal.dao.PollingRaceConditionException;
import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.util.PeriodicLogger;

@Component
public class WorkflowDispatcher implements Runnable {

private static final Logger logger = getLogger(WorkflowDispatcher.class);
private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60);

private volatile boolean shutdownRequested;
private final CountDownLatch shutdownDone = new CountDownLatch(1);
Expand All @@ -30,6 +32,7 @@ public class WorkflowDispatcher implements Runnable {
private final WorkflowStateProcessorFactory stateProcessorFactory;
private final ExecutorDao executorRecovery;
private final long sleepTime;
private final int stuckThreadThresholdSeconds;
private final Random rand = new Random();

@Inject
Expand All @@ -40,6 +43,7 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
this.stateProcessorFactory = stateProcessorFactory;
this.executorRecovery = executorRecovery;
this.sleepTime = env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class);
this.stuckThreadThresholdSeconds = env.getRequiredProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.class);
if (!executorRecovery.isTransactionSupportEnabled()) {
throw new BeanCreationException("Transaction support must be enabled");
}
Expand All @@ -55,8 +59,10 @@ public void run() {

if (!shutdownRequested) {
executorRecovery.tick();
if (stateProcessorFactory.getPotentiallyStuckProcessors() == executor.getThreadCount()) {
logger.warn("All state processor threads are potentially stuck.");
int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors();
if (potentiallyStuckProcessors > 0) {
periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)",
potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds);
}
dispatch(getNextInstanceIds());
}
Expand Down Expand Up @@ -89,7 +95,7 @@ public void shutdown() {
}

private void shutdownPool() {
try {
try {
executor.shutdown();
} catch (Exception e) {
logger.error("Error in shutting down thread pool.", e);
Expand Down Expand Up @@ -117,7 +123,7 @@ private List<Integer> getNextInstanceIds() {
private void sleep(boolean randomize) {
try {
if (randomize) {
Thread.sleep((long)(sleepTime * rand.nextDouble()));
Thread.sleep((long) (sleepTime * rand.nextDouble()));
} else {
Thread.sleep(sleepTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
import static com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress;
import static com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution;
import static com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace;
import static org.joda.time.DateTime.now;
import static org.joda.time.DateTimeUtils.currentTimeMillis;
import static org.joda.time.Duration.standardMinutes;
import static org.joda.time.Duration.standardSeconds;
import static org.slf4j.LoggerFactory.getLogger;
import static org.springframework.util.ReflectionUtils.invokeMethod;

Expand All @@ -20,14 +21,14 @@
import java.util.List;
import java.util.Map;

import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.core.env.Environment;
import org.springframework.util.Assert;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.util.PeriodicLogger;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
import com.nitorcreations.nflow.engine.internal.workflow.StateExecutionImpl;
import com.nitorcreations.nflow.engine.internal.workflow.WorkflowInstancePreProcessor;
Expand All @@ -50,6 +51,8 @@
class WorkflowStateProcessor implements Runnable {

static final Logger logger = getLogger(WorkflowStateProcessor.class);
private static final PeriodicLogger laggingLogger = new PeriodicLogger(logger, 30);
private static final PeriodicLogger threadStuckLogger = new PeriodicLogger(logger, 60);
private static final String MDC_KEY = "workflowInstanceId";

private final int MAX_SUBSEQUENT_STATE_EXECUTIONS = 100;
Expand All @@ -62,15 +65,16 @@ class WorkflowStateProcessor implements Runnable {
private final WorkflowInstanceDao workflowInstanceDao;
private final List<WorkflowExecutorListener> executorListeners;
final String illegalStateChangeAction;
DateTime lastLogged = now();
private final int unknownWorkflowTypeRetryDelay;
private final int unknownWorkflowStateRetryDelay;
private final Map<Integer, DateTime> processingInstances;
private final Map<Integer, WorkflowStateProcessor> processingInstances;
private long startTimeSeconds;
private Thread thread;

WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env, Map<Integer, DateTime> processingInstances,
WorkflowExecutorListener... executorListeners) {
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env,
Map<Integer, WorkflowStateProcessor> processingInstances, WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
this.objectMapper = objectMapper;
this.workflowDefinitions = workflowDefinitions;
Expand All @@ -88,7 +92,9 @@ class WorkflowStateProcessor implements Runnable {
public void run() {
try {
MDC.put(MDC_KEY, String.valueOf(instanceId));
processingInstances.put(instanceId, now());
startTimeSeconds = currentTimeMillis() / 1000;
thread = currentThread();
processingInstances.put(instanceId, this);
runImpl();
} catch (Throwable ex) {
logger.error("Unexpected failure occurred", ex);
Expand All @@ -110,7 +116,8 @@ private void runImpl() {
WorkflowSettings settings = definition.getSettings();
int subsequentStateExecutions = 0;
while (instance.status == executing) {
StateExecutionImpl execution = new StateExecutionImpl(instance, objectMapper, workflowInstanceDao, workflowInstancePreProcessor);
StateExecutionImpl execution = new StateExecutionImpl(instance, objectMapper, workflowInstanceDao,
workflowInstancePreProcessor);
ListenerContext listenerContext = new ListenerContext(definition, instance, execution);
WorkflowInstanceAction.Builder actionBuilder = new WorkflowInstanceAction.Builder(instance);
WorkflowState state;
Expand Down Expand Up @@ -145,14 +152,9 @@ private void runImpl() {
}

/**
 * Warns when execution of the given workflow instance is lagging, i.e. when more than one minute
 * separates {@code instance.nextActivation} from the current time.
 *
 * @param instance the workflow instance whose {@code nextActivation} is compared against the current time
 */
void logIfLagging(WorkflowInstance instance) {
DateTime now = now();
Duration executionLag = new Duration(instance.nextActivation, now);
Duration executionLag = new Duration(instance.nextActivation, now());
if (executionLag.isLongerThan(standardMinutes(1))) {
Duration logInterval = new Duration(lastLogged, now);
if (logInterval.isLongerThan(standardSeconds(30))) {
logger.warn("Execution lagging {} seconds.", executionLag.getStandardSeconds());
lastLogged = now;
}
laggingLogger.warn("Execution lagging {} seconds.", executionLag.getStandardSeconds());
}
}

Expand All @@ -173,8 +175,7 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) {
logger.debug("Finished.");
}

private int busyLoopPrevention(WorkflowSettings settings,
int subsequentStateExecutions, StateExecutionImpl execution) {
private int busyLoopPrevention(WorkflowSettings settings, int subsequentStateExecutions, StateExecutionImpl execution) {
if (subsequentStateExecutions++ >= MAX_SUBSEQUENT_STATE_EXECUTIONS && execution.getNextActivation() != null) {
logger.warn("Executed {} times without delay, forcing short transition delay", MAX_SUBSEQUENT_STATE_EXECUTIONS);
if (execution.getNextActivation().isBefore(settings.getShortTransitionActivation())) {
Expand All @@ -190,13 +191,12 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
logger.info("No handler method defined for {}, clearing next activation", execution.getNextState());
execution.setNextActivation(null);
}
WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance)
.setNextActivation(execution.getNextActivation())
.setStatus(getStatus(execution, definition.getState(execution.getNextState())))
.setStateText(getStateText(instance, execution))
.setState(execution.getNextState())
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);

WorkflowInstance.Builder builder = new WorkflowInstance.Builder(instance) //
.setNextActivation(execution.getNextActivation()) //
.setStatus(getStatus(execution, definition.getState(execution.getNextState()))) //
.setStateText(getStateText(instance, execution)) //
.setState(execution.getNextState()) //
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);
if (execution.isStateProcessInvoked()) {
actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)).setStateText(execution.getNextStateReason());
if (execution.isFailed()) {
Expand Down Expand Up @@ -251,7 +251,8 @@ private WorkflowActionType getActionType(StateExecutionImpl execution) {
}

/**
 * Tells whether the execution asks for activation right away.
 *
 * @param execution the state execution to inspect
 * @return {@code true} only when state processing was invoked, a next activation is set, and that
 *         activation is not after the current time
 */
private boolean isNextActivationImmediately(StateExecutionImpl execution) {
return execution.isStateProcessInvoked() && execution.getNextActivation() != null && !execution.getNextActivation().isAfterNow();
return execution.isStateProcessInvoked() && execution.getNextActivation() != null
&& !execution.getNextActivation().isAfterNow();
}

private NextAction processWithListeners(ListenerContext listenerContext, WorkflowInstance instance,
Expand All @@ -269,12 +270,15 @@ private NextAction processWithListeners(ListenerContext listenerContext, Workflo

/**
 * {@link ListenerChain} that walks the given listeners in list order, handing each listener this
 * same chain so that it can pass control on to the listener after it.
 */
static class ExecutorListenerChain implements ListenerChain {
// Iterator over the remaining listeners; advanced by one on every next() call.
private final Iterator<WorkflowExecutorListener> chain;

ExecutorListenerChain(List<WorkflowExecutorListener> chain) {
this.chain = chain.iterator();
}

/**
 * Invokes the next listener in the chain.
 *
 * @param context the listener context passed through to the listener
 * @return the action returned by the invoked listener
 */
@Override
public NextAction next(ListenerContext context) {
// Fails the assertion when no listener remains, i.e. the last listener called next().
Assert.isTrue(chain.hasNext(), "Ran out of listeners in listener chain. The last listener must not call " + this.getClass().getSimpleName() + ".next().");
Assert.isTrue(chain.hasNext(), "Ran out of listeners in listener chain. The last listener must not call "
+ this.getClass().getSimpleName() + ".next().");
return chain.next().process(context, this);
}
}
Expand All @@ -285,8 +289,8 @@ private class ProcessingExecutorListener extends AbstractWorkflowExecutorListene
private final StateExecutionImpl execution;
private final WorkflowState state;

public ProcessingExecutorListener(final WorkflowInstance instance, final AbstractWorkflowDefinition<? extends WorkflowState> definition,
final StateExecutionImpl execution, final WorkflowState state) {
public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDefinition<? extends WorkflowState> definition,
StateExecutionImpl execution, WorkflowState state) {
this.instance = instance;
this.definition = definition;
this.execution = execution;
Expand All @@ -301,7 +305,8 @@ public NextAction process(ListenerContext listenerContext, ListenerChain chain)

private class NormalStateHandler extends StateHandler {

public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution, WorkflowState currentState) {
public NormalStateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
WorkflowState currentState) {
super(instance, definition, execution, currentState);
}

Expand All @@ -314,7 +319,9 @@ protected NextAction getNextAction(WorkflowStateMethod method, Object args[]) {

private class SkippedStateHandler extends StateHandler {
private final NextAction nextAction;
public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution, WorkflowState currentState) {

public SkippedStateHandler(NextAction nextAction, WorkflowInstance instance, AbstractWorkflowDefinition<?> definition,
StateExecutionImpl execution, WorkflowState currentState) {
super(instance, definition, execution, currentState);
this.nextAction = nextAction;
}
Expand All @@ -330,13 +337,15 @@ abstract private class StateHandler {
protected final AbstractWorkflowDefinition<?> definition;
protected final StateExecutionImpl execution;
protected final WorkflowState currentState;
public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition,
StateExecutionImpl execution, WorkflowState currentState) {

public StateHandler(WorkflowInstance instance, AbstractWorkflowDefinition<?> definition, StateExecutionImpl execution,
WorkflowState currentState) {
this.instance = instance;
this.definition = definition;
this.execution = execution;
this.currentState = currentState;
}

abstract protected NextAction getNextAction(WorkflowStateMethod method, Object args[]);

public NextAction processState() {
Expand All @@ -354,29 +363,30 @@ public NextAction processState() {
try {
nextAction = getNextAction(method, args);
if (nextAction == null) {
logger.error("State '{}' handler method returned null, proceeding to error state '{}'", instance.state, definition
.getErrorState().name());
logger.error("State '{}' handler method returned null, proceeding to error state '{}'", instance.state,
definition.getErrorState().name());
nextAction = moveToState(definition.getErrorState(), "State handler method returned null");
execution.setFailed();
} else if (nextAction.getNextState() != null && !definition.getStates().contains(nextAction.getNextState())) {
logger.error("State '{}' is not a state of '{}' workflow definition, proceeding to error state '{}'",
nextAction.getNextState(), definition.getType(), definition.getErrorState().name());
nextAction = moveToState(definition.getErrorState(), "State '" + instance.state
+ "' handler method returned invalid next state '" + nextAction.getNextState() + "'");
nextAction.getNextState(), definition.getType(), definition.getErrorState().name());
nextAction = moveToState(definition.getErrorState(),
"State '" + instance.state + "' handler method returned invalid next state '" + nextAction.getNextState() + "'");
execution.setFailed();
} else if (!"ignore".equals(illegalStateChangeAction) && !definition.isAllowedNextAction(instance, nextAction)) {
logger.warn("State transition from '{}' to '{}' is not allowed by workflow definition.", instance.state,
nextAction.getNextState());
nextAction.getNextState());
if ("fail".equals(illegalStateChangeAction)) {
nextAction = moveToState(definition.getErrorState(), "Illegal state transition from " + instance.state + " to "
+ nextAction.getNextState().name() + ", proceeding to error state " + definition.getErrorState().name());
+ nextAction.getNextState().name() + ", proceeding to error state " + definition.getErrorState().name());
execution.setFailed();
}
}
} catch (InvalidNextActionException e) {
logger.error("State '" + instance.state
+ "' handler method failed to create valid next action, proceeding to error state '"
+ definition.getErrorState().name() + "'", e);
logger.error(
"State '" + instance.state + "' handler method failed to create valid next action, proceeding to error state '"
+ definition.getErrorState().name() + "'",
e);
nextAction = moveToState(definition.getErrorState(), e.getMessage());
execution.setFailed(e);
}
Expand Down Expand Up @@ -428,4 +438,21 @@ private void processAfterFailureListeners(ListenerContext listenerContext, Throw
}
}
}

/**
 * Returns the start time recorded for this processor.
 *
 * @return the value of the {@code startTimeSeconds} field, in seconds; the field is assigned when
 *         processing starts and holds the {@code long} default of 0 until then
 */
public long getStartTimeSeconds() {
  return this.startTimeSeconds;
}

/**
 * Emits a warning, through the periodic "thread stuck" logger, that processing of this workflow
 * instance may be stuck. The message carries the instance id, the processing time and the current
 * stack trace of the processing thread.
 *
 * @param processingTimeSeconds how long the instance has been processed, in seconds, as measured by the caller
 */
public void logPotentiallyStuck(long processingTimeSeconds) {
  StringBuilder stackTrace = getStackTraceAsString();
  threadStuckLogger.warn("Workflow instance {} has been processed for {} seconds, it may be stuck. Stack trace: {}", instanceId,
      processingTimeSeconds, stackTrace);
}

/**
 * Renders the current stack trace of the thread processing this instance, one frame per line.
 *
 * @return the stack frames, each followed by a newline; empty when no processing thread has been
 *         recorded yet or when the thread reports no frames
 */
private StringBuilder getStackTraceAsString() {
  StringBuilder sb = new StringBuilder();
  // The thread field is only assigned once processing has started. Read it once into a local and
  // guard against null so that an early call yields an empty trace instead of a NullPointerException.
  // NOTE(review): the field is written by the worker thread and read by the caller's thread;
  // confirm that publication through the processing map gives the needed visibility.
  Thread processingThread = thread;
  if (processingThread == null) {
    return sb;
  }
  for (StackTraceElement element : processingThread.getStackTrace()) {
    sb.append(element).append('\n');
  }
  return sb;
}
}
Loading

0 comments on commit bc3980a

Please sign in to comment.