Skip to content

Commit

Permalink
move dependent and conditions to master server (#2423)
Browse files Browse the repository at this point in the history
* move condition and dependent to master

* move conditions to master

* move conditions and dependent task to master

* move conditions to master

* update test

* add log

* add test for dependent task

* add test for dependent task

* update

* update

* refactor complexity code

* refactor complexity code

* add conditions task test

* add conditions task test

* update

* update host to host:port

* update logback.xml

Co-authored-by: baoliang <baoliang@analysys.com.cn>
  • Loading branch information
lenboo and baoliang committed Apr 20, 2020
1 parent ca66cfe commit 4946e88
Show file tree
Hide file tree
Showing 19 changed files with 690 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public enum TaskType {
*/
SHELL(0, "shell"),
SQL(1, "sql"),
SUB_PROCESS(2, "sub process"),
SUB_PROCESS(2, "sub_process"),
PROCEDURE(3, "procedure"),
MR(4, "mr"),
SPARK(5, "spark"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

public class DependentUtilsTest {
private static final Logger logger = LoggerFactory.getLogger(ShellExecutorTest.class);

@Test
public void getDependResultForRelation() {
//failed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,6 @@ public void setAppLink(String appLink) {
}


public boolean isSubProcess(){
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}

public String getDependency(){

Expand Down Expand Up @@ -458,6 +455,18 @@ public List<String> getResources() {
return resources;
}

public boolean isSubProcess(){
return TaskType.SUB_PROCESS.equals(TaskType.valueOf(this.taskType));
}

public boolean isDependTask(){
return TaskType.DEPENDENT.equals(TaskType.valueOf(this.taskType));
}

public boolean isConditionsTask(){
return TaskType.CONDITIONS.equals(TaskType.valueOf(this.taskType));
}

public void setResources(List<String> resources) {
this.resources = resources;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,4 +361,48 @@ public static ProcessDag getProcessDag(List<TaskNode> taskNodeList) {
processDag.setNodes(taskNodeList);
return processDag;
}

/**
* is there have conditions after the parent node
* @param parentNodeName
* @return
*/
public static boolean haveConditionsAfterNode(String parentNodeName,
DAG<String, TaskNode, TaskNodeRelation> dag
){
boolean result = false;
Set<String> subsequentNodes = dag.getSubsequentNodes(parentNodeName);
if(CollectionUtils.isEmpty(subsequentNodes)){
return result;
}
for(String nodeName : subsequentNodes){
TaskNode taskNode = dag.getNode(nodeName);
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if(preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()){
return true;
}
}
return result;
}

/**
* is there have conditions after the parent node
* @param parentNodeName
* @return
*/
public static boolean haveConditionsAfterNode(String parentNodeName,
List<TaskNode> taskNodes
){
boolean result = false;
if(CollectionUtils.isEmpty(taskNodes)){
return result;
}
for(TaskNode taskNode : taskNodes){
List<String> preTasksList = JSONUtils.toList(taskNode.getPreTasks(), String.class);
if(preTasksList.contains(parentNodeName) && taskNode.isConditionsTask()){
return true;
}
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,21 @@ public void testTaskInstanceIsSubProcess() {
TaskInstance taskInstance = new TaskInstance();

//sub process
taskInstance.setTaskType("sub process");
taskInstance.setTaskType("SUB_PROCESS");
Assert.assertTrue(taskInstance.isSubProcess());

//not sub process
taskInstance.setTaskType("http");
taskInstance.setTaskType("HTTP");
Assert.assertFalse(taskInstance.isSubProcess());

//sub process
taskInstance.setTaskType("CONDITIONS");
Assert.assertTrue(taskInstance.isConditionsTask());

//sub process
taskInstance.setTaskType("DEPENDENT");
Assert.assertTrue(taskInstance.isDependTask());


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task.conditions;
package org.apache.dolphinscheduler.server.master.runner;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConditionsTask extends AbstractTask {
public class ConditionsTaskExecThread extends MasterBaseTaskExecThread {


/**
Expand All @@ -48,66 +44,51 @@ public class ConditionsTask extends AbstractTask {
private DependentParameters dependentParameters;

/**
* process dao
*/
private ProcessService processService;

/**
* taskInstance
*/
private TaskInstance taskInstance;

/**
*
* complete task map
*/
private Map<String, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();


/**
* taskExecutionContext
* condition result
*/
private TaskExecutionContext taskExecutionContext;
private DependResult conditionResult;

/**
* constructor
* @param taskExecutionContext taskExecutionContext
* constructor of MasterBaseTaskExecThread
*
* @param logger logger
* @param taskInstance task instance
*/
public ConditionsTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
public ConditionsTaskExecThread(TaskInstance taskInstance) {
super(taskInstance);
}

@Override
public void init() throws Exception {
logger.info("conditions task initialize");

this.processService = SpringApplicationContext.getBean(ProcessService.class);

this.dependentParameters = JSONUtils.parseObject(taskExecutionContext.
getDependenceTaskExecutionContext()
.getDependence(),
DependentParameters.class);

this.taskInstance = processService.findTaskInstanceById(taskExecutionContext.getTaskInstanceId());

if(taskInstance == null){
throw new Exception("cannot find the task instance!");
}

List<TaskInstance> taskInstanceList = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
for(TaskInstance task : taskInstanceList){
this.completeTaskList.putIfAbsent(task.getName(), task.getState());
public Boolean submitWaitComplete() {
try{
this.taskInstance = submit();
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getProcessDefinitionId(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
logger.info("dependent task start");
waitTaskQuit();
updateTaskState();
}catch (Exception e){
logger.error("conditions task run exception" , e);
}
return true;
}

@Override
public void handle() throws Exception {

String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT,
taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
private void waitTaskQuit() {
List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
taskInstance.getProcessInstanceId()
);
for(TaskInstance task : taskInstances){
completeTaskList.putIfAbsent(task.getName(), task.getState());
}

List<DependResult> modelResultList = new ArrayList<>();
for(DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()){
Expand All @@ -119,14 +100,43 @@ public void handle() throws Exception {
DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
modelResultList.add(modelResult);
}
DependResult result = DependentUtils.getDependResultForRelation(
conditionResult = DependentUtils.getDependResultForRelation(
dependentParameters.getRelation(), modelResultList
);
logger.info("the conditions task depend result : {}", result);
exitStatusCode = (result == DependResult.SUCCESS) ?
Constants.EXIT_CODE_SUCCESS : Constants.EXIT_CODE_FAILURE;
logger.info("the conditions task depend result : {}", conditionResult);
}

/**
*
*/
private void updateTaskState() {
ExecutionStatus status;
if(this.cancel){
status = ExecutionStatus.KILL;
}else{
status = (conditionResult == DependResult.SUCCESS) ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE;
}
taskInstance.setState(status);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}

private void initTaskParameters() {
this.taskInstance.setLogPath(getTaskLogPath(taskInstance));
this.taskInstance.setHost(OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort());
taskInstance.setState(ExecutionStatus.RUNNING_EXEUTION);
taskInstance.setStartTime(new Date());
this.processService.saveTaskInstance(taskInstance);

this.dependentParameters = JSONUtils.parseObject(this.taskInstance.getDependency(), DependentParameters.class);
}


/**
* depend result for depend item
* @param item
* @return
*/
private DependResult getDependResultForItem(DependentItem item){

DependResult dependResult = DependResult.SUCCESS;
Expand All @@ -137,16 +147,13 @@ private DependResult getDependResultForItem(DependentItem item){
}
ExecutionStatus executionStatus = completeTaskList.get(item.getDepTasks());
if(executionStatus != item.getStatus()){
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus().toString(), executionStatus.toString());
logger.info("depend item : {} expect status: {}, actual status: {}" ,item.getDepTasks(), item.getStatus(), executionStatus);
dependResult = DependResult.FAILED;
}
logger.info("depend item: {}, depend result: {}",
item.getDepTasks(), dependResult);
logger.info("dependent item complete {} {},{}",
Constants.DEPENDENT_SPLIT, item.getDepTasks(), dependResult);
return dependResult;
}

@Override
public AbstractParameters getParameters() {
return null;
}
}

}

0 comments on commit 4946e88

Please sign in to comment.