Skip to content

Commit

Permalink
[DSIP-46] Unify the logic task params data structure between front-en…
Browse files Browse the repository at this point in the history
…d and back-end
  • Loading branch information
ruanwenjun committed Jun 20, 2024
1 parent 3f3f0cd commit e261d05
Show file tree
Hide file tree
Showing 35 changed files with 611 additions and 996 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinition
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
log.error(
"Generate task definition list failed, the given task definition parameter is invalided, taskName: {}, taskDefinition: {}",
Expand Down Expand Up @@ -1615,12 +1614,11 @@ public Map<String, Object> checkProcessNodeList(String processTaskRelationJson,

// check whether the process definition json is normal
for (TaskNode taskNode : taskNodes) {
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskNode.getType())
.taskParams(taskNode.getTaskParams())
.dependence(taskNode.getDependence())
.switchResult(taskNode.getSwitchResult())
.build())) {
if (!TaskPluginManager.checkTaskParameters(
ParametersNode.builder()
.taskType(taskNode.getType())
.taskParams(taskNode.getParams())
.build())) {
log.error("Task node {} parameter invalid.", taskNode.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskNode.getName());
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,6 @@ public Map<String, Object> updateProcessInstance(User loginUser, long projectCod
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
log.error("Task parameters are invalid, taskDefinitionName:{}.", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public Map<String, Object> createTaskDefinition(User loginUser,
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionLog.getTaskType())
.taskParams(taskDefinitionLog.getTaskParams())
.dependence(taskDefinitionLog.getDependence())
.build())) {
log.warn("Task definition {} parameters are invalid.", taskDefinitionLog.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionLog.getName());
Expand Down Expand Up @@ -211,7 +210,6 @@ private void checkTaskDefinitionValid(User user, TaskDefinition taskDefinition,
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType())
.taskParams(taskDefinition.getTaskParams())
.dependence(taskDefinition.getDependence())
.build())) {
throw new ServiceException(Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
}
Expand Down Expand Up @@ -324,7 +322,6 @@ public Map<String, Object> createTaskBindsWorkFlow(User loginUser,
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinition.getTaskType())
.taskParams(taskDefinition.getTaskParams())
.dependence(taskDefinition.getDependence())
.build())) {
log.error("Task definition {} parameters are invalid", taskDefinition.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinition.getName());
Expand Down Expand Up @@ -735,7 +732,6 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task
if (!TaskPluginManager.checkTaskParameters(ParametersNode.builder()
.taskType(taskDefinitionToUpdate.getTaskType())
.taskParams(taskDefinitionToUpdate.getTaskParams())
.dependence(taskDefinitionToUpdate.getDependence())
.build())) {
log.warn("Task definition parameters are invalid, taskDefinitionName:{}.",
taskDefinitionToUpdate.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private void getProcessDefinitionCodeByDependentDefinitionList(List<DependentPro
List<Long> processDefinitionCodes) {
for (DependentProcessDefinition dependentProcessDefinition : dependentDefinitionList) {
for (DependentTaskModel dependentTaskModel : dependentProcessDefinition.getDependentParameters()
.getDependTaskList()) {
.getDependence().getDependTaskList()) {
for (DependentItem dependentItem : dependentTaskModel.getDependItemList()) {
if (!processDefinitionCodes.contains(dependentItem.getDefinitionCode())) {
processDefinitionCodes.add(dependentItem.getDefinitionCode());
Expand Down Expand Up @@ -225,7 +225,7 @@ private Set<Long> querySourceWorkFlowCodes(long projectCode, long workFlowCode,
JSONUtils.parseObject(taskDefinitionLog.getDependence(), DependentParameters.class);
if (dependentParameters != null) {
List<DependentTaskModel> dependTaskList =
dependentParameters.getDependTaskList();
dependentParameters.getDependence().getDependTaskList();
if (!CollectionUtils.isEmpty(dependTaskList)) {
for (DependentTaskModel taskModel : dependTaskList) {
List<DependentItem> dependItemList = taskModel.getDependItemList();
Expand All @@ -247,9 +247,9 @@ private Set<Long> querySourceWorkFlowCodes(long projectCode, long workFlowCode,
/**
* Query and return tasks dependence with string format, is a wrapper of queryTaskDepOnTask and task query method.
*
* @param projectCode Project code want to query tasks dependence
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return Optional of formatter message
*/
@Override
Expand All @@ -271,7 +271,7 @@ public Optional<String> taskDepOnTaskMsg(long projectCode, long processDefinitio
/**
* Query tasks depend on process definition, include upstream or downstream
*
* @param projectCode Project code want to query tasks dependence
* @param projectCode Project code want to query tasks dependence
* @param processDefinitionCode Process definition code want to query tasks dependence
* @return Set of TaskMainInfo
*/
Expand All @@ -291,7 +291,7 @@ public Set<TaskMainInfo> queryTaskDepOnProcess(long projectCode, long processDef
* Query downstream tasks depend on a process definition or a task
*
* @param processDefinitionCode Process definition code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @param taskCode Task code want to query tasks dependence
* @return downstream dependent tasks
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.dao.entity;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
Expand Down Expand Up @@ -67,7 +66,7 @@ public class DependentProcessDefinition {
*/
public CycleEnum getDependentCycle(long upstreamProcessDefinitionCode) {
DependentParameters dependentParameters = this.getDependentParameters();
List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependTaskList();
List<DependentTaskModel> dependentTaskModelList = dependentParameters.getDependence().getDependTaskList();

for (DependentTaskModel dependentTaskModel : dependentTaskModelList) {
List<DependentItem> dependentItemList = dependentTaskModel.getDependItemList();
Expand Down Expand Up @@ -104,11 +103,7 @@ public CycleEnum cycle2CycleEnum(String cycle) {
}

public DependentParameters getDependentParameters() {
return JSONUtils.parseObject(getDependence(), DependentParameters.class);
}

public String getDependence() {
return JSONUtils.getNodeString(this.taskParams, Constants.DEPENDENCE);
return JSONUtils.parseObject(taskParams, DependentParameters.class);
}

public String getProcessDefinitionName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,14 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.TaskExecuteType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;

import org.apache.commons.lang3.StringUtils;

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import lombok.Data;
Expand All @@ -50,7 +41,6 @@
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.core.type.TypeReference;

/**
* task instance
Expand Down Expand Up @@ -190,21 +180,6 @@ public class TaskInstance implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String cacheKey;

/**
* dependency
*/
@TableField(exist = false)
private DependentParameters dependency;

@TableField(exist = false)
private ConditionsParameters conditionsParameters;

/**
* switch dependency
*/
@TableField(exist = false)
private SwitchParameters switchDependency;

/**
* duration
*/
Expand Down Expand Up @@ -310,80 +285,6 @@ public void init(String host, Date startTime, String executePath) {
this.executePath = executePath;
}

public DependentParameters getDependency() {
if (this.dependency == null) {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.dependency =
JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), DependentParameters.class);
}
return this.dependency;
}

public void setDependency(DependentParameters dependency) {
this.dependency = dependency;
}

public ConditionsParameters getConditionsParameters() {
if (this.conditionsParameters == null) {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.conditionsParameters =
JSONUtils.parseObject((String) taskParamsMap.get(Constants.DEPENDENCE), ConditionsParameters.class);
}
return conditionsParameters;
}

public ConditionsParameters.ConditionResult getConditionResult() {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
String conditionResult = (String) taskParamsMap.getOrDefault(Constants.CONDITION_RESULT, "");
if (StringUtils.isNotEmpty(conditionResult)) {
return JSONUtils.parseObject(conditionResult, new TypeReference<ConditionsParameters.ConditionResult>() {
});
}
return null;
}

public void setConditionResult(ConditionsParameters conditionsParameters) {
if (conditionsParameters == null) {
return;
}
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
if (taskParamsMap == null) {
taskParamsMap = new HashMap<>();
}
taskParamsMap.put(Constants.CONDITION_RESULT, JSONUtils.toJsonString(conditionsParameters));
this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
}

public SwitchParameters getSwitchDependency() {
// todo: We need to directly use Jackson to deserialize the taskParam, rather than parse the map and get from
// field.
if (this.switchDependency == null) {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
this.switchDependency =
JSONUtils.parseObject((String) taskParamsMap.get(Constants.SWITCH_RESULT), SwitchParameters.class);
}
return this.switchDependency;
}

public void setSwitchDependency(SwitchParameters switchDependency) {
Map<String, Object> taskParamsMap =
JSONUtils.parseObject(this.getTaskParams(), new TypeReference<Map<String, Object>>() {
});
taskParamsMap.put(Constants.SWITCH_RESULT, JSONUtils.toJsonString(switchDependency));
this.switchDependency = switchDependency;
this.setTaskParams(JSONUtils.toJsonString(taskParamsMap));
}

public boolean isTaskComplete() {

return this.getState().isSuccess()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,9 @@ public static void copyTaskInstance(TaskInstance source, TaskInstance target) {
target.setPid(source.getPid());
target.setAppLink(source.getAppLink());
target.setFlag(source.getFlag());
target.setDependency(source.getDependency());
// todo: we need to cpoy the task params and then copy switchDependency, since the setSwitchDependency rely on
// task params, this is really a very bad practice.
target.setTaskParams(source.getTaskParams());
target.setSwitchDependency(source.getSwitchDependency());
target.setDuration(source.getDuration());
target.setMaxRetryTimes(source.getMaxRetryTimes());
target.setRetryInterval(source.getRetryInterval());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SUB_PROCESS;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependentRelation;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
Expand Down Expand Up @@ -66,7 +65,6 @@ public void testTaskInstanceIsSubProcess() {
public void testTaskInstanceGetDependence() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskParams(JSONUtils.toJsonString(getDependentParameters()));
taskInstance.getDependency();
}

/**
Expand All @@ -82,8 +80,6 @@ private DependentParameters getDependentParameters() {
dependentItem.setDefinitionCode(222L);
dependentItem.setCycle("today");
dependentItems.add(dependentItem);
dependentParameters.setDependTaskList(dependTaskList);
dependentParameters.setRelation(DependentRelation.AND);
return dependentParameters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.dolphinscheduler.extract.worker.transportor.UpdateWorkflowHostResponse;
import org.apache.dolphinscheduler.plugin.task.api.enums.DependResult;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.VarPoolUtils;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
Expand Down Expand Up @@ -1093,7 +1092,7 @@ public TaskInstance newTaskInstance(ProcessInstance processInstance, TaskNode ta
taskInstance.setRetryInterval(taskNode.getRetryInterval());

// set task param
taskInstance.setTaskParams(taskNode.getTaskParams());
taskInstance.setTaskParams(taskNode.getParams());

// set task group and priority
taskInstance.setTaskGroupId(taskNode.getTaskGroupId());
Expand Down Expand Up @@ -1351,6 +1350,11 @@ private DependResult isTaskDepsComplete(Long taskCode) {
continue;
}

// always return success if current task is switch
if (taskNode.isSwitchTask()) {
continue;
}

if (!dependTaskSuccess(depsNode, taskCode)) {
return DependResult.FAILED;
}
Expand Down Expand Up @@ -1405,12 +1409,6 @@ private boolean dependTaskSuccess(Long dependNodeCode, Long nextNodeCode) {
}
return true;
}
if (dependentNode.isSwitchTask()) {
TaskInstance dependentTaskInstance = taskInstanceMap.get(validTaskMap.get(dependentNode.getCode()));
SwitchParameters switchParameters = dependentTaskInstance.getSwitchDependency();
return switchParameters.getDependTaskList().get(switchParameters.getResultConditionLocation()).getNextNode()
.contains(nextNodeCode);
}
Optional<TaskInstance> existTaskInstanceOptional = getTaskInstance(dependNodeCode);
if (!existTaskInstanceOptional.isPresent()) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@

import lombok.extern.slf4j.Slf4j;

import com.fasterxml.jackson.core.type.TypeReference;

@Slf4j
public abstract class BaseAsyncLogicTask<T extends AbstractParameters> implements IAsyncLogicTask {

protected final TaskExecutionContext taskExecutionContext;
protected final T taskParameters;

protected BaseAsyncLogicTask(TaskExecutionContext taskExecutionContext, T taskParameters) {
protected BaseAsyncLogicTask(TaskExecutionContext taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
this.taskParameters = taskParameters;
this.taskParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), new TypeReference<T>() {
});
log.info("Success initialize task parameters: \n{}", JSONUtils.toPrettyJsonString(taskParameters));
}

Expand Down
Loading

0 comments on commit e261d05

Please sign in to comment.