Skip to content
Permalink
Browse files
[IOTDB-1352][IOTDB-1353] Rafactor E2E test, add ClusterInfo thrift ba…
…sed API (#3143)
  • Loading branch information
jixuan1989 committed May 11, 2021
1 parent 819b195 commit 19cfcd5374a562a495c70b7b18e171a04df0fb41
Showing 35 changed files with 1,264 additions and 208 deletions.
@@ -38,6 +38,9 @@ internal_meta_port=9003
# port for data service
internal_data_port=40010

# port for cluster info API, 6567 by default
#cluster_info_public_port=6567

# whether open port for server module (for debug purpose)
# if true, the rpc_port of the single server will be changed to rpc_port (in iotdb-engines.properties) + 1
# open_server_rpc_port=false
@@ -29,12 +29,14 @@
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.MetaClusterServer;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.clusterinfo.ClusterInfoServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.conf.IoTDBConfigCheck;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.utils.TestOnly;

import org.apache.thrift.TException;
import org.apache.thrift.async.TAsyncClientManager;
@@ -106,6 +108,9 @@ public static void main(String[] args) {
preStartCustomize();
metaServer.start();
metaServer.buildCluster();
// Currently, we do not register ClusterInfoService as a JMX Bean,
// so we use startService() rather than start()
ClusterInfoServer.getInstance().startService();
} catch (TTransportException
| StartupException
| QueryProcessException
@@ -120,6 +125,9 @@ public static void main(String[] args) {
preStartCustomize();
metaServer.start();
metaServer.joinCluster();
// Currently, we do not register ClusterInfoService as a JMX Bean,
// so we use startService() rather than start()
ClusterInfoServer.getInstance().startService();
} catch (TTransportException
| StartupException
| QueryProcessException
@@ -300,4 +308,9 @@ private int extractSerialNumInSGName(String storageGroupName) {
}
});
}

@TestOnly
public static void setMetaClusterServer(MetaClusterServer metaClusterServer) {
metaServer = metaClusterServer;
}
}
@@ -38,6 +38,7 @@ public class ClusterConfig {
private int internalMetaPort = 9003;
private int internalDataPort = 40010;
private int clusterRpcPort = IoTDBDescriptor.getInstance().getConfig().getRpcPort();
private int clusterInfoRpcPort = 6567;

/** each one is a {internalIp | domain name}:{meta port} string tuple. */
private List<String> seedNodeUrls;
@@ -486,4 +487,12 @@ public long getWaitClientTimeoutMS() {
public void setWaitClientTimeoutMS(long waitClientTimeoutMS) {
this.waitClientTimeoutMS = waitClientTimeoutMS;
}

public int getClusterInfoRpcPort() {
return clusterInfoRpcPort;
}

public void setClusterInfoRpcPort(int clusterInfoRpcPort) {
this.clusterInfoRpcPort = clusterInfoRpcPort;
}
}
@@ -142,9 +142,14 @@ private void loadProps() {
properties.getProperty(
"internal_data_port", Integer.toString(config.getInternalDataPort()))));

config.setClusterRpcPort(
// rpc port and rpc address are defined in iotdb-engine.properties.
// To avoid inconsistency, we do not read "rpc_port" in iotdb-cluster.properties
// even users claim the property.

config.setClusterInfoRpcPort(
Integer.parseInt(
properties.getProperty("rpc_port", Integer.toString(config.getClusterRpcPort()))));
properties.getProperty(
"cluster_info_public_port", Integer.toString(config.getClusterInfoRpcPort()))));

config.setMaxConcurrentClientNum(
Integer.parseInt(
@@ -24,20 +24,7 @@
import org.apache.iotdb.cluster.exception.StartUpCheckFailureException;
import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntriesRequest;
import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest;
import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest;
import org.apache.iotdb.cluster.rpc.thrift.ExecutNonQueryReq;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatRequest;
import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
import org.apache.iotdb.cluster.rpc.thrift.*;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncProcessor;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Processor;
import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatServer;
@@ -49,6 +36,7 @@
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.RegisterManager;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.TSStatus;

import org.apache.thrift.TException;
@@ -112,7 +100,9 @@ public void start() throws TTransportException, StartupException {
((CMManager) IoTDB.metaManager).setCoordinator(coordinator);
ioTDB.active();
member.start();
// JMX based DBA API
registerManager.register(ClusterMonitor.INSTANCE);

}

/** Also stops the IoTDB instance, the MetaGroupMember and the ClusterMonitor. */
@@ -366,4 +356,14 @@ public void handshake(Node sender) {
public void handshake(Node sender, AsyncMethodCallback<Void> resultHandler) {
asyncService.handshake(sender, resultHandler);
}

@TestOnly
public void setMetaGroupMember(MetaGroupMember metaGroupMember) {
this.member = metaGroupMember;
}

@TestOnly
public IoTDB getIoTDB() {
return ioTDB;
}
}
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.server.clusterinfo;

import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService.Processor;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.service.thrift.ThriftService;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;

public class ClusterInfoServer extends ThriftService implements ClusterInfoServerMBean {
private ClusterInfoServiceImpl serviceImpl;

public static ClusterInfoServer getInstance() {
return ClusterMonitorServiceHolder.INSTANCE;
}

@Override
public ServiceType getID() {
return ServiceType.CLUSTER_INFO_SERVICE;
}

@Override
public ThriftService getImplementation() {
return getInstance();
}

@Override
public void initTProcessor() {
serviceImpl = new ClusterInfoServiceImpl();
processor = new Processor<>(serviceImpl);
}

@Override
public void initThriftServiceThread() throws IllegalAccessException {
ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
IoTDBConfig nodeConfig = IoTDBDescriptor.getInstance().getConfig();
try {
thriftServiceThread =
new ThriftServiceThread(
processor,
getID().getName(),
ThreadName.CLUSTER_INFO_SERVICE.getName(),
nodeConfig.getRpcAddress(),
clusterConfig.getClusterInfoRpcPort(),
nodeConfig.getRpcMaxConcurrentClientNum(),
nodeConfig.getThriftServerAwaitTimeForStopService(),
new ClusterInfoServiceThriftHandler(serviceImpl),
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
thriftServiceThread.setName(ThreadName.CLUSTER_INFO_SERVICE.getName() + "Service");
}

@Override
public String getBindIP() {
return IoTDBDescriptor.getInstance().getConfig().getRpcAddress();
}

@Override
public int getBindPort() {
return ClusterDescriptor.getInstance().getConfig().getClusterInfoRpcPort();
}

private static class ClusterMonitorServiceHolder {

private static final ClusterInfoServer INSTANCE = new ClusterInfoServer();

private ClusterMonitorServiceHolder() {}
}
}
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.server.clusterinfo;

public interface ClusterInfoServerMBean {}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.server.clusterinfo;

import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.rpc.thrift.ClusterInfoService;
import org.apache.iotdb.cluster.rpc.thrift.DataPartitionEntry;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.nodetool.ClusterMonitor;

import org.apache.commons.collections4.map.MultiKeyMap;
import org.apache.thrift.TException;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ClusterInfoServiceImpl implements ClusterInfoService.Iface {

@Override
public List<Node> getRing() throws TException {
return ClusterMonitor.INSTANCE.getRing();
}

@Override
public List<DataPartitionEntry> getDataPartition(String path, long startTime, long endTime) {
MultiKeyMap<Long, PartitionGroup> partitions =
ClusterMonitor.INSTANCE.getDataPartition(path, startTime, endTime);
List<DataPartitionEntry> result = new ArrayList<>(partitions.size());
partitions.forEach(
(multikey, nodes) ->
result.add(new DataPartitionEntry(multikey.getKey(0), multikey.getKey(1), nodes)));
return result;
}

@Override
public List<Node> getMetaPartition(String path) throws TException {
return ClusterMonitor.INSTANCE.getMetaPartition(path);
}

@Override
public Map<Node, Boolean> getAllNodeStatus() throws TException {
return ClusterMonitor.INSTANCE.getAllNodeStatus();
}

@Override
public String getInstrumentingInfo() throws TException {
return ClusterMonitor.INSTANCE.getInstrumentingInfo();
}

public void handleClientExit() {
// do something when a client connection exits.
}
}
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.cluster.server.clusterinfo;

import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TTransport;

public class ClusterInfoServiceThriftHandler implements TServerEventHandler {
private ClusterInfoServiceImpl serviceImpl;

ClusterInfoServiceThriftHandler(ClusterInfoServiceImpl serviceImpl) {
this.serviceImpl = serviceImpl;
}

@Override
public ServerContext createContext(TProtocol arg0, TProtocol arg1) {
// nothing
return null;
}

@Override
public void deleteContext(ServerContext arg0, TProtocol arg1, TProtocol arg2) {
// release query resources.
serviceImpl.handleClientExit();
}

@Override
public void preServe() {
// nothing
}

@Override
public void processContext(ServerContext arg0, TTransport arg1, TTransport arg2) {
// nothing
}
}

0 comments on commit 19cfcd5

Please sign in to comment.