Skip to content

Commit

Permalink
resolve conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
qiaojialin committed Mar 11, 2020
2 parents 9014ebf + e760b7c commit b03a057
Show file tree
Hide file tree
Showing 46 changed files with 709 additions and 506 deletions.
10 changes: 10 additions & 0 deletions docker/ReadMe.md
Expand Up @@ -38,6 +38,16 @@ For example,
docker run -d -p 6667:6667 -p 31999:31999 -p 8181:8181 -p 5555:5555 apache/iotdb:0.9.1
```

## How to configure docker volumes

The instructions below show how to store the output data and logs of IoTDB to two folders called
iotdb_data and iotdb_logs respectively.

`/D/docker/iotdb_data` and `/D/docker/iotdb_logs` can be changed to any local directory of your own host.
```
docker run -it -v /D/docker/iotdb_data:/iotdb/data -v /D/docker/iotdb_logs:/iotdb/logs --name 123 apache/iotdb:0.9.0
```

# How to run IoTDB client

Suppose you have run an IoTDB Server in docker
Expand Down
Expand Up @@ -396,12 +396,19 @@ root.sg1.d0.s0 is INT32 while root.sg2.d3.s0 is FLOAT.
7. 在Select子句中重复写列名是生效的。例如, "select s0,s0,s1 from root.sg.* align by device" 不等于 "select s0,s1 from root.sg.* align by device".
8. 更多正例:
8. 在Where子句中时间过滤条件和值过滤条件均可以使用,值过滤条件可以使用叶子节点 path,或以 root 开头的整个 path,不允许存在通配符。例如,
- select * from root.sg.* where time = 1 align by device
- select * from root.sg.* where s0 < 100 align by device
- select * from root.sg.* where time < 20 AND s0 > 50 align by device
- select * from root.sg.d0 where root.sg.d0.s0 = 15 align by device
9. 更多正例:
- select * from root.vehicle align by device
- select s0,s0,s1 from root.vehicle.* align by device
- select s0,s1 from root.vehicle.* limit 10 offset 1 align by device
- select * from root.vehicle slimit 10 soffset 2 align by device
- select * from root.vehicle where time > 10 align by device
- select * from root.vehicle.* where time < 10 AND s0 > 25 align by device
- select * from root.vehicle where root.vehicle.d0.s0>0 align by device
- select count(*) from root.vehicle align by device
- select sum(*) from root.vehicle GROUP BY (20ms,0,[2,50]) align by device
Expand Down Expand Up @@ -882,4 +889,3 @@ eg. root.ln.wf01.wt01.*
eg. *.wt01.*
eg. *
```

Expand Up @@ -353,7 +353,7 @@ Rules:
Correct example: select * from root.sg1 align by device
Correct example: select * from root.sg1 ALIGN BY DEVICE
2. GroupbyDeviceClause can only be used at the end of a query statement.
2. AlignbyDeviceClause can only be used at the end of a query statement.
Correct example: select * from root.sg1 where time > 10 align by device
Wrong example: select * from root.sg1 align by device where time > 10
Expand Down Expand Up @@ -406,12 +406,19 @@ For example. "select s0,s1 from root.sg.*,root.sg.d0 align by device" is equal t
7. The duplicated measurements in the suffix paths are not neglected.
For example, "select s0,s0,s1 from root.sg.* align by device" is not equal to "select s0,s1 from root.sg.* align by device".
8. More correct examples:
8. Both time predicates and value predicates are allowed in Where Clause. The paths of the value predicates can be the leaf node or full path started with ROOT. And wildcard is not allowed here. For example:
- select * from root.sg.* where time = 1 align by device
- select * from root.sg.* where s0 < 100 align by device
- select * from root.sg.* where time < 20 AND s0 > 50 align by device
- select * from root.sg.d0 where root.sg.d0.s0 = 15 align by device
9. More correct examples:
- select * from root.vehicle align by device
- select s0,s0,s1 from root.vehicle.* align by device
- select s0,s1 from root.vehicle.* limit 10 offset 1 align by device
- select * from root.vehicle slimit 10 soffset 2 align by device
- select * from root.vehicle where time > 10 align by device
- select * from root.vehicle.* where time < 10 AND s0 > 25 align by device
- select * from root.vehicle where root.vehicle.d0.s0>0 align by device
- select count(*) from root.vehicle align by device
- select sum(*) from root.vehicle GROUP BY (20ms,0,[2,50]) align by device
Expand Down
Expand Up @@ -70,21 +70,21 @@ private void initIoTDB(String host, int port, String user, String password)
createTimeseries(sql);
}
}

private void addStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
session.setStorageGroup(storageGroup);
}

private void createTimeseries(String[] sql) throws StatementExecutionException, IoTDBConnectionException {
String timeseries = sql[0];
TSDataType dataType = TSDataType.valueOf(sql[1]);
TSEncoding encoding = TSEncoding.valueOf(sql[2]);
CompressionType compressionType = CompressionType.valueOf(sql[3]);
session.createTimeseries(timeseries, dataType, encoding, compressionType);
}
private void insert(String data) throws IoTDBConnectionException {

private void insert(String data) throws IoTDBConnectionException, StatementExecutionException {
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
Expand Down Expand Up @@ -120,7 +120,7 @@ public void prepareConsume() throws MQClientException {
new String(msg.getBody())));
try {
insert(new String(msg.getBody()));
} catch (IoTDBConnectionException e) {
} catch (Exception e) {
logger.error(e.getMessage());
}
}
Expand Down Expand Up @@ -153,8 +153,8 @@ public static void main(String[] args)
/**
*Instantiate with specified consumer group name and specify name server addresses.
*/
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
Constant.IOTDB_CONNECTION_HOST,
Constant.IOTDB_CONNECTION_PORT,
Constant.IOTDB_CONNECTION_USER,
Expand Down
82 changes: 76 additions & 6 deletions example/session/src/main/java/org/apache/iotdb/SessionExample.java
Expand Up @@ -18,6 +18,11 @@
*/
package org.apache.iotdb;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
Expand All @@ -28,6 +33,7 @@
import org.apache.iotdb.tsfile.write.record.RowBatch;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.Schema;
import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -36,7 +42,7 @@ public class SessionExample {
private static Session session;

public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException {
throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();

Expand All @@ -61,7 +67,7 @@ public static void main(String[] args)
session.close();
}

private static void insert() throws IoTDBConnectionException {
private static void insert() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
Expand All @@ -76,7 +82,18 @@ private static void insert() throws IoTDBConnectionException {
}
}

private static void insertInBatch() throws IoTDBConnectionException {
private static void insertInObject() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
measurements.add("s2");
measurements.add("s3");
for (long time = 0; time < 100; time++) {
session.insert(deviceId, time, measurements, 1L, 1L, 1L);
}
}

private static void insertInBatch() throws IoTDBConnectionException, BatchExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
Expand Down Expand Up @@ -123,7 +140,7 @@ private static void insertInBatch() throws IoTDBConnectionException {
* Users need to control the count of RowBatch and write a batch when it reaches the maxBatchSize
*
*/
private static void insertRowBatch() throws IoTDBConnectionException {
private static void insertRowBatch() throws IoTDBConnectionException, BatchExecutionException {
// The schema of sensors of one device
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
Expand Down Expand Up @@ -154,13 +171,66 @@ private static void insertRowBatch() throws IoTDBConnectionException {
}
}

private static void deleteData() throws IoTDBConnectionException {
private static void insertMultipleDeviceRowBatch()
throws IoTDBConnectionException, BatchExecutionException {
// The schema of sensors of one device
Schema schema1 = new Schema();
schema1.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema1.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema1.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch1 = schema1.createRowBatch("root.sg1.d1", 100);

Schema schema2 = new Schema();
schema2.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
schema2.registerMeasurement(new MeasurementSchema("s2", TSDataType.INT64, TSEncoding.RLE));
schema2.registerMeasurement(new MeasurementSchema("s3", TSDataType.INT64, TSEncoding.RLE));

RowBatch rowBatch2 = schema1.createRowBatch("root.sg1.d2", 100);

Map<String, RowBatch> rowBatchMap = new HashMap<>();
rowBatchMap.put("root.sg1.d1", rowBatch1);
rowBatchMap.put("root.sg1.d2", rowBatch2);

long[] timestamps1 = rowBatch1.timestamps;
Object[] values1 = rowBatch1.values;
long[] timestamps2 = rowBatch2.timestamps;
Object[] values2 = rowBatch2.values;

for (long time = 0; time < 100; time++) {
int row1 = rowBatch1.batchSize++;
int row2 = rowBatch2.batchSize++;
timestamps1[row1] = time;
timestamps2[row2] = time;
for (int i = 0; i < 3; i++) {
long[] sensor1 = (long[]) values1[i];
sensor1[row1] = i;
long[] sensor2 = (long[]) values2[i];
sensor2[row2] = i;
}
if (rowBatch1.batchSize == rowBatch1.getMaxBatchSize()) {
session.insertMultipleDeviceBatch(rowBatchMap);

rowBatch1.reset();
rowBatch2.reset();
}
}

if (rowBatch1.batchSize != 0) {
session.insertMultipleDeviceBatch(rowBatchMap);
rowBatch1.reset();
rowBatch2.reset();
}
}

private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
String path = "root.sg1.d1.s1";
long deleteTime = 99;
session.deleteData(path, deleteTime);
}

private static void deleteTimeseries() throws IoTDBConnectionException {
private static void deleteTimeseries()
throws IoTDBConnectionException, StatementExecutionException {
List<String> paths = new ArrayList<>();
paths.add("root.sg1.d1.s1");
paths.add("root.sg1.d1.s2");
Expand Down
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,9 +115,9 @@ public void invoke(IN input, Context context) throws Exception {
}

convertText(event.getDevice(), event.getMeasurements(), event.getValues());
TSStatus status = session.insert(event.getDevice(), event.getTimestamp(),
event.getMeasurements(), event.getValues());
LOG.debug("send event result: {}", status);
session.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
event.getValues());
LOG.debug("send event successfully");
}

public IoTDBSink<IN> withBatchSize(int batchSize) {
Expand Down Expand Up @@ -177,8 +176,8 @@ private void flush() throws Exception {
measurementsList.add(event.getMeasurements());
valuesList.add(event.getValues());
}
List<TSStatus> statusList = session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
LOG.debug("send events result: {}", statusList);
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
LOG.debug("send event successfully");
batchList.clear();
}
}
Expand Down
Expand Up @@ -31,7 +31,7 @@ public IoTDBSQLException(String reason) {
}

public IoTDBSQLException(String reason, TSStatus status) {
super(reason, status.sqlState, status.statusType.code);
super(reason, status.message, status.code);
}

public IoTDBSQLException(Throwable cause) {
Expand Down
46 changes: 14 additions & 32 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;

import java.sql.*;
import java.time.ZoneId;
import java.util.ArrayList;
Expand Down Expand Up @@ -258,41 +257,24 @@ public int[] executeBatch() throws SQLException {
}
}

private int[] executeBatchSQL() throws TException, SQLException {
private int[] executeBatchSQL() throws TException, BatchUpdateException {
isCancelled = false;
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId,
batchSQLList);
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId, batchSQLList);
TSExecuteBatchStatementResp execResp = client.executeBatchStatement(execReq);
if (execResp.getStatus().getStatusType().getCode() == TSStatusCode.SUCCESS_STATUS
.getStatusCode()) {
if (execResp.getResult() == null) {
return new int[0];
} else {
List<Integer> result = execResp.getResult();
int len = result.size();
int[] updateArray = new int[len];
for (int i = 0; i < len; i++) {
updateArray[i] = result.get(i);
}
return updateArray;
int[] result = new int[execResp.statusList.size()];
boolean allSuccess = true;
String message = "";
for (int i = 0; i < result.length; i++) {
result[i] = execResp.statusList.get(i).code;
if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
allSuccess = false;
message = execResp.statusList.get(i).message;
}
} else {
BatchUpdateException exception;
if (execResp.getResult() == null) {
exception = new BatchUpdateException(execResp.getStatus().getStatusType().getMessage(),
new int[0]);
} else {
List<Integer> result = execResp.getResult();
int len = result.size();
int[] updateArray = new int[len];
for (int i = 0; i < len; i++) {
updateArray[i] = result.get(i);
}
exception = new BatchUpdateException(execResp.getStatus().getStatusType().getMessage(),
updateArray);
}
throw exception;
}
if (!allSuccess) {
throw new BatchUpdateException(message, result);
}
return result;
}

@Override
Expand Down

0 comments on commit b03a057

Please sign in to comment.