Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

fix retries logic for failed http tasks #942

Merged
1 commit merged into from
Jan 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -288,11 +289,12 @@ public void testOptional() {
workflow.getTasks().add(task);

QueueDAO queueDAO = mock(QueueDAO.class);
MetadataDAO metadataDAO = mock(MetadataDAO.class);
ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
ParametersUtils parametersUtils = mock(ParametersUtils.class);

Map<String, TaskMapper> taskMappers = new HashMap<>();
new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers).decide(workflow);
new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers).decide(workflow);

System.out.println(workflow.getTasks());
System.out.println(workflow.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class DeciderService {
private final QueueDAO queueDAO;
private final ParametersUtils parametersUtils;
private final ExternalPayloadStorageUtils externalPayloadStorageUtils;
private final MetadataDAO metadataDAO;

private final Map<String, TaskMapper> taskMappers;

Expand All @@ -78,10 +80,11 @@ public class DeciderService {
private final Predicate<Task> isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted() || SystemTaskType.isBuiltIn(task.getTaskType());

@Inject
public DeciderService(ParametersUtils parametersUtils, QueueDAO queueDAO,
public DeciderService(ParametersUtils parametersUtils, QueueDAO queueDAO, MetadataDAO metadataDAO,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
@Named("TaskMappers") Map<String, TaskMapper> taskMappers) {
this.queueDAO = queueDAO;
this.metadataDAO = metadataDAO;
this.parametersUtils = parametersUtils;
this.taskMappers = taskMappers;
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
Expand Down Expand Up @@ -350,6 +353,11 @@ private String getNextTasksToBeScheduled(Workflow workflow, Task task) {
Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflowException {

int retryCount = task.getRetryCount();

if(taskDefinition == null) {
taskDefinition = metadataDAO.getTaskDef(task.getTaskType());
}

if (!task.getStatus().isRetriable() || SystemTaskType.isBuiltIn(task.getTaskType()) || taskDefinition == null || taskDefinition.getRetryCount() <= retryCount) {
WorkflowStatus status = task.getStatus().equals(TIMED_OUT) ? WorkflowStatus.TIMED_OUT : WorkflowStatus.FAILED;
updateWorkflowOutput(workflow, task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import com.netflix.conductor.dao.QueueDAO;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.InputStream;
import java.util.Arrays;
Expand All @@ -59,7 +58,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -87,6 +85,8 @@ public class TestDeciderOutcomes {
public void init() {
metadataDAO = mock(MetadataDAO.class);
QueueDAO queueDAO = mock(QueueDAO.class);
MetadataDAO metadataDAO = mock(MetadataDAO.class);

ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
Configuration configuration = mock(Configuration.class);
when(configuration.getTaskInputPayloadSizeThresholdKB()).thenReturn(10L);
Expand All @@ -111,7 +111,7 @@ public void init() {
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

this.deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
this.deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public void setup() {
metadataDAO = mock(MetadataDAO.class);
externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
QueueDAO queueDAO = mock(QueueDAO.class);
MetadataDAO metadataDAO = mock(MetadataDAO.class);

TaskDef taskDef = new TaskDef();

Expand All @@ -136,7 +137,7 @@ public void setup() {
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void init() {
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config);
}
Expand Down