Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for moving to a failure state without retries based on thrown exception and state #386

Merged
merged 17 commits into from
Mar 18, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion nflow-engine/src/main/java/io/nflow/engine/NflowEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import io.nflow.engine.config.NFlow;
import io.nflow.engine.internal.executor.WorkflowLifecycle;
import io.nflow.engine.internal.storage.db.SQLVariants;
import io.nflow.engine.service.MaintenanceService;
import io.nflow.engine.service.HealthCheckService;
import io.nflow.engine.service.MaintenanceService;
import io.nflow.engine.service.StatisticsService;
import io.nflow.engine.service.WorkflowDefinitionService;
import io.nflow.engine.service.WorkflowExecutorService;
Expand Down Expand Up @@ -47,8 +47,11 @@ public class NflowEngine implements AutoCloseable {
* started automatically. If nflow.autostart=false, then the thread can be started with start() method.
*
* @param dataSource
* nFlow database data source.
* @param sqlVariants
* SQL variants for the configured database type.
* @param workflowDefinitions
* The registered workflow definitions.
*/
public NflowEngine(DataSource dataSource, SQLVariants sqlVariants,
Collection<AbstractWorkflowDefinition<? extends WorkflowState>> workflowDefinitions) {
Expand Down Expand Up @@ -94,13 +97,17 @@ public void resume() {

/**
* Returns true if the nFlow engine is currently paused.
*
* @return True if engine is currently paused.
*/
public boolean isPaused() {
return workflowLifecycle.isPaused();
}

/**
* Returns true if the nFlow engine is currently running.
*
* @return True if engine is currently running.
*/
public boolean isRunning() {
return workflowLifecycle.isRunning();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,16 @@ private void runImpl() {
saveInstanceState = false;
} catch (Throwable t) {
execution.setFailed(t);
logger.error("Handler threw exception, trying again later.", t);
execution.setRetry(true);
execution.setNextState(state);
execution.setNextStateReason(getStackTrace(t));
execution.handleRetryAfter(definition.getSettings().getErrorTransitionActivation(execution.getRetries()), definition);
if (state.isRetryAllowed(t)) {
logger.error("Handler threw a retryable exception, trying again later.", t);
execution.setRetry(true);
execution.setNextState(state);
execution.setNextStateReason(getStackTrace(t));
execution.handleRetryAfter(definition.getSettings().getErrorTransitionActivation(execution.getRetries()), definition);
} else {
logger.error("Handler threw an exception and retrying is not allowed, going to failure state.", t);
execution.handleFailure(definition, "Handler threw an exception and retrying is not allowed");
}
} finally {
if (saveInstanceState) {
if (execution.isFailed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,42 +268,35 @@ public boolean isHistoryCleaningForced() {
return historyCleaningForced;
}

/**
* Handle retries for the state execution. Moves the workflow to a failure state after the maximum retry attempts is exceeded.
* If there is no failure state defined for the retried state, moves the workflow to the generic error state and stops
* processing. Error state handler method, if it exists, is not executed. If the maximum retry attempts is not exceeded,
* schedules the next attempt to the given activation time.
*
* @param activation
* Time for next retry attempt.
* @param definition
* Workflow definition
*/
public void handleRetryAfter(DateTime activation, AbstractWorkflowDefinition<?> definition) {
if (getRetries() >= definition.getSettings().maxRetries) {
setRetry(false);
isRetryCountExceeded = true;
String currentStateName = getCurrentStateName();
WorkflowState failureState = definition.getFailureTransitions().get(currentStateName);
WorkflowState currentState = definition.getState(currentStateName);
if (failureState != null) {
setNextState(failureState);
setNextStateReason("Max retry count exceeded, going to failure state");
setNextActivation(now());
} else {
WorkflowState errorState = definition.getErrorState();
setNextState(errorState);
if (errorState.equals(currentState)) {
setNextStateReason("Max retry count exceeded when handling error state, processing stopped");
setNextActivation(null);
} else {
setNextStateReason("Max retry count exceeded, no failure state defined, going to error state");
setNextActivation(now());
}
}
handleFailure(definition, "Max retry count exceeded");
} else {
setNextActivation(activation);
}
}

public void handleFailure(AbstractWorkflowDefinition<?> definition, String failureReason) {
setRetry(false);
String currentStateName = getCurrentStateName();
WorkflowState failureState = definition.getFailureTransitions().get(currentStateName);
WorkflowState currentState = definition.getState(currentStateName);
if (failureState != null) {
setNextState(failureState);
setNextStateReason(failureReason + ", going to failure state");
setNextActivation(now());
} else {
WorkflowState errorState = definition.getErrorState();
setNextState(errorState);
if (errorState.equals(currentState)) {
setNextStateReason(failureReason + " when handling error state, processing stopped");
setNextActivation(null);
} else {
setNextStateReason(failureReason + ", no failure state defined, going to error state");
setNextActivation(now());
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
public WorkflowStateType getType() {
return type;
}

@Override
public String getDescription() {
return name();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ public enum State implements io.nflow.engine.workflow.definition.WorkflowState {
public WorkflowStateType getType() {
return type;
}

@Override
public String getDescription() {
return name();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.nflow.engine.workflow.definition;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* When a state method throws an exception annotated with this annotation, it is not retried by nFlow engine. Instead, the
* workflow instance is moved to failure state.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface NonRetryable {
// marker interface
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ private void requireEnumValuesHaveMatchingMethod() {
}

/**
* Return all states of the workflow.
* @return Set of workflow states.
* {@inheritDoc}
*/
@Override
public Set<S> getStates() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,24 @@ public interface WorkflowState {
WorkflowStateType getType();

/**
* Return the description of the workflow state.
* Return the description of the workflow state. Default implementation returns {@link #name()}.
*
* @return The description.
*/
String getDescription();
default String getDescription() {
return name();
}

/**
* Return true if this state can be automatically retried after throwing an exception, or false if the workflow instance should
* move directly to failure state. Default implementation returns true if the throwable class is not annotated with
* {@code @NonRetryable}.
*
* @param thrown
* The thrown exception.
* @return True if the state can be retried.
*/
default boolean isRetryAllowed(Throwable thrown) {
return !thrown.getClass().isAnnotationPresent(NonRetryable.class);
}
}
Loading