Skip to content

Commit

Permalink
[IOTDB-5326] Add cluster_name parameter (#8708)
Browse files Browse the repository at this point in the history
  • Loading branch information
CRZbulabula committed Jan 3, 2023
1 parent 38438f9 commit 402e269
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 70 deletions.
Expand Up @@ -143,6 +143,9 @@ private void loadProps() {
}

private void loadProperties(Properties properties) throws BadNodeUrlException, IOException {
conf.setClusterName(
properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName()).trim());

conf.setInternalAddress(
properties
.getProperty(IoTDBConstant.CN_INTERNAL_ADDRESS, conf.getInternalAddress())
Expand Down
Expand Up @@ -35,8 +35,6 @@ public class DataNodeRegisterResp implements DataSet {

private TSStatus status;
private List<TConfigNodeLocation> configNodeList;

private String clusterName;
private Integer dataNodeId;

private TRuntimeConfiguration runtimeConfiguration;
Expand All @@ -57,10 +55,6 @@ public void setConfigNodeList(List<TConfigNodeLocation> configNodeList) {
this.configNodeList = configNodeList;
}

public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}

public void setDataNodeId(Integer dataNodeId) {
this.dataNodeId = dataNodeId;
}
Expand Down Expand Up @@ -89,7 +83,6 @@ public TDataNodeRegisterResp convertToRpcDataNodeRegisterResp() {
resp.setConfigNodeList(configNodeList);

if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
resp.setClusterName(clusterName);
resp.setDataNodeId(dataNodeId);
resp.setRuntimeConfiguration(runtimeConfiguration);
}
Expand Down
Expand Up @@ -104,6 +104,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
Expand Down Expand Up @@ -291,16 +292,18 @@ public DataSet getSystemConfiguration() {
}

@Override
public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
public DataSet registerDataNode(TDataNodeRegisterReq req) {
TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
status =
ClusterNodeStartUtils.confirmNodeRegistration(
NodeType.DataNode,
registerDataNodePlan.getDataNodeConfiguration().getLocation(),
req.getClusterName(),
req.getDataNodeConfiguration().getLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return nodeManager.registerDataNode(registerDataNodePlan);
return nodeManager.registerDataNode(
new RegisterDataNodePlan(req.getDataNodeConfiguration()));
}
}

Expand Down Expand Up @@ -833,7 +836,10 @@ public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
if (status == null) {
status =
ClusterNodeStartUtils.confirmNodeRegistration(
NodeType.ConfigNode, req.getConfigNodeLocation(), this);
NodeType.ConfigNode,
req.getClusterParameters().getClusterName(),
req.getConfigNodeLocation(),
this);
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return nodeManager.registerConfigNode(req);
}
Expand Down Expand Up @@ -871,10 +877,6 @@ public TSStatus checkConfigNodeGlobalConfig(TConfigNodeRegisterReq req) {
TSStatus errorStatus = new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode());
TClusterParameters clusterParameters = req.getClusterParameters();

if (!clusterParameters.getClusterName().equals(CONF.getClusterName())) {
return errorStatus.setMessage(errorPrefix + "cluster_name" + errorSuffix);
}

if (!clusterParameters
.getConfigNodeConsensusProtocolClass()
.equals(CONF.getConfigNodeConsensusProtocolClass())) {
Expand Down
Expand Up @@ -34,7 +34,6 @@
import org.apache.iotdb.confignode.consensus.request.read.storagegroup.CountStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.storagegroup.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
Expand All @@ -56,6 +55,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateSchemaTemplateReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRestartResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
Expand Down Expand Up @@ -196,7 +196,7 @@ public interface IManager {
*
* @return DataNodeConfigurationDataSet
*/
DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan);
DataSet registerDataNode(TDataNodeRegisterReq req);

/**
* Restart DataNode
Expand Down
Expand Up @@ -52,14 +52,33 @@ public class ClusterNodeStartUtils {
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("Accept Node restart.");

public static TSStatus confirmNodeRegistration(
NodeType nodeType, Object nodeLocation, ConfigManager configManager) {
NodeType nodeType, String clusterName, Object nodeLocation, ConfigManager configManager) {

final String CONF_FILE_NAME =
NodeType.ConfigNode.equals(nodeType)
? ConfigNodeConstant.CONF_FILE_NAME
: IoTDBConstant.DATA_NODE_CONF_FILE_NAME;
TSStatus status = new TSStatus();

/* Reject start if the cluster name is error */
if (!CLUSTER_NAME.equals(clusterName)) {
status.setCode(TSStatusCode.REJECT_NODE_START.getStatusCode());
status.setMessage(
String.format(
"Reject %s start. Because the ClusterName of the current %s and the target cluster are inconsistent. "
+ "ClusterName of the current Node: %s, ClusterName of the target cluster: %s."
+ POSSIBLE_SOLUTIONS
+ "\t1. Change the target_config_node_list parameter in %s to join the correct cluster."
+ "\n\t2. Change the cluster_name parameter in %s to match the target cluster",
nodeType.getNodeType(),
nodeType.getNodeType(),
clusterName,
CLUSTER_NAME,
CONF_FILE_NAME,
CONF_FILE_NAME));
return status;
}

/* Check if there exist conflict TEndPoints */
List<TEndPoint> conflictEndPoints;
switch (nodeType) {
Expand Down Expand Up @@ -119,11 +138,13 @@ public static TSStatus confirmNodeRestart(
"Reject %s restart. Because the ClusterName of the current %s and the target cluster are inconsistent. "
+ "ClusterName of the current Node: %s, ClusterName of the target cluster: %s."
+ POSSIBLE_SOLUTIONS
+ "\t1. Change the target_config_node_list parameter in %s to join the correct cluster.",
+ "\t1. Change the target_config_node_list parameter in %s to join the correct cluster."
+ "\n\t2. Change the cluster_name parameter in %s to match the target cluster",
nodeType.getNodeType(),
nodeType.getNodeType(),
clusterName,
CLUSTER_NAME,
CONF_FILE_NAME,
CONF_FILE_NAME));
return status;
}
Expand Down
Expand Up @@ -270,7 +270,6 @@ public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {

resp.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION);
resp.setConfigNodeList(getRegisteredConfigNodes());
resp.setClusterName(CONF.getClusterName());
resp.setDataNodeId(
registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId());
resp.setRuntimeConfiguration(getRuntimeConfiguration());
Expand Down Expand Up @@ -384,7 +383,6 @@ public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
configManager.getProcedureManager().addConfigNode(req);
return new TConfigNodeRegisterResp()
.setStatus(ClusterNodeStartUtils.ACCEPT_NODE_REGISTRATION)
.setClusterName(CONF.getClusterName())
.setConfigNodeId(nodeId);
}

Expand Down
Expand Up @@ -95,7 +95,7 @@ public void active() {
if (SystemPropertiesUtils.isRestarted()) {
LOGGER.info("{} is in restarting process...", ConfigNodeConstant.GLOBAL_NAME);

/* Always restore ClusterId and ConfigNodeId first */
/* Always restore ClusterName and ConfigNodeId first */
CONF.setClusterName(SystemPropertiesUtils.loadClusterNameWhenRestarted());
CONF.setConfigNodeId(SystemPropertiesUtils.loadConfigNodeIdWhenRestarted());

Expand All @@ -107,8 +107,9 @@ public void active() {
configManager.initConsensusManager();
setUpRPCService();
LOGGER.info(
"{} has successfully restarted and joined the cluster.",
ConfigNodeConstant.GLOBAL_NAME);
"{} has successfully restarted and joined the cluster: {}.",
ConfigNodeConstant.GLOBAL_NAME,
CONF.getClusterName());
return;
}

Expand Down Expand Up @@ -140,7 +141,9 @@ public void active() {
// The initial startup of Seed-ConfigNode finished

LOGGER.info(
"{} has successfully started and joined the cluster.", ConfigNodeConstant.GLOBAL_NAME);
"{} has successfully started and joined the cluster: {}.",
ConfigNodeConstant.GLOBAL_NAME,
CONF.getClusterName());
return;
}

Expand All @@ -152,9 +155,10 @@ public void active() {
// The initial startup of Non-Seed-ConfigNode is not yet finished,
// we should wait for leader's scheduling
LOGGER.info(
"{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster.",
"{} {} has registered successfully. Waiting for the leader's scheduling to join the cluster: {}.",
ConfigNodeConstant.GLOBAL_NAME,
CONF.getConfigNodeId());
CONF.getConfigNodeId(),
CONF.getClusterName());

boolean isJoinedCluster = false;
for (int retry = 0; retry < SCHEDULE_WAITING_RETRY_NUM; retry++) {
Expand Down
Expand Up @@ -44,7 +44,6 @@
import org.apache.iotdb.confignode.consensus.request.read.storagegroup.CountStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.read.storagegroup.GetStorageGroupPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.storagegroup.SetDataReplicationFactorPlan;
Expand Down Expand Up @@ -202,9 +201,7 @@ public TSystemConfigurationResp getSystemConfiguration() {
@Override
public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req) {
TDataNodeRegisterResp resp =
((DataNodeRegisterResp)
configManager.registerDataNode(
new RegisterDataNodePlan(req.getDataNodeConfiguration())))
((DataNodeRegisterResp) configManager.registerDataNode(req))
.convertToRpcDataNodeRegisterResp();

// Print log to record the ConfigNode that performs the RegisterDatanodeRequest
Expand Down Expand Up @@ -488,7 +485,9 @@ public TSStatus notifyRegisterSuccess() {

// The initial startup of Non-Seed-ConfigNode finished
LOGGER.info(
"{} has successfully started and joined the cluster.", ConfigNodeConstant.GLOBAL_NAME);
"{} has successfully started and joined the cluster: {}.",
ConfigNodeConstant.GLOBAL_NAME,
ConfigNodeDescriptor.getInstance().getConf().getClusterName());
return StatusUtils.OK;
}

Expand Down

0 comments on commit 402e269

Please sign in to comment.