Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #3775 from manan164/wait_tasks_fix
Browse files Browse the repository at this point in the history
Make wait task async.
  • Loading branch information
v1r3n committed Sep 16, 2023
2 parents ac50271 + 4d0060b commit 50b6fd4
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
Expand All @@ -27,6 +32,11 @@
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
import static com.netflix.conductor.core.execution.tasks.Wait.DURATION_INPUT;
import static com.netflix.conductor.core.execution.tasks.Wait.UNTIL_INPUT;
import static com.netflix.conductor.core.utils.DateTimeUtils.parseDate;
import static com.netflix.conductor.core.utils.DateTimeUtils.parseDuration;
import static com.netflix.conductor.model.TaskModel.Status.FAILED_WITH_TERMINAL_ERROR;

/**
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link
Expand Down Expand Up @@ -69,6 +79,52 @@ public List<TaskModel> getMappedTasks(TaskMapperContext taskMapperContext) {
waitTask.setInputData(waitTaskInput);
waitTask.setStartTime(System.currentTimeMillis());
waitTask.setStatus(TaskModel.Status.IN_PROGRESS);
setCallbackAfter(waitTask);
return List.of(waitTask);
}

void setCallbackAfter(TaskModel task) {
String duration =
Optional.ofNullable(task.getInputData().get(DURATION_INPUT)).orElse("").toString();
String until =
Optional.ofNullable(task.getInputData().get(UNTIL_INPUT)).orElse("").toString();

if (StringUtils.isNotBlank(duration) && StringUtils.isNotBlank(until)) {
task.setReasonForIncompletion(
"Both 'duration' and 'until' specified. Please provide only one input");
task.setStatus(FAILED_WITH_TERMINAL_ERROR);
return;
}

if (StringUtils.isNotBlank(duration)) {

Duration timeDuration = parseDuration(duration);
long waitTimeout = System.currentTimeMillis() + (timeDuration.getSeconds() * 1000);
task.setWaitTimeout(waitTimeout);
long seconds = timeDuration.getSeconds();
task.setCallbackAfterSeconds(seconds);

} else if (StringUtils.isNotBlank(until)) {
try {

Date expiryDate = parseDate(until);
long timeInMS = expiryDate.getTime();
long now = System.currentTimeMillis();
long seconds = ((timeInMS - now) / 1000) + 1;
if (seconds < 0) {
seconds = 0;
}
task.setCallbackAfterSeconds(seconds);
task.setWaitTimeout(timeInMS);

} catch (ParseException parseException) {
task.setReasonForIncompletion(
"Invalid/Unsupported Wait Until format. Provided: " + until);
task.setStatus(FAILED_WITH_TERMINAL_ERROR);
}
} else {
// If there is no time duration specified then the WAIT task should wait forever
task.setCallbackAfterSeconds(Integer.MAX_VALUE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,13 @@
*/
package com.netflix.conductor.core.execution.tasks;

import java.text.ParseException;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
import static com.netflix.conductor.core.utils.DateTimeUtils.parseDate;
import static com.netflix.conductor.core.utils.DateTimeUtils.parseDuration;
import static com.netflix.conductor.model.TaskModel.Status.*;

@Component(TASK_TYPE_WAIT)
Expand All @@ -39,46 +31,6 @@ public Wait() {
super(TASK_TYPE_WAIT);
}

@Override
public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {

String duration =
Optional.ofNullable(task.getInputData().get(DURATION_INPUT)).orElse("").toString();
String until =
Optional.ofNullable(task.getInputData().get(UNTIL_INPUT)).orElse("").toString();

if (StringUtils.isNotBlank(duration) && StringUtils.isNotBlank(until)) {
task.setReasonForIncompletion(
"Both 'duration' and 'until' specified. Please provide only one input");
task.setStatus(FAILED_WITH_TERMINAL_ERROR);
return;
}

if (StringUtils.isNotBlank(duration)) {

Duration timeDuration = parseDuration(duration);
long waitTimeout = System.currentTimeMillis() + (timeDuration.getSeconds() * 1000);
task.setWaitTimeout(waitTimeout);

long seconds = timeDuration.getSeconds();
task.setCallbackAfterSeconds(seconds);
} else if (StringUtils.isNotBlank(until)) {
try {
Date expiryDate = parseDate(until);
long timeInMS = expiryDate.getTime();
long now = System.currentTimeMillis();
long seconds = (timeInMS - now) / 1000;
task.setWaitTimeout(timeInMS);

} catch (ParseException parseException) {
task.setReasonForIncompletion(
"Invalid/Unsupported Wait Until format. Provided: " + until);
task.setStatus(FAILED_WITH_TERMINAL_ERROR);
}
}
task.setStatus(IN_PROGRESS);
}

@Override
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
task.setStatus(TaskModel.Status.CANCELED);
Expand All @@ -98,4 +50,8 @@ public boolean execute(

return false;
}

public boolean isAsync() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,9 @@ public void testScheduleTask() {
doAnswer(answer).when(queueDAO).push(any(), any(), anyInt(), anyLong());

boolean stateChanged = workflowExecutor.scheduleTask(workflow, tasks);
assertEquals(2, startedTaskCount.get());
assertEquals(1, queuedTaskCount.get());
// Wait task is no async to it will be queued.
assertEquals(1, startedTaskCount.get());
assertEquals(2, queuedTaskCount.get());
assertTrue(stateChanged);
assertFalse(httpTask.isStarted());
assertTrue(http2Task.isStarted());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,19 @@
*/
package com.netflix.conductor.core.execution.mapper;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Test;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.model.TaskModel;
Expand All @@ -29,6 +33,9 @@
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public class WaitTaskMapperTest {
Expand Down Expand Up @@ -65,4 +72,145 @@ public void getMappedTasks() {
assertEquals(1, mappedTasks.size());
assertEquals(TASK_TYPE_WAIT, mappedTasks.get(0).getTaskType());
}

@Test
public void testWaitForever() {

WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setName("Wait_task");
workflowTask.setType(TaskType.WAIT.name());
String taskId = new IDGenerator().generate();

ParametersUtils parametersUtils = mock(ParametersUtils.class);
WorkflowModel workflow = new WorkflowModel();
WorkflowDef workflowDef = new WorkflowDef();
workflow.setWorkflowDefinition(workflowDef);

TaskMapperContext taskMapperContext =
TaskMapperContext.newBuilder()
.withWorkflowModel(workflow)
.withTaskDefinition(new TaskDef())
.withWorkflowTask(workflowTask)
.withTaskInput(new HashMap<>())
.withRetryCount(0)
.withTaskId(taskId)
.build();

WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils);
// When
List<TaskModel> mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext);
assertEquals(1, mappedTasks.size());
assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.IN_PROGRESS);
assertTrue(mappedTasks.get(0).getOutputData().isEmpty());
}

@Test
public void testWaitUntil() {

String dateFormat = "yyyy-MM-dd HH:mm";
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
LocalDateTime now = LocalDateTime.now();
String formatted = formatter.format(now);
System.out.println(formatted);

WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setName("Wait_task");
workflowTask.setType(TaskType.WAIT.name());
String taskId = new IDGenerator().generate();
Map<String, Object> input = Map.of(Wait.UNTIL_INPUT, formatted);
workflowTask.setInputParameters(input);

ParametersUtils parametersUtils = mock(ParametersUtils.class);
doReturn(input).when(parametersUtils).getTaskInputV2(any(), any(), any(), any());

WorkflowModel workflow = new WorkflowModel();
WorkflowDef workflowDef = new WorkflowDef();
workflow.setWorkflowDefinition(workflowDef);

TaskMapperContext taskMapperContext =
TaskMapperContext.newBuilder()
.withWorkflowModel(workflow)
.withTaskDefinition(new TaskDef())
.withWorkflowTask(workflowTask)
.withTaskInput(Map.of(Wait.UNTIL_INPUT, formatted))
.withRetryCount(0)
.withTaskId(taskId)
.build();

WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils);
// When
List<TaskModel> mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext);
assertEquals(1, mappedTasks.size());
assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.IN_PROGRESS);
assertEquals(mappedTasks.get(0).getCallbackAfterSeconds(), 0L);
}

@Test
public void testWaitDuration() {

WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setName("Wait_task");
workflowTask.setType(TaskType.WAIT.name());
String taskId = new IDGenerator().generate();
Map<String, Object> input = Map.of(Wait.DURATION_INPUT, "1s");
workflowTask.setInputParameters(input);

ParametersUtils parametersUtils = mock(ParametersUtils.class);
doReturn(input).when(parametersUtils).getTaskInputV2(any(), any(), any(), any());
WorkflowModel workflow = new WorkflowModel();
WorkflowDef workflowDef = new WorkflowDef();
workflow.setWorkflowDefinition(workflowDef);

TaskMapperContext taskMapperContext =
TaskMapperContext.newBuilder()
.withWorkflowModel(workflow)
.withTaskDefinition(new TaskDef())
.withWorkflowTask(workflowTask)
.withTaskInput(Map.of(Wait.DURATION_INPUT, "1s"))
.withRetryCount(0)
.withTaskId(taskId)
.build();

WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils);
// When
List<TaskModel> mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext);
assertEquals(1, mappedTasks.size());
assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.IN_PROGRESS);
assertTrue(mappedTasks.get(0).getCallbackAfterSeconds() <= 1L);
}

@Test
public void testInvalidWaitConfig() {

WorkflowTask workflowTask = new WorkflowTask();
workflowTask.setName("Wait_task");
workflowTask.setType(TaskType.WAIT.name());
String taskId = new IDGenerator().generate();
Map<String, Object> input =
Map.of(Wait.DURATION_INPUT, "1s", Wait.UNTIL_INPUT, "2022-12-12");
workflowTask.setInputParameters(input);

ParametersUtils parametersUtils = mock(ParametersUtils.class);
doReturn(input).when(parametersUtils).getTaskInputV2(any(), any(), any(), any());
WorkflowModel workflow = new WorkflowModel();
WorkflowDef workflowDef = new WorkflowDef();
workflow.setWorkflowDefinition(workflowDef);

TaskMapperContext taskMapperContext =
TaskMapperContext.newBuilder()
.withWorkflowModel(workflow)
.withTaskDefinition(new TaskDef())
.withWorkflowTask(workflowTask)
.withTaskInput(
Map.of(Wait.DURATION_INPUT, "1s", Wait.UNTIL_INPUT, "2022-12-12"))
.withRetryCount(0)
.withTaskId(taskId)
.build();

WaitTaskMapper waitTaskMapper = new WaitTaskMapper(parametersUtils);
// When
List<TaskModel> mappedTasks = waitTaskMapper.getMappedTasks(taskMapperContext);
assertEquals(1, mappedTasks.size());
assertEquals(mappedTasks.get(0).getStatus(), TaskModel.Status.FAILED_WITH_TERMINAL_ERROR);
}
}

0 comments on commit 50b6fd4

Please sign in to comment.