Skip to content

Commit

Permalink
Merge 12226d9 into 66eb044
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Mar 10, 2020
2 parents 66eb044 + 12226d9 commit 7c41f6b
Show file tree
Hide file tree
Showing 43 changed files with 539 additions and 510 deletions.
Expand Up @@ -70,21 +70,21 @@ private void initIoTDB(String host, int port, String user, String password)
createTimeseries(sql);
}
}

private void addStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
session.setStorageGroup(storageGroup);
}

private void createTimeseries(String[] sql) throws StatementExecutionException, IoTDBConnectionException {
String timeseries = sql[0];
TSDataType dataType = TSDataType.valueOf(sql[1]);
TSEncoding encoding = TSEncoding.valueOf(sql[2]);
CompressionType compressionType = CompressionType.valueOf(sql[3]);
session.createTimeseries(timeseries, dataType, encoding, compressionType);
}
private void insert(String data) throws IoTDBConnectionException {

private void insert(String data) throws IoTDBConnectionException, StatementExecutionException {
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
Expand Down Expand Up @@ -120,7 +120,7 @@ public void prepareConsume() throws MQClientException {
new String(msg.getBody())));
try {
insert(new String(msg.getBody()));
} catch (IoTDBConnectionException e) {
} catch (Exception e) {
logger.error(e.getMessage());
}
}
Expand Down Expand Up @@ -153,8 +153,8 @@ public static void main(String[] args)
/**
*Instantiate with specified consumer group name and specify name server addresses.
*/
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
Constant.IOTDB_CONNECTION_HOST,
Constant.IOTDB_CONNECTION_PORT,
Constant.IOTDB_CONNECTION_USER,
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iotdb;

import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.Session;
Expand All @@ -36,7 +37,7 @@ public class SessionExample {
private static Session session;

public static void main(String[] args)
throws IoTDBConnectionException, StatementExecutionException {
throws IoTDBConnectionException, StatementExecutionException, BatchExecutionException {
session = new Session("127.0.0.1", 6667, "root", "root");
session.open();

Expand All @@ -61,7 +62,7 @@ public static void main(String[] args)
session.close();
}

private static void insert() throws IoTDBConnectionException {
private static void insert() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
Expand All @@ -76,7 +77,7 @@ private static void insert() throws IoTDBConnectionException {
}
}

private static void insertInBatch() throws IoTDBConnectionException {
private static void insertInBatch() throws IoTDBConnectionException, BatchExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
Expand Down Expand Up @@ -123,7 +124,7 @@ private static void insertInBatch() throws IoTDBConnectionException {
* Users need to control the count of RowBatch and write a batch when it reaches the maxBatchSize
*
*/
private static void insertRowBatch() throws IoTDBConnectionException {
private static void insertRowBatch() throws IoTDBConnectionException, BatchExecutionException {
// The schema of sensors of one device
Schema schema = new Schema();
schema.registerMeasurement(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
Expand Down Expand Up @@ -154,13 +155,14 @@ private static void insertRowBatch() throws IoTDBConnectionException {
}
}

private static void deleteData() throws IoTDBConnectionException {
private static void deleteData() throws IoTDBConnectionException, StatementExecutionException {
String path = "root.sg1.d1.s1";
long deleteTime = 99;
session.deleteData(path, deleteTime);
}

private static void deleteTimeseries() throws IoTDBConnectionException {
private static void deleteTimeseries()
throws IoTDBConnectionException, StatementExecutionException {
List<String> paths = new ArrayList<>();
paths.add("root.sg1.d1.s1");
paths.add("root.sg1.d1.s2");
Expand Down
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Preconditions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -116,9 +115,9 @@ public void invoke(IN input, Context context) throws Exception {
}

convertText(event.getDevice(), event.getMeasurements(), event.getValues());
TSStatus status = session.insert(event.getDevice(), event.getTimestamp(),
event.getMeasurements(), event.getValues());
LOG.debug("send event result: {}", status);
session.insert(event.getDevice(), event.getTimestamp(), event.getMeasurements(),
event.getValues());
LOG.debug("send event successfully");
}

public IoTDBSink<IN> withBatchSize(int batchSize) {
Expand Down Expand Up @@ -177,8 +176,8 @@ private void flush() throws Exception {
measurementsList.add(event.getMeasurements());
valuesList.add(event.getValues());
}
List<TSStatus> statusList = session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
LOG.debug("send events result: {}", statusList);
session.insertInBatch(deviceIds, timestamps, measurementsList, valuesList);
LOG.debug("send event successfully");
batchList.clear();
}
}
Expand Down
Expand Up @@ -31,7 +31,7 @@ public IoTDBSQLException(String reason) {
}

public IoTDBSQLException(String reason, TSStatus status) {
super(reason, status.sqlState, status.statusType.code);
super(reason, status.message, status.code);
}

public IoTDBSQLException(Throwable cause) {
Expand Down
46 changes: 14 additions & 32 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;

import java.sql.*;
import java.time.ZoneId;
import java.util.ArrayList;
Expand Down Expand Up @@ -258,41 +257,24 @@ public int[] executeBatch() throws SQLException {
}
}

private int[] executeBatchSQL() throws TException, SQLException {
private int[] executeBatchSQL() throws TException, BatchUpdateException {
isCancelled = false;
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId,
batchSQLList);
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId, batchSQLList);
TSExecuteBatchStatementResp execResp = client.executeBatchStatement(execReq);
if (execResp.getStatus().getStatusType().getCode() == TSStatusCode.SUCCESS_STATUS
.getStatusCode()) {
if (execResp.getResult() == null) {
return new int[0];
} else {
List<Integer> result = execResp.getResult();
int len = result.size();
int[] updateArray = new int[len];
for (int i = 0; i < len; i++) {
updateArray[i] = result.get(i);
}
return updateArray;
int[] result = new int[execResp.statusList.size()];
boolean allSuccess = true;
String message = "";
for (int i = 0; i < result.length; i++) {
result[i] = execResp.statusList.get(i).code;
if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
allSuccess = false;
message = execResp.statusList.get(i).message;
}
} else {
BatchUpdateException exception;
if (execResp.getResult() == null) {
exception = new BatchUpdateException(execResp.getStatus().getStatusType().getMessage(),
new int[0]);
} else {
List<Integer> result = execResp.getResult();
int len = result.size();
int[] updateArray = new int[len];
for (int i = 0; i < len; i++) {
updateArray[i] = result.get(i);
}
exception = new BatchUpdateException(execResp.getStatus().getStatusType().getMessage(),
updateArray);
}
throw exception;
}
if (!allSuccess) {
throw new BatchUpdateException(message, result);
}
return result;
}

@Override
Expand Down
55 changes: 27 additions & 28 deletions jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
Expand Up @@ -29,12 +29,12 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSStatusType;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Before;
Expand All @@ -51,10 +51,7 @@ public class BatchTest {
private long sessionId;
@Mock
private IoTDBStatement statement;
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatusType errorStatus = new TSStatusType(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), "");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private TSStatus Status_ERROR = new TSStatus(errorStatus);
private TSStatus errorStatus = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
private TSExecuteBatchStatementResp resp;
private ZoneId zoneID = ZoneId.systemDefault();

Expand All @@ -74,25 +71,26 @@ public void tearDown() throws Exception {
@Test
public void testExecuteBatchSQL1() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp(Status_SUCCESS);
resp = new TSExecuteBatchStatementResp();
resp = RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
int[] result = statement.executeBatch();
assertEquals(result.length, 0);
assertEquals(1, result.length);

List<Integer> resExpected = new ArrayList<Integer>() {
List<TSStatus> resExpected = new ArrayList<TSStatus>() {
{
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.EXECUTE_FAILED);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.EXECUTE_FAILED);
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
};
resp.setResult(resExpected);
resp.setStatusList(resExpected);

statement.addBatch("SET STORAGE GROUP TO root.ln.wf01.wt01");
statement.addBatch(
Expand All @@ -112,17 +110,18 @@ public void testExecuteBatchSQL1() throws SQLException, TException {
statement.addBatch(
"insert into root.ln.wf01.wt01(timestamp,temperature) vvvvvv(1509465720000,20.092794)");
result = statement.executeBatch();
assertEquals(result.length, resExpected.size());
for (int i = 0; i < resExpected.size(); i++) {
assertEquals(result[i], (int) resExpected.get(i));
assertEquals(resp.statusList.size(), result.length);
for (int i = 0; i < resp.statusList.size(); i++) {
assertEquals(resExpected.get(i).code, result[i]);
}
statement.clearBatch();
}

@Test(expected = BatchUpdateException.class)
public void testExecuteBatchSQL2() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp(Status_ERROR);
resp = RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR);

when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
statement.executeBatch();
}
Expand All @@ -131,21 +130,21 @@ public void testExecuteBatchSQL2() throws SQLException, TException {
@Test
public void testExecuteBatchSQL3() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp(Status_ERROR);
List<Integer> resExpected = new ArrayList<Integer>() {
resp = RpcUtils.getTSBatchExecuteStatementResp(errorStatus);
List<TSStatus> resExpected = new ArrayList<TSStatus>() {
{
add(Statement.SUCCESS_NO_INFO);
add(Statement.EXECUTE_FAILED);
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR));
}
};
resp.setResult(resExpected);
resp.setStatusList(resExpected);
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
try {
statement.executeBatch();
} catch (BatchUpdateException e) {
int[] result = e.getUpdateCounts();
for (int i = 0; i < resExpected.size(); i++) {
assertEquals(result[i], (int) resExpected.get(i));
assertEquals(resExpected.get(i).code, result[i]);
}
return;
}
Expand Down
Expand Up @@ -21,10 +21,9 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;
Expand All @@ -40,8 +39,7 @@ public class IoTDBConnectionTest {
private TSIService.Iface client;

private IoTDBConnection connection = new IoTDBConnection();
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private TSStatus successStatus = RpcUtils.SUCCESS_STATUS;
private long sessionId;

@Before
Expand All @@ -57,7 +55,7 @@ public void tearDown() throws Exception {
public void testSetTimeZone() throws TException, IoTDBSQLException {
String timeZone = "Asia/Shanghai";
when(client.setTimeZone(any(TSSetTimeZoneReq.class)))
.thenReturn(new TSStatus(Status_SUCCESS));
.thenReturn(new TSStatus(successStatus));
connection.setClient(client);
connection.setTimeZone(timeZone);
assertEquals(connection.getTimeZone(), timeZone);
Expand All @@ -67,13 +65,13 @@ public void testSetTimeZone() throws TException, IoTDBSQLException {
public void testGetTimeZone() throws IoTDBSQLException, TException {
String timeZone = "GMT+:08:00";
sessionId = connection.getSessionId();
when(client.getTimeZone(sessionId)).thenReturn(new TSGetTimeZoneResp(Status_SUCCESS, timeZone));
when(client.getTimeZone(sessionId)).thenReturn(new TSGetTimeZoneResp(successStatus, timeZone));
connection.setClient(client);
assertEquals(connection.getTimeZone(), timeZone);
}

@Test
public void testGetServerProperties() throws IoTDBSQLException, TException {
public void testGetServerProperties() throws TException {
final String version = "v0.1";
@SuppressWarnings("serial") final List<String> supportedAggregationTime = new ArrayList<String>() {
{
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class IoTDBPreparedStatementTest {
@Mock
private Iface client;
@Mock
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private long queryId;
private long sessionId;
Expand Down

0 comments on commit 7c41f6b

Please sign in to comment.