Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private static void insertRecordInObject()
}
}

private static void insertRecords() throws IoTDBConnectionException, BatchExecutionException {
private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException {
String deviceId = "root.sg1.d1";
List<String> measurements = new ArrayList<>();
measurements.add("s1");
Expand Down Expand Up @@ -237,7 +237,7 @@ private static void insertRecords() throws IoTDBConnectionException, BatchExecut
*
* Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize
*/
private static void insertTablet() throws IoTDBConnectionException, BatchExecutionException {
private static void insertTablet() throws IoTDBConnectionException, StatementExecutionException {
// The schema of sensors of one device
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
Expand Down Expand Up @@ -268,7 +268,7 @@ private static void insertTablet() throws IoTDBConnectionException, BatchExecuti
}
}

private static void insertTablets() throws IoTDBConnectionException, BatchExecutionException {
private static void insertTablets() throws IoTDBConnectionException, StatementExecutionException {
// The schema of sensors of one device
List<MeasurementSchema> schemaList = new ArrayList<>();
schemaList.add(new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.RLE));
Expand Down
20 changes: 13 additions & 7 deletions jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
Expand Down Expand Up @@ -261,15 +260,22 @@ public int[] executeBatch() throws SQLException {
private int[] executeBatchSQL() throws TException, BatchUpdateException {
isCancelled = false;
TSExecuteBatchStatementReq execReq = new TSExecuteBatchStatementReq(sessionId, batchSQLList);
TSExecuteBatchStatementResp execResp = client.executeBatchStatement(execReq);
int[] result = new int[execResp.statusList.size()];
TSStatus execResp = client.executeBatchStatement(execReq);
int[] result = new int[batchSQLList.size()];
boolean allSuccess = true;
String message = "";
for (int i = 0; i < result.length; i++) {
result[i] = execResp.statusList.get(i).code;
if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
allSuccess = false;
message = execResp.statusList.get(i).message;
if (execResp.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
result[i] = execResp.getSubStatus().get(i).code;
if (result[i] != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
allSuccess = false;
message = execResp.getSubStatus().get(i).message;
}
} else {
allSuccess =
allSuccess && execResp.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
result[i] = execResp.getCode();
message = execResp.getMessage();
}
}
if (!allSuccess) {
Expand Down
27 changes: 17 additions & 10 deletions jdbc/src/test/java/org/apache/iotdb/jdbc/BatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementResp;
import org.apache.iotdb.service.rpc.thrift.TSIService;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
Expand All @@ -52,7 +52,7 @@ public class BatchTest {
@Mock
private IoTDBStatement statement;
private TSStatus errorStatus = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR);
private TSExecuteBatchStatementResp resp;
private TSStatus resp;
private ZoneId zoneID = ZoneId.systemDefault();

@Before
Expand All @@ -71,8 +71,10 @@ public void tearDown() throws Exception {
@Test
public void testExecuteBatchSQL1() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = new TSExecuteBatchStatementResp();
resp = RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SUCCESS_STATUS);
statement.addBatch("sql1");
resp = new TSStatus();
resp =
RpcUtils.getStatus(Collections.singletonList(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)));
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
int[] result = statement.executeBatch();
assertEquals(1, result.length);
Expand All @@ -90,8 +92,9 @@ public void testExecuteBatchSQL1() throws SQLException, TException {
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
}
};
resp.setStatusList(resExpected);
resp.setSubStatus(resExpected);

statement.clearBatch();
statement.addBatch("SET STORAGE GROUP TO root.ln.wf01.wt01");
statement.addBatch(
"CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN");
Expand All @@ -110,8 +113,8 @@ public void testExecuteBatchSQL1() throws SQLException, TException {
statement.addBatch(
"insert into root.ln.wf01.wt01(timestamp,temperature) vvvvvv(1509465720000,20.092794)");
result = statement.executeBatch();
assertEquals(resp.statusList.size(), result.length);
for (int i = 0; i < resp.statusList.size(); i++) {
assertEquals(resp.getSubStatus().size(), result.length);
for (int i = 0; i < resp.getSubStatus().size(); i++) {
assertEquals(resExpected.get(i).code, result[i]);
}
statement.clearBatch();
Expand All @@ -120,7 +123,9 @@ public void testExecuteBatchSQL1() throws SQLException, TException {
@Test(expected = BatchUpdateException.class)
public void testExecuteBatchSQL2() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = RpcUtils.getTSBatchExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR);
statement.addBatch("sql1");
resp =
RpcUtils.getStatus(Collections.singletonList(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR)));

when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
statement.executeBatch();
Expand All @@ -130,14 +135,16 @@ public void testExecuteBatchSQL2() throws SQLException, TException {
@Test
public void testExecuteBatchSQL3() throws SQLException, TException {
Statement statement = connection.createStatement();
resp = RpcUtils.getTSBatchExecuteStatementResp(errorStatus);
resp = RpcUtils.getStatus(Collections.singletonList(errorStatus));
statement.addBatch("sql1");
statement.addBatch("sql1");
List<TSStatus> resExpected = new ArrayList<TSStatus>() {
{
add(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
add(RpcUtils.getStatus(TSStatusCode.SQL_PARSE_ERROR));
}
};
resp.setStatusList(resExpected);
resp.setSubStatus(resExpected);
when(client.executeBatchStatement(any(TSExecuteBatchStatementReq.class))).thenReturn(resp);
try {
statement.executeBatch();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
Expand Down Expand Up @@ -289,20 +290,19 @@ public void insert(InsertPlan insertPlan) throws StorageEngineException {
*
* @return result of each row
*/
public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws StorageEngineException {
public void insertTablet(InsertTabletPlan insertTabletPlan)
throws StorageEngineException, BatchInsertionException {
StorageGroupProcessor storageGroupProcessor;
try {
storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
} catch (StorageEngineException e) {
logger.warn("get StorageGroupProcessor of device {} failed, because {}",
insertTabletPlan.getDeviceId(),
e.getMessage(), e);
throw new StorageEngineException(e);
throw new StorageEngineException(String.format("Get StorageGroupProcessor of device %s "
+ "failed", insertTabletPlan.getDeviceId()), e);
}

// TODO monitor: update statistics
try {
return storageGroupProcessor.insertTablet(insertTabletPlan);
storageGroupProcessor.insertTablet(insertTabletPlan);
} catch (WriteProcessException e) {
throw new StorageEngineException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.LoadFileException;
import org.apache.iotdb.db.exception.MergeException;
Expand Down Expand Up @@ -622,10 +624,19 @@ public void insert(InsertPlan insertPlan) throws WriteProcessException {
}
}

public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException {
/**
* Insert a tablet (rows belonging to the same devices) into this storage group.
* @param insertTabletPlan
* @throws WriteProcessException when update last cache failed
* @throws BatchInsertionException if some of the rows failed to be inserted
*/
public void insertTablet(InsertTabletPlan insertTabletPlan) throws WriteProcessException,
BatchInsertionException {
writeLock();
try {
TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
boolean noFailure = true;

/*
* assume that batch has been sorted by client
Expand All @@ -638,13 +649,14 @@ public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws WritePr
results[loc] = RpcUtils.getStatus(TSStatusCode.OUT_OF_TTL_ERROR,
"time " + currTime + " in current line is out of TTL: " + dataTTL);
loc++;
noFailure = false;
} else {
break;
}
}
// loc pointing at first legal position
if (loc == insertTabletPlan.getRowCount()) {
return results;
throw new BatchInsertionException(results);
}
// before is first start point
int before = loc;
Expand All @@ -659,12 +671,12 @@ public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws WritePr
while (loc < insertTabletPlan.getRowCount()) {
long time = insertTabletPlan.getTimes()[loc];
long curTimePartition = StorageEngine.getTimePartition(time);
results[loc] = RpcUtils.SUCCESS_STATUS;
// start next partition
if (curTimePartition != beforeTimePartition) {
// insert last time partition
insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition);
noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
results,
beforeTimePartition) && noFailure;
// re initialize
before = loc;
beforeTimePartition = curTimePartition;
Expand All @@ -678,8 +690,8 @@ public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws WritePr
// judge if we should insert sequence
if (!isSequence && time > lastFlushTime) {
// insert into unsequence and then start sequence
insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results,
beforeTimePartition);
noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, false, results,
beforeTimePartition) && noFailure;
before = loc;
isSequence = true;
}
Expand All @@ -689,14 +701,16 @@ public TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws WritePr

// do not forget last part
if (before < loc) {
insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence, results,
beforeTimePartition);
noFailure = insertTabletToTsFileProcessor(insertTabletPlan, before, loc, isSequence,
results, beforeTimePartition) && noFailure;
}
long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault(
insertTabletPlan.getDeviceId(), Long.MIN_VALUE);
tryToUpdateBatchInsertLastCache(insertTabletPlan, globalLatestFlushedTime);

return results;
if (!noFailure) {
throw new BatchInsertionException(results);
}
} finally {
writeUnlock();
}
Expand All @@ -719,12 +733,13 @@ private boolean isAlive(long time) {
* @param end end index of rows to be inserted in insertTabletPlan
* @param results result array
* @param timePartitionId time partition id
* @return false if any failure occurs when inserting the tablet, true otherwise
*/
private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
private boolean insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) {
// return when start >= end
if (start >= end) {
return;
return true;
}

TsFileProcessor tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence);
Expand All @@ -733,14 +748,14 @@ private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
"can not create TsFileProcessor, timePartitionId: " + timePartitionId);
}
return;
return false;
}

try {
tsFileProcessor.insertTablet(insertTabletPlan, start, end, results);
} catch (WriteProcessException e) {
logger.error("insert to TsFileProcessor error ", e);
return;
return false;
}

latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>());
Expand All @@ -756,6 +771,7 @@ private void insertTabletToTsFileProcessor(InsertTabletPlan insertTabletPlan,
if (tsFileProcessor.shouldFlush()) {
fileFlushPolicy.apply(this, tsFileProcessor, sequence);
}
return true;
}

private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestFlushedTime)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.exception;

import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.service.rpc.thrift.TSStatus;

public class BatchInsertionException extends QueryProcessException {

private TSStatus[] failingStatus;

public BatchInsertionException(TSStatus[] failingStatus) {
super("Batch insertion failed");
this.failingStatus = failingStatus;
}

public void setFailingStatus(TSStatus[] failingStatus) {
this.failingStatus = failingStatus;
}

public TSStatus[] getFailingStatus() {
return failingStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.sql.SQLException;
import org.apache.iotdb.db.exception.BatchInsertionException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
Expand Down Expand Up @@ -93,6 +94,7 @@ void update(Path path, long startTime, long endTime, String value)
* execute batch insert plan
*
* @return result of each row
* @throws BatchInsertionException when some of the rows failed
*/
TSStatus[] insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException;
}
Loading