From ab108f404f377d9b7eaf9dc0d588df2b76e5a028 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:17:47 +0800 Subject: [PATCH 01/13] Update IoTDBAirGapReceiverAgent.java --- .../receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java index 3097b0fcac85d..e14310eaa4a70 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/airgap/IoTDBAirGapReceiverAgent.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -79,7 +80,9 @@ public void start() throws StartupException { @Override public void stop() { try { - serverSocket.close(); + if (Objects.nonNull(serverSocket)) { + serverSocket.close(); + } } catch (final IOException e) { LOGGER.warn("Failed to close IoTDBAirGapReceiverAgent's server socket", e); } From 422bd686f928726e8003fc1657f35461d8da3229 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:14:56 +0800 Subject: [PATCH 02/13] Refactor --- .../resource/memory/PipeMemoryManager.java | 6 ++-- .../plan/analyze/AnalyzeVisitor.java | 9 +++--- .../load/LoadTsFileToTableModelAnalyzer.java | 16 ++++++---- .../plan/planner/LogicalPlanVisitor.java | 3 +- .../statement/crud/LoadTsFileStatement.java | 30 +++++++++---------- 5 files changed, 35 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index 2bda0e32a10cf..1ccf7688a1ec9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -89,7 +89,7 @@ public synchronized PipeMemoryBlock forceAllocate(long sizeInBytes) return forceAllocate(sizeInBytes, PipeMemoryBlockType.NORMAL); } - public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBytes) + public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(final long tabletSizeInBytes) throws PipeRuntimeOutOfMemoryCriticalException { if (!PIPE_MEMORY_MANAGEMENT_ENABLED) { // No need to calculate the tablet size, skip it to save time @@ -104,7 +104,7 @@ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBy try { Thread.sleep(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS); - } catch (InterruptedException ex) { + } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); LOGGER.warn("forceAllocateWithRetry: interrupted while waiting for available memory", ex); } @@ -127,7 +127,7 @@ public PipeTabletMemoryBlock forceAllocateForTabletWithRetry(long tabletSizeInBy } } - public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(long tsFileSizeInBytes) + public PipeTsFileMemoryBlock forceAllocateForTsFileWithRetry(final long tsFileSizeInBytes) throws PipeRuntimeOutOfMemoryCriticalException { if (!PIPE_MEMORY_MANAGEMENT_ENABLED) { return new PipeTsFileMemoryBlock(0); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 3f787f0a3f288..124066ed62964 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -3006,8 +3006,8 @@ public Analysis visitInsertRowsOfOneDevice( @Override public Analysis visitPipeEnrichedStatement( - PipeEnrichedStatement pipeEnrichedStatement, MPPQueryContext context) { - Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, context); + final PipeEnrichedStatement pipeEnrichedStatement, final MPPQueryContext context) { + final Analysis analysis = pipeEnrichedStatement.getInnerStatement().accept(this, context); analysis.setDatabaseName(context.getDatabaseName().orElse(null)); // statement may be changed because of logical view @@ -3017,7 +3017,8 @@ public Analysis visitPipeEnrichedStatement( } @Override - public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { + public Analysis visitLoadFile( + final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) { context.setQueryType(QueryType.WRITE); final long startTime = System.nanoTime(); @@ -3041,7 +3042,7 @@ public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryC } private LoadTsFileAnalyzer getAnalyzer( - LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { + final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) { if (Objects.equals(loadTsFileStatement.getModel(), LoadTsFileConfigurator.MODEL_TREE_VALUE)) { // Load to tree-model return new LoadTsFileToTreeModelAnalyzer(loadTsFileStatement, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java index 33c1f73e6fc53..6a16f2e49fecf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java @@ -72,21 +72,25 @@ public class LoadTsFileToTableModelAnalyzer extends LoadTsFileAnalyzer { private final LoadTsFileTableSchemaCache schemaCache; public LoadTsFileToTableModelAnalyzer( - LoadTsFileStatement loadTsFileStatement, Metadata metadata, MPPQueryContext context) { + final LoadTsFileStatement loadTsFileStatement, + final Metadata metadata, + final MPPQueryContext context) { super(loadTsFileStatement, context); this.metadata = metadata; this.schemaCache = new LoadTsFileTableSchemaCache(metadata, context); } public LoadTsFileToTableModelAnalyzer( - LoadTsFile loadTsFileTableStatement, Metadata metadata, MPPQueryContext context) { + final LoadTsFile loadTsFileTableStatement, + final Metadata metadata, + final MPPQueryContext context) { super(loadTsFileTableStatement, context); this.metadata = metadata; this.schemaCache = new LoadTsFileTableSchemaCache(metadata, context); } @Override - public IAnalysis analyzeFileByFile(IAnalysis analysis) { + public IAnalysis analyzeFileByFile(final IAnalysis analysis) { checkBeforeAnalyzeFileByFile(analysis); if (analysis.isFinishQueryAfterAnalyze()) { return analysis; @@ -133,7 +137,7 @@ protected void analyzeSingleTsFile(final File tsFile) } // check whether the encrypt type of the tsfile is supported - EncryptParameter param = reader.getEncryptParam(); + final EncryptParameter param = reader.getEncryptParam(); if (!Objects.equals(param.getType(), EncryptUtils.encryptParam.getType()) || !Arrays.equals(param.getKey(), EncryptUtils.encryptParam.getKey())) { throw new SemanticException("The encryption way of the TsFile is not supported."); @@ -145,7 +149,7 @@ protected void analyzeSingleTsFile(final File tsFile) schemaCache.setDatabase(database); schemaCache.setCurrentModificationsAndTimeIndex(tsFileResource, reader); - for (Map.Entry name2Schema : + for (final Map.Entry name2Schema : reader.readFileMetadata().getTableSchemaMap().entrySet()) { final TableSchema fileSchema = TableSchema.fromTsFileTableSchema(name2Schema.getKey(), name2Schema.getValue()); @@ -158,7 +162,7 @@ protected void analyzeSingleTsFile(final File tsFile) final Map> device2TimeseriesMetadata = timeseriesMetadataIterator.next(); - for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) { + for (final IDeviceID deviceId : device2TimeseriesMetadata.keySet()) { schemaCache.autoCreateAndVerify(deviceId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index f4de4085c2927..8fe9a2fae865a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -513,7 +513,8 @@ public PlanNode visitPipeEnrichedStatement( } @Override - public PlanNode visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { + public PlanNode visitLoadFile( + final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) { final List isTableModel = new ArrayList<>(); for (int i = 0; i < loadTsFileStatement.getResources().size(); i++) { isTableModel.add( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index a8c7a011710dd..c18791f1d2d30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -123,11 +123,11 @@ private static List findAllTsFile(File file) { return tsFiles; } - private static void sortTsFiles(List files) { + private static void sortTsFiles(final List files) { files.sort( (o1, o2) -> { - String file1Name = o1.getName(); - String file2Name = o2.getName(); + final String file1Name = o1.getName(); + final String file2Name = o2.getName(); try { return TsFileResource.checkAndCompareFileName(file1Name, file2Name); } catch (IOException e) { @@ -136,27 +136,27 @@ private static void sortTsFiles(List files) { }); } - public void setDeleteAfterLoad(boolean deleteAfterLoad) { + public void setDeleteAfterLoad(final boolean deleteAfterLoad) { this.deleteAfterLoad = deleteAfterLoad; } - public void setDatabaseLevel(int databaseLevel) { + public void setDatabaseLevel(final int databaseLevel) { this.databaseLevel = databaseLevel; } - public void setDatabase(String database) { + public void setDatabase(final String database) { this.database = database; } - public void setModel(String model) { + public void setModel(final String model) { this.model = model; } - public void setVerifySchema(boolean verifySchema) { + public void setVerifySchema(final boolean verifySchema) { this.verifySchema = verifySchema; } - public void setAutoCreateDatabase(boolean autoCreateDatabase) { + public void setAutoCreateDatabase(final boolean autoCreateDatabase) { this.autoCreateDatabase = autoCreateDatabase; } @@ -188,7 +188,7 @@ public List getTsFiles() { return tsFiles; } - public void addTsFileResource(TsFileResource resource) { + public void addTsFileResource(final TsFileResource resource) { resources.add(resource); } @@ -196,11 +196,11 @@ public List getResources() { return resources; } - public void addWritePointCount(long writePointCount) { + public void addWritePointCount(final long writePointCount) { writePointCountList.add(writePointCount); } - public long getWritePointCount(int resourceIndex) { + public long getWritePointCount(final int resourceIndex) { return writePointCountList.get(resourceIndex); } @@ -224,7 +224,7 @@ public List getPaths() { } @Override - public TSStatus checkPermissionBeforeProcess(String userName) { + public TSStatus checkPermissionBeforeProcess(final String userName) { // no need to check here, it will be checked in process phase return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @@ -232,7 +232,7 @@ public TSStatus checkPermissionBeforeProcess(String userName) { @TableModel @Override public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelationalStatement( - MPPQueryContext context) { + final MPPQueryContext context) { loadAttributes = new HashMap<>(); loadAttributes.put(DATABASE_LEVEL_KEY, String.valueOf(databaseLevel)); if (database != null) { @@ -246,7 +246,7 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat } @Override - public R accept(StatementVisitor visitor, C context) { + public R accept(final StatementVisitor visitor, C context) { return visitor.visitLoadFile(this, context); } From b3a336a6395f07a1dc30d63652211a4c455a9ac5 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:18:10 +0800 Subject: [PATCH 03/13] Update LoadTsFileNode.java --- .../plan/node/load/LoadTsFileNode.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index 3588b6ddbb052..edc2e53b1375e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -48,7 +48,10 @@ public class LoadTsFileNode extends WritePlanNode { private final String database; public LoadTsFileNode( - PlanNodeId id, List resources, List isTableModel, String database) { + final PlanNodeId id, + final List resources, + final List isTableModel, + final String database) { super(id); this.resources = resources; this.isTableModel = isTableModel; @@ -66,7 +69,7 @@ public List getChildren() { } @Override - public void addChild(PlanNode child) { + public void addChild(final PlanNode child) { // Do nothing } @@ -86,17 +89,17 @@ public List getOutputColumnNames() { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { + protected void serializeAttributes(final ByteBuffer byteBuffer) { // Do nothing } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { + protected void serializeAttributes(final DataOutputStream stream) throws IOException { // Do nothing } @Override - public List splitByPartition(IAnalysis analysis) { + public List splitByPartition(final IAnalysis analysis) { if (analysis instanceof Analysis) { return splitByPartitionForTreeModel((Analysis) analysis); } else { @@ -105,9 +108,9 @@ public List splitByPartition(IAnalysis analysis) { } } - private List splitByPartitionForTreeModel(Analysis analysis) { - List res = new ArrayList<>(); - LoadTsFileStatement statement = + private List splitByPartitionForTreeModel(final Analysis analysis) { + final List res = new ArrayList<>(); + final LoadTsFileStatement statement = analysis.getTreeStatement() instanceof PipeEnrichedStatement ? (LoadTsFileStatement) ((PipeEnrichedStatement) analysis.getTreeStatement()).getInnerStatement() @@ -127,9 +130,9 @@ private List splitByPartitionForTreeModel(Analysis analysis) { } private List splitByPartitionForTableModel( - org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis analysis) { - List res = new ArrayList<>(); - LoadTsFile statement = + final org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis analysis) { + final List res = new ArrayList<>(); + final LoadTsFile statement = (analysis.getStatement() instanceof PipeEnriched) ? (LoadTsFile) ((PipeEnriched) analysis.getStatement()).getInnerStatement() : (LoadTsFile) analysis.getStatement(); @@ -150,14 +153,14 @@ private List splitByPartitionForTableModel( } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } - LoadTsFileNode loadTsFileNode = (LoadTsFileNode) o; + final LoadTsFileNode loadTsFileNode = (LoadTsFileNode) o; return Objects.equals(resources, loadTsFileNode.resources) && Objects.equals(database, loadTsFileNode.database) && Objects.equals(isTableModel, loadTsFileNode.isTableModel); From 76104fa8471fb2939f9b3ee413c609b23733b780 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:32:02 +0800 Subject: [PATCH 04/13] refactor --- .../plan/node/load/LoadSingleTsFileNode.java | 26 +++++----- .../plan/node/load/LoadTsFilePieceNode.java | 40 +++++++-------- .../scheduler/load/LoadTsFileScheduler.java | 23 ++++----- .../load/splitter/DeletionData.java | 10 ++-- .../load/splitter/TsFileSplitter.java | 49 ++++++++++--------- 5 files changed, 74 insertions(+), 74 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 9a6314fbf2939..a7317b8d7d40b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -66,12 +66,12 @@ public class LoadSingleTsFileNode extends WritePlanNode { private TRegionReplicaSet localRegionReplicaSet; public LoadSingleTsFileNode( - PlanNodeId id, - TsFileResource resource, - boolean isTableModel, - String database, - boolean deleteAfterLoad, - long writePointCount) { + final PlanNodeId id, + final TsFileResource resource, + final boolean isTableModel, + final String database, + final boolean deleteAfterLoad, + final long writePointCount) { super(id); this.tsFile = resource.getTsFile(); this.resource = resource; @@ -86,9 +86,9 @@ public boolean isTsFileEmpty() { } public boolean needDecodeTsFile( - Function>, List> partitionFetcher) - throws IOException { - List> slotList = new ArrayList<>(); + final Function>, List> + partitionFetcher) { + final List> slotList = new ArrayList<>(); resource .getDevices() .forEach( @@ -112,13 +112,13 @@ public boolean needDecodeTsFile( return needDecodeTsFile; } - private boolean isDispatchedToLocal(Set replicaSets) { + private boolean isDispatchedToLocal(final Set replicaSets) { if (replicaSets.size() > 1) { return false; } - for (TRegionReplicaSet replicaSet : replicaSets) { - List dataNodeLocationList = replicaSet.getDataNodeLocations(); + for (final TRegionReplicaSet replicaSet : replicaSets) { + final List dataNodeLocationList = replicaSet.getDataNodeLocations(); if (dataNodeLocationList.size() > 1) { return false; } @@ -131,7 +131,7 @@ private boolean isDispatchedToLocal(Set replicaSets) { return true; } - private boolean isDispatchedToLocal(TEndPoint endPoint) { + private boolean isDispatchedToLocal(final TEndPoint endPoint) { return IoTDBDescriptor.getInstance().getConfig().getInternalAddress().equals(endPoint.getIp()) && IoTDBDescriptor.getInstance().getConfig().getInternalPort() == endPoint.port; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java index 5d4f02378fc24..bdf98397d6f0d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFilePieceNode.java @@ -58,7 +58,7 @@ public LoadTsFilePieceNode(PlanNodeId id) { super(id); } - public LoadTsFilePieceNode(PlanNodeId id, File tsFile) { + public LoadTsFilePieceNode(final PlanNodeId id, final File tsFile) { super(id); this.tsFile = tsFile; this.dataSize = 0; @@ -69,7 +69,7 @@ public long getDataSize() { return dataSize; } - public void addTsFileData(TsFileData tsFileData) { + public void addTsFileData(final TsFileData tsFileData) { tsFileDataList.add(tsFileData); dataSize += tsFileData.getDataSize(); } @@ -93,7 +93,7 @@ public List getChildren() { } @Override - public void addChild(PlanNode child) { + public void addChild(final PlanNode child) { // Do nothing } @@ -118,26 +118,26 @@ public List getOutputColumnNames() { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { + protected void serializeAttributes(final ByteBuffer byteBuffer) { try { - ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); - DataOutputStream stream = new DataOutputStream(byteOutputStream); + final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); + final DataOutputStream stream = new DataOutputStream(byteOutputStream); serializeAttributes(stream); byteBuffer.put(byteOutputStream.toByteArray()); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.error("Serialize to ByteBuffer error.", e); } } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { + protected void serializeAttributes(final DataOutputStream stream) throws IOException { PlanNodeType.LOAD_TSFILE.serialize(stream); ReadWriteIOUtils.write(tsFile.getPath(), stream); // TODO: can save this space ReadWriteIOUtils.write(tsFileDataList.size(), stream); - for (TsFileData tsFileData : tsFileDataList) { + for (final TsFileData tsFileData : tsFileDataList) { try { tsFileData.serialize(stream); - } catch (IOException e) { + } catch (final IOException e) { LOGGER.error( String.format( "Serialize data of TsFile %s error, skip TsFileData %s", @@ -147,38 +147,38 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { } @Override - public List splitByPartition(IAnalysis analysis) { + public List splitByPartition(final IAnalysis analysis) { throw new NotImplementedException("split load piece TsFile is not implemented"); } - public static PlanNode deserialize(ByteBuffer buffer) { - InputStream stream = new ByteArrayInputStream(buffer.array()); + public static PlanNode deserialize(final ByteBuffer buffer) { + final InputStream stream = new ByteArrayInputStream(buffer.array()); try { ReadWriteIOUtils.readShort(stream); // read PlanNodeType - File tsFile = new File(ReadWriteIOUtils.readString(stream)); - LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile); - int tsFileDataSize = ReadWriteIOUtils.readInt(stream); + final File tsFile = new File(ReadWriteIOUtils.readString(stream)); + final LoadTsFilePieceNode pieceNode = new LoadTsFilePieceNode(new PlanNodeId(""), tsFile); + final int tsFileDataSize = ReadWriteIOUtils.readInt(stream); for (int i = 0; i < tsFileDataSize; i++) { - TsFileData tsFileData = TsFileData.deserialize(stream); + final TsFileData tsFileData = TsFileData.deserialize(stream); pieceNode.addTsFileData(tsFileData); } pieceNode.setPlanNodeId(PlanNodeId.deserialize(stream)); return pieceNode; - } catch (IOException | PageException | IllegalPathException e) { + } catch (final IOException | PageException | IllegalPathException e) { LOGGER.error("Deserialize {} error.", LoadTsFilePieceNode.class.getName(), e); return null; } } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } - LoadTsFilePieceNode loadTsFilePieceNode = (LoadTsFilePieceNode) o; + final LoadTsFilePieceNode loadTsFilePieceNode = (LoadTsFilePieceNode) o; return Objects.equals(tsFile, loadTsFilePieceNode.tsFile) && Objects.equals(dataSize, loadTsFilePieceNode.dataSize) && Objects.equals(tsFileDataList, loadTsFilePieceNode.tsFileDataList); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index c112eb3449059..4fb261e15cb2b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -134,12 +134,13 @@ public class LoadTsFileScheduler implements IScheduler { private final LoadTsFileDataCacheMemoryBlock block; public LoadTsFileScheduler( - DistributedQueryPlan distributedQueryPlan, - MPPQueryContext queryContext, - QueryStateMachine stateMachine, - IClientManager internalServiceClientManager, - IPartitionFetcher partitionFetcher, - boolean isGeneratedByPipe) { + final DistributedQueryPlan distributedQueryPlan, + final MPPQueryContext queryContext, + final QueryStateMachine stateMachine, + final IClientManager + internalServiceClientManager, + final IPartitionFetcher partitionFetcher, + final boolean isGeneratedByPipe) { this.queryContext = queryContext; this.stateMachine = stateMachine; this.tsFileNodeList = new ArrayList<>(); @@ -150,7 +151,7 @@ public LoadTsFileScheduler( this.isGeneratedByPipe = isGeneratedByPipe; this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock(); - for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) { + for (final FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) { tsFileNodeList.add((LoadSingleTsFileNode) fragmentInstance.getFragment().getPlanNodeTree()); } } @@ -199,7 +200,7 @@ public void start() { LoadTsFileCostMetricsSet.LOAD_LOCALLY, System.nanoTime() - startTime); } } else { // need decode, load locally or remotely, use two phases method - String uuid = UUID.randomUUID().toString(); + final String uuid = UUID.randomUUID().toString(); dispatcher.setUuid(uuid); allReplicaSets.clear(); @@ -242,7 +243,7 @@ public void start() { i + 1, tsFileNodeListSize); } - } catch (Exception e) { + } catch (final Exception e) { isLoadSuccess = false; stateMachine.transitionToFailed(e); LOGGER.warn("LoadTsFileScheduler loads TsFile {} error", filePath, e); @@ -262,7 +263,7 @@ public void start() { } } - private boolean firstPhase(LoadSingleTsFileNode node) { + private boolean firstPhase(final LoadSingleTsFileNode node) { final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); try { new TsFileSplitter( @@ -280,7 +281,7 @@ private boolean firstPhase(LoadSingleTsFileNode node) { node.getTsFileResource().getTsFile()), e); return false; - } catch (Exception e) { + } catch (final Exception e) { stateMachine.transitionToFailed(e); LOGGER.warn( String.format("Parse or send TsFile %s error.", node.getTsFileResource().getTsFile()), e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java index 0695c7a84def9..1435a7d63b618 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/DeletionData.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.load.splitter; -import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; @@ -33,7 +32,7 @@ public class DeletionData implements TsFileData { private final ModEntry deletion; - public DeletionData(ModEntry deletion) { + public DeletionData(final ModEntry deletion) { this.deletion = deletion; } @@ -42,7 +41,7 @@ public long getDataSize() { return deletion.serializedSize(); } - public void writeToModificationFile(ModificationFile modificationFile) throws IOException { + public void writeToModificationFile(final ModificationFile modificationFile) throws IOException { modificationFile.write(deletion); } @@ -52,13 +51,12 @@ public TsFileDataType getType() { } @Override - public void serialize(DataOutputStream stream) throws IOException { + public void serialize(final DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(getType().ordinal(), stream); deletion.serialize(stream); } - public static DeletionData deserialize(InputStream stream) - throws IllegalPathException, IOException { + public static DeletionData deserialize(final InputStream stream) throws IOException { return new DeletionData(ModEntry.createFrom(new DataInputStream(stream))); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index 5c86082ce8c9f..b3e3c235db571 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -82,14 +82,14 @@ public class TsFileSplitter { private List> pageIndex2TimesList = null; private List isTimeChunkNeedDecodeList = new ArrayList<>(); - public TsFileSplitter(File tsFile, Function consumer) { + public TsFileSplitter(final File tsFile, final Function consumer) { this.tsFile = tsFile; this.consumer = consumer; } @SuppressWarnings({"squid:S3776", "squid:S6541"}) public void splitTsFileByDataPartition() throws IOException, IllegalStateException { - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { getAllModification(deletions); if (!checkMagic(reader)) { @@ -402,40 +402,40 @@ private boolean checkMagic(TsFileSequenceReader reader) throws IOException { } private void getChunkMetadata( - TsFileSequenceReader reader, Map offset2ChunkMetadata) + final TsFileSequenceReader reader, final Map offset2ChunkMetadata) throws IOException { - Map> device2Metadata = + final Map> device2Metadata = reader.getAllTimeseriesMetadata(true); - for (Map.Entry> entry : device2Metadata.entrySet()) { - for (TimeseriesMetadata timeseriesMetadata : entry.getValue()) { - for (IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { + for (final Map.Entry> entry : device2Metadata.entrySet()) { + for (final TimeseriesMetadata timeseriesMetadata : entry.getValue()) { + for (final IChunkMetadata chunkMetadata : timeseriesMetadata.getChunkMetadataList()) { offset2ChunkMetadata.put(chunkMetadata.getOffsetOfChunkHeader(), chunkMetadata); } } } } - private void handleModification(List deletions) { + private void handleModification(final List deletions) { deletions.forEach(o -> consumer.apply(new DeletionData(o))); } private void consumeAllAlignedChunkData( - long offset, Map> pageIndex2ChunkData) { + final long offset, final Map> pageIndex2ChunkData) { if (pageIndex2ChunkData.isEmpty()) { return; } - Map chunkDataMap = new HashMap<>(); - for (Map.Entry> entry : pageIndex2ChunkData.entrySet()) { - List alignedChunkDataList = entry.getValue(); + final Map chunkDataMap = new HashMap<>(); + for (final Map.Entry> entry : pageIndex2ChunkData.entrySet()) { + final List alignedChunkDataList = entry.getValue(); for (int i = 0; i < alignedChunkDataList.size(); i++) { - AlignedChunkData oldChunkData = alignedChunkDataList.get(i); - BatchedAlignedValueChunkData chunkData = + final AlignedChunkData oldChunkData = alignedChunkDataList.get(i); + final BatchedAlignedValueChunkData chunkData = chunkDataMap.computeIfAbsent(oldChunkData, BatchedAlignedValueChunkData::new); alignedChunkDataList.set(i, chunkData); } } - for (AlignedChunkData chunkData : chunkDataMap.keySet()) { + for (final AlignedChunkData chunkData : chunkDataMap.keySet()) { if (Boolean.FALSE.equals(consumer.apply(chunkData))) { throw new IllegalStateException( String.format( @@ -446,7 +446,8 @@ private void consumeAllAlignedChunkData( this.pageIndex2ChunkData = new HashMap<>(); } - private void consumeChunkData(String measurement, long offset, ChunkData chunkData) { + private void consumeChunkData( + final String measurement, final long offset, final ChunkData chunkData) { if (Boolean.FALSE.equals(consumer.apply(chunkData))) { throw new IllegalStateException( String.format( @@ -455,12 +456,12 @@ private void consumeChunkData(String measurement, long offset, ChunkData chunkDa } } - private boolean needDecodeChunk(IChunkMetadata chunkMetadata) { + private boolean needDecodeChunk(final IChunkMetadata chunkMetadata) { return !TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime()) .equals(TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getEndTime())); } - private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetadata) { + private boolean needDecodePage(final PageHeader pageHeader, final IChunkMetadata chunkMetadata) { if (pageHeader.getStatistics() == null) { return !TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getStartTime()) .equals(TimePartitionUtils.getTimePartitionSlot(chunkMetadata.getEndTime())); @@ -470,12 +471,12 @@ private boolean needDecodePage(PageHeader pageHeader, IChunkMetadata chunkMetada } private Pair decodePage( - boolean isAligned, - ByteBuffer pageData, - PageHeader pageHeader, - Decoder timeDecoder, - Decoder valueDecoder, - ChunkHeader chunkHeader) + final boolean isAligned, + final ByteBuffer pageData, + final PageHeader pageHeader, + final Decoder timeDecoder, + final Decoder valueDecoder, + final ChunkHeader chunkHeader) throws IOException { if (isAligned) { TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, timeDecoder); From 934d8e8be628a9630aab168b1e43429678a88222 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:00:57 +0800 Subject: [PATCH 05/13] init key value --- .../plan/relational/sql/ast/LoadTsFile.java | 29 ++++++++++++------- .../statement/crud/LoadTsFileStatement.java | 8 ++++- .../load/config/LoadTsFileConfigurator.java | 19 +++++++++++- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 71ca9359e6f69..264024a81a181 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -43,6 +43,7 @@ public class LoadTsFile extends Statement { private String database; // For loading to table-model only private boolean deleteAfterLoad; private boolean autoCreateDatabase; + private boolean loadWithMods; private String model = LoadTsFileConfigurator.MODEL_TABLE_VALUE; private final Map loadAttributes; @@ -51,7 +52,10 @@ public class LoadTsFile extends Statement { private final List resources; private final List writePointCountList; - public LoadTsFile(NodeLocation location, String filePath, Map loadAttributes) { + public LoadTsFile( + final NodeLocation location, + final String filePath, + final Map loadAttributes) { super(location); this.filePath = requireNonNull(filePath, "filePath is null"); @@ -59,6 +63,7 @@ public LoadTsFile(NodeLocation location, String filePath, Map lo this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); this.deleteAfterLoad = false; this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); + this.loadWithMods = true; this.resources = new ArrayList<>(); this.writePointCountList = new ArrayList<>(); this.loadAttributes = loadAttributes; @@ -68,7 +73,7 @@ public LoadTsFile(NodeLocation location, String filePath, Map lo this.tsFiles = org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement.processTsFile( file); - } catch (FileNotFoundException e) { + } catch (final FileNotFoundException e) { throw new SemanticException(e); } } @@ -81,7 +86,7 @@ public Map getLoadAttributes() { return loadAttributes; } - public void setAutoCreateDatabase(boolean autoCreateDatabase) { + public void setAutoCreateDatabase(final boolean autoCreateDatabase) { this.autoCreateDatabase = autoCreateDatabase; } @@ -93,6 +98,10 @@ public boolean isAutoCreateDatabase() { return autoCreateDatabase; } + public boolean isLoadWithMods() { + return loadWithMods; + } + public int getDatabaseLevel() { return databaseLevel; } @@ -101,7 +110,7 @@ public String getDatabase() { return database; } - public void setDatabase(String database) { + public void setDatabase(final String database) { this.database = database; } @@ -113,7 +122,7 @@ public List getTsFiles() { return tsFiles; } - public void addTsFileResource(TsFileResource resource) { + public void addTsFileResource(final TsFileResource resource) { resources.add(resource); } @@ -121,11 +130,11 @@ public List getResources() { return resources; } - public void addWritePointCount(long writePointCount) { + public void addWritePointCount(final long writePointCount) { writePointCountList.add(writePointCount); } - public long getWritePointCount(int resourceIndex) { + public long getWritePointCount(final int resourceIndex) { return writePointCountList.get(resourceIndex); } @@ -139,7 +148,7 @@ private void initAttributes() { } @Override - public R accept(AstVisitor visitor, C context) { + public R accept(final AstVisitor visitor, final C context) { return visitor.visitLoadTsFile(this, context); } @@ -154,14 +163,14 @@ public int hashCode() { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) { return true; } if ((obj == null) || (getClass() != obj.getClass())) { return false; } - LoadTsFile other = (LoadTsFile) obj; + final LoadTsFile other = (LoadTsFile) obj; return Objects.equals(filePath, other.filePath) && Objects.equals(loadAttributes, other.loadAttributes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index c18791f1d2d30..2906a80dd04a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -56,6 +56,7 @@ public class LoadTsFileStatement extends Statement { private boolean verifySchema; private boolean deleteAfterLoad; private boolean autoCreateDatabase; + private boolean loadWithMods; private String model = LoadTsFileConfigurator.MODEL_TREE_VALUE; private Map loadAttributes; @@ -64,12 +65,13 @@ public class LoadTsFileStatement extends Statement { private final List resources; private final List writePointCountList; - public LoadTsFileStatement(String filePath) throws FileNotFoundException { + public LoadTsFileStatement(final String filePath) throws FileNotFoundException { this.file = new File(filePath); this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel(); this.verifySchema = true; this.deleteAfterLoad = false; this.autoCreateDatabase = IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled(); + this.loadWithMods = true; this.resources = new ArrayList<>(); this.writePointCountList = new ArrayList<>(); this.statementType = StatementType.MULTI_BATCH_INSERT; @@ -172,6 +174,10 @@ public boolean isAutoCreateDatabase() { return autoCreateDatabase; } + public boolean isLoadWithMods() { + return loadWithMods; + } + public int getDatabaseLevel() { return databaseLevel; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index 929d106f24437..8afa481dae3d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -47,6 +47,9 @@ public static void validateParameters(final String key, final String value) { case MODEL_KEY: validateModelParam(value); break; + case LOAD_WITH_MODS_KEY: + validateLoadWithModParam(value); + break; case DATABASE_NAME_KEY: break; default: @@ -68,7 +71,7 @@ public static void validateDatabaseLevelParam(final String databaseLevel) { "Given database level %d is less than the minimum value %d, please input a valid database level.", level, DATABASE_LEVEL_MIN_VALUE)); } - } catch (Exception e) { + } catch (final Exception e) { throw new SemanticException( String.format( "Given database level %s is not a valid integer, please input a valid database level.", @@ -134,4 +137,18 @@ public static void validateModelParam(final String model) { private LoadTsFileConfigurator() { throw new IllegalStateException("Utility class"); } + + public static final String LOAD_WITH_MODS_KEY = "load-with-mods"; + public static final boolean LOAD_WITH_MODS_DEFAULT_VALUE = true; + + public static void validateLoadWithModParam(final String withModsStr) { + try { + Boolean.parseBoolean(withModsStr); + } catch (final Exception e) { + throw new SemanticException( + String.format( + "Given with mods params %s is not a boolean value, please input a valid boolean.", + withModsStr)); + } + } } From 6d74402afd91d8f8b2a06e367967452313a97be9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:06:11 +0800 Subject: [PATCH 06/13] Enrich --- .../plan/relational/sql/ast/LoadTsFile.java | 1 + .../plan/statement/crud/LoadTsFileStatement.java | 3 +++ .../load/config/LoadTsFileConfigurator.java | 10 ++++++++-- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java index 264024a81a181..63bf246fa5cb1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java @@ -145,6 +145,7 @@ private void initAttributes() { this.model = LoadTsFileConfigurator.parseOrGetDefaultModel( loadAttributes, LoadTsFileConfigurator.MODEL_TABLE_VALUE); + this.loadWithMods = LoadTsFileConfigurator.parseOrGetDefaultLoadWithMod(loadAttributes); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 2906a80dd04a0..7ce29a4b8d068 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -45,6 +45,7 @@ import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY; +import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.LOAD_WITH_MODS_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.MODEL_KEY; import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ON_SUCCESS_KEY; @@ -222,6 +223,7 @@ private void initAttributes() { this.model = LoadTsFileConfigurator.parseOrGetDefaultModel( loadAttributes, LoadTsFileConfigurator.MODEL_TREE_VALUE); + this.loadWithMods = LoadTsFileConfigurator.parseOrGetDefaultLoadWithMod(loadAttributes); } @Override @@ -248,6 +250,7 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat if (model != null) { loadAttributes.put(MODEL_KEY, model); } + loadAttributes.put(LOAD_WITH_MODS_KEY, String.valueOf(loadWithMods)); return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java index 8afa481dae3d0..c5552d02f7eb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/config/LoadTsFileConfigurator.java @@ -64,7 +64,7 @@ public static void validateParameters(final String key, final String value) { public static void validateDatabaseLevelParam(final String databaseLevel) { try { - int level = Integer.parseInt(databaseLevel); + final int level = Integer.parseInt(databaseLevel); if (level < DATABASE_LEVEL_MIN_VALUE) { throw new SemanticException( String.format( @@ -147,8 +147,14 @@ public static void validateLoadWithModParam(final String withModsStr) { } catch (final Exception e) { throw new SemanticException( String.format( - "Given with mods params %s is not a boolean value, please input a valid boolean.", + "Given with mods params %s is not a boolean value, please input a boolean string.", withModsStr)); } } + + public static boolean parseOrGetDefaultLoadWithMod(final Map loadAttributes) { + return Boolean.parseBoolean( + loadAttributes.getOrDefault( + LOAD_WITH_MODS_KEY, String.valueOf(LOAD_WITH_MODS_DEFAULT_VALUE))); + } } From 2cd1c1afdf62af083ea61c1fc44add82cf93d3be Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:24:26 +0800 Subject: [PATCH 07/13] may final --- .../plan/analyze/load/LoadTsFileAnalyzer.java | 32 +++++++++-------- .../load/LoadTsFileToTreeModelAnalyzer.java | 10 +++--- .../TreeSchemaAutoCreatorAndVerifier.java | 4 +-- .../plan/node/load/LoadSingleTsFileNode.java | 9 ++++- .../plan/node/load/LoadTsFileNode.java | 6 ++-- .../scheduler/load/LoadTsFileScheduler.java | 34 ++++++++++--------- .../load/splitter/TsFileSplitter.java | 13 ++++--- .../planner/node/load/LoadTsFileNodeTest.java | 22 ++++++------ ...tchedCompactionWithTsFileSplitterTest.java | 3 +- 9 files changed, 76 insertions(+), 57 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 0d096dfe28ebf..d9e132a7c5759 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -68,7 +68,7 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { protected final boolean isDeleteAfterLoad; protected final boolean isAutoCreateDatabase; - + protected final boolean isLoadWithMods; protected final int databaseLevel; protected final String database; @@ -78,13 +78,14 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { final IPartitionFetcher partitionFetcher = ClusterPartitionFetcher.getInstance(); final ISchemaFetcher schemaFetcher = ClusterSchemaFetcher.getInstance(); - LoadTsFileAnalyzer(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { + LoadTsFileAnalyzer(final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) { this.loadTsFileStatement = loadTsFileStatement; this.tsFiles = loadTsFileStatement.getTsFiles(); this.statementString = loadTsFileStatement.toString(); this.isVerifySchema = loadTsFileStatement.isVerifySchema(); this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad(); this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase(); + this.isLoadWithMods = loadTsFileStatement.isLoadWithMods(); this.databaseLevel = loadTsFileStatement.getDatabaseLevel(); this.database = loadTsFileStatement.getDatabase(); @@ -93,7 +94,7 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { this.context = context; } - LoadTsFileAnalyzer(LoadTsFile loadTsFileTableStatement, MPPQueryContext context) { + LoadTsFileAnalyzer(final LoadTsFile loadTsFileTableStatement, final MPPQueryContext context) { this.loadTsFileTableStatement = loadTsFileTableStatement; this.tsFiles = loadTsFileTableStatement.getTsFiles(); this.statementString = loadTsFileTableStatement.toString(); @@ -102,15 +103,16 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { this.isAutoCreateDatabase = loadTsFileTableStatement.isAutoCreateDatabase(); this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel(); this.database = loadTsFileTableStatement.getDatabase(); + this.isLoadWithMods = loadTsFileTableStatement.isLoadWithMods(); this.loadTsFileStatement = null; this.isTableModelStatement = true; this.context = context; } - public abstract IAnalysis analyzeFileByFile(IAnalysis analysis); + public abstract IAnalysis analyzeFileByFile(final IAnalysis analysis); - protected boolean doAnalyzeFileByFile(IAnalysis analysis) { + protected boolean doAnalyzeFileByFile(final IAnalysis analysis) { // analyze tsfile metadata file by file for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) { final File tsFile = tsFiles.get(i); @@ -134,17 +136,17 @@ protected boolean doAnalyzeFileByFile(IAnalysis analysis) { "Load - Analysis Stage: {}/{} tsfiles have been analyzed, progress: {}%", i + 1, tsfileNum, String.format("%.3f", (i + 1) * 100.00 / tsfileNum)); } - } catch (AuthException e) { + } catch (final AuthException e) { setFailAnalysisForAuthException(analysis, e); return false; - } catch (BufferUnderflowException e) { + } catch (final BufferUnderflowException e) { LOGGER.warn( "The file {} is not a valid tsfile. Please check the input file.", tsFile.getPath(), e); throw new SemanticException( String.format( "The file %s is not a valid tsfile. Please check the input file.", tsFile.getPath())); - } catch (Exception e) { + } catch (final Exception e) { final String exceptionMessage = String.format( "Loading file %s failed. Detail: %s", @@ -178,7 +180,7 @@ protected String getStatementString() { return statementString; } - protected void setRealStatement(IAnalysis analysis) { + protected void setRealStatement(final IAnalysis analysis) { if (isTableModelStatement) { // Do nothing by now. } else { @@ -186,7 +188,7 @@ protected void setRealStatement(IAnalysis analysis) { } } - protected void addTsFileResource(TsFileResource tsFileResource) { + protected void addTsFileResource(final TsFileResource tsFileResource) { if (isTableModelStatement) { loadTsFileTableStatement.addTsFileResource(tsFileResource); } else { @@ -194,7 +196,7 @@ protected void addTsFileResource(TsFileResource tsFileResource) { } } - protected void addWritePointCount(long writePointCount) { + protected void addWritePointCount(final long writePointCount) { if (isTableModelStatement) { loadTsFileTableStatement.addWritePointCount(writePointCount); } else { @@ -215,19 +217,19 @@ protected int getDatabaseLevel() { } protected long getWritePointCount( - Map> device2TimeseriesMetadata) { - return device2TimeseriesMetadata.values().stream() + final Map> device2TimeSeriesMetadata) { + return device2TimeSeriesMetadata.values().stream() .flatMap(List::stream) .mapToLong(t -> t.getStatistics().getCount()) .sum(); } - protected void setFailAnalysisForAuthException(IAnalysis analysis, AuthException e) { + protected void setFailAnalysisForAuthException(final IAnalysis analysis, final AuthException e) { analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus(RpcUtils.getStatus(e.getCode(), e.getMessage())); } - protected void checkBeforeAnalyzeFileByFile(IAnalysis analysis) { + protected void checkBeforeAnalyzeFileByFile(final IAnalysis analysis) { if (TSFileDescriptor.getInstance().getConfig().getEncryptFlag()) { analysis.setFinishQueryAfterAnalyze(true); analysis.setFailStatus( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java index 76bac3bc26bc2..41fcb66c98ba0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java @@ -57,19 +57,19 @@ public class LoadTsFileToTreeModelAnalyzer extends LoadTsFileAnalyzer { private final TreeSchemaAutoCreatorAndVerifier schemaAutoCreatorAndVerifier; public LoadTsFileToTreeModelAnalyzer( - LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { + final LoadTsFileStatement loadTsFileStatement, final MPPQueryContext context) { super(loadTsFileStatement, context); this.schemaAutoCreatorAndVerifier = new TreeSchemaAutoCreatorAndVerifier(this); } public LoadTsFileToTreeModelAnalyzer( - LoadTsFile loadTsFileTableStatement, MPPQueryContext context) { + final LoadTsFile loadTsFileTableStatement, final MPPQueryContext context) { super(loadTsFileTableStatement, context); this.schemaAutoCreatorAndVerifier = new TreeSchemaAutoCreatorAndVerifier(this); } @Override - public IAnalysis analyzeFileByFile(IAnalysis analysis) { + public IAnalysis analyzeFileByFile(final IAnalysis analysis) { checkBeforeAnalyzeFileByFile(analysis); if (analysis.isFinishQueryAfterAnalyze()) { return analysis; @@ -82,10 +82,10 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) { try { schemaAutoCreatorAndVerifier.flush(); - } catch (AuthException e) { + } catch (final AuthException e) { setFailAnalysisForAuthException(analysis, e); return analysis; - } catch (Exception e) { + } catch (final Exception e) { final String exceptionMessage = String.format( "Auto create or verify schema error when executing statement %s. Detail: %s.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java index 92c847b850cc0..89f1eaeef0687 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java @@ -227,9 +227,9 @@ private void doAutoCreateAndVerify() throws SemanticException, AuthException { if (loadTsFileAnalyzer.isVerifySchema()) { verifySchema(schemaTree); } - } catch (AuthException e) { + } catch (final AuthException e) { throw e; - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn("Auto create or verify schema error.", e); throw new SemanticException( String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index a7317b8d7d40b..24e053a370b83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -60,6 +60,7 @@ public class LoadSingleTsFileNode extends WritePlanNode { private final boolean isTableModel; private final String database; private final boolean deleteAfterLoad; + private final boolean loadWithMods; private final long writePointCount; private boolean needDecodeTsFile; @@ -71,7 +72,8 @@ public LoadSingleTsFileNode( final boolean isTableModel, final String database, final boolean deleteAfterLoad, - final long writePointCount) { + final long writePointCount, + final boolean loadWithMods) { super(id); this.tsFile = resource.getTsFile(); this.resource = resource; @@ -79,6 +81,7 @@ public LoadSingleTsFileNode( this.database = database; this.deleteAfterLoad = deleteAfterLoad; this.writePointCount = writePointCount; + this.loadWithMods = loadWithMods; } public boolean isTsFileEmpty() { @@ -140,6 +143,10 @@ public boolean isDeleteAfterLoad() { return deleteAfterLoad; } + public boolean isLoadWithMods() { + return loadWithMods; + } + public boolean isTableModel() { return isTableModel; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java index edc2e53b1375e..d479d07a22563 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadTsFileNode.java @@ -124,7 +124,8 @@ private List splitByPartitionForTreeModel(final Analysis analysis isTableModel.get(i), database, statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + statement.isLoadWithMods())); } return res; } @@ -146,7 +147,8 @@ private List splitByPartitionForTableModel( isTableModel.get(i), database, statement.isDeleteAfterLoad(), - statement.getWritePointCount(i))); + statement.getWritePointCount(i), + statement.isLoadWithMods())); } } return res; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java index 4fb261e15cb2b..eebd521451abc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java @@ -160,7 +160,7 @@ public LoadTsFileScheduler( public void start() { try { stateMachine.transitionToRunning(); - int tsFileNodeListSize = tsFileNodeList.size(); + final int tsFileNodeListSize = tsFileNodeList.size(); boolean isLoadSuccess = true; for (int i = 0; i < tsFileNodeListSize; ++i) { @@ -267,13 +267,15 @@ private boolean firstPhase(final LoadSingleTsFileNode node) { final TsFileDataManager tsFileDataManager = new TsFileDataManager(this, node, block); try { new TsFileSplitter( - node.getTsFileResource().getTsFile(), tsFileDataManager::addOrSendTsFileData) + node.getTsFileResource().getTsFile(), + tsFileDataManager::addOrSendTsFileData, + node.isLoadWithMods()) .splitTsFileByDataPartition(); if (!tsFileDataManager.sendAllTsFileData()) { stateMachine.transitionToFailed(new TSStatus(TSStatusCode.LOAD_FILE_ERROR.getStatusCode())); return false; } - } catch (IllegalStateException e) { + } catch (final IllegalStateException e) { stateMachine.transitionToFailed(e); LOGGER.warn( String.format( @@ -293,9 +295,9 @@ private boolean firstPhase(final LoadSingleTsFileNode node) { } private boolean dispatchOnePieceNode( - LoadTsFilePieceNode pieceNode, TRegionReplicaSet replicaSet) { + final LoadTsFilePieceNode pieceNode, final TRegionReplicaSet replicaSet) { allReplicaSets.add(replicaSet); - FragmentInstance instance = + final FragmentInstance instance = new FragmentInstance( new PlanFragment(fragmentId, pieceNode), fragmentId.genFragmentInstanceId(), @@ -304,11 +306,11 @@ private boolean dispatchOnePieceNode( queryContext.getTimeOut(), queryContext.getSession()); instance.setExecutorAndHost(new StorageExecutor(replicaSet)); - Future dispatchResultFuture = + final Future dispatchResultFuture = dispatcher.dispatch(Collections.singletonList(instance)); try { - FragInstanceDispatchResult result = + final FragInstanceDispatchResult result = dispatchResultFuture.get( CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds(), TimeUnit.SECONDS); if (!result.isSuccessful()) { @@ -328,21 +330,21 @@ private boolean dispatchOnePieceNode( status.getMessage()); } } - TSStatus status = result.getFailureStatus(); + final TSStatus status = result.getFailureStatus(); status.setMessage( String.format("Load %s piece error in 1st phase. Because ", pieceNode.getTsFile()) + status.getMessage()); stateMachine.transitionToFailed(status); // TODO: record more status return false; } - } catch (InterruptedException | ExecutionException | CancellationException e) { + } catch (final InterruptedException | ExecutionException | CancellationException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } LOGGER.warn("Interrupt or Execution error.", e); stateMachine.transitionToFailed(e); return false; - } catch (TimeoutException e) { + } catch (final TimeoutException e) { dispatchResultFuture.cancel(true); LOGGER.warn( String.format("Wait for loading %s time out.", LoadTsFilePieceNode.class.getName()), e); @@ -353,7 +355,7 @@ private boolean dispatchOnePieceNode( } private boolean secondPhase( - boolean isFirstPhaseSuccess, String uuid, TsFileResource tsFileResource) { + final boolean isFirstPhaseSuccess, final String uuid, final TsFileResource tsFileResource) { LOGGER.info("Start dispatching Load command for uuid {}", uuid); final File tsFile = tsFileResource.getTsFile(); final TLoadCommandReq loadCommandReq = @@ -363,10 +365,10 @@ private boolean secondPhase( try { loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe); loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource)); - Future dispatchResultFuture = + final Future dispatchResultFuture = dispatcher.dispatchCommand(loadCommandReq, allReplicaSets); - FragInstanceDispatchResult result = dispatchResultFuture.get(); + final FragInstanceDispatchResult result = dispatchResultFuture.get(); if (!result.isSuccessful()) { // TODO: retry. LOGGER.warn( @@ -377,13 +379,13 @@ private boolean secondPhase( allReplicaSets, TSStatusCode.representOf(result.getFailureStatus().getCode()).name(), result.getFailureStatus().getMessage()); - TSStatus status = result.getFailureStatus(); + final TSStatus status = result.getFailureStatus(); status.setMessage( String.format("Load %s error in 2nd phase. Because ", tsFile) + status.getMessage()); stateMachine.transitionToFailed(status); return false; } - } catch (IOException e) { + } catch (final IOException e) { LOGGER.warn( "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}", isFirstPhaseSuccess, @@ -391,7 +393,7 @@ private boolean secondPhase( tsFile.getAbsolutePath()); stateMachine.transitionToFailed(e); return false; - } catch (InterruptedException | ExecutionException e) { + } catch (final InterruptedException | ExecutionException e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index b3e3c235db571..059ea18063b5b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -64,8 +64,9 @@ public class TsFileSplitter { private final File tsFile; private final Function consumer; - private Map offset2ChunkMetadata = new HashMap<>(); - private List deletions = new ArrayList<>(); + private final boolean loadWithMods; + private final Map offset2ChunkMetadata = new HashMap<>(); + private final List deletions = new ArrayList<>(); private Map> pageIndex2ChunkData = new HashMap<>(); private Map pageIndex2Times = new HashMap<>(); private boolean isTimeChunkNeedDecode = true; @@ -82,15 +83,19 @@ public class TsFileSplitter { private List> pageIndex2TimesList = null; private List isTimeChunkNeedDecodeList = new ArrayList<>(); - public TsFileSplitter(final File tsFile, final Function consumer) { + public TsFileSplitter( + final File tsFile, final Function consumer, final boolean loadWithMods) { this.tsFile = tsFile; this.consumer = consumer; + this.loadWithMods = loadWithMods; } @SuppressWarnings({"squid:S3776", "squid:S6541"}) public void splitTsFileByDataPartition() throws IOException, IllegalStateException { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { - getAllModification(deletions); + if (loadWithMods) { + getAllModification(deletions); + } if (!checkMagic(reader)) { throw new TsFileRuntimeException( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java index e425c709815c3..da02a2941e8ee 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/load/LoadTsFileNodeTest.java @@ -38,10 +38,10 @@ public class LoadTsFileNodeTest { @Test public void testLoadSingleTsFileNode() { - TsFileResource resource = new TsFileResource(new File("1")); - String database = "root.db"; - LoadSingleTsFileNode node = - new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L); + final TsFileResource resource = new TsFileResource(new File("1")); + final String database = "root.db"; + final LoadSingleTsFileNode node = + new LoadSingleTsFileNode(new PlanNodeId(""), resource, false, database, true, 0L, true); Assert.assertTrue(node.isDeleteAfterLoad()); Assert.assertEquals(resource, node.getTsFileResource()); Assert.assertEquals(database, node.getDatabase()); @@ -52,12 +52,12 @@ public void testLoadSingleTsFileNode() { try { node.clone(); Assert.fail(); - } catch (NotImplementedException ignored) { + } catch (final NotImplementedException ignored) { } try { node.splitByPartition(new Analysis()); Assert.fail(); - } catch (NotImplementedException ignored) { + } catch (final NotImplementedException ignored) { } Assert.assertEquals(0, node.allowedChildCount()); Assert.assertEquals("LoadSingleTsFileNode{tsFile=1, needDecodeTsFile=false}", node.toString()); @@ -66,7 +66,7 @@ public void testLoadSingleTsFileNode() { @Test public void testLoadTsFilePieceNode() { - LoadTsFilePieceNode node = new LoadTsFilePieceNode(new PlanNodeId(""), new File("1")); + final LoadTsFilePieceNode node = new LoadTsFilePieceNode(new PlanNodeId(""), new File("1")); Assert.assertEquals(0, node.getDataSize()); Assert.assertEquals(new ArrayList<>(), node.getAllTsFileData()); Assert.assertEquals(node.getTsFile(), new File("1")); @@ -75,18 +75,18 @@ public void testLoadTsFilePieceNode() { try { node.clone(); Assert.fail(); - } catch (NotImplementedException ignored) { + } catch (final NotImplementedException ignored) { } try { node.splitByPartition(new Analysis()); Assert.fail(); - } catch (NotImplementedException ignored) { + } catch (final NotImplementedException ignored) { } Assert.assertEquals(0, node.allowedChildCount()); Assert.assertEquals("LoadTsFilePieceNode{tsFile=1, dataSize=0}", node.toString()); - ByteBuffer buffer = ByteBuffer.allocate(1024); + final ByteBuffer buffer = ByteBuffer.allocate(1024); node.serialize(buffer); - LoadTsFilePieceNode node1 = (LoadTsFilePieceNode) LoadTsFilePieceNode.deserialize(buffer); + final LoadTsFilePieceNode node1 = (LoadTsFilePieceNode) LoadTsFilePieceNode.deserialize(buffer); Assert.assertEquals(node.getTsFile(), node1.getTsFile()); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java index 9daee881371b4..f2c9d507f2f41 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/BatchedCompactionWithTsFileSplitterTest.java @@ -296,7 +296,8 @@ private void consumeChunkDataAndValidate(TsFileResource resource) throw new RuntimeException(e); } return true; - }); + }, + true); splitter.splitTsFileByDataPartition(); List splitResources = new ArrayList<>(); for (Map.Entry entry : writerMap.entrySet()) { From 18f49b271e9e59f78cc77583b682cf2d5d0a93b8 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:33:20 +0800 Subject: [PATCH 08/13] Test --- .../apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 51 +++++++++++++++++++ .../plan/node/load/LoadSingleTsFileNode.java | 14 ++--- 2 files changed, 59 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 0b9d310c76774..30ff9acb086b5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -810,6 +810,57 @@ public void testLoadWithMods() throws Exception { } } + @Test + public void testLoadWithoutMods() throws Exception { + final long writtenPoint1; + // device 0, device 1, sg 0 + try (final TsFileGenerator generator = + new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) { + generator.registerTimeseries( + SchemaConfig.DEVICE_0, + Arrays.asList( + SchemaConfig.MEASUREMENT_00, + SchemaConfig.MEASUREMENT_01, + SchemaConfig.MEASUREMENT_02, + SchemaConfig.MEASUREMENT_03, + SchemaConfig.MEASUREMENT_04, + SchemaConfig.MEASUREMENT_05, + SchemaConfig.MEASUREMENT_06, + SchemaConfig.MEASUREMENT_07)); + generator.registerAlignedTimeseries( + SchemaConfig.DEVICE_1, + Arrays.asList( + SchemaConfig.MEASUREMENT_10, + SchemaConfig.MEASUREMENT_11, + SchemaConfig.MEASUREMENT_12, + SchemaConfig.MEASUREMENT_13, + SchemaConfig.MEASUREMENT_14, + SchemaConfig.MEASUREMENT_15, + SchemaConfig.MEASUREMENT_16, + SchemaConfig.MEASUREMENT_17)); + generator.generateData(SchemaConfig.DEVICE_0, 100000, PARTITION_INTERVAL / 10_000, false); + generator.generateData(SchemaConfig.DEVICE_1, 100000, PARTITION_INTERVAL / 10_000, true); + writtenPoint1 = generator.getTotalNumber(); + generator.generateDeletion(SchemaConfig.DEVICE_0, 10); + } + + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + + statement.execute( + String.format("load \"%s\" sglevel=2 load-with-mods=false", tmpDir.getAbsolutePath())); + + try (final ResultSet resultSet = + statement.executeQuery("select count(*) from root.** group by level=1,2")) { + if (resultSet.next()) { + Assert.assertEquals(writtenPoint1, resultSet.getLong("count(root.sg.test_0.*.*)")); + } else { + Assert.fail("This ResultSet is empty."); + } + } + } + } + @Test public void testLoadWithEmptyTsFile() throws Exception { try (final TsFileGenerator ignored = new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 24e053a370b83..f585078a39f26 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -183,7 +183,7 @@ public List getChildren() { } @Override - public void addChild(PlanNode child) { + public void addChild(final PlanNode child) { // Do nothing } @@ -203,17 +203,17 @@ public List getOutputColumnNames() { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) { + protected void serializeAttributes(final ByteBuffer byteBuffer) { // Do nothing } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException { + protected void serializeAttributes(final DataOutputStream stream) throws IOException { // Do nothing } @Override - public List splitByPartition(IAnalysis analysis) { + public List splitByPartition(final IAnalysis analysis) { throw new NotImplementedException("split load single TsFile is not implemented"); } @@ -243,20 +243,21 @@ public void clean() { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } - LoadSingleTsFileNode loadSingleTsFileNode = (LoadSingleTsFileNode) o; + final LoadSingleTsFileNode loadSingleTsFileNode = (LoadSingleTsFileNode) o; return Objects.equals(tsFile, loadSingleTsFileNode.tsFile) && Objects.equals(resource, loadSingleTsFileNode.resource) && Objects.equals(isTableModel, loadSingleTsFileNode.isTableModel) && Objects.equals(database, loadSingleTsFileNode.database) && Objects.equals(needDecodeTsFile, loadSingleTsFileNode.needDecodeTsFile) && Objects.equals(deleteAfterLoad, loadSingleTsFileNode.deleteAfterLoad) + && Objects.equals(loadWithMods, loadSingleTsFileNode.loadWithMods) && Objects.equals(localRegionReplicaSet, loadSingleTsFileNode.localRegionReplicaSet); } @@ -269,6 +270,7 @@ public int hashCode() { database, needDecodeTsFile, deleteAfterLoad, + loadWithMods, localRegionReplicaSet); } } From 398db9471b7239e5d86896b8bf7313ccc11f291c Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 19:16:25 +0800 Subject: [PATCH 09/13] Fix --- ...atementDataTypeConvertExecutionVisitor.java | 4 ++-- ...atementDataTypeConvertExecutionVisitor.java | 4 ++-- .../load/LoadTsFileToTableModelAnalyzer.java | 4 ++-- .../load/LoadTsFileToTreeModelAnalyzer.java | 4 ++-- .../plan/node/load/LoadSingleTsFileNode.java | 17 +++-------------- .../storageengine/load/LoadTsFileManager.java | 18 ++++++++++++++++-- .../pipe/receiver/IoTDBFileReceiver.java | 4 ++-- 7 files changed, 29 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index d7c96ee700c5e..09cd7afbb9c09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -36,10 +36,10 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.commons.io.FileUtils; import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -182,7 +182,7 @@ public Optional visitLoadFile( } if (loadTsFileStatement.isDeleteAfterLoad()) { - loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + loadTsFileStatement.getTsFiles().forEach(LoadTsFileManager::cleanTsFile); } LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index 5e78dc1959a8e..c44f35ad8f454 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -35,9 +35,9 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.commons.io.FileUtils; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; @@ -148,7 +148,7 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) { } if (loadTsFileStatement.isDeleteAfterLoad()) { - loadTsFileStatement.getTsFiles().forEach(FileUtils::deleteQuietly); + loadTsFileStatement.getTsFiles().forEach(LoadTsFileManager::cleanTsFile); } LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java index 6a16f2e49fecf..28f5303c7fa1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTableModelAnalyzer.java @@ -36,12 +36,12 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import com.google.common.util.concurrent.ListenableFuture; -import org.apache.commons.io.FileUtils; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.file.metadata.IDeviceID; @@ -184,7 +184,7 @@ protected void analyzeSingleTsFile(final File tsFile) } catch (final LoadEmptyFileException loadEmptyFileException) { LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath()); if (isDeleteAfterLoad) { - FileUtils.deleteQuietly(tsFile); + LoadTsFileManager.cleanTsFile(tsFile); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java index 41fcb66c98ba0..431889172689c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileToTreeModelAnalyzer.java @@ -30,11 +30,11 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; -import org.apache.commons.io.FileUtils; import org.apache.tsfile.encrypt.EncryptParameter; import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.file.metadata.IDeviceID; @@ -168,7 +168,7 @@ protected void analyzeSingleTsFile(final File tsFile) throws IOException, AuthEx } catch (final LoadEmptyFileException loadEmptyFileException) { LOGGER.warn("Failed to load empty file: {}", tsFile.getAbsolutePath()); if (isDeleteAfterLoad) { - FileUtils.deleteQuietly(tsFile); + LoadTsFileManager.cleanTsFile(tsFile); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index f585078a39f26..5877dfecf8832 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -29,9 +29,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; -import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.LoadTsFileManager; import org.apache.tsfile.exception.NotImplementedException; import org.apache.tsfile.file.metadata.IDeviceID; @@ -43,7 +42,6 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -228,17 +226,8 @@ public String toString() { } public void clean() { - try { - if (deleteAfterLoad) { - Files.deleteIfExists(tsFile.toPath()); - Files.deleteIfExists( - new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).toPath()); - Files.deleteIfExists(ModificationFile.getExclusiveMods(tsFile).toPath()); - Files.deleteIfExists( - new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).toPath()); - } - } catch (final IOException e) { - LOGGER.warn("Delete After Loading {} error.", tsFile, e); + if (loadWithMods) { + LoadTsFileManager.cleanTsFile(tsFile); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 9ab7e0862644a..0b4ffdaff5581 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -42,6 +42,7 @@ import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent; @@ -531,7 +532,7 @@ private void close() { } } if (dataPartition2ModificationFile != null) { - for (Map.Entry entry : + for (final Map.Entry entry : dataPartition2ModificationFile.entrySet()) { try { final ModificationFile modificationFile = entry.getValue(); @@ -547,7 +548,7 @@ private void close() { } try { Files.delete(taskDir.toPath()); - } catch (DirectoryNotEmptyException e) { + } catch (final DirectoryNotEmptyException e) { LOGGER.info("Task dir {} is not empty, skip deleting.", taskDir.getPath()); } catch (IOException e) { LOGGER.warn(MESSAGE_DELETE_FAIL, taskDir.getPath(), e); @@ -559,6 +560,19 @@ private void close() { } } + public static void cleanTsFile(final File tsFile) { + try { + Files.deleteIfExists(tsFile.toPath()); + Files.deleteIfExists( + new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).toPath()); + Files.deleteIfExists(ModificationFile.getExclusiveMods(tsFile).toPath()); + Files.deleteIfExists( + new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).toPath()); + } catch (final IOException e) { + LOGGER.warn("Delete After Loading {} error.", tsFile, e); + } + } + private class CleanupTask implements Runnable, Comparable { private final String uuid; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 1ab1be21454fd..ec4e6321e703e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -317,7 +317,7 @@ protected final TPipeTransferResp handleTransferFilePiece( writingFileWriter.write(req.getFilePiece()); return PipeTransferFilePieceResp.toTPipeTransferResp( RpcUtils.SUCCESS_STATUS, writingFileWriter.length()); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn( "Receiver id = {}: Failed to write file piece from req {}.", receiverId.get(), req, e); final TSStatus status = @@ -597,7 +597,7 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil status); } return new TPipeTransferResp(status); - } catch (Exception e) { + } catch (final Exception e) { LOGGER.warn( "Receiver id = {}: Failed to seal file {} from req {}.", receiverId.get(), files, req, e); return new TPipeTransferResp( From a03cca383d2c5c391b213e91296e45381812fe8e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 20:22:04 +0800 Subject: [PATCH 10/13] Fix --- .../test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 30ff9acb086b5..6128bf82c777c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -848,7 +848,9 @@ public void testLoadWithoutMods() throws Exception { final Statement statement = connection.createStatement()) { statement.execute( - String.format("load \"%s\" sglevel=2 load-with-mods=false", tmpDir.getAbsolutePath())); + String.format( + "load \"%s\" with ('database-level'='2', 'load-with-mods'='false')", + tmpDir.getAbsolutePath())); try (final ResultSet resultSet = statement.executeQuery("select count(*) from root.** group by level=1,2")) { From a5529e84a5e07e1d3b712dea4471e907c4d53bef Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 9 Dec 2024 20:29:38 +0800 Subject: [PATCH 11/13] Update LoadSingleTsFileNode.java --- .../plan/planner/plan/node/load/LoadSingleTsFileNode.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index 5877dfecf8832..46f28609c4401 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -226,7 +226,7 @@ public String toString() { } public void clean() { - if (loadWithMods) { + if (deleteAfterLoad) { LoadTsFileManager.cleanTsFile(tsFile); } } From 421ec334a84daa65148eec613292b39fa792c5f0 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Jan 2025 17:48:01 +0800 Subject: [PATCH 12/13] Update LoadTsFileAnalyzer.java --- .../db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 86c4da7e2a56c..b667b5eab4547 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -104,7 +104,9 @@ public abstract class LoadTsFileAnalyzer implements AutoCloseable { } LoadTsFileAnalyzer( - final LoadTsFile loadTsFileTableStatement, final boolean isGeneratedByPipe, final MPPQueryContext context) { + final LoadTsFile loadTsFileTableStatement, + final boolean isGeneratedByPipe, + final MPPQueryContext context) { this.loadTsFileTableStatement = loadTsFileTableStatement; this.tsFiles = loadTsFileTableStatement.getTsFiles(); this.statementString = loadTsFileTableStatement.toString(); From 5194b4b7f1d786a5286ce872a5083e69cf5170cd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Thu, 9 Jan 2025 18:04:06 +0800 Subject: [PATCH 13/13] Update TsFileSplitter.java --- .../iotdb/db/storageengine/load/splitter/TsFileSplitter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index afa115b1c4681..75b340185f02e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -83,7 +83,8 @@ public class TsFileSplitter { private List> pageIndex2TimesList = null; private List isTimeChunkNeedDecodeList = new ArrayList<>(); - public TsFileSplitter(File tsFile, TsFileDataConsumer consumer) { + public TsFileSplitter( + final File tsFile, final TsFileDataConsumer consumer, final boolean loadWithMods) { this.tsFile = tsFile; this.consumer = consumer; this.loadWithMods = loadWithMods;