Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1098 from Netflix/fix_task_client_large_payloads
Browse files Browse the repository at this point in the history
Updated WorkflowTaskCoordinator to separate retry logic for large pay…
  • Loading branch information
kishorebanala committed Apr 11, 2019
2 parents bfbca01 + fc5d312 commit 8fc13b0
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,29 @@ public Task getPendingTaskForWorkflow(String workflowId, String taskReferenceNam
return getForEntity("tasks/in_progress/{workflowId}/{taskRefName}", null, Task.class, workflowId, taskReferenceName);
}

/**
* Use updateTask(TaskResult taskResult) instead.
* @param taskResult
* @param taskType
*/
@Deprecated
public void updateTask(TaskResult taskResult, String taskType) {
updateTask(taskResult);
}

/**
* Updates the result of a task execution.
* If the size of the task output payload is bigger than {@link ConductorClientConfiguration#getTaskOutputPayloadThresholdKB()},
* it is uploaded to {@link ExternalPayloadStorage}, if enabled, else the task is marked as FAILED_WITH_TERMINAL_ERROR.
*
* @param taskResult the {@link TaskResult} of the executed task to be updated.
* @param taskType the type of the task
*/
public void updateTask(TaskResult taskResult, String taskType) {
public void updateTask(TaskResult taskResult) {
Preconditions.checkNotNull(taskResult, "Task result cannot be null");
postForEntityWithRequestOnly("tasks", taskResult);
}

public void evaluateAndUploadLargePayload(TaskResult taskResult, String taskType) {
Preconditions.checkNotNull(taskResult, "Task result cannot be null");
Preconditions.checkArgument(StringUtils.isBlank(taskResult.getExternalOutputPayloadStoragePath()), "External Storage Path must not be set");

Expand Down Expand Up @@ -249,7 +263,6 @@ public void updateTask(TaskResult taskResult, String taskType) {
logger.error(errorMsg, e);
throw new ConductorClientException(errorMsg, e);
}
postForEntityWithRequestOnly("tasks", taskResult);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,21 @@ public String getWorkerNamePrefix()

private void updateWithRetry(int count, Task task, TaskResult result, Worker worker) {
try {
String description = String.format("Retry updating task result: %s for task: %s in worker: %s", result.toString(), task.getTaskDefName(), worker.getIdentity());
String methodName = "updateWithRetry";
String updateTaskDesc = String.format("Retry updating task result: %s for task: %s in worker: %s", result.toString(), task.getTaskDefName(), worker.getIdentity());
String evaluatePayloadDesc = String.format("Evaluate Task payload for task: %s in worker: %s", task.getTaskDefName(), worker.getIdentity());
String methodName = "updateWithRetry";

new RetryUtil<>().retryOnException(() ->
{
taskClient.evaluateAndUploadLargePayload(result, task.getTaskType());
return null;
}, null, null, count, evaluatePayloadDesc, methodName);

new RetryUtil<>().retryOnException(() ->
{
taskClient.updateTask(result, task.getTaskType());
taskClient.updateTask(result);
return null;
}, null, null, count, description, methodName);
}, null, null, count, updateTaskDesc, methodName);
} catch (Exception e) {
worker.onErrorUpdate(task);
WorkflowTaskMetrics.incrementTaskUpdateErrorCount(worker.getTaskDefName(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.client.exceptions.ConductorClientException;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
Expand All @@ -33,6 +34,7 @@
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -105,10 +107,10 @@ public void testTaskException() {
latch.countDown();
return null;
}
).when(client).updateTask(any(), anyString());
).when(client).updateTask(any());
coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);
Mockito.verify(client).updateTask(any(), anyString());
Mockito.verify(client).updateTask(any());
}

@Test
Expand Down Expand Up @@ -138,7 +140,7 @@ public void testNoOpWhenAckFailed() {

// then worker.execute must not be called and task must be updated with IN_PROGRESS status
verify(worker, never()).execute(any());
verify(client, never()).updateTask(any(), any());
verify(client, never()).updateTask(any());
}

@Test
Expand Down Expand Up @@ -168,7 +170,7 @@ public void testNoOpWhenAckThrowsException() {

// then worker.execute must not be called and task must be updated with IN_PROGRESS status
verify(worker, never()).execute(any());
verify(client, never()).updateTask(any(), any());
verify(client, never()).updateTask(any());
}

@Test
Expand Down Expand Up @@ -207,7 +209,7 @@ public void testReturnTaskWhenRejectedExecutionExceptionThrown() {
latch.countDown();
return null;
}
).when(client).updateTask(any(), anyString());
).when(client).updateTask(any());
coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);

Expand All @@ -217,6 +219,53 @@ public void testReturnTaskWhenRejectedExecutionExceptionThrown() {

// task must be updated with IN_PROGRESS status three times, two from worker.execute() and
// one from returnTask caused by RejectedExecutionException.
verify(client, times(3)).updateTask(any(), anyString());
verify(client, times(3)).updateTask(any());
}

@Test
public void testLargePayloadCanFailUpdateWithRetry() {
Task testTask = new Task();
testTask.setStatus(Task.Status.COMPLETED);

Worker worker = mock(Worker.class);
when(worker.getPollingInterval()).thenReturn(3000);
when(worker.getPollCount()).thenReturn(1);
when(worker.getTaskDefName()).thenReturn("test");
when(worker.preAck(any())).thenReturn(true);
when(worker.execute(any())).thenAnswer(invocation -> {
// Sleep for 2 seconds to trigger RejectedExecutionException
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
return new TaskResult(testTask);
});

TaskClient taskClient = Mockito.mock(TaskClient.class);
WorkflowTaskCoordinator coordinator = new WorkflowTaskCoordinator.Builder()
.withWorkers(worker)
.withThreadCount(1)
.withWorkerQueueSize(1)
.withSleepWhenRetry(100000)
.withUpdateRetryCount(3)
.withTaskClient(taskClient)
.withWorkerNamePrefix("test-worker-")
.build();

when(taskClient.batchPollTasksInDomain(anyString(), anyString(), anyString(), anyInt(), anyInt())).thenReturn(ImmutableList.of(testTask));
when(taskClient.ack(anyString(), anyString())).thenReturn(true);

doThrow(ConductorClientException.class).when(taskClient).evaluateAndUploadLargePayload(any(TaskResult.class), anyString());

CountDownLatch latch = new CountDownLatch(1);

doAnswer(invocation -> {
latch.countDown();
return null;
}
).when(worker).onErrorUpdate(any());

coordinator.init();
Uninterruptibles.awaitUninterruptibly(latch);

// When evaluateAndUploadLargePayload fails indefinitely, task update shouldn't be called.
verify(taskClient, times(0)).updateTask(any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void testAll() throws Exception {

task.getOutputData().put("key1", "value1");
task.setStatus(Status.COMPLETED);
taskClient.updateTask(new TaskResult(task), task.getTaskType());
taskClient.updateTask(new TaskResult(task));

polled = taskClient.batchPollTasksByTaskType(t0.getName(), "test", 1, 100);
assertNotNull(polled);
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testStartWorkflow() {
public void testUpdateTask() {
TaskResult taskResult = new TaskResult();
try {
taskClient.updateTask(taskResult, "taskTest");
taskClient.updateTask(taskResult);
} catch (ConductorClientException e) {
assertEquals(400, e.getStatus());
assertEquals("Validation failed, check below errors for detail.", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testDecision1Default() throws Exception {
String taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task1").getTaskId();
taskOutput.put("taskReferenceName", "task1");
TaskResult taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

Workflow workflow = workflowClient.getWorkflow(wfInstanceId, true);
String taskReferenceName = workflow.getTaskByRefName("exclusiveJoin").getOutputData().get("taskReferenceName")
Expand All @@ -125,12 +125,12 @@ public void testDecision1TrueAndDecision2Default() throws Exception {
String taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task1").getTaskId();
taskOutput.put("taskReferenceName", "task1");
TaskResult taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task2").getTaskId();
taskOutput.put("taskReferenceName", "task2");
taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

Workflow workflow = workflowClient.getWorkflow(wfInstanceId, true);
String taskReferenceName = workflow.getTaskByRefName("exclusiveJoin").getOutputData().get("taskReferenceName")
Expand All @@ -152,17 +152,17 @@ public void testDecision1TrueAndDecision2True() throws Exception {
String taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task1").getTaskId();
taskOutput.put("taskReferenceName", "task1");
TaskResult taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task2").getTaskId();
taskOutput.put("taskReferenceName", "task2");
taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task3").getTaskId();
taskOutput.put("taskReferenceName", "task3");
taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

Workflow workflow = workflowClient.getWorkflow(wfInstanceId, true);
String taskReferenceName = workflow.getTaskByRefName("exclusiveJoin").getOutputData().get("taskReferenceName")
Expand All @@ -184,12 +184,12 @@ public void testDecision1FalseAndDecision3Default() throws Exception {
String taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task1").getTaskId();
taskOutput.put("taskReferenceName", "task1");
TaskResult taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task4").getTaskId();
taskOutput.put("taskReferenceName", "task4");
taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

Workflow workflow = workflowClient.getWorkflow(wfInstanceId, true);
String taskReferenceName = workflow.getTaskByRefName("exclusiveJoin").getOutputData().get("taskReferenceName")
Expand All @@ -211,17 +211,17 @@ public void testDecision1FalseAndDecision3True() throws Exception {
String taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task1").getTaskId();
taskOutput.put("taskReferenceName", "task1");
TaskResult taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task4").getTaskId();
taskOutput.put("taskReferenceName", "task4");
taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

taskId = taskClient.getPendingTaskForWorkflow(wfInstanceId, "task5").getTaskId();
taskOutput.put("taskReferenceName", "task5");
taskResult = setTaskResult(wfInstanceId, taskId, TaskResult.Status.COMPLETED, taskOutput);
taskClient.updateTask(taskResult, "");
taskClient.updateTask(taskResult);

Workflow workflow = workflowClient.getWorkflow(wfInstanceId, true);
String taskReferenceName = workflow.getTaskByRefName("exclusiveJoin").getOutputData().get("taskReferenceName")
Expand Down

0 comments on commit 8fc13b0

Please sign in to comment.