Skip to content

Commit

Permalink
[Improvement-15707][Master] Work out the issue that the workflow with…
Browse files Browse the repository at this point in the history
… a task dependency couldn't work well. (#15712)
  • Loading branch information
calvinjiang committed Mar 14, 2024
1 parent 738da1c commit eab71f1
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,14 @@ ProcessInstance queryLastRunningProcess(@Param("processDefinitionCode") Long def
* query last manual process instance
*
* @param definitionCode definitionCode
* @param taskCode taskCode
* @param startTime startTime
* @param endTime endTime
* @param testFlag testFlag
* @return process instance
*/
ProcessInstance queryLastManualProcess(@Param("processDefinitionCode") Long definitionCode,
@Param("taskCode") Long taskCode,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime,
@Param("testFlag") int testFlag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ public interface ProcessInstanceDao extends IDao<ProcessInstance> {
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param taskCode taskCode
* @param dateInterval dateInterval
* @return process instance
*/
ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag);
ProcessInstance queryLastManualProcessInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
int testFlag);

/**
* query first schedule process instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ public ProcessInstance queryLastSchedulerProcessInterval(Long definitionCode, Da
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param taskCode taskCode
* @param dateInterval dateInterval
* @return process instance
*/
@Override
public ProcessInstance queryLastManualProcessInterval(Long definitionCode, DateInterval dateInterval,
public ProcessInstance queryLastManualProcessInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
int testFlag) {
return mybatisMapper.queryLastManualProcess(definitionCode,
taskCode,
dateInterval.getStartTime(),
dateInterval.getEndTime(),
testFlag);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,25 @@
order by start_time desc limit 1
</select>
<select id="queryLastManualProcess" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
select
<include refid="baseSql"/>
from t_ds_process_instance
where process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
and schedule_time is null
<if test="startTime!=null and endTime != null ">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
order by end_time desc limit 1
select t1.*
from
(
select
<include refid="baseSql"/>
from t_ds_process_instance
where
process_definition_code=#{processDefinitionCode} and test_flag=#{testFlag}
and schedule_time is null
<if test="startTime!=null and endTime != null ">
and start_time <![CDATA[ >= ]]> #{startTime} and start_time <![CDATA[ <= ]]> #{endTime}
</if>
) as t1
<if test="taskCode != null and taskCode!=0 and taskCode!=-1">
inner join
t_ds_task_instance as t2
on t1.id = t2.process_instance_id and t2.task_code=#{taskCode}
</if>
order by t1.end_time desc limit 1
</select>

<select id="queryFirstScheduleProcessInstance" resultType="org.apache.dolphinscheduler.dao.entity.ProcessInstance">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,15 @@ public void testQueryLastManualProcess() {
Date start = new Date(2019 - 1900, 1 - 1, 01, 0, 0, 0);
Date end = new Date(2019 - 1900, 1 - 1, 01, 5, 0, 0);
ProcessInstance processInstance1 =
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end,
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), null, start,
end,
processInstance.getTestFlag());
Assertions.assertEquals(processInstance1.getId(), processInstance.getId());

start = new Date(2019 - 1900, 1 - 1, 01, 1, 0, 0);
processInstance1 =
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), start, end,
processInstanceMapper.queryLastManualProcess(processInstance.getProcessDefinitionCode(), null, start,
end,
processInstance.getTestFlag());
Assertions.assertNull(processInstance1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,9 @@ private DependResult calculateResultForTasks(DependentItem dependentItem,

DependResult result = DependResult.FAILED;
for (DateInterval dateInterval : dateIntervals) {
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionCode(),
dateInterval, testFlag);
ProcessInstance processInstance =
findLastProcessInterval(dependentItem.getDefinitionCode(), dependentItem.getDepTaskCode(),
dateInterval, testFlag);
if (processInstance == null) {
return DependResult.WAITING;
}
Expand Down Expand Up @@ -311,16 +312,18 @@ private void addItemVarPool(String varPoolStr, Long endTime) {
* 2. schedule run and schedule time between the interval
*
* @param definitionCode definition code
* @param taskCode task code
* @param dateInterval date interval
* @return ProcessInstance
*/
private ProcessInstance findLastProcessInterval(Long definitionCode, DateInterval dateInterval, int testFlag) {
private ProcessInstance findLastProcessInterval(Long definitionCode, Long taskCode, DateInterval dateInterval,
int testFlag) {

ProcessInstance lastSchedulerProcess =
processInstanceDao.queryLastSchedulerProcessInterval(definitionCode, dateInterval, testFlag);

ProcessInstance lastManualProcess =
processInstanceDao.queryLastManualProcessInterval(definitionCode, dateInterval, testFlag);
processInstanceDao.queryLastManualProcessInterval(definitionCode, taskCode, dateInterval, testFlag);

if (lastManualProcess == null) {
return lastSchedulerProcess;
Expand Down

0 comments on commit eab71f1

Please sign in to comment.