Skip to content
Permalink
Browse files
[IOTDB-1901] Compatibility of Apache IoTDB with InfluxDB - Server-sid…
…e InfluxDB Metadata Manager (#4460)
  • Loading branch information
jun0315 committed Dec 7, 2021
1 parent 45a6658 commit a583ead3c6200b0353658925853fa6bb8f1ec41d
Show file tree
Hide file tree
Showing 52 changed files with 1,688 additions and 702 deletions.
@@ -90,4 +90,4 @@ fragment StringFieldEscapeSequence
;
fragment INT
: '0' | [1-9] [0-9]*
;
;
@@ -24,10 +24,10 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.db.service.RPCServiceThriftHandler;
import org.apache.iotdb.db.service.ServiceType;
import org.apache.iotdb.db.service.thrift.ThriftService;
import org.apache.iotdb.db.service.thrift.ThriftServiceThread;
import org.apache.iotdb.db.service.thrift.handler.RPCServiceThriftHandler;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;

public class ClusterRPCService extends ThriftService implements ClusterRPCServiceMBean {
@@ -30,7 +30,7 @@
import org.apache.iotdb.db.qp.physical.sys.FlushPlan;
import org.apache.iotdb.db.qp.physical.sys.SetSystemModePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.db.service.thrift.impl.TSServiceImpl;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
@@ -63,6 +63,7 @@ By default, the ports that IoTDB uses are:

* 6667: RPC port
* 31999: JMX port
* 8086: InfluxDB Protocol port
* 8181: Monitor port
* 5555: Data sync port
* 9003: internal metadata rpc port (for cluster)
@@ -37,6 +37,7 @@ RUN dos2unix /iotdb/sbin/../conf/iotdb-env.sh
EXPOSE 6667
EXPOSE 31999
EXPOSE 5555
EXPOSE 8086
EXPOSE 8181
VOLUME /iotdb/data
VOLUME /iotdb/logs
@@ -36,17 +36,31 @@
<influxdb.ut.skip>${influxdb.test.skip}</influxdb.ut.skip>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<docker.test.skip>false</docker.test.skip>
<docker.build.executable>docker</docker.build.executable>
<docker.build.single.argument>build -t apache/iotdb:maven-development -f ${basedir}/../docker/src/main/Dockerfile-single ${basedir}/../.</docker.build.single.argument>
<docker.clean.single.argument>image rm apache/iotdb:maven-development</docker.clean.single.argument>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-session</artifactId>
<version>0.13.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-thrift</artifactId>
<version>0.13.0-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>iotdb-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>influxdb-thrift</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
@@ -65,6 +79,38 @@
</dependencies>
<build>
<plugins>
<!-- before integration test, we build the docker image -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<id>build-docker-image</id>
<phase>pre-integration-test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<skip>${docker.test.skip}</skip>
<executable>${docker.build.executable}</executable>
<commandlineArgs>${docker.build.single.argument}</commandlineArgs>
</configuration>
</execution>
<execution>
<id>clean-docker-image</id>
<phase>post-integration-test</phase>
<goals>
<goal>exec</goal>
</goals>
<configuration>
<skip>${docker.test.skip}</skip>
<executable>${docker.build.executable}</executable>
<commandlineArgs>${docker.clean.single.argument}</commandlineArgs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
@@ -93,41 +139,6 @@
<skipITs>${influxdb.it.skip}</skipITs>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>${antlr4.version}</version>
<executions>
<execution>
<configuration>
<listener>false</listener>
<visitor>true</visitor>
</configuration>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${project.build.directory}/generated-sources/antlr4</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
@@ -20,17 +20,13 @@
package org.apache.iotdb.influxdb;

import org.apache.iotdb.influxdb.protocol.constant.InfluxDBConstant;
import org.apache.iotdb.influxdb.protocol.dto.SessionPoint;
import org.apache.iotdb.influxdb.protocol.impl.IoTDBInfluxDBService;
import org.apache.iotdb.influxdb.protocol.input.InfluxLineParser;
import org.apache.iotdb.influxdb.protocol.util.ParameterUtils;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionDataSet;

import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBException;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
@@ -48,8 +44,6 @@

public class IoTDBInfluxDB implements InfluxDB {

private final Session session;

private final IoTDBInfluxDBService influxDBService;

public IoTDBInfluxDB(String url, String userName, String password) {
@@ -59,35 +53,25 @@ public IoTDBInfluxDB(String url, String userName, String password) {
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Unable to parse url: " + url, e);
}
session = new Session(uri.getHost(), uri.getPort(), userName, password);
openSession();
influxDBService = new IoTDBInfluxDBService(session);
influxDBService = new IoTDBInfluxDBService(uri.getHost(), uri.getPort(), userName, password);
}

public IoTDBInfluxDB(String host, int rpcPort, String userName, String password) {
session = new Session(host, rpcPort, userName, password);
openSession();
influxDBService = new IoTDBInfluxDBService(session);
}

public IoTDBInfluxDB(Session session) {
this.session = session;
openSession();
influxDBService = new IoTDBInfluxDBService(session);
influxDBService = new IoTDBInfluxDBService(host, rpcPort, userName, password);
}

public IoTDBInfluxDB(Session.Builder builder) {
session = builder.build();
openSession();
influxDBService = new IoTDBInfluxDBService(session);
this(builder.build());
}

private void openSession() {
try {
session.open(false);
} catch (IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
}
public IoTDBInfluxDB(Session session) {
SessionPoint sessionPoint = new SessionPoint(session);
influxDBService =
new IoTDBInfluxDBService(
sessionPoint.getHost(),
sessionPoint.getRpcPort(),
sessionPoint.getUsername(),
sessionPoint.getPassword());
}

@Override
@@ -115,7 +99,7 @@ public void write(final BatchPoints batchPoints) {
batchPoints.getRetentionPolicy(),
TimeUtil.toTimePrecision(batchPoints.getPrecision()),
batchPoints.getConsistency().value(),
batchPoints);
batchPoints.lineProtocol());
}

@Override
@@ -149,14 +133,12 @@ public void write(
final ConsistencyLevel consistency,
final TimeUnit precision,
final String records) {
BatchPoints batchPoints =
BatchPoints.database(database)
.retentionPolicy(retentionPolicy)
.consistency(consistency)
.precision(precision)
.points(InfluxLineParser.parserRecordsToPoints(records, precision))
.build();
write(batchPoints);
influxDBService.writePoints(
database,
retentionPolicy,
consistency.value(),
precision == null ? "" : TimeUtil.toTimePrecision(precision),
records);
}

@Override
@@ -270,21 +252,12 @@ public boolean databaseExists(final String name) {

@Override
public void flush() {
try {
session.executeNonQueryStatement("flush");
} catch (IoTDBConnectionException | StatementExecutionException e) {
throw new InfluxDBException(e);
}
throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
}

@Override
public void close() {
try {
influxDBService.close();
session.close();
} catch (IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
}
influxDBService.close();
}

@Override
@@ -411,25 +384,11 @@ public boolean isBatchEnabled() {

@Override
public Pong ping() {
final long started = System.currentTimeMillis();
Pong pong = new Pong();
pong.setVersion(version());
pong.setResponseTime(System.currentTimeMillis() - started);
return pong;
throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
}

@Override
public String version() {
try {
SessionDataSet sessionDataSet = session.executeQueryStatement("show version");
String version = null;
while (sessionDataSet.hasNext()) {
version = sessionDataSet.next().getFields().get(0).getStringValue();
}
sessionDataSet.closeOperationHandle();
return version;
} catch (StatementExecutionException | IoTDBConnectionException e) {
throw new InfluxDBException(e.getMessage());
}
throw new UnsupportedOperationException(InfluxDBConstant.METHOD_NOT_SUPPORTED);
}
}
@@ -28,7 +28,7 @@
public enum IoTDBInfluxDBFactory {
INSTANCE;

private IoTDBInfluxDBFactory() {}
IoTDBInfluxDBFactory() {}

public static InfluxDB connect(String url, String username, String password) {
ParameterUtils.checkNonEmptyString(url, "url");
@@ -23,8 +23,6 @@

import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import java.util.HashMap;
import java.util.Map;
@@ -34,12 +32,12 @@ public class InfluxDBExample {

private static InfluxDB influxDB;

public static void main(String[] args) throws Exception {
influxDB = IoTDBInfluxDBFactory.connect("http://127.0.0.1:6667", "root", "root");
public static void main(String[] args) {
influxDB = IoTDBInfluxDBFactory.connect("http://127.0.0.1:8086", "root", "root");
influxDB.createDatabase("database");
influxDB.setDatabase("database");
insertData();
queryData();
influxDB.close();
}

private static void insertData() {
@@ -71,32 +69,4 @@ private static void insertData() {
point = builder.build();
influxDB.write(point);
}

private static void queryData() {
Query query;
QueryResult result;

// the selector query is parallel to the field value
query =
new Query(
"select * from student where (name=\"xie\" and sex=\"m\")or time<now()-7d", "database");
result = influxDB.query(query);
System.out.println("query1 result:" + result.getResults().get(0).getSeries().get(0).toString());

// use iotdb built-in func
query =
new Query(
"select max(score),min(score),sum(score),count(score),spread(score),mean(score),first(score),last(score) from student ",
"database");
result = influxDB.query(query);
System.out.println("query2 result:" + result.getResults().get(0).getSeries().get(0).toString());

// aggregate query and selector query are parallel
query =
new Query(
"select count(score),first(score),last(country),max(score),mean(score),median(score),min(score),mode(score),spread(score),stddev(score),sum(score) from student where (name=\"xie\" and sex=\"m\")or score<99",
"database");
result = influxDB.query(query);
System.out.println("query3 result:" + result.getResults().get(0).getSeries().get(0).toString());
}
}
@@ -22,6 +22,4 @@
public class InfluxDBConstant {

public static final String METHOD_NOT_SUPPORTED = "Method not supported.";

public static final String PLACE_HOLDER = "PH";
}

0 comments on commit a583ead

Please sign in to comment.