[IOTDB-3433] add metric for cluster. (#6291)
SpriCoder committed Jun 16, 2022
1 parent 99279e0 commit 956216161304be9ff89b473d230c9bc13aedbb38
Showing 15 changed files with 2,997 additions and 24 deletions.
@@ -664,4 +664,10 @@ public List<PartialPath> checkStorageGroupExist(List<PartialPath> storageGroups)
    }
    return noExistSg;
  }

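  /** Register cluster metrics by delegating to the partition and node managers. */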
  @Override
  public void addMetrics() {
    partitionManager.addMetrics();
    nodeManager.addMetrics();
  }
}
@@ -220,4 +220,6 @@ public interface Manager {
  TSStatus createFunction(String udfName, String className, List<String> uris);

  TSStatus dropFunction(String udfName);

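  /** Register cluster-related metrics with the metric service. */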
  void addMetrics();
}
@@ -179,6 +179,10 @@ public TSStatus applyConfigNode(ApplyConfigNodeReq applyConfigNodeReq) {
    }
  }

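  /** Register node-related metrics (online ConfigNode and DataNode counts). */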
  public void addMetrics() {
    nodeInfo.addMetrics();
  }

  public List<TConfigNodeLocation> getOnlineConfigNodes() {
    return nodeInfo.getOnlineConfigNodes();
  }
@@ -448,6 +448,10 @@ public void clearDeletedRegions() {
    }
  }

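  /** Register partition-related metrics (StorageGroup count and Region counts). */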
  public void addMetrics() {
    partitionInfo.addMetrics();
  }

  /**
   * Get TSeriesPartitionSlot
   *
@@ -31,6 +31,11 @@
import org.apache.iotdb.confignode.consensus.request.write.ApplyConfigNodeReq;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodeReq;
import org.apache.iotdb.confignode.consensus.response.DataNodeInfosResp;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

@@ -104,6 +109,29 @@ public NodeInfo() {
        new HashSet<>(ConfigNodeDescriptor.getInstance().getConf().getConfigNodeList());
  }

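  /**
   * Register auto gauges for the number of online ConfigNodes and DataNodes. These gauges are only
   * created when the metric module is enabled.
   */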
  public void addMetrics() {
    if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.CONFIG_NODE.toString(),
              MetricLevel.CORE,
              onlineConfigNodes,
              o -> o.size(),
              Tag.NAME.toString(),
              "online");
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.DATA_NODE.toString(),
              MetricLevel.CORE,
              onlineDataNodes,
              Map::size,
              Tag.NAME.toString(),
              "online");
    }
  }

  /** @return true if the specified DataNode is currently online */
  public boolean isOnlineDataNode(TDataNodeLocation info) {
    boolean result = false;
@@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
@@ -42,6 +43,11 @@
import org.apache.iotdb.confignode.consensus.response.SchemaPartitionResp;
import org.apache.iotdb.confignode.exception.StorageGroupNotExistsException;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -59,6 +65,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -98,6 +105,42 @@ public PartitionInfo() {
    this.deletedRegionSet = Collections.synchronizedSet(new HashSet<>());
  }

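  /**
   * Register auto gauges for the number of StorageGroups and for the total number of SchemaRegions
   * and DataRegions. These gauges are only created when the metric module is enabled.
   */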
  public void addMetrics() {
    if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.STORAGE_GROUP.toString(),
              MetricLevel.CORE,
              storageGroupPartitionTables,
              o -> o.size() / 2,
              Tag.NAME.toString(),
              "number");
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.REGION.toString(),
              MetricLevel.IMPORTANT,
              this,
              o -> o.updateRegionMetric(TConsensusGroupType.SchemaRegion),
              Tag.NAME.toString(),
              "total",
              Tag.TYPE.toString(),
              TConsensusGroupType.SchemaRegion.toString());
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.REGION.toString(),
              MetricLevel.IMPORTANT,
              this,
              o -> o.updateRegionMetric(TConsensusGroupType.DataRegion),
              Tag.NAME.toString(),
              "total",
              Tag.TYPE.toString(),
              TConsensusGroupType.DataRegion.toString());
    }
  }

  public int generateNextRegionGroupId() {
    return nextRegionGroupId.getAndIncrement();
  }
@@ -113,7 +156,9 @@ public int generateNextRegionGroupId() {
   * @return SUCCESS_STATUS if the new StorageGroupPartitionInfo is created successfully.
   */
  public TSStatus setStorageGroup(SetStorageGroupReq req) {
    storageGroupPartitionTables.put(req.getSchema().getName(), new StorageGroupPartitionTable());
    String storageGroupName = req.getSchema().getName();
    storageGroupPartitionTables.put(
        storageGroupName, new StorageGroupPartitionTable(storageGroupName));

    LOGGER.info("Successfully set StorageGroup: {}", req.getSchema());

@@ -487,6 +532,69 @@ public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(
    return storageGroupPartitionTables.get(storageGroup).getSortedRegionSlotsCounter(type);
  }

  /**
   * Get the total number of Regions of the given type.
   *
   * @param type SchemaRegion or DataRegion
   * @return the number of Regions of the given type
   */
  public int getTotalRegionCount(TConsensusGroupType type) {
    Set<RegionGroup> regionGroups = new HashSet<>();
    for (Map.Entry<String, StorageGroupPartitionTable> entry :
        storageGroupPartitionTables.entrySet()) {
      regionGroups.addAll(entry.getValue().getRegion(type));
    }
    return regionGroups.size();
  }

  /**
   * Update the Region-related metrics, including the per-DataNode Region gauges.
   *
   * @param type SchemaRegion or DataRegion
   * @return the total number of Regions of the given type
   */
  private int updateRegionMetric(TConsensusGroupType type) {
    Set<RegionGroup> regionGroups = new HashSet<>();
    for (Map.Entry<String, StorageGroupPartitionTable> entry :
        storageGroupPartitionTables.entrySet()) {
      regionGroups.addAll(entry.getValue().getRegion(type));
    }
    int result = regionGroups.size();
    // DataNode location -> Region count
    Map<TDataNodeLocation, Integer> dataNodeLocationIntegerMap = new HashMap<>();
    for (RegionGroup regionGroup : regionGroups) {
      TRegionReplicaSet regionReplicaSet = regionGroup.getReplicaSet();
      List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations();
      for (TDataNodeLocation dataNodeLocation : dataNodeLocations) {
        if (!dataNodeLocationIntegerMap.containsKey(dataNodeLocation)) {
          dataNodeLocationIntegerMap.put(dataNodeLocation, 0);
        }
        dataNodeLocationIntegerMap.put(
            dataNodeLocation, dataNodeLocationIntegerMap.get(dataNodeLocation) + 1);
      }
    }
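    // Publish one gauge per DataNode, tagged with its endpoint, recording how many Regions of
    // this type it currently hosts.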
    for (Map.Entry<TDataNodeLocation, Integer> entry : dataNodeLocationIntegerMap.entrySet()) {
      TDataNodeLocation dataNodeLocation = entry.getKey();
      String name =
          "EndPoint("
              + dataNodeLocation.getExternalEndPoint().ip
              + ":"
              + dataNodeLocation.getExternalEndPoint().port
              + ")";
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateGauge(
              Metric.REGION.toString(),
              MetricLevel.IMPORTANT,
              Tag.NAME.toString(),
              name,
              Tag.TYPE.toString(),
              type.toString())
          .set(dataNodeLocationIntegerMap.get(dataNodeLocation));
    }
    return result;
  }

  public boolean processTakeSnapshot(File snapshotDir) throws TException, IOException {

    File snapshotFile = new File(snapshotDir, snapshotFileName);
@@ -562,7 +670,8 @@ public void processLoadSnapshot(File snapshotDir) throws TException, IOException
    int length = ReadWriteIOUtils.readInt(fileInputStream);
    for (int i = 0; i < length; i++) {
      String storageGroup = ReadWriteIOUtils.readString(fileInputStream);
      StorageGroupPartitionTable storageGroupPartitionTable = new StorageGroupPartitionTable();
      StorageGroupPartitionTable storageGroupPartitionTable =
          new StorageGroupPartitionTable(storageGroup);
      storageGroupPartitionTable.deserialize(fileInputStream, protocol);
      storageGroupPartitionTables.put(storageGroup, storageGroupPartitionTable);
    }
@@ -25,6 +25,11 @@
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.SchemaPartitionTable;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

@@ -36,16 +41,21 @@
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StorageGroupPartitionTable {
  private volatile boolean isPredeleted = false;
  // The name of the storage group
  private String storageGroupName;

  // Total number of SeriesPartitionSlots occupied by schema,
  // determines whether a new Region needs to be created
  private final AtomicInteger seriesPartitionSlotsCount;
@@ -61,7 +71,8 @@ public class StorageGroupPartitionTable {
  // DataPartition
  private final DataPartitionTable dataPartitionTable;

  public StorageGroupPartitionTable() {
  public StorageGroupPartitionTable(String storageGroupName) {
    this.storageGroupName = storageGroupName;
    this.seriesPartitionSlotsCount = new AtomicInteger(0);

    this.schemaRegionParticle = new AtomicBoolean(true);
@@ -70,6 +81,58 @@ public StorageGroupPartitionTable() {

    this.schemaPartitionTable = new SchemaPartitionTable();
    this.dataPartitionTable = new DataPartitionTable();

    addMetrics();
  }

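  /**
   * Register per-StorageGroup auto gauges: the SchemaRegion count, the DataRegion count, and the
   * number of allocated schema and data partition slots. These gauges are only created when the
   * metric module is enabled.
   */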
  private void addMetrics() {
    if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.REGION.toString(),
              MetricLevel.NORMAL,
              this,
              o -> o.getRegionCount(TConsensusGroupType.SchemaRegion),
              Tag.NAME.toString(),
              storageGroupName,
              Tag.TYPE.toString(),
              TConsensusGroupType.SchemaRegion.toString());
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.REGION.toString(),
              MetricLevel.NORMAL,
              this,
              o -> o.getRegionCount(TConsensusGroupType.DataRegion),
              Tag.NAME.toString(),
              storageGroupName,
              Tag.TYPE.toString(),
              TConsensusGroupType.DataRegion.toString());
      // TODO slot will be updated in the future
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.SLOT.toString(),
              MetricLevel.NORMAL,
              schemaPartitionTable,
              o -> o.getSchemaPartitionMap().size(),
              Tag.NAME.toString(),
              storageGroupName,
              Tag.TYPE.toString(),
              "schemaSlotNumber");
      MetricsService.getInstance()
          .getMetricManager()
          .getOrCreateAutoGauge(
              Metric.SLOT.toString(),
              MetricLevel.NORMAL,
              dataPartitionTable,
              o -> o.getDataPartitionMap().size(),
              Tag.NAME.toString(),
              storageGroupName,
              Tag.TYPE.toString(),
              "dataSlotNumber");
    }
  }

  public boolean isPredeleted() {
@@ -101,6 +164,24 @@ public List<TRegionReplicaSet> getAllReplicaSets() {
    return result;
  }

  /**
   * Get regions currently owned by this StorageGroup
   *
   * @param type SchemaRegion or DataRegion
   * @return The regions currently owned by this StorageGroup
   */
  public Set<RegionGroup> getRegion(TConsensusGroupType type) {
    Set<RegionGroup> regionGroups = new HashSet<>();
    regionInfoMap
        .values()
        .forEach(
            regionGroup -> {
              if (regionGroup.getId().getType().equals(type)) {
                regionGroups.add(regionGroup);
              }
            });
    return regionGroups;
  }

  /**
   * Get the number of Regions currently owned by this StorageGroup
   *
@@ -283,6 +364,7 @@ public List<Pair<Long, TConsensusGroupId>> getSortedRegionSlotsCounter(TConsensu
  public void serialize(OutputStream outputStream, TProtocol protocol)
      throws IOException, TException {
    ReadWriteIOUtils.write(isPredeleted, outputStream);
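    // storageGroupName is part of the snapshot format; serialize and deserialize must stay in sync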
    ReadWriteIOUtils.write(storageGroupName, outputStream);
    ReadWriteIOUtils.write(seriesPartitionSlotsCount.get(), outputStream);

    ReadWriteIOUtils.write(regionInfoMap.size(), outputStream);
@@ -298,6 +380,7 @@ public void serialize(OutputStream outputStream, TProtocol protocol)
  public void deserialize(InputStream inputStream, TProtocol protocol)
      throws IOException, TException {
    isPredeleted = ReadWriteIOUtils.readBool(inputStream);
    storageGroupName = ReadWriteIOUtils.readString(inputStream);
    seriesPartitionSlotsCount.set(ReadWriteIOUtils.readInt(inputStream));

    int length = ReadWriteIOUtils.readInt(inputStream);
@@ -82,6 +82,7 @@ private void setUp() throws StartupException, IOException {
    JMXService.registerMBean(this, mbeanName);

    registerManager.register(MetricsService.getInstance());
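    // Register cluster metrics after the metric service has been started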
    configManager.addMetrics();
    registerUdfServices();

    configNodeRPCService.initSyncedServiceImpl(configNodeRPCServiceProcessor);
