diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index 474e08922a7eeb..bc31213a994ad2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -633,4 +633,58 @@ protected long getRealFileSplitSize(long blockSize) {
         }
         return realSplitSize;
     }
+
+    /**
+     * Estimate the total number of splits based on file sizes and split size,
+     * and adjust the split size if the estimated total exceeds the limit.
+     *
+     * @param fileSizes list of file sizes in bytes
+     * @param baseSplitSize the base split size to use (from getRealFileSplitSize)
+     * @return the adjusted split size that ensures total split count doesn't exceed maxFileSplitsNum
+     */
+    protected long adjustSplitSizeForTotalLimit(List<Long> fileSizes, long baseSplitSize) {
+        int maxFileSplitsNum = sessionVariable.getMaxFileSplitsNum();
+        if (maxFileSplitsNum <= 0 || fileSizes.isEmpty()) {
+            return baseSplitSize;
+        }
+
+        // Estimate total split count with current split size
+        long estimatedTotalSplits = 0;
+        for (long fileSize : fileSizes) {
+            if (fileSize > 0) {
+                // Estimate splits for this file: ceil(fileSize / splitSize)
+                long splitsForFile = (fileSize + baseSplitSize - 1) / baseSplitSize;
+                estimatedTotalSplits += splitsForFile;
+            }
+        }
+
+        // If estimated total is within limit, use the base split size
+        if (estimatedTotalSplits <= maxFileSplitsNum) {
+            return baseSplitSize;
+        }
+
+        // Calculate total file size
+        long totalFileSize = 0;
+        for (long fileSize : fileSizes) {
+            totalFileSize += fileSize;
+        }
+
+        if (totalFileSize <= 0) {
+            return baseSplitSize;
+        }
+
+        // Calculate the minimum split size needed to stay within the limit
+        // minSplitSize = ceil(totalFileSize / maxFileSplitsNum)
+        long minSplitSize = (totalFileSize + maxFileSplitsNum - 1) / maxFileSplitsNum;
+
+        // Use the larger of the base split size and the minimum required split size
+        long adjustedSplitSize = Math.max(baseSplitSize, minSplitSize);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Estimated total splits: {}, max allowed: {}, adjusted split size from {} to {}",
+                    estimatedTotalSplits, maxFileSplitsNum, baseSplitSize, adjustedSplitSize);
+        }
+
+        return adjustedSplitSize;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 744423f622cce8..74c88abbf2861d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -319,13 +319,49 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartition> partitions,
+        // Collect file sizes for split size adjustment
+        List<Long> fileSizes = Lists.newArrayList();
+        long representativeBlockSize = 0;
+        for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
+            if (fileCacheValue.getFiles() != null) {
+                for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
+                    fileSizes.add(status.getLength());
+                    if (representativeBlockSize == 0 && status.getBlockSize() > 0) {
+                        representativeBlockSize = status.getBlockSize();
+                    }
+                }
+            }
+        }
+
+        // Calculate base split size and adjust if needed to limit total split count
+        long baseSplitSize = getRealFileSplitSize(representativeBlockSize);
+        long adjustedSplitSize = adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);
+
+        // Split files using the adjusted split size
         for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
             if (fileCacheValue.getFiles() != null) {
                 boolean isSplittable = fileCacheValue.isSplittable();
                 for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                     allFiles.addAll(FileSplitter.splitFile(status.getPath(),
-                            // set block size to Long.MAX_VALUE to avoid splitting the file.
-                            getRealFileSplitSize(needSplit ? status.getBlockSize() : Long.MAX_VALUE),
+                            adjustedSplitSize,
                             status.getBlockLocations(), status.getLength(), status.getModificationTime(),
                             isSplittable, fileCacheValue.getPartitionValues(),
                             new HiveSplitCreator(fileCacheValue.getAcidInfo())));
@@ -336,8 +372,24 @@ private void splitAllFiles(List<Split> allFiles, List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException {
+        // Collect file sizes for split size adjustment
+        List<Long> fileSizes = Lists.newArrayList();
+        long representativeBlockSize = 0;
+        for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
+            fileSizes.add(status.getLength());
+            if (representativeBlockSize == 0 && status.getBlockSize() > 0) {
+                representativeBlockSize = status.getBlockSize();
+            }
+        }
+
+        // Calculate base split size and adjust if needed to limit total split count
+        long baseSplitSize = getRealFileSplitSize(representativeBlockSize);
+        long adjustedSplitSize = adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);
+
+        // Split files using the adjusted split size
         for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
-            allFiles.addAll(FileSplitter.splitFile(status.getPath(), getRealFileSplitSize(status.getBlockSize()),
+            allFiles.addAll(FileSplitter.splitFile(status.getPath(),
+                    adjustedSplitSize,
                     status.getBlockLocations(), status.getLength(), status.getModificationTime(),
                     status.isSplittable(), status.getPartitionValues(), new HiveSplitCreator(status.getAcidInfo())));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 85aa0bdc96c3db..40f3b5b1f7322d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -47,6 +47,7 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.paimon.data.BinaryRow;
@@ -293,8 +294,26 @@ public List<Split> getSplits(int numBackends) throws UserException {
         // partition data.
         // And for counting the number of selected partitions for this paimon table.
         Map> partitionInfoMaps = new HashMap<>();
-        // if applyCountPushdown is true, we can't split the DataSplit
-        long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0);
+
+        // Collect file sizes for split size adjustment (only for native reader files that need splitting)
+        List<Long> fileSizes = Lists.newArrayList();
+        if (!applyCountPushdown) {
+            for (DataSplit dataSplit : dataSplits) {
+                Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
+                if (!forceJniScanner && supportNativeReader(optRawFiles)) {
+                    List<RawFile> rawFiles = optRawFiles.get();
+                    for (RawFile file : rawFiles) {
+                        fileSizes.add(file.length());
+                    }
+                }
+            }
+        }
+
+        // Calculate base split size and adjust if needed to limit total split count
+        long baseSplitSize = applyCountPushdown ? Long.MAX_VALUE : getRealFileSplitSize(0);
+        long adjustedSplitSize = applyCountPushdown ? Long.MAX_VALUE
+                : adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);
+
         for (DataSplit dataSplit : dataSplits) {
             SplitStat splitStat = new SplitStat();
             splitStat.setRowCount(dataSplit.rowCount());
@@ -335,7 +354,7 @@ public List<Split> getSplits(int numBackends) throws UserException {
                     try {
                         List<Split> dorisSplits = FileSplitter.splitFile(
                                 locationPath,
-                                realFileSplitSize,
+                                adjustedSplitSize,
                                 null,
                                 file.length(),
                                 -1,
@@ -384,7 +403,7 @@ public List<Split> getSplits(int numBackends) throws UserException {
         // We need to set the target size for all splits so that we can calculate the
         // proportion of each split later.
-        splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+        splits.forEach(s -> s.setTargetSplitSize(adjustedSplitSize));
 
         this.selectedPartitionNum = partitionInfoMaps.size();
         return splits;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index e75675597622d3..fd7c9c770e30cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -146,10 +146,44 @@ public List<Split> getSplits(int numBackends) throws UserException {
             needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum);
         }
+
+        if (!needSplit) {
+            // No need to split, process files directly
+            for (TBrokerFileStatus fileStatus : fileStatuses) {
+                try {
+                    splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()),
+                            Long.MAX_VALUE,
+                            null, fileStatus.getSize(),
+                            fileStatus.getModificationTime(), fileStatus.isSplitable, null,
+                            FileSplitCreator.DEFAULT));
+                } catch (IOException e) {
+                    LOG.warn("get file split failed for TVF: {}", fileStatus.getPath(), e);
+                    throw new UserException(e);
+                }
+            }
+            return splits;
+        }
+
+        // Collect file sizes for split size adjustment
+        List<Long> fileSizes = Lists.newArrayList();
+        long representativeBlockSize = 0;
+        for (TBrokerFileStatus fileStatus : fileStatuses) {
+            fileSizes.add(fileStatus.getSize());
+            if (representativeBlockSize == 0 && fileStatus.getBlockSize() > 0) {
+                representativeBlockSize = fileStatus.getBlockSize();
+            }
+        }
+
+        // Calculate base split size and adjust if needed to limit total split count
+        long baseSplitSize = getRealFileSplitSize(representativeBlockSize);
+        long adjustedSplitSize = adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);
+
+        // Split files using the adjusted split size
         for (TBrokerFileStatus fileStatus : fileStatuses) {
             try {
+                // Use the adjusted split size, but still respect individual file's block size
+                long finalSplitSize = Math.max(adjustedSplitSize, getRealFileSplitSize(fileStatus.getBlockSize()));
                 splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()),
-                        getRealFileSplitSize(needSplit ? fileStatus.getBlockSize() : Long.MAX_VALUE),
+                        finalSplitSize,
                         null, fileStatus.getSize(),
                         fileStatus.getModificationTime(), fileStatus.isSplitable, null,
                         FileSplitCreator.DEFAULT));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 08df528997fa46..245e31fc209fd1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -511,6 +511,10 @@ public class SessionVariable implements Serializable, Writable {
     // Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
     public static final String FILE_SPLIT_SIZE = "file_split_size";
 
+    // Maximum total number of splits to prevent OOM when file_split_size is too small.
+    // The system will automatically adjust file_split_size if the estimated total split count exceeds this limit.
+    public static final String MAX_FILE_SPLITS_NUM = "max_file_splits_num";
+
     // Target file size in bytes for Iceberg write operations
     public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes";
 
@@ -2162,6 +2166,21 @@ public boolean isEnableHboNonStrictMatchingMode() {
     @VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
     public long fileSplitSize = 0;
 
+    // Maximum total number of splits across all files. If the estimated total split count exceeds this value,
+    // the file_split_size will be automatically increased to limit the total split count.
+    // Default value 1000000 means at most 1000000 splits will be generated for the entire query.
+    // Set to 0 to disable this limit.
+    @VariableMgr.VarAttr(
+            name = MAX_FILE_SPLITS_NUM,
+            fuzzy = true,
+            description = {"所有文件的最大 split 总数,用于防止 file_split_size 设置过小时产生过多 split 导致 OOM。"
+                    + "系统会在 split 前估算总数,如果超过限制会自动增大 file_split_size。默认值 1000000。设置为 0 表示不限制。",
+                    "Maximum total number of splits across all files to prevent OOM when file_split_size is too small. "
+                            + "The system will estimate the total split count before splitting and automatically increase "
+                            + "file_split_size if needed. Default value is 1000000. Set to 0 to disable this limit."},
+            needForward = true)
+    public int maxFileSplitsNum = 1000000;
+
     // Target file size for Iceberg write operations
     // Default 0 means use config::iceberg_sink_max_file_size
     @VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true)
@@ -4200,6 +4219,14 @@ public void setFileSplitSize(long fileSplitSize) {
         this.fileSplitSize = fileSplitSize;
     }
 
+    public int getMaxFileSplitsNum() {
+        return maxFileSplitsNum;
+    }
+
+    public void setMaxFileSplitsNum(int maxFileSplitsNum) {
+        this.maxFileSplitsNum = maxFileSplitsNum;
+    }
+
     public long getIcebergWriteTargetFileSizeBytes() {
         return icebergWriteTargetFileSizeBytes;
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
new file mode 100644
index 00000000000000..3edd142658cdc4
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.datasource.hive.HiveMetaStoreCache;
+import org.apache.doris.fs.DirectoryLister;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class HiveScanNodeTest {
+    @Mock
+    private SessionVariable sv;
+
+    @Mock
+    private HMSExternalTable hmsTable;
+
+    @Mock
+    private DirectoryLister directoryLister;
+
+    @Mock
+    private HiveMetaStoreCache cache;
+
+    @Test
+    public void testMaxFileSplitsNum() throws UserException, IOException {
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
+        // Use reflection to set the table field
+        try {
+            java.lang.reflect.Field tableField = TupleDescriptor.class.getDeclaredField("table");
+            tableField.setAccessible(true);
+            tableField.set(desc, hmsTable);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+        // Mock required for constructor
+        org.apache.doris.datasource.hive.HMSExternalCatalog mockCatalog = Mockito.mock(org.apache.doris.datasource.hive.HMSExternalCatalog.class);
+        Mockito.when(hmsTable.getCatalog()).thenReturn(mockCatalog);
+        Mockito.when(mockCatalog.bindBrokerName()).thenReturn(null);
+
+        HiveScanNode hiveScanNode = new HiveScanNode(new PlanNodeId(1), desc, false, sv, directoryLister);
+
+        // Create large files that would generate many splits with small file_split_size
+        // Total size: 1GB = 1024 * 1024 * 1024 bytes
+        long totalFileSize = 1024L * 1024 * 1024;
+        long fileSize1 = 400L * 1024 * 1024; // 400MB
+        long fileSize2 = 300L * 1024 * 1024; // 300MB
+        long fileSize3 = 324L * 1024 * 1024; // 324MB
+
+        HiveMetaStoreCache.HiveFileStatus fileStatus1 = new HiveMetaStoreCache.HiveFileStatus();
+        fileStatus1.setPath(LocationPath.of("file:///test/f1.parquet"));
+        fileStatus1.setLength(fileSize1);
+        fileStatus1.setBlockSize(128L * 1024 * 1024); // 128MB block size
+        fileStatus1.setModificationTime(System.currentTimeMillis());
+        fileStatus1.setSplittable(true);
+        fileStatus1.setBlockLocations(null);
+
+        HiveMetaStoreCache.HiveFileStatus fileStatus2 = new HiveMetaStoreCache.HiveFileStatus();
+        fileStatus2.setPath(LocationPath.of("file:///test/f2.parquet"));
+        fileStatus2.setLength(fileSize2);
+        fileStatus2.setBlockSize(128L * 1024 * 1024);
+        fileStatus2.setModificationTime(System.currentTimeMillis());
+        fileStatus2.setSplittable(true);
+        fileStatus2.setBlockLocations(null);
+
+        HiveMetaStoreCache.HiveFileStatus fileStatus3 = new HiveMetaStoreCache.HiveFileStatus();
+        fileStatus3.setPath(LocationPath.of("file:///test/f3.parquet"));
+        fileStatus3.setLength(fileSize3);
+        fileStatus3.setBlockSize(128L * 1024 * 1024);
+        fileStatus3.setModificationTime(System.currentTimeMillis());
+        fileStatus3.setSplittable(true);
+        fileStatus3.setBlockLocations(null);
+
+        HiveMetaStoreCache.FileCacheValue fileCacheValue = new HiveMetaStoreCache.FileCacheValue();
+        fileCacheValue.getFiles().add(fileStatus1);
+        fileCacheValue.getFiles().add(fileStatus2);
+        fileCacheValue.getFiles().add(fileStatus3);
+        fileCacheValue.setSplittable(true);
+        fileCacheValue.setPartitionValues(Collections.emptyList());
+
+        List<HiveMetaStoreCache.FileCacheValue> fileCaches = new ArrayList<>();
+        fileCaches.add(fileCacheValue);
+
+        // Base split size for testing
+        long baseSplitSize = 10L * 1024 * 1024; // 10MB, small split size
+
+        // Use reflection to access protected method adjustSplitSizeForTotalLimit
+        HiveScanNode spyHiveScanNode = Mockito.spy(hiveScanNode);
+        java.lang.reflect.Method adjustMethod;
+        try {
+            adjustMethod = org.apache.doris.datasource.FileQueryScanNode.class
+                    .getDeclaredMethod("adjustSplitSizeForTotalLimit", List.class, long.class);
+            adjustMethod.setAccessible(true);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(e);
+        }
+
+        // Test case 1: max_file_splits_num = 50 (should limit split count)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(50);
+        // Manually call the adjustment logic to test
+        List<Long> fileSizes = new ArrayList<>();
+        fileSizes.add(fileSize1);
+        fileSizes.add(fileSize2);
+        fileSizes.add(fileSize3);
+        long minExpectedSplitSize1 = (totalFileSize + 50 - 1) / 50;
+        long adjustedSplitSize1;
+        try {
+            adjustedSplitSize1 = (Long) adjustMethod.invoke(spyHiveScanNode, fileSizes, baseSplitSize);
+        } catch (IllegalAccessException | java.lang.reflect.InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+        Assert.assertTrue("Split size should be adjusted to limit split count. Expected at least: "
+                + minExpectedSplitSize1 + ", got: " + adjustedSplitSize1,
+                adjustedSplitSize1 >= minExpectedSplitSize1);
+
+        // Test case 2: max_file_splits_num = 20 (should further limit split count)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(20);
+        long minExpectedSplitSize2 = (totalFileSize + 20 - 1) / 20;
+        long adjustedSplitSize2;
+        try {
+            adjustedSplitSize2 = (Long) adjustMethod.invoke(spyHiveScanNode, fileSizes, baseSplitSize);
+        } catch (IllegalAccessException | java.lang.reflect.InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+        Assert.assertTrue("Split size should be adjusted to limit split count. Expected at least: "
+                + minExpectedSplitSize2 + ", got: " + adjustedSplitSize2,
+                adjustedSplitSize2 >= minExpectedSplitSize2);
+
+        // Test case 3: max_file_splits_num = 0 (no limit)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(0);
+        long adjustedSplitSize3;
+        try {
+            adjustedSplitSize3 = (Long) adjustMethod.invoke(spyHiveScanNode, fileSizes, baseSplitSize);
+        } catch (IllegalAccessException | java.lang.reflect.InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+        Assert.assertEquals("Without limit, should use base split size",
+                baseSplitSize, adjustedSplitSize3);
+
+        // Test case 4: max_file_splits_num = 200 (large limit, should not adjust)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(200);
+        long adjustedSplitSize4;
+        try {
+            adjustedSplitSize4 = (Long) adjustMethod.invoke(spyHiveScanNode, fileSizes, baseSplitSize);
+        } catch (IllegalAccessException | java.lang.reflect.InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+        // With large limit, should use original split size
+        Assert.assertEquals("With large limit, should use base split size",
+                baseSplitSize, adjustedSplitSize4);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 93afa390530e6e..98ad5c16b9da94 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -364,6 +364,119 @@ public void testValidateIncrementalReadParams() throws UserException {
         }
     }
 
+    @Test
+    public void testMaxFileSplitsNum() throws UserException {
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
+        PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), desc, false, sv);
+        paimonScanNode.setSource(new PaimonSource());
+
+        // Create large files that would generate many splits with small file_split_size
+        // Total size: 1GB = 1024 * 1024 * 1024 bytes (smaller for faster test)
+        long totalFileSize = 1024L * 1024 * 1024;
+        long fileSize1 = 400L * 1024 * 1024; // 400MB
+        long fileSize2 = 300L * 1024 * 1024; // 300MB
+        long fileSize3 = 324L * 1024 * 1024; // 324MB
+
+        DataFileMeta dfm1 = DataFileMeta.forAppend("f1.parquet", fileSize1, 1, SimpleStats.EMPTY_STATS, 1, 1, 1,
+                Collections.emptyList(), null, null, null, null);
+        DataFileMeta dfm2 = DataFileMeta.forAppend("f2.parquet", fileSize2, 2, SimpleStats.EMPTY_STATS, 1, 1, 1,
+                Collections.emptyList(), null, null, null, null);
+        DataFileMeta dfm3 = DataFileMeta.forAppend("f3.parquet", fileSize3, 3, SimpleStats.EMPTY_STATS, 1, 1, 1,
+                Collections.emptyList(), null, null, null, null);
+
+        BinaryRow binaryRow = BinaryRow.singleColumn(1);
+        DataSplit ds1 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow)
+                .withBucket(1)
+                .withBucketPath("file://b1")
+                .withDataFiles(Collections.singletonList(dfm1))
+                .build();
+
+        DataSplit ds2 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow)
+                .withBucket(1)
+                .withBucketPath("file://b1")
+                .withDataFiles(Collections.singletonList(dfm2))
+                .build();
+
+        DataSplit ds3 = DataSplit.builder()
+                .rawConvertible(true)
+                .withPartition(binaryRow)
+                .withBucket(1)
+                .withBucketPath("file://b1")
+                .withDataFiles(Collections.singletonList(dfm3))
+                .build();
+
+        PaimonScanNode spyPaimonScanNode = Mockito.spy(paimonScanNode);
+        Mockito.doReturn(new ArrayList<DataSplit>() {
+            {
+                add(ds1);
+                add(ds2);
+                add(ds3);
+            }
+        }).when(spyPaimonScanNode).getPaimonSplitFromAPI();
+
+        // Mock SessionVariable behavior
+        Mockito.when(sv.isForceJniScanner()).thenReturn(false);
+        Mockito.when(sv.getIgnoreSplitType()).thenReturn("NONE");
+        long baseSplitSize = 10L * 1024 * 1024; // 10MB, small split size
+        Mockito.when(sv.getFileSplitSize()).thenReturn(baseSplitSize);
+        mockNativeReader(spyPaimonScanNode);
+
+        // Test case 1: max_file_splits_num = 50 (should limit split count)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(50);
+        List<org.apache.doris.spi.Split> splits1 = spyPaimonScanNode.getSplits(1);
+        // With 1GB total and 10MB split size, would generate ~102 splits without limit
+        // With limit of 50, should generate at most 50 splits (allow small tolerance due to split algorithm)
+        Assert.assertTrue("Split count should be limited to around 50, actual: " + splits1.size(),
+                splits1.size() <= 52); // Allow small tolerance for split algorithm boundary cases
+        // Verify split size was adjusted: minSplitSize = ceil(1GB / 50) = ~20MB
+        long minExpectedSplitSize1 = (totalFileSize + 50 - 1) / 50;
+        for (org.apache.doris.spi.Split split : splits1) {
+            PaimonSplit paimonSplit = (PaimonSplit) split;
+            Assert.assertNotNull("Split should have target split size", paimonSplit.getTargetSplitSize());
+            // Adjusted split size should be at least ceil(totalFileSize / maxFileSplitsNum)
+            Assert.assertTrue("Split size should be adjusted to limit split count. Expected at least: "
+                    + minExpectedSplitSize1 + ", got: " + paimonSplit.getTargetSplitSize(),
+                    paimonSplit.getTargetSplitSize() >= minExpectedSplitSize1);
+        }
+
+        // Test case 2: max_file_splits_num = 20 (should further limit split count)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(20);
+        List<org.apache.doris.spi.Split> splits2 = spyPaimonScanNode.getSplits(1);
+        Assert.assertTrue("Split count should be limited to around 20, actual: " + splits2.size(),
+                splits2.size() <= 22); // Allow small tolerance for split algorithm boundary cases
+        // Adjusted split size should be at least ceil(1GB / 20) = ~50MB
+        long minExpectedSplitSize2 = (totalFileSize + 20 - 1) / 20;
+        for (org.apache.doris.spi.Split split : splits2) {
+            PaimonSplit paimonSplit = (PaimonSplit) split;
+            Assert.assertTrue("Split size should be adjusted to limit split count. Expected at least: "
+                    + minExpectedSplitSize2 + ", got: " + paimonSplit.getTargetSplitSize(),
+                    paimonSplit.getTargetSplitSize() >= minExpectedSplitSize2);
+        }
+
+        // Test case 3: max_file_splits_num = 0 (no limit)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(0);
+        List<org.apache.doris.spi.Split> splits3 = spyPaimonScanNode.getSplits(1);
+        // Without limit, should generate splits based on file_split_size (10MB)
+        // Expected: approximately ceil(1GB / 10MB) = 102 splits
+        long expectedSplitsWithoutLimit = (totalFileSize + baseSplitSize - 1) / baseSplitSize;
+        Assert.assertTrue("Without limit, should generate more splits. Expected around: "
+                + expectedSplitsWithoutLimit + ", got: " + splits3.size(),
+                splits3.size() >= expectedSplitsWithoutLimit - 5); // Allow some tolerance
+
+        // Test case 4: max_file_splits_num = 200 (large limit, should not adjust)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(200);
+        List<org.apache.doris.spi.Split> splits4 = spyPaimonScanNode.getSplits(1);
+        // With large limit, should use original split size (10MB)
+        // Should generate approximately the same number of splits as case 3
+        Assert.assertTrue("With large limit, should generate similar number of splits. Expected around: "
+                + expectedSplitsWithoutLimit + ", got: " + splits4.size(),
+                splits4.size() >= expectedSplitsWithoutLimit - 5);
+    }
+
     private void mockJniReader(PaimonScanNode spyNode) {
         Mockito.doReturn(false).when(spyNode).supportNativeReader(ArgumentMatchers.any(Optional.class));
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
new file mode 100644
index 00000000000000..a84ece702b684e
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.tvf.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.FunctionGenTable;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.FileSplit;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.spi.Split;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TFileType;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TVFScanNodeTest {
+    @Mock
+    private SessionVariable sv;
+
+    @Mock
+    private FunctionGenTable functionGenTable;
+
+    @Mock
+    private ExternalFileTableValuedFunction tableValuedFunction;
+
+    @Test
+    public void testMaxFileSplitsNum() throws UserException {
+        TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
+        Mockito.when(functionGenTable.getTvf()).thenReturn(tableValuedFunction);
+        desc.setTable(functionGenTable);
+        Mockito.when(tableValuedFunction.getTFileType()).thenReturn(TFileType.FILE_LOCAL);
+
+        TVFScanNode tvfScanNode = new TVFScanNode(new PlanNodeId(1), desc, false, sv);
+
+        // Create large files that would generate many splits with small file_split_size
+        // Total size: 1GB = 1024 * 1024 * 1024 bytes
+        long totalFileSize = 1024L * 1024 * 1024;
+        long fileSize1 = 400L * 1024 * 1024; // 400MB
+        long fileSize2 = 300L * 1024 * 1024; // 300MB
+        long fileSize3 = 324L * 1024 * 1024; // 324MB
+
+        TBrokerFileStatus fileStatus1 = new TBrokerFileStatus();
+        fileStatus1.setPath("file:///test/f1.parquet");
+        fileStatus1.setSize(fileSize1);
+        fileStatus1.setBlockSize(128L * 1024 * 1024); // 128MB block size
+        fileStatus1.setModificationTime(System.currentTimeMillis());
+        fileStatus1.setIsSplitable(true);
+
+        TBrokerFileStatus fileStatus2 = new TBrokerFileStatus();
+        fileStatus2.setPath("file:///test/f2.parquet");
+        fileStatus2.setSize(fileSize2);
+        fileStatus2.setBlockSize(128L * 1024 * 1024);
+        fileStatus2.setModificationTime(System.currentTimeMillis());
+        fileStatus2.setIsSplitable(true);
+
+        TBrokerFileStatus fileStatus3 = new TBrokerFileStatus();
+        fileStatus3.setPath("file:///test/f3.parquet");
+        fileStatus3.setSize(fileSize3);
+        fileStatus3.setBlockSize(128L * 1024 * 1024);
+        fileStatus3.setModificationTime(System.currentTimeMillis());
+        fileStatus3.setIsSplitable(true);
+
+        List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
+        fileStatuses.add(fileStatus1);
+        fileStatuses.add(fileStatus2);
+        fileStatuses.add(fileStatus3);
+
+        Mockito.when(tableValuedFunction.getFileStatuses()).thenReturn(fileStatuses);
+
+        // Base split size for testing
+        long baseSplitSize = 10L * 1024 * 1024; // 10MB, small split size
+        Mockito.when(sv.getFileSplitSize()).thenReturn(baseSplitSize);
+
+        // Test case 1: max_file_splits_num = 50 (should limit split count)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(50);
+        List<Split> splits1 = tvfScanNode.getSplits(1);
+        // With 1GB total and 10MB split size, would generate ~102 splits without limit
+        // With limit of 50, should generate at most 50 splits (allow small tolerance due to split algorithm)
+        Assert.assertTrue("Split count should be limited to around 50, actual: " + splits1.size(),
+                splits1.size() <= 52); // Allow small tolerance for split algorithm boundary cases
+        // Verify split size was adjusted: minSplitSize = ceil(1GB / 50) = ~20MB
+        long minExpectedSplitSize1 = (totalFileSize + 50 - 1) / 50;
+        for (Split split : splits1) {
+            FileSplit fileSplit = (FileSplit) split;
+            Assert.assertNotNull("Split should have target split size", fileSplit.getTargetSplitSize());
+            // Adjusted split size should be at least ceil(totalFileSize / maxFileSplitsNum)
+            Assert.assertTrue("Split size should be adjusted to limit split count. Expected at least: "
+                    + minExpectedSplitSize1 + ", got: " + fileSplit.getTargetSplitSize(),
+                    fileSplit.getTargetSplitSize() >= minExpectedSplitSize1);
+        }
+
+        // Test case 2: max_file_splits_num = 20 (should further limit split count)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(20);
+        List<Split> splits2 = tvfScanNode.getSplits(1);
+        Assert.assertTrue("Split count should be limited to around 20, actual: " + splits2.size(),
+                splits2.size() <= 22); // Allow small tolerance for split algorithm boundary cases
+        // Adjusted split size should be at least ceil(1GB / 20) = ~50MB
+        long minExpectedSplitSize2 = (totalFileSize + 20 - 1) / 20;
+        for (Split split : splits2) {
+            FileSplit fileSplit = (FileSplit) split;
+            Assert.assertTrue("Split size should be adjusted to limit split count. Expected at least: "
+                    + minExpectedSplitSize2 + ", got: " + fileSplit.getTargetSplitSize(),
+                    fileSplit.getTargetSplitSize() >= minExpectedSplitSize2);
+        }
+
+        // Test case 3: max_file_splits_num = 0 (no limit)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(0);
+        List<Split> splits3 = tvfScanNode.getSplits(1);
+        // Without limit, should generate splits based on file_split_size (10MB)
+        // Expected: approximately ceil(1GB / 10MB) = 102 splits
+        long expectedSplitsWithoutLimit = (totalFileSize + baseSplitSize - 1) / baseSplitSize;
+        Assert.assertTrue("Without limit, should generate more splits. Expected around: "
+                + expectedSplitsWithoutLimit + ", got: " + splits3.size(),
+                splits3.size() >= expectedSplitsWithoutLimit - 5); // Allow some tolerance
+
+        // Test case 4: max_file_splits_num = 200 (large limit, should not adjust)
+        Mockito.when(sv.getMaxFileSplitsNum()).thenReturn(200);
+        List<Split> splits4 = tvfScanNode.getSplits(1);
+        // With large limit, should use original split size (10MB)
+        // Should generate approximately the same number of splits as case 3
+        Assert.assertTrue("With large limit, should generate similar number of splits. Expected around: "
+                + expectedSplitsWithoutLimit + ", got: " + splits4.size(),
+                splits4.size() >= expectedSplitsWithoutLimit - 5);
+    }
+}
diff --git a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
index 4117501eff2c3c..44dd9104411196 100644
--- a/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
+++ b/regression-test/suites/external_table_p0/hive/test_hive_compress_type.groovy
@@ -48,7 +48,7 @@ suite("test_hive_compress_type", "p0,external,hive,external_docker,external_dock
         sql """set file_split_size=8388608"""
         explain {
             sql("select count(*) from test_compress_partitioned")
-            contains "inputSplitNum=82, totalFileSize=734675596, scanRanges=82"
+            contains "inputSplitNum=16, totalFileSize=734675596, scanRanges=16"
             contains "partition=8/8"
         }
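
Illustrative usage (not part of the patch): the variable names, the 1000000 default, and the "0 disables the cap" behavior come from the SessionVariable changes above; the catalog and table names are hypothetical, and the numbers mirror the unit tests (about 1GB of files, a cap of 50, ceil(1GB / 50) ≈ 20MB).

    set file_split_size = 10485760;   -- request 10MB splits
    set max_file_splits_num = 50;     -- cap the estimated total split count at 50
    -- ~1GB of files would produce ~102 splits at 10MB, which exceeds the cap,
    -- so the effective split size is raised to ceil(totalFileSize / 50) ≈ 20MB.
    select count(*) from hive_catalog.db.large_table;
    set max_file_splits_num = 0;      -- 0 disables the cap (default is 1000000)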