Skip to content

Commit

Permalink
Merge a6ee8ca into 78a48a3
Browse files Browse the repository at this point in the history
  • Loading branch information
efonsell committed Feb 2, 2021
2 parents 78a48a3 + a6ee8ca commit 6bc5f0e
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 56 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
**Highlights**
- Support updating workflow instance business key.
- Support for searching workflow instances by state variable key and value.
- Control retrying and logging of an exception thrown by a state method via `WorkflowSettings` (replaces deprecated `WorkflowState.isRetryAllowed(...)`)

**Details**
- `nflow-engine`
- `WorkflowInstanceService.updateWorkflowInstance` can now be used to update business key of the workflow instance.
- Use `QueryWorkflowInstances.setStateVariable` to limit search query by state variable name and key. Only the latest value of the state variable of the workflow instance is used.
- Control retrying and logging of an exception thrown by a state method via `WorkflowSettings.Builder.setExceptionAnalyzer(...)` / `ExceptionHandling`:
- Control whether the exception is considered retryable or not (replaces deprecated `WorkflowState.isRetryAllowed(...)`).
- Control which log level is used to log the retryable exception.
- Control whether the stack trace of the retryable exception is logged or not.
- `nflow-rest-api-common`, `nflow-rest-api-jax-rs`, `nflow-rest-api-spring-web`
- `UpdateWorkflowInstanceRequest.businessKey` field was added to support updating workflow instance business key via REST API.
- Added support for new query parameters `stateVariableKey` and `stateVariableValue` to `GET /v1/workflow-instance` to limit search query by state variable name and key. Only the latest value of the state variable of the workflow instance is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.slf4j.event.Level;
import org.springframework.core.env.Environment;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.EmptyResultDataAccessException;
Expand All @@ -49,6 +51,7 @@
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.definition.AbstractWorkflowDefinition;
import io.nflow.engine.workflow.definition.ExceptionHandling;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.WorkflowSettings;
import io.nflow.engine.workflow.definition.WorkflowState;
Expand Down Expand Up @@ -87,8 +90,9 @@ class WorkflowStateProcessor implements Runnable {
private Thread thread;
private ListenerContext listenerContext;

WorkflowStateProcessor(long instanceId, Supplier<Boolean> shutdownRequested, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao,
WorkflowStateProcessor(long instanceId, Supplier<Boolean> shutdownRequested, ObjectStringMapper objectMapper,
WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances,
WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao,
WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env,
Map<Long, WorkflowStateProcessor> processingInstances, WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
Expand Down Expand Up @@ -125,7 +129,8 @@ public void run() {
logger.error("Failed to process workflow instance and shutdown requested", ex);
break;
}
logger.error("Failed to process workflow instance {}, retrying after {} seconds", instanceId, stateProcessingRetryDelay, ex);
logger.error("Failed to process workflow instance {}, retrying after {} seconds", instanceId, stateProcessingRetryDelay,
ex);
sleepIgnoreInterrupted(stateProcessingRetryDelay);
}
}
Expand Down Expand Up @@ -164,19 +169,20 @@ private void runImpl() {
} catch (StateVariableValueTooLongException e) {
instance = rescheduleStateVariableValueTooLong(e, instance);
saveInstanceState = false;
} catch (Throwable t) {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
} catch (Throwable thrown) {
if (thrown instanceof UndeclaredThrowableException) {
thrown = thrown.getCause();
}
execution.setFailed(t);
if (state.isRetryAllowed(t)) {
logger.error("Handler threw a retryable exception, trying again later.", t);
execution.setFailed(thrown);
ExceptionHandling exceptionHandling = settings.exceptionAnalyzer.apply(state, thrown);
if (exceptionHandling.isRetryable) {
logRetryableException(exceptionHandling, state.name(), thrown);
execution.setRetry(true);
execution.setNextState(state);
execution.setNextStateReason(getStackTrace(t));
execution.setNextStateReason(getStackTrace(thrown));
execution.handleRetryAfter(definition.getSettings().getErrorTransitionActivation(execution.getRetries()), definition);
} else {
logger.error("Handler threw an exception and retrying is not allowed, going to failure state.", t);
logger.error("Handler threw an exception and retrying is not allowed, going to failure state.", thrown);
execution.handleFailure(definition, "Handler threw an exception and retrying is not allowed");
}
} finally {
Expand All @@ -195,6 +201,32 @@ private void runImpl() {
logger.debug("Finished.");
}

private void logRetryableException(ExceptionHandling exceptionHandling, String state, Throwable thrown) {
BiConsumer<String, Object[]> logMethod = getLogMethod(exceptionHandling.logLevel);
if (exceptionHandling.logStackTrace) {
logMethod.accept("Handling state '{}' threw a retryable exception, trying again later.", new Object[] { state, thrown });
} else {
logMethod.accept("Handling state '{}' threw a retryable exception, trying again later. Message: {}",
new Object[] { state, thrown.getMessage() });
}
}

private BiConsumer<String, Object[]> getLogMethod(Level logLevel) {
switch (logLevel) {
case TRACE:
return logger::trace;
case DEBUG:
return logger::debug;
case INFO:
return logger::info;
case WARN:
return logger::warn;
case ERROR:
default:
return logger::error;
}
}

void logIfLagging(WorkflowInstance instance) {
Duration executionLag = new Duration(instance.nextActivation, now());
if (executionLag.isLongerThan(standardMinutes(1))) {
Expand Down Expand Up @@ -275,8 +307,9 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder);
} catch (Exception ex) {
if (shutdownRequested.get()) {
logger.error("Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.",
instance.id, ex);
logger.error(
"Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.",
instance.id, ex);
// return the original instance since persisting failed
return instance;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.nflow.engine.workflow.definition;

import static org.slf4j.event.Level.ERROR;

import org.slf4j.event.Level;

/**
* Controls how an exception thrown by a state method should be handled by the workflow state processor.
*/
public class ExceptionHandling {
/**
* True when the state method processing should be retried.
*/
public final boolean isRetryable;
/**
* The log entry level for logging the exception.
*/
public final Level logLevel;
/**
* True when the exception stack trace of the exception should be logged. False to log only exception message.
*/
public final boolean logStackTrace;

ExceptionHandling(boolean isRetryable, Level logLevel, boolean logStackTrace) {
this.isRetryable = isRetryable;
this.logLevel = logLevel;
this.logStackTrace = logStackTrace;
}

/**
* Builder for exception handling settings.
*/
public static class Builder {
private boolean isRetryable = true;
private Level logLevel = ERROR;
private boolean logStackTrace = true;

/**
* Set if state method processing is retryable or not.
*
* @param isRetryable
* True is state method processing should be retried.
* @return This.
*/
public Builder setRetryable(boolean isRetryable) {
this.isRetryable = isRetryable;
return this;
}

/**
* Set the log entry level.
*
* @param logLevel
* The log entry level.
* @return This.
*/
public Builder setLogLevel(Level logLevel) {
this.logLevel = logLevel;
return this;
}

/**
* Set if exception stack trace should be logged or not.
*
* @param logStackTrace
* True to log the exception stack trace, false to log the exception message only.
* @return This.
*/
public Builder setLogStackTrace(boolean logStackTrace) {
this.logStackTrace = logStackTrace;
return this;
}

/**
* Create the exception handling object.
*
* @return Exception handling.
*/
public ExceptionHandling build() {
return new ExceptionHandling(isRetryable, logLevel, logStackTrace);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;

import org.joda.time.DateTime;
Expand Down Expand Up @@ -68,6 +69,10 @@ public class WorkflowSettings extends ModelObject {
* Default priority for new workflow instances.
*/
public final short defaultPriority;
/**
* Exception analyzer controls how an exception thrown by a state method should be handled.
*/
public final BiFunction<WorkflowState, Throwable, ExceptionHandling> exceptionAnalyzer;

WorkflowSettings(Builder builder) {
this.minErrorTransitionDelay = builder.minErrorTransitionDelay;
Expand All @@ -80,12 +85,12 @@ public class WorkflowSettings extends ModelObject {
this.historyDeletableAfter = builder.historyDeletableAfter;
this.deleteHistoryCondition = builder.deleteHistoryCondition;
this.defaultPriority = builder.defaultPriority;
this.exceptionAnalyzer = builder.exceptionAnalyzer;
}

/**
* Builder for workflow settings.
*/
@SuppressFBWarnings(value = "MDM_RANDOM_SEED", justification = "Random does not need to be secure")
public static class Builder {

int maxErrorTransitionDelay = (int) DAYS.toMillis(1);
Expand All @@ -98,13 +103,19 @@ public static class Builder {
ReadablePeriod historyDeletableAfter;
short defaultPriority = 0;
BooleanSupplier deleteHistoryCondition = onAverageEveryNthExecution(100);
// TODO: replace state.isRetryAllowed(thrown) with !thrown.getClass().isAnnotationPresent(NonRetryable.class) in the next
// major release
@SuppressWarnings("deprecation")
BiFunction<WorkflowState, Throwable, ExceptionHandling> exceptionAnalyzer = (state, thrown) -> new ExceptionHandling.Builder()
.setRetryable(state.isRetryAllowed(thrown)).build();

/**
* Returns true randomly every n:th time.
*
* @param n The frequency of returning true.
* @return Producer of boolean values
*/
@SuppressFBWarnings(value = "MDM_RANDOM_SEED", justification = "Random does not need to be secure here")
public static BooleanSupplier onAverageEveryNthExecution(int n) {
return () -> ThreadLocalRandom.current().nextInt(n) == 0;
}
Expand Down Expand Up @@ -251,6 +262,18 @@ public Builder setDefaultPriority(short defaultPriority) {
return this;
}

/**
* Set the exception analyzer function.
*
* @param exceptionAnalyzer
* The exception analyzer function.
* @return this.
*/
public Builder setExceptionAnalyzer(BiFunction<WorkflowState, Throwable, ExceptionHandling> exceptionAnalyzer) {
this.exceptionAnalyzer = exceptionAnalyzer;
return this;
}

/**
* Create workflow settings object.
*
Expand Down Expand Up @@ -330,5 +353,4 @@ public boolean deleteWorkflowInstanceHistory() {
public Short getDefaultPriority() {
return defaultPriority;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ default String getDescription() {
* @param thrown
* The thrown exception.
* @return True if the state can be retried.
* @deprecated This will be removed in the next major release. Use new WorkflowSettings.Builder().setExceptionAnalyzer(...)
* instead.
*/
@Deprecated
default boolean isRetryAllowed(Throwable thrown) {
return !thrown.getClass().isAnnotationPresent(NonRetryable.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import io.nflow.engine.service.WorkflowInstanceInclude;
import io.nflow.engine.service.WorkflowInstanceService;
import io.nflow.engine.workflow.curated.BulkWorkflow;
import io.nflow.engine.workflow.definition.ExceptionHandling;
import io.nflow.engine.workflow.definition.Mutable;
import io.nflow.engine.workflow.definition.NextAction;
import io.nflow.engine.workflow.definition.StateExecution;
Expand Down Expand Up @@ -1290,7 +1291,8 @@ public NextAction start(StateExecution execution) {
public static class NonRetryableWorkflow extends WorkflowDefinition<NonRetryableWorkflow.State> {

protected NonRetryableWorkflow() {
super("non-retryable", State.start, State.end);
super("non-retryable", State.start, State.end, new WorkflowSettings.Builder()
.setExceptionAnalyzer((s, t) -> new ExceptionHandling.Builder().setRetryable(false).build()).build());
}

public static enum State implements WorkflowState {
Expand All @@ -1306,11 +1308,6 @@ private State(WorkflowStateType stateType) {
public WorkflowStateType getType() {
return stateType;
}

@Override
public boolean isRetryAllowed(Throwable thrown) {
return false;
}
}

public NextAction start(@SuppressWarnings("unused") StateExecution execution) {
Expand Down
Loading

0 comments on commit 6bc5f0e

Please sign in to comment.