diff --git a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java index f68765fa17..e7bd12584e 100644 --- a/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java +++ b/common/src/main/java/com/netflix/conductor/common/metadata/workflow/WorkflowDef.java @@ -18,6 +18,11 @@ */ package com.netflix.conductor.common.metadata.workflow; +import com.github.vmg.protogen.annotations.ProtoField; +import com.github.vmg.protogen.annotations.ProtoMessage; +import com.google.common.base.MoreObjects; +import com.netflix.conductor.common.metadata.Auditable; + import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -26,11 +31,6 @@ import java.util.Objects; import java.util.Optional; -import com.github.vmg.protogen.annotations.ProtoField; -import com.github.vmg.protogen.annotations.ProtoMessage; -import com.google.common.base.MoreObjects; -import com.netflix.conductor.common.metadata.Auditable; - /** * @author Viren * @@ -66,6 +66,9 @@ public class WorkflowDef extends Auditable { @ProtoField(id = 9) private boolean restartable = true; + @ProtoField(id = 10) + private boolean workflowStatusListenerEnabled = false; + /** * @return the name */ @@ -201,6 +204,22 @@ public void setSchemaVersion(int schemaVersion) { this.schemaVersion = schemaVersion; } + /** + * + * @return true is workflow listener will be invoked when workflow gets into a terminal state + */ + public boolean isWorkflowStatusListenerEnabled() { + return workflowStatusListenerEnabled; + } + + /** + * Specify if workflow listener is enabled to invoke a callback for completed or terminated workflows + * @param workflowStatusListenerEnabled + */ + public void setWorkflowStatusListenerEnabled(boolean workflowStatusListenerEnabled) { + this.workflowStatusListenerEnabled = workflowStatusListenerEnabled; + } + public String key(){ return getKey(name, version); } @@ -287,6 +306,7 @@ public String toString() { .add("failureWorkflow", failureWorkflow) .add("schemaVersion", schemaVersion) .add("restartable", restartable) + .add("workflowStatusListenerEnabled", workflowStatusListenerEnabled) .toString(); } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index d468f0d6f9..d7722acc11 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -90,6 +90,8 @@ public class WorkflowExecutor { private final MetadataMapperService metadataMapperService; private final ParametersUtils parametersUtils; + private WorkflowStatusListener workflowStatusListener; + private int activeWorkerLastPollInSecs; public static final String DECIDER_QUEUE = "_deciderQueue"; @@ -103,6 +105,7 @@ public WorkflowExecutor( QueueDAO queueDAO, MetadataMapperService metadataMapperService, ParametersUtils parametersUtils, + WorkflowStatusListener workflowStatusListener, Configuration config ) { this.deciderService = deciderService; @@ -113,6 +116,7 @@ public WorkflowExecutor( this.metadataMapperService = metadataMapperService; this.activeWorkerLastPollInSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10); this.parametersUtils = parametersUtils; + this.workflowStatusListener = workflowStatusListener; } /** @@ -535,6 +539,10 @@ void completeWorkflow(Workflow wf) { Monitors.recordWorkflowCompletion(workflow.getWorkflowName(), workflow.getEndTime() - workflow.getStartTime(), wf.getOwnerApp()); queueDAO.remove(DECIDER_QUEUE, workflow.getWorkflowId()); //remove from the sweep queue logger.debug("Removed workflow {} from decider queue", wf.getWorkflowId()); + + if(wf.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) { + workflowStatusListener.onWorkflowCompleted(wf); + } } public void terminateWorkflow(String workflowId, String reason) { @@ -629,6 +637,10 @@ public void terminateWorkflow(Workflow workflow, String reason, String failureWo // Send to atlas Monitors.recordWorkflowTermination(workflow.getWorkflowName(), workflow.getStatus(), workflow.getOwnerApp()); + + if(workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()) { + workflowStatusListener.onWorkflowTerminated(workflow); + } } /** diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java new file mode 100644 index 0000000000..d9e6aa09d8 --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutorModule.java @@ -0,0 +1,14 @@ +package com.netflix.conductor.core.execution; + +import com.google.inject.AbstractModule; + +/** + * Default implementation for the workflow status listener + * + */ +public class WorkflowExecutorModule extends AbstractModule { + @Override + protected void configure() { + bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class);//default implementation + } +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java new file mode 100644 index 0000000000..a70a4ea27e --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListener.java @@ -0,0 +1,12 @@ +package com.netflix.conductor.core.execution; + +import com.netflix.conductor.common.run.Workflow; + +/** + * Listener for the completed and terminated workflows + * + */ +public interface WorkflowStatusListener { + void onWorkflowCompleted(Workflow workflow); + void onWorkflowTerminated(Workflow workflow); +} diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java new file mode 100644 index 0000000000..25091cd54d --- /dev/null +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowStatusListenerStub.java @@ -0,0 +1,24 @@ +package com.netflix.conductor.core.execution; + +import com.netflix.conductor.common.run.Workflow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Stub listener default implementation + */ +public class WorkflowStatusListenerStub implements WorkflowStatusListener { + + private static final Logger LOG = LoggerFactory.getLogger(WorkflowStatusListenerStub.class); + + @Override + public void onWorkflowCompleted(Workflow workflow) { + LOG.debug("Workflow {} is completed", workflow.getWorkflowId()); + } + + @Override + public void onWorkflowTerminated(Workflow workflow) { + LOG.debug("Workflow {} is terminated", workflow.getWorkflowId()); + } + +} diff --git a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java index 04cc50f5ca..4f48a1d5ed 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/TestWorkflowExecutor.java @@ -68,6 +68,8 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -79,6 +81,7 @@ public class TestWorkflowExecutor { private ExecutionDAO executionDAO; private MetadataDAO metadataDAO; private QueueDAO queueDAO; + private WorkflowStatusListener workflowStatusListener; @Before public void init() { @@ -86,6 +89,7 @@ public void init() { executionDAO = mock(ExecutionDAO.class); metadataDAO = mock(MetadataDAO.class); queueDAO = mock(QueueDAO.class); + workflowStatusListener = mock(WorkflowStatusListener.class); ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class); ObjectMapper objectMapper = new ObjectMapper(); ParametersUtils parametersUtils = new ParametersUtils(); @@ -103,7 +107,8 @@ public void init() { DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers); MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO); - workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, metadataMapperService, parametersUtils, config); + workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, executionDAO, queueDAO, metadataMapperService, + parametersUtils, workflowStatusListener, config); } @Test @@ -266,8 +271,63 @@ public void testCompleteWorkflow() { assertEquals(1, updateWorkflowCalledCounter.get()); assertEquals(1, updateTasksCalledCounter.get()); assertEquals(1, removeQueueEntryCalledCounter.get()); + + verify(workflowStatusListener, times(0)).onWorkflowCompleted(any(Workflow.class)); + + def.setWorkflowStatusListenerEnabled(true); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflowExecutor.completeWorkflow(workflow); + verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); } + @Test + @SuppressWarnings("unchecked") + public void testTerminatedWorkflow() { + WorkflowDef def = new WorkflowDef(); + def.setName("test"); + + Workflow workflow = new Workflow(); + workflow.setWorkflowDefinition(def); + workflow.setWorkflowId("1"); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflow.setOwnerApp("junit_test"); + workflow.setStartTime(10L); + workflow.setEndTime(100L); + workflow.setOutput(Collections.EMPTY_MAP); + + when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow); + + AtomicInteger updateWorkflowCalledCounter = new AtomicInteger(0); + doAnswer(invocation -> { + updateWorkflowCalledCounter.incrementAndGet(); + return null; + }).when(executionDAO).updateWorkflow(any()); + + AtomicInteger updateTasksCalledCounter = new AtomicInteger(0); + doAnswer(invocation -> { + updateTasksCalledCounter.incrementAndGet(); + return null; + }).when(executionDAO).updateTasks(any()); + + AtomicInteger removeQueueEntryCalledCounter = new AtomicInteger(0); + doAnswer(invocation -> { + removeQueueEntryCalledCounter.incrementAndGet(); + return null; + }).when(queueDAO).remove(anyString(), anyString()); + + workflowExecutor.terminateWorkflow("workflowId", "reason"); + assertEquals(Workflow.WorkflowStatus.TERMINATED, workflow.getStatus()); + assertEquals(1, updateWorkflowCalledCounter.get()); + assertEquals(1, removeQueueEntryCalledCounter.get()); + + verify(workflowStatusListener, times(0)).onWorkflowTerminated(any(Workflow.class)); + + def.setWorkflowStatusListenerEnabled(true); + workflow.setStatus(Workflow.WorkflowStatus.RUNNING); + workflowExecutor.completeWorkflow(workflow); + verify(workflowStatusListener, times(1)).onWorkflowCompleted(any(Workflow.class)); + } + @Test public void testGetFailedTasksToRetry() { //setup diff --git a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java index 978401accf..c7675f2e6f 100644 --- a/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java +++ b/grpc/src/main/java/com/netflix/conductor/grpc/AbstractProtoMapper.java @@ -1013,6 +1013,7 @@ public WorkflowDefPb.WorkflowDef toProto(WorkflowDef from) { } to.setSchemaVersion( from.getSchemaVersion() ); to.setRestartable( from.isRestartable() ); + to.setWorkflowStatusListenerEnabled( from.isWorkflowStatusListenerEnabled() ); return to.build(); } @@ -1031,6 +1032,7 @@ public WorkflowDef fromProto(WorkflowDefPb.WorkflowDef from) { to.setFailureWorkflow( from.getFailureWorkflow() ); to.setSchemaVersion( from.getSchemaVersion() ); to.setRestartable( from.getRestartable() ); + to.setWorkflowStatusListenerEnabled( from.getWorkflowStatusListenerEnabled() ); return to; } diff --git a/grpc/src/main/proto/model/workflowdef.proto b/grpc/src/main/proto/model/workflowdef.proto index 9e5be4f627..1224b6267d 100644 --- a/grpc/src/main/proto/model/workflowdef.proto +++ b/grpc/src/main/proto/model/workflowdef.proto @@ -18,4 +18,5 @@ message WorkflowDef { string failure_workflow = 7; int32 schema_version = 8; bool restartable = 9; + bool workflow_status_listener_enabled = 10; } diff --git a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java index 7cba287e1b..350665a9b9 100644 --- a/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java +++ b/server/src/main/java/com/netflix/conductor/bootstrap/ModulesProvider.java @@ -7,6 +7,7 @@ import com.netflix.conductor.contribs.http.RestClientManager; import com.netflix.conductor.contribs.json.JsonJqTransform; import com.netflix.conductor.core.config.Configuration; +import com.netflix.conductor.core.execution.WorkflowExecutorModule; import com.netflix.conductor.core.utils.DummyPayloadStorage; import com.netflix.conductor.core.utils.S3PayloadStorage; import com.netflix.conductor.dao.RedisWorkflowModule; @@ -91,6 +92,8 @@ private List selectModulesToLoad() { modules.add(new ElasticSearchV5Module()); + modules.add(new WorkflowExecutorModule()); + if (configuration.getJerseyEnabled()) { modules.add(new JerseyModule()); modules.add(new SwaggerModule()); diff --git a/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java b/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java index 695a989087..d2ca90ce66 100644 --- a/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java +++ b/test-harness/src/test/java/com/netflix/conductor/tests/utils/TestModule.java @@ -19,6 +19,8 @@ import com.netflix.conductor.common.utils.JsonMapperProvider; import com.netflix.conductor.core.config.Configuration; import com.netflix.conductor.core.config.CoreModule; +import com.netflix.conductor.core.execution.WorkflowStatusListenerStub; +import com.netflix.conductor.core.execution.WorkflowStatusListener; import com.netflix.conductor.dao.ExecutionDAO; import com.netflix.conductor.dao.IndexDAO; import com.netflix.conductor.dao.MetadataDAO; @@ -60,6 +62,8 @@ protected void configure() { bind(QueueDAO.class).to(DynoQueueDAO.class); bind(IndexDAO.class).to(MockIndexDAO.class); + bind(WorkflowStatusListener.class).to(WorkflowStatusListenerStub.class); + install(new CoreModule()); bind(UserTask.class).asEagerSingleton(); bind(ObjectMapper.class).toProvider(JsonMapperProvider.class);