Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
MWI-1082: add async complete option to tasks EVENT and HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
apau committed Apr 28, 2019
1 parent 8fc13b0 commit c598881
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ public void setTasks(List<WorkflowTask> tasks) {
@ProtoField(id = 21)
private List<String> defaultExclusiveJoinTask = new LinkedList<>();

@ProtoField(id = 23)
private Boolean asyncComplete = false;

/**
* @return the name
*/
Expand Down Expand Up @@ -413,7 +416,19 @@ public String getSink() {
public void setSink(String sink) {
this.sink = sink;
}


/**
*
* @return whether wait for an external event to complete the task, for EVENT and HTTP tasks
*/
public Boolean isAsyncComplete() {
return asyncComplete;
}

public void setAsyncComplete(Boolean asyncComplete) {
this.asyncComplete = asyncComplete;
}

/**
*
* @return If the task is optional. When set to true, the workflow execution continues even when the task is in failed status.
Expand Down Expand Up @@ -635,6 +650,7 @@ public boolean equals(Object o) {
Objects.equals(getSubWorkflowParam(), that.getSubWorkflowParam()) &&
Objects.equals(getJoinOn(), that.getJoinOn()) &&
Objects.equals(getSink(), that.getSink()) &&
Objects.equals(isAsyncComplete(), that.isAsyncComplete()) &&
Objects.equals(getDefaultExclusiveJoinTask(), that.getDefaultExclusiveJoinTask());
}

Expand All @@ -660,6 +676,7 @@ public int hashCode() {
getSubWorkflowParam(),
getJoinOn(),
getSink(),
isAsyncComplete(),
isOptional(),
getDefaultExclusiveJoinTask()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,11 @@ public void start(Workflow workflow, Task task, WorkflowExecutor executor) {
HttpResponse response = httpCall(input);
logger.info("response {}, {}", response.statusCode, response.body);
if(response.statusCode > 199 && response.statusCode < 300) {
task.setStatus(Status.COMPLETED);
if (isAsyncComplete(task)) {
task.setStatus(Status.IN_PROGRESS);
} else {
task.setStatus(Status.COMPLETED);
}
} else {
if(response.body != null) {
task.setReasonForIncompletion(response.body.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,28 @@
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import java.time.Instant;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.junit.*;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.mockito.Mockito;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -187,6 +191,33 @@ public void testFailure() {
assertEquals(HttpTask.MISSING_REQUEST, task.getReasonForIncompletion());
}

@Test
public void testPostAsyncComplete() {

Task task = new Task();
Input input = new Input();
input.setUri("http://localhost:7009/post");
Map<String, Object> body = new HashMap<>();
body.put("input_key1", "value1");
body.put("input_key2", 45.3d);
input.setBody(body);
input.setMethod("POST");
task.getInputData().put(HttpTask.REQUEST_PARAMETER_NAME, input);
task.getInputData().put("asyncComplete", true);

httpTask.start(workflow, task, workflowExecutor);
assertEquals(task.getReasonForIncompletion(), Status.IN_PROGRESS, task.getStatus());
Map<String, Object> hr = (Map<String, Object>) task.getOutputData().get("response");
Object response = hr.get("body");
assertEquals(Status.IN_PROGRESS, task.getStatus());
assertTrue("response is: " + response, response instanceof Map);
Map<String, Object> map = (Map<String, Object>) response;
Set<String> inputKeys = body.keySet();
Set<String> responseKeys = map.keySet();
inputKeys.containsAll(responseKeys);
responseKeys.containsAll(inputKeys);
}

@Test
public void testTextGET() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -49,9 +48,11 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
String taskId = taskMapperContext.getTaskId();

taskToSchedule.getInputParameters().put("sink", taskToSchedule.getSink());
taskToSchedule.getInputParameters().put("asyncComplete", taskToSchedule.isAsyncComplete());
Map<String, Object> eventTaskInput = parametersUtils.getTaskInputV2(taskToSchedule.getInputParameters(),
workflowInstance, taskId, null);
String sink = (String) eventTaskInput.get("sink");
Boolean asynComplete = (Boolean) eventTaskInput.get("asyncComplete");

Task eventTask = new Task();
eventTask.setTaskType(Event.NAME);
Expand All @@ -63,6 +64,7 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
eventTask.setScheduledTime(System.currentTimeMillis());
eventTask.setInputData(eventTaskInput);
eventTask.getInputData().put("sink", sink);
eventTask.getInputData().put("asyncComplete", asynComplete);
eventTask.setTaskId(taskId);
eventTask.setStatus(Task.Status.SCHEDULED);
eventTask.setWorkflowTask(taskToSchedule);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws Ter
logger.debug("TaskMapperContext {} in HTTPTaskMapper", taskMapperContext);

WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
taskToSchedule.getInputParameters().put("asyncComplete", taskToSchedule.isAsyncComplete());
Workflow workflowInstance = taskMapperContext.getWorkflowInstance();
String taskId = taskMapperContext.getTaskId();
int retryCount = taskMapperContext.getRetryCount();
Expand All @@ -74,6 +75,7 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws Ter
}));

Map<String, Object> input = parametersUtils.getTaskInputV2(taskToSchedule.getInputParameters(), workflowInstance, taskId, taskDefinition);
Boolean asynComplete = (Boolean)input.get("asyncComplete");

Task httpTask = new Task();
httpTask.setTaskType(taskToSchedule.getType());
Expand All @@ -85,6 +87,7 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws Ter
httpTask.setScheduledTime(System.currentTimeMillis());
httpTask.setTaskId(taskId);
httpTask.setInputData(input);
httpTask.getInputData().put("asyncComplete", asynComplete);
httpTask.setStatus(Task.Status.SCHEDULED);
httpTask.setRetryCount(retryCount);
httpTask.setCallbackAfterSeconds(taskToSchedule.getStartDelay());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class Event extends WorkflowSystemTask {
private final ObjectMapper objectMapper = new ObjectMapper();
private final ParametersUtils parametersUtils;
private final EventQueues eventQueues;
private boolean isAsync=false;

@Inject
public Event(EventQueues eventQueues, ParametersUtils parametersUtils) {
Expand Down Expand Up @@ -81,7 +82,11 @@ public void start(Workflow workflow, Task task, WorkflowExecutor provider) {
if(queue != null) {
queue.publish(Collections.singletonList(message));
task.getOutputData().putAll(payload);
task.setStatus(Status.COMPLETED);
if (isAsyncComplete(task)) {
task.setStatus(Status.IN_PROGRESS);
} else {
task.setStatus(Status.COMPLETED);
}
} else {
task.setReasonForIncompletion("No queue found to publish.");
task.setStatus(Status.FAILED);
Expand Down Expand Up @@ -140,6 +145,6 @@ ObservableQueue getQueue(Workflow workflow, Task task) {

@Override
public boolean isAsync() {
return false;
return this.isAsync;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
*/
package com.netflix.conductor.core.execution.tasks;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.WorkflowExecutor;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* @author Viren
*
Expand Down Expand Up @@ -79,6 +79,19 @@ public void cancel(Workflow workflow, Task task, WorkflowExecutor executor) {
public boolean isAsync() {
return false;
}

/**
*
* @return True to keep task in 'IN_PROGRESS' state, and 'COMPLETE' later by an external message.
*/
public boolean isAsyncComplete(Task task) {
if (task.getInputData().containsKey("asyncComplete")) {
Object result = task.getInputData().get("asyncComplete");
return (result == null)?false:(Boolean)result;
} else {
return false;
}
}

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,4 +283,51 @@ public void testDynamicSinks() {

}

@Test
public void testAsyncComplete() throws Exception {
Workflow workflow = new Workflow();
workflow.setWorkflowDefinition(testWorkflowDefinition);

Task task = new Task();
task.getInputData().put("sink", "conductor");
task.getInputData().put("asyncComplete", true);
task.setReferenceTaskName("task0");
task.setTaskId("task_id_0");

QueueDAO dao = mock(QueueDAO.class);
String[] publishedQueue = new String[1];
List<Message> publishedMessages = new LinkedList<>();

doAnswer((Answer<Void>) invocation -> {
String queueName = invocation.getArgumentAt(0, String.class);
System.out.println(queueName);
publishedQueue[0] = queueName;
List<Message> messages = invocation.getArgumentAt(1, List.class);
publishedMessages.addAll(messages);
return null;
}).when(dao).push(any(), any());

doAnswer((Answer<List<String>>) invocation -> {
String messageId = invocation.getArgumentAt(1, String.class);
if(publishedMessages.get(0).getId().equals(messageId)) {
publishedMessages.remove(0);
return Collections.singletonList(messageId);
}
return null;
}).when(dao).remove(any(), any());

Map<String, EventQueueProvider> providers = new HashMap<>();
providers.put("conductor", new DynoEventQueueProvider(dao, new TestConfiguration()));
eventQueues = new EventQueues(providers, parametersUtils);
Event event = new Event(eventQueues, parametersUtils);
event.start(workflow, task, null);

assertEquals(Status.IN_PROGRESS, task.getStatus());
assertNotNull(task.getOutputData());
assertEquals("conductor:" + workflow.getWorkflowName() + ":" + task.getReferenceTaskName(), task.getOutputData().get("event_produced"));
assertEquals(task.getOutputData().get("event_produced"), "conductor:" + publishedQueue[0]);
assertEquals(1, publishedMessages.size());
assertEquals(task.getTaskId(), publishedMessages.get(0).getId());
assertNotNull(publishedMessages.get(0).getPayload());
}
}
3 changes: 3 additions & 0 deletions docs/docs/configuration/systask.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ Event task provides ability to publish an event (message) to either Conductor or
|name|description|
|---|---|
| sink |Qualified name of the event that is produced. e.g. conductor or sqs:sqs_queue_name|
| asyncComplete |```false``` to mark status COMPLETED upon execution ; ```true``` to keep it IN_PROGRESS, wait for an external event (via Conductor or SQS or EventHandler) to complete it.


**Example**

``` json
{
"sink": "sqs:example_sqs_queue_name"
"asyncComplete": false
}
```

Expand Down Expand Up @@ -121,6 +123,7 @@ The task expects an input parameter named ```http_request``` as part of the task
|headers|A map of additional http headers to be sent along with the request.|
|body|Request body|
|vipAddress|When using discovery based service URLs.|
| asyncComplete |```false``` to mark status COMPLETED upon execution ; ```true``` to keep it IN_PROGRESS, wait for an external event (via Conductor or SQS or EventHandler) to complete it.

**HTTP Task Output**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,9 @@ public WorkflowTaskPb.WorkflowTask toProto(WorkflowTask from) {
to.setRateLimited( from.isRateLimited() );
}
to.addAllDefaultExclusiveJoinTask( from.getDefaultExclusiveJoinTask() );
if (from.isAsyncComplete() != null) {
to.setAsyncComplete( from.isAsyncComplete() );
}
return to.build();
}

Expand Down Expand Up @@ -1211,6 +1214,7 @@ public WorkflowTask fromProto(WorkflowTaskPb.WorkflowTask from) {
}
to.setRateLimited( from.getRateLimited() );
to.setDefaultExclusiveJoinTask( from.getDefaultExclusiveJoinTaskList().stream().collect(Collectors.toCollection(ArrayList::new)) );
to.setAsyncComplete( from.getAsyncComplete() );
return to;
}

Expand Down
1 change: 1 addition & 0 deletions grpc/src/main/proto/model/workflowtask.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ message WorkflowTask {
TaskDef task_definition = 19;
bool rate_limited = 20;
repeated string default_exclusive_join_task = 21;
bool async_complete = 23;
}

0 comments on commit c598881

Please sign in to comment.