Skip to content

Commit

Permalink
[IOTDB-615] Use binary rather than string in insert plan (#1229)
Browse files Browse the repository at this point in the history
* add insert object interface in session
  • Loading branch information
SilverNarcissus committed May 28, 2020
1 parent 4466180 commit 4e7be1c
Show file tree
Hide file tree
Showing 38 changed files with 1,481 additions and 673 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void run(SourceContext context) throws Exception {
tuple.put("device", "root.sg.d1");
tuple.put("timestamp", String.valueOf(System.currentTimeMillis()));
tuple.put("measurements", "s1");
tuple.put("types", "DOUBLE");
tuple.put("values", String.valueOf(random.nextDouble()));

context.collect(tuple);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,25 @@ public class Constant {
{"root.test.d1.s0", "INT32", "PLAIN", "SNAPPY"},
};
public static final String[] ALL_DATA = {
"root.vehicle.d0,10,s0,100",
"root.vehicle.d0,12,s0:s1,101:'employeeId102'",
"root.vehicle.d0,19,s1,'employeeId103'",
"root.vehicle.d1,11,s2,104.0",
"root.vehicle.d1,15,s2:s3,105.0:true",
"root.vehicle.d1,17,s3,false",
"root.vehicle.d0,20,s0,1000",
"root.vehicle.d0,22,s0:s1,1001:'employeeId1002'",
"root.vehicle.d0,29,s1,'employeeId1003'",
"root.vehicle.d1,21,s2,1004.0",
"root.vehicle.d1,25,s2:s3,1005.0:true",
"root.vehicle.d1,27,s3,true",
"root.test.d0,10,s0,106",
"root.test.d0,14,s0:s1,107:'employeeId108'",
"root.test.d0,16,s1,'employeeId109'",
"root.test.d1,1,s0,110",
"root.test.d0,30,s0,1006",
"root.test.d0,34,s0:s1,1007:'employeeId1008'",
"root.test.d0,36,s1,'employeeId1090'",
"root.test.d1,10,s0,1100",
"root.vehicle.d0,10,s0,INT32,100",
"root.vehicle.d0,12,s0:s1,INT32:TEXT,101:'employeeId102'",
"root.vehicle.d0,19,s1,TEXT,'employeeId103'",
"root.vehicle.d1,11,s2,FLOAT,104.0",
"root.vehicle.d1,15,s2:s3,FLOAT:BOOLEAN,105.0:true",
"root.vehicle.d1,17,s3,BOOLEAN,false",
"root.vehicle.d0,20,s0,INT32,1000",
"root.vehicle.d0,22,s0:s1,INT32:TEXT,1001:'employeeId1002'",
"root.vehicle.d0,29,s1,TEXT,'employeeId1003'",
"root.vehicle.d1,21,s2,FLOAT,1004.0",
"root.vehicle.d1,25,s2:s3,FLOAT:BOOLEAN,1005.0:true",
"root.vehicle.d1,27,s3,BOOLEAN,true",
"root.test.d0,10,s0,INT32,106",
"root.test.d0,14,s0:s1,INT32:TEXT,107:'employeeId108'",
"root.test.d0,16,s1,TEXT,'employeeId109'",
"root.test.d1,1,s0,INT32,110",
"root.test.d0,30,s0,INT32,1006",
"root.test.d0,34,s0:s1,INT32:TEXT,1007:'employeeId1008'",
"root.test.d0,36,s1,TEXT,'employeeId1090'",
"root.test.d1,10,s0,INT32,1100",
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iotdb.rocketmq;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
Expand All @@ -37,11 +38,11 @@

public class RocketMQConsumer {

private static final Logger logger = LoggerFactory.getLogger(RocketMQConsumer.class);
private static Session session;
private DefaultMQPushConsumer consumer;
private String producerGroup;
private String serverAddresses;
private static Session session;
private static final Logger logger = LoggerFactory.getLogger(RocketMQConsumer.class);

public RocketMQConsumer(String producerGroup, String serverAddresses, String connectionHost,
int connectionPort, String user, String password)
Expand All @@ -53,6 +54,22 @@ public RocketMQConsumer(String producerGroup, String serverAddresses, String con
initIoTDB(connectionHost, connectionPort, user, password);
}

public static void main(String[] args)
throws MQClientException, StatementExecutionException, IoTDBConnectionException {
/**
*Instantiate with specified consumer group name and specify name server addresses.
*/
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
Constant.IOTDB_CONNECTION_HOST,
Constant.IOTDB_CONNECTION_PORT,
Constant.IOTDB_CONNECTION_USER,
Constant.IOTDB_CONNECTION_PASSWORD);
consumer.prepareConsume();
consumer.addStorageGroup(Constant.ADDITIONAL_STORAGE_GROUP);
consumer.start();
}

private void initIoTDB(String host, int port, String user, String password)
throws IoTDBConnectionException, StatementExecutionException {
if (host == null) {
Expand All @@ -76,7 +93,8 @@ private void addStorageGroup(String storageGroup)
session.setStorageGroup(storageGroup);
}

private void createTimeseries(String[] sql) throws StatementExecutionException, IoTDBConnectionException {
private void createTimeseries(String[] sql)
throws StatementExecutionException, IoTDBConnectionException {
String timeseries = sql[0];
TSDataType dataType = TSDataType.valueOf(sql[1]);
TSEncoding encoding = TSEncoding.valueOf(sql[2]);
Expand All @@ -89,8 +107,37 @@ private void insert(String data) throws IoTDBConnectionException, StatementExecu
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
List<String> measurements = Arrays.asList(dataArray[2].split(":"));
List<String> values = Arrays.asList(dataArray[3].split(":"));
session.insertRecord(device, time, measurements, values);
List<TSDataType> types = new ArrayList<>();
for (String type : dataArray[3].split(":")) {
types.add(TSDataType.valueOf(type));
}

List<Object> values = new ArrayList<>();
String[] valuesStr = dataArray[4].split(":");
for (int i = 0; i < valuesStr.length; i++) {
switch (types.get(i)) {
case INT64:
values.add(Long.parseLong(valuesStr[i]));
break;
case DOUBLE:
values.add(Double.parseDouble(valuesStr[i]));
break;
case INT32:
values.add(Integer.parseInt(valuesStr[i]));
break;
case TEXT:
values.add(valuesStr[i]);
break;
case FLOAT:
values.add(Float.parseFloat(valuesStr[i]));
break;
case BOOLEAN:
values.add(Boolean.parseBoolean(valuesStr[i]));
break;
}
}

session.insertRecord(device, time, measurements, types, values);
}

public void start() throws MQClientException {
Expand All @@ -99,6 +146,7 @@ public void start() throws MQClientException {

/**
* Subscribe topic and add register Listener
*
* @throws MQClientException
*/
public void prepareConsume() throws MQClientException {
Expand All @@ -116,8 +164,9 @@ public void prepareConsume() throws MQClientException {
*/
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
logger.info(String.format("%s Receive New Messages: %s %n", Thread.currentThread().getName(),
new String(msg.getBody())));
logger
.info(String.format("%s Receive New Messages: %s %n", Thread.currentThread().getName(),
new String(msg.getBody())));
try {
insert(new String(msg.getBody()));
} catch (Exception e) {
Expand Down Expand Up @@ -147,20 +196,4 @@ public String getServerAddresses() {
public void setServerAddresses(String serverAddresses) {
this.serverAddresses = serverAddresses;
}

public static void main(String[] args)
throws MQClientException, StatementExecutionException, IoTDBConnectionException {
/**
*Instantiate with specified consumer group name and specify name server addresses.
*/
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
Constant.IOTDB_CONNECTION_HOST,
Constant.IOTDB_CONNECTION_PORT,
Constant.IOTDB_CONNECTION_USER,
Constant.IOTDB_CONNECTION_PASSWORD);
consumer.prepareConsume();
consumer.addStorageGroup(Constant.ADDITIONAL_STORAGE_GROUP);
consumer.start();
}
}
43 changes: 29 additions & 14 deletions example/session/src/main/java/org/apache/iotdb/SessionExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,27 +136,37 @@ private static void createMultiTimeseries()
private static void insertRecord() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);

for (long time = 0; time < 100; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
session.insertRecord(deviceId, time, measurements, values);
List<Object> values = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
session.insertRecord(deviceId, time, measurements, types, values);
}
}

private static void insertRecordInObject()
throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);

for (long time = 0; time < 100; time++) {
session.insertRecord(deviceId, time, measurements, 1L, 1L, 1L);
session.insertRecord(deviceId, time, measurements, types, 1L, 1L, 1L);
}
}

Expand All @@ -168,31 +178,36 @@ private static void insertRecords() throws IoTDBConnectionException, BatchExecut
measurements.add("s3");
List<String> deviceIds = new ArrayList<>();
List<List<String>> measurementsList = new ArrayList<>();
List<List<String>> valuesList = new ArrayList<>();
List<List<Object>> valuesList = new ArrayList<>();
List<Long> timestamps = new ArrayList<>();
List<List<TSDataType>> typesList = new ArrayList<>();

for (long time = 0; time < 500; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
List<Object> values = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);

deviceIds.add(deviceId);
measurementsList.add(measurements);
valuesList.add(values);
typesList.add(types);
timestamps.add(time);
if (time != 0 && time % 100 == 0) {
session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
deviceIds.clear();
measurementsList.clear();
valuesList.clear();
timestamps.clear();
}
}

session.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList);
}

/**
* insert the data of a device. For each timestamp, the number of measurements is the same.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.iotdb;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -28,6 +27,7 @@
import org.apache.iotdb.session.SessionDataSet.DataIterator;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;

public class SessionPoolExample {

Expand All @@ -51,15 +51,20 @@ public static void main(String[] args)
private static void insertRecord() throws StatementExecutionException, IoTDBConnectionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
List<TSDataType> types = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);
types.add(TSDataType.INT64);

for (long time = 0; time < 10; time++) {
List<String> values = new ArrayList<>();
values.add("1");
values.add("2");
values.add("3");
pool.insertRecord(deviceId, time, measurements, values);
List<Object> values = new ArrayList<>();
values.add(1L);
values.add(2L);
values.add(3L);
pool.insertRecord(deviceId, time, measurements, types, values);
}
}

Expand Down
Loading

0 comments on commit 4e7be1c

Please sign in to comment.