Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1818 from elisherer/feaure/lambda-taskdef-support
Browse files Browse the repository at this point in the history
LAMBDA - add support for task definition's inputTemplate
  • Loading branch information
apanicker-nflx committed Aug 10, 2020
2 parents 7d1caa2 + 9cad0eb commit e0bc5db
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 6 deletions.
Expand Up @@ -222,8 +222,8 @@ public TaskMapper getHTTPTaskMapper(ParametersUtils parametersUtils, MetadataDAO
@StringMapKey(TASK_TYPE_LAMBDA)
@Singleton
@Named(TASK_MAPPERS_QUALIFIER)
public TaskMapper getLambdaTaskMapper(ParametersUtils parametersUtils) {
return new LambdaTaskMapper(parametersUtils);
public TaskMapper getLambdaTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
return new LambdaTaskMapper(parametersUtils, metadataDAO);
}

@ProvidesIntoMap
Expand Down
Expand Up @@ -13,13 +13,16 @@
package com.netflix.conductor.core.execution.mapper;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.tasks.Lambda;
import com.netflix.conductor.dao.MetadataDAO;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,9 +33,11 @@ public class LambdaTaskMapper implements TaskMapper {

public static final Logger LOGGER = LoggerFactory.getLogger(LambdaTaskMapper.class);
private final ParametersUtils parametersUtils;
private final MetadataDAO metadataDAO;

public LambdaTaskMapper(ParametersUtils parametersUtils) {
public LambdaTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
this.parametersUtils = parametersUtils;
this.metadataDAO = metadataDAO;
}

@Override
Expand All @@ -44,8 +49,12 @@ public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) {
Workflow workflowInstance = taskMapperContext.getWorkflowInstance();
String taskId = taskMapperContext.getTaskId();

TaskDef taskDefinition = Optional.ofNullable(taskMapperContext.getTaskDefinition())
.orElseGet(() -> Optional.ofNullable(metadataDAO.getTaskDef(taskToSchedule.getName()))
.orElse(null));

Map<String, Object> taskInput = parametersUtils
.getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, null);
.getTaskInputV2(taskMapperContext.getTaskToSchedule().getInputParameters(), workflowInstance, taskId, taskDefinition);

Task lambdaTask = new Task();
lambdaTask.setTaskType(Lambda.TASK_NAME);
Expand Down
Expand Up @@ -122,7 +122,7 @@ public void init() {
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("LAMBDA", new LambdaTaskMapper(parametersUtils));
taskMappers.put("LAMBDA", new LambdaTaskMapper(parametersUtils, metadataDAO));

DeciderService deciderService = new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, taskMappers, config);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
Expand Down
Expand Up @@ -8,6 +8,7 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.dao.MetadataDAO;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -20,17 +21,21 @@
public class LambdaTaskMapperTest {

private ParametersUtils parametersUtils;
private MetadataDAO metadataDAO;

@Before
public void setUp() {
parametersUtils = mock(ParametersUtils.class);
metadataDAO = mock(MetadataDAO.class);
}

@Test
public void getMappedTasks() throws Exception {

WorkflowTask taskToSchedule = new WorkflowTask();
taskToSchedule.setName("lambda_task");
taskToSchedule.setType(TaskType.LAMBDA.name());
taskToSchedule.setTaskDefinition(new TaskDef("lambda_task"));
taskToSchedule.setScriptExpression("if ($.input.a==1){return {testValue: true}} else{return {testValue: false} }");

String taskId = IDGenerator.generate();
Expand All @@ -49,7 +54,37 @@ public void getMappedTasks() throws Exception {
.build();


List<Task> mappedTasks = new LambdaTaskMapper(parametersUtils).getMappedTasks(taskMapperContext);
List<Task> mappedTasks = new LambdaTaskMapper(parametersUtils, metadataDAO).getMappedTasks(taskMapperContext);

assertEquals(1, mappedTasks.size());
assertNotNull(mappedTasks);
assertEquals(TaskType.LAMBDA.name(), mappedTasks.get(0).getTaskType());
}

@Test
public void getMappedTasks_WithoutTaskDef() throws Exception {

WorkflowTask taskToSchedule = new WorkflowTask();
taskToSchedule.setType(TaskType.LAMBDA.name());
taskToSchedule.setScriptExpression("if ($.input.a==1){return {testValue: true}} else{return {testValue: false} }");

String taskId = IDGenerator.generate();

WorkflowDef wd = new WorkflowDef();
Workflow w = new Workflow();
w.setWorkflowDefinition(wd);

TaskMapperContext taskMapperContext = TaskMapperContext.newBuilder()
.withWorkflowDefinition(wd)
.withWorkflowInstance(w)
.withTaskDefinition(null)
.withTaskToSchedule(taskToSchedule)
.withRetryCount(0)
.withTaskId(taskId)
.build();


List<Task> mappedTasks = new LambdaTaskMapper(parametersUtils, metadataDAO).getMappedTasks(taskMapperContext);

assertEquals(1, mappedTasks.size());
assertNotNull(mappedTasks);
Expand Down

0 comments on commit e0bc5db

Please sign in to comment.