Skip to content

Commit

Permalink
support to set agent port by user (#29)
Browse files Browse the repository at this point in the history
support to set agent port by user
  • Loading branch information
liruixl committed Apr 1, 2022
1 parent 0071d6c commit cba05d4
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 69 deletions.
Expand Up @@ -39,4 +39,6 @@ public class AgentInstallEventConfigInfo {

private long agentNodeId;

private int agentPort;

}
Expand Up @@ -109,15 +109,17 @@ public void createClusterResourceOperation(CoreUserEntity user, ClusterInfoEntit
}
}

public void configClusterResourceOperation(ClusterInfoEntity clusterInfoEntity, String packageInfo, String installInfo) {
public void configClusterResourceOperation(ClusterInfoEntity clusterInfoEntity, String packageInfo,
String installInfo, int agentPort) {
log.info("Config cluster {} resource info operation.", clusterInfoEntity.getId());
clusterInfoEntity.setInstallInfo(installInfo);
clusterRepository.save(clusterInfoEntity);

resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(), packageInfo, installInfo);
resourceClusterManager.configOperation(clusterInfoEntity.getResourceClusterId(), packageInfo,
installInfo, agentPort);
}

public void startClusterResourceOperation(ClusterInfoEntity clusterInfoEntity, long requestId) {
public void startClusterResourceOperation(ClusterInfoEntity clusterInfoEntity, long requestId) throws Exception {
log.info("Start cluster {} resource cluster operation.", clusterInfoEntity.getId());
resourceClusterManager.startOperation(clusterInfoEntity.getResourceClusterId(), requestId);
}
Expand Down
Expand Up @@ -84,20 +84,23 @@ public void updateOperation(long resourceClusterId, int userId,
}
}

public void configOperation(long resourceClusterId, String packageInfo, String installInfo) {
public void configOperation(long resourceClusterId, String packageInfo, String installInfo, int agentPort) {
// TODO:The path can be set separately for each machine later
log.info("config resource cluster {}", resourceClusterId);
ResourceClusterEntity resourceClusterEntity = resourceClusterRepository.findById(resourceClusterId).get();
resourceClusterEntity.setRegistryInfo(packageInfo);
resourceClusterRepository.save(resourceClusterEntity);

List<ResourceNodeEntity> nodeEntities = nodeRepository.getByResourceClusterId(resourceClusterId);
for (ResourceNodeEntity nodeEntity : nodeEntities) {
nodeEntity.setAgentInstallDir(installInfo);
nodeEntity.setAgentPort(agentPort);
nodeRepository.save(nodeEntity);
}
}

public void startOperation(long resourceClusterId, long requestId) {
public void startOperation(long resourceClusterId, long requestId) throws Exception {

log.info("start resource cluster {} all nodes agent", resourceClusterId);
ResourceClusterEntity clusterEntity = resourceClusterRepository.findById(resourceClusterId).get();
PMResourceClusterAccessInfo accessInfo = JSON.parseObject(clusterEntity.getAccessInfo(),
Expand All @@ -109,7 +112,18 @@ public void startOperation(long resourceClusterId, long requestId) {
configInfo.setSshPort(accessInfo.getSshPort());
configInfo.setSshKey(accessInfo.getSshKey());

log.debug("install agent for resource cluster {} all nodes");
log.debug("check agent port for resource cluster {} all nodes", resourceClusterId);

// before install and start agent, to check whether port is available or not,
// it can not guarantee the port must not be used when starting the agent,
// but it may expose this problem early if the port has been uses.
for (ResourceNodeEntity nodeEntity : nodeEntities) {
if (!nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo)) {
throw new Exception(nodeEntity.getHost() + ":" + nodeEntity.getAgentPort() + " is already in use");
}
}

log.debug("install agent for resource cluster {} all nodes", resourceClusterId);
for (ResourceNodeEntity nodeEntity : nodeEntities) {
nodeAndAgentManager.installAgentOperation(nodeEntity, configInfo, requestId);
}
Expand Down
Expand Up @@ -67,26 +67,55 @@ public void deleteOperation(long resourceClusterId, String host) {
nodeRepository.deleteByResourceClusterIdAndHost(resourceClusterId, host);
}

public boolean isAvailableAgentPort(ResourceNodeEntity node, AgentInstallEventConfigInfo configInfo)
throws Exception {
// agent port check, eg: Spring Boot Param server.port=8008
log.info("check {} node port {}:{}", node.getId(), node.getHost(), node.getAgentPort());
String sshkey = String.format("sshkey-%d-%d", node.getId(), node.getAgentPort());
File sshKeyFile = SSH.buildSshKeyFile(sshkey);
SSH.writeSshKeyFile(configInfo.getSshKey(), sshKeyFile);

// only check listen port
String checkPortCmd = String.format("netstat -tunlp | grep -w %d", node.getAgentPort());
SSH checkPortSSH = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
sshKeyFile.getAbsolutePath(), node.getHost(), checkPortCmd);
if (checkPortSSH.run()) {
String netInfo = checkPortSSH.getStdoutResponse();
log.info("agent node {} port check return output\n: {}", node.getId(), netInfo);

if (netInfo != null && !netInfo.trim().isEmpty()) {
log.error("agent node {} port {} already in use:\n {}", node.getId(), node.getAgentPort(), netInfo);
return false;
}
} else if (checkPortSSH.getExitCode() != 1) { //exit 1 when grep failed, other exit code is exception
log.warn("run check port cmd failed");
throw new Exception("check agent port scrpit execution exception");
}
return true;
}

public void installAgentOperation(ResourceNodeEntity node, AgentInstallEventConfigInfo configInfo, long requestId) {
log.info("install node {} agent for request {}", node.getId(), requestId);
configInfo.setAgentNodeId(node.getId());
configInfo.setInstallDir(node.getAgentInstallDir());
configInfo.setHost(node.getHost());
configInfo.setAgentPort(node.getAgentPort());

long eventId = node.getCurrentEventId();

log.info("event {}: to install and start {} node agent {}:{} in {}", eventId, node.getId(),
node.getHost(), node.getAgentPort(), node.getAgentInstallDir());
// Check whether the current node is already installing agent or agent installation has failed
HeartBeatEventEntity agentInstallAgentEntity;
if (eventId < 1L) {
log.debug("first install agent for node {}", node.getId());
// fisrt time install agent
// first time install agent
// create HeartBeatEvent
HeartBeatEventEntity eventEntity = new HeartBeatEventEntity(HeartBeatEventType.AGENT_INSTALL.name(),
HeartBeatEventResultType.INIT.name(), JSON.toJSONString(configInfo), requestId);

agentInstallAgentEntity = heartBeatEventRepository.save(eventEntity);
eventId = agentInstallAgentEntity.getId();

log.info("first time to install agent, create heart beat event {}", eventId);
node.setCurrentEventId(eventId);
nodeRepository.save(node);
} else {
Expand Down Expand Up @@ -189,61 +218,25 @@ public void installAgentOperation(ResourceNodeEntity node, AgentInstallEventConf

// agent start
// AGENT_START stage
String agentInstallHome = configInfo.getInstallDir() + File.separator + "agent";

// 1 port check, eg: server.port=8008
// grep = application.properties | grep -w server.port | awk -F '=' '{print $2}'
String confFile = agentInstallHome + File.separator + AGENT_CONFIG_PATH;
String portGetFormat = "grep = %s | grep -w server.port | awk -F '=' '{print $2}'";
String portGetCmd = String.format(portGetFormat, confFile);

SSH portGetSSH = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
sshKeyFile.getAbsolutePath(), configInfo.getHost(), portGetCmd);

int agentPort = -1;
if (portGetSSH.run()) {
String portStr = portGetSSH.getStdoutResponse();
log.info("agent {} port get return output: {}", configInfo.getAgentNodeId(), portStr);

if (portStr == null || portStr.isEmpty()) {
log.warn("agent {} server.port is not set", configInfo.getAgentNodeId());
} else {
try {
agentPort = Integer.parseInt(portStr.trim());
} catch (NumberFormatException e) {
log.warn("agent port format is not Integer");
}
try {
if (!isAvailableAgentPort(node, configInfo)) {
log.error("port {}:{} already in use", configInfo.getHost(), configInfo.getAgentPort());
updateFailResult(String.format("agent port %s:%d already in use",
configInfo.getHost(), configInfo.getAgentPort()),
AgentInstallEventStage.AGENT_DEPLOY.getStage(), agentInstallAgentEntity);
return;
}

} else {
log.warn("run agent port get cmd failed:{}, skip the check and use default port",
portGetSSH.getErrorResponse());
} catch (Exception e) {
e.printStackTrace();
log.error("check agent port exception, skip port check");
}

if (agentPort > 0) {
log.info("agent start port is {}", agentPort);
// only check listen port
String checkPortCmd = String.format("netstat -tunlp | grep -w %s", agentPort);
SSH checkPortSSH = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
sshKeyFile.getAbsolutePath(), configInfo.getHost(), checkPortCmd);
if (checkPortSSH.run()) {
String netInfo = checkPortSSH.getStdoutResponse();
log.info("agent {} port check return output: {}", configInfo.getAgentNodeId(), netInfo);

if (netInfo != null && !netInfo.trim().isEmpty()) {
log.error("port {} already in use, {}", agentPort, netInfo);
updateFailResult("port already in use",
AgentInstallEventStage.AGENT_START.getStage(), agentInstallAgentEntity);
return;
}
} else {
log.warn("run check port cmd failed");
}
}
String agentInstallHome = configInfo.getInstallDir() + File.separator + "agent";

// 2 run start shell
String command = "cd %s && sh %s --server %s --agent %s";
String cmd = String.format(command, agentInstallHome, AGENT_START_SCRIPT, getServerAddr(), configInfo.getAgentNodeId());
log.info("to start agent with port {}", configInfo.getAgentPort());
String command = "cd %s && sh %s --server %s --agent %d --port %d";
String cmd = String.format(command, agentInstallHome, AGENT_START_SCRIPT,
getServerAddr(), configInfo.getAgentNodeId(), configInfo.getAgentPort());
SSH startSsh = new SSH(configInfo.getSshUser(), configInfo.getSshPort(),
sshKeyFile.getAbsolutePath(), configInfo.getHost(), cmd);
if (!startSsh.run()) {
Expand Down
Expand Up @@ -104,11 +104,12 @@ private ModelControlResponse handleCreateResourceClusterEvent(CoreUserEntity use

// CONFIG_AND_START_RESOURCE_CLUSTER
private ModelControlResponse handleConfigAndStartResourceClusterEvent(CoreUserEntity user,
DorisClusterCreationRequest request) {
DorisClusterCreationRequest request)
throws Exception {
long clusterId = request.getClusterId();
ClusterInfoEntity clusterInfoEntity = clusterInfoRepository.findById(clusterId).get();
dorisClusterManager.configClusterResourceOperation(clusterInfoEntity, request.getReqInfo().getPackageInfo(),
request.getReqInfo().getInstallInfo());
request.getReqInfo().getInstallInfo(), request.getReqInfo().getAgentPort());
dorisClusterManager.startClusterResourceOperation(clusterInfoEntity, request.getRequestId());

return getResponse(request, false);
Expand All @@ -135,7 +136,7 @@ private ModelControlResponse handleScheduleDorisClusterEvent(CoreUserEntity user

// CONFIG_AND_DEPLOY_DORIS_CLUSTER
private ModelControlResponse handleConfigAndDeployDorisClusterEvent(CoreUserEntity user,
DorisClusterCreationRequest request) {
DorisClusterCreationRequest request) {
long clusterId = request.getClusterId();

ClusterInfoEntity clusterInfoEntity = clusterInfoRepository.findById(clusterId).get();
Expand All @@ -147,7 +148,7 @@ private ModelControlResponse handleConfigAndDeployDorisClusterEvent(CoreUserEnti

// DORIS_CLUSTER_DEPLOYED
private ModelControlResponse handleDorisClusterDeployedEvent(CoreUserEntity user,
DorisClusterCreationRequest request) throws Exception {
DorisClusterCreationRequest request) throws Exception {
long clusterId = request.getClusterId();
dorisClusterManager.checkClusterInstancesOperation(clusterId);
return getResponse(request, false);
Expand Down
Expand Up @@ -118,7 +118,7 @@ private ModelControlResponse handleAccessDorisClusterEvent(CoreUserEntity user,

// CREATE_AND_START_RESOURCE_CLUSTER
private ModelControlResponse handleCreateAndStartResourceClusterEvent(CoreUserEntity user,
DorisClusterTakeOverRequest request) throws Exception {
DorisClusterTakeOverRequest request) throws Exception {
long clusterId = request.getClusterId();
log.info("handle take over cluster {} CREATE_AND_START_RESOURCE_CLUSTER request {} event",
clusterId, request.getRequestId());
Expand Down Expand Up @@ -147,8 +147,10 @@ private ModelControlResponse handleCreateAndStartResourceClusterEvent(CoreUserEn
log.debug("The node list IP of Doris cluster is {}", nodeIps);

dorisClusterManager.createClusterResourceOperation(user, clusterInfo, request.getReqInfo().getAuthInfo(), nodeIps);
dorisClusterManager.configClusterResourceOperation(clusterInfo, "", request.getReqInfo().getInstallInfo());
dorisClusterManager.configClusterResourceOperation(clusterInfo, "",
request.getReqInfo().getInstallInfo(), request.getReqInfo().getAgentPort());

// TODO sshInfo and iplist can check agent port
List<ResourceNodeEntity> nodeEntities =
nodeRepository.getByResourceClusterId(clusterInfo.getResourceClusterId());
Set<Long> feNodeIds = new HashSet<>();
Expand Down
Expand Up @@ -37,6 +37,8 @@ public class DorisClusterCreationReq extends ModelControlReq {

private String installInfo;

private int agentPort;

// Step 4: Install agent

// Step 5:Planning resource node
Expand Down
Expand Up @@ -39,6 +39,8 @@ public class DorisClusterTakeOverReq extends ModelControlReq {

private String installInfo;

private int agentPort;

// Step 4: check Install agent
// Step 5: create cluster module and instance, check agent instance
}
Expand Up @@ -34,6 +34,7 @@ public abstract class BaseCommand {
protected String[] resultCommand;
protected String stdoutResponse;
protected String errorResponse;
protected int exitCode;

protected abstract void buildCommand();

Expand All @@ -45,6 +46,10 @@ public String getErrorResponse() {
return this.errorResponse;
}

public int getExitCode() {
return this.exitCode;
}

public boolean run() {
buildCommand();
log.info("run command: {}", StringUtils.join(resultCommand, " "));
Expand All @@ -59,7 +64,8 @@ public boolean run() {

stdoutResponse = stdoutBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
errorResponse = errorBufferedReader.lines().parallel().collect(Collectors.joining(System.lineSeparator()));
final int exitCode = process.waitFor();

exitCode = process.waitFor();
if (exitCode == 0) {
return true;
} else {
Expand Down
13 changes: 11 additions & 2 deletions manager/manager-bin/agent/bin/agent_start.sh
Expand Up @@ -27,17 +27,20 @@ OPTS=$(getopt \
-o '' \
-l 'server:' \
-l 'agent:' \
-l 'port:' \
-- "$@")

eval set -- "$OPTS"

#host:port
SERVER=
AGENT=
PORT=
while true; do
case "$1" in
--server) SERVER=$2 ; shift 2;;
--agent) AGENT=$2 ; shift 2;;
--port) PORT=$2 ; shift 2;;
--) shift ; break ;;
*) echo "Internal error" ; exit 1 ;;
esac
Expand All @@ -52,6 +55,12 @@ if [ x"$AGENT" == x"" ]; then
echo "--agent node id can not empty!"
exit 1
fi

if [ x"$PORT" == x"" ]; then
echo "--port agent port can not empty!"
exit 1
fi

export AGENT_HOME=`cd "$curdir/.."; pwd`

#
Expand All @@ -60,7 +69,7 @@ export AGENT_HOME=`cd "$curdir/.."; pwd`
# LOG_DIR
# PID_DIR
export JAVA_OPTS="-Xmx1024m"
export SERVER_PARAMS="--manager.server.endpoint=$SERVER --agent.node.id=$AGENT"
export SERVER_PARAMS="--manager.server.endpoint=$SERVER --agent.node.id=$AGENT --server.port=$PORT"
export LOG_DIR="$AGENT_HOME/log"
export PID_DIR=`cd "$curdir"; pwd`

Expand Down Expand Up @@ -88,4 +97,4 @@ fi
nohup $JAVA $JAVA_OPTS -jar ${AGENT_HOME}/lib/dm-agent.jar $SERVER_PARAMS >> $LOG_DIR/agent.out 2>&1 &
echo `date` >> $LOG_DIR/agent.out

echo $! > $pidfile
echo $! > $pidfile

0 comments on commit cba05d4

Please sign in to comment.