Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
fix for system task with external payload storage
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Oct 24, 2019
1 parent 3f66857 commit 63c9407
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 45 deletions.
Expand Up @@ -308,7 +308,7 @@ void updateWorkflowOutput(final Workflow workflow, @Nullable Task task) {
}

workflow.setOutput(output);
externalPayloadStorageUtils.verifyAndUpload(workflow, PayloadType.WORKFLOW_OUTPUT);
externalizeWorkflowData(workflow);
}

private boolean checkForWorkflowCompletion(final Workflow workflow) throws TerminateWorkflowException {
Expand Down Expand Up @@ -477,6 +477,11 @@ void externalizeTaskData(Task task) {
externalPayloadStorageUtils.verifyAndUpload(task, PayloadType.TASK_OUTPUT);
}

void externalizeWorkflowData(Workflow workflow) {
externalPayloadStorageUtils.verifyAndUpload(workflow, PayloadType.WORKFLOW_INPUT);
externalPayloadStorageUtils.verifyAndUpload(workflow, PayloadType.WORKFLOW_OUTPUT);
}

@VisibleForTesting
void checkForTimeout(TaskDef taskDef, Task task) {

Expand Down
Expand Up @@ -20,8 +20,7 @@
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.valueOf;
import static com.netflix.conductor.common.metadata.workflow.TaskType.SUB_WORKFLOW;
import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType.TASK_OUTPUT;
import static com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TERMINATE;
import static com.netflix.conductor.core.execution.ApplicationException.Code.CONFLICT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.INVALID_INPUT;
import static com.netflix.conductor.core.execution.ApplicationException.Code.NOT_FOUND;
Expand Down Expand Up @@ -50,13 +49,11 @@
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -67,7 +64,6 @@
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand All @@ -89,7 +85,6 @@ public class WorkflowExecutor {
private final ExecutionDAOFacade executionDAOFacade;

private WorkflowStatusListener workflowStatusListener;
private ExternalPayloadStorageUtils externalPayloadStorageUtils;

private int activeWorkerLastPollInSecs;
public static final String DECIDER_QUEUE = "_deciderQueue";
Expand All @@ -103,7 +98,6 @@ public WorkflowExecutor(
MetadataMapperService metadataMapperService,
WorkflowStatusListener workflowStatusListener,
ExecutionDAOFacade executionDAOFacade,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
Configuration config
) {
this.deciderService = deciderService;
Expand All @@ -114,7 +108,6 @@ public WorkflowExecutor(
this.executionDAOFacade = executionDAOFacade;
this.activeWorkerLastPollInSecs = config.getIntProperty("tasks.active.worker.lastpoll", 10);
this.workflowStatusListener = workflowStatusListener;
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
}

/**
Expand Down Expand Up @@ -386,7 +379,7 @@ public String startWorkflow(

workflow.setInput(workflowInput);
if (workflow.getInput() != null) {
externalPayloadStorageUtils.verifyAndUpload(workflow, WORKFLOW_INPUT);
deciderService.externalizeWorkflowData(workflow);
} else {
workflow.setInput(null);
workflow.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
Expand Down Expand Up @@ -807,7 +800,7 @@ public void updateTask(TaskResult taskResult) {
task.setOutputData(taskResult.getOutputData());

if (task.getOutputData() != null) {
externalPayloadStorageUtils.verifyAndUpload(task, TASK_OUTPUT);
deciderService.externalizeTaskData(task);
} else {
task.setExternalOutputPayloadStoragePath(taskResult.getExternalOutputPayloadStoragePath());
}
Expand Down Expand Up @@ -953,9 +946,16 @@ public boolean decide(String workflowId) {
for (Task task : outcome.tasksToBeScheduled) {
if (isSystemTask.and(isNonTerminalTask).test(task)) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());

Workflow workflowInstance = deciderService.populateWorkflowAndTaskData(workflow);
try {
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflow, task, this)) {
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflowInstance, task, this)) {
// FIXME: temporary hack to workaround TERMINATE task
if (TERMINATE.name().equals(task.getTaskType())) {
workflow.setStatus(workflowInstance.getStatus());
workflow.setOutput(workflowInstance.getOutput());
deciderService.externalizeWorkflowData(workflow);
}
deciderService.externalizeTaskData(task);
tasksToBeUpdated.add(task);
stateChanged = true;
}
Expand Down
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -13,9 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*
*/
package com.netflix.conductor.core.execution;

import com.netflix.conductor.core.WorkflowContext;
Expand Down Expand Up @@ -44,7 +41,7 @@
@Singleton
public class WorkflowSweeper {

private static Logger logger = LoggerFactory.getLogger(WorkflowSweeper.class);
private static final Logger logger = LoggerFactory.getLogger(WorkflowSweeper.class);

private ExecutorService executorService;

Expand Down
Expand Up @@ -124,7 +124,7 @@ public void init() {

DeciderService deciderService = new DeciderService(parametersUtils, queueDAO, metadataDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, externalPayloadStorageUtils, config);
workflowExecutor = new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService, workflowStatusListener, executionDAOFacade, config);
}

@Test
Expand Down
@@ -1,5 +1,21 @@
/*
* Copyright 2019 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution.tasks;

import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.TaskType;
Expand All @@ -12,22 +28,16 @@
import com.netflix.conductor.core.execution.WorkflowStatusListener;
import com.netflix.conductor.core.metadata.MetadataMapperService;
import com.netflix.conductor.core.orchestration.ExecutionDAOFacade;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.dao.QueueDAO;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;

/**
* @author Manan
*
Expand All @@ -50,7 +60,6 @@ public class DoWhileTest {
MetadataMapperService metadataMapperService;
WorkflowStatusListener workflowStatusListener ;
ExecutionDAOFacade executionDAOFacade;
ExternalPayloadStorageUtils externalPayloadStorageUtils;
Configuration config;
ParametersUtils parametersUtils;

Expand All @@ -65,10 +74,9 @@ public void setup() {
metadataMapperService = Mockito.mock(MetadataMapperService.class);
workflowStatusListener = Mockito.mock(WorkflowStatusListener.class);
executionDAOFacade = Mockito.mock(ExecutionDAOFacade.class);
externalPayloadStorageUtils = Mockito.mock(ExternalPayloadStorageUtils.class);
config = Mockito.mock(Configuration.class);
provider = spy(new WorkflowExecutor(deciderService, metadataDAO, queueDAO, metadataMapperService,
workflowStatusListener, executionDAOFacade, externalPayloadStorageUtils, config));
workflowStatusListener, executionDAOFacade, config));
loopWorkflowTask1 = new WorkflowTask();
loopWorkflowTask1.setTaskReferenceName("task1__1");
loopWorkflowTask1.setName("task1__1");
Expand Down

0 comments on commit 63c9407

Please sign in to comment.