Skip to content

Commit

Permalink
Merge c28b3e2 into ca1f80d
Browse files Browse the repository at this point in the history
  • Loading branch information
efonsell committed Feb 1, 2021
2 parents ca1f80d + c28b3e2 commit f1e4cc3
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.slf4j.event.Level;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
Expand All @@ -49,6 +51,7 @@
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.ExceptionSeverity;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
Expand Down Expand Up @@ -87,8 +90,9 @@ class WorkflowStateProcessor implements Runnable {
private Thread thread;
private ListenerContext listenerContext;

WorkflowStateProcessor(long instanceId, Supplier<Boolean> shutdownRequested, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao,
WorkflowStateProcessor(long instanceId, Supplier<Boolean> shutdownRequested, ObjectStringMapper objectMapper,
WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances,
WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env,
Map<Long, WorkflowStateProcessor> processingInstances, WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
Expand Down Expand Up @@ -125,7 +129,8 @@ public void run() {
logger.error("Failed to process workflow instance and shutdown requested", ex);
break;
}
logger.error("Failed to process workflow instance {}, retrying after {} seconds", instanceId, stateProcessingRetryDelay, ex);
logger.error("Failed to process workflow instance {}, retrying after {} seconds", instanceId, stateProcessingRetryDelay,
ex);
sleepIgnoreInterrupted(stateProcessingRetryDelay);
}
}
Expand Down Expand Up @@ -170,7 +175,7 @@ private void runImpl() {
}
execution.setFailed(t);
if (state.isRetryAllowed(t)) {
logger.error("Handler threw a retryable exception, trying again later.", t);
logRetryableException(state, t);
execution.setRetry(true);
execution.setNextState(state);
execution.setNextStateReason(getStackTrace(t));
Expand All @@ -195,6 +200,33 @@ private void runImpl() {
logger.debug("Finished.");
}

private void logRetryableException(WorkflowState state, Throwable t) {
ExceptionSeverity exceptionSeverity = state.getExceptionSeverity(t);
BiConsumer<String, Object[]> logMethod = getLogMethod(exceptionSeverity.logLevel);
if (exceptionSeverity.logStackTrace) {
logMethod.accept("Handling state '{}' threw a retryable exception, trying again later.", new Object[] { state.name(), t });
} else {
logMethod.accept("Handling state '{}' threw a retryable exception, trying again later. Message: {}",
new Object[] { state.name(), t.getMessage() });
}
}

private BiConsumer<String, Object[]> getLogMethod(Level logLevel) {
switch (logLevel) {
case TRACE:
return logger::trace;
case DEBUG:
return logger::debug;
case INFO:
return logger::info;
case WARN:
return logger::warn;
case ERROR:
default:
return logger::error;
}
}

void logIfLagging(WorkflowInstance instance) {
Duration executionLag = new Duration(instance.nextActivation, now());
if (executionLag.isLongerThan(standardMinutes(1))) {
Expand Down Expand Up @@ -275,8 +307,9 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder);
} catch (Exception ex) {
if (shutdownRequested.get()) {
logger.error("Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.",
instance.id, ex);
logger.error(
"Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.",
instance.id, ex);
// return the original instance since persisting failed
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.nflow.engine.workflow.definition;

import static org.slf4j.event.Level.ERROR;

import org.slf4j.event.Level;

public class ExceptionSeverity {
public static final ExceptionSeverity DEFAULT = new ExceptionSeverity.Builder().build();
public final Level logLevel;
public final boolean logStackTrace;

ExceptionSeverity(Level logLevel, boolean logStackTrace) {
this.logLevel = logLevel;
this.logStackTrace = logStackTrace;
}

public static class Builder {
private Level logLevel = ERROR;
private boolean logStackTrace = true;

public void setLogLevel(Level logLevel) {
this.logLevel = logLevel;
}

public void setLogStackTrace(boolean logStackTrace) {
this.logStackTrace = logStackTrace;
}

public ExceptionSeverity build() {
return new ExceptionSeverity(logLevel, logStackTrace);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,15 @@ default String getDescription() {
default boolean isRetryAllowed(Throwable thrown) {
return !thrown.getClass().isAnnotationPresent(NonRetryable.class);
}

/**
* Return the severity of the exception thrown by the state execution. Using default means ERROR level logging with stack trace.
*
* @param thrown
* The thrown exception.
* @return Exception severity.
*/
default ExceptionSeverity getExceptionSeverity(Throwable thrown) {
return ExceptionSeverity.DEFAULT;
}
}

0 comments on commit f1e4cc3

Please sign in to comment.