Skip to content

Commit

Permalink
init (#888)
Browse files Browse the repository at this point in the history
  • Loading branch information
SilverNarcissus committed Mar 11, 2020
1 parent 16d1570 commit e760b7c
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 3 deletions.
68 changes: 68 additions & 0 deletions example/session/src/main/java/org/apache/iotdb/SessionExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.iotdb;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
Expand All @@ -29,6 +33,7 @@
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -77,6 +82,17 @@ private static void insert() throws IoTDBConnectionException, StatementExecution
}
}

private static void insertInObject() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
for (long time = 0; time < 100; time++) {
session.insert(deviceId, time, measurements, 1L, 1L, 1L);
}
}

private static void insertInBatch() throws IoTDBConnectionException, BatchExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
Expand Down Expand Up @@ -155,6 +171,58 @@ private static void insertRowBatch() throws IoTDBConnectionException, BatchExecu
}
}

private static void insertMultipleDeviceRowBatch()
throws IoTDBConnectionException, BatchExecutionException {
// The schema of sensors of one device
Schema schema1 = new Schema();
schema1.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema1.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema1.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch1 = schema1.createRowBatch("root.sg1.d1", 100);

Schema schema2 = new Schema();
schema2.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema2.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema2.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch2 = schema1.createRowBatch("root.sg1.d2", 100);

Map<String, RowBatch> rowBatchMap = new HashMap<>();
rowBatchMap.put("root.sg1.d1", rowBatch1);
rowBatchMap.put("root.sg1.d2", rowBatch2);

long[] timestamps1 = rowBatch1.timestamps;
Object[] values1 = rowBatch1.values;
long[] timestamps2 = rowBatch2.timestamps;
Object[] values2 = rowBatch2.values;

for (long time = 0; time < 100; time++) {
int row1 = rowBatch1.batchSize++;
int row2 = rowBatch2.batchSize++;
timestamps1[row1] = time;
timestamps2[row2] = time;
for (int i = 0; i < 3; i++) {
long[] sensor1 = (long[]) values1[i];
sensor1[row1] = i;
long[] sensor2 = (long[]) values2[i];
sensor2[row2] = i;
}
if (rowBatch1.batchSize == rowBatch1.getMaxBatchSize()) {
session.insertMultipleDeviceBatch(rowBatchMap);

rowBatch1.reset();
rowBatch2.reset();
}
}

if (rowBatch1.batchSize != 0) {
session.insertMultipleDeviceBatch(rowBatchMap);
rowBatch1.reset();
rowBatch2.reset();
}
}

private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
String path = "root.sg1.d1.s1";
long deleteTime = 99;
Expand Down
54 changes: 52 additions & 2 deletions session/src/main/java/org/apache/iotdb/session/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
Expand Down Expand Up @@ -233,6 +235,33 @@ public void insertSortedBatch(RowBatch rowBatch)
insertSortedBatchIntern(rowBatch);
}

/**
* use batch interface to insert data in multiple device
*
* @param rowBatchMap data batch in multiple device
*/
public void insertMultipleDeviceBatch
(Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
for(Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()){
sortRowBatch(dataInOneDevice.getValue());
insertBatch(dataInOneDevice.getValue());
}
}

/**
* use batch interface to insert sorted data in multiple device
* times in row batch must be sorted before!
*
* @param rowBatchMap data batch in multiple device
*/
public void insertMultipleDeviceSortedBatch
(Map<String, RowBatch> rowBatchMap) throws IoTDBConnectionException, BatchExecutionException {
for(Map.Entry<String, RowBatch> dataInOneDevice : rowBatchMap.entrySet()){
checkSorted(dataInOneDevice.getValue());
insertSortedBatchIntern(dataInOneDevice.getValue());
}
}

/**
* use batch interface to insert data
*
Expand Down Expand Up @@ -359,7 +388,24 @@ public void insertInBatch(List<String> deviceIds, List<Long> times,
* @see Session#insertInBatch(List, List, List, List)
* @see Session#insertBatch(RowBatch)
*/
public void insert(String deviceId, long time, List<String> measurements,
public TSStatus insert(String deviceId, long time, List<String> measurements,
Object... values) throws IoTDBConnectionException, StatementExecutionException {
List<String> stringValues = new ArrayList<>();
for (Object o : values) {
stringValues.add(o.toString());
}

return insert(deviceId, time, measurements, stringValues);
}

/**
* insert data in one row, if you want improve your performance, please use insertInBatch method
* or insertBatch method
*
* @see Session#insertInBatch(List, List, List, List)
* @see Session#insertBatch(RowBatch)
*/
public TSStatus insert(String deviceId, long time, List<String> measurements,
List<String> values) throws IoTDBConnectionException, StatementExecutionException {
TSInsertReq request = new TSInsertReq();
request.setSessionId(sessionId);
Expand All @@ -368,11 +414,15 @@ public void insert(String deviceId, long time, List<String> measurements,
request.setMeasurements(measurements);
request.setValues(values);

TSStatus result;
try {
RpcUtils.verifySuccess(client.insert(request));
result = client.insert(request);
RpcUtils.verifySuccess(result);
} catch (TException e) {
throw new IoTDBConnectionException(e);
}

return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ public void tearDown() throws Exception {
}

@Test
public void testInsertByObject()
throws IoTDBConnectionException, SQLException, ClassNotFoundException, StatementExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();

session.setStorageGroup("root.sg1");

createTimeseries();
insertInObject();

// sql test
insert_via_sql();
query3();

session.close();
}


public void testAlignByDevice() throws IoTDBConnectionException,
StatementExecutionException, BatchExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
Expand Down Expand Up @@ -350,6 +368,17 @@ private void insertInBatch() throws IoTDBConnectionException, BatchExecutionExce
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
}

private void insertInObject() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
for (long time = 0; time < 100; time++) {
session.insert(deviceId, time, measurements, 1L, 2L, 3L);
}
}

private void insert() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
Expand Down Expand Up @@ -565,7 +594,7 @@ private void query3() throws IoTDBConnectionException, StatementExecutionExcepti
long index = 1;
count++;
for (Field f : sessionDataSet.next().getFields()) {
Assert.assertEquals(f.getLongV(), index);
Assert.assertEquals(index, f.getLongV());
index++;
}
}
Expand Down

0 comments on commit e760b7c

Please sign in to comment.