Skip to content

Commit

Permalink
[IOTDB-386] Vectorize the raw data query process (#652)
Browse files Browse the repository at this point in the history
* add batch reader interfaces

* modify docs in SeriesReaderWithoutValueFilter

* Add data to TSExecuteStatementResp (#631)

* add fill buffer in EngineDataSetWithoutValueFilter (#646)

* [IOTDB-330] Improve the reading method of unsequence data (#619)

* change UnseqResourceMergeReader to IBatchReader

* Fix bug of "Has not execute query" error when querying (#656)

* Original query process (#653)

* add aggregation reader

* Update SeriesReaderWithoutValueFilter.java

* fix code-coverage stage does not compile service-rpc and some other modules

Co-authored-by: Jackie Tien <JackieTien@foxmail.com>
Co-authored-by: Dawei Liu <atoildw@163.com>
Co-authored-by: Zesong Sun <szs19@mails.tsinghua.edu.cn>
Co-authored-by: Lei Rui <33376433+LeiRui@users.noreply.github.com>
  • Loading branch information
5 people committed Dec 26, 2019
1 parent e7adcbb commit f0f229d
Show file tree
Hide file tree
Showing 128 changed files with 2,246 additions and 1,999 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ matrix:
jdk: openjdk8
script:
# now, grafana has no tests; spark-* tests are written by scala
- mvn post-integration-test -Pcode-coverage -Pcoveralls -Dservice_name=travis_ci -pl tsfile,client,jdbc,server,session,hadoop,hive-connector
- mvn post-integration-test -Pcode-coverage
- mvn coveralls:report -Dservice_name=travis_ci -pl tsfile,jdbc,server,session,client,hadoop,hive-connector

cache:
directories:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static void main(String[] args) throws IOException {
defaultTimeDecoder);
while (reader1.hasNextBatch()) {
BatchData batchData = reader1.nextBatch();
while (batchData.hasNext()) {
while (batchData.hasCurrent()) {
System.out.println(
"\t\t\ttime, value: " + batchData.currentTime() + ", " + batchData
.currentValue());
Expand Down
3 changes: 0 additions & 3 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,6 @@ public CallableStatement prepareCall(String arg0, int arg1, int arg2, int arg3)

@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
if (sql.equalsIgnoreCase("INSERT")) {
return new IoTDBPreparedInsertionStatement(this, getClient(), sessionId, zoneId);
}
return new IoTDBPreparedStatement(this, getClient(), sessionId, sql, zoneId);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ public class IoTDBPreparedStatement extends IoTDBStatement implements PreparedSt
*/
private final Map<Integer, String> parameters = new HashMap<>();

IoTDBPreparedStatement(IoTDBConnection connection, Iface client,
Long sessionId, ZoneId zoneId) throws SQLException{
super(connection, client, sessionId, zoneId);
}

IoTDBPreparedStatement(IoTDBConnection connection, Iface client,
Long sessionId, String sql,
ZoneId zoneId) throws SQLException {
Expand Down
110 changes: 63 additions & 47 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBQueryResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public IoTDBQueryResultSet() {

public IoTDBQueryResultSet(Statement statement, List<String> columnNameList,
List<String> columnTypeList, boolean ignoreTimeStamp, TSIService.Iface client,
String sql, long queryId, long sessionId)
String sql, long queryId, long sessionId, TSQueryDataSet dataset)
throws SQLException {
this.statement = statement;
this.fetchSize = statement.getFetchSize();
Expand Down Expand Up @@ -102,6 +102,7 @@ public IoTDBQueryResultSet(Statement statement, List<String> columnNameList,
this.client = client;
this.sql = sql;
this.queryId = queryId;
this.tsQueryDataSet = dataset;
this.sessionId = sessionId;
}

Expand Down Expand Up @@ -337,10 +338,8 @@ public double getDouble(String columnName) throws SQLException {
int index = columnInfoMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToDouble(values[index]);
}
else {
throw new SQLException(
String.format(VALUE_IS_NULL, columnName));
} else {
throw new SQLException(String.format(VALUE_IS_NULL, columnName));
}
}

Expand Down Expand Up @@ -375,10 +374,8 @@ public float getFloat(String columnName) throws SQLException {
int index = columnInfoMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToFloat(values[index]);
}
else {
throw new SQLException(
String.format(VALUE_IS_NULL, columnName));
} else {
throw new SQLException(String.format(VALUE_IS_NULL, columnName));
}
}

Expand All @@ -398,10 +395,8 @@ public int getInt(String columnName) throws SQLException {
int index = columnInfoMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToInt(values[index]);
}
else {
throw new SQLException(
String.format(VALUE_IS_NULL, columnName));
} else {
throw new SQLException(String.format(VALUE_IS_NULL, columnName));
}
}

Expand All @@ -419,10 +414,8 @@ public long getLong(String columnName) throws SQLException {
int index = columnInfoMap.get(columnName) - START_INDEX;
if (values[index] != null) {
return BytesUtils.bytesToLong(values[index]);
}
else {
throw new SQLException(
String.format(VALUE_IS_NULL, columnName));
} else {
throw new SQLException(String.format(VALUE_IS_NULL, columnName));
}
}

Expand Down Expand Up @@ -668,32 +661,49 @@ public void moveToInsertRow() throws SQLException {

@Override
public boolean next() throws SQLException {
if ((tsQueryDataSet == null || !tsQueryDataSet.time.hasRemaining()) && !emptyResultSet) {
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId);
try {
TSFetchResultsResp resp = client.fetchResults(req);
try {
RpcUtils.verifySuccess(resp.getStatus());
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
}
if (!resp.hasResultSet) {
emptyResultSet = true;
} else {
tsQueryDataSet = resp.getQueryDataSet();
rowsIndex = 0;
}
} catch (TException e) {
throw new SQLException(
"Cannot fetch result from server, because of network connection: {} ", e);
}

if (hasCachedResults()) {
constructOneRow();
return true;
}
if (emptyResultSet) {
return false;
}
constructOneRow();
return true;
if (fetchResults()) {
constructOneRow();
return true;
}
return false;
}


/**
* @return true means has results
*/
private boolean fetchResults() throws SQLException {
rowsIndex = 0;
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId);
try {
TSFetchResultsResp resp = client.fetchResults(req);

try {
RpcUtils.verifySuccess(resp.getStatus());
} catch (IoTDBRPCException e) {
throw new IoTDBSQLException(e.getMessage(), resp.getStatus());
}
if (!resp.hasResultSet) {
emptyResultSet = true;
} else {
tsQueryDataSet = resp.getQueryDataSet();
}
return resp.hasResultSet;
} catch (TException e) {
throw new SQLException(
"Cannot fetch result from server, because of network connection: {} ", e);
}
}

private boolean hasCachedResults() {
return tsQueryDataSet != null && tsQueryDataSet.time.hasRemaining();
}

private void constructOneRow() {
Expand All @@ -710,28 +720,33 @@ private void constructOneRow() {
TSDataType dataType = TSDataType.valueOf(columnTypeDeduplicatedList.get(i));
switch (dataType) {
case BOOLEAN:
if (values[i] == null)
if (values[i] == null) {
values[i] = new byte[1];
}
valueBuffer.get(values[i]);
break;
case INT32:
if (values[i] == null)
if (values[i] == null) {
values[i] = new byte[Integer.BYTES];
}
valueBuffer.get(values[i]);
break;
case INT64:
if (values[i] == null)
if (values[i] == null) {
values[i] = new byte[Long.BYTES];
}
valueBuffer.get(values[i]);
break;
case FLOAT:
if (values[i] == null)
if (values[i] == null) {
values[i] = new byte[Float.BYTES];
}
valueBuffer.get(values[i]);
break;
case DOUBLE:
if (values[i] == null)
if (values[i] == null) {
values[i] = new byte[Double.BYTES];
}
valueBuffer.get(values[i]);
break;
case TEXT:
Expand All @@ -740,7 +755,7 @@ private void constructOneRow() {
break;
default:
throw new UnSupportedDataTypeException(
String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
String.format("Data type %s is not supported.", columnTypeDeduplicatedList.get(i)));
}
}
}
Expand All @@ -749,8 +764,9 @@ private void constructOneRow() {

/**
* judge whether the specified column value is null in the current position
* @param index column index
* @return
*
* @param index series index
* @param rowNum current position
*/
private boolean isNull(int index, int rowNum) {
byte bitmap = currentBitmap[index];
Expand Down
Loading

0 comments on commit f0f229d

Please sign in to comment.