Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #942 from Netflix/feature/fix_http_task_retries
Browse files Browse the repository at this point in the history
fix retries logic for failed http tasks
  • Loading branch information
Alex committed Jan 15, 2019
2 parents af05588 + bd93b83 commit 955bb52
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -288,11 +289,12 @@ public void testOptional() {
workflow.getTasks().add(task);

QueueDAO queueDAO = mock(QueueDAO.class);
MetadataDAO metadataDAO = mock(MetadataDAO.class);
ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
ParametersUtils parametersUtils = mock(ParametersUtils.class);

Map<String, TaskMapper> taskMappers = new HashMap<>();
new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers).decide(workflow);
new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers).decide(workflow);

System.out.println(workflow.getTasks());
System.out.println(workflow.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class DeciderService {
private final QueueDAO queueDAO;
private final ParametersUtils parametersUtils;
private final ExternalPayloadStorageUtils externalPayloadStorageUtils;
private final MetadataDAO metadataDAO;

private final Map<String, TaskMapper> taskMappers;

Expand All @@ -78,10 +80,11 @@ public class DeciderService {
private final Predicate<Task> isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted() || SystemTaskType.isBuiltIn(task.getTaskType());

@Inject
public DeciderService(ParametersUtils parametersUtils, QueueDAO queueDAO,
public DeciderService(ParametersUtils parametersUtils, QueueDAO queueDAO, MetadataDAO metadataDAO,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
@Named("TaskMappers") Map<String, TaskMapper> taskMappers) {
this.queueDAO = queueDAO;
this.metadataDAO = metadataDAO;
this.parametersUtils = parametersUtils;
this.taskMappers = taskMappers;
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
Expand Down Expand Up @@ -350,6 +353,11 @@ private String getNextTasksToBeScheduled(Workflow workflow, Task task) {
Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflowException {

int retryCount = task.getRetryCount();

if(taskDefinition == null) {
taskDefinition = metadataDAO.getTaskDef(task.getTaskType());
}

if (!task.getStatus().isRetriable() || SystemTaskType.isBuiltIn(task.getTaskType()) || taskDefinition == null || taskDefinition.getRetryCount() <= retryCount) {
WorkflowStatus status = task.getStatus().equals(TIMED_OUT) ? WorkflowStatus.TIMED_OUT : WorkflowStatus.FAILED;
updateWorkflowOutput(workflow, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import com.netflix.conductor.dao.QueueDAO;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.InputStream;
import java.util.Arrays;
Expand All @@ -59,7 +58,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -87,6 +85,8 @@ public class TestDeciderOutcomes {
public void init() {
metadataDAO = mock(MetadataDAO.class);
QueueDAO queueDAO = mock(QueueDAO.class);
MetadataDAO metadataDAO = mock(MetadataDAO.class);

ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
Configuration configuration = mock(Configuration.class);
when(configuration.getTaskInputPayloadSizeThresholdKB()).thenReturn(10L);
Expand All @@ -111,7 +111,7 @@ public void init() {
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

this.deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
this.deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void setup() {
metadataDAO = mock(MetadataDAO.class);
externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
QueueDAO queueDAO = mock(QueueDAO.class);
MetadataDAO metadataDAO = mock(MetadataDAO.class);

TaskDef taskDef = new TaskDef();

Expand All @@ -136,7 +137,7 @@ public void setup() {
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void init() {
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config);
}
Expand Down

0 comments on commit 955bb52

Please sign in to comment.