Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
WIP Fix for System tasks that fail due to external payload
Browse files Browse the repository at this point in the history
  • Loading branch information
nkamath-nflx committed Oct 4, 2019
1 parent 3e4d2de commit 89be559
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 14 deletions.
Expand Up @@ -18,11 +18,13 @@
*/
package com.netflix.conductor.core.execution.tasks;

import com.google.inject.Inject;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.common.run.Workflow.WorkflowStatus;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,8 +41,9 @@ public class SubWorkflow extends WorkflowSystemTask {
public static final String NAME = "SUB_WORKFLOW";
public static final String SUB_WORKFLOW_ID = "subWorkflowId";

public SubWorkflow() {
super(NAME);
@Inject
public SubWorkflow(ExternalPayloadStorageUtils externalPayloadStorageUtils) {
super(NAME, externalPayloadStorageUtils);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -72,12 +75,17 @@ public void start(Workflow workflow, Task task, WorkflowExecutor provider) {
}
}

@Override
public boolean execute(Workflow workflow, Task task, WorkflowExecutor provider) {
String workflowId = (String) task.getOutputData().get(SUB_WORKFLOW_ID);
private String getWorkflowId(Task task) {
String workflowId = (String) getOutputPayload(task).get(SUB_WORKFLOW_ID);
if (workflowId == null) {
workflowId = (String) task.getInputData().get(SUB_WORKFLOW_ID); //Backward compatibility
workflowId = (String) getInputPayload(task).get(SUB_WORKFLOW_ID); //Backward compatibility
}
return workflowId;
}

@Override
public boolean execute(Workflow workflow, Task task, WorkflowExecutor provider) {
String workflowId = getWorkflowId(task);

if(StringUtils.isEmpty(workflowId)) {
return false;
Expand All @@ -100,10 +108,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor provider)

@Override
public void cancel(Workflow workflow, Task task, WorkflowExecutor provider) {
String workflowId = (String) task.getOutputData().get(SUB_WORKFLOW_ID);
if(workflowId == null) {
workflowId = (String) task.getInputData().get(SUB_WORKFLOW_ID); //Backward compatibility
}
String workflowId = getWorkflowId(task);

if(StringUtils.isEmpty(workflowId)) {
return;
Expand Down
Expand Up @@ -21,11 +21,15 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
* @author Viren
Expand All @@ -36,13 +40,21 @@ public class WorkflowSystemTask {
private static Map<String, WorkflowSystemTask> registry = new HashMap<>();

private String name;
private ExternalPayloadStorageUtils externalPayloadStorageUtils;

public WorkflowSystemTask(String name) {
this.name = name;
this.externalPayloadStorageUtils = null;

registry.put(name, this);
SystemTaskWorkerCoordinator.add(this);
}

public WorkflowSystemTask(String name, ExternalPayloadStorageUtils externalPayloadStorageUtils) {
this(name);
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
}

/**
* Start the task execution
* @param workflow Workflow for which the task is being started
Expand Down Expand Up @@ -128,5 +140,30 @@ public static WorkflowSystemTask get(String type) {
public static Collection<WorkflowSystemTask> all() {
return registry.values();
}


private Map<String, Object> getPayload(Supplier<Map<String, Object>> defaultPayloadSupplier,
Supplier<String> payloadPathSupplier) {
Map<String, Object> payload = defaultPayloadSupplier.get();
if(payload == null) {
String externalPayloadPath = payloadPathSupplier.get();
if(externalPayloadPath == null) {
throw new TerminateWorkflowException("Could not find neither the payload not an external-path");
}
else if(externalPayloadStorageUtils == null) {
throw new TerminateWorkflowException("Found externalStorageLocation on a system-task which " +
"is not setup to download external payload");
} else {
payload = externalPayloadStorageUtils.downloadPayload(payloadPathSupplier.get());
}
}
return payload;
}

Map<String, Object> getInputPayload(Task task) {
return this.getPayload(task::getInputData, task::getExternalInputPayloadStoragePath);
}

Map<String, Object> getOutputPayload(Task task) {
return this.getPayload(task::getOutputData, task::getExternalOutputPayloadStoragePath);
}
}
Expand Up @@ -29,7 +29,7 @@ public class TestSubWorkflow {
@Before
public void setup() {
workflowExecutor = mock(WorkflowExecutor.class);
subWorkflow = new SubWorkflow();
subWorkflow = new SubWorkflow(null);
}

@Test
Expand Down Expand Up @@ -260,7 +260,7 @@ public void testCancelWithoutWorkflowId() {

@Test
public void testIsAsync() {
SubWorkflow subWorkflow = new SubWorkflow();
SubWorkflow subWorkflow = new SubWorkflow(null);
assertFalse(subWorkflow.isAsync());
}
}
Expand Up @@ -32,7 +32,7 @@ public class TestSystemTasks {

@Test
public void test(){
new SubWorkflow();
new SubWorkflow(null);
assertTrue(SystemTaskType.is(SystemTaskType.JOIN.name()));
assertTrue(SystemTaskType.is(SystemTaskType.FORK.name()));
assertTrue(SystemTaskType.is(SystemTaskType.DECISION.name()));
Expand Down

0 comments on commit 89be559

Please sign in to comment.