Skip to content
Permalink
Browse files
[IOTDB-3316] Add 'SHOW CLUSTER' implementation for IoTDB cluster mode (
  • Loading branch information
Beyyes committed May 31, 2022
1 parent ecbea66 commit b940a652ddb03868428ad8774b18cf08b31d7c73
Showing 14 changed files with 300 additions and 1 deletion.
@@ -43,7 +43,7 @@ ddlStatement
| dropFunction | dropTrigger | dropContinuousQuery | dropSchemaTemplate
| setTTL | unsetTTL | startTrigger | stopTrigger | setSchemaTemplate | unsetSchemaTemplate
| showStorageGroup | showDevices | showTimeseries | showChildPaths | showChildNodes
| showFunctions | showTriggers | showContinuousQueries | showTTL | showAllTTL
| showFunctions | showTriggers | showContinuousQueries | showTTL | showAllTTL | showCluster
| showSchemaTemplates | showNodesInSchemaTemplate
| showPathsUsingSchemaTemplate | showPathsSetSchemaTemplate
| countStorageGroup | countDevices | countTimeseries | countNodes
@@ -303,6 +303,11 @@ showAllTTL
: SHOW ALL TTL
;

// Show Cluster
showCluster
: SHOW CLUSTER
;

// Show Schema Template
showSchemaTemplates
: SHOW SCHEMA? TEMPLATES
@@ -117,6 +117,10 @@ CLEAR
: C L E A R
;

CLUSTER
: C L U S T E R
;

CONCAT
: C O N C A T
;
@@ -19,6 +19,8 @@
package org.apache.iotdb.confignode.service.thrift;

import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -53,6 +55,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
@@ -80,6 +83,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -90,6 +94,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** ConfigNodeRPCServer exposes the interface that interacts with the DataNode */
public class ConfigNodeRPCServiceProcessor implements ConfigIService.Iface {
@@ -137,6 +142,21 @@ public TDataNodeInfoResp getDataNodeInfo(int dataNodeID) throws TException {
return resp;
}

@Override
public TClusterNodeInfos getAllClusterNodeInfos() throws TException {
List<TConfigNodeLocation> configNodeLocations =
configManager.getNodeManager().getOnlineConfigNodes();
List<TDataNodeLocation> dataNodeInfoLocations =
configManager.getNodeManager().getOnlineDataNodes(-1).stream()
.map(TDataNodeInfo::getLocation)
.collect(Collectors.toList());

return new TClusterNodeInfos(
new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
configNodeLocations,
dataNodeInfoLocations);
}

@Override
public TSStatus setStorageGroup(TSetStorageGroupReq req) throws TException {
TStorageGroupSchema storageGroupSchema = req.getStorageGroup();
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.confignode.service.thrift;

import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -45,6 +46,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
@@ -221,6 +223,32 @@ public void testRegisterAndQueryDataNode() throws TException {
Assert.assertEquals(dataNodeLocation, infoMap.get(1).getLocation());
}

@Test
public void getAllClusterNodeInfosTest() throws TException {
registerDataNodes();

TClusterNodeInfos clusterNodes = processor.getAllClusterNodeInfos();

List<TConfigNodeLocation> configNodeInfos = clusterNodes.getConfigNodeList();
Assert.assertEquals(1, configNodeInfos.size());
TConfigNodeLocation configNodeLocation =
new TConfigNodeLocation(new TEndPoint("0.0.0.0", 22277), new TEndPoint("0.0.0.0", 22278));
Assert.assertEquals(configNodeLocation, configNodeInfos.get(0));

List<TDataNodeLocation> dataNodeInfos = clusterNodes.getDataNodeList();
Assert.assertEquals(3, dataNodeInfos.size());
TDataNodeLocation dataNodeLocation = new TDataNodeLocation();
for (int i = 0; i < 3; i++) {
dataNodeLocation.setDataNodeId(i);
dataNodeLocation.setExternalEndPoint(new TEndPoint("0.0.0.0", 6667 + i));
dataNodeLocation.setInternalEndPoint(new TEndPoint("0.0.0.0", 9003 + i));
dataNodeLocation.setDataBlockManagerEndPoint(new TEndPoint("0.0.0.0", 8777 + i));
dataNodeLocation.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 40010 + i));
dataNodeLocation.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 50010 + i));
Assert.assertEquals(dataNodeLocation, dataNodeInfos.get(i));
}
}

@Test
public void testSetAndQueryStorageGroup() throws TException {
TSStatus status;
@@ -232,6 +232,12 @@ private IoTDBConstant() {}
public static final String WAL_VERSION_ID = "versionId";
public static final String WAL_START_SEARCH_INDEX = "startSearchIndex";

// show cluster status
public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode";
public static final String NODE_TYPE_DATA_NODE = "DataNode";
public static final String NODE_STATUS_RUNNING = "Running";
public static final String NODE_STATUS_Down = "Down";

// client version number
public enum ClientVersion {
V_0_12,
@@ -33,6 +33,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerReq;
import org.apache.iotdb.confignode.rpc.thrift.TAuthorizerResp;
import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
@@ -265,6 +266,22 @@ public TDataNodeInfoResp getDataNodeInfo(int dataNodeId) throws TException {
throw new TException(MSG_RECONNECTION_FAIL);
}

@Override
public TClusterNodeInfos getAllClusterNodeInfos() throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
try {
TClusterNodeInfos resp = client.getAllClusterNodeInfos();
if (!updateConfigNodeLeader(resp.status)) {
return resp;
}
} catch (TException e) {
configLeader = null;
}
reconnect();
}
throw new TException(MSG_RECONNECTION_FAIL);
}

@Override
public TSStatus setStorageGroup(TSetStorageGroupReq req) throws TException {
for (int i = 0; i < RETRY_NUM; i++) {
@@ -57,6 +57,13 @@ public class HeaderConstant {
public static final String COLUMN_COUNT_TIMESERIES = "count(timeseries)";
public static final String COLUMN_COUNT_STORAGE_GROUP = "count(storage group)";

// column names for show cluster statement
public static final String COLUMN_NODE_ID = "NodeID";
public static final String COLUMN_NODE_TYPE = "NodeType";
public static final String COLUMN_STATUS = "Status";
public static final String COLUMN_HOST_ADDRESS = "HostAdress";
public static final String COLUMN_PORT = "Port";

// dataset header for schema statement
public static final DatasetHeader showTimeSeriesHeader;
public static final DatasetHeader showDevicesHeader;
@@ -73,6 +80,9 @@ public class HeaderConstant {
public static final DatasetHeader countTimeSeriesHeader;
public static final DatasetHeader countLevelTimeSeriesHeader;

// dataset header for show cluster statement
public static final DatasetHeader showClusterHeader;

// dataset header for last query
public static final DatasetHeader LAST_QUERY_HEADER;

@@ -160,4 +170,16 @@ public class HeaderConstant {
new ColumnHeader(COLUMN_TIMESERIES_DATATYPE, TSDataType.TEXT)),
false);
}

static {
showClusterHeader =
new DatasetHeader(
Arrays.asList(
new ColumnHeader(COLUMN_NODE_ID, TSDataType.INT32),
new ColumnHeader(COLUMN_NODE_TYPE, TSDataType.TEXT),
new ColumnHeader(COLUMN_STATUS, TSDataType.TEXT),
new ColumnHeader(COLUMN_HOST_ADDRESS, TSDataType.TEXT),
new ColumnHeader(COLUMN_PORT, TSDataType.INT32)),
true);
}
}
@@ -72,6 +72,7 @@
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
@@ -1209,6 +1210,15 @@ public Analysis visitShowDevices(
return analysis;
}

@Override
public Analysis visitShowCluster(
ShowClusterStatement showClusterStatement, MPPQueryContext context) {
Analysis analysis = new Analysis();
analysis.setStatement(showClusterStatement);
analysis.setRespDatasetHeader(HeaderConstant.showClusterHeader);
return analysis;
}

@Override
public Analysis visitCountStorageGroup(
CountStorageGroupStatement countStorageGroupStatement, MPPQueryContext context) {
@@ -28,6 +28,7 @@
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.UnSetTTLStatement;
@@ -86,6 +87,12 @@ public IConfigTask visitShowTTL(ShowTTLStatement showTTLStatement, TaskContext c
return new ShowTTLTask(showTTLStatement);
}

@Override
public IConfigTask visitShowCluster(
ShowClusterStatement showClusterStatement, TaskContext context) {
return new ShowClusterTask(showClusterStatement);
}

@Override
public IConfigTask visitAuthor(AuthorStatement statement, TaskContext context) {
return new AuthorizerConfigTask(statement);
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.mpp.plan.execution.config;

import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.consensus.PartitionRegionId;
import org.apache.iotdb.confignode.rpc.thrift.TClusterNodeInfos;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_STATUS_RUNNING;
import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_TYPE_CONFIG_NODE;
import static org.apache.iotdb.commons.conf.IoTDBConstant.NODE_TYPE_DATA_NODE;

public class ShowClusterTask implements IConfigTask {

private static final Logger LOGGER = LoggerFactory.getLogger(ShowClusterTask.class);

private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

public ShowClusterTask(ShowClusterStatement showClusterStatement) {}

@Override
public ListenableFuture<ConfigTaskResult> execute(
IClientManager<PartitionRegionId, ConfigNodeClient> clientManager)
throws InterruptedException {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
TClusterNodeInfos clusterNodeInfos = new TClusterNodeInfos();

if (config.isClusterMode()) {
try (ConfigNodeClient client = clientManager.borrowClient(ConfigNodeInfo.partitionRegionId)) {
clusterNodeInfos = client.getAllClusterNodeInfos();
} catch (TException | IOException e) {
LOGGER.error("Failed to connect to config node.");
future.setException(e);
}
}

// build TSBlock
TsBlockBuilder builder =
new TsBlockBuilder(HeaderConstant.showClusterHeader.getRespDataTypes());

AtomicInteger configNodeId = new AtomicInteger();
clusterNodeInfos
.getConfigNodeList()
.forEach(
e ->
buildTsBlock(
builder,
configNodeId.getAndIncrement(),
NODE_TYPE_CONFIG_NODE,
NODE_STATUS_RUNNING,
e.getInternalEndPoint().getIp(),
e.getInternalEndPoint().getPort()));

clusterNodeInfos
.getDataNodeList()
.forEach(
e ->
buildTsBlock(
builder,
e.getDataNodeId(),
NODE_TYPE_DATA_NODE,
NODE_STATUS_RUNNING,
e.getInternalEndPoint().getIp(),
e.getInternalEndPoint().getPort()));

DatasetHeader datasetHeader = HeaderConstant.showClusterHeader;
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
return future;
}

private void buildTsBlock(
TsBlockBuilder builder,
int nodeId,
String nodeType,
String nodeStatus,
String hostAddress,
int port) {
builder.getTimeColumnBuilder().writeLong(0L);
builder.getColumnBuilder(0).writeInt(nodeId);
builder.getColumnBuilder(1).writeBinary(new Binary(nodeType));
builder.getColumnBuilder(2).writeBinary(new Binary(nodeStatus));
builder.getColumnBuilder(3).writeBinary(new Binary(hostAddress));
builder.getColumnBuilder(4).writeInt(port);
builder.declarePosition();
}
}

0 comments on commit b940a65

Please sign in to comment.