Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge branch 'dev' of https://github.com/Netflix/conductor into bugfix/support_env_variable_task_input_param
Browse files Browse the repository at this point in the history
  • Loading branch information
falu2010-netflix committed Jan 5, 2019
2 parents 5803463 + 49df9a3 commit df88d81
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package com.netflix.conductor.cassandra;

import com.datastax.driver.core.ConsistencyLevel;
import com.netflix.conductor.core.config.Configuration;

public interface CassandraConfiguration extends Configuration {
Expand Down Expand Up @@ -40,6 +41,12 @@ public interface CassandraConfiguration extends Configuration {
String CASSANDRA_SHARD_SIZE_PROPERTY_KEY = "workflow.cassandra.shard.size";
int CASSANDRA_SHARD_SIZE_DEFAULT_VALUE = 100;

String CASSANDRA_READ_CONSISTENCY_LEVEL = "workflow.cassandra.read.consistency.level";
String CASSANDRA_READ_CONSISTENCY_LEVEL_DEFAULT_VALUE = "LOCAL_QUORUM";

String CASSANDRA_WRITE_CONSISTENCY_LEVEL = "workflow.cassandra.write.consistency.level";
String CASSANDRA_WRITE_CONSISTENCY_LEVEL_DEFAULT_VALUE = "LOCAL_QUORUM";

default String getHostAddress() {
    // Contact point for the Cassandra cluster; falls back to the built-in default when unset.
    final String hostAddress = getProperty(CASSANDRA_HOST_ADDRESS_PROPERTY_NAME, CASSANDRA_HOST_ADDRESS_DEFAULT_VALUE);
    return hostAddress;
}
Expand Down Expand Up @@ -71,4 +78,12 @@ default String getReplicationFactorKey() {
default int getReplicationFactorValue() {
    // Keyspace replication factor, read from configuration with the built-in default.
    final int replicationFactor =
            getIntProperty(CASSANDRA_REPLICATION_FACTOR_VALUE_PROPERTY_NAME, CASSANDRA_REPLICATION_FACTOR_VALUE_DEFAULT_VALUE);
    return replicationFactor;
}

default ConsistencyLevel getReadConsistencyLevel() {
    // Resolve the configured level name (default LOCAL_QUORUM) to the driver enum.
    // Note: valueOf throws IllegalArgumentException for an unrecognized level name.
    final String levelName = getProperty(CASSANDRA_READ_CONSISTENCY_LEVEL, CASSANDRA_READ_CONSISTENCY_LEVEL_DEFAULT_VALUE);
    return ConsistencyLevel.valueOf(levelName);
}

default ConsistencyLevel getWriteConsistencyLevel() {
    // Resolve the configured level name (default LOCAL_QUORUM) to the driver enum.
    // Note: valueOf throws IllegalArgumentException for an unrecognized level name.
    final String levelName = getProperty(CASSANDRA_WRITE_CONSISTENCY_LEVEL, CASSANDRA_WRITE_CONSISTENCY_LEVEL_DEFAULT_VALUE);
    return ConsistencyLevel.valueOf(levelName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.dao.cassandra;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
Expand Down Expand Up @@ -78,23 +79,23 @@ public class CassandraExecutionDAO extends CassandraBaseDAO implements Execution
public CassandraExecutionDAO(Session session, ObjectMapper objectMapper, CassandraConfiguration config, Statements statements) {
super(session, objectMapper, config);

this.insertWorkflowStatement = session.prepare(statements.getInsertWorkflowStatement());
this.insertTaskStatement = session.prepare(statements.getInsertTaskStatement());
this.insertWorkflowStatement = session.prepare(statements.getInsertWorkflowStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
this.insertTaskStatement = session.prepare(statements.getInsertTaskStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());

this.selectTotalStatement = session.prepare(statements.getSelectTotalStatement());
this.selectTaskStatement = session.prepare(statements.getSelectTaskStatement());
this.selectWorkflowStatement = session.prepare(statements.getSelectWorkflowStatement());
this.selectWorkflowWithTasksStatement = session.prepare(statements.getSelectWorkflowWithTasksStatement());
this.selectTaskLookupStatement = session.prepare(statements.getSelectTaskFromLookupTableStatement());
this.selectTotalStatement = session.prepare(statements.getSelectTotalStatement()).setConsistencyLevel(config.getReadConsistencyLevel());
this.selectTaskStatement = session.prepare(statements.getSelectTaskStatement()).setConsistencyLevel(config.getReadConsistencyLevel());
this.selectWorkflowStatement = session.prepare(statements.getSelectWorkflowStatement()).setConsistencyLevel(config.getReadConsistencyLevel());
this.selectWorkflowWithTasksStatement = session.prepare(statements.getSelectWorkflowWithTasksStatement()).setConsistencyLevel(config.getReadConsistencyLevel());
this.selectTaskLookupStatement = session.prepare(statements.getSelectTaskFromLookupTableStatement()).setConsistencyLevel(config.getReadConsistencyLevel());

this.updateWorkflowStatement = session.prepare(statements.getUpdateWorkflowStatement());
this.updateTotalTasksStatement = session.prepare(statements.getUpdateTotalTasksStatement());
this.updateTotalPartitionsStatement = session.prepare(statements.getUpdateTotalPartitionsStatement());
this.updateTaskLookupStatement = session.prepare(statements.getUpdateTaskLookupStatement());
this.updateWorkflowStatement = session.prepare(statements.getUpdateWorkflowStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
this.updateTotalTasksStatement = session.prepare(statements.getUpdateTotalTasksStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
this.updateTotalPartitionsStatement = session.prepare(statements.getUpdateTotalPartitionsStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
this.updateTaskLookupStatement = session.prepare(statements.getUpdateTaskLookupStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());

this.deleteWorkflowStatement = session.prepare(statements.getDeleteWorkflowStatement());
this.deleteTaskStatement = session.prepare(statements.getDeleteTaskStatement());
this.deleteTaskLookupStatement = session.prepare(statements.getDeleteTaskLookupStatement());
this.deleteWorkflowStatement = session.prepare(statements.getDeleteWorkflowStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
this.deleteTaskStatement = session.prepare(statements.getDeleteTaskStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
this.deleteTaskLookupStatement = session.prepare(statements.getDeleteTaskLookupStatement()).setConsistencyLevel(config.getWriteConsistencyLevel());
}

@Override
Expand Down Expand Up @@ -319,16 +320,18 @@ public void removeWorkflow(String workflowId) {
Workflow workflow = getWorkflow(workflowId, true);

// TODO: calculate number of shards and iterate
// Guard against a missing workflow: the pre-merge version dereferenced the
// result unconditionally and would NPE when the workflow does not exist.
if (workflow != null) {
    try {
        recordCassandraDaoRequests("removeWorkflow", "n/a", workflow.getWorkflowName());
        // Only the default shard is deleted today (see TODO above).
        session.execute(deleteWorkflowStatement.bind(UUID.fromString(workflowId), DEFAULT_SHARD_ID));
    } catch (Exception e) {
        Monitors.error(CLASS_NAME, "removeWorkflow");
        String errorMsg = String.format("Failed to remove workflow: %s", workflowId);
        LOGGER.error(errorMsg, e);
        throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, errorMsg);
    }
    // Clean up the task->workflow lookup entries only after the workflow row
    // was deleted successfully.
    workflow.getTasks().forEach(this::removeTaskLookup);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.netflix.conductor.config;

import com.datastax.driver.core.ConsistencyLevel;
import com.netflix.conductor.cassandra.CassandraConfiguration;

import java.util.Map;
Expand Down Expand Up @@ -160,4 +161,14 @@ public String getReplicationFactorKey() {
/** Always uses the interface's default replication factor; presumably a test configuration — confirm against usage. */
public int getReplicationFactorValue() {
    return CASSANDRA_REPLICATION_FACTOR_VALUE_DEFAULT_VALUE;
}

/**
 * Read consistency for this configuration. Fixed at LOCAL_ONE, weaker than the
 * production default (LOCAL_QUORUM) — appears to be a test config; confirm.
 */
@Override
public ConsistencyLevel getReadConsistencyLevel() {
    return ConsistencyLevel.LOCAL_ONE;
}

/**
 * Write consistency for this configuration. Fixed at LOCAL_ONE, weaker than the
 * production default (LOCAL_QUORUM) — appears to be a test config; confirm.
 */
@Override
public ConsistencyLevel getWriteConsistencyLevel() {
    return ConsistencyLevel.LOCAL_ONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
Expand All @@ -52,6 +53,7 @@
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_EVENT;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_FORK_JOIN;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_FORK_JOIN_DYNAMIC;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_HTTP;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_JOIN;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SIMPLE;
import static com.netflix.conductor.common.metadata.workflow.TaskType.TASK_TYPE_SUB_WORKFLOW;
Expand Down Expand Up @@ -178,4 +180,12 @@ public TaskMapper getUserDefinedTaskMapper(ParametersUtils parametersUtils, Meta
/** Provides the {@link SimpleTaskMapper} bound for the SIMPLE task type. */
public TaskMapper getSimpleTaskMapper(ParametersUtils parametersUtils) {
    return new SimpleTaskMapper(parametersUtils);
}

/**
 * Registers an {@link HTTPTaskMapper} in the task-mapper multibinding under the
 * HTTP task-type key, mirroring the other mapper providers in this module.
 */
@ProvidesIntoMap
@StringMapKey(TASK_TYPE_HTTP)
@Singleton
@Named(TASK_MAPPERS_QUALIFIER)
public TaskMapper getHTTPTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
    return new HTTPTaskMapper(parametersUtils, metadataDAO);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2018 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.conductor.core.execution.mapper;

import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.TaskType;
import com.netflix.conductor.common.metadata.workflow.WorkflowDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.execution.ParametersUtils;
import com.netflix.conductor.core.execution.TerminateWorkflowException;
import com.netflix.conductor.dao.MetadataDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* An implementation of {@link TaskMapper} to map a {@link WorkflowTask} of type {@link TaskType#HTTP}
* to a {@link Task} of type {@link TaskType#HTTP} with {@link Task.Status#SCHEDULED}
*/
public class HTTPTaskMapper implements TaskMapper {

    // Private per convention (SLF4J loggers are implementation detail, not API);
    // the class literal no longer needs to be fully qualified inside its own class.
    private static final Logger logger = LoggerFactory.getLogger(HTTPTaskMapper.class);

    private final ParametersUtils parametersUtils;
    private final MetadataDAO metadataDAO;

    public HTTPTaskMapper(ParametersUtils parametersUtils, MetadataDAO metadataDAO) {
        this.parametersUtils = parametersUtils;
        this.metadataDAO = metadataDAO;
    }

    /**
     * Maps a {@link WorkflowTask} of type {@link TaskType#HTTP} to a {@link Task}
     * in a {@link Task.Status#SCHEDULED} state.
     *
     * @param taskMapperContext a wrapper class containing the {@link WorkflowTask},
     *                          {@link WorkflowDef}, {@link Workflow} and a string
     *                          representation of the TaskId
     * @return a list with just one HTTP task
     * @throws TerminateWorkflowException if the task definition does not exist
     */
    @Override
    public List<Task> getMappedTasks(TaskMapperContext taskMapperContext) throws TerminateWorkflowException {

        logger.debug("TaskMapperContext {} in HTTPTaskMapper", taskMapperContext);

        WorkflowTask taskToSchedule = taskMapperContext.getTaskToSchedule();
        Workflow workflowInstance = taskMapperContext.getWorkflowInstance();
        String taskId = taskMapperContext.getTaskId();
        int retryCount = taskMapperContext.getRetryCount();

        // Prefer the definition embedded in the context; otherwise fall back to the
        // metadata store, failing the workflow if neither yields a definition.
        TaskDef taskDefinition = Optional.ofNullable(taskMapperContext.getTaskDefinition())
                .orElseGet(() -> Optional.ofNullable(metadataDAO.getTaskDef(taskToSchedule.getName()))
                        .orElseThrow(() -> {
                            String reason = String.format("Invalid task specified. Cannot find task by name %s in the task definitions", taskToSchedule.getName());
                            return new TerminateWorkflowException(reason);
                        }));

        Map<String, Object> input = parametersUtils.getTaskInputV2(taskToSchedule.getInputParameters(), workflowInstance, taskId, taskDefinition);

        Task httpTask = new Task();
        httpTask.setTaskType(taskToSchedule.getType());
        httpTask.setTaskDefName(taskToSchedule.getName());
        httpTask.setReferenceTaskName(taskToSchedule.getTaskReferenceName());
        httpTask.setWorkflowInstanceId(workflowInstance.getWorkflowId());
        httpTask.setWorkflowType(workflowInstance.getWorkflowName());
        httpTask.setCorrelationId(workflowInstance.getCorrelationId());
        httpTask.setScheduledTime(System.currentTimeMillis());
        httpTask.setTaskId(taskId);
        httpTask.setInputData(input);
        httpTask.setStatus(Task.Status.SCHEDULED);
        httpTask.setRetryCount(retryCount);
        httpTask.setCallbackAfterSeconds(taskToSchedule.getStartDelay());
        httpTask.setWorkflowTask(taskToSchedule);
        // taskDefinition is guaranteed non-null here (orElseThrow above), so the
        // rate-limit lookups are safe.
        httpTask.setRateLimitPerFrequency(taskDefinition.getRateLimitPerFrequency());
        httpTask.setRateLimitFrequencyInSeconds(taskDefinition.getRateLimitFrequencyInSeconds());
        return Collections.singletonList(httpTask);
    }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright 2018 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,16 +115,30 @@ private boolean isDynamicTaskValid(WorkflowTask workflowTask, ConstraintValidato
private boolean isDynamicForkJoinValid(WorkflowTask workflowTask, ConstraintValidatorContext context) {
boolean valid = true;

if (workflowTask.getDynamicForkTasksParam() == null) {
String message = String.format(PARAM_REQUIRED_STRING_FORMAT, "dynamicForkTasksParam", TaskType.FORK_JOIN_DYNAMIC, workflowTask.getName());
//For DYNAMIC_FORK_JOIN_TASK support dynamicForkJoinTasksParam or combination of dynamicForkTasksParam and dynamicForkTasksInputParamName.
// Both are not allowed.
if (workflowTask.getDynamicForkJoinTasksParam() != null &&
(workflowTask.getDynamicForkTasksParam() != null || workflowTask.getDynamicForkTasksInputParamName() != null)) {
String message = String.format("dynamicForkJoinTasksParam or combination of dynamicForkTasksInputParamName and dynamicForkTasksParam cam be used for taskType: %s taskName: %s", TaskType.FORK_JOIN_DYNAMIC, workflowTask.getName());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
valid = false;
return false;
}
if (workflowTask.getDynamicForkTasksInputParamName() == null) {
String message = String.format(PARAM_REQUIRED_STRING_FORMAT, "dynamicForkTasksInputParamName", TaskType.FORK_JOIN_DYNAMIC, workflowTask.getName());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
valid = false;

if (workflowTask.getDynamicForkJoinTasksParam() != null) {
return valid;
} else {
if (workflowTask.getDynamicForkTasksParam() == null) {
String message = String.format(PARAM_REQUIRED_STRING_FORMAT, "dynamicForkTasksParam", TaskType.FORK_JOIN_DYNAMIC, workflowTask.getName());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
valid = false;
}
if (workflowTask.getDynamicForkTasksInputParamName() == null) {
String message = String.format(PARAM_REQUIRED_STRING_FORMAT, "dynamicForkTasksInputParamName", TaskType.FORK_JOIN_DYNAMIC, workflowTask.getName());
context.buildConstraintViolationWithTemplate(message).addConstraintViolation();
valid = false;
}
}

return valid;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
Expand Down Expand Up @@ -108,6 +109,7 @@ public void init() {
taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

this.deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
Expand Down Expand Up @@ -133,6 +134,7 @@ public void setup() {
taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.netflix.conductor.core.execution.mapper.EventTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinDynamicTaskMapper;
import com.netflix.conductor.core.execution.mapper.ForkJoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.HTTPTaskMapper;
import com.netflix.conductor.core.execution.mapper.JoinTaskMapper;
import com.netflix.conductor.core.execution.mapper.SimpleTaskMapper;
import com.netflix.conductor.core.execution.mapper.SubWorkflowTaskMapper;
Expand Down Expand Up @@ -111,6 +112,7 @@ public void init() {
taskMappers.put("SUB_WORKFLOW", new SubWorkflowTaskMapper(parametersUtils, metadataDAO));
taskMappers.put("EVENT", new EventTaskMapper(parametersUtils));
taskMappers.put("WAIT", new WaitTaskMapper(parametersUtils));
taskMappers.put("HTTP", new HTTPTaskMapper(parametersUtils, metadataDAO));

deciderService = new DeciderService(parametersUtils, queueDAO, externalPayloadStorageUtils, taskMappers);
MetadataMapperService metadataMapperService = new MetadataMapperService(metadataDAO);
Expand Down
Loading

0 comments on commit df88d81

Please sign in to comment.