diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/ArchivingWorkflowModule.java b/contribs/src/main/java/com/netflix/conductor/contribs/ArchivingWorkflowModule.java new file mode 100644 index 0000000000..3f56b49b5f --- /dev/null +++ b/contribs/src/main/java/com/netflix/conductor/contribs/ArchivingWorkflowModule.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.conductor.contribs; + +import com.google.inject.AbstractModule; +import com.netflix.conductor.contribs.listener.ArchivingWorkflowStatusListener; +import com.netflix.conductor.core.execution.WorkflowStatusListener; + +/** + * @author pavel.halabala + */ +public class ArchivingWorkflowModule extends AbstractModule { + + @Override + protected void configure() { + bind(WorkflowStatusListener.class).to(ArchivingWorkflowStatusListener.class); + } +} diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListener.java b/contribs/src/main/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListener.java new file mode 100644 index 0000000000..b53d3b6d03 --- /dev/null +++ b/contribs/src/main/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListener.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.conductor.contribs.listener; + +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.execution.WorkflowStatusListener; +import com.netflix.conductor.core.orchestration.ExecutionDAOFacade; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.inject.Inject; + +/** + * Provides default implementation of workflow archiving immediately after workflow is completed or terminated. + * + * @author pavel.halabala + */ +public class ArchivingWorkflowStatusListener implements WorkflowStatusListener { + + private static final Logger LOGGER = LoggerFactory.getLogger(ArchivingWorkflowStatusListener.class); + + private final ExecutionDAOFacade executionDAOFacade; + + @Inject + public ArchivingWorkflowStatusListener(ExecutionDAOFacade executionDAOFacade) { + this.executionDAOFacade = executionDAOFacade; + } + + @Override + public void onWorkflowCompleted(Workflow workflow) { + LOGGER.info("Archiving workflow {} on completion ", workflow.getWorkflowId()); + this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true); + } + + @Override + public void onWorkflowTerminated(Workflow workflow) { + LOGGER.info("Archiving workflow {} on termination", workflow.getWorkflowId()); + this.executionDAOFacade.removeWorkflow(workflow.getWorkflowId(), true); + } +} diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java new file mode 100644 index 0000000000..9a2f870a6b --- /dev/null +++ b/contribs/src/test/java/com/netflix/conductor/contribs/listener/ArchivingWorkflowStatusListenerTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2020 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.conductor.contribs.listener; + +import com.netflix.conductor.common.run.Workflow; +import com.netflix.conductor.core.orchestration.ExecutionDAOFacade; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.UUID; + +import static org.mockito.Mockito.*; + +/** + * @author pavel.halabala + */ +public class ArchivingWorkflowStatusListenerTest { + + Workflow wf; + ExecutionDAOFacade executionDAOFacade; + ArchivingWorkflowStatusListener cut; + + @Before + public void before() { + wf = new Workflow(); + wf.setWorkflowId(UUID.randomUUID().toString()); + + executionDAOFacade = Mockito.mock(ExecutionDAOFacade.class); + cut = new ArchivingWorkflowStatusListener(executionDAOFacade); + } + + @Test + public void testArchiveOnWorkflowCompleted() { + cut.onWorkflowCompleted(wf); + verify(executionDAOFacade, times(1)) + .removeWorkflow(wf.getWorkflowId(), true); + verifyNoMoreInteractions(executionDAOFacade); + } + + @Test + public void testArchiveOnWorkflowTerminated() { + cut.onWorkflowTerminated(wf); + verify(executionDAOFacade, times(1)) + .removeWorkflow(wf.getWorkflowId(), true); + verifyNoMoreInteractions(executionDAOFacade); + } +} diff --git a/docs/docs/server.md b/docs/docs/server.md index 6e7421e201..e39eb39acb 100644 --- a/docs/docs/server.md +++ b/docs/docs/server.md @@ -142,3 +142,34 @@ Optionally, configure the default timeouts: zk.sessionTimeoutMs zk.connectionTimeoutMs ``` + +## Default Workflow Archiving Module Configuration + +Conductor server does not perform automated workflow execution data cleaning by default. Archiving module (if enabled) +removes all execution data from conductor persistence storage immediately upon workflow completion or termination, +but keeps archived index data in elastic search. + +To benefit form archiving module you have to do the following: + +### 1. Enable Archiving Module + +Set property in server configuration. + +```properties +# Comma-separated additional conductor modules +conductor.additional.modules=com.netflix.conductor.contribs.ArchivingWorkflowModule +``` + +### 2. Enable Workflow Status Listener + +Archiving module is triggered only if workflow status listener is enabled on workflow definition level. To enable it +you have to set `workflowStatusListenerEnabled` property to `true`. See sample workflow definition below: + +```json +{ + "name": "e2e_approval_v4", + "description": "Approval Process", + "workflowStatusListenerEnabled": true, + "tasks": [] +} +```