Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1409 from Netflix/reset_workflow_upon_restart
Browse files Browse the repository at this point in the history
reset workflow upon restarts
  • Loading branch information
apanicker-nflx committed Nov 22, 2019
2 parents 65725a5 + fb81429 commit f1ca640
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
Expand Up @@ -488,8 +488,9 @@ public void rewind(String workflowId, boolean useLatestDefinitions) {
throw new ApplicationException(CONFLICT, String.format("Workflow: %s is non-restartable", workflow));
}

// Remove the workflow from the primary datastore (archive in indexer) and re-create it
executionDAOFacade.removeWorkflow(workflowId, true);
// Reset the workflow in the primary datastore and archive in indexer; then re-create it
executionDAOFacade.resetWorkflow(workflowId);

workflow.getTasks().clear();
workflow.setReasonForIncompletion(null);
workflow.setStartTime(System.currentTimeMillis());
Expand Down
Expand Up @@ -262,6 +262,32 @@ public void removeWorkflow(String workflowId, boolean archiveWorkflow) {
}
}

/**
* Reset the workflow state by removing from the {@link ExecutionDAO} and
* archiving this workflow in the {@link IndexDAO}.
*
* @param workflowId the workflow id to be reset
*/
public void resetWorkflow(String workflowId) {
try {
Workflow workflow = getWorkflowById(workflowId, true);
if (config.enableAsyncIndexing()) {
indexDAO.asyncUpdateWorkflow(workflowId,
new String[]{RAW_JSON_FIELD, ARCHIVED_FIELD},
new Object[]{objectMapper.writeValueAsString(workflow), true});
} else {
indexDAO.updateWorkflow(workflowId,
new String[]{RAW_JSON_FIELD, ARCHIVED_FIELD},
new Object[]{objectMapper.writeValueAsString(workflow), true});
}
executionDAO.removeWorkflow(workflowId);
} catch (ApplicationException ae) {
throw ae;
} catch (Exception e) {
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, "Error resetting workflow state: " + workflowId, e);
}
}

public List<Task> createTasks(List<Task> tasks) {
return executionDAO.createTasks(tasks);
}
Expand Down
Expand Up @@ -195,7 +195,7 @@ public static void recordTaskResponseTimeout(String taskType) {
}

public static void recordTaskPendingTime(String taskType, String workflowType, long duration) {
getTimer(classQualifier, "task_pending_time", "workflowName", workflowType, "taskType", taskType).record(duration, TimeUnit.MILLISECONDS);
gauge(classQualifier, "task_pending_time", duration, "workflowName", workflowType, "taskType", taskType);
}

public static void recordWorkflowTermination(String workflowType, WorkflowStatus status, String ownerApp) {
Expand Down

0 comments on commit f1ca640

Please sign in to comment.