Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix input param replacement when workflow is retried
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Jun 23, 2020
1 parent d3cd3ba commit 0e0e843
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 44 deletions.
Expand Up @@ -84,25 +84,26 @@ public class WorkflowExecutor {
private final Configuration config;
private final MetadataMapperService metadataMapperService;
private final ExecutionDAOFacade executionDAOFacade;

private WorkflowStatusListener workflowStatusListener;
private final ParametersUtils parametersUtils;
private final WorkflowStatusListener workflowStatusListener;

private int activeWorkerLastPollInSecs;
private int queueTaskMessagePostponeSeconds;
private final int queueTaskMessagePostponeSeconds;
public static final String DECIDER_QUEUE = "_deciderQueue";
private static final String className = WorkflowExecutor.class.getSimpleName();
private final ExecutionLockService executionLockService;

@Inject
public WorkflowExecutor(
DeciderService deciderService,
MetadataDAO metadataDAO,
QueueDAO queueDAO,
MetadataMapperService metadataMapperService,
WorkflowStatusListener workflowStatusListener,
ExecutionDAOFacade executionDAOFacade,
Configuration config,
ExecutionLockService executionLockService
DeciderService deciderService,
MetadataDAO metadataDAO,
QueueDAO queueDAO,
MetadataMapperService metadataMapperService,
WorkflowStatusListener workflowStatusListener,
ExecutionDAOFacade executionDAOFacade,
Configuration config,
ExecutionLockService executionLockService,
ParametersUtils parametersUtils
) {
this.deciderService = deciderService;
this.metadataDAO = metadataDAO;
Expand All @@ -114,6 +115,7 @@ public WorkflowExecutor(
this.queueTaskMessagePostponeSeconds = config.getIntProperty("task.queue.message.postponeSeconds", 60);
this.workflowStatusListener = workflowStatusListener;
this.executionLockService = executionLockService;
this.parametersUtils = parametersUtils;
}

/**
Expand Down Expand Up @@ -536,6 +538,7 @@ public void retry(String workflowId) {
switch (task.getStatus()) {
case FAILED:
case FAILED_WITH_TERMINAL_ERROR:
case TIMED_OUT:
retriableMap.put(task.getReferenceTaskName(), task);
break;
case CANCELED:
Expand Down Expand Up @@ -568,7 +571,7 @@ public void retry(String workflowId) {
// taskToBeRescheduled sets `retried` to true on the original task, so updateTasks must be called only after the task copies have been obtained from taskToBeRescheduled.
List<Task> retriableTasks = retriableMap.values().stream()
.sorted(Comparator.comparingInt(Task::getSeq))
.map(this::taskToBeRescheduled)
.map(task -> taskToBeRescheduled(workflow, task))
.collect(Collectors.toList());

dedupAndAddTasks(workflow, retriableTasks);
Expand All @@ -590,7 +593,7 @@ public void retry(String workflowId) {
* @param workflow workflow used to resolve the input parameters of the retried task
* @param task failed or cancelled task
* @return new instance of a task with "SCHEDULED" status
*/
private Task taskToBeRescheduled(Task task) {
private Task taskToBeRescheduled(Workflow workflow, Task task) {
Task taskToBeRetried = task.copy();
taskToBeRetried.setTaskId(IDGenerator.generate());
taskToBeRetried.setRetriedTaskId(task.getTaskId());
Expand All @@ -601,8 +604,15 @@ private Task taskToBeRescheduled(Task task) {
taskToBeRetried.setCallbackAfterSeconds(0);
taskToBeRetried.setSubWorkflowId(null);
taskToBeRetried.setReasonForIncompletion(null);

// perform parameter replacement for retried task
Map<String, Object> taskInput = parametersUtils.getTaskInput(taskToBeRetried.getWorkflowTask().getInputParameters(),
workflow, taskToBeRetried.getWorkflowTask().getTaskDefinition(), taskToBeRetried.getTaskId());
taskToBeRetried.getInputData().putAll(taskInput);

task.setRetried(true);
task.setExecuted(true); // since this task is being retried and a retry has been computed, task lifecycle is complete
// since this task is being retried and a retry has been computed, task lifecycle is complete
task.setExecuted(true);
return taskToBeRetried;
}

Expand Down Expand Up @@ -683,6 +693,21 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
try {
executionLockService.acquireLock(workflow.getWorkflowId(), 60000);

if (!workflow.getStatus().isTerminal()) {
workflow.setStatus(WorkflowStatus.TERMINATED);
}

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
if (workflow.getWorkflowDefinition() == null) {
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);
}
deciderService.updateWorkflowOutput(workflow, null);

String workflowId = workflow.getWorkflowId();
workflow.setReasonForIncompletion(reason);
executionDAOFacade.updateWorkflow(workflow);

List<Task> tasks = workflow.getTasks();
// Remove the tasks from their task queues, if they are present
tasks.forEach(task -> queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId()));
Expand All @@ -706,21 +731,6 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo
}
}

if (!workflow.getStatus().isTerminal()) {
workflow.setStatus(WorkflowStatus.TERMINATED);
}

// FIXME Backwards compatibility for legacy workflows already running.
// This code will be removed in a future version.
if (workflow.getWorkflowDefinition() == null) {
workflow = metadataMapperService.populateWorkflowWithDefinitions(workflow);
}
deciderService.updateWorkflowOutput(workflow, null);

String workflowId = workflow.getWorkflowId();
workflow.setReasonForIncompletion(reason);
executionDAOFacade.updateWorkflow(workflow);

// If the following lines fail for some reason, the sweep will take
// care of this again!
if (workflow.getParentWorkflowId() != null) {
Expand Down Expand Up @@ -867,7 +877,7 @@ public void updateTask(TaskResult taskResult) {
break;
default:
break;
};
}
return null;
}, null, null, 2, updateTaskQueueDesc, taskQueueOperation);

Expand Down
Expand Up @@ -22,8 +22,8 @@

class ExecutionConfig {

private ExecutorService executorService;
private SemaphoreUtil semaphoreUtil;
private final ExecutorService executorService;
private final SemaphoreUtil semaphoreUtil;

ExecutionConfig(int threadCount, String threadNameFormat) {

Expand Down
Expand Up @@ -126,7 +126,8 @@ public void init() {

DeciderService deciderService = new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, taskMappers, config);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config, executionLockService);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService,
workflowStatusListener, executionDAOFacade, config, executionLockService, parametersUtils);
}

@Test
Expand Down Expand Up @@ -440,11 +441,10 @@ public void testRestartWorkflow() {
public void testRetryNonTerminalWorkflow() {
Workflow workflow = new Workflow();
workflow.setWorkflowId("testRetryNonTerminalWorkflow");
workflow.setStatus(Workflow.WorkflowStatus.COMPLETED);
workflow.setStatus(Workflow.WorkflowStatus.RUNNING);
when(executionDAOFacade.getWorkflowById(anyString(), anyBoolean())).thenReturn(workflow);

workflowExecutor.retry(workflow.getWorkflowId());

}

@Test(expected = ApplicationException.class)
Expand Down Expand Up @@ -543,6 +543,7 @@ public void testRetryWorkflow() {
task_1_1.setStatus(Status.CANCELED);
task_1_1.setRetried(true);
task_1_1.setTaskDefName("task1");
task_1_1.setWorkflowTask(new WorkflowTask());
task_1_1.setReferenceTaskName("task1_ref1");

Task task_1_2 = new Task();
Expand All @@ -552,6 +553,7 @@ public void testRetryWorkflow() {
task_1_2.setTaskType(TaskType.SIMPLE.toString());
task_1_2.setStatus(Status.FAILED);
task_1_2.setTaskDefName("task1");
task_1_2.setWorkflowTask(new WorkflowTask());
task_1_2.setReferenceTaskName("task1_ref1");

Task task_2_1 = new Task();
Expand All @@ -561,6 +563,7 @@ public void testRetryWorkflow() {
task_2_1.setStatus(Status.FAILED);
task_2_1.setTaskType(TaskType.SIMPLE.toString());
task_2_1.setTaskDefName("task2");
task_2_1.setWorkflowTask(new WorkflowTask());
task_2_1.setReferenceTaskName("task2_ref1");


Expand All @@ -571,6 +574,7 @@ public void testRetryWorkflow() {
task_3_1.setStatus(Status.CANCELED);
task_3_1.setTaskType(TaskType.SIMPLE.toString());
task_3_1.setTaskDefName("task3");
task_3_1.setWorkflowTask(new WorkflowTask());
task_3_1.setReferenceTaskName("task3_ref1");

Task task_4_1 = new Task();
Expand All @@ -580,6 +584,7 @@ public void testRetryWorkflow() {
task_4_1.setStatus(Status.FAILED);
task_4_1.setTaskType(TaskType.SIMPLE.toString());
task_4_1.setTaskDefName("task1");
task_4_1.setWorkflowTask(new WorkflowTask());
task_4_1.setReferenceTaskName("task4_refABC");

workflow.getTasks().addAll(Arrays.asList(task_1_1, task_1_2, task_2_1, task_3_1, task_4_1));
Expand Down Expand Up @@ -617,41 +622,55 @@ public void testRetryWorkflowReturnsNoDuplicates() {

Task task_1_1 = new Task();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(20);
task_1_1.setSeq(10);
task_1_1.setRetryCount(0);
task_1_1.setTaskType(TaskType.SIMPLE.toString());
task_1_1.setStatus(Status.FAILED);
task_1_1.setTaskDefName("task1");
task_1_1.setWorkflowTask(new WorkflowTask());
task_1_1.setReferenceTaskName("task1_ref1");

Task task_1_2 = new Task();
task_1_2.setTaskId(UUID.randomUUID().toString());
task_1_2.setSeq(21);
task_1_2.setSeq(11);
task_1_2.setRetryCount(1);
task_1_2.setTaskType(TaskType.SIMPLE.toString());
task_1_2.setStatus(Status.COMPLETED);
task_1_2.setTaskDefName("task1");
task_1_2.setWorkflowTask(new WorkflowTask());
task_1_2.setReferenceTaskName("task1_ref1");

Task task_2_1 = new Task();
task_2_1.setTaskId(UUID.randomUUID().toString());
task_2_1.setSeq(22);
task_2_1.setSeq(21);
task_2_1.setRetryCount(0);
task_2_1.setStatus(Status.CANCELED);
task_2_1.setTaskType(TaskType.SIMPLE.toString());
task_2_1.setTaskDefName("task2");
task_2_1.setWorkflowTask(new WorkflowTask());
task_2_1.setReferenceTaskName("task2_ref1");

Task task_3_1 = new Task();
task_3_1.setTaskId(UUID.randomUUID().toString());
task_3_1.setSeq(51);
task_3_1.setSeq(31);
task_3_1.setRetryCount(1);
task_3_1.setStatus(Status.FAILED_WITH_TERMINAL_ERROR);
task_3_1.setTaskType(TaskType.SIMPLE.toString());
task_3_1.setTaskDefName("task1");
task_3_1.setWorkflowTask(new WorkflowTask());
task_3_1.setReferenceTaskName("task3_ref1");

workflow.getTasks().addAll(Arrays.asList(task_1_1, task_1_2, task_2_1, task_3_1));
Task task_4_1 = new Task();
task_4_1.setTaskId(UUID.randomUUID().toString());
task_4_1.setSeq(41);
task_4_1.setRetryCount(0);
task_4_1.setStatus(Status.TIMED_OUT);
task_4_1.setTaskType(TaskType.SIMPLE.toString());
task_4_1.setTaskDefName("task1");
task_4_1.setWorkflowTask(new WorkflowTask());
task_4_1.setReferenceTaskName("task4_ref1");

workflow.getTasks().addAll(Arrays.asList(task_1_1, task_1_2, task_2_1, task_3_1, task_4_1));
//end of setup

//when:
Expand All @@ -661,7 +680,7 @@ public void testRetryWorkflowReturnsNoDuplicates() {

workflowExecutor.retry(workflow.getWorkflowId());

assertEquals(6, workflow.getTasks().size());
assertEquals(8, workflow.getTasks().size());
}


Expand All @@ -681,11 +700,12 @@ public void testRetryWorkflowMultipleRetries() {

Task task_1_1 = new Task();
task_1_1.setTaskId(UUID.randomUUID().toString());
task_1_1.setSeq(20);
task_1_1.setSeq(10);
task_1_1.setRetryCount(0);
task_1_1.setTaskType(TaskType.SIMPLE.toString());
task_1_1.setStatus(Status.FAILED);
task_1_1.setTaskDefName("task1");
task_1_1.setWorkflowTask(new WorkflowTask());
task_1_1.setReferenceTaskName("task1_ref1");

Task task_2_1 = new Task();
Expand All @@ -695,6 +715,7 @@ public void testRetryWorkflowMultipleRetries() {
task_2_1.setTaskType(TaskType.SIMPLE.toString());
task_2_1.setStatus(Status.CANCELED);
task_2_1.setTaskDefName("task1");
task_2_1.setWorkflowTask(new WorkflowTask());
task_2_1.setReferenceTaskName("task2_ref1");

workflow.getTasks().addAll(Arrays.asList(task_1_1, task_2_1));
Expand Down Expand Up @@ -766,6 +787,7 @@ public void testRetryWorkflowWithJoinTask() {
task_1_1.setTaskType(TaskType.SIMPLE.toString());
task_1_1.setStatus(Status.FAILED);
task_1_1.setTaskDefName("task1");
task_1_1.setWorkflowTask(new WorkflowTask());
task_1_1.setReferenceTaskName("task1_ref1");

Task task_2_1 = new Task();
Expand All @@ -775,6 +797,7 @@ public void testRetryWorkflowWithJoinTask() {
task_2_1.setStatus(Status.CANCELED);
task_2_1.setTaskType(TaskType.SIMPLE.toString());
task_2_1.setTaskDefName("task2");
task_2_1.setWorkflowTask(new WorkflowTask());
task_2_1.setReferenceTaskName("task2_ref1");

Task joinTask = new Task();
Expand Down
Expand Up @@ -79,7 +79,7 @@ public void setup() {
executionLockService = Mockito.mock(ExecutionLockService.class);
config = Mockito.mock(Configuration.class);
provider = spy(new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService,
workflowStatusListener, executionDAOFacade, config, executionLockService));
workflowStatusListener, executionDAOFacade, config, executionLockService, parametersUtils));
loopWorkflowTask1 = new WorkflowTask();
loopWorkflowTask1.setTaskReferenceName("task1");
loopWorkflowTask1.setName("task1");
Expand Down
Expand Up @@ -647,6 +647,7 @@ class SimpleWorkflowSpec extends Specification {
status == Workflow.WorkflowStatus.RUNNING
tasks.size() == 1
tasks[0].status == Task.Status.SCHEDULED
tasks[0].getInputData().get("p3") == tasks[0].getTaskId()
}
with(metadataService.getWorkflowDef(LINEAR_WORKFLOW_T1_T2, 1)) {
failureWorkflow
Expand All @@ -663,6 +664,7 @@ class SimpleWorkflowSpec extends Specification {
tasks.size() == 2
tasks[0].status == Task.Status.FAILED
tasks[1].status == Task.Status.SCHEDULED
tasks[1].getInputData().get("p3") == tasks[1].getTaskId()
}

when:"The first task 'integration_task_1' is polled and failed for the second time"
Expand All @@ -687,6 +689,7 @@ class SimpleWorkflowSpec extends Specification {
tasks[0].status == Task.Status.FAILED
tasks[1].status == Task.Status.FAILED
tasks[2].status == Task.Status.SCHEDULED
tasks[2].getInputData().get("p3") == tasks[2].getTaskId()
}

when:"The 'integration_task_1' task is polled and is completed"
Expand Down
Expand Up @@ -8,6 +8,7 @@
"inputParameters" : {
"p1" : "${workflow.input.param1}",
"p2" : "${workflow.input.param2}",
"p3" : "${CPEWF_TASK_ID}",
"someNullKey" : null
},
"type" : "SIMPLE",
Expand All @@ -25,7 +26,8 @@
"taskReferenceName" : "t2",
"inputParameters" : {
"tp1" : "${workflow.input.param1}",
"tp2" : "${t1.output.op}"
"tp2" : "${t1.output.op}",
"tp3" : "${CPEWF_TASK_ID}"
},
"type" : "SIMPLE",
"decisionCases" : { },
Expand Down

0 comments on commit 0e0e843

Please sign in to comment.