Skip to content

Commit

Permalink
Merge 991cba9 into 66eb044
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Mar 10, 2020
2 parents 66eb044 + 991cba9 commit 4df96e0
Show file tree
Hide file tree
Showing 34 changed files with 515 additions and 485 deletions.
Expand Up @@ -70,21 +70,21 @@ private void initIoTDB(String host, int port, String user, String password)
createTimeseries(sql);
}
}

private void addStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
session.setStorageGroup(storageGroup);
}

private void createTimeseries(String[] sql) throws StatementExecutionException, IoTDBConnectionException {
String timeseries = sql[0];
TSDataType dataType = TSDataType.valueOf(sql[1]);
TSEncoding encoding = TSEncoding.valueOf(sql[2]);
CompressionType compressionType = CompressionType.valueOf(sql[3]);
session.createTimeseries(timeseries, dataType, encoding, compressionType);
}
private void insert(String data) throws IoTDBConnectionException {

private void insert(String data) throws IoTDBConnectionException, StatementExecutionException {
String[] dataArray = data.split(",");
String device = dataArray[0];
long time = Long.parseLong(dataArray[1]);
Expand Down Expand Up @@ -120,7 +120,7 @@ public void prepareConsume() throws MQClientException {
new String(msg.getBody())));
try {
insert(new String(msg.getBody()));
} catch (IoTDBConnectionException e) {
} catch (Exception e) {
logger.error(e.getMessage());
}
}
Expand Down Expand Up @@ -153,8 +153,8 @@ public static void main(String[] args)
/**
*Instantiate with specified consumer group name and specify name server addresses.
*/
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
RocketMQConsumer consumer = new RocketMQConsumer(Constant.CONSUMER_GROUP,
Constant.SERVER_ADDRESS,
Constant.IOTDB_CONNECTION_HOST,
Constant.IOTDB_CONNECTION_PORT,
Constant.IOTDB_CONNECTION_USER,
Expand Down
Expand Up @@ -31,7 +31,7 @@ public IoTDBSQLException(String reason) {
}

public IoTDBSQLException(String reason, TSStatus status) {
super(reason, status.sqlState, status.statusType.code);
super(reason, status.message, status.code);
}

public IoTDBSQLException(Throwable cause) {
Expand Down
46 changes: 14 additions & 32 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
Expand Up @@ -24,7 +24,6 @@
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;

import java.sql.*;
import java.time.ZoneId;
import java.util.ArrayList;
Expand Down Expand Up @@ -258,41 +257,24 @@ public int[] executeBatch() throws SQLException {
}
}

private int[] executeBatchSQL() throws TException, SQLException {
private int[] executeBatchSQL() throws TException, BatchUpdateException {
isCancelled = false;
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId,
batchSQLList);
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId, batchSQLList);
TSExecuteBatchStatementResp execResp = client.executeBatchStatement(execReq);
if (execResp.getStatus().getStatusType().getCode() == TSStatusCode.SUCCESS_STATUS
.getStatusCode()) {
if (execResp.getResult() == null) {
return new int[0];
} else {
List<Integer> result = execResp.getResult();
int len = result.size();
int[] updateArray = new int[len];
for (int i = 0; i < len; i++) {
updateArray[i] = result.get(i);
}
return updateArray;
int[] result = new int[execResp.statusList.size()];
boolean allSuccess = true;
String message = "";
for (int i = 0; i < result.length; i++) {
result[i] = execResp.statusList.get(i).code;
if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
allSuccess = false;
message = execResp.statusList.get(i).message;
}
} else {
BatchUpdateException exception;
if (execResp.getResult() == null) {
exception = new BatchUpdateException(execResp.getStatus().getStatusType().getMessage(),
new int[0]);
} else {
List<Integer> result = execResp.getResult();
int len = result.size();
int[] updateArray = new int[len];
for (int i = 0; i < len; i++) {
updateArray[i] = result.get(i);
}
exception = new BatchUpdateException(execResp.getStatus().getStatusType().getMessage(),
updateArray);
}
throw exception;
}
if (!allSuccess) {
throw new BatchUpdateException(message, result);
}
return result;
}

@Override
Expand Down
55 changes: 27 additions & 28 deletions jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
Expand Up @@ -29,12 +29,12 @@
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSStatusType;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Before;
Expand All @@ -51,10 +51,7 @@ public class BatchTest {
private long sessionId;
@Mock
private IoTDBStatement statement;
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatusType errorStatus = new TSStatusType(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode(), "");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private TSStatus Status_ERROR = new TSStatus(errorStatus);
private TSStatus errorStatus = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
private TSExecuteBatchStatementResp resp;
private ZoneId zoneID = ZoneId.systemDefault();

Expand All @@ -74,25 +71,26 @@ public void tearDown() throws Exception {
@Test
public void testExecuteBatchSQL1() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp(Status_SUCCESS);
resp = new TSExecuteBatchStatementResp();
resp = RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
int[] result = statement.executeBatch();
assertEquals(result.length, 0);
assertEquals(result.length, 1);

List<Integer> resExpected = new ArrayList<Integer>() {
List<TSStatus> resExpected = new ArrayList<TSStatus>() {
{
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.EXECUTE_FAILED);
add(Statement.SUCCESS_NO_INFO);
add(Statement.SUCCESS_NO_INFO);
add(Statement.EXECUTE_FAILED);
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
};
resp.setResult(resExpected);
resp.setStatusList(resExpected);

statement.addBatch("SET STORAGE GROUP TO root.ln.wf01.wt01");
statement.addBatch(
Expand All @@ -112,17 +110,18 @@ public void testExecuteBatchSQL1() throws SQLException, TException {
statement.addBatch(
"insert into root.ln.wf01.wt01(timestamp,temperature) vvvvvv(1509465720000,20.092794)");
result = statement.executeBatch();
assertEquals(result.length, resExpected.size());
for (int i = 0; i < resExpected.size(); i++) {
assertEquals(result[i], (int) resExpected.get(i));
assertEquals(resp.statusList.size(), result.length);
for (int i = 0; i < resp.statusList.size(); i++) {
assertEquals(resExpected.get(i).code, result[i]);
}
statement.clearBatch();
}

@Test(expected = BatchUpdateException.class)
public void testExecuteBatchSQL2() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp(Status_ERROR);
resp = RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR);

when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
statement.executeBatch();
}
Expand All @@ -131,21 +130,21 @@ public void testExecuteBatchSQL2() throws SQLException, TException {
@Test
public void testExecuteBatchSQL3() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp(Status_ERROR);
List<Integer> resExpected = new ArrayList<Integer>() {
resp = RpcUtils.getTSBatchExecuteStatementResp(errorStatus);
List<TSStatus> resExpected = new ArrayList<TSStatus>() {
{
add(Statement.SUCCESS_NO_INFO);
add(Statement.EXECUTE_FAILED);
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR));
}
};
resp.setResult(resExpected);
resp.setStatusList(resExpected);
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
try {
statement.executeBatch();
} catch (BatchUpdateException e) {
int[] result = e.getUpdateCounts();
for (int i = 0; i < resExpected.size(); i++) {
assertEquals(result[i], (int) resExpected.get(i));
assertEquals(resExpected.get(i).code, result[i]);
}
return;
}
Expand Down
Expand Up @@ -21,10 +21,9 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.List;

import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.*;
import org.apache.thrift.TException;
Expand All @@ -40,8 +39,7 @@ public class IoTDBConnectionTest {
private TSIService.Iface client;

private IoTDBConnection connection = new IoTDBConnection();
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private TSStatus successStatus = RpcUtils.SUCCESS_STATUS;
private long sessionId;

@Before
Expand All @@ -57,7 +55,7 @@ public void tearDown() throws Exception {
public void testSetTimeZone() throws TException, IoTDBSQLException {
String timeZone = "Asia/Shanghai";
when(client.setTimeZone(any(TSSetTimeZoneReq.class)))
.thenReturn(new TSStatus(Status_SUCCESS));
.thenReturn(new TSStatus(successStatus));
connection.setClient(client);
connection.setTimeZone(timeZone);
assertEquals(connection.getTimeZone(), timeZone);
Expand All @@ -67,13 +65,13 @@ public void testSetTimeZone() throws TException, IoTDBSQLException {
public void testGetTimeZone() throws IoTDBSQLException, TException {
String timeZone = "GMT+:08:00";
sessionId = connection.getSessionId();
when(client.getTimeZone(sessionId)).thenReturn(new TSGetTimeZoneResp(Status_SUCCESS, timeZone));
when(client.getTimeZone(sessionId)).thenReturn(new TSGetTimeZoneResp(successStatus, timeZone));
connection.setClient(client);
assertEquals(connection.getTimeZone(), timeZone);
}

@Test
public void testGetServerProperties() throws IoTDBSQLException, TException {
public void testGetServerProperties() throws TException {
final String version = "v0.1";
@SuppressWarnings("serial") final List<String> supportedAggregationTime = new ArrayList<String>() {
{
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class IoTDBPreparedStatementTest {
@Mock
private Iface client;
@Mock
private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private long queryId;
private long sessionId;
Expand Down
Expand Up @@ -36,6 +36,7 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
Expand All @@ -47,7 +48,6 @@
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSStatusType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -118,9 +118,7 @@ public class IoTDBQueryResultSetTest {
@Mock
private TSFetchResultsResp fetchResultsResp;

private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
"");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private TSStatus successStatus = RpcUtils.SUCCESS_STATUS;
private ZoneId zoneID = ZoneId.systemDefault();

@Before
Expand All @@ -134,15 +132,15 @@ public void before() throws Exception {
when(connection.isClosed()).thenReturn(false);
when(client.executeStatement(any(TSExecuteStatementReq.class))).thenReturn(execResp);
when(execResp.getQueryId()).thenReturn(queryId);
when(execResp.getStatus()).thenReturn(Status_SUCCESS);
when(execResp.getStatus()).thenReturn(successStatus);

when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
when(fetchMetadataResp.getStatus()).thenReturn(Status_SUCCESS);
when(fetchMetadataResp.getStatus()).thenReturn(successStatus);

when(client.fetchResults(any(TSFetchResultsReq.class))).thenReturn(fetchResultsResp);
when(fetchResultsResp.getStatus()).thenReturn(Status_SUCCESS);
when(fetchResultsResp.getStatus()).thenReturn(successStatus);

TSStatus closeResp = Status_SUCCESS;
TSStatus closeResp = successStatus;
when(client.closeOperation(any(TSCloseOperationReq.class))).thenReturn(closeResp);
}

Expand Down
Expand Up @@ -24,12 +24,13 @@

import java.sql.SQLException;
import java.time.ZoneId;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
import org.apache.iotdb.service.rpc.thrift.TSIService.Iface;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSStatusType;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -49,8 +50,6 @@ public class IoTDBStatementTest {
@Mock
private TSFetchMetadataResp fetchMetadataResp;

private TSStatusType successStatus = new TSStatusType(TSStatusCode.SUCCESS_STATUS.getStatusCode(), "");
private TSStatus Status_SUCCESS = new TSStatus(successStatus);
private ZoneId zoneID = ZoneId.systemDefault();

@Before
Expand All @@ -59,7 +58,7 @@ public void setUp() throws Exception {
when(connection.getMetaData()).thenReturn(new IoTDBDatabaseMetadata(connection, client, sessionId));
when(connection.isClosed()).thenReturn(false);
when(client.fetchMetadata(any(TSFetchMetadataReq.class))).thenReturn(fetchMetadataResp);
when(fetchMetadataResp.getStatus()).thenReturn(Status_SUCCESS);
when(fetchMetadataResp.getStatus()).thenReturn(RpcUtils.SUCCESS_STATUS);
}

@After
Expand Down

0 comments on commit 4df96e0

Please sign in to comment.