Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
@ConfigurationProperties("master")
public class MasterConfig {
private int listenPort;
private int fetchCommandNum;
private int preExecThreads;
private int execThreads;
private int execTaskNum;
private int dispatchTaskNumber;
Expand All @@ -48,6 +50,22 @@ public void setListenPort(int listenPort) {
this.listenPort = listenPort;
}

public int getFetchCommandNum() {
return fetchCommandNum;
}

public void setFetchCommandNum(int fetchCommandNum) {
this.fetchCommandNum = fetchCommandNum;
}

public int getPreExecThreads() {
return preExecThreads;
}

public void setPreExecThreads(int preExecThreads) {
this.preExecThreads = preExecThreads;
}

public int getExecThreads() {
return execThreads;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,14 @@
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;

import org.apache.commons.collections4.CollectionUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -90,6 +95,11 @@ public class MasterSchedulerService extends Thread {
@Autowired
NettyExecutorManager nettyExecutorManager;

/**
* master prepare exec service
*/
private ThreadPoolExecutor masterPrepareExecService;

/**
* master exec service
*/
Expand Down Expand Up @@ -120,6 +130,7 @@ public class MasterSchedulerService extends Thread {
* constructor of MasterSchedulerService
*/
public void init() {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Pre-Exec-Thread", masterConfig.getPreExecThreads());
this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
Expand Down Expand Up @@ -175,81 +186,125 @@ public void run() {
/**
* 1. get command by slot
* 2. donot handle command if slot is empty
*
* @throws Exception
*/
private void scheduleProcess() throws Exception {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}

// make sure to scan and delete command table in one transaction
Command command = findOneCommand();
if (command != null) {
logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());
try {
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command,
processDefinitionCacheMaps);
if (!masterConfig.isCacheProcessDefinition()
&& processDefinitionCacheMaps.size() > 0) {
processDefinitionCacheMaps.clear();
if (!masterConfig.isCacheProcessDefinition() && processDefinitionCacheMaps.size() > 0) {
processDefinitionCacheMaps.clear();
}

List<ProcessInstance> processInstances = command2ProcessInstance(commands);
if (CollectionUtils.isEmpty(processInstances)) {
return;
}

for (ProcessInstance processInstance : processInstances) {
if (processInstance == null) {
continue;
}

WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, taskTimeoutCheckList);

this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
this.processTimeoutCheckList.put(processInstance.getId(), processInstance);
}
masterExecService.execute(workflowExecuteThread);
}
}

private List<ProcessInstance> command2ProcessInstance(List<Command> commands) {
if (CollectionUtils.isEmpty(commands)) {
return null;
}

ProcessInstance[] processInstances = new ProcessInstance[commands.size()];
CountDownLatch latch = new CountDownLatch(commands.size());
for (int i = 0; i < commands.size(); i++) {
int index = i;
this.masterPrepareExecService.execute(() -> {
Command command = commands.get(index);
// slot check again
if (!slotCheck(command)) {
latch.countDown();
return;
}
if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
, processService
, nettyExecutorManager
, processAlertManager
, masterConfig
, taskTimeoutCheckList);

this.processInstanceExecCacheManager.cache(processInstance.getId(), workflowExecuteThread);
if (processInstance.getTimeout() > 0) {
this.processTimeoutCheckList.put(processInstance.getId(), processInstance);

try {
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command,
processDefinitionCacheMaps);
if (processInstance != null) {
processInstances[index] = processInstance;
logger.info("handle command command {} end, create process instance {}",
command.getId(), processInstance.getId());
}
logger.info("handle command end, command {} process {} start...",
command.getId(), processInstance.getId());
masterExecService.execute(workflowExecuteThread);
} catch (Exception e) {
logger.error("scan command error ", e);
processService.moveToErrorCommand(command, e.toString());
} finally {
latch.countDown();
}
} catch (Exception e) {
logger.error("scan command error ", e);
processService.moveToErrorCommand(command, e.toString());
}
} else {
//indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
});
}

try {
// make sure to finish handling command each time before next scan
latch.await();
} catch (InterruptedException e) {
logger.error("countDownLatch await error ", e);
}

return Arrays.asList(processInstances);
}

private Command findOneCommand() {
private List<Command> findCommands() {
int pageNumber = 0;
Command result = null;
int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>();
while (Stopper.isRunning()) {
if (ServerNodeManager.MASTER_SIZE == 0) {
return null;
return result;
}
List<Command> commandList = processService.findCommandPage(ServerNodeManager.MASTER_SIZE, pageNumber);
List<Command> commandList = processService.findCommandPage(pageSize, pageNumber);
if (commandList.size() == 0) {
return null;
return result;
}
for (Command command : commandList) {
int slot = ServerNodeManager.getSlot();
if (ServerNodeManager.MASTER_SIZE != 0
&& command.getId() % ServerNodeManager.MASTER_SIZE == slot) {
result = command;
break;
if (slotCheck(command)) {
result.add(command);
}
}
if (result != null) {
logger.info("find command {}, slot:{} :",
result.getId(),
ServerNodeManager.getSlot());
if (CollectionUtils.isNotEmpty(result)) {
logger.info("find {} commands, slot:{}", result.size(), ServerNodeManager.getSlot());
break;
}
pageNumber += 1;
}
return result;
}

private boolean slotCheck(Command command) {
int slot = ServerNodeManager.getSlot();
if (ServerNodeManager.MASTER_SIZE != 0 && command.getId() % ServerNodeManager.MASTER_SIZE == slot) {
return true;
}
return false;
}

private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ spring:

master:
listen-port: 5678
# master fetch command num
fetch-command-num: 10
# master prepare execute thread number to limit handle commands in parallel
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
# master execute task number in parallel per process instance
Expand Down
Loading