@@ -633,4 +633,58 @@ protected long getRealFileSplitSize(long blockSize) {
}
return realSplitSize;
}

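For context, the new helper below builds on getRealFileSplitSize, whose closing lines appear above. A presumed reconstruction of that helper, inferred from the FILE_SPLIT_SIZE comment in SessionVariable ("default 0 means use the block size of HDFS/S3"); the actual body may differ:

    // Presumed shape of the existing helper (reconstruction, not part of this diff):
    // return the user-configured file_split_size when set, otherwise fall back
    // to the storage block size supplied by the caller.
    protected long getRealFileSplitSize(long blockSize) {
        long realSplitSize = sessionVariable.getFileSplitSize();
        if (realSplitSize <= 0) {
            realSplitSize = blockSize;
        }
        return realSplitSize;
    }
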
/**
* Estimate the total number of splits based on file sizes and split size,
* and adjust the split size if the estimated total exceeds the limit.
*
* @param fileSizes list of file sizes in bytes
* @param baseSplitSize the base split size to use (from getRealFileSplitSize)
     * @return the adjusted split size that keeps the estimated total split count within
     *         maxFileSplitsNum (best effort: each non-empty file still yields at least one split)
*/
protected long adjustSplitSizeForTotalLimit(List<Long> fileSizes, long baseSplitSize) {
int maxFileSplitsNum = sessionVariable.getMaxFileSplitsNum();
if (maxFileSplitsNum <= 0 || fileSizes.isEmpty()) {
return baseSplitSize;
}

// Estimate total split count with current split size
long estimatedTotalSplits = 0;
for (long fileSize : fileSizes) {
if (fileSize > 0) {
// Estimate splits for this file: ceil(fileSize / splitSize)
long splitsForFile = (fileSize + baseSplitSize - 1) / baseSplitSize;
estimatedTotalSplits += splitsForFile;
}
}

// If estimated total is within limit, use the base split size
if (estimatedTotalSplits <= maxFileSplitsNum) {
return baseSplitSize;
}

// Calculate total file size
long totalFileSize = 0;
for (long fileSize : fileSizes) {
            // Reviewer note (Contributor): fileSizes is iterated twice; a single
            // pass could accumulate both totals (see the one-pass sketch after this method).
totalFileSize += fileSize;
}

if (totalFileSize <= 0) {
return baseSplitSize;
}

// Calculate the minimum split size needed to stay within the limit
// minSplitSize = ceil(totalFileSize / maxFileSplitsNum)
long minSplitSize = (totalFileSize + maxFileSplitsNum - 1) / maxFileSplitsNum;

// Use the larger of the base split size and the minimum required split size
long adjustedSplitSize = Math.max(baseSplitSize, minSplitSize);

if (LOG.isDebugEnabled()) {
LOG.debug("Estimated total splits: {}, max allowed: {}, adjusted split size from {} to {}",
estimatedTotalSplits, maxFileSplitsNum, baseSplitSize, adjustedSplitSize);
}

return adjustedSplitSize;
}
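
Per the reviewer's note inside the method, the two passes over fileSizes can be fused. A minimal one-pass sketch under the same contract (illustrative only, not the committed code):

    // One-pass variant: accumulate the split estimate and the total size together.
    protected long adjustSplitSizeForTotalLimitOnePass(List<Long> fileSizes, long baseSplitSize) {
        int maxFileSplitsNum = sessionVariable.getMaxFileSplitsNum();
        if (maxFileSplitsNum <= 0 || fileSizes.isEmpty()) {
            return baseSplitSize;
        }
        long estimatedTotalSplits = 0;
        long totalFileSize = 0;
        for (long fileSize : fileSizes) {
            if (fileSize > 0) {
                // ceil(fileSize / baseSplitSize)
                estimatedTotalSplits += (fileSize + baseSplitSize - 1) / baseSplitSize;
                totalFileSize += fileSize;
            }
        }
        if (estimatedTotalSplits <= maxFileSplitsNum || totalFileSize <= 0) {
            return baseSplitSize;
        }
        // ceil(totalFileSize / maxFileSplitsNum)
        long minSplitSize = (totalFileSize + maxFileSplitsNum - 1) / maxFileSplitsNum;
        return Math.max(baseSplitSize, minSplitSize);
    }

Worked numbers: with maxFileSplitsNum = 1,000,000, scanning 1,000 files of 100 GiB each at a 1 MiB base split size estimates roughly 102 million splits; the adjustment raises the split size to ceil(totalFileSize / 1,000,000), about 102.4 MiB, which brings the estimate back to 1,000,000. The cap is best effort: every non-empty file yields at least one split, so scanning more than maxFileSplitsNum files still exceeds it.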
}
@@ -319,13 +319,49 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti
int parallelNum = sessionVariable.getParallelExecInstanceNum();
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum);
}

if (!needSplit) {
// No need to split, process files directly
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
allFiles.addAll(FileSplitter.splitFile(status.getPath(),
Long.MAX_VALUE,
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
isSplittable, fileCacheValue.getPartitionValues(),
new HiveSplitCreator(fileCacheValue.getAcidInfo())));
}
}
}
return;
}

// Collect file sizes for split size adjustment
List<Long> fileSizes = Lists.newArrayList();
long representativeBlockSize = 0;
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
if (fileCacheValue.getFiles() != null) {
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
fileSizes.add(status.getLength());
if (representativeBlockSize == 0 && status.getBlockSize() > 0) {
representativeBlockSize = status.getBlockSize();
}
}
}
}

// Calculate base split size and adjust if needed to limit total split count
long baseSplitSize = getRealFileSplitSize(representativeBlockSize);
long adjustedSplitSize = adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);

// Split files using the adjusted split size
for (HiveMetaStoreCache.FileCacheValue fileCacheValue : fileCaches) {
if (fileCacheValue.getFiles() != null) {
boolean isSplittable = fileCacheValue.isSplittable();
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
allFiles.addAll(FileSplitter.splitFile(status.getPath(),
-                            // set block size to Long.MAX_VALUE to avoid splitting the file.
-                            getRealFileSplitSize(needSplit ? status.getBlockSize() : Long.MAX_VALUE),
+                            adjustedSplitSize,
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
isSplittable, fileCacheValue.getPartitionValues(),
new HiveSplitCreator(fileCacheValue.getAcidInfo())));
@@ -336,8 +372,24 @@ private void getFileSplitByPartitions(HiveMetaStoreCache cache, List<HivePartiti

private void splitAllFiles(List<Split> allFiles,
List<HiveMetaStoreCache.HiveFileStatus> hiveFileStatuses) throws IOException {
// Collect file sizes for split size adjustment
List<Long> fileSizes = Lists.newArrayList();
long representativeBlockSize = 0;
for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
fileSizes.add(status.getLength());
if (representativeBlockSize == 0 && status.getBlockSize() > 0) {
representativeBlockSize = status.getBlockSize();
}
}

// Calculate base split size and adjust if needed to limit total split count
long baseSplitSize = getRealFileSplitSize(representativeBlockSize);
long adjustedSplitSize = adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);

// Split files using the adjusted split size
for (HiveMetaStoreCache.HiveFileStatus status : hiveFileStatuses) {
-            allFiles.addAll(FileSplitter.splitFile(status.getPath(), getRealFileSplitSize(status.getBlockSize()),
+            allFiles.addAll(FileSplitter.splitFile(status.getPath(),
+                    adjustedSplitSize,
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
status.isSplittable(), status.getPartitionValues(),
new HiveSplitCreator(status.getAcidInfo())));
@@ -47,6 +47,7 @@
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.data.BinaryRow;
@@ -293,8 +294,26 @@ public List<Split> getSplits(int numBackends) throws UserException {
// partition data.
// And for counting the number of selected partitions for this paimon table.
Map<BinaryRow, Map<String, String>> partitionInfoMaps = new HashMap<>();
-        // if applyCountPushdown is true, we can't split the DataSplit
-        long realFileSplitSize = getRealFileSplitSize(applyCountPushdown ? Long.MAX_VALUE : 0);

// Collect file sizes for split size adjustment (only for native reader files that need splitting)
List<Long> fileSizes = Lists.newArrayList();
if (!applyCountPushdown) {
for (DataSplit dataSplit : dataSplits) {
Optional<List<RawFile>> optRawFiles = dataSplit.convertToRawFiles();
if (!forceJniScanner && supportNativeReader(optRawFiles)) {
List<RawFile> rawFiles = optRawFiles.get();
for (RawFile file : rawFiles) {
fileSizes.add(file.length());
}
}
}
}

// Calculate base split size and adjust if needed to limit total split count
long baseSplitSize = applyCountPushdown ? Long.MAX_VALUE : getRealFileSplitSize(0);
long adjustedSplitSize = applyCountPushdown ? Long.MAX_VALUE
: adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);

for (DataSplit dataSplit : dataSplits) {
SplitStat splitStat = new SplitStat();
splitStat.setRowCount(dataSplit.rowCount());
@@ -335,7 +354,7 @@ public List<Split> getSplits(int numBackends) throws UserException {
try {
List<Split> dorisSplits = FileSplitter.splitFile(
locationPath,
-                            realFileSplitSize,
+                            adjustedSplitSize,
null,
file.length(),
-1,
@@ -384,7 +403,7 @@ public List<Split> getSplits(int numBackends) throws UserException {

// We need to set the target size for all splits so that we can calculate the
// proportion of each split later.
-        splits.forEach(s -> s.setTargetSplitSize(realFileSplitSize));
+        splits.forEach(s -> s.setTargetSplitSize(adjustedSplitSize));

this.selectedPartitionNum = partitionInfoMaps.size();
return splits;
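The target-size comment above is the only hint at how these splits are weighted downstream. A hypothetical illustration of that proportion (the parameters and helper name are assumptions, not confirmed by this diff):

    // Hypothetical: with a uniform target split size, a split's share of the scan
    // is its actual length relative to the shared target, which is why every
    // split is stamped with the same targetSplitSize.
    static double splitProportion(long splitLength, long targetSplitSize) {
        return (double) splitLength / (double) targetSplitSize;
    }
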
@@ -146,10 +146,44 @@ public List<Split> getSplits(int numBackends) throws UserException {
needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, numBackends, totalFileNum);
}

if (!needSplit) {
// No need to split, process files directly
for (TBrokerFileStatus fileStatus : fileStatuses) {
try {
splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()),
Long.MAX_VALUE,
null, fileStatus.getSize(),
fileStatus.getModificationTime(), fileStatus.isSplitable, null,
FileSplitCreator.DEFAULT));
} catch (IOException e) {
LOG.warn("get file split failed for TVF: {}", fileStatus.getPath(), e);
throw new UserException(e);
}
}
return splits;
}

// Collect file sizes for split size adjustment
List<Long> fileSizes = Lists.newArrayList();
long representativeBlockSize = 0;
for (TBrokerFileStatus fileStatus : fileStatuses) {
fileSizes.add(fileStatus.getSize());
if (representativeBlockSize == 0 && fileStatus.getBlockSize() > 0) {
representativeBlockSize = fileStatus.getBlockSize();
}
}

// Calculate base split size and adjust if needed to limit total split count
long baseSplitSize = getRealFileSplitSize(representativeBlockSize);
long adjustedSplitSize = adjustSplitSizeForTotalLimit(fileSizes, baseSplitSize);

// Split files using the adjusted split size
for (TBrokerFileStatus fileStatus : fileStatuses) {
try {
// Use the adjusted split size, but still respect individual file's block size
long finalSplitSize = Math.max(adjustedSplitSize, getRealFileSplitSize(fileStatus.getBlockSize()));
splits.addAll(FileSplitter.splitFile(LocationPath.of(fileStatus.getPath()),
-                        getRealFileSplitSize(needSplit ? fileStatus.getBlockSize() : Long.MAX_VALUE),
+                        finalSplitSize,
null, fileStatus.getSize(),
fileStatus.getModificationTime(), fileStatus.isSplitable, null,
FileSplitCreator.DEFAULT));
27 changes: 27 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -511,6 +511,10 @@ public class SessionVariable implements Serializable, Writable {
// Split size for ExternalFileScanNode. Default value 0 means use the block size of HDFS/S3.
public static final String FILE_SPLIT_SIZE = "file_split_size";

// Maximum total number of splits to prevent OOM when file_split_size is too small.
// The system will automatically adjust file_split_size if the estimated total split count exceeds this limit.
public static final String MAX_FILE_SPLITS_NUM = "max_file_splits_num";

// Target file size in bytes for Iceberg write operations
public static final String ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES = "iceberg_write_target_file_size_bytes";

@@ -2162,6 +2166,21 @@ public boolean isEnableHboNonStrictMatchingMode() {
@VariableMgr.VarAttr(name = FILE_SPLIT_SIZE, needForward = true)
public long fileSplitSize = 0;

// Maximum total number of splits across all files. If the estimated total split count exceeds this value,
// the file_split_size will be automatically increased to limit the total split count.
    // Default value 1000000 caps the estimated split count for the entire query at 1000000.
// Set to 0 to disable this limit.
@VariableMgr.VarAttr(
name = MAX_FILE_SPLITS_NUM,
fuzzy = true,
description = {"所有文件的最大 split 总数,用于防止 file_split_size 设置过小时产生过多 split 导致 OOM。"
+ "系统会在 split 前估算总数,如果超过限制会自动增大 file_split_size。默认值 1000000。设置为 0 表示不限制。",
"Maximum total number of splits across all files to prevent OOM when file_split_size is too small. "
+ "The system will estimate the total split count before splitting and automatically increase "
+ "file_split_size if needed. Default value is 1000000. Set to 0 to disable this limit."},
needForward = true)
public int maxFileSplitsNum = 1000000;

// Target file size for Iceberg write operations
// Default 0 means use config::iceberg_sink_max_file_size
@VariableMgr.VarAttr(name = ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES, needForward = true)
@@ -4200,6 +4219,14 @@ public void setFileSplitSize(long fileSplitSize) {
this.fileSplitSize = fileSplitSize;
}

public int getMaxFileSplitsNum() {
return maxFileSplitsNum;
}

public void setMaxFileSplitsNum(int maxFileSplitsNum) {
this.maxFileSplitsNum = maxFileSplitsNum;
}

public long getIcebergWriteTargetFileSizeBytes() {
return icebergWriteTargetFileSizeBytes;
}
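
A short usage sketch for the new knob; constructing SessionVariable directly and the wrapper method are simplified for illustration, and the surrounding plumbing is assumed:

    // Illustrative only (not part of this diff): how a caller might exercise the knob.
    void exampleUsage() {
        SessionVariable sv = new SessionVariable();
        sv.setFileSplitSize(1024 * 1024);   // request 1 MiB splits
        sv.setMaxFileSplitsNum(1_000_000);  // default cap on the estimated split count
        sv.setMaxFileSplitsNum(0);          // 0 disables the cap entirely
        // When the cap is active, scan nodes call adjustSplitSizeForTotalLimit before
        // splitting, which may raise the effective split size above fileSplitSize so
        // that the estimated total stays within maxFileSplitsNum.
    }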