Skip to content
Permalink
Browse files
[IOTDB-2942] [IOTDB-2944] Delete duplicate data structure in node-com…
…mons and extend TDataNodeLocation (#5592)
  • Loading branch information
CRZbulabula committed Apr 20, 2022
1 parent aadd8ca commit 7ce6033609f0df3c7b809dfc048c1bfcb47aefb7
Showing 145 changed files with 1,849 additions and 2,163 deletions.
@@ -50,6 +50,11 @@
<artifactId>client-cpp-tools-thrift</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<profiles>
<profile>
@@ -164,7 +169,7 @@
<destinationFile>${project.build.directory}/thrift/rpc.thrift</destinationFile>
</fileSet>
<fileSet>
<sourceFile>../thrift/src/main/thrift/common.thrift</sourceFile>
<sourceFile>../thrift-commons/src/main/thrift/common.thrift</sourceFile>
<destinationFile>${project.build.directory}/thrift/common.thrift</destinationFile>
</fileSet>
</fileSets>
@@ -36,6 +36,11 @@
<artifactId>iotdb-thrift</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
@@ -92,6 +97,9 @@
<resource>
<directory>${basedir}/../thrift/target/generated-sources-python/iotdb/thrift/</directory>
</resource>
<resource>
<directory>${basedir}/../thrift-commons/target/generated-sources-python/iotdb/thrift/</directory>
</resource>
</resources>
</configuration>
</execution>
@@ -40,16 +40,6 @@
<artifactId>service-rpc</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift-confignode</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>node-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-consensus</artifactId>
@@ -152,18 +152,18 @@ data_node_consensus_protocol_class=org.apache.iotdb.consensus.standalone.StandAl
# Datatype: long
# default_ttl=36000000

####################
### Region Configuration
####################
# Default number of SchemaRegion replicas
# Datatype: int
# schema_replication_factor=3

# The number of replicas of each region
# Default number of DataRegion replicas
# Datatype: int
region_replica_count=1
# data_replication_factor=3

# The number of SchemaRegions of each StorageGroup
# The initial number of SchemaRegions of each StorageGroup
# Datatype: int
# schema_region_count=1
# initial_schema_region_count=1

# The number of DataRegions of each StorageGroup
# The initial number of DataRegions of each StorageGroup
# Datatype: int
# data_region_count=1
# initial_data_region_count=1
@@ -18,9 +18,9 @@
*/
package org.apache.iotdb.confignode.cli;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
import org.apache.iotdb.confignode.persistence.DataNodeInfoPersistence;
import org.apache.iotdb.mpp.rpc.thrift.InternalService;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
@@ -59,7 +59,7 @@ private TemporaryClient() {
this.clients = new HashMap<>();
}

public void buildClient(int dataNodeId, Endpoint endpoint) {
public void buildClient(int dataNodeId, TEndPoint endpoint) {
for (int i = 0; i < retryNum; i++) {
try {
TTransport transport =
@@ -83,20 +83,22 @@ public void buildClient(int dataNodeId, Endpoint endpoint) {
}

private TCreateSchemaRegionReq genCreateSchemaRegionReq(
String storageGroup, RegionReplicaSet regionReplicaSet) {
String storageGroup, TRegionReplicaSet regionReplicaSet) {
TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
req.setStorageGroup(storageGroup);
req.setRegionReplicaSet(regionReplicaSet.convertToRPCTRegionReplicaSet());
req.setRegionReplicaSet(regionReplicaSet);
return req;
}

public void createSchemaRegion(
int dataNodeId, String storageGroup, RegionReplicaSet regionReplicaSet) {
int dataNodeId, String storageGroup, TRegionReplicaSet regionReplicaSet) {

if (clients.get(dataNodeId) == null) {
buildClient(
dataNodeId,
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint());
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint());
}

TCreateSchemaRegionReq req = genCreateSchemaRegionReq(storageGroup, regionReplicaSet);
@@ -107,19 +109,25 @@ public void createSchemaRegion(
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
"Create SchemaRegion on DataNode: {} success",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint());
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint());
return;
} else {
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed, {}. Retrying...",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint(),
status);
}
} catch (TException e) {
// TODO: Handler SocketTimeOutException
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed, {}. Retrying...",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint(),
e.toString());
try {
TimeUnit.MILLISECONDS.sleep(retryWait);
@@ -131,25 +139,27 @@ public void createSchemaRegion(
}
LOGGER.error(
"Create SchemaRegion on DataNode: {} failed.",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint());
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
}

private TCreateDataRegionReq genCreateDataRegionReq(
String storageGroup, RegionReplicaSet regionReplicaSet, long TTL) {
String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {
TCreateDataRegionReq req = new TCreateDataRegionReq();
req.setStorageGroup(storageGroup);
req.setRegionReplicaSet(regionReplicaSet.convertToRPCTRegionReplicaSet());
req.setRegionReplicaSet(regionReplicaSet);
req.setTtl(TTL);
return req;
}

public void createDataRegion(
int dataNodeId, String storageGroup, RegionReplicaSet regionReplicaSet, long TTL) {
int dataNodeId, String storageGroup, TRegionReplicaSet regionReplicaSet, long TTL) {

if (clients.get(dataNodeId) == null) {
buildClient(
dataNodeId,
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint());
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint());
}

TCreateDataRegionReq req = genCreateDataRegionReq(storageGroup, regionReplicaSet, TTL);
@@ -160,19 +170,25 @@ public void createDataRegion(
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.info(
"Create DataRegion on DataNode: {} success",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint());
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint());
return;
} else {
LOGGER.error(
"Create DataRegion on DataNode: {} failed, {}. Retrying...",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint(),
status);
}
} catch (TException e) {
// TODO: Handler SocketTimeOutException
LOGGER.error(
"Create DataRegion on DataNode: {} failed, {}. Retrying...",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint(),
DataNodeInfoPersistence.getInstance()
.getOnlineDataNode(dataNodeId)
.getInternalEndPoint(),
e.toString());
try {
TimeUnit.MILLISECONDS.sleep(retryWait);
@@ -184,7 +200,7 @@ public void createDataRegion(
}
LOGGER.error(
"Create DataRegion on DataNode: {} failed.",
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getEndPoint());
DataNodeInfoPersistence.getInstance().getOnlineDataNode(dataNodeId).getInternalEndPoint());
}

private static class TemporaryClientHolder {
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.conf;

import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.rpc.RpcUtils;

@@ -43,8 +43,8 @@ public class ConfigNodeConf {
private String dataNodeConsensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";

/** Used for building the ConfigNode consensus group */
private Endpoint[] configNodeGroupAddressList =
Collections.singletonList(new Endpoint("0.0.0.0", 22278)).toArray(new Endpoint[0]);
private TEndPoint[] configNodeGroupAddressList =
Collections.singletonList(new TEndPoint("0.0.0.0", 22278)).toArray(new TEndPoint[0]);

/** Number of SeriesPartitionSlots per StorageGroup */
private int seriesPartitionSlotNum = 10000;
@@ -90,12 +90,17 @@ public class ConfigNodeConf {
/** Default TTL for storage groups that are not set TTL by statements, in ms. */
private long defaultTTL = Long.MAX_VALUE;

/** The number of replicas of each region */
private int regionReplicaCount = 3;
/** The number of SchemaRegions of each StorageGroup */
private int schemaRegionCount = 1;
/** The number of DataRegions of each StorageGroup */
private int dataRegionCount = 1;
/** Default number of SchemaRegion replicas */
private int schemaReplicationFactor = 3;

/** Default number of DataRegion replicas */
private int dataReplicationFactor = 3;

/** The initial number of SchemaRegions of each StorageGroup */
private int initialSchemaRegionCount = 1;

/** The initial number of DataRegions of each StorageGroup */
private int initialDataRegionCount = 1;

public ConfigNodeConf() {
// empty constructor
@@ -237,11 +242,11 @@ public void setDataNodeConsensusProtocolClass(String dataNodeConsensusProtocolCl
this.dataNodeConsensusProtocolClass = dataNodeConsensusProtocolClass;
}

public Endpoint[] getConfigNodeGroupAddressList() {
public TEndPoint[] getConfigNodeGroupAddressList() {
return configNodeGroupAddressList;
}

public void setConfigNodeGroupAddressList(Endpoint[] configNodeGroupAddressList) {
public void setConfigNodeGroupAddressList(TEndPoint[] configNodeGroupAddressList) {
this.configNodeGroupAddressList = configNodeGroupAddressList;
}

@@ -277,27 +282,35 @@ public void setDefaultTTL(long defaultTTL) {
this.defaultTTL = defaultTTL;
}

public int getRegionReplicaCount() {
return regionReplicaCount;
public int getSchemaReplicationFactor() {
return schemaReplicationFactor;
}

public void setSchemaReplicationFactor(int schemaReplicationFactor) {
this.schemaReplicationFactor = schemaReplicationFactor;
}

public int getDataReplicationFactor() {
return dataReplicationFactor;
}

public void setDataRegionCount(int dataRegionCount) {
this.dataRegionCount = dataRegionCount;
public void setDataReplicationFactor(int dataReplicationFactor) {
this.dataReplicationFactor = dataReplicationFactor;
}

public int getSchemaRegionCount() {
return schemaRegionCount;
public int getInitialSchemaRegionCount() {
return initialSchemaRegionCount;
}

public void setSchemaRegionCount(int schemaRegionCount) {
this.schemaRegionCount = schemaRegionCount;
public void setInitialSchemaRegionCount(int initialSchemaRegionCount) {
this.initialSchemaRegionCount = initialSchemaRegionCount;
}

public int getDataRegionCount() {
return dataRegionCount;
public int getInitialDataRegionCount() {
return initialDataRegionCount;
}

public void setRegionReplicaCount(int regionReplicaCount) {
this.regionReplicaCount = regionReplicaCount;
public void setInitialDataRegionCount(int initialDataRegionCount) {
this.initialDataRegionCount = initialDataRegionCount;
}
}
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.confignode.conf;

import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.utils.CommonUtils;

@@ -174,25 +174,31 @@ private void loadProps() {
Long.parseLong(
properties.getProperty("default_ttl", String.valueOf(conf.getDefaultTTL()))));

conf.setRegionReplicaCount(
conf.setSchemaReplicationFactor(
Integer.parseInt(
properties.getProperty(
"region_replica_count", String.valueOf(conf.getRegionReplicaCount()))));
"schema_replication_factor", String.valueOf(conf.getSchemaReplicationFactor()))));

conf.setSchemaRegionCount(
conf.setDataReplicationFactor(
Integer.parseInt(
properties.getProperty(
"schema_region_count", String.valueOf(conf.getSchemaRegionCount()))));
"data_replication_factor", String.valueOf(conf.getDataReplicationFactor()))));

conf.setDataRegionCount(
conf.setInitialSchemaRegionCount(
Integer.parseInt(
properties.getProperty(
"data_region_count", String.valueOf(conf.getDataRegionCount()))));
"initial_schema_region_count",
String.valueOf(conf.getInitialSchemaRegionCount()))));

conf.setInitialDataRegionCount(
Integer.parseInt(
properties.getProperty(
"initial_data_region_count", String.valueOf(conf.getInitialDataRegionCount()))));

String addresses = properties.getProperty("config_node_group_address_list", "0.0.0.0:22278");

String[] addressList = addresses.split(",");
Endpoint[] endpointList = new Endpoint[addressList.length];
TEndPoint[] endpointList = new TEndPoint[addressList.length];
for (int i = 0; i < addressList.length; i++) {
endpointList[i] = CommonUtils.parseNodeUrl(addressList[i]);
}

0 comments on commit 7ce6033

Please sign in to comment.