Skip to content

Commit

Permalink
Changed the Hello World sample to return a value from the workflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
aquesnel committed Apr 27, 2015
1 parent eb574b8 commit 87a80b5
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ public interface HelloWorldActivities {

@Activity(name = "PrintHello", version = "1.0")
@ActivityRegistrationOptions(defaultTaskScheduleToStartTimeoutSeconds = 30, defaultTaskStartToCloseTimeoutSeconds = 10)
void printHello(String name);
String printHello(String name);

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
public class HelloWorldActivitiesImpl implements HelloWorldActivities {

@Override
public void printHello(String name) {
System.out.println("Hello " + name + "!");
public String printHello(String name) {
String msg = "Hello " + name + "!";
System.out.println(msg);
return msg;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.amazonaws.services.simpleworkflow.flow.examples.helloworld;

public final class HelloWorldResult
{
private String user;
private String result;

public String getUser()
{
return user;
}

public void setUser(String user)
{
this.user = user;
}

public String getResult()
{
return result;
}

public void setResult(String result)
{
this.result = result;
}

@Override
public String toString()
{
return "HelloWorldResult [user=" + user + ", result=" + result + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ private final class TestHelloWorldActivities implements HelloWorldActivities {
private String greeting;

@Override
public void printHello(String name) {
public String printHello(final String name)
{
greeting = "Hello " + name + "!";
return greeting;
}

public String getGreeting() {
Expand Down Expand Up @@ -66,25 +68,33 @@ public void setUp() throws Exception {
*/
@Test
public void testThroughClient() throws Exception {
HelloWorldWorkflowClient workflow = workflowFactory.getClient();
Promise<Void> done = workflow.helloWorld("World");
assertGreeting(done);
final HelloWorldWorkflowClient workflow = workflowFactory.getClient();
final Promise<HelloWorldResult> done = workflow.helloWorld("World");
assertGreetingWorld(done);
}

@Asynchronous
private void assertGreetingWorld(final Promise<HelloWorldResult> done)
{
assertGreeting("World", done);
}

@Asynchronous
private void assertGreeting(Promise<Void> done) {
Assert.assertEquals("Hello World!", activitiesImplementation.getGreeting());
private void assertGreeting(String expected, final Promise<HelloWorldResult> done)
{
Assert.assertEquals("Hello " + expected + "!", activitiesImplementation.getGreeting());
Assert.assertEquals("Hello " + expected + "!", done.get());
}

@Test
public void testThroughClientAssertWithTask() throws Exception {
HelloWorldWorkflowClient workflow = workflowFactory.getClient();
Promise<Void> done = workflow.helloWorld("AWS");
final HelloWorldWorkflowClient workflow = workflowFactory.getClient();
final Promise<HelloWorldResult> done = workflow.helloWorld("AWS");
new Task(done) {

@Override
protected void doExecute() throws Throwable {
Assert.assertEquals("Hello AWS!", activitiesImplementation.getGreeting());
assertGreeting("AWS", done);
}
};
}
Expand All @@ -100,7 +110,7 @@ public void directTest() {

@Override
protected void doTry() throws Throwable {
// helloWorld returns void so we use TryFinally
// helloWorld returns void so we use TryFinally
// to wait for its completion
workflow.helloWorld("SWF");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.amazonaws.services.simpleworkflow.flow.annotations.Execute;
import com.amazonaws.services.simpleworkflow.flow.annotations.Workflow;
import com.amazonaws.services.simpleworkflow.flow.annotations.WorkflowRegistrationOptions;
import com.amazonaws.services.simpleworkflow.flow.core.Promise;

/**
* Contract of the hello world workflow
Expand All @@ -26,6 +27,6 @@
public interface HelloWorldWorkflow {

@Execute(version = "1.0")
void helloWorld(String name);
Promise<HelloWorldResult> helloWorld(String name);

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
*/
package com.amazonaws.services.simpleworkflow.flow.examples.helloworld;

import com.amazonaws.services.simpleworkflow.flow.annotations.Asynchronous;
import com.amazonaws.services.simpleworkflow.flow.core.Promise;
import com.amazonaws.services.simpleworkflow.flow.core.Settable;

/**
* Implementation of the hello world workflow
Expand All @@ -23,8 +26,22 @@ public class HelloWorldWorkflowImpl implements HelloWorldWorkflow{
HelloWorldActivitiesClient client = new HelloWorldActivitiesClientImpl();

@Override
public void helloWorld(String name) {
client.printHello(name);
public Promise<HelloWorldResult> helloWorld(final String name)
{
final Promise<String> result = client.printHello(name);
return makeResult(name, result);
}


@Asynchronous
private Promise<HelloWorldResult> makeResult(String name, Promise<String> printResult)
{
HelloWorldResult helloWorldResult = new HelloWorldResult();
helloWorldResult.setResult(printResult.get());
helloWorldResult.setUser(name);

Settable<HelloWorldResult> result = new Settable<HelloWorldResult>();
result.set(helloWorldResult);
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,141 @@

import com.amazonaws.services.simpleworkflow.AmazonSimpleWorkflow;
import com.amazonaws.services.simpleworkflow.flow.examples.common.ConfigHelper;
import com.amazonaws.services.simpleworkflow.model.DescribeWorkflowExecutionRequest;
import com.amazonaws.services.simpleworkflow.model.GetWorkflowExecutionHistoryRequest;
import com.amazonaws.services.simpleworkflow.model.History;
import com.amazonaws.services.simpleworkflow.model.HistoryEvent;
import com.amazonaws.services.simpleworkflow.model.WorkflowExecution;
import com.amazonaws.services.simpleworkflow.model.WorkflowExecutionCompletedEventAttributes;
import com.amazonaws.services.simpleworkflow.model.WorkflowExecutionInfo;

public class WorkflowExecutionStarter {

private static final String COMPLETED = "COMPLETED";
private static final String CLOSED = "CLOSED";

private static final int MAX_RETRIES = 1000;
private static final int BASE_SINGLE_RETRY_WAIT_MILLISECONDS = 1000;
private static final int MAX_SINGLE_RETRY_WAIT_MILLISECONDS = 5 * 60 * 1000;

private static ConfigHelper configHelper;
private static AmazonSimpleWorkflow swfService;

public static void main(String[] args) throws Exception {
ConfigHelper configHelper = ConfigHelper.createConfig();
AmazonSimpleWorkflow swfService = configHelper.createSWFClient();
String domain = configHelper.getDomain();
configHelper = ConfigHelper.createConfig();
swfService = configHelper.createSWFClient();

HelloWorldWorkflowClientExternalFactory clientFactory = new HelloWorldWorkflowClientExternalFactoryImpl(swfService,
domain);
HelloWorldWorkflowClientExternalFactory clientFactory =
new HelloWorldWorkflowClientExternalFactoryImpl(
swfService, configHelper.getDomain());
HelloWorldWorkflowClientExternal workflow = clientFactory.getClient();

// Start Wrokflow Execution
// Start Workflow Execution
workflow.helloWorld("User");

// WorkflowExecution is available after workflow creation
WorkflowExecution workflowExecution = workflow.getWorkflowExecution();
System.out.println("Started helloWorld workflow with workflowId=\"" + workflowExecution.getWorkflowId()
System.out.println("Started helloWorld workflow with workflowId=\""
+ workflowExecution.getWorkflowId()
+ "\" and runId=\"" + workflowExecution.getRunId() + "\"");

boolean workflowComplete = pollWorkflowStatusUntilComplete(workflowExecution);

if (workflowComplete)
{
String result = getWorkflowResult(workflowExecution);
HelloWorldResult helloWorldResult =
workflow.getDataConverter().fromData(result, HelloWorldResult.class);
System.out.println("Workflow result: " + helloWorldResult);
}
else
{
System.out.println("There is no result because the workflow failed.");
}
}

private static String getWorkflowResult(WorkflowExecution workflowExecution)
{
String nextPageToken = null;

GetWorkflowExecutionHistoryRequest historyRequest =
new GetWorkflowExecutionHistoryRequest();
historyRequest.setDomain(configHelper.getDomain());
historyRequest.setExecution(workflowExecution);
historyRequest.setReverseOrder(true);

do
{
historyRequest.setNextPageToken(nextPageToken);
History workflowExecutionHistory =
swfService.getWorkflowExecutionHistory(historyRequest);
nextPageToken = workflowExecutionHistory.getNextPageToken();

for (HistoryEvent event : workflowExecutionHistory.getEvents())
{
WorkflowExecutionCompletedEventAttributes workflowCompleteAttributes =
event.getWorkflowExecutionCompletedEventAttributes();
if (null != workflowCompleteAttributes)
{
return workflowCompleteAttributes.getResult();
}
}
} while (null != nextPageToken);

throw new IllegalStateException("Workflow complete event not found.");
}

/**
* Polls for workflow until workflow is closed or until all retries are exhausted
* @param id the ID of the workflow to poll
* @return boolean status indicating whether the workflow completed successfully or not
* @throws InterruptedException if an exception occurs during thread sleep
*/
private static boolean pollWorkflowStatusUntilComplete(final WorkflowExecution execution)
throws InterruptedException
{
DescribeWorkflowExecutionRequest request = new DescribeWorkflowExecutionRequest();
request.setExecution(execution);
request.setDomain(configHelper.getDomain());

int requestCount = 0;
boolean success = false;
// Default to a non-passing status
String status = "unknown";

while (requestCount <= MAX_RETRIES)
{
requestCount++;
System.out.println("Getting workflow status attempt " + Integer.toString(requestCount)
+ " of " + Integer.toString(MAX_RETRIES));
WorkflowExecutionInfo info =
swfService.describeWorkflowExecution(request).getExecutionInfo();
System.out.println("Workflow execution status is " + info.getExecutionStatus());

if (CLOSED.equals(info.getExecutionStatus()))
{
status = info.getCloseStatus();
break;
}
else
{
// Exponential backoff algorithm
long sleepTime = Math.min(
MAX_SINGLE_RETRY_WAIT_MILLISECONDS,
BASE_SINGLE_RETRY_WAIT_MILLISECONDS
* Math.round(Math.pow(2, requestCount - 1)));

System.out.println("Waiting " + Long.toString(sleepTime)
+ " milliseconds before retrying.");
Thread.sleep(sleepTime);
}
}

System.out.println("Workflow status: " + status);
if (COMPLETED.equals(status))
{
success = true;
}
return success;
}
}

0 comments on commit 87a80b5

Please sign in to comment.