Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.system.Backend;

import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -69,6 +71,11 @@ public RewriteResult executeGroupsConcurrently(List<RewriteDataGroup> groups, lo
List<RewriteGroupTask> tasks = Lists.newArrayList();
RewriteResultCollector resultCollector = new RewriteResultCollector(groups.size(), tasks);

// Get available BE count once before creating tasks
// This avoids calling getBackendsNumber() in each task during multi-threaded execution.
// Use compute group from connect context to align with actual BE selection for queries.
int availableBeCount = getAvailableBeCount();

// Create tasks with callbacks
for (RewriteDataGroup group : groups) {
RewriteGroupTask task = new RewriteGroupTask(
Expand All @@ -77,6 +84,7 @@ public RewriteResult executeGroupsConcurrently(List<RewriteDataGroup> groups, lo
dorisTable,
connectContext,
targetFileSizeBytes,
availableBeCount,
new RewriteGroupTask.RewriteResultCallback() {
@Override
public void onTaskCompleted(Long taskId) {
Expand Down Expand Up @@ -153,6 +161,18 @@ private void waitForTasksCompletion(RewriteResultCollector collector, int totalT
}
}

/**
 * Counts the alive backends in the compute group bound to the current
 * connect context. Computed once up front so concurrent rewrite tasks
 * do not each have to query backend liveness.
 *
 * @return number of alive BEs in the connect context's compute group
 * @throws UserException if the compute group cannot be resolved
 */
private int getAvailableBeCount() throws UserException {
    ComputeGroup computeGroup = connectContext.getComputeGroup();
    return (int) computeGroup.getBackendList().stream()
            .filter(Backend::isAlive)
            .count();
}

/**
* Result collector for concurrent rewrite tasks
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class RewriteGroupTask implements TransientTaskExecutor {
private final Long taskId;
private final AtomicBoolean isCanceled;
private final AtomicBoolean isFinished;
private final int availableBeCount;

// for canceling the task
private StmtExecutor stmtExecutor;
Expand All @@ -73,12 +74,14 @@ public RewriteGroupTask(RewriteDataGroup group,
IcebergExternalTable dorisTable,
ConnectContext connectContext,
long targetFileSizeBytes,
int availableBeCount,
RewriteResultCallback resultCallback) {
this.group = group;
this.transactionId = transactionId;
this.dorisTable = dorisTable;
this.connectContext = connectContext;
this.targetFileSizeBytes = targetFileSizeBytes;
this.availableBeCount = availableBeCount;
this.resultCallback = resultCallback;
this.taskId = UUID.randomUUID().getMostSignificantBits();
this.isCanceled = new AtomicBoolean(false);
Expand Down Expand Up @@ -234,6 +237,11 @@ private ConnectContext buildConnectContext() {
// Clone session variables from parent
taskContext.setSessionVariable(VariableMgr.cloneSessionVariable(connectContext.getSessionVariable()));

// Calculate optimal parallelism and determine distribution strategy
RewriteStrategy strategy = calculateRewriteStrategy();
// Pipeline engine uses parallelPipelineTaskNum to control instance parallelism.
taskContext.getSessionVariable().parallelPipelineTaskNum = strategy.parallelism;

// Set env and basic identities
taskContext.setEnv(Env.getCurrentEnv());
taskContext.setDatabase(connectContext.getDatabase());
Expand All @@ -252,9 +260,74 @@ private ConnectContext buildConnectContext() {
statementContext.setConnectContext(taskContext);
taskContext.setStatementContext(statementContext);

// Set GATHER distribution flag if needed (for small data rewrite)
statementContext.setUseGatherForIcebergRewrite(strategy.useGather);

return taskContext;
}

/**
 * Decide how this rewrite should execute: the per-BE pipeline parallelism
 * and whether to force a GATHER distribution.
 *
 * The goal is to keep the number of output files close to the expected
 * count derived from the group's data size:
 * 1. expected files = ceil(totalSize / targetFileSizeBytes)
 * 2. if fewer files are expected than there are alive BEs, GATHER all
 *    data onto a single node so we never fan out more writers than files
 * 3. otherwise cap per-BE parallelism so total writers <= expected files
 *
 * @return the chosen parallelism and distribution mode for this task
 */
private RewriteStrategy calculateRewriteStrategy() {
    // Expected output file count from the group's total bytes.
    long groupBytes = group.getTotalSize();
    int expectedFileCount = (int) Math.ceil((double) groupBytes / targetFileSizeBytes);

    // The BE count was captured by the scheduler before task creation.
    Preconditions.checkState(this.availableBeCount > 0,
            "availableBeCount must be greater than 0 for rewrite task");

    // Session-configured instance parallelism acts as the upper bound.
    int sessionParallelism = connectContext.getSessionVariable().getParallelExecInstanceNum();

    boolean gather = expectedFileCount < this.availableBeCount;
    int chosenParallelism;
    if (gather) {
        // Small data volume: collect onto one node and never run more
        // instances than the number of files we expect to produce.
        chosenParallelism = Math.max(1, Math.min(sessionParallelism, expectedFileCount));
    } else {
        // Spread across BEs, but keep writers-per-BE low enough that the
        // total writer count stays within the expected file count.
        int perBeCap = Math.max(1, expectedFileCount / this.availableBeCount);
        chosenParallelism = Math.max(1, Math.min(sessionParallelism, perBeCap));
    }

    LOG.info("[Rewrite Task] taskId: {}, totalSize: {} bytes, targetFileSize: {} bytes, "
            + "expectedFileCount: {}, availableBeCount: {}, defaultParallelism: {}, "
            + "optimalParallelism: {}, useGather: {}",
            taskId, groupBytes, targetFileSizeBytes, expectedFileCount,
            this.availableBeCount, sessionParallelism, chosenParallelism, gather);

    return new RewriteStrategy(chosenParallelism, gather);
}

/**
 * Strategy for rewrite operation containing parallelism and distribution settings.
 * Immutable value holder produced by {@code calculateRewriteStrategy()} and
 * consumed when building the task's connect context.
 */
private static class RewriteStrategy {
    // Per-BE pipeline instance parallelism to apply to the rewrite query.
    final int parallelism;
    // When true, force GATHER distribution so all data is written on one node.
    final boolean useGather;

    RewriteStrategy(int parallelism, boolean useGather) {
        this.parallelism = parallelism;
        this.useGather = useGather;
    }
}

/**
* Callback interface for task completion
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ public enum TableFrom {
// For Iceberg rewrite operations: store file scan tasks to be used by IcebergScanNode
// TODO: better solution?
private List<org.apache.iceberg.FileScanTask> icebergRewriteFileScanTasks = null;
// For Iceberg rewrite operations: control whether to use GATHER distribution
// When true, data will be collected to a single node to avoid generating too many small files
private boolean useGatherForIcebergRewrite = false;
private boolean hasNestedColumns;

public StatementContext() {
Expand Down Expand Up @@ -1045,6 +1048,21 @@ public List<org.apache.iceberg.FileScanTask> getAndClearIcebergRewriteFileScanTa
return tasks;
}

/**
 * Enable or disable GATHER distribution for Iceberg rewrite operations.
 * When enabled, data is collected onto a single node so the rewrite emits
 * as few output files as possible.
 *
 * @param useGather true to force GATHER distribution for the rewrite
 */
public void setUseGatherForIcebergRewrite(boolean useGather) {
    useGatherForIcebergRewrite = useGather;
}

/**
 * Whether GATHER distribution is requested for Iceberg rewrite operations.
 *
 * @return true if the rewrite should collect data onto a single node
 */
public boolean isUseGatherForIcebergRewrite() {
    return useGatherForIcebergRewrite;
}

public boolean isSkipPrunePredicate() {
return skipPrunePredicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;

import java.util.ArrayList;
Expand Down Expand Up @@ -110,6 +111,15 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr
*/
@Override
public PhysicalProperties getRequirePhysicalProperties() {
// For Iceberg rewrite operations with small data volume,
// use GATHER distribution to collect data to a single node
// This helps minimize the number of output files
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getStatementContext() != null
&& connectContext.getStatementContext().isUseGatherForIcebergRewrite()) {
return PhysicalProperties.GATHER;
}

Set<String> partitionNames = targetTable.getPartitionNames();
if (!partitionNames.isEmpty()) {
List<Integer> columnIdx = new ArrayList<>();
Expand Down
Loading
Loading