From a5cfb37e31122de1e84d0ae3a631d1dc6b1cdbfe Mon Sep 17 00:00:00 2001 From: Jialin Qiao Date: Fri, 20 Mar 2020 16:48:57 +0800 Subject: [PATCH] avoid flushing empty memtable (#926) * avoid flushing empty nomal memtable --- .../apache/iotdb/db/engine/StorageEngine.java | 15 ++-- .../db/engine/memtable/AbstractMemTable.java | 13 ++- .../iotdb/db/engine/memtable/IMemTable.java | 5 +- .../storagegroup/StorageGroupProcessor.java | 89 ++++++++----------- .../engine/storagegroup/TsFileProcessor.java | 53 ++++------- .../iotdb/db/exception/IoTDBException.java | 5 ++ .../db/exception/StorageEngineException.java | 4 + .../db/exception/WriteProcessException.java | 4 + .../db/exception/query/OutOfTTLException.java | 3 +- .../apache/iotdb/db/monitor/StatMonitor.java | 3 +- .../db/writelog/recover/LogReplayer.java | 8 +- .../engine/cache/DeviceMetaDataCacheTest.java | 6 +- .../FileNodeManagerBenchmark.java | 3 +- .../StorageGroupProcessorTest.java | 9 +- .../iotdb/db/engine/storagegroup/TTLTest.java | 11 +-- .../storagegroup/TsFileProcessorTest.java | 10 +-- .../iotdb/db/integration/IoTDBTtlIT.java | 2 +- .../db/query/reader/ReaderTestHelper.java | 7 +- 18 files changed, 113 insertions(+), 137 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 6bd9b146f8dd..c63f7284990b 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -52,7 +52,6 @@ import org.apache.iotdb.db.exception.TsFileProcessorException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.runtime.StorageEngineFailureException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; @@ -253,23 +252,21 @@ public synchronized void reset() { * * @param insertPlan physical plan of insertion */ - public void insert(InsertPlan insertPlan) - throws StorageEngineException, QueryProcessException { + public void insert(InsertPlan insertPlan) throws StorageEngineException { StorageGroupProcessor storageGroupProcessor; try { storageGroupProcessor = getProcessor(insertPlan.getDeviceId()); - } catch (StorageEngineException e) { - logger.warn("get StorageGroupProcessor of device {} failed, because {}", - insertPlan.getDeviceId(), e.getMessage(), e); - throw new StorageEngineException(e); + } catch (Exception e) { + throw new StorageEngineException( + "get StorageGroupProcessor of device failed: " + insertPlan.getDeviceId(), e); } // TODO monitor: update statistics try { storageGroupProcessor.insert(insertPlan); - } catch (QueryProcessException e) { - throw new QueryProcessException(e); + } catch (WriteProcessException e) { + throw new StorageEngineException(e); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java index 3bd46dd573d6..06366c5cd188 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java @@ -27,8 +27,8 @@ import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.modification.Modification; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.rescon.TVListAllocator; @@ -37,7 +37,6 @@ import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.utils.Binary; public abstract class AbstractMemTable implements IMemTable { @@ -86,7 +85,7 @@ private IWritableMemChunk createIfNotExistAndGet(String deviceId, String measure protected abstract IWritableMemChunk genMemSeries(TSDataType dataType); @Override - public void insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertPlan insertPlan) throws WriteProcessException { try { for (int i = 0; i < insertPlan.getValues().length; i++) { @@ -96,20 +95,20 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException { } long recordSizeInByte = MemUtils.getRecordSize(insertPlan); memSize += recordSizeInByte; - } catch (RuntimeException e) { - throw new QueryProcessException(e.getMessage()); + } catch (Exception e) { + throw new WriteProcessException(e.getMessage()); } } @Override public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end) - throws QueryProcessException { + throws WriteProcessException { try { write(batchInsertPlan, start, end); long recordSizeInByte = MemUtils.getRecordSize(batchInsertPlan, start, end); memSize += recordSizeInByte; } catch (RuntimeException e) { - throw new QueryProcessException(e.getMessage()); + throw new WriteProcessException(e.getMessage()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java index 1ffd5f0bafeb..7f48b46feaa8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.iotdb.db.engine.modification.Deletion; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -54,13 +55,13 @@ void write(String deviceId, String measurement, TSDataType dataType, */ long memSize(); - void insert(InsertPlan insertPlan) throws QueryProcessException; + void insert(InsertPlan insertPlan) throws WriteProcessException; /** * [start, end) */ void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end) - throws QueryProcessException; + throws WriteProcessException; ReadOnlyMemChunk query(String deviceId, String measurement, TSDataType dataType, TSEncoding encoding, Map props, long timeLowerBound) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 6642ae15c07f..295a0747983c 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -282,14 +282,7 @@ private void recover() throws StorageGroupProcessorException { .putAll(resource.getEndTimeMap()); partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()) .putAll(resource.getEndTimeMap()); - - for (Map.Entry mapEntry : resource.getEndTimeMap().entrySet()) { - if (!globalLatestFlushedTimeForEachDevice.containsKey(mapEntry.getKey()) - || globalLatestFlushedTimeForEachDevice.get(mapEntry.getKey()) - < mapEntry.getValue()) { - globalLatestFlushedTimeForEachDevice.put(mapEntry.getKey(), mapEntry.getValue()); - } - } + globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap()); } } } @@ -466,7 +459,7 @@ public void addMeasurement(String measurementId, TSDataType dataType, TSEncoding } } - public void insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertPlan insertPlan) throws WriteProcessException { // reject insertions that are out of ttl if (!checkTTL(insertPlan.getTime())) { throw new OutOfTTLException(insertPlan.getTime(), (System.currentTimeMillis() - dataTTL)); @@ -475,15 +468,15 @@ public void insert(InsertPlan insertPlan) throws QueryProcessException { try { // init map long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime()); - latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()) - .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE); - partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()) - .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE); + + latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>()); + partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>()); // insert to sequence or unSequence file insertToTsFileProcessor(insertPlan, insertPlan.getTime() > partitionLatestFlushedTimeForEachDevice.get(timePartitionId) - .get(insertPlan.getDeviceId())); + .getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE)); + } finally { writeUnlock(); } @@ -584,7 +577,7 @@ private boolean checkTTL(long time) { private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan, int start, int end, boolean sequence, TSStatus[] results, long timePartitionId) throws WriteProcessException { - // return when start <= end + // return when start >= end if (start >= end) { return; } @@ -598,23 +591,24 @@ private void insertBatchToTsFileProcessor(BatchInsertPlan batchInsertPlan, return; } - tsFileProcessor.insertBatch(batchInsertPlan, start, end, results); + try { + tsFileProcessor.insertBatch(batchInsertPlan, start, end, results); + } catch (WriteProcessException e) { + logger.error("insert to TsFileProcessor error ", e); + return; + } - latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>()) - .putIfAbsent(batchInsertPlan.getDeviceId(), Long.MIN_VALUE); + latestTimeForEachDevice.computeIfAbsent(timePartitionId, t -> new HashMap<>()); // try to update the latest time of the device of this tsRecord - if (sequence && latestTimeForEachDevice.get(timePartitionId).get(batchInsertPlan.getDeviceId()) + if (sequence && latestTimeForEachDevice.get(timePartitionId) + .getOrDefault(batchInsertPlan.getDeviceId(), Long.MIN_VALUE) < batchInsertPlan.getTimes()[end - 1]) { latestTimeForEachDevice.get(timePartitionId) .put(batchInsertPlan.getDeviceId(), batchInsertPlan.getTimes()[end - 1]); } - long globalLatestFlushedTime = - globalLatestFlushedTimeForEachDevice.computeIfAbsent( - batchInsertPlan.getDeviceId(), k -> Long.MIN_VALUE); + long globalLatestFlushedTime = globalLatestFlushedTimeForEachDevice.getOrDefault( + batchInsertPlan.getDeviceId(), Long.MIN_VALUE); tryToUpdateBatchInsertLastCache(batchInsertPlan, globalLatestFlushedTime); - if (globalLatestFlushedTime < batchInsertPlan.getMaxTime()) - globalLatestFlushedTimeForEachDevice.put( - batchInsertPlan.getDeviceId(), batchInsertPlan.getMaxTime()); // check memtable size and may async try to flush the work memtable if (tsFileProcessor.shouldFlush()) { @@ -640,9 +634,8 @@ public void tryToUpdateBatchInsertLastCache(BatchInsertPlan plan, Long latestFlu } private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) - throws QueryProcessException { + throws WriteProcessException { TsFileProcessor tsFileProcessor; - boolean result; long timePartitionId = StorageEngine.fromTimeToTimePartition(insertPlan.getTime()); tsFileProcessor = getOrCreateTsFileProcessor(timePartitionId, sequence); @@ -652,22 +645,19 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) } // insert TsFileProcessor - result = tsFileProcessor.insert(insertPlan); + tsFileProcessor.insert(insertPlan); // try to update the latest time of the device of this tsRecord - if (result - && latestTimeForEachDevice.get(timePartitionId).get(insertPlan.getDeviceId()) < insertPlan - .getTime()) { + if (latestTimeForEachDevice.get(timePartitionId) + .getOrDefault(insertPlan.getDeviceId(), Long.MIN_VALUE) < insertPlan.getTime()) { latestTimeForEachDevice.get(timePartitionId) .put(insertPlan.getDeviceId(), insertPlan.getTime()); } - long globalLatestFlushTime = - globalLatestFlushedTimeForEachDevice.computeIfAbsent( - insertPlan.getDeviceId(), k -> Long.MIN_VALUE); + + long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.getOrDefault( + insertPlan.getDeviceId(), Long.MIN_VALUE); + tryToUpdateInsertLastCache(insertPlan, globalLatestFlushTime); - if (result && globalLatestFlushTime < insertPlan.getTime()) { - globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime()); - } // check memtable size and may asyncTryToFlush the work memtable if (tsFileProcessor.shouldFlush()) { @@ -676,7 +666,7 @@ private void insertToTsFileProcessor(InsertPlan insertPlan, boolean sequence) } public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) - throws QueryProcessException { + throws WriteProcessException { try { MNode node = MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(plan.getDeviceId()); @@ -687,8 +677,8 @@ public void tryToUpdateInsertLastCache(InsertPlan plan, Long latestFlushedTime) ((LeafMNode) measurementNode) .updateCachedLast(plan.composeTimeValuePair(i), true, latestFlushedTime); } - } catch (MetadataException e) { - throw new QueryProcessException(e); + } catch (MetadataException | QueryProcessException e) { + throw new WriteProcessException(e); } } @@ -1286,9 +1276,9 @@ private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) { .get(processor.getTimeRangeId()); if (curPartitionDeviceLatestTime == null) { - logger.error("Partition: " + processor.getTimeRangeId() + - " does't have latest time for each device record. Flushing tsfile is: " - + processor.getTsFileResource().getFile()); + logger.warn("Partition: {} does't have latest time for each device. " + + "No valid record is written into memtable. Flushing tsfile is: {}", + processor.getTimeRangeId(), processor.getTsFileResource().getFile()); return false; } @@ -1296,8 +1286,8 @@ private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) { partitionLatestFlushedTimeForEachDevice .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>()) .put(entry.getKey(), entry.getValue()); - if (!globalLatestFlushedTimeForEachDevice.containsKey(entry.getKey()) - || globalLatestFlushedTimeForEachDevice.get(entry.getKey()) < entry.getValue()) { + if (globalLatestFlushedTimeForEachDevice + .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) { globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue()); } } @@ -1762,14 +1752,11 @@ private void updateLatestTimeMap(TsFileResource newTsFileResource) { Map latestFlushTimeForPartition = partitionLatestFlushedTimeForEachDevice .getOrDefault(timePartitionId, new HashMap<>()); - if (!latestFlushTimeForPartition.containsKey(device) - || latestFlushTimeForPartition.get(device) < endTime) { + if (latestFlushTimeForPartition.getOrDefault(device, Long.MIN_VALUE) < endTime) { partitionLatestFlushedTimeForEachDevice - .computeIfAbsent(timePartitionId, id -> new HashMap<>()) - .put(device, endTime); + .computeIfAbsent(timePartitionId, id -> new HashMap<>()).put(device, endTime); } - if (!globalLatestFlushedTimeForEachDevice.containsKey(device) - || globalLatestFlushedTimeForEachDevice.get(device) < endTime) { + if (globalLatestFlushedTimeForEachDevice.getOrDefault(device, Long.MIN_VALUE) < endTime) { globalLatestFlushedTimeForEachDevice.put(device, endTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index a770882e3d2e..97768443b1f8 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -45,7 +45,6 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.UpdateEndTimeCallBack; import org.apache.iotdb.db.engine.version.VersionController; import org.apache.iotdb.db.exception.TsFileProcessorException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.qp.constant.DatetimeUtils; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; @@ -153,9 +152,8 @@ public TsFileProcessor(String storageGroupName, TsFileResource tsFileResource, S * insert data in an InsertPlan into the workingMemtable. * * @param insertPlan physical plan of insertion - * @return succeed or fail */ - public boolean insert(InsertPlan insertPlan) throws QueryProcessException { + public void insert(InsertPlan insertPlan) throws WriteProcessException { if (workMemTable == null) { workMemTable = MemTablePool.getInstance().getAvailableMemTable(this); @@ -167,10 +165,9 @@ public boolean insert(InsertPlan insertPlan) throws QueryProcessException { if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { try { getLogNode().write(insertPlan); - } catch (IOException e) { - logger.error("{}: {} write WAL failed", storageGroupName, - tsFileResource.getFile().getName(), e); - return false; + } catch (Exception e) { + throw new WriteProcessException(String.format("%s: %s write WAL failed, because %s", + storageGroupName, tsFileResource.getFile().getAbsolutePath(), e.getMessage())); } } @@ -181,8 +178,6 @@ public boolean insert(InsertPlan insertPlan) throws QueryProcessException { if (!sequence) { tsFileResource.updateEndTime(insertPlan.getDeviceId(), insertPlan.getTime()); } - - return true; } public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end, @@ -195,24 +190,20 @@ public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end, // insert insertPlan to the work memtable try { workMemTable.insertBatch(batchInsertPlan, start, end); - for (int i = start; i < end; i++) { - results[i] = RpcUtils.SUCCESS_STATUS; - } - } catch (Exception e) { - setErrorResult(start, end, results, e); - return; - } - - if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { - try { + if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) { batchInsertPlan.setStart(start); batchInsertPlan.setEnd(end); getLogNode().write(batchInsertPlan); - } catch (IOException e) { - logger.error("{}: {} write WAL failed", storageGroupName, - tsFileResource.getFile().getName(), e); - setErrorResult(start, end, results, e); } + } catch (Exception e) { + for (int i = start; i < end; i++) { + results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); + } + throw new WriteProcessException(e); + } + + for (int i = start; i < end; i++) { + results[i] = RpcUtils.SUCCESS_STATUS; } tsFileResource @@ -226,14 +217,6 @@ public void insertBatch(BatchInsertPlan batchInsertPlan, int start, int end, } } - private void setErrorResult(int start, int end, TSStatus[] results, Exception e) - throws WriteProcessException { - for (int i = start; i < end; i++) { - results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage()); - } - throw new WriteProcessException(e); - } - /** * Delete data which belongs to the timeseries `deviceId.measurementId` and the timestamp of which * <= 'timestamp' in the deletion.
@@ -449,9 +432,11 @@ public void asyncFlush() { * flushManager again. */ private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException { - if(!updateLatestFlushTimeCallback.call(this)){ - logger.error("{}: {} Memetable info: {}", storageGroupName, - tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap()); + if(!tobeFlushed.isSignalMemTable() && + (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)){ + logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}", + storageGroupName, tsFileResource.getFile().getName(), tobeFlushed.getMemTableMap()); + return; } flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { diff --git a/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java b/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java index 90c16a0aeb03..5169d3cdb79c 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/IoTDBException.java @@ -30,6 +30,11 @@ public IoTDBException(String message, int errorCode) { this.errorCode = errorCode; } + public IoTDBException(String message, Throwable cause, int errorCode) { + super(message, cause); + this.errorCode = errorCode; + } + public IoTDBException(Throwable cause, int errorCode) { super(cause); this.errorCode = errorCode; diff --git a/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java b/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java index bd03a29c9d78..5e5b6fa53d66 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/StorageEngineException.java @@ -32,6 +32,10 @@ public StorageEngineException(String message) { super(message, TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode()); } + public StorageEngineException(String message, Throwable cause) { + super(message, cause, TSStatusCode.STORAGE_ENGINE_ERROR.getStatusCode()); + } + public StorageEngineException(String message, int errorCode) { super(message, errorCode); } diff --git a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java index 9ae88efd8d9d..dde7735591ff 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/WriteProcessException.java @@ -29,6 +29,10 @@ public WriteProcessException(String message) { super(message, TSStatusCode.STORAGE_GROUP_ERROR.getStatusCode()); } + public WriteProcessException(String message, int errorCode) { + super(message, errorCode); + } + public WriteProcessException(Exception exception) { super(exception, TSStatusCode.STORAGE_GROUP_PROCESSOR_ERROR.getStatusCode()); } diff --git a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java index 6365b217957a..746fc690152c 100644 --- a/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java +++ b/server/src/main/java/org/apache/iotdb/db/exception/query/OutOfTTLException.java @@ -21,9 +21,10 @@ package org.apache.iotdb.db.exception.query; import java.util.Date; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.rpc.TSStatusCode; -public class OutOfTTLException extends QueryProcessException { +public class OutOfTTLException extends WriteProcessException { private static final long serialVersionUID = -1197147887094603300L; diff --git a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java index 70cd44d8b526..93723130513f 100644 --- a/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java +++ b/server/src/main/java/org/apache/iotdb/db/monitor/StatMonitor.java @@ -34,7 +34,6 @@ import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeManagerStatConstants; import org.apache.iotdb.db.monitor.MonitorConstants.FileNodeProcessorStatConstants; @@ -384,7 +383,7 @@ public void insert(Map tsRecordHashMap) { numInsert.incrementAndGet(); pointNum = entry.getValue().dataPointList.size(); numPointsInsert.addAndGet(pointNum); - } catch (StorageEngineException | QueryProcessException e) { + } catch (StorageEngineException e) { numInsertError.incrementAndGet(); logger.error("Inserting stat points error.", e); } diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java index 3d3b9770d45f..dd4362da8081 100644 --- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java +++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/LogReplayer.java @@ -28,7 +28,7 @@ import org.apache.iotdb.db.engine.modification.ModificationFile; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; import org.apache.iotdb.db.engine.version.VersionController; -import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; @@ -105,7 +105,7 @@ public void replayLogs() throws StorageGroupProcessorException { } } catch (IOException e) { throw new StorageGroupProcessorException("Cannot replay logs" + e.getMessage()); - } catch (QueryProcessException e) { + } catch (WriteProcessException e) { throw new StorageGroupProcessorException( "Cannot replay logs for query processor exception" + e.getMessage()); } finally { @@ -129,7 +129,7 @@ private void replayDelete(DeletePlan deletePlan) throws IOException { } } - private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProcessException { + private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws WriteProcessException { if (currentTsFileResource != null) { // the last chunk group may contain the same data with the logs, ignore such logs in seq file Long lastEndTime = currentTsFileResource.getEndTimeMap().get(batchInsertPlan.getDeviceId()); @@ -150,7 +150,7 @@ private void replayBatchInsert(BatchInsertPlan batchInsertPlan) throws QueryProc recoverMemTable.insertBatch(batchInsertPlan, 0, batchInsertPlan.getRowCount()); } - private void replayInsert(InsertPlan insertPlan) throws QueryProcessException { + private void replayInsert(InsertPlan insertPlan) { if (currentTsFileResource != null) { // the last chunk group may contain the same data with the logs, ignore such logs in seq file Long lastEndTime = currentTsFileResource.getEndTimeMap().get(insertPlan.getDeviceId()); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java index a46be2a7c824..1c629d54d94d 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java @@ -30,7 +30,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; -import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; @@ -82,7 +82,7 @@ public void tearDown() throws Exception { EnvironmentUtils.cleanDir(systemDir); } - private void insertOneRecord(long time, int num) throws QueryProcessException { + private void insertOneRecord(long time, int num) throws WriteProcessException { TSRecord record = new TSRecord(time, storageGroup); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId0, String.valueOf(num))); record.addTuple(DataPoint.getDataPoint(TSDataType.INT64, measurementId1, String.valueOf(num))); @@ -92,7 +92,7 @@ private void insertOneRecord(long time, int num) throws QueryProcessException { storageGroupProcessor.insert(new InsertPlan(record)); } - protected void insertData() throws IOException, QueryProcessException { + protected void insertData() throws IOException, WriteProcessException { for (int j = 1; j <= 100; j++) { insertOneRecord(j, j); } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java index 7a773a78a556..99199ac01b8d 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/FileNodeManagerBenchmark.java @@ -24,7 +24,6 @@ import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.StorageEngineException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -117,7 +116,7 @@ public void run() { TSRecord tsRecord = getRecord(deltaObject, time); StorageEngine.getInstance().insert(new InsertPlan(tsRecord)); } - } catch (QueryProcessException | StorageEngineException e) { + } catch (StorageEngineException e) { e.printStackTrace(); } finally { latch.countDown(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java index 0abeb561d1a3..cfb777c3d8d6 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java @@ -26,7 +26,6 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.StorageGroupProcessorException; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan; @@ -85,7 +84,7 @@ public void tearDown() throws Exception { @Test - public void testUnseqUnsealedDelete() throws QueryProcessException, IOException { + public void testUnseqUnsealedDelete() throws WriteProcessException, IOException { TSRecord record = new TSRecord(10000, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000))); processor.insert(new InsertPlan(record)); @@ -132,7 +131,7 @@ record = new TSRecord(j, deviceId); } @Test - public void testSequenceSyncClose() throws QueryProcessException { + public void testSequenceSyncClose() throws WriteProcessException { for (int j = 1; j <= 10; j++) { TSRecord record = new TSRecord(j, deviceId); record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); @@ -208,7 +207,7 @@ public void testIoTDBRowBatchWriteAndSyncClose() throws WriteProcessException { @Test - public void testSeqAndUnSeqSyncClose() throws QueryProcessException { + public void testSeqAndUnSeqSyncClose() throws WriteProcessException { for (int j = 21; j <= 30; j++) { TSRecord record = new TSRecord(j, deviceId); @@ -240,7 +239,7 @@ public void testSeqAndUnSeqSyncClose() throws QueryProcessException { } @Test - public void testMerge() throws QueryProcessException { + public void testMerge() throws WriteProcessException { mergeLock = new AtomicLong(0); for (int j = 21; j <= 30; j++) { diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index 24ad39fbd369..0675ab4328a3 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -36,6 +36,7 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -121,7 +122,7 @@ public void testSetMetaTTL() throws IOException, MetadataException { } @Test - public void testTTLWrite() throws QueryProcessException { + public void testTTLWrite() throws WriteProcessException { InsertPlan insertPlan = new InsertPlan(); insertPlan.setDeviceId(sg1); insertPlan.setTime(System.currentTimeMillis()); @@ -146,7 +147,7 @@ public void testTTLWrite() throws QueryProcessException { storageGroupProcessor.insert(insertPlan); } - private void prepareData() throws QueryProcessException { + private void prepareData() throws WriteProcessException { InsertPlan insertPlan = new InsertPlan(); insertPlan.setDeviceId(sg1); insertPlan.setTime(System.currentTimeMillis()); @@ -174,7 +175,7 @@ private void prepareData() throws QueryProcessException { } @Test - public void testTTLRead() throws IOException, QueryProcessException, StorageEngineException { + public void testTTLRead() throws IOException, WriteProcessException, StorageEngineException { prepareData(); // files before ttl @@ -222,7 +223,7 @@ public void testTTLRead() throws IOException, QueryProcessException, StorageEngi } @Test - public void testTTLRemoval() throws StorageEngineException, QueryProcessException { + public void testTTLRemoval() throws StorageEngineException, WriteProcessException { prepareData(); storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); @@ -333,7 +334,7 @@ public void testShowTTL() } @Test - public void testTTLCleanFile() throws QueryProcessException { + public void testTTLCleanFile() throws WriteProcessException { prepareData(); storageGroupProcessor.syncCloseAllWorkingTsFileProcessors(); diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index 1c926d9ffc06..34559eb6a1ac 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -34,8 +34,8 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk; import org.apache.iotdb.db.engine.version.SysTimeVersionController; import org.apache.iotdb.db.exception.TsFileProcessorException; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.utils.EnvironmentUtils; @@ -84,7 +84,7 @@ public void tearDown() throws Exception { } @Test - public void testWriteAndFlush() throws IOException, QueryProcessException, MetadataException { + public void testWriteAndFlush() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndFlush begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, @@ -133,7 +133,7 @@ public void testWriteAndFlush() throws IOException, QueryProcessException, Metad @Test public void testWriteAndRestoreMetadata() - throws IOException, QueryProcessException, MetadataException { + throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, @@ -204,7 +204,7 @@ public void testWriteAndRestoreMetadata() @Test - public void testMultiFlush() throws IOException, QueryProcessException, MetadataException { + public void testMultiFlush() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, this::closeTsFileProcessor, @@ -239,7 +239,7 @@ public void testMultiFlush() throws IOException, QueryProcessException, Metadata @Test - public void testWriteAndClose() throws IOException, QueryProcessException, MetadataException { + public void testWriteAndClose() throws IOException, WriteProcessException, MetadataException { logger.info("testWriteAndRestoreMetadata begin.."); processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), SchemaUtils.constructSchema(deviceId), SysTimeVersionController.INSTANCE, diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java index 4fd5f133c0e5..5ac6598ed12e 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java @@ -104,7 +104,7 @@ public void testTTL() throws SQLException { boolean caught = false; try { statement.execute(String.format("INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", - now - 50000 + i, i)); + now - 500000 + i, i)); } catch (SQLException e) { if (TSStatusCode.OUT_OF_TTL_ERROR.getStatusCode() == e.getErrorCode()) { caught = true; diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java index 44245d722b5b..dbc7ac3b9148 100644 --- a/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java +++ b/server/src/test/java/org/apache/iotdb/db/query/reader/ReaderTestHelper.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.engine.MetadataManagerHelper; import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy; import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; +import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.qp.physical.crud.InsertPlan; @@ -68,10 +69,4 @@ public void tearDown() throws Exception { abstract protected void insertData() throws IOException, QueryProcessException; - protected void insertOneRecord(long time, int num) throws QueryProcessException { - TSRecord record = new TSRecord(time, deviceId); - record.addTuple(DataPoint.getDataPoint(dataType, measurementId, String.valueOf(num))); - storageGroupProcessor.insert(new InsertPlan(record)); - } - } \ No newline at end of file