Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ public AbstractUnassignedScanJob(StatementContext statementContext, PlanFragment
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}

/**
* Compute assigned scan jobs using a two-phase parallelization strategy:
* <ol>
* <li><b>Cross-machine parallelization</b> ({@link #multipleMachinesParallelization}):
* For each tablet / scan range, select the best replica and its hosting backend worker.
* This groups scan ranges by the worker that will process them.</li>
* <li><b>Intra-machine parallelization</b> ({@link #insideMachineParallelization}):
* Within each worker, split the assigned scan ranges into one or more instances
* based on the degree of parallelism. Supports local shuffle mode to further
* increase parallelism without rescanning data.</li>
* </ol>
* After both phases, {@link #fillUpAssignedJobs} provides a hook for subclasses to
* supply fallback instances when no workers could be selected (e.g. all tablets pruned).
*
* @param distributeContext the distribute context for worker selection and parallelism config
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return the list of assigned scan jobs, each bound to a worker with its tablet ranges
*/
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand All @@ -59,16 +77,63 @@ public List<AssignedJob> computeAssignedJobs(
return fillUpAssignedJobs(assignedJobs, distributeContext.workerManager, inputJobs);
}

/**
* Hook for subclasses to supply fallback instances when the normal parallelization
* produces an empty result. For example, when all tablets of a table have been pruned
* (e.g. TABLET(1234) with a non-existent tablet id), this method can create a single
* empty instance to keep the fragment alive and return an empty result set.
*
* @param assignedJobs the list produced by {@link #insideMachineParallelization};
* may be empty if no workers could be selected
* @param workerManager the worker manager used to select a random fallback worker
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return the (possibly augmented) list of assigned jobs; default returns unchanged
*/
protected List<AssignedJob> fillUpAssignedJobs(
List<AssignedJob> assignedJobs,
DistributedPlanWorkerManager workerManager,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
return assignedJobs;
}

/**
* Cross-machine parallelization: for each tablet / scan range of the scan nodes
* in this fragment, select the best replica and its hosting {@link DistributedPlanWorker}.
* The result groups all scan ranges by the worker that will process them.
* <p>
* This is the first phase of the two-phase parallelization. The returned map drives
* the second phase ({@link #insideMachineParallelization}) where each worker's ranges
* are further split into individual instances.
*
* @param distributeContext the distribute context for worker selection and parallelism config
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return a map from selected worker to its {@link UninstancedScanSource} containing
* the raw scan ranges assigned to that worker, not yet split into instances
*/
protected abstract Map<DistributedPlanWorker, UninstancedScanSource> multipleMachinesParallelization(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs);

/**
* Intra-machine parallelization: for each worker, split its assigned scan ranges
* into one or more {@link AssignedJob} instances. This is the second phase of
* the two-phase parallelization, following {@link #multipleMachinesParallelization}.
* <p>
* For each worker entry, the method:
* <ol>
* <li>Computes the max parallelism from the scan source (e.g. tablet count).</li>
* <li>Determines the final instance count via {@link #degreeOfParallelism},
* capped by the fragment's {@code parallelExecNum} and tablet count.</li>
* <li>Splits scan ranges evenly across instances (default mode) or creates
* local shuffle instances that share a single scan source to add
* parallelism without rescanning data ({@link #assignLocalShuffleJobs}).</li>
* </ol>
*
* @param workerToScanRanges map from worker to its un-instanced scan ranges,
* produced by {@link #multipleMachinesParallelization}
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @param distributeContext the distribute context for parallelism configuration
* @return the list of assigned jobs, each bound to a worker with its portion of scan ranges
*/
protected List<AssignedJob> insideMachineParallelization(
Map<DistributedPlanWorker, UninstancedScanSource> workerToScanRanges,
ListMultimap<ExchangeNode, AssignedJob> inputJobs,
Expand Down Expand Up @@ -104,10 +169,33 @@ protected List<AssignedJob> insideMachineParallelization(
return instances;
}

/**
* Whether the fragment should use a serial source operator followed by local
* shuffle to add intra-machine parallelism. When true, data is first gathered
* through one exchange, then locally shuffled to multiple instances on the same
* machine, allowing parallel computation without rescanning the source data.
*
* @param distributeContext the distribute context; for load jobs, the connect
* context is passed as null to avoid serial source
* @return true if the fragment has a serial source operator and should use
* local shuffle to increase parallelism
*/
protected boolean useLocalShuffleToAddParallel(DistributeContext distributeContext) {
return fragment.useSerialSource(distributeContext.isLoadJob ? null : statementContext.getConnectContext());
}

/**
* Split the given scan source evenly into {@code instanceNum} partitions and
* create one {@link StaticAssignedJob} per partition, all on the same worker.
* Each instance scans a disjoint subset of the tablet ranges, dividing the
* total scan workload among the instances.
*
* @param scanSource the full scan source (e.g. all tablets assigned to this worker)
* @param instanceNum the number of instances to split into
* @param instances the output list receiving newly created assigned jobs
* @param context the connect context for generating instance IDs
* @param worker the worker that will host all of the instances
*/
protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
// split the scanRanges to some partitions, one partition for one instance
Expand All @@ -127,6 +215,22 @@ protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, List<
}
}

/**
* Create local shuffle instances on the given worker. The first instance scans
* all data, and remaining instances receive an empty scan source — they share
* the first instance's scan result via local shuffle on the same BE.
* This avoids rescanning the same data multiple times while still adding
* parallelism for downstream operators (e.g. aggregation).
* <p>
* All instances share the same {@code shareScanId}, signaling to the backend
* that they belong to the same shared-scan group.
*
* @param scanSource the full scan source (all data for this worker)
* @param instanceNum the total number of local shuffle instances to create
* @param instances the output list receiving newly created {@link LocalShuffleAssignedJob}s
* @param context the connect context for generating instance IDs
* @param worker the worker that will host all local shuffle instances
*/
protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, List<AssignedJob> instances,
ConnectContext context, DistributedPlanWorker worker) {
// only generate one instance to scan all data, in this step
Expand Down Expand Up @@ -161,6 +265,25 @@ protected void assignLocalShuffleJobs(ScanSource scanSource, int instanceNum, Li
}
}

/**
* Compute the number of parallel instances for this fragment.
* The result is bounded by several constraints:
* <ul>
* <li>If the fragment has unpartitioned data distribution, returns 1.</li>
* <li>If query cache is enabled, returns {@code maxParallel} (one instance per
* tablet required for cache lookup).</li>
* <li>If the single OLAP scan node qualifies for single-instance optimization
* (e.g. LIMIT with no conjuncts), returns 1 to save resources.</li>
* <li>If local shuffle is active, returns the fragment's {@code parallelExecNum}.</li>
* <li>Otherwise, returns {@code min(maxParallel, max(parallelExecNum, 1))},
* i.e. capped by the actual tablet count.</li>
* </ul>
*
* @param maxParallel the maximum possible parallelism (e.g. total tablet count
* or bucket count on this worker)
* @param useLocalShuffleToAddParallel whether local shuffle is active
* @return the number of instances to create for this worker
*/
protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
if (!fragment.getDataPartition().isPartitioned()) {
Expand Down Expand Up @@ -188,6 +311,15 @@ protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddP
return Math.min(maxParallel, Math.max(fragment.getParallelExecNum(), 1));
}

/**
* Create a single empty instance assigned to a random available worker.
* Used by subclasses in {@link #fillUpAssignedJobs} as a fallback when normal
* parallelization produces no instances (e.g. all tablets/data pruned away),
* ensuring the fragment can still execute and return an empty result.
*
* @param workerManager the worker manager to select a random worker from
* @return a singleton list containing one empty assigned job
*/
protected List<AssignedJob> fillUpSingleEmptyInstance(DistributedPlanWorkerManager workerManager) {
long catalogId = Env.getCurrentInternalCatalog().getId();
if (scanNodes != null && scanNodes.size() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ public UnassignedAllBEJob(StatementContext statementContext, PlanFragment fragme
}

// ExchangeNode -> upstreamFragment -> AssignedJob(instances of upstreamFragment)
/**
* Compute assigned jobs that deploy one instance on every available backend.
* This is used for dictionary sink fragments where data must be loaded onto
* all BEs. Supports two loading modes:
* <ul>
* <li><b>Full load</b>: when source data version has changed, redeploy to all BEs
* with parallelism matching the upstream fragment instance count.</li>
* <li><b>Partial load</b>: when only some BEs are outdated, deploy only to those
* outdated BEs to avoid redundant work.</li>
* </ul>
* Each BE gets one instance with an empty {@link DefaultScanSource} (the actual
* scan data comes from the upstream exchange).
*
* @param distributeContext the distribute context providing the worker manager
* @param inputJobs multimap from child exchange nodes to their assigned jobs,
* used to determine the expected instance count for full loads
* @return one assigned job per target backend
*/
@Override
public List<AssignedJob> computeAssignedJobs(DistributeContext distributeContext,
ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ public UnassignedGatherJob(
super(statementContext, fragment, ImmutableList.of(), exchangeToChildJob);
}

/**
* Compute assigned jobs for a gather (single-node) fragment.
* All instances are placed on a single randomly selected worker.
* When {@code useSerialSource} is true, multiple local shuffle instances
* are created on the same worker to add intra-machine parallelism:
* the first instance scans all data from the upstream exchange and
* local-shuffles it to the other local instances for parallel processing.
*
* @param distributeContext the distribute context for worker selection
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return one or more assigned jobs, all on the same selected worker
*/
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ public static boolean canApply(List<ScanNode> scanNodes) {
return true;
}

/**
* Compute a single assigned job that gathers scan ranges from all
* {@link org.apache.doris.planner.DataGenScanNode} sources in this fragment.
* All scan ranges from each DataGenScanNode are collected into one
* {@link DefaultScanSource} and placed on a randomly selected worker.
*
* @param distributeContext the distribute context for worker selection
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return a list containing exactly one assigned job with all scan ranges merged
*/
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ public UnassignedGroupCommitJob(StatementContext statementContext,
super(statementContext, fragment, scanNodes, exchangeToChildJob);
}

/**
* Compute a single assigned job bound to the group commit merge backend.
* The target backend is determined by {@link StatementContext#getGroupCommitMergeBackend()},
* ensuring the group commit sink executes on the specific BE designated for merging.
*
* @param distributeContext the distribute context (unused — worker is fixed by group commit logic)
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return a list containing exactly one assigned job on the group commit merge backend
*/
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ public interface UnassignedJob extends TreeNode<UnassignedJob> {

ListMultimap<ExchangeNode, UnassignedJob> getExchangeToChildJob();

/**
* Compute and return the list of {@link AssignedJob}s for this fragment.
* This is the core method that transforms an unassigned fragment-level job into
* concrete parallel instances, each bound to a specific {@link DistributedPlanWorker}
* and carrying its assigned {@link ScanSource} (data ranges).
*
* @param distributeContext
* the distribute context containing worker manager, selected workers, and other
* planner state needed for worker selection and parallelism decisions
* @param inputJobs
* multimap from child {@link ExchangeNode} to their already-assigned jobs;
* provides the child fragment instance layout used by shuffle/gather jobs
* to determine their own instance count and worker placement
* @return the list of assigned jobs, each representing one fragment instance scheduled
* on a specific worker with its data source
*/
List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ public UnassignedLocalTVFSinkJob(
this.backendId = backendId;
}

/**
* Compute a single assigned job on the designated backend for local TVF sink.
* The target backend is determined by {@code backendId}. If the specified backend
* is not alive, an {@link IllegalStateException} is thrown. This ensures
* INSERT INTO local(...) writes to the correct node's local disk.
*
* @param distributeContext the distribute context (unused — worker is fixed by backendId)
* @param inputJobs multimap from child exchange nodes to their assigned jobs
* @return a list containing exactly one assigned job on the designated backend
* @throws IllegalStateException if the target backend is not available
*/
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ public UnassignedQueryConstantJob(StatementContext statementContext, PlanFragmen
super(statementContext, fragment, ImmutableList.of(), ArrayListMultimap.create());
}

/**
* Compute a single assigned job on a randomly selected worker for constant queries
* (e.g. SELECT 1, SELECT * FROM VALUES(...)). Such queries have no data scan,
* so a single instance with an empty {@link DefaultScanSource} suffices.
*
* @param distributeContext the distribute context for random worker selection
* @param inputJobs unused — constant queries have no child fragments
* @return a list containing exactly one assigned job on a random worker
*/
@Override
public List<AssignedJob> computeAssignedJobs(
DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) {
Expand Down
Loading
Loading