From ce979f2327e3e76c86ba3c12c3b7c6a4d7276877 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Sat, 9 May 2026 12:26:59 +0800 Subject: [PATCH 1/4] Fix that load tsfile may skip time-only aligned chunks --- .../load/splitter/TsFileSplitter.java | 9 +- .../load/TsFileSplitterTest.java | 157 ++++++++++++++++++ 2 files changed, 163 insertions(+), 3 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java index bbfd8f1bb30c..e8480a935170 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java @@ -166,6 +166,12 @@ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte isAligned = ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK); + if (isAligned) { + pageIndex2Times = new HashMap<>(); + pageIndex2ChunkData = new HashMap<>(); + isTimeChunkNeedDecode = true; + } + IChunkMetadata chunkMetadata = offset2ChunkMetadata.get(chunkOffset - Byte.BYTES); // When loading TsFile with Chunk in data zone but no matched ChunkMetadata // at the end of file, this Chunk needs to be skipped. @@ -359,9 +365,6 @@ private void storeTimeChunkContext() { pageIndex2TimesList.add(pageIndex2Times); pageIndex2ChunkDataList.add(pageIndex2ChunkData); isTimeChunkNeedDecodeList.add(isTimeChunkNeedDecode); - pageIndex2Times = new HashMap<>(); - pageIndex2ChunkData = new HashMap<>(); - isTimeChunkNeedDecode = true; } private void switchToTimeChunkContextOfCurrentMeasurement( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java new file mode 100644 index 000000000000..6610880567e9 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/TsFileSplitterTest.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.splitter; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +public class TsFileSplitterTest { + + @Test + public void testSplitTableTimeOnlyAlignedChunk() throws Exception { + final File sourceTsFile = new File("split-table-time-only-source.tsfile"); + final File targetTsFile = new File("split-table-time-only-target.tsfile"); + final IDeviceID deviceID = new StringArrayDeviceID("table1", "tagA"); + + try { + writeTableTsFileWithTimeOnlyChunk(sourceTsFile, deviceID); + + final List emittedChunkDataList = new ArrayList<>(); + final TsFileSplitter splitter = + new TsFileSplitter( + sourceTsFile, + tsFileData -> { + if (tsFileData instanceof ChunkData) { + emittedChunkDataList.add((ChunkData) tsFileData); + } + return true; + }); + splitter.splitTsFileByDataPartition(); + + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + try (final TsFileIOWriter writer = new TsFileIOWriter(targetTsFile)) { + writer.setSchema(createSchema()); + IDeviceID currentDeviceID = null; + for (final ChunkData chunkData : emittedChunkDataList) { + if (!Objects.equals(currentDeviceID, chunkData.getDevice())) { + if (Objects.nonNull(currentDeviceID)) { + writer.endChunkGroup(); + } + writer.startChunkGroup(chunkData.getDevice()); + currentDeviceID = chunkData.getDevice(); + } + + writeSerializedChunkDataToWriter(chunkData, writer); + } + if (Objects.nonNull(currentDeviceID)) { + writer.endChunkGroup(); + } + writer.endFile(); + } + + Assert.assertEquals(1, emittedChunkDataList.size()); + try (final TsFileSequenceReader reader = + new TsFileSequenceReader(targetTsFile.getAbsolutePath())) { + final List chunkMetadataList = + reader.getAlignedChunkMetadata(deviceID, false); + Assert.assertEquals(1, chunkMetadataList.size()); + Assert.assertEquals( + 2, chunkMetadataList.get(0).getTimeChunkMetadata().getStatistics().getCount()); + Assert.assertTrue(chunkMetadataList.get(0).getValueChunkMetadataList().isEmpty()); + } + } finally { + if (sourceTsFile.exists()) { + Assert.assertTrue(sourceTsFile.delete()); + } + if (targetTsFile.exists()) { + Assert.assertTrue(targetTsFile.delete()); + } + } + } + + private void writeTableTsFileWithTimeOnlyChunk(final File tsFile, final IDeviceID deviceID) + throws Exception { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.setSchema(createSchema()); + writer.startChunkGroup(deviceID); + + final AlignedChunkWriterImpl chunkWriter = + new AlignedChunkWriterImpl(Collections.emptyList()); + chunkWriter.write(100); + chunkWriter.write(101); + chunkWriter.writeToFileWriter(writer); + + writer.endChunkGroup(); + writer.endFile(); + } + } + + private Schema createSchema() { + final List tableSchemaList = + Arrays.asList( + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64)); + final List columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD); + + final Schema schema = new Schema(); + schema.registerTableSchema(new TableSchema("table1", tableSchemaList, columnCategoryList)); + return schema; + } + + private void writeSerializedChunkDataToWriter( + final ChunkData chunkData, final TsFileIOWriter writer) throws Exception { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (final DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + chunkData.serialize(dataOutputStream); + } + ((ChunkData) + TsFileData.deserialize(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()))) + .writeToFileWriter(writer); + } +} From a4055a5a3d3e2e7242a062aaaed805108e545a64 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 11 May 2026 10:53:34 +0800 Subject: [PATCH 2/4] fix LoadTsFileAnalyzer may count timestamp in point count --- .../iotdb/db/it/IoTDBLoadLastCacheIT.java | 1 - .../manual/enhanced/IoTDBPipeClusterIT.java | 34 +++ .../plan/analyze/load/LoadTsFileAnalyzer.java | 1 + .../load/LoadTsFileTableSchemaCache.java | 24 ++- .../analyze/load/LoadTsFileAnalyzerTest.java | 198 ++++++++++++++++++ 5 files changed, 248 insertions(+), 10 deletions(-) create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java index 755b3aef7586..4fd1f6fab913 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java @@ -86,7 +86,6 @@ public static Collection data() { new Object[][] { {LastCacheLoadStrategy.CLEAN_ALL}, {LastCacheLoadStrategy.UPDATE}, - {LastCacheLoadStrategy.UPDATE_NO_BLOB}, {LastCacheLoadStrategy.CLEAN_DEVICE} }); } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java index 4350d4072bb0..d28d1d8c21dc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java @@ -1,3 +1,4 @@ + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,6 +20,8 @@ package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced; +import java.util.Arrays; +import java.util.HashSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; @@ -36,6 +39,7 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced; +import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils; import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; import org.apache.iotdb.rpc.TSStatusCode; @@ -1001,4 +1005,34 @@ public void testNegativeTimestamp() throws Exception { TableModelUtils.assertData("test", "test", -200, 100, receiverEnv, handleFailure); } } + + @Test + public void testHistoryDataWithEmptyField() { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "CREATE DATABASE iot_table_stream_attr", + "USE iot_table_stream_attr", + "CREATE TABLE table1 (region STRING TAG, device_id STRING TAG, model_id STRING ATTRIBUTE, maintenance STRING ATTRIBUTE COMMENT 'maintenance', temperature FLOAT FIELD COMMENT 'temperature', humidity STRING ATTRIBUTE COMMENT 'humidity', plant_id STRING TAG) COMMENT 'table1'", + String.format( + "create pipe test with source ('inclusion'='all') with sink('node-urls'='%s')", + receiverEnv.getDataNodeWrapper(0).getIpAndPortString()), + "select * from table1 order by time", + "INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('north', null, 'd101', 'red', null, '2025-11-26 13:38:00', 91.0, null), (null, '1003', null, null, 'maint-a', '2025-11-26 13:39:00', null, '36.2'), (null, null, null, 'green', 'maint-b', '2025-11-26 13:40:00', 88.8, '34.9')", + "INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('south', '1005', 'd105', null, null, '2025-11-26 13:41:00', 87.5, null)", + "INSERT INTO table1(region, plant_id, device_id, model_id, maintenance, time, temperature, humidity) VALUES ('west', '1006', 'd106', 'blue', 'maint-c', '2025-11-26 13:42:00', null, '36.8')"), + BaseEnv.TABLE_SQL_DIALECT); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from iot_table_stream_attr.table1 order by time", + "time,region,device_id,model_id,maintenance,temperature,humidity,plant_id,", + new HashSet<>( + Arrays.asList( + "2025-11-26T13:38:00.000Z,north,d101,red,null,91.0,null,null,", + "2025-11-26T13:39:00.000Z,null,null,null,maint-a,null,36.2,1003,", + "2025-11-26T13:40:00.000Z,null,null,green,maint-b,88.8,34.9,null,", + "2025-11-26T13:41:00.000Z,south,d105,null,null,87.5,null,1005,", + "2025-11-26T13:42:00.000Z,west,d106,blue,maint-c,null,36.8,1006,")), + (String) null); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 9d350d97c82e..79e06845afb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -632,6 +632,7 @@ private static long getWritePointCount( Map> device2TimeseriesMetadata) { return device2TimeseriesMetadata.values().stream() .flatMap(List::stream) + .filter(timeseriesMetadata -> !timeseriesMetadata.getMeasurementId().isEmpty()) .mapToLong(t -> t.getStatistics().getCount()) .sum(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java index 6754468bad53..b9fafa2d087f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileTableSchemaCache.java @@ -137,15 +137,8 @@ public void setTableSchemaMap( } public void autoCreateAndVerify(final IDeviceID device) throws LoadAnalyzeException { - try { - if (ModificationUtils.isDeviceDeletedByMods(currentModifications, currentTimeIndex, device)) { - return; - } - } catch (final IllegalPathException e) { - LOGGER.warn( - "Failed to check if device {} is deleted by mods. Will see it as not deleted.", - device, - e); + if (isDeviceDeletedByMods(device)) { + return; } try { @@ -167,6 +160,19 @@ public void autoCreateAndVerify(final IDeviceID device) throws LoadAnalyzeExcept } } + public boolean isDeviceDeletedByMods(final IDeviceID device) { + try { + return ModificationUtils.isDeviceDeletedByMods( + currentModifications, currentTimeIndex, device); + } catch (final IllegalPathException e) { + LOGGER.warn( + "Failed to check if device {} is deleted by mods. Will see it as not deleted.", + device, + e); + return false; + } + } + private void addDevice(final IDeviceID device) { final String tableName = device.getTableName(); long memoryUsageSizeInBytes = 0; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java new file mode 100644 index 000000000000..f868f6397a42 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.analyze.load; + +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.QueryId; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.enums.ColumnCategory; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.file.metadata.TableSchema; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.read.TsFileSequenceReaderTimeseriesMetadataIterator; +import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; +import org.apache.tsfile.write.writer.TsFileIOWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class LoadTsFileAnalyzerTest { + + private int dataNodeId; + + @Before + public void setUp() { + dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0); + } + + @After + public void tearDown() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(dataNodeId); + } + + @Test + public void testAnalyzeSingleTableFileShouldNotCountTimestampInPointCount() throws Exception { + final File tsFile = new File("load-table-mixed-null-device.tsfile"); + writeTableTsFileWithMixedDevices(tsFile); + + final LoadTsFile statement = + new LoadTsFile(null, tsFile.getAbsolutePath(), Collections.emptyMap()).setDatabase("db"); + final TrackingLoadTsFileTableSchemaCache schemaCache = new TrackingLoadTsFileTableSchemaCache(); + try (final LoadTsFileAnalyzer analyzer = + new LoadTsFileAnalyzer(statement, false, new MPPQueryContext(new QueryId("test"))); + final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + injectTableSchemaCache(analyzer, schemaCache); + + final Method method = + LoadTsFileAnalyzer.class.getDeclaredMethod( + "doAnalyzeSingleTableFile", + File.class, + TsFileSequenceReader.class, + TsFileSequenceReaderTimeseriesMetadataIterator.class, + java.util.Map.class); + method.setAccessible(true); + + final TsFileSequenceReaderTimeseriesMetadataIterator timeseriesMetadataIterator = + new TsFileSequenceReaderTimeseriesMetadataIterator(reader, false); + method.invoke( + analyzer, tsFile, reader, timeseriesMetadataIterator, reader.getTableSchemaMap()); + } finally { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + } + + Assert.assertEquals(1, statement.getResources().size()); + final TsFileResource resource = statement.getResources().get(0); + Assert.assertTrue(containsDevice(resource.getDevices(), "table1", "tagA")); + Assert.assertTrue(containsDevice(resource.getDevices(), "table1", "tagB")); + Assert.assertEquals(6L, statement.getWritePointCount(0)); + Assert.assertTrue(schemaCache.containsDevice("table1", "tagA")); + Assert.assertTrue(schemaCache.containsDevice("table1", "tagB")); + Assert.assertEquals(2, schemaCache.getVerifiedDeviceCount()); + } + + private void writeTableTsFileWithMixedDevices(final File tsFile) throws Exception { + if (tsFile.exists()) { + Assert.assertTrue(tsFile.delete()); + } + + final List tableSchemaList = + Arrays.asList( + new MeasurementSchema("tag1", TSDataType.STRING), + new MeasurementSchema("s1", TSDataType.INT64), + new MeasurementSchema("s2", TSDataType.DOUBLE)); + final List columnCategoryList = + Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD, ColumnCategory.FIELD); + + final Schema schema = new Schema(); + schema.registerTableSchema(new TableSchema("table1", tableSchemaList, columnCategoryList)); + try (final TsFileIOWriter writer = new TsFileIOWriter(tsFile)) { + writer.setSchema(schema); + + writeDevice(writer, tableSchemaList, new String[] {"table1", "tagA"}, false); + writeDevice(writer, tableSchemaList, new String[] {"table1", "tagB"}, true); + + writer.endFile(); + } + } + + private void writeDevice( + final TsFileIOWriter writer, + final List tableSchemaList, + final String[] deviceSegments, + final boolean areAllFieldsNull) + throws Exception { + writer.startChunkGroup(new StringArrayDeviceID(deviceSegments)); + + final AlignedChunkWriterImpl chunkWriter = + new AlignedChunkWriterImpl(tableSchemaList.subList(1, tableSchemaList.size())); + for (int i = 0; i < 3; i++) { + final long time = 100 + i; + chunkWriter.getTimeChunkWriter().write(time); + chunkWriter.getValueChunkWriterByIndex(0).write(time, (long) i, areAllFieldsNull); + chunkWriter.getValueChunkWriterByIndex(1).write(time, 0.5 + i, areAllFieldsNull); + } + chunkWriter.writeToFileWriter(writer); + writer.endChunkGroup(); + } + + private void injectTableSchemaCache( + final LoadTsFileAnalyzer analyzer, final TrackingLoadTsFileTableSchemaCache schemaCache) + throws Exception { + final Field tableSchemaCacheField = + LoadTsFileAnalyzer.class.getDeclaredField("tableSchemaCache"); + tableSchemaCacheField.setAccessible(true); + tableSchemaCacheField.set(analyzer, schemaCache); + } + + private boolean containsDevice(final Set devices, final String... expectedSegments) { + return devices.stream() + .anyMatch(device -> Arrays.equals(device.getSegments(), expectedSegments)); + } + + private static class TrackingLoadTsFileTableSchemaCache extends LoadTsFileTableSchemaCache { + + private final Set> verifiedDevices = new HashSet<>(); + + private TrackingLoadTsFileTableSchemaCache() throws LoadRuntimeOutOfMemoryException { + super(null, new MPPQueryContext(new QueryId("load_test")), false); + } + + @Override + public void autoCreateAndVerify(final IDeviceID device) { + verifiedDevices.add(Arrays.asList(device.getSegments())); + } + + @Override + public boolean isDeviceDeletedByMods(final IDeviceID device) { + return false; + } + + private boolean containsDevice(final String... expectedSegments) { + return verifiedDevices.contains(Arrays.asList((Object[]) expectedSegments)); + } + + private int getVerifiedDeviceCount() { + return verifiedDevices.size(); + } + } +} From 4be9a133c81ac6b555c4b620e8dae0b5f41c17de Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 11 May 2026 11:45:05 +0800 Subject: [PATCH 3/4] Fix tablet parsing --- .../manual/enhanced/IoTDBPipeClusterIT.java | 5 +- ...sertionEventTableParserTabletIterator.java | 143 ++++++++++++------ .../analyze/load/LoadTsFileAnalyzerTest.java | 1 - 3 files changed, 97 insertions(+), 52 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java index d28d1d8c21dc..ad283d4a02cc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java @@ -1,4 +1,3 @@ - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -20,8 +19,6 @@ package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced; -import java.util.Arrays; -import java.util.HashSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; @@ -53,7 +50,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index f05cf872c798..9d11d51d31a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -101,6 +101,7 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator columnTypes; private List measurementList; private List dataTypeList; + private List fieldSchemaList; private int deviceIdSize; private List modsInfoList; @@ -194,7 +195,7 @@ public boolean hasNext() { long size = 0; List iChunkMetadataList = - reader.getAlignedChunkMetadata(pair.left, true); + reader.getAlignedChunkMetadata(pair.left, false); Iterator chunkMetadataIterator = iChunkMetadataList.iterator(); @@ -213,27 +214,7 @@ public boolean hasNext() { continue; } - Iterator iChunkMetadataIterator = - alignedChunkMetadata.getValueChunkMetadataList().iterator(); - while (iChunkMetadataIterator.hasNext()) { - IChunkMetadata iChunkMetadata = iChunkMetadataIterator.next(); - if (iChunkMetadata == null) { - iChunkMetadataIterator.remove(); - continue; - } - - if (!modifications.isEmpty() - && ModsOperationUtil.isAllDeletedByMods( - pair.getLeft(), - iChunkMetadata.getMeasurementUid(), - alignedChunkMetadata.getStartTime(), - alignedChunkMetadata.getEndTime(), - modifications)) { - iChunkMetadataIterator.remove(); - } - } - - if (alignedChunkMetadata.getValueChunkMetadataList().isEmpty()) { + if (areAllFieldsDeletedByMods(pair.getLeft(), alignedChunkMetadata)) { chunkMetadataIterator.remove(); continue; } @@ -267,6 +248,7 @@ public boolean hasNext() { dataTypeList = new ArrayList<>(); columnTypes = new ArrayList<>(); measurementList = new ArrayList<>(); + fieldSchemaList = new ArrayList<>(); for (int i = 0; i < columnSchemaSize; i++) { final IMeasurementSchema schema = tableSchema.getColumnSchemas().get(i); @@ -280,6 +262,9 @@ public boolean hasNext() { measurementList.add(measurementName); dataTypeList.add(schema.getType()); } + if (ColumnCategory.FIELD.equals(columnCategory)) { + fieldSchemaList.add(schema); + } } } deviceIdSize = dataTypeList.size(); @@ -331,9 +316,9 @@ private Tablet buildNextTablet() { tablet = new Tablet( tableName, - measurementList, - dataTypeList, - columnTypes, + new ArrayList<>(measurementList), + new ArrayList<>(dataTypeList), + new ArrayList<>(columnTypes), rowCountAndMemorySize.getLeft()); tablet.initBitMaps(); isFirstRow = false; @@ -376,6 +361,20 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta long size = timeChunkSize; final List valueChunkList = new ArrayList<>(); + final Map valueChunkMetadataMap = + alignedChunkMetadata.getValueChunkMetadataList().stream() + .filter(Objects::nonNull) + .filter( + metadata -> + !isFieldDeletedByMods( + metadata.getMeasurementUid(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime())) + .collect( + Collectors.toMap( + IChunkMetadata::getMeasurementUid, + metadata -> metadata, + (left, right) -> left)); // To ensure that the Tablet has the same alignedChunk column as the current one, // you need to create a new Tablet to fill in the data. @@ -392,50 +391,98 @@ private void initChunkReader(final AbstractAlignedChunkMetadata alignedChunkMeta measurementList.subList(deviceIdSize, measurementList.size()).clear(); dataTypeList.subList(deviceIdSize, dataTypeList.size()).clear(); - for (; offset < alignedChunkMetadata.getValueChunkMetadataList().size(); ++offset) { - final IChunkMetadata metadata = alignedChunkMetadata.getValueChunkMetadataList().get(offset); + boolean hasSelectedField = fieldSchemaList.isEmpty(); + boolean hasSelectedNonNullChunk = false; + for (; offset < fieldSchemaList.size(); ++offset) { + final IMeasurementSchema schema = fieldSchemaList.get(offset); + if (isFieldDeletedByMods( + schema.getMeasurementName(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime())) { + continue; + } + + final IChunkMetadata metadata = valueChunkMetadataMap.get(schema.getMeasurementName()); + Chunk chunk = null; if (metadata != null) { - final Chunk chunk = reader.readMemChunk((ChunkMetadata) metadata); - size += PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk); - if (size > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { - if (valueChunkList.isEmpty()) { + chunk = reader.readMemChunk((ChunkMetadata) metadata); + final long newSize = size + PipeMemoryWeightUtil.calculateChunkRamBytesUsed(chunk); + if (newSize > allocatedMemoryBlockForChunk.getMemoryUsageInBytes()) { + if (!hasSelectedNonNullChunk) { // If the first chunk exceeds the memory limit, we need to allocate more memory + size = newSize; PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForChunk, size); - columnTypes.add(ColumnCategory.FIELD); - measurementList.add(metadata.getMeasurementUid()); - dataTypeList.add(metadata.getDataType()); - valueChunkList.add(chunk); - ++offset; + } else { + break; } - break; } else { - // Record the column information corresponding to Meta to fill in Tablet - columnTypes.add(ColumnCategory.FIELD); - measurementList.add(metadata.getMeasurementUid()); - dataTypeList.add(metadata.getDataType()); - valueChunkList.add(chunk); + size = newSize; } + hasSelectedNonNullChunk = true; } + columnTypes.add(ColumnCategory.FIELD); + measurementList.add(schema.getMeasurementName()); + dataTypeList.add(schema.getType()); + valueChunkList.add(chunk); + hasSelectedField = true; } - if (offset >= alignedChunkMetadata.getValueChunkMetadataList().size()) { + if (offset >= fieldSchemaList.size()) { currentChunkMetadata = null; } + if (!hasSelectedField) { + this.chunkReader = null; + this.batchData = null; + return; + } + this.chunkReader = new TableChunkReader(timeChunk, valueChunkList, null); this.modsInfoList = ModsOperationUtil.initializeMeasurementMods(deviceID, measurementList, modifications); } + private boolean areAllFieldsDeletedByMods( + final IDeviceID currentDeviceID, final AbstractAlignedChunkMetadata alignedChunkMetadata) { + if (modifications.isEmpty() || fieldSchemaList.isEmpty()) { + return false; + } + + for (final IMeasurementSchema schema : fieldSchemaList) { + if (!ModsOperationUtil.isAllDeletedByMods( + currentDeviceID, + schema.getMeasurementName(), + alignedChunkMetadata.getStartTime(), + alignedChunkMetadata.getEndTime(), + modifications)) { + return false; + } + } + return true; + } + + private boolean isFieldDeletedByMods( + final String measurementID, final long startTime, final long endTime) { + return !modifications.isEmpty() + && ModsOperationUtil.isAllDeletedByMods( + deviceID, measurementID, startTime, endTime, modifications); + } + private boolean fillMeasurementValueColumns( final BatchData data, final Tablet tablet, final int rowIndex) { - final TsPrimitiveType[] primitiveTypes = data.getVector(); + final TsPrimitiveType[] primitiveTypes = + Objects.nonNull(data.getVector()) ? data.getVector() : new TsPrimitiveType[0]; boolean needFillTime = false; + boolean hasNonDeletedField = dataTypeList.size() == deviceIdSize; for (int i = deviceIdSize, size = dataTypeList.size(); i < size; i++) { - final TsPrimitiveType primitiveType = primitiveTypes[i - deviceIdSize]; - if (primitiveType == null - || ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i))) { + final TsPrimitiveType primitiveType = + i - deviceIdSize < primitiveTypes.length ? primitiveTypes[i - deviceIdSize] : null; + final boolean isDeleted = ModsOperationUtil.isDelete(data.currentTime(), modsInfoList.get(i)); + if (!isDeleted) { + hasNonDeletedField = true; + } + if (primitiveType == null || isDeleted) { switch (dataTypeList.get(i)) { case TEXT: case BLOB: @@ -480,7 +527,7 @@ private boolean fillMeasurementValueColumns( throw new UnSupportedDataTypeException("UnSupported" + primitiveType.getDataType()); } } - return needFillTime; + return needFillTime || hasNonDeletedField; } private void fillDeviceIdColumns( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java index f868f6397a42..d6bdb1e37feb 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzerTest.java @@ -28,7 +28,6 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.TableSchema; From 58816f38f8f711d67ceaa1cb9686398e0947dde4 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 11 May 2026 12:11:51 +0800 Subject: [PATCH 4/4] add test --- .../it/db/it/IoTDBLoadTsFileIT.java | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java index a58e0633b742..b351ace5a4da 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java @@ -28,7 +28,10 @@ import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.TableSchema; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.TsFileWriter; +import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.After; @@ -486,6 +489,76 @@ private String convert2TableSQL( return tableCreation; } + @Test + public void testLoadWithAllFieldsNullRows() throws Exception { + final List schemas = + Arrays.asList( + new MeasurementSchema("f1", TSDataType.INT32), + new MeasurementSchema("f2", TSDataType.INT64)); + final List columnCategories = + Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD); + + final File file = new File(tmpDir, "1-0-0-0.tsfile"); + final int totalRows = 20; + + try (final TsFileWriter tsFileWriter = new TsFileWriter(file)) { + tsFileWriter.registerTableSchema( + new TableSchema(SchemaConfig.TABLE_0, schemas, columnCategories)); + + final List columnNames = Arrays.asList("f1", "f2"); + final List dataTypes = Arrays.asList(TSDataType.INT32, TSDataType.INT64); + final Tablet tablet = + new Tablet(SchemaConfig.TABLE_0, columnNames, dataTypes, columnCategories); + + for (int r = 0; r < totalRows; r++) { + final int row = tablet.getRowSize(); + tablet.addTimestamp(row, (r + 1) * 1000L); + } + tsFileWriter.writeTable(tablet); + } + + try (final Connection connection = + EnvFactory.getEnv().getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute(String.format("create database if not exists %s", SchemaConfig.DATABASE_0)); + statement.execute(String.format("use %s", SchemaConfig.DATABASE_0)); + statement.execute( + String.format( + "load '%s' with ('database'='%s')", file.getAbsolutePath(), SchemaConfig.DATABASE_0)); + + try (final ResultSet resultSet = + statement.executeQuery(String.format("select count(*) from %s", SchemaConfig.TABLE_0))) { + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(totalRows, resultSet.getLong(1)); + } + + try (final ResultSet resultSet = + statement.executeQuery( + String.format("select count(f1), count(f2) from %s", SchemaConfig.TABLE_0))) { + Assert.assertTrue(resultSet.next()); + Assert.assertEquals(0, resultSet.getLong(1)); + Assert.assertEquals(0, resultSet.getLong(2)); + } + + try (final ResultSet resultSet = + statement.executeQuery( + String.format("select time, f1, f2 from %s order by time", SchemaConfig.TABLE_0))) { + int count = 0; + while (resultSet.next()) { + final long time = resultSet.getLong("time"); + final int expectedTime = (count + 1) * 1000; + Assert.assertEquals(expectedTime, time); + resultSet.getInt("f1"); + Assert.assertTrue(resultSet.wasNull()); + resultSet.getLong("f2"); + Assert.assertTrue(resultSet.wasNull()); + count++; + } + Assert.assertEquals(totalRows, count); + } + } + } + private static class SchemaConfig { private static final String DATABASE_0 = "root"; private static final String TABLE_0 = "test";