Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public enum TaskDependType {
*/
TASK_ONLY(0, "task only"),
TASK_PRE(1, "task pre"),
TASK_POST(2, "task post");
TASK_POST(2, "task post"),
TASK_DEPENDENT(3, "task dependent");

TaskDependType(int code, String descp) {
this.code = code;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,15 @@ public void addHistoryCmd(CommandType cmd) {
}
}

/**
* check this process is start task dependent backward data
*
* @return whether complement data
*/
public boolean isDependentData() {
return this.taskDependType == TaskDependType.TASK_DEPENDENT;
}

/**
* check this process is start complement data
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,10 @@ int countByCode(@Param("projectCode") long projectCode,
@Param("preTaskCode") long preTaskCode,
@Param("postTaskCode") long postTaskCode);

/**
*
* @param taskCode taskCode
* @return list
*/
List<ProcessTaskRelation> queryDistinctDefineCodeByTaskCode(@Param("taskCode") long taskCode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,11 @@ public interface TaskDefinitionLogMapper extends BaseMapper<TaskDefinitionLog> {
* @return the paging task definition version list
*/
IPage<TaskDefinitionLog> queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code") long code, @Param("projectCode") long projectCode);

/**
*
* @param taskType taskType
* @return task definition list
*/
List<TaskDefinition> queryDefinitionListByTaskType(@Param("taskType") String taskType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ public static List<TaskNode> generateFlowNodeListByStartNode(List<TaskNode> task
List<String> startNodeList = startNodeNameList;

if (taskDependType != TaskDependType.TASK_POST && CollectionUtils.isEmpty(startNodeList)) {
logger.error("start node list is empty! cannot continue run the process ");
return destFlowNodeList;
if (taskDependType != TaskDependType.TASK_DEPENDENT) {
logger.error("start node list is empty! cannot continue run the process ");
return destFlowNodeList;
}
}

List<TaskNode> destTaskNodeList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,14 @@
and post_task_code = #{postTaskCode}
</if>
</select>
<select id="queryDistinctDefineCodeByTaskCode" resultType="org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation">
select
process_definition_code
from t_ds_process_task_relation
WHERE pre_task_code = #{taskCode}
<if test="taskCode != 0">
or post_task_code = #{taskCode}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to split this sql instread of or query, which will dismatch index.

</if>
group by process_definition_code
</select>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,9 @@
</if>
order by version desc
</select>
<select id="queryDefinitionListByTaskType" resultType="org.apache.dolphinscheduler.dao.entity.TaskDefinition">
select distinct code, name, version, task_params
from t_ds_task_definition_log
where task_type = #{taskType}
</select>
</mapper>
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,9 @@ private boolean processStateChangeHandler(StateEvent stateEvent) {
if (processComplementData()) {
return true;
}
if (processDependentData()) {
return true;
}
if (stateEvent.getExecutionStatus().typeIsFinished()) {
endProcess();
}
Expand Down Expand Up @@ -638,6 +641,44 @@ private boolean needComplementProcess() {
return false;
}

//backward dependent execution
private boolean processDependentData() throws Exception {
if (!processInstance.isDependentData()) {
return false;
}
if (processInstance.getState().typeIsFinished()) {
endProcess();
List<ProcessDefinition> postDependentProcessList = processService.
getPostDependentProcessDefinitionByCode(processInstance.getProcessDefinitionCode());
if (postDependentProcessList.size() > 0) {
for (ProcessDefinition postDependentProcess : postDependentProcessList) {
Command command = new Command();
command.setCommandType(CommandType.START_PROCESS);
command.setProcessDefinitionCode(postDependentProcess.getCode());
Map<String, String> cmdParam = JSONUtils.toMap(processInstance.getCommandParam());
cmdParam.remove(Constants.CMD_PARAM_START_NODES);
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
command.setTaskDependType(TaskDependType.TASK_DEPENDENT);
command.setFailureStrategy(processInstance.getFailureStrategy());
command.setWarningType(processInstance.getWarningType());
command.setWarningGroupId(processInstance.getWarningGroupId());
command.setScheduleTime(processInstance.getScheduleTime());
command.setStartTime(new Date());
command.setExecutorId(processInstance.getExecutorId());
command.setUpdateTime(new Date());
command.setProcessInstancePriority(processInstance.getProcessInstancePriority());
command.setWorkerGroup(processInstance.getWorkerGroup());
command.setEnvironmentCode(processInstance.getEnvironmentCode());
command.setDryRun(processInstance.getDryRun());
command.setProcessInstanceId(0);
command.setProcessDefinitionVersion(postDependentProcess.getVersion());
processService.createCommand(command);
}
}
}
return true;
}

/**
* process start handle
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public class DependentTaskProcessor extends BaseTaskProcessor {

DependResult result;

ProcessInstance processInstance;
TaskDefinition taskDefinition;

private MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

import static java.util.stream.Collectors.toSet;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
Expand All @@ -39,6 +43,7 @@
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskDependType;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.graph.DAG;
Expand Down Expand Up @@ -2716,4 +2721,33 @@ private void deleteCommandWithCheck(int commandId) {
throw new ServiceException("delete command fail, id:" + commandId);
}
}

public List<ProcessDefinition> getPostDependentProcessDefinitionByCode(long processDefinitionCode) throws JsonProcessingException {
List<ProcessDefinition> result = new ArrayList<>();
List<TaskDefinition> definitionList = taskDefinitionLogMapper.queryDefinitionListByTaskType(TaskType.DEPENDENT.getDesc());
for (TaskDefinition taskDefinition : definitionList) {
String dependence = taskDefinition.getDependence();
ArrayNode dependTaskList = (ArrayNode) new ObjectMapper().readTree(dependence).get("dependTaskList");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use constant "dependTaskList"

for (JsonNode dependTask : dependTaskList) {
ArrayNode dependItemList = (ArrayNode) dependTask.get("dependItemList");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to use constant

for (JsonNode dependentItem : dependItemList) {
long taskDefinitionCode = taskDefinition.getCode();
Long dependentProcessDefinitionCode = dependentItem.get("definitionCode").asLong();
if (dependentProcessDefinitionCode.equals(processDefinitionCode)) {
List<ProcessTaskRelation> dependentTaskRelationList = processTaskRelationMapper.
queryDistinctDefineCodeByTaskCode(taskDefinitionCode);
if (dependentTaskRelationList.size() > 0) {
for (ProcessTaskRelation dependentTaskRelation : dependentTaskRelationList) {
ProcessDefinition processDefine = processDefineMapper.queryByCode(dependentTaskRelation.getProcessDefinitionCode());
if (!result.contains(processDefine)) {
result.add(processDefine);
}
}
}
}
}
}
}
return result;
}
}
11 changes: 10 additions & 1 deletion dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/dag.vue
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@
'setIsEditDag',
'setName',
'setLocations',
'resetLocalParam'
'resetLocalParam',
'setDependResult'
]),
/**
* Toggle full screen
Expand Down Expand Up @@ -562,6 +563,7 @@
.then((res) => {
this.$message(this.$t('Refresh status succeeded'))
const { taskList } = res.data
const list = res.list
if (taskList) {
this.taskInstances = taskList
taskList.forEach((taskInstance) => {
Expand All @@ -572,6 +574,13 @@
})
})
}
if (list) {
list.forEach((dependent) => {
if (dependent.dependentResult) {
this.setDependResult(JSON.parse(dependent.dependentResult))
}
})
}
})
.finally(() => {
this.loading(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import mListBox from './_source/listBox'
import mDependItemList from './_source/dependItemList'
import disabledState from '@/module/mixin/disabledState'
import { mapState } from 'vuex'

export default {
name: 'dependent',
Expand Down Expand Up @@ -140,15 +141,15 @@
},
created () {
let o = this.backfillItem
let dependentResult = $(`#${o.id}`).data('dependent-result') || {}
let dependentResult = this.dependResult || {}
// Does not represent an empty object backfill
if (!_.isEmpty(o)) {
this.relation = _.cloneDeep(o.dependence.relation) || 'AND'
this.dependTaskList = _.cloneDeep(o.dependence.dependTaskList) || []
let defaultState = this.isDetails ? 'WAITING' : ''
// Process instance return status display matches by key
_.map(this.dependTaskList, v => _.map(v.dependItemList, v1 => {
v1.state = dependentResult[`${v1.definitionId}-${v1.depTaskCode}-${v1.cycle}-${v1.dateValue}`] || defaultState
v1.state = dependentResult[`${v1.definitionCode}-${v1.depTaskCode}-${v1.cycle}-${v1.dateValue}`] || defaultState
}))
}
},
Expand All @@ -157,6 +158,9 @@
destroyed () {
},
computed: {
...mapState('dag', [
'dependResult'
]),
cacheDependent () {
return {
relation: this.relation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<el-radio :label="'TASK_POST'">{{$t('Backward execution')}}</el-radio>
<el-radio :label="'TASK_PRE'">{{$t('Forward execution')}}</el-radio>
<el-radio :label="'TASK_ONLY'">{{$t('Execute only the current node')}}</el-radio>
<el-radio :label="'TASK_DEPENDENT'">{{$t('Backward Dependent execution')}}</el-radio>
</el-radio-group>
</div>
</div>
Expand Down
7 changes: 7 additions & 0 deletions dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ export default {
io.get(`projects/${state.projectCode}/process-instances/${payload}/tasks`, {
processInstanceId: payload
}, res => {
res.list = _.map(res.data.taskList, v => {
return _.cloneDeep({
code: v.taskCode,
taskType: v.taskType,
dependentResult: v.dependentResult
})
})
state.taskInstances = res.data.taskList
resolve(res)
}).catch(e => {
Expand Down
8 changes: 8 additions & 0 deletions dolphinscheduler-ui/src/js/conf/home/store/dag/mutations.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ export default {
state.isDetails = payload
},

/**
* set depend result
*/
setDependResult (state, payload) {
state.dependResult = Object.assign(state.dependResult, {}, payload)
},

/**
* reset params
*/
Expand All @@ -129,6 +136,7 @@ export default {
state.runFlag = (payload && payload.runFlag) || ''
state.locations = (payload && payload.locations) || {}
state.connects = (payload && payload.connects) || []
state.dependResult = (payload && payload.dependResult) || {}
},
/**
* add task
Expand Down
3 changes: 2 additions & 1 deletion dolphinscheduler-ui/src/js/conf/home/store/dag/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,5 +122,6 @@ export default {
// Operating state
isDetails: false,
startup: {},
taskInstances: []
taskInstances: [],
dependResult: {}
}
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ export default {
'Backward execution': 'Backward execution',
'Forward execution': 'Forward execution',
'Execute only the current node': 'Execute only the current node',
'Backward Dependent execution': 'Backward Dependent execution',
'Notification strategy': 'Notification strategy',
'Notification group': 'Notification group',
'Please select a notification group': 'Please select a notification group',
Expand Down
1 change: 1 addition & 0 deletions dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ export default {
'Backward execution': '向后执行',
'Forward execution': '向前执行',
'Execute only the current node': '仅执行当前节点',
'Backward Dependent execution': '向后依赖执行',
'Notification strategy': '通知策略',
'Notification group': '通知组',
'Please select a notification group': '请选择通知组',
Expand Down