Skip to content
Permalink
Browse files
node agent async start and stop (#37)
node agent async start and stop
  • Loading branch information
liruixl committed Apr 8, 2022
1 parent b68db6a commit f2170493a21a706e602495d2ff8c04a5e2588cde
Showing 12 changed files with 463 additions and 82 deletions.
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.manager.common.heartbeat.config;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Configuration payload for an agent uninstall event.
 *
 * <p>Plain data holder: Lombok generates the accessors, equals/hashCode/toString,
 * a no-args constructor and an all-args constructor.
 *
 * <p>The all-args constructor takes its parameters in field declaration order
 * (sshUser, sshPort, sshKey, host, installDir, agentNodeId, agentPort). Callers
 * construct this class positionally, so do not reorder the fields.
 *
 * <p>NOTE(review): the {@code toString()} generated by {@code @Data} includes every
 * field, {@code sshKey} among them - avoid logging whole instances.
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AgentUnInstallEventConfigInfo {
// SSH login user - presumably the account used to reach the node; confirm against the consumer.
private String sshUser;

// SSH port of the target node.
private int sshPort;

// SSH key material - presumably the private key content, not a path; TODO confirm.
private String sshKey;

// Host of the node the agent is uninstalled from.
private String host;

// Agent installation directory on the node.
private String installDir;

// Id of the node record the agent belongs to.
private long agentNodeId;

// Port the agent listens on.
private int agentPort;
}
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.manager.common.heartbeat.stage;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

/**
 * Stages of the agent uninstall event.
 *
 * <p>Each constant supplies, in field declaration order: the failure text, the success
 * text, the stage number and whether the stage is the final one. Lombok's
 * {@code @AllArgsConstructor} binds the constant arguments to the fields positionally,
 * so the field order below must match the argument order of every constant.
 */
@AllArgsConstructor
@NoArgsConstructor
@Getter
public enum AgentUnInstallEventStage {
// (error, message, stage, isLast) - the only stage, and therefore also the last one.
AGENT_STOP("agent stop failed", "agent stop succeeded", 1, true);

// Text used when the stage fails.
private String error;

// Text used when the stage succeeds.
private String message;

// Numeric stage value (1 for AGENT_STOP).
private int stage;

// True when no further stage follows this one.
// NOTE(review): Lombok presumably exposes this as isLast() rather than getIsLast() - confirm before relying on the name.
private boolean isLast;

}
@@ -26,6 +26,7 @@
import org.apache.doris.stack.control.ModelControlLevel;
import org.apache.doris.stack.control.ModelControlRequestType;
import org.apache.doris.stack.control.manager.DorisClusterManager;
import org.apache.doris.stack.control.manager.ResourceClusterManager;
import org.apache.doris.stack.dao.ClusterInfoRepository;
import org.apache.doris.stack.dao.ClusterUserMembershipRepository;
import org.apache.doris.stack.dao.CoreUserRepository;
@@ -95,6 +96,9 @@ public class DorisManagerUserSpaceComponent extends BaseService {
@Autowired
private ClusterInfoRepository clusterInfoRepository;

@Autowired
private ResourceClusterManager resourceClusterManager;

@Autowired
private PaloLoginClient paloLoginClient;

@@ -447,24 +451,27 @@ public void deleteSpace(long spaceId) throws Exception {
// delete cluster information
clusterInfoRepository.deleteById(spaceId);

try {
// delete cluster configuration
log.debug("delete cluster {} config infos.", spaceId);
settingComponent.deleteAdminSetting(spaceId);
// delete cluster configuration
log.debug("delete cluster {} config infos.", spaceId);
settingComponent.deleteAdminSetting(spaceId);

deleteClusterPermissionInfo(clusterInfo);
deleteClusterPermissionInfo(clusterInfo);

// delete user information
log.debug("delete cluster {} all user membership.", spaceId);
clusterUserMembershipRepository.deleteByClusterId(spaceId);
// delete user information
log.debug("delete cluster {} all user membership.", spaceId);
clusterUserMembershipRepository.deleteByClusterId(spaceId);

// TODO: In order to be compatible with the deleted content of spatial information before, it is put here.
// If the interface that releases both cluster and physical resources is implemented later,
// it will be unified in the current doriscluster processing operation
clusterManager.deleteClusterOperation(clusterInfo);
} catch (Exception e) {
log.warn("delete space {} related information failed", spaceId, e);
// TODO: In order to be compatible with the deleted content of spatial information before, it is put here.
// If the interface that releases both cluster and physical resources is implemented later,
// it will be unified in the current doriscluster processing operation
clusterManager.deleteClusterOperation(clusterInfo);

if (clusterInfo.getResourceClusterId() < 1L) {
log.info("resource cluster has not been created");
return;
}
resourceClusterManager.deleteAgentsOperation(clusterInfo.getResourceClusterId());
resourceClusterManager.deleteOperation(clusterInfo.getResourceClusterId());
}

private void deleteClusterPermissionInfo(ClusterInfoEntity clusterInfo) throws Exception {
@@ -489,7 +496,8 @@ private void deleteClusterPermissionInfo(ClusterInfoEntity clusterInfo) throws E
managerMetaSyncComponent.deleteClusterMetadata(clusterInfo);

log.debug("Delete cluster {} analyzer user {}.", spaceId, allUserGroup.getPaloUserName());
queryClient.deleteUser(ConstantDef.DORIS_DEFAULT_NS, ConstantDef.MYSQL_DEFAULT_SCHEMA, clusterInfo, allUserGroup.getPaloUserName());
queryClient.deleteUser(ConstantDef.DORIS_DEFAULT_NS, ConstantDef.MYSQL_DEFAULT_SCHEMA, clusterInfo,
allUserGroup.getPaloUserName());
}

// After deleting the user's space, set clusterid to 0
@@ -80,13 +80,13 @@ public class DorisClusterManager {
private JdbcSampleClient jdbcClient;

// Ensure the data atomicity of creating user space, so add transactions
/**
 * Creates a user space by delegating to {@code userSpaceComponent.create}.
 *
 * <p>The {@code rollbackFor = Exception.class} form of the annotation asks Spring to
 * roll the transaction back on checked exceptions as well, not only on unchecked ones.
 *
 * @param spaceInfo creation request for the new space
 * @param creator   passed through to the component unchanged
 * @return the value returned by {@code userSpaceComponent.create} - presumably the id
 *         of the new space; confirm against the component
 * @throws Exception whatever the component throws
 */
@Transactional
@Transactional(rollbackFor = Exception.class)
public long initOperation(NewUserSpaceCreateReq spaceInfo, String creator) throws Exception {
return userSpaceComponent.create(spaceInfo, creator);
}

// Ensure the atomicity of data in user space, so add transactions
@Transactional
@Transactional(rollbackFor = Exception.class)
public void updateClusterOperation(CoreUserEntity user, long clusterId,
NewUserSpaceCreateReq spaceInfo) throws Exception {
userSpaceComponent.update(user, clusterId, spaceInfo);
@@ -319,7 +319,7 @@ public void reStartClusterOperation(long clusterId, long requestId) throws Excep

/**
 * Entity-based entry point for cluster deletion: reads the id from the entity, logs
 * the operation, then delegates to the id-based {@code deleteClusterOperation(long)}.
 *
 * @param clusterInfo cluster entity; its id selects what is deleted and its name is
 *                    only used for logging
 * @throws Exception whatever the id-based overload throws
 */
public void deleteClusterOperation(ClusterInfoEntity clusterInfo)throws Exception {
long clusterId = clusterInfo.getId();
log.info("Delete cluster {} instances operation.", clusterId);
// First placeholder receives the cluster id, second the cluster name.
log.info("Delete {} cluster {} instances operation.", clusterId, clusterInfo.getName());
deleteClusterOperation(clusterId);
}

@@ -330,5 +330,4 @@ private void deleteClusterOperation(long clusterId)throws Exception {
clusterModuleManager.deleteOperation(moduleEntity);
}
}

}
@@ -19,7 +19,9 @@

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.doris.manager.common.heartbeat.config.AgentInstallEventConfigInfo;
import org.apache.doris.manager.common.heartbeat.config.AgentUnInstallEventConfigInfo;
import org.apache.doris.stack.dao.ResourceClusterRepository;
import org.apache.doris.stack.dao.ResourceNodeRepository;
import org.apache.doris.stack.entity.ResourceClusterEntity;
@@ -29,7 +31,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

@Slf4j
@Component
@@ -74,6 +80,8 @@ public void updateOperation(long resourceClusterId, int userId,
List<String> reduceList = ListUtil.getReduceList(hosts, existHosts);
log.debug("resource cluster {} reduce nodes", reduceList);
for (String host : reduceList) {
// node agent maybe not installed yet
// only delete cluster node db info
nodeAndAgentManager.deleteOperation(resourceClusterId, host);
}

@@ -116,15 +124,51 @@ public void startOperation(long resourceClusterId, long requestId) throws Except

// before install and start agent, to check whether port is available or not,
// it can not guarantee the port must not be used when starting the agent,
// but it may expose this problem early if the port has been uses.
// but it may expose this problem early if the port has been used.
List<Pair<ResourceNodeEntity, CompletableFuture<Boolean>>> nodeFutures = new ArrayList<>();
for (ResourceNodeEntity nodeEntity : nodeEntities) {
if (!nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo)) {
throw new Exception(nodeEntity.getHost() + ":" + nodeEntity.getAgentPort() + " is already in use");
CompletableFuture<Boolean> portCheckFuture = CompletableFuture.supplyAsync(() -> {
try {
nodeAndAgentManager.checkSshConnect(nodeEntity, configInfo);
return nodeAndAgentManager.isAvailableAgentPort(nodeEntity, configInfo);
} catch (Exception e) {
log.error("check node {} exception: {}", nodeEntity.getHost(), e.getMessage());
throw new CompletionException(e);
}
});
nodeFutures.add(Pair.of(nodeEntity, portCheckFuture));
}

boolean checkFailed = false;
StringBuilder exStrBuilder = new StringBuilder();
for (Pair<ResourceNodeEntity, CompletableFuture<Boolean>> nodeFuture: nodeFutures) {
ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
CompletableFuture<Boolean> future = nodeFuture.getRight();
try {
boolean isAvailablePort = future.get();
if (!isAvailablePort) {
checkFailed = true;
log.error("node {}:{} port already in use", nodeEntity.getHost(), nodeEntity.getAgentPort());
throw new Exception(String.format("node %s:%d port already in use",
nodeEntity.getHost(), nodeEntity.getAgentPort()));
}
} catch (Exception e) {
checkFailed = true;
log.error("node {}:{} check exception {}", nodeEntity.getHost(), nodeEntity.getAgentPort(), e);
exStrBuilder.append(String.format("%s:%d, %s",
nodeEntity.getHost(), nodeEntity.getAgentPort(), e));
exStrBuilder.append("\n");
}
}

if (checkFailed) {
log.error("check node exception list: {}\n", exStrBuilder);
throw new Exception(exStrBuilder.toString());
}

log.debug("install agent for resource cluster {} all nodes", resourceClusterId);
for (ResourceNodeEntity nodeEntity : nodeEntities) {
log.info("start to install agent to {} node {}", nodeEntity.getId(), nodeEntity.getHost());
nodeAndAgentManager.installAgentOperation(nodeEntity, configInfo, requestId);
}
}
@@ -139,6 +183,94 @@ public void checkNodesAgentOperation(long resourceClusterId) throws Exception {
+ "The next step cannot be carried out temporarily");
}
}
}

/**
 * Deletes a resource cluster record together with the node records that belong to it.
 *
 * <p>Every node returned by the node repository for this cluster is removed through
 * {@code nodeAndAgentManager.deleteOperation(nodeId)} first; the cluster entity itself
 * is deleted last.
 *
 * @param resourceClusterId id of the resource cluster to delete
 * @throws Exception if no resource cluster with that id exists, or if removing a node fails
 */
public void deleteOperation(long resourceClusterId) throws Exception {
    log.info("to delete resource cluster {} info", resourceClusterId);

    Optional<ResourceClusterEntity> resourceClusterOpt = resourceClusterRepository.findById(resourceClusterId);
    if (!resourceClusterOpt.isPresent()) {
        log.error("resource cluster {} does not exist", resourceClusterId);
        // Keep spaces around the id so the message is readable and matches the
        // wording used by deleteAgentsOperation.
        throw new Exception("resource cluster " + resourceClusterId + " does not exist");
    }

    ResourceClusterEntity clusterEntity = resourceClusterOpt.get();

    // Remove the nodes before the cluster row that owns them.
    List<ResourceNodeEntity> nodeList = nodeRepository.getByResourceClusterId(resourceClusterId);
    for (ResourceNodeEntity node : nodeList) {
        log.info("delete node {}, host: {}", node.getId(), node.getHost());
        nodeAndAgentManager.deleteOperation(node.getId());
    }

    log.info("delete resource cluster {}", resourceClusterId);
    resourceClusterRepository.delete(clusterEntity);
}

/**
 * Uninstalls the agents of a resource cluster.
 *
 * <p>Steps:
 * <ol>
 *   <li>load the cluster and parse its access info (SSH user, port, key);</li>
 *   <li>collect the nodes for which {@code checkAgentOperation} reports an agent;</li>
 *   <li>concurrently pre-check the SSH connection and the stop script on those nodes,
 *       aborting before anything is uninstalled if a single check fails;</li>
 *   <li>trigger {@code deleteAgentOperation} for the cluster's nodes.</li>
 * </ol>
 *
 * @param resourceClusterId id of the resource cluster whose agents are removed
 * @throws Exception if the cluster does not exist, a pre-check fails, or the calling
 *                   thread is interrupted while waiting for the pre-checks
 */
public void deleteAgentsOperation(long resourceClusterId) throws Exception {
    log.info("delete resource cluster {} all nodes agent", resourceClusterId);

    Optional<ResourceClusterEntity> resourceClusterOpt = resourceClusterRepository.findById(resourceClusterId);
    if (!resourceClusterOpt.isPresent()) {
        throw new Exception("resource cluster " + resourceClusterId + " does not exist");
    }

    ResourceClusterEntity clusterEntity = resourceClusterOpt.get();
    PMResourceClusterAccessInfo accessInfo = JSON.parseObject(clusterEntity.getAccessInfo(),
            PMResourceClusterAccessInfo.class);

    List<ResourceNodeEntity> nodeEntities = nodeRepository.getByResourceClusterId(resourceClusterId);

    List<ResourceNodeEntity> agentInstalledNodes = new ArrayList<>();
    for (ResourceNodeEntity nodeEntity : nodeEntities) {
        if (!nodeAndAgentManager.checkAgentOperation(nodeEntity)) {
            log.warn("the agent has not been installed on {} node {}", nodeEntity.getId(), nodeEntity.getHost());
        } else {
            agentInstalledNodes.add(nodeEntity);
        }
    }

    // we check something before uninstall agent
    // to guarantee uninstall operation must be executed
    // NOTE(review): runAsync without an executor runs these blocking SSH checks on
    // the common ForkJoinPool - consider a dedicated executor.
    List<Pair<ResourceNodeEntity, CompletableFuture<Void>>> nodeFutures = new ArrayList<>();
    for (ResourceNodeEntity nodeEntity : agentInstalledNodes) {
        CompletableFuture<Void> preCheckFuture = CompletableFuture.runAsync(() -> {
            // Each task gets its own config object, so nothing mutable is shared between threads.
            AgentInstallEventConfigInfo installConfig = new AgentInstallEventConfigInfo();
            installConfig.setSshUser(accessInfo.getSshUser());
            installConfig.setSshPort(accessInfo.getSshPort());
            installConfig.setSshKey(accessInfo.getSshKey());

            try {
                log.info("check ssh connect and stop script before uninstall agent on node {}", nodeEntity.getId());
                nodeAndAgentManager.checkSshConnect(nodeEntity, installConfig);
                nodeAndAgentManager.checkStopScriptExist(nodeEntity, installConfig);
            } catch (Exception e) {
                log.error("check node {} exception: {}", nodeEntity.getHost(), e.getMessage());
                // Checked exceptions cannot leave a Runnable; wrap them so get() reports the failure.
                throw new CompletionException(e);
            }
        });
        nodeFutures.add(Pair.of(nodeEntity, preCheckFuture));
    }

    // Wait for every pre-check; the first failure aborts the whole uninstall.
    for (Pair<ResourceNodeEntity, CompletableFuture<Void>> nodeFuture : nodeFutures) {
        ResourceNodeEntity nodeEntity = nodeFuture.getLeft();
        CompletableFuture<Void> future = nodeFuture.getRight();
        try {
            future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.error("check {} node {} stop script interrupted", nodeEntity.getId(), nodeEntity.getHost(), e);
            throw new Exception("check node stop script failed: " + e, e);
        } catch (Exception e) {
            // Throwable passed as the trailing argument so the stack trace is logged.
            log.error("check {} node {} stop script exception", nodeEntity.getId(), nodeEntity.getHost(), e);
            // Keep the original failure attached as the cause.
            throw new Exception("check node stop script failed: " + e, e);
        }
    }

    // async delete agent
    // NOTE(review): this iterates every node of the cluster, not only agentInstalledNodes,
    // so nodes that skipped the pre-check are uninstalled too. Confirm that
    // deleteAgentOperation tolerates nodes without an agent, or switch to the filtered list.
    for (ResourceNodeEntity nodeEntity : nodeEntities) {
        AgentUnInstallEventConfigInfo uninstallConfig = new AgentUnInstallEventConfigInfo(
                accessInfo.getSshUser(), accessInfo.getSshPort(), accessInfo.getSshKey(),
                nodeEntity.getHost(), nodeEntity.getAgentInstallDir(),
                nodeEntity.getId(), nodeEntity.getAgentPort());

        log.info("to stop agent of {} node {}", nodeEntity.getId(), nodeEntity.getHost());
        nodeAndAgentManager.deleteAgentOperation(nodeEntity, uninstallConfig);
    }
}
}

0 comments on commit f217049

Please sign in to comment.