Skip to content

Commit

Permalink
Add and improve handling workflow uncompleted status return (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Feb 6, 2023
1 parent ae0386b commit 2ec5b62
Show file tree
Hide file tree
Showing 14 changed files with 452 additions and 20 deletions.
9 changes: 7 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ See [samples](https://github.com/indeedeng/iwf-java-samples) for how to use this
## Gradle
```gradle
// https://mvnrepository.com/artifact/io.iworkflow/iwf-java-sdk
implementation 'io.iworkflow:iwf-java-sdk:1.2.+'
implementation 'io.iworkflow:iwf-java-sdk:1.3.+'
```
## Maven
```
<!-- https://mvnrepository.com/artifact/io.iworkflow/iwf-java-sdk -->
<dependency>
<groupId>io.iworkflow</groupId>
<artifactId>iwf-java-sdk</artifactId>
<version>1.2.+</version>
<version>1.3.+</version>
<type>pom</type>
</dependency>
Expand Down Expand Up @@ -88,6 +88,11 @@ Run the command `git submodule update --remote --merge` to update IDL to the lat
- [x] Skip timer API for testing/operation
- [x] Decider trigger type: any command combination

## 1.3

- [x] Support failing workflow with results
- [x] Improve workflow uncompleted error return(canceled, failed, timeout, terminated)

## Future

- [ ] WaitForMoreResults in StateDecision
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ signing {
}

group = "io.iworkflow"
version = "1.2.7"
version = "1.3.0"

nexusPublishing {
repositories {
Expand Down
2 changes: 1 addition & 1 deletion iwf-idl
26 changes: 18 additions & 8 deletions src/main/java/io/iworkflow/core/StateDecision.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@ public abstract class StateDecision {

public static final StateDecision DEAD_END = ImmutableStateDecision.builder().build();

public static final StateDecision FORCE_FAILING_WORKFLOW = ImmutableStateDecision.builder()
.nextStates(Arrays.asList(StateMovement.FORCE_FAILING_WORKFLOW_MOVEMENT))
.build();

public static ImmutableStateDecision.Builder builder() {
return ImmutableStateDecision.builder();
}
Expand All @@ -27,15 +23,15 @@ public static StateDecision gracefulCompleteWorkflow(final Object output) {
)).build();
}

public static StateDecision forceCompleteWorkflow(final Object output) {
public static StateDecision gracefulCompleteWorkflow() {
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
StateMovement.forceCompleteWorkflow(output)
StateMovement.gracefulCompleteWorkflow()
)).build();
}

public static StateDecision gracefulCompleteWorkflow() {
public static StateDecision forceCompleteWorkflow(final Object output) {
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
StateMovement.gracefulCompleteWorkflow()
StateMovement.forceCompleteWorkflow(output)
)).build();
}

Expand All @@ -45,6 +41,20 @@ public static StateDecision forceCompleteWorkflow() {
)).build();
}

public static StateDecision forceFailWorkflow(final Object output) {
return ImmutableStateDecision.builder().nextStates(Arrays.asList(
StateMovement.forceFailWorkflow(output)
)).build();
}

public static StateDecision forceFailWorkflow() {
return FORCE_FAILING_WORKFLOW;
}

public static final StateDecision FORCE_FAILING_WORKFLOW = ImmutableStateDecision.builder()
.nextStates(Arrays.asList(StateMovement.FORCE_FAILING_WORKFLOW_MOVEMENT))
.build();

public static StateDecision singleNextState(final Class<? extends WorkflowState> stateClass) {
return singleNextState(stateClass.getSimpleName());
}
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/io/iworkflow/core/StateMovement.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,34 @@ public abstract class StateMovement {
private final static String GRACEFUL_COMPLETING_WORKFLOW_STATE_ID = "_SYS_GRACEFUL_COMPLETING_WORKFLOW";
private final static String FORCE_COMPLETING_WORKFLOW_STATE_ID = "_SYS_FORCE_COMPLETING_WORKFLOW";
private final static String FORCE_FAILING_WORKFLOW_STATE_ID = "_SYS_FORCE_FAILING_WORKFLOW";
public static final StateMovement FORCE_FAILING_WORKFLOW_MOVEMENT = ImmutableStateMovement.builder().stateId(FORCE_FAILING_WORKFLOW_STATE_ID).build();

public static StateMovement gracefulCompleteWorkflow() {
return ImmutableStateMovement.builder().stateId(GRACEFUL_COMPLETING_WORKFLOW_STATE_ID)
.build();
}

public static StateMovement gracefulCompleteWorkflow(final Object output) {
return ImmutableStateMovement.builder().stateId(GRACEFUL_COMPLETING_WORKFLOW_STATE_ID)
.stateInput(output)
.build();
}

public static StateMovement forceCompleteWorkflow(final Object output) {
public static StateMovement forceCompleteWorkflow() {
return ImmutableStateMovement.builder().stateId(FORCE_COMPLETING_WORKFLOW_STATE_ID)
.stateInput(output)
.build();
}

public static StateMovement gracefulCompleteWorkflow() {
return ImmutableStateMovement.builder().stateId(GRACEFUL_COMPLETING_WORKFLOW_STATE_ID)
public static StateMovement forceCompleteWorkflow(final Object output) {
return ImmutableStateMovement.builder().stateId(FORCE_COMPLETING_WORKFLOW_STATE_ID)
.stateInput(output)
.build();
}

public static StateMovement forceCompleteWorkflow() {
return ImmutableStateMovement.builder().stateId(FORCE_COMPLETING_WORKFLOW_STATE_ID)
public static final StateMovement FORCE_FAILING_WORKFLOW_MOVEMENT = ImmutableStateMovement.builder().stateId(FORCE_FAILING_WORKFLOW_STATE_ID).build();

public static StateMovement forceFailWorkflow(final Object output) {
return ImmutableStateMovement.builder().stateId(FORCE_FAILING_WORKFLOW_STATE_ID)
.stateInput(output)
.build();
}

Expand Down
20 changes: 19 additions & 1 deletion src/main/java/io/iworkflow/core/UnregisteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.iworkflow.gen.models.WorkflowStartOptions;
import io.iworkflow.gen.models.WorkflowStartRequest;
import io.iworkflow.gen.models.WorkflowStartResponse;
import io.iworkflow.gen.models.WorkflowStatus;
import io.iworkflow.gen.models.WorkflowStopRequest;

import java.util.List;
Expand Down Expand Up @@ -162,19 +163,32 @@ public <T> T getSimpleWorkflowResultWithWait(
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}

if (workflowGetResponse.getWorkflowStatus() != WorkflowStatus.COMPLETED) {
throwUncompletedException(workflowGetResponse);
}

if (workflowGetResponse.getResults() == null || workflowGetResponse.getResults().size() == 0) {
return null;
}

String checkErrorMessage = "this workflow should have one or zero state output for using this API";
Preconditions.checkNotNull(workflowGetResponse.getResults(), checkErrorMessage);
Preconditions.checkArgument(workflowGetResponse.getResults().size() == 1, checkErrorMessage);
//Preconditions.checkNotNull(workflowGetResponse.getResults().get(0).getCompletedStateOutput(), checkErrorMessage);

final StateCompletionOutput output = workflowGetResponse.getResults().get(0);
return clientOptions.getObjectEncoder().decode(output.getCompletedStateOutput(), valueClass);
}

private void throwUncompletedException(final WorkflowGetResponse workflowGetResponse) {
throw new WorkflowUncompletedException(
workflowGetResponse.getWorkflowRunId(),
workflowGetResponse.getWorkflowStatus(),
workflowGetResponse.getErrorType(),
workflowGetResponse.getErrorMessage(),
workflowGetResponse.getResults(),
this.clientOptions.getObjectEncoder());
}

public <T> T getSimpleWorkflowResultWithWait(
Class<T> valueClass,
final String workflowId) {
Expand All @@ -199,6 +213,10 @@ public List<StateCompletionOutput> getComplexWorkflowResultWithWait(
.workflowRunId(workflowRunId)
);

if (workflowGetResponse.getWorkflowStatus() != WorkflowStatus.COMPLETED) {
throwUncompletedException(workflowGetResponse);
}

return workflowGetResponse.getResults();
} catch (FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
Expand Down
57 changes: 57 additions & 0 deletions src/main/java/io/iworkflow/core/WorkflowUncompletedException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.iworkflow.core;

import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowErrorType;
import io.iworkflow.gen.models.WorkflowStatus;

import java.util.List;

public class WorkflowUncompletedException extends RuntimeException {
private final String runId;
private final WorkflowStatus closedStatus;
private final WorkflowErrorType errorType;
private final String errorMessage;
private final List<StateCompletionOutput> stateResults;
private final ObjectEncoder encoder;

public WorkflowUncompletedException(
final String runId, final WorkflowStatus closedStatus, final WorkflowErrorType errorType, final String errorMessage,
final List<StateCompletionOutput> stateResults, final ObjectEncoder encoder) {
this.runId = runId;
this.closedStatus = closedStatus;
this.errorType = errorType;
this.errorMessage = errorMessage;
this.stateResults = stateResults;
this.encoder = encoder;
}

public String getRunId() {
return runId;
}

public WorkflowStatus getClosedStatus() {
return closedStatus;
}

// Today, this only applies to FAILED as closedStatus to differentiate different failed types
public WorkflowErrorType getErrorSubType() {
return errorType;
}

public String getErrorMessage() {
return errorMessage;
}

public int getStateResultsSize() {
if (stateResults == null) {
return 0;
}
return stateResults.size();
}

public <T> T getStateResult(final int index, Class<T> type) {
final StateCompletionOutput output = stateResults.get(index);
return encoder.decode(output.getCompletedStateOutput(), type);
}

}
144 changes: 144 additions & 0 deletions src/test/java/io/iworkflow/integ/WorkflowUncompletedTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package io.iworkflow.integ;

import io.iworkflow.core.Client;
import io.iworkflow.core.ClientOptions;
import io.iworkflow.core.WorkflowUncompletedException;
import io.iworkflow.gen.models.WorkflowErrorType;
import io.iworkflow.gen.models.WorkflowStatus;
import io.iworkflow.integ.forcefail.ForceFailWorkflow;
import io.iworkflow.integ.signal.BasicSignalWorkflow;
import io.iworkflow.integ.stateapifail.StateApiFailWorkflow;
import io.iworkflow.integ.stateapitimeout.StateApiTimeoutFailWorkflow;
import io.iworkflow.spring.TestSingletonWorkerService;
import io.iworkflow.spring.controller.WorkflowRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;

public class WorkflowUncompletedTest {

@BeforeEach
public void setup() throws ExecutionException, InterruptedException {
TestSingletonWorkerService.startWorkerIfNotUp();
}

@Test
public void testWorkflowTimeout() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "testWorkflowTimeout" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
BasicSignalWorkflow.class, wfId, 1, input);

try {
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
} catch (WorkflowUncompletedException e) {
Assertions.assertEquals(runId, e.getRunId());
Assertions.assertEquals(WorkflowStatus.TIMEOUT, e.getClosedStatus());
Assertions.assertNull(e.getErrorSubType());
Assertions.assertNull(e.getErrorMessage());
Assertions.assertEquals(0, e.getStateResultsSize());
return;
}
Assertions.fail("no exception caught");
}

@Test
public void testWorkflowCanceled() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final String wfId = "testWorkflowTimeout" + System.currentTimeMillis() / 1000;
final Integer input = 1;
final String runId = client.startWorkflow(
BasicSignalWorkflow.class, wfId, 10, input);

client.stopWorkflow(wfId, "");

try {
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
} catch (WorkflowUncompletedException e) {
Assertions.assertEquals(runId, e.getRunId());
Assertions.assertEquals(WorkflowStatus.CANCELED, e.getClosedStatus());
Assertions.assertNull(e.getErrorSubType());
Assertions.assertNull(e.getErrorMessage());
Assertions.assertEquals(0, e.getStateResultsSize());
return;
}
Assertions.fail("no exception caught");
}

@Test
public void testForceFailWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final long startTs = System.currentTimeMillis();
final String wfId = "testForceFailWorkflow" + startTs / 1000;
final Integer input = 5;

final String runId = client.startWorkflow(
ForceFailWorkflow.class, wfId, 10, input);

try {
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
} catch (WorkflowUncompletedException e) {
Assertions.assertEquals(runId, e.getRunId());
Assertions.assertEquals(WorkflowStatus.FAILED, e.getClosedStatus());
Assertions.assertEquals(WorkflowErrorType.STATE_DECISION_FAILING_WORKFLOW_ERROR_TYPE, e.getErrorSubType());
Assertions.assertNull(e.getErrorMessage());
Assertions.assertEquals(1, e.getStateResultsSize());
String out = e.getStateResult(0, String.class);
Assertions.assertEquals("a failing message", out);
return;
}
Assertions.fail("no exception caught");
}

@Test
public void testStateApiFailWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final long startTs = System.currentTimeMillis();
final String wfId = "testStateApiFailWorkflow" + startTs / 1000;
final Integer input = 5;

final String runId = client.startWorkflow(
StateApiFailWorkflow.class, wfId, 10, input);

try {
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
} catch (WorkflowUncompletedException e) {
Assertions.assertEquals(runId, e.getRunId());
Assertions.assertEquals(WorkflowStatus.FAILED, e.getClosedStatus());
Assertions.assertEquals(WorkflowErrorType.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE, e.getErrorSubType());
Assertions.assertTrue(e.getErrorMessage().contains("/api/v1/workflowState/decide"));
Assertions.assertEquals(0, e.getStateResultsSize());
return;
}
Assertions.fail("no exception caught");
}

@Test
public void testStateApiTimeoutWorkflow() throws InterruptedException {
final Client client = new Client(WorkflowRegistry.registry, ClientOptions.localDefault);
final long startTs = System.currentTimeMillis();
final String wfId = "testStateApiTimeoutWorkflow" + startTs / 1000;
final Integer input = 5;

final String runId = client.startWorkflow(
StateApiTimeoutFailWorkflow.class, wfId, 10, input);

try {
client.getSimpleWorkflowResultWithWait(Integer.class, wfId);
} catch (WorkflowUncompletedException e) {
Assertions.assertEquals(runId, e.getRunId());
Assertions.assertEquals(WorkflowStatus.FAILED, e.getClosedStatus());
Assertions.assertEquals(WorkflowErrorType.STATE_API_FAIL_MAX_OUT_RETRY_ERROR_TYPE, e.getErrorSubType());
Assertions.assertTrue(
e.getErrorMessage().contains("activity StartToClose timeout"),
e.getErrorMessage()
);
Assertions.assertEquals(0, e.getStateResultsSize());
return;
}
Assertions.fail("no exception caught");
}
}
Loading

0 comments on commit 2ec5b62

Please sign in to comment.