
[SPARK-55887][CONNECT] Special handling for CollectLimitExec/CollectTailExec to avoid full table scans #54685

Closed
LuciferYang wants to merge 6 commits into apache:master from LuciferYang:connect-collect-limit

Conversation

@LuciferYang
Contributor

@LuciferYang LuciferYang commented Mar 9, 2026

What changes were proposed in this pull request?

This PR updates SparkConnectPlanExecution to use executeCollect() instead of execute() when processing CollectLimitExec and CollectTailExec physical plans.

In Spark Connect, operations like head(), take(), and tail() are translated into CollectLimitExec or CollectTailExec physical nodes. Previously, these were executed via the standard execute() path, which often resulted in scanning all partitions before reducing the results.

By switching to executeCollect(), Spark Connect now leverages the optimized executeTake() and executeTail() implementations already present in Spark Classic. These optimizations ensure that only the necessary partitions are scanned (e.g., scanning only the first partition for head(1)), significantly reducing I/O and task overhead.
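The shape of the change can be sketched with stub types. The names below (SparkPlanStub, choosePath, and friends) are illustrative placeholders for this writeup, not the actual Spark internals:

```scala
// Illustrative stubs only -- these are NOT the real Spark classes.
sealed trait SparkPlanStub
case class CollectLimitStub(limit: Int) extends SparkPlanStub
case class CollectTailStub(limit: Int) extends SparkPlanStub
case object OtherPlanStub extends SparkPlanStub

object DispatchSketch {
  // Before this PR, every plan went through the generic execute() path;
  // after it, limit/tail plans short-circuit to the optimized collect path.
  def choosePath(plan: SparkPlanStub): String = plan match {
    case _: CollectLimitStub => "executeCollect" // backed by executeTake()
    case _: CollectTailStub  => "executeCollect" // backed by executeTail()
    case _                   => "execute"        // generic RDD pipeline
  }
}
```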

Why are the changes needed?

Parity with Spark Classic behavior and performance optimization.

In Spark Classic, Dataset.collect() (and by extension head/take/tail) uses plan.executeCollect(). This path includes optimizations to avoid full table scans:

  • CollectLimitExec uses executeTake(): It starts by scanning only the first partition and incrementally scans more only if the limit isn't met.
  • CollectTailExec uses executeTail(): It starts scanning from the last partition backwards.
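The incremental behavior of executeTake() can be modeled with a small self-contained simulation. This is a simplified sketch, not Spark's actual code; among other details, the real implementation grows the number of partitions scanned per round (governed by spark.sql.limit.scaleUpFactor) rather than scanning strictly one at a time:

```scala
object TakeSim {
  // Simplified model of executeTake(): scan partitions from the front,
  // stopping as soon as `n` rows have been gathered.
  // Returns (rows, number of partitions scanned).
  def take(partitions: Seq[Seq[Int]], n: Int): (Seq[Int], Int) = {
    var rows = Vector.empty[Int]
    var scanned = 0
    val it = partitions.iterator
    while (rows.length < n && it.hasNext) {
      rows ++= it.next()
      scanned += 1
    }
    (rows.take(n), scanned)
  }
}
```

With 100 partitions of 100 rows each, asking for 1 row touches only the first partition; asking for 150 rows touches exactly two.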

In Spark Connect (before this PR), SparkConnectPlanExecution used plan.execute(). For a limit(1) query on a 100-partition table, this launched 100 tasks (one per partition's LocalLimit), causing unnecessary computation and resource usage.

Example Scenario:
Running spark.range(0, 10000, 1, 100).limit(1).collect():

  • Classic: Launches 1 task (scans partition 0).
  • Connect (Before): Launches 100 tasks (scans partitions 0-99).
  • Connect (After): Launches 1 task (scans partition 0).
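The tail(1) case follows the same idea in reverse. A simplified model of executeTail()'s backward scan (again a sketch under the same assumptions, not Spark's actual implementation):

```scala
object TailSim {
  // Simplified model of executeTail(): scan partitions from the back,
  // stopping once `n` rows are available, then keep only the last `n`.
  // Returns (rows, number of partitions scanned).
  def tail(partitions: Seq[Seq[Int]], n: Int): (Seq[Int], Int) = {
    var rows = Vector.empty[Int]
    var scanned = 0
    val it = partitions.reverseIterator
    while (rows.length < n && it.hasNext) {
      rows = it.next().toVector ++ rows // prepend to preserve row order
      scanned += 1
    }
    (rows.takeRight(n), scanned)
  }
}
```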

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added new test cases in SparkConnectServiceSuite to verify the task count reduction:

  • test("SPARK-55887: Use executeCollect for limit to avoid full scan"): Verified that limit(1) on a 100-partition DataFrame triggers significantly fewer tasks than partitions (expected: 1 task).
  • test("SPARK-55887: Use executeCollect for tail to avoid full scan"): Verified that tail(1) on a 100-partition DataFrame triggers significantly fewer tasks than partitions (expected: 1 task).

Was this patch authored or co-authored using generative AI tooling?

Test cases were generated with the assistance of Gemini 3.

@LuciferYang LuciferYang marked this pull request as draft March 9, 2026 08:50
@LuciferYang
Contributor Author

test first

@LuciferYang LuciferYang changed the title Special handling of CollectLimitExec/CollectTailExec [SPARK-55887][CONNECT] Special handling for CollectLimitExec/CollectTailExec to avoid full table scans Mar 9, 2026
@LuciferYang LuciferYang marked this pull request as ready for review March 9, 2026 09:52
sendBatch(bytes, count, offset)
offset += count
}
case collectLimit: CollectLimitExec =>
Contributor Author

This fix might bring additional memory pressure to the Connect server. However, I think we can implement a simple fix first and then look for a better solution later.

@LuciferYang LuciferYang marked this pull request as draft March 9, 2026 13:37
@LuciferYang
Contributor Author

Let me test it in the production environment.

@LuciferYang LuciferYang marked this pull request as ready for review March 10, 2026 02:37
@LuciferYang
Contributor Author

Let me test it in the production environment.

The test indicates that the function is working ok.

@LuciferYang
Contributor Author

Thank you @dongjoon-hyun

offset += count
}
case collectLimit: CollectLimitExec =>
SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
Contributor

nit: shall we use names like collectLimitArrow/collectTailArrow?

Contributor Author

done

@dongjoon-hyun
Member

Merged to master for Apache Spark 4.2.0. Thank you, @LuciferYang and all.

@LuciferYang
Contributor Author

Thanks @dongjoon-hyun @zhengruifeng @yikf

@LuciferYang LuciferYang deleted the connect-collect-limit branch March 17, 2026 08:20