Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix sync system task execution handling; trigger failure workflow fro…
Browse files Browse the repository at this point in the history
…m terminate task; fix large payload retry in client
  • Loading branch information
apanicker-nflx committed Jul 1, 2020
1 parent 72197ae commit fddef78
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 36 deletions.
Expand Up @@ -27,6 +27,7 @@
import com.netflix.conductor.common.run.SearchResult;
import com.netflix.conductor.common.run.TaskSummary;
import com.netflix.conductor.common.utils.ExternalPayloadStorage;
import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType;
import com.sun.jersey.api.client.ClientHandler;
import com.sun.jersey.api.client.GenericType;
import com.sun.jersey.api.client.config.ClientConfig;
Expand Down Expand Up @@ -124,7 +125,7 @@ public Task pollTask(String taskType, String workerId, String domain) {
Object[] params = new Object[]{"workerid", workerId, "domain", domain};
Task task = Optional.ofNullable(getForEntity("tasks/poll/{taskType}", params, Task.class, taskType))
.orElse(new Task());
populateTaskInput(task);
populateTaskPayloads(task);
return task;
}

Expand All @@ -144,7 +145,7 @@ public List<Task> batchPollTasksByTaskType(String taskType, String workerId, int

Object[] params = new Object[]{"workerid", workerId, "count", count, "timeout", timeoutInMillisecond};
List<Task> tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
tasks.forEach(this::populateTaskInput);
tasks.forEach(this::populateTaskPayloads);
return tasks;
}

Expand All @@ -165,21 +166,26 @@ public List<Task> batchPollTasksInDomain(String taskType, String domain, String

Object[] params = new Object[]{"workerid", workerId, "count", count, "timeout", timeoutInMillisecond, "domain", domain};
List<Task> tasks = getForEntity("tasks/poll/batch/{taskType}", params, taskList, taskType);
tasks.forEach(this::populateTaskInput);
tasks.forEach(this::populateTaskPayloads);
return tasks;
}

/**
* Populates the task input from external payload storage if the external storage path is specified.
* Populates the task input/output from external payload storage if the external storage path is specified.
*
* @param task the task for which the input is to be populated.
*/
private void populateTaskInput(Task task) {
private void populateTaskPayloads(Task task) {
if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) {
MetricsContainer.incrementExternalPayloadUsedCount(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.name(), ExternalPayloadStorage.PayloadType.TASK_INPUT.name());
task.setInputData(downloadFromExternalStorage(ExternalPayloadStorage.PayloadType.TASK_INPUT, task.getExternalInputPayloadStoragePath()));
task.setExternalInputPayloadStoragePath(null);
}
if (StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) {
MetricsContainer.incrementExternalPayloadUsedCount(task.getTaskDefName(), ExternalPayloadStorage.Operation.READ.name(), PayloadType.TASK_OUTPUT.name());
task.setOutputData(downloadFromExternalStorage(ExternalPayloadStorage.PayloadType.TASK_OUTPUT, task.getExternalOutputPayloadStoragePath()));
task.setExternalOutputPayloadStoragePath(null);
}
}

/**
Expand Down
Expand Up @@ -294,7 +294,7 @@ public static TaskResult newTaskResult(Status status) {

/**
* Copy the given task result object
* @return a deep copy of the task result object
* @return a deep copy of the task result object except the externalOutputPayloadStoragePath field
*/
public TaskResult copy() {
TaskResult taskResult = new TaskResult();
Expand All @@ -307,7 +307,6 @@ public TaskResult copy() {
taskResult.setOutputData(outputData);
taskResult.setOutputMessage(outputMessage);
taskResult.setLogs(logs);
taskResult.setExternalOutputPayloadStoragePath(externalOutputPayloadStoragePath);
taskResult.setSubWorkflowId(subWorkflowId);
return taskResult;
}
Expand Down
Expand Up @@ -764,8 +764,8 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
workflow.getOutput().put("conductor.failure_workflow", "Error workflow " + failureWorkflow + " failed to start. reason: " + e.getMessage());
Monitors.recordWorkflowStartError(failureWorkflow, WorkflowContext.get().getClientApp());
}
executionDAOFacade.updateWorkflow(workflow);
}

queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue
executionDAOFacade.removeFromPendingWorkflow(workflow.getWorkflowName(), workflow.getWorkflowId());

Expand Down Expand Up @@ -985,33 +985,29 @@ public boolean decide(String workflowId) {
if (isSystemTask.and(isNonTerminalTask).test(task)) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
Workflow workflowInstance = deciderService.populateWorkflowAndTaskData(workflow);
try {
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflowInstance, task, this)) {
// FIXME: temporary hack to workaround TERMINATE task
if (TERMINATE.name().equals(task.getTaskType())) {
deciderService.externalizeTaskData(task);
executionDAOFacade.updateTask(task);
if (workflowInstance.getStatus().equals(WorkflowStatus.COMPLETED)) {
completeWorkflow(workflow);
} else {
workflow.setStatus(workflowInstance.getStatus());
terminateWorkflow(workflow, "Workflow is FAILED by TERMINATE task: " + task.getTaskId(), null);
}
return true;
}
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflowInstance, task, this)) {
// FIXME: temporary hack to workaround TERMINATE task
if (TERMINATE.name().equals(task.getTaskType())) {
deciderService.externalizeTaskData(task);
executionDAOFacade.updateTask(task);
if (workflowInstance.getStatus().equals(WorkflowStatus.COMPLETED)) {
completeWorkflow(workflow);
} else {
workflow.setStatus(workflowInstance.getStatus());
terminate(workflow, new TerminateWorkflowException("Workflow is FAILED by TERMINATE task: " + task.getTaskId()));
}
return true;
}
deciderService.externalizeTaskData(task);
tasksToBeUpdated.add(task);
stateChanged = true;
} else if (SUB_WORKFLOW.name().equals(task.getTaskType()) && task.getStatus().equals(IN_PROGRESS)) {
// Verifies and updates the task inplace, based on the Subworkflow and parent Workflow state,
// and continues with the current decide.
if (updateParentWorkflow(task, workflow)) {
tasksToBeUpdated.add(task);
stateChanged = true;
} else if (SUB_WORKFLOW.name().equals(task.getTaskType()) && task.getStatus().equals(IN_PROGRESS)) {
// Verifies and updates the task inplace, based on the Subworkflow and parent Workflow state,
// and continues with the current decide.
if (updateParentWorkflow(task, workflow)) {
tasksToBeUpdated.add(task);
stateChanged = true;
}
}
} catch (Exception e) {
throw new ApplicationException(Code.INTERNAL_ERROR, String.format("Unable to start system task: %s", workflowSystemTask.getName()), e);
}
}
}
Expand Down
Expand Up @@ -35,6 +35,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
Expand Down Expand Up @@ -5380,7 +5381,8 @@ public void testTerminateTaskWithCompletedStatus() {
assertNotNull(workflowDef);
metadataService.registerWorkflowDef(workflowDef);

Map wfInput = Collections.singletonMap("a", 1);
Map<String, Object> wfInput = new HashMap<>();
wfInput.put("a", 1);
String workflowId = startOrLoadWorkflowExecution(workflowDef.getName(), workflowDef.getVersion(), "", wfInput, null, null);
Workflow workflow = workflowExecutor.getWorkflow(workflowId, true);

Expand All @@ -5401,6 +5403,7 @@ public void testTerminateTaskWithCompletedStatus() {

@Test
public void testTerminateTaskWithFailedStatus() {
String failureWorkflowName = "failure_workflow";
WorkflowDef workflowDef = new WorkflowDef();
workflowDef.setName("test_terminate_task_wf");
workflowDef.setSchemaVersion(2);
Expand Down Expand Up @@ -5431,11 +5434,17 @@ public void testTerminateTaskWithFailedStatus() {

workflowDef.getTasks().addAll(Arrays.asList(lambdaWorkflowTask, terminateWorkflowTask, workflowTask2));

assertNotNull(workflowDef);
WorkflowDef failureWorkflowDef = new WorkflowDef();
failureWorkflowDef.setName(failureWorkflowName);
failureWorkflowDef.setTasks(Collections.singletonList(lambdaWorkflowTask));

workflowDef.setFailureWorkflow(failureWorkflowName);

metadataService.registerWorkflowDef(failureWorkflowDef);
metadataService.registerWorkflowDef(workflowDef);

Map wfInput = Collections.singletonMap("a", 1);
//noinspection unchecked
Map<String, Object> wfInput = new HashMap<>();
wfInput.put("a", 1);
String workflowId = startOrLoadWorkflowExecution(workflowDef.getName(), workflowDef.getVersion(), "", wfInput, null, null);
Workflow workflow = workflowExecutor.getWorkflow(workflowId, true);

Expand All @@ -5449,9 +5458,20 @@ public void testTerminateTaskWithFailedStatus() {
assertEquals("tasks:" + workflow.getTasks(), WorkflowStatus.FAILED, workflow.getStatus());
assertEquals(TaskType.TASK_TYPE_LAMBDA, workflow.getTasks().get(0).getTaskType());
assertEquals(TaskType.TASK_TYPE_TERMINATE, workflow.getTasks().get(1).getTaskType());
assertEquals(workflow.getTasks().get(1).getOutputData(), workflow.getOutput());
assertNotNull(workflow.getOutput());
assertNotNull(workflow.getOutput().get("conductor.failure_workflow"));

String failureWorkflowId = (String)workflow.getOutput().get("conductor.failure_workflow");
Workflow failureWorkflow = workflowExecutionService.getExecutionStatus(failureWorkflowId, true);
assertNotNull(failureWorkflow);
assertEquals(failureWorkflowName, failureWorkflow.getWorkflowName());
assertEquals(workflowId, failureWorkflow.getInput().get("workflowId"));
assertEquals(WorkflowStatus.COMPLETED, failureWorkflow.getStatus());
assertEquals(1, failureWorkflow.getTasks().size());
assertEquals(TaskType.TASK_TYPE_LAMBDA, failureWorkflow.getTasks().get(0).getTaskType());

metadataService.unregisterWorkflowDef("test_terminate_task_wf", 1);
metadataService.unregisterWorkflowDef(failureWorkflowName, 1);
}

@Test
Expand Down

0 comments on commit fddef78

Please sign in to comment.