From bd93b8303e43db3e8e0b884c34a64910e17bbca4 Mon Sep 17 00:00:00 2001 From: Alex Lich Date: Tue, 15 Jan 2019 10:00:09 -0800 Subject: [PATCH] fix retries logic for failed http tasks --- .../netflix/conductor/contribs/http/TestHttpTask.java | 4 +++- .../conductor/core/execution/DeciderService.java | 10 +++++++++- .../conductor/core/execution/TestDeciderOutcomes.java | 6 +++--- .../conductor/core/execution/TestDeciderService.java | 3 ++- .../conductor/core/execution/TestWorkflowExecutor.java | 2 +- 5 files changed, 18 insertions(+), 7 deletions(-) diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java b/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java index ab3cfe69b8..a6dc3546f4 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/http/TestHttpTask.java @@ -31,6 +31,7 @@ import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.execution.mapper.TaskMapper; import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils; +import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; @@ -288,11 +289,12 @@ public void testOptional() { workflow.getTasks().add(task); QueueDAO queueDAO = mock(QueueDAO.class); + MetadataDAO metadataDAO = mock(MetadataDAO.class); ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class); ParametersUtils parametersUtils = mock(ParametersUtils.class); Map taskMappers = new HashMap<>(); - new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers).decide(workflow); + new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers).decide(workflow); System.out.println(workflow.getTasks()); System.out.println(workflow.getStatus()); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index e6c4a3e511..71f6307f9e 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -30,6 +30,7 @@ import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.QueueUtils; +import com.netflix.conductor.dao.MetadataDAO; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import org.apache.commons.lang3.StringUtils; @@ -70,6 +71,7 @@ public class DeciderService { private final QueueDAO queueDAO; private final ParametersUtils parametersUtils; private final ExternalPayloadStorageUtils externalPayloadStorageUtils; + private final MetadataDAO metadataDAO; private final Map taskMappers; @@ -78,10 +80,11 @@ public class DeciderService { private final Predicate isNonPendingTask = task -> !task.isRetried() && !task.getStatus().equals(SKIPPED) && !task.isExecuted() || SystemTaskType.isBuiltIn(task.getTaskType()); @Inject - public DeciderService(ParametersUtils parametersUtils, QueueDAO queueDAO, + public DeciderService(ParametersUtils parametersUtils, QueueDAO queueDAO, MetadataDAO metadataDAO, ExternalPayloadStorageUtils externalPayloadStorageUtils, @Named("TaskMappers") Map taskMappers) { this.queueDAO = queueDAO; + this.metadataDAO = metadataDAO; this.parametersUtils = parametersUtils; this.taskMappers = taskMappers; this.externalPayloadStorageUtils = externalPayloadStorageUtils; @@ -350,6 +353,11 @@ private String getNextTasksToBeScheduled(Workflow workflow, Task task) { Task retry(TaskDef taskDefinition, WorkflowTask workflowTask, Task task, Workflow workflow) throws TerminateWorkflowException { int retryCount = task.getRetryCount(); + + if(taskDefinition == null) { + taskDefinition = metadataDAO.getTaskDef(task.getTaskType()); + } + if (!task.getStatus().isRetriable() || SystemTaskType.isBuiltIn(task.getTaskType()) || taskDefinition == null || taskDefinition.getRetryCount() <= retryCount) { WorkflowStatus status = task.getStatus().equals(TIMED_OUT) ? WorkflowStatus.TIMED_OUT : WorkflowStatus.FAILED; updateWorkflowOutput(workflow, task); diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java index 1232b4eb73..f7c3c66d98 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderOutcomes.java @@ -45,7 +45,6 @@ import com.netflix.conductor.dao.QueueDAO; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; import java.io.InputStream; import java.util.Arrays; @@ -59,7 +58,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -87,6 +85,8 @@ public class TestDeciderOutcomes { public void init() { metadataDAO = mock(MetadataDAO.class); QueueDAO queueDAO = mock(QueueDAO.class); + MetadataDAO metadataDAO = mock(MetadataDAO.class); + ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class); Configuration configuration = mock(Configuration.class); when(configuration.getTaskInputPayloadSizeThresholdKB()).thenReturn(10L); @@ -111,7 +111,7 @@ public void init() { taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils)); taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO)); - this.deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); + this.deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers); } @Test diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java index 796dd4f721..e70f99b52f 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestDeciderService.java @@ -113,6 +113,7 @@ public void setup() { metadataDAO = mock(MetadataDAO.class); externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class); QueueDAO queueDAO = mock(QueueDAO.class); + MetadataDAO metadataDAO = mock(MetadataDAO.class); TaskDef taskDef = new TaskDef(); @@ -136,7 +137,7 @@ public void setup() { taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils)); taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO)); - deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); + deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers); } @Test diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 158e55e6bd..862dde8de8 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -114,7 +114,7 @@ public void init() { taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils)); taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO)); - deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); + deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers); MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO); workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config); }