Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Remove tasks from Elasticsearch when workflow is removed #3300

Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ private void populateWorkflowOutput(Workflow workflow) {
* Removes a workflow from the system
*
* @param workflowId the id of the workflow to be deleted
* @param archiveWorkflow flag to indicate if the workflow should be archived before deletion
* @param archiveWorkflow flag to indicate if the workflow and associated tasks should be
* archived before deletion
*/
public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
Validate.notBlank(workflowId, "Workflow id cannot be blank");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.*;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
Expand Down Expand Up @@ -332,19 +333,46 @@ public void removeFromPendingWorkflow(String workflowType, String workflowId) {
* Removes the workflow from the data store.
*
* @param workflowId the id of the workflow to be removed
* @param archiveWorkflow if true, the workflow will be archived in the {@link IndexDAO} after
* removal from {@link ExecutionDAO}
* @param archiveWorkflow if true, the workflow and associated tasks will be archived in the
* {@link IndexDAO} after removal from {@link ExecutionDAO}.
*/
public void removeWorkflow(String workflowId, boolean archiveWorkflow) {
WorkflowModel workflow = getWorkflowModelFromDataStore(workflowId, true);

executionDAO.removeWorkflow(workflowId);

List<TaskModel> tasks = workflow.getTasks();
try {
removeWorkflowIndex(workflow, archiveWorkflow);
} catch (JsonProcessingException e) {
throw new TransientException("Workflow can not be serialized to json", e);
}

executionDAO.removeWorkflow(workflowId);
tasks.forEach(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why loop through the tasks twice? A regular for loop might be easier to read here, and couldn't you do both operations through a single loop of the tasks?

Copy link
Contributor Author

@NLRemco NLRemco Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I found it more readable to have two separate loops with separate error handling, so that queue removal can be controlled independently of index removal.
As for a traditional for loop: I wanted to stay in line with the style of the file, but this can always be changed.

Either way, since it’s unnecessary to loop over the tasks twice, and since the order of operations in the method has been changed so that the loops can now be merged, I’ll go ahead and merge them.

task -> {
try {
removeTaskIndex(workflow, task, archiveWorkflow);
} catch (JsonProcessingException e) {
throw new TransientException(
String.format(
"Task %s of workflow %s can not be serialized to json",
task.getTaskId(), workflow.getWorkflowId()),
e);
}
});

// Best-effort cleanup: take every task of the workflow off its task queue. A failure for
// one task is caught and logged so the remaining tasks are still processed.
tasks.forEach(
        task -> {
            try {
                queueDAO.remove(QueueUtils.getQueueName(task), task.getTaskId());
            } catch (Exception e) {
                // Arguments follow the placeholder order: task id, workflow id, queue name.
                // The trailing exception is passed last so the logger records its stack trace.
                LOGGER.info(
                        "Error removing task: {} of workflow: {} from {} queue",
                        task.getTaskId(),
                        workflowId,
                        QueueUtils.getQueueName(task),
                        e);
            }
        });

try {
queueDAO.remove(DECIDER_QUEUE, workflowId);
Expand Down Expand Up @@ -509,6 +537,29 @@ public void removeTask(String taskId) {
executionDAO.removeTask(taskId);
}

private void removeTaskIndex(WorkflowModel workflow, TaskModel task, boolean archiveTask)
throws JsonProcessingException {
if (archiveTask) {
if (task.getStatus().isTerminal()) {
// Only allow archival if task is in terminal state
// DO NOT archive async, since if archival errors out, task data will be lost
indexDAO.updateTask(
NLRemco marked this conversation as resolved.
Show resolved Hide resolved
workflow.getWorkflowId(),
task.getTaskId(),
new String[] {ARCHIVED_FIELD},
new Object[] {true});
} else {
throw new IllegalArgumentException(
String.format(
"Cannot archive task: %s of workflow: %s with status: %s",
task.getTaskId(), workflow.getWorkflowId(), task.getStatus()));
}
} else {
// Not archiving, remove task from index
indexDAO.asyncRemoveTask(workflow.getWorkflowId(), task.getTaskId());
}
}

public void extendLease(TaskModel taskModel) {
taskModel.setUpdateTime(System.currentTimeMillis());
executionDAO.updateTask(taskModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,23 @@ public CompletableFuture<Void> asyncUpdateWorkflow(
return CompletableFuture.completedFuture(null);
}

/** No-op: the removal request is accepted and ignored. */
@Override
public void removeTask(String workflowId, String taskId) {}

/** No-op: performs no removal and returns an already-completed future. */
@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
return CompletableFuture.completedFuture(null);
}

/** No-op: the update request is accepted and ignored. */
@Override
public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {}

/** No-op: performs no update and returns an already-completed future. */
@Override
public CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values) {
return CompletableFuture.completedFuture(null);
}

@Override
public String get(String workflowInstanceId, String key) {
return null;
Expand Down
39 changes: 39 additions & 0 deletions core/src/main/java/com/netflix/conductor/dao/IndexDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,45 @@ SearchResult<String> searchTasks(
CompletableFuture<Void> asyncUpdateWorkflow(
String workflowInstanceId, String[] keys, Object[] values);

/**
* Removes a single task from the task index.
*
* @param workflowId id of the workflow that contains the task
* @param taskId id of the task whose index entry is to be removed
*/
void removeTask(String workflowId, String taskId);

/**
* Removes a single task from the task index asynchronously.
*
* @param workflowId id of the workflow that contains the task
* @param taskId id of the task whose index entry is to be removed
* @return CompletableFuture of type void
*/
CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId);

/**
* Updates selected fields of a task in the task index.
*
* @param workflowId id of the workflow that contains the task
* @param taskId id of the task to be updated
* @param keys names of the fields to be updated
* @param values new values, given in the same order as {@code keys}. Number of keys and values
* MUST match.
*/
void updateTask(String workflowId, String taskId, String[] keys, Object[] values);

/**
* Updates selected fields of a task in the task index asynchronously.
*
* @param workflowId id of the workflow that contains the task
* @param taskId id of the task to be updated
* @param keys names of the fields to be updated
* @param values new values, given in the same order as {@code keys}. Number of keys and values
* MUST match.
* @return CompletableFuture of type void
*/
CompletableFuture<Void> asyncUpdateTask(
String workflowId, String taskId, String[] keys, Object[] values);

/**
* Retrieves a specific field from the index
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Workflow getExecutionStatus(
* Removes the workflow from the system.
*
* @param workflowId WorkflowID of the workflow you want to remove from system.
* @param archiveWorkflow Archives the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
void deleteWorkflow(
@NotEmpty(message = "WorkflowId cannot be null or empty.") String workflowId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public Workflow getExecutionStatus(String workflowId, boolean includeTasks) {
* Removes the workflow from the system.
*
* @param workflowId WorkflowID of the workflow you want to remove from system.
* @param archiveWorkflow Archives the workflow.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
public void deleteWorkflow(String workflowId, boolean archiveWorkflow) {
executionService.removeWorkflow(workflowId, archiveWorkflow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,21 @@ public void testGetWorkflowsByCorrelationId() {
/**
* Removing a workflow without archiving: the workflow is removed from the execution store
* once, the workflow and its single task are removed from the index asynchronously, and
* neither of them is updated in the index.
*/
@Test
public void testRemoveWorkflow() {
// Fixture: a completed workflow that owns exactly one task.
WorkflowModel workflow = new WorkflowModel();
workflow.setWorkflowId("workflowId");
workflow.setStatus(WorkflowModel.Status.COMPLETED);

TaskModel task = new TaskModel();
task.setTaskId("taskId");
workflow.setTasks(Collections.singletonList(task));

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
// archiveWorkflow = false, so index entries are expected to be removed, not updated.
executionDAOFacade.removeWorkflow("workflowId", false);
verify(indexDAO, never()).updateWorkflow(any(), any(), any());
verify(indexDAO, times(1)).asyncRemoveWorkflow(workflow.getWorkflowId());
verify(executionDAO, times(1)).removeWorkflow(anyString());
// Tasks are not removed one by one through the execution DAO.
verify(executionDAO, never()).removeTask(anyString());
verify(indexDAO, never()).updateWorkflow(anyString(), any(), any());
verify(indexDAO, never()).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, times(1)).asyncRemoveWorkflow(anyString());
// One asynchronous index removal for the single task of the fixture.
verify(indexDAO, times(1)).asyncRemoveTask(anyString(), anyString());
}

@Test
Expand All @@ -153,8 +163,12 @@ public void testArchiveWorkflow() throws Exception {

when(executionDAO.getWorkflow(anyString(), anyBoolean())).thenReturn(workflow);
executionDAOFacade.removeWorkflow("workflowId", true);
verify(indexDAO, times(1)).updateWorkflow(any(), any(), any());
verify(indexDAO, never()).removeWorkflow(any());
verify(executionDAO, times(1)).removeWorkflow(anyString());
verify(executionDAO, never()).removeTask(anyString());
verify(indexDAO, times(1)).updateWorkflow(anyString(), any(), any());
verify(indexDAO, times(15)).updateTask(anyString(), anyString(), any(), any());
verify(indexDAO, never()).removeWorkflow(anyString());
verify(indexDAO, never()).removeTask(anyString(), anyString());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@
import static com.netflix.conductor.TestUtils.getConstraintViolationMessages;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -184,12 +179,30 @@ public void testNotFoundExceptionGetExecutionStatus() {

/**
* Verifies that deleteWorkflow delegates to executionService.removeWorkflow and passes the
* archive flag through.
*/
@Test
public void testDeleteWorkflow() {
workflowService.deleteWorkflow("w123", true);
// Only one call has been made at this point, whatever its flag value.
verify(executionService, times(1)).removeWorkflow(anyString(), anyBoolean());
workflowService.deleteWorkflow("w123", false);
// Exactly one of the calls carried archiveWorkflow == false.
verify(executionService, times(1)).removeWorkflow(anyString(), eq(false));
}

/**
 * Deleting with a null workflow id (archive flag false) must fail validation with exactly one
 * violation carrying the "WorkflowId cannot be null or empty." message. The exception is
 * rethrown so that the {@code expected} clause of the test is satisfied.
 */
@Test(expected = ConstraintViolationException.class)
public void testInvalidDeleteWorkflow() {
    try {
        workflowService.deleteWorkflow(null, false);
    } catch (ConstraintViolationException violation) {
        final int violationCount = violation.getConstraintViolations().size();
        assertEquals(1, violationCount);

        final Set<String> violationMessages =
                getConstraintViolationMessages(violation.getConstraintViolations());
        final boolean hasExpectedMessage =
                violationMessages.contains("WorkflowId cannot be null or empty.");
        assertTrue(hasExpectedMessage);

        throw violation;
    }
}

/**
 * Deleting with the archive flag set must delegate to the execution service exactly once with
 * archiveWorkflow == true.
 */
@Test
public void testArchiveWorkflow() {
    final boolean archive = true;
    workflowService.deleteWorkflow("w123", archive);
    verify(executionService, times(1)).removeWorkflow(anyString(), eq(archive));
}

@Test(expected = ConstraintViolationException.class)
public void testInvalidArchiveWorkflow() {
try {
workflowService.deleteWorkflow(null, true);
} catch (ConstraintViolationException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,98 @@ public CompletableFuture<Void> asyncUpdateWorkflow(
() -> updateWorkflow(workflowInstanceId, keys, values), executorService);
}

/**
 * Removes a task document from the task index.
 *
 * <p>The task is first looked up together with its workflow id; if that search returns no hit,
 * nothing is deleted. Exceptions are logged and counted via {@code Monitors.error} and are not
 * rethrown.
 *
 * @param workflowId id of the workflow the task is expected to belong to
 * @param taskId id of the task document to delete
 */
@Override
public void removeTask(String workflowId, String taskId) {
    try {
        final long begin = Instant.now().toEpochMilli();
        final String docType =
                StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride;

        // Confirm the task is indexed under this workflow before deleting anything.
        final String ownershipQuery =
                String.format("(taskId='%s') AND (workflowId='%s')", taskId, workflowId);
        final SearchResult<String> matches = searchTasks(ownershipQuery, "*", 0, 1, null);
        if (matches.getTotalHits() == 0) {
            LOGGER.error("Task: {} does not belong to workflow: {}", taskId, workflowId);
            Monitors.error(CLASS_NAME, "removeTask");
            return;
        }

        final DeleteResponse deletion =
                elasticSearchClient
                        .delete(new DeleteRequest(taskIndexName, docType, taskId))
                        .actionGet();
        final long elapsed = Instant.now().toEpochMilli() - begin;

        if (deletion.getResult() == DocWriteResponse.Result.DELETED) {
            LOGGER.debug(
                    "Time taken {} for removing task:{} of workflow: {}",
                    elapsed,
                    taskId,
                    workflowId);
            Monitors.recordESIndexTime("remove_task", docType, elapsed);
            Monitors.recordWorkerQueueSize(
                    "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
        } else {
            // The delete call returned, but the document was not reported as deleted.
            LOGGER.error(
                    "Index removal failed - task not found by id: {} of workflow: {}",
                    taskId,
                    workflowId);
            Monitors.error(CLASS_NAME, "removeTask");
        }
    } catch (Exception e) {
        LOGGER.error(
                "Failed to remove task: {} of workflow: {} from index", taskId, workflowId, e);
        Monitors.error(CLASS_NAME, "removeTask");
    }
}

/**
 * Runs {@link #removeTask(String, String)} on this DAO's executor service.
 *
 * @param workflowId id of the workflow the task is expected to belong to
 * @param taskId id of the task document to delete
 * @return a future that completes when the scheduled removal has run
 */
@Override
public CompletableFuture<Void> asyncRemoveTask(String workflowId, String taskId) {
    final Runnable removal = () -> removeTask(workflowId, taskId);
    return CompletableFuture.runAsync(removal, executorService);
}

/**
 * Applies a partial update to a task document in the task index.
 *
 * <p>Field {@code keys[i]} is set to {@code values[i]}. A {@code null} value is passed through
 * to the update request, and if a key occurs more than once the last value wins.
 *
 * @param workflowId id of the workflow the task belongs to; used for log output only
 * @param taskId id of the task document to update
 * @param keys names of the fields to update
 * @param values new field values, in the same order as {@code keys}
 * @throws IllegalArgumentException if {@code keys} and {@code values} differ in length
 */
@Override
public void updateTask(String workflowId, String taskId, String[] keys, Object[] values) {
    if (keys.length != values.length) {
        throw new IllegalArgumentException("Number of keys and values do not match");
    }

    long startTime = Instant.now().toEpochMilli();
    String docType = StringUtils.isBlank(docTypeOverride) ? TASK_DOC_TYPE : docTypeOverride;

    // Filled with a plain loop instead of Collectors.toMap: toMap throws a
    // NullPointerException for a null value and an IllegalStateException for a repeated key.
    // HashMap is fully qualified so that this method needs no additional import.
    Map<String, Object> source = new java.util.HashMap<>();
    for (int i = 0; i < keys.length; i++) {
        source.put(keys[i], values[i]);
    }

    UpdateRequest request = new UpdateRequest(taskIndexName, docType, taskId);
    request.doc(source);
    LOGGER.debug(
            "Updating task: {} of workflow: {} in elasticsearch index: {}",
            taskId,
            workflowId,
            taskIndexName);
    // Blocks until the update call has returned.
    elasticSearchClient.update(request).actionGet();
    long endTime = Instant.now().toEpochMilli();
    LOGGER.debug(
            "Time taken {} for updating task: {} of workflow: {}",
            endTime - startTime,
            taskId,
            workflowId);
    Monitors.recordESIndexTime("update_task", docType, endTime - startTime);
    Monitors.recordWorkerQueueSize(
            "indexQueue", ((ThreadPoolExecutor) executorService).getQueue().size());
}

/**
 * Runs {@link #updateTask(String, String, String[], Object[])} on this DAO's executor service.
 *
 * @param workflowId id of the workflow the task belongs to
 * @param taskId id of the task document to update
 * @param keys names of the fields to update
 * @param values new field values, in the same order as {@code keys}
 * @return a future that completes when the scheduled update has run
 */
@Override
public CompletableFuture<Void> asyncUpdateTask(
        String workflowId, String taskId, String[] keys, Object[] values) {
    final Runnable update = () -> updateTask(workflowId, taskId, keys, values);
    return CompletableFuture.runAsync(update, executorService);
}

@Override
public String get(String workflowInstanceId, String fieldToGet) {
String docType = StringUtils.isBlank(docTypeOverride) ? WORKFLOW_DOC_TYPE : docTypeOverride;
Expand Down