Skip to content

Commit

Permalink
TemporalFailure thrown from Signal method now fails Workflow Execution (
Browse files Browse the repository at this point in the history
  • Loading branch information
Spikhalskiy committed Jan 25, 2023
1 parent c2afb32 commit 301faf7
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 55 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ ext {
protoVersion = '3.21.12' // [3.10.0,)
annotationApiVersion = '1.3.2'
guavaVersion = '31.1-jre' // [10.0,)
tallyVersion = '0.11.1' // [0.4.0,)
tallyVersion = '0.12.0' // [0.4.0,)

gsonVersion = '2.10.1' // [2.0,)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class SyncWorkflow implements ReplayWorkflow {
private final WorkflowMethodThreadNameStrategy workflowMethodThreadNameStrategy =
ExecutionInfoStrategy.INSTANCE;
private final SyncWorkflowContext workflowContext;
private WorkflowExecuteRunnable workflowProc;
private WorkflowExecutionHandler workflowProc;
private DeterministicRunner runner;

public SyncWorkflow(
Expand Down Expand Up @@ -100,7 +100,7 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
this.workflowContext.setReplayContext(context);

workflowProc =
new WorkflowExecuteRunnable(
new WorkflowExecutionHandler(
workflowContext, workflow, startEvent, workflowImplementationOptions);
// The following order is ensured by this code and DeterministicRunner implementation:
// 1. workflow.initialize
Expand All @@ -113,7 +113,7 @@ public void start(HistoryEvent event, ReplayWorkflowContext context) {
() -> {
workflow.initialize();
WorkflowInternal.newWorkflowMethodThread(
() -> workflowProc.run(),
() -> workflowProc.runWorkflowMethod(),
workflowMethodThreadNameStrategy.createThreadName(
context.getWorkflowExecution()))
.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class WorkflowExecuteRunnable implements Runnable {
class WorkflowExecutionHandler {

private static final Logger log = LoggerFactory.getLogger(WorkflowExecuteRunnable.class);
private static final Logger log = LoggerFactory.getLogger(WorkflowExecutionHandler.class);

private final SyncWorkflowContext context;
private final SyncWorkflowDefinition workflow;
Expand All @@ -52,7 +52,7 @@ class WorkflowExecuteRunnable implements Runnable {
private Optional<Payloads> output = Optional.empty();
private boolean done;

public WorkflowExecuteRunnable(
public WorkflowExecutionHandler(
SyncWorkflowContext context,
SyncWorkflowDefinition workflow,
WorkflowExecutionStartedEventAttributes attributes,
Expand All @@ -63,29 +63,13 @@ public WorkflowExecuteRunnable(
this.attributes = Objects.requireNonNull(attributes);
}

@Override
public void run() {
public void runWorkflowMethod() {
try {
Optional<Payloads> input =
attributes.hasInput() ? Optional.of(attributes.getInput()) : Optional.empty();
output = workflow.execute(new Header(attributes.getHeader()), input);
} catch (Throwable e) {
if (e instanceof DestroyWorkflowThreadError) {
throw (DestroyWorkflowThreadError) e;
}
Throwable exception = unwrap(e);

Class<? extends Throwable>[] failTypes =
implementationOptions.getFailWorkflowExceptionTypes();
if (exception instanceof TemporalFailure) {
throwAndFailWorkflowExecution(exception);
}
for (Class<? extends Throwable> failType : failTypes) {
if (failType.isAssignableFrom(exception.getClass())) {
throwAndFailWorkflowExecution(exception);
}
}
throw wrap(exception);
applyWorkflowFailurePolicyAndRethrow(e);
} finally {
done = true;
}
Expand All @@ -104,13 +88,36 @@ public Optional<Payloads> getOutput() {
public void close() {}

public void handleSignal(String signalName, Optional<Payloads> input, long eventId) {
context.handleSignal(signalName, input, eventId);
try {
context.handleSignal(signalName, input, eventId);
} catch (Throwable e) {
applyWorkflowFailurePolicyAndRethrow(e);
}
}

public Optional<Payloads> handleQuery(String type, Optional<Payloads> args) {
return context.handleQuery(type, args);
}

private void applyWorkflowFailurePolicyAndRethrow(Throwable e) {
if (e instanceof DestroyWorkflowThreadError) {
throw (DestroyWorkflowThreadError) e;
}
Throwable exception = unwrap(e);

Class<? extends Throwable>[] failTypes = implementationOptions.getFailWorkflowExceptionTypes();
if (exception instanceof TemporalFailure) {
throwAndFailWorkflowExecution(exception);
}
for (Class<? extends Throwable> failType : failTypes) {
if (failType.isAssignableFrom(exception.getClass())) {
throwAndFailWorkflowExecution(exception);
}
}

throw wrap(exception);
}

private void throwAndFailWorkflowExecution(Throwable exception) {
ReplayWorkflowContext replayWorkflowContext = context.getReplayContext();
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@

package io.temporal.workflow.activityTests;

import static org.junit.Assert.assertThrows;

import io.temporal.activity.ActivityOptions;
import io.temporal.api.enums.v1.RetryState;
import io.temporal.client.WorkflowException;
import io.temporal.client.WorkflowStub;
import io.temporal.common.RetryOptions;
import io.temporal.failure.ActivityFailure;
import io.temporal.failure.ApplicationFailure;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
import io.temporal.workflow.shared.TestWorkflows.TestSignaledWorkflow;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
import java.io.IOException;
import java.time.Duration;
Expand All @@ -44,49 +48,95 @@ public class ActivityApplicationFailureNonRetryableTest {
@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestActivityApplicationFailureNonRetryable.class)
.setWorkflowTypes(
ActivityCalledFromWorkflowMethodWorkflowImpl.class,
ActivityCalledFromSignalMethodWorkflowImpl.class)
.setActivityImplementations(activitiesImpl)
.build();

@Test
public void testActivityApplicationFailureNonRetryable() {
public void testActivityApplicationFailureNonRetryable_workflowMethod() {
TestWorkflow1 workflowStub =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
try {
workflowStub.execute(testWorkflowRule.getTaskQueue());
Assert.fail("unreachable");
} catch (WorkflowException e) {
Assert.assertTrue(e.getCause() instanceof ActivityFailure);
Assert.assertTrue(e.getCause().getCause() instanceof ApplicationFailure);
Assert.assertEquals(
"java.io.IOException", ((ApplicationFailure) e.getCause().getCause()).getType());
Assert.assertEquals(
RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
((ActivityFailure) e.getCause()).getRetryState());
}

verifyWorkflowFailure(
assertThrows(
WorkflowException.class, () -> workflowStub.execute(testWorkflowRule.getTaskQueue())));
}

@Test
public void testActivityApplicationFailureNonRetryable_signal() {
TestSignaledWorkflow workflow =
testWorkflowRule.newWorkflowStubTimeoutOptions(TestSignaledWorkflow.class);
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
workflowStub.start();
workflow.signal("");
verifyWorkflowFailure(
assertThrows(WorkflowException.class, () -> workflowStub.getResult(String.class)));
}

/**
* Failure of the workflow should be exactly the same, doesn't matter of the activity failure
* happened in the workflow method or in the signal handler.
*/
private void verifyWorkflowFailure(WorkflowException e) {
Assert.assertTrue(e.getCause() instanceof ActivityFailure);
Assert.assertTrue(e.getCause().getCause() instanceof ApplicationFailure);
Assert.assertEquals(
"java.io.IOException", ((ApplicationFailure) e.getCause().getCause()).getType());
Assert.assertEquals(
RetryState.RETRY_STATE_NON_RETRYABLE_FAILURE,
((ActivityFailure) e.getCause()).getRetryState());
Assert.assertEquals(activitiesImpl.toString(), 1, activitiesImpl.invocations.size());
}

public static class TestActivityApplicationFailureNonRetryable implements TestWorkflow1 {
public static class ActivityCalledFromWorkflowMethodWorkflowImpl implements TestWorkflow1 {

private VariousTestActivities activities;
private final VariousTestActivities activities =
Workflow.newActivityStub(
VariousTestActivities.class,
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
.setStartToCloseTimeout(Duration.ofSeconds(1))
.setRetryOptions(
RetryOptions.newBuilder()
.setMaximumInterval(Duration.ofSeconds(1))
.setDoNotRetry(IOException.class.getName())
.build())
.build());

@Override
public String execute(String taskQueue) {
ActivityOptions options =
ActivityOptions.newBuilder()
.setTaskQueue(taskQueue)
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
.setStartToCloseTimeout(Duration.ofSeconds(1))
.setRetryOptions(
RetryOptions.newBuilder()
.setMaximumInterval(Duration.ofSeconds(1))
.setDoNotRetry(IOException.class.getName())
.build())
.build();
activities = Workflow.newActivityStub(VariousTestActivities.class, options);
activities.throwIO();
return "ignored";
}
}

public static class ActivityCalledFromSignalMethodWorkflowImpl implements TestSignaledWorkflow {

private final VariousTestActivities activities =
Workflow.newActivityStub(
VariousTestActivities.class,
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
.setStartToCloseTimeout(Duration.ofSeconds(1))
.setRetryOptions(
RetryOptions.newBuilder()
.setMaximumInterval(Duration.ofSeconds(1))
.setDoNotRetry(IOException.class.getName())
.build())
.build());

@Override
public String execute() {
Workflow.await(() -> false);
activities.throwIO();
return "ignored";
}

@Override
public void signal(String arg) {
activities.throwIO();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void setUp() throws Exception {
workflowCodeExecutionCount.set(0);

Logger workflowExecuteRunnableLogger =
(Logger) LoggerFactory.getLogger("io.temporal.internal.sync.WorkflowExecuteRunnable");
(Logger) LoggerFactory.getLogger("io.temporal.internal.sync.WorkflowExecutionHandler");
workflowExecuteRunnableLoggerAppender.start();
workflowExecuteRunnableLogger.addAppender(workflowExecuteRunnableLoggerAppender);
}
Expand Down

0 comments on commit 301faf7

Please sign in to comment.