Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1047 from Netflix/enforce_external_payload_storage_in_server
Browse files Browse the repository at this point in the history

allow server to enforce external payload storage usage
  • Loading branch information
apanicker-nflx committed Mar 15, 2019
2 parents 1efc707 + bab5706 commit b510f39
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,14 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
.filter(task -> !executedTaskRefNames.contains(task.getReferenceTaskName()))
.collect(Collectors.toList());
if (!unScheduledTasks.isEmpty()) {
LOGGER.debug("Scheduling Tasks {} for workflow: {}", unScheduledTasks.stream()
LOGGER.debug("Scheduling Tasks: {} for workflow: {}", unScheduledTasks.stream()
.map(Task::getTaskDefName)
.collect(Collectors.toList()),
workflow.getWorkflowId());
outcome.tasksToBeScheduled.addAll(unScheduledTasks);
}
if (outcome.tasksToBeScheduled.isEmpty() && checkForWorkflowCompletion(workflow)) {
LOGGER.debug("Marking workflow as complete. workflow=" + workflow.getWorkflowId() + ", tasks=" + workflow.getTasks());
LOGGER.debug("Marking workflow: {} as complete.", workflow);
outcome.isComplete = true;
}

Expand All @@ -218,7 +218,7 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
private List<Task> startWorkflow(Workflow workflow) throws TerminateWorkflowException {
final WorkflowDef workflowDef = workflow.getWorkflowDefinition();

LOGGER.debug("Starting workflow {}, version{}, id {}", workflowDef.getName(), workflowDef.getVersion(), workflow.getWorkflowId());
LOGGER.debug("Starting workflow: {}", workflow);

//The tasks will be empty in case of new workflow
List<Task> tasks = workflow.getTasks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
Expand All @@ -54,13 +55,14 @@
import java.util.stream.Collectors;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.CANCELED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.FAILED_WITH_TERMINAL_ERROR;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.valueOf;
import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType.TASK_OUTPUT;
import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.CONFLICT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.NOT_FOUND;
Expand All @@ -84,6 +86,7 @@ public class WorkflowExecutor {
private final ExecutionDAOFacade executionDAOFacade;

private WorkflowStatusListener workflowStatusListener;
private ExternalPayloadStorageUtils externalPayloadStorageUtils;

private int activeWorkerLastPollInSecs;
public static final String DECIDER_QUEUE = "_deciderQueue";
Expand All @@ -96,6 +99,7 @@ public WorkflowExecutor(
MetadataMapperService metadataMapperService,
WorkflowStatusListener workflowStatusListener,
ExecutionDAOFacade executionDAOFacade,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
Configuration config
) {
this.deciderService = deciderService;
Expand All @@ -106,6 +110,7 @@ public WorkflowExecutor(
this.executionDAOFacade = executionDAOFacade;
this.activeWorkerLastPollInSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10);
this.workflowStatusListener = workflowStatusListener;
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
}

/**
Expand Down Expand Up @@ -265,8 +270,6 @@ public String startWorkflow(
workflow.setWorkflowId(workflowId);
workflow.setCorrelationId(correlationId);
workflow.setWorkflowDefinition(workflowDefinition);
workflow.setInput(workflowInput);
workflow.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
workflow.setStatus(WorkflowStatus.RUNNING);
workflow.setParentWorkflowId(parentWorkflowId);
workflow.setParentWorkflowTaskId(parentWorkflowTaskId);
Expand All @@ -277,6 +280,14 @@ public String startWorkflow(
workflow.setEvent(event);
workflow.setTaskToDomain(taskToDomain);

workflow.setInput(workflowInput);
if (workflow.getInput() != null) {
externalPayloadStorageUtils.verifyAndUpload(workflow, WORKFLOW_INPUT);
} else {
workflow.setInput(null);
workflow.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
}

executionDAOFacade.createWorkflow(workflow);
LOGGER.info("A new instance of workflow {} created with workflow id {}", workflow.getWorkflowName(), workflowId);

Expand Down Expand Up @@ -387,9 +398,9 @@ public void rewind(String workflowId, boolean useLatestDefinitions) {
/**
* Gets the last instance of each failed task and reschedule each
* Gets all cancelled tasks and schedule all of them except JOIN (join should change status to INPROGRESS)
* Switch workflow back to RUNNING status and aall decider.
* Switch workflow back to RUNNING status and call decider.
*
* @param workflowId
* @param workflowId the id of the workflow to be retried
*/
public void retry(String workflowId) {
Workflow workflow = executionDAOFacade.getWorkflowById(workflowId, true);
Expand Down Expand Up @@ -668,42 +679,32 @@ public void updateTask(TaskResult taskResult) {
if (workflowInstance.getStatus().isTerminal()) {
// Workflow is in terminal state
queueDAO.remove(taskQueueName, taskResult.getTaskId());
LOGGER.debug("Workflow: {} is in terminal state Task: {} removed from Queue: {} during update task", workflowInstance, task, taskQueueName);
if (!task.getStatus().isTerminal()) {
task.setStatus(COMPLETED);
}
task.setOutputData(taskResult.getOutputData());
task.setOutputMessage(taskResult.getOutputMessage());
task.setExternalOutputPayloadStoragePath(taskResult.getExternalOutputPayloadStoragePath());
task.setReasonForIncompletion(taskResult.getReasonForIncompletion());
task.setWorkerId(taskResult.getWorkerId());
executionDAOFacade.updateTask(task);
String msg = String.format("Workflow %s is already completed as %s, task=%s, reason=%s",
workflowInstance.getWorkflowId(), workflowInstance.getStatus(), task.getTaskType(), workflowInstance.getReasonForIncompletion());
LOGGER.info(msg);
LOGGER.info("Workflow: {} has already finished execution. Task update for: {} ignored and removed from Queue: {}.", workflowInstance, taskResult.getTaskId(), taskQueueName);
Monitors.recordUpdateConflict(task.getTaskType(), workflowInstance.getWorkflowName(), workflowInstance.getStatus());
return;
}

if (task.getStatus().isTerminal()) {
// Task was already updated....
queueDAO.remove(taskQueueName, taskResult.getTaskId());
LOGGER.debug("Task: {} is in terminal state and is removed from the queue {} ", task, taskQueueName);
String msg = String.format("Task is already completed as %s@%d, workflow status=%s, workflowId=%s, taskId=%s",
task.getStatus(), task.getEndTime(), workflowInstance.getStatus(), workflowInstance.getWorkflowId(), task.getTaskId());
LOGGER.info(msg);
LOGGER.info("Task: {} has already finished execution with status:{} at {} within workflow: {}. Removed task from queue: {}", task.getTaskId(), task.getStatus(), task.getEndTime(), workflowInstance.getWorkflowId(), taskQueueName);
Monitors.recordUpdateConflict(task.getTaskType(), workflowInstance.getWorkflowName(), task.getStatus());
return;
}

task.setStatus(valueOf(taskResult.getStatus().name()));
task.setOutputData(taskResult.getOutputData());
task.setOutputMessage(taskResult.getOutputMessage());
task.setExternalOutputPayloadStoragePath(taskResult.getExternalOutputPayloadStoragePath());
task.setReasonForIncompletion(taskResult.getReasonForIncompletion());
task.setWorkerId(taskResult.getWorkerId());
task.setCallbackAfterSeconds(taskResult.getCallbackAfterSeconds());

task.setOutputData(taskResult.getOutputData());
if (task.getOutputData() != null) {
externalPayloadStorageUtils.verifyAndUpload(task, TASK_OUTPUT);
} else {
task.setExternalOutputPayloadStoragePath(taskResult.getExternalOutputPayloadStoragePath());
}

if (task.getStatus().isTerminal()) {
task.setEndTime(System.currentTimeMillis());
}
Expand Down Expand Up @@ -979,7 +980,7 @@ public void addTaskToQueue(Task task) {
public void executeSystemTask(WorkflowSystemTask systemTask, String taskId, int unackTimeout) {
try {
Task task = executionDAOFacade.getTaskById(taskId);
if (task == null){
if (task == null) {
LOGGER.error("TaskId: {} could not be found while executing SystemTask", taskId);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
case WORKFLOW_INPUT:
((Workflow) entity).setInput(null);
((Workflow) entity).setExternalInputPayloadStoragePath(uploadHelper(payloadBytes, payloadSize, PayloadType.WORKFLOW_INPUT));
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowType(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_INPUT.toString());
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowName(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_INPUT.toString());
break;
case WORKFLOW_OUTPUT:
((Workflow) entity).setOutput(null);
((Workflow) entity).setExternalOutputPayloadStoragePath(uploadHelper(payloadBytes, payloadSize, PayloadType.WORKFLOW_OUTPUT));
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowType(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_OUTPUT.toString());
Monitors.recordExternalPayloadStorageUsage(((Workflow) entity).getWorkflowName(), ExternalPayloadStorage.Operation.WRITE.toString(), PayloadType.WORKFLOW_OUTPUT.toString());
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.inject.matcher.Matchers;
import com.netflix.conductor.common.metadata.tasks.PollData;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
Expand All @@ -26,7 +25,19 @@
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.mapper.*;
import com.netflix.conductor.core.execution.mapper.DecisionTaskMapper;
import com.netflix.conductor.core.execution.mapper.DynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.LambdaTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.UserDefinedTaskMapper;
import com.netflix.conductor.core.execution.mapper.WaitTaskMapper;
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.metadata.MetadataMapperService;
Expand All @@ -35,12 +46,9 @@
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.sun.corba.se.spi.orbutil.threadpool.Work;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -60,7 +68,10 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
Expand All @@ -79,7 +90,6 @@ public class TestWorkflowExecutor {
private MetadataDAO metadataDAO;
private QueueDAO queueDAO;
private WorkflowStatusListener workflowStatusListener;
private DeciderService deciderService;

@Before
public void init() {
Expand All @@ -105,9 +115,9 @@ public void init() {
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("LAMBDA", new LambdaTaskMapper(parametersUtils));

deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, externalPayloadStorageUtils, config);
}

@Test
Expand Down

0 comments on commit b510f39

Please sign in to comment.