[FEAT] [WIP] [Execution] Execution model prototype #2278

Closed
wants to merge 1 commit into from

@clarkzinzow commented May 15, 2024

This PR contains a prototype of a new execution model. The new execution model is designed with:

  • adaptive query execution (AQE) as a first-class citizen,
  • division of operator scheduling into exchange operators and pipelinable task trees,
  • pipelining scheduling policies that are executor-agnostic (i.e. are shared across local and distributed executors),
  • task execution and fusion that are executor-agnostic,
  • resource accounting shared across executors,
  • all execution implemented in Rust.
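
To make the "executor-agnostic" and "shared resource accounting" goals concrete, here is a minimal sketch of one way such a split could look. Every name in it (`Executor`, `ResourceLedger`, `LocalExecutor`, `schedule`) is hypothetical and not taken from the PR's code; the point is only that the scheduling loop and the ledger see a trait, never a concrete backend.

```rust
/// Resources a task asks for (hypothetical, simplified shape).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Resources {
    pub cpus: u32,
    pub memory_bytes: u64,
}

/// A unit of work; `input_rows` stands in for real partition data.
#[derive(Clone, Debug)]
pub struct Task {
    pub id: usize,
    pub request: Resources,
    pub input_rows: usize,
}

/// Hypothetical executor interface. The scheduler only depends on this trait,
/// so the same policy could drive a local or a distributed backend.
pub trait Executor {
    fn capacity(&self) -> Resources;
    /// Runs one task to completion and returns its output row count.
    fn run(&mut self, task: &Task) -> usize;
}

/// Resource accounting that lives outside any particular executor.
pub struct ResourceLedger {
    capacity: Resources,
    in_use: Resources,
}

impl ResourceLedger {
    pub fn new(capacity: Resources) -> Self {
        Self { capacity, in_use: Resources { cpus: 0, memory_bytes: 0 } }
    }

    /// Reserves `r` if it fits within the remaining capacity.
    pub fn try_admit(&mut self, r: Resources) -> bool {
        let fits = self.in_use.cpus + r.cpus <= self.capacity.cpus
            && self.in_use.memory_bytes + r.memory_bytes <= self.capacity.memory_bytes;
        if fits {
            self.in_use.cpus += r.cpus;
            self.in_use.memory_bytes += r.memory_bytes;
        }
        fits
    }

    /// Returns a previously admitted reservation to the pool.
    pub fn release(&mut self, r: Resources) {
        self.in_use.cpus -= r.cpus;
        self.in_use.memory_bytes -= r.memory_bytes;
    }
}

/// Toy local executor: "running" a task just passes its row count through.
pub struct LocalExecutor {
    capacity: Resources,
}

impl LocalExecutor {
    pub fn new(capacity: Resources) -> Self {
        Self { capacity }
    }
}

impl Executor for LocalExecutor {
    fn capacity(&self) -> Resources {
        self.capacity
    }
    fn run(&mut self, task: &Task) -> usize {
        task.input_rows
    }
}

/// Executor-agnostic loop: admit against the ledger, run, release.
/// Returns (completed (task id, rows), ids of tasks that can never fit).
pub fn schedule<E: Executor>(
    executor: &mut E,
    tasks: &[Task],
) -> (Vec<(usize, usize)>, Vec<usize>) {
    let mut ledger = ResourceLedger::new(executor.capacity());
    let mut completed = Vec::new();
    let mut rejected = Vec::new();
    for task in tasks {
        if ledger.try_admit(task.request) {
            let rows = executor.run(task);
            ledger.release(task.request);
            completed.push((task.id, rows));
        } else {
            rejected.push(task.id);
        }
    }
    (completed, rejected)
}
```

The loop here runs tasks one at a time for brevity, so the ledger only ever rejects a task whose request exceeds total capacity. A distributed executor would implement the same trait and reuse `schedule` and `ResourceLedger` unchanged.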

This PR contains a proof-of-concept local executor + a scheduling implementation for a handful of operations (in-memory scan, file scan, project, filter, repartition, sort, collect).
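
As an illustration of dividing a plan into exchange operators and pipelinable task trees, the sketch below splits a chain of the operations listed above at its boundaries. It rests on assumptions that the PR does not state: that repartition, sort, and collect are the boundaries (exchanges or sinks), that the rest can be fused into one pipelined task, and that the plan is a linear chain rather than a tree. The `Op`, `Stage`, and `split_into_stages` names are hypothetical.

```rust
/// The handful of operations named in the PR description.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    InMemoryScan,
    FileScan,
    Project,
    Filter,
    Repartition,
    Sort,
    Collect,
}

impl Op {
    /// Assumption for this sketch: ops that need to see more than one
    /// partition at a time end a pipeline.
    pub fn is_boundary(self) -> bool {
        matches!(self, Op::Repartition | Op::Sort | Op::Collect)
    }
}

/// Either a run of fusable per-partition ops or a single boundary op
/// (an exchange or a sink).
#[derive(Debug, PartialEq, Eq)]
pub enum Stage {
    Pipeline(Vec<Op>),
    Boundary(Op),
}

/// Walks a linear plan (source first) and cuts it at every boundary op.
pub fn split_into_stages(plan: &[Op]) -> Vec<Stage> {
    let mut stages = Vec::new();
    let mut current: Vec<Op> = Vec::new();
    for &op in plan {
        if op.is_boundary() {
            // Flush the ops accumulated so far as one fused pipeline.
            if !current.is_empty() {
                stages.push(Stage::Pipeline(std::mem::take(&mut current)));
            }
            stages.push(Stage::Boundary(op));
        } else {
            current.push(op);
        }
    }
    if !current.is_empty() {
        stages.push(Stage::Pipeline(current));
    }
    stages
}
```

For a plan of file scan, project, filter, repartition, project, sort, collect, this yields a fused scan/project/filter pipeline, a repartition boundary, a single-op project pipeline, then sort and collect boundaries.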

TODOs

Pre-merge TODOs

  • Implement limit exchange + limit sink
  • Implement hash join exchange
  • [Prereq for monotonically increasing ID] Support in-scheduler mutable state protocol for partition task ops that need to adjust their task configuration based on the input metadata. E.g., monotonically increasing ID needs to bind the partition number to each partition task.
  • Implement monotonically increasing ID op.
  • Support in-scheduler mutable state protocol for partition task ops that need to adjust their task configuration based on the output metadata of previous partition tasks for that same op. E.g., row number needs to bind the running row count to each partition task, and limit should bind the remaining limit to each partition task as an optimization.
  • Fix redundant task ID creation.
  • Fix intermediate queue ordering.
  • Consolidate bulk and streaming partition task scheduler logic.
  • Have schedulers select over future waiting, wait timeout, and SIGINT/SIGTERM signals.
  • [Timeboxed - lifetime + Send + Sync issues] Use tokio runtime + spawn for streaming sink execution instead of background thread
  • Port stage planner to visitor pattern
  • Port physical plan → partition task tree builder to visitor pattern
  • [Timeboxed] Try to create static correspondence between e.g. partition task with scan task input + leaf partition task node + scan task input
  • Make exchange/sink-imposed backpressure policy for partition task scheduler pluggable
  • Implement partial metadata
  • Unit tests
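
The two "in-scheduler mutable state protocol" items above can be sketched as a single hook with a bind step (before a task launches) and an observe step (after it finishes). This is only one possible shape, not the PR's design: the `StatefulTaskOp` trait, `PartitionMeta`, and `drive` are hypothetical names, and the driver runs tasks strictly in order, which a real scheduler would likely relax.

```rust
/// Metadata the scheduler knows about a partition (hypothetical shape).
#[derive(Clone, Copy, Debug)]
pub struct PartitionMeta {
    pub partition_index: usize,
    pub num_rows: usize,
}

/// Hypothetical hook for ops whose per-task configuration depends on state
/// held in the scheduler.
pub trait StatefulTaskOp {
    type Config;
    /// Derives this task's configuration from the input metadata and the
    /// state accumulated so far.
    fn bind(&self, input: &PartitionMeta) -> Self::Config;
    /// Folds a finished task's output metadata back into the state.
    fn observe(&mut self, output: &PartitionMeta);
}

/// Monotonically increasing ID: needs only the partition number.
pub struct MonotonicId;

impl StatefulTaskOp for MonotonicId {
    type Config = usize;
    fn bind(&self, input: &PartitionMeta) -> usize {
        input.partition_index
    }
    fn observe(&mut self, _output: &PartitionMeta) {}
}

/// Row number: each task starts at the running row count so far.
#[derive(Default)]
pub struct RowNumber {
    rows_so_far: usize,
}

impl StatefulTaskOp for RowNumber {
    type Config = usize;
    fn bind(&self, _input: &PartitionMeta) -> usize {
        self.rows_so_far
    }
    fn observe(&mut self, output: &PartitionMeta) {
        self.rows_so_far += output.num_rows;
    }
}

/// Limit: each task is told how many rows are still wanted.
pub struct Limit {
    remaining: usize,
}

impl Limit {
    pub fn new(limit: usize) -> Self {
        Self { remaining: limit }
    }
}

impl StatefulTaskOp for Limit {
    type Config = usize;
    fn bind(&self, _input: &PartitionMeta) -> usize {
        self.remaining
    }
    fn observe(&mut self, output: &PartitionMeta) {
        self.remaining = self.remaining.saturating_sub(output.num_rows);
    }
}

/// Sequential driver: bind, run, observe, one partition at a time.
/// `run` stands in for task execution and returns the output metadata.
pub fn drive<O: StatefulTaskOp>(
    op: &mut O,
    inputs: &[PartitionMeta],
    run: impl Fn(&O::Config, &PartitionMeta) -> PartitionMeta,
) -> Vec<O::Config> {
    let mut configs = Vec::with_capacity(inputs.len());
    for input in inputs {
        let config = op.bind(input);
        let output = run(&config, input);
        op.observe(&output);
        configs.push(config);
    }
    configs
}
```

With input partitions of 3, 4, and 5 rows, `RowNumber` binds offsets 0, 3, and 7, and `Limit::new(5)` binds budgets 5, 2, and 0 when each task emits at most its budget. `MonotonicId` ignores output metadata entirely, which matches the first TODO's "input metadata only" case.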

Post-merge TODOs

  • [Follow-up - Ray executor] Make PartitionTaskOp serializable with enum + PartitionTaskOp.to_type() that returns the appropriate variant
  • [Follow-up - Ray executor] Instead of having static executor and partition ref from planner-down, allow both to be dynamic so exchange operator could dynamically choose whether to execute locally or remote
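
The first follow-up above (an enum plus `to_type()`) could look roughly like the sketch below. Only `PartitionTaskOp` and `to_type` come from the PR text; the concrete ops, the `PartitionTaskOpType` enum, and the toy `encode`/`decode` text format are assumptions standing in for whatever serializer the Ray executor would actually use. The idea is that a `Box<dyn PartitionTaskOp>` cannot be serialized directly, while a closed enum of concrete ops can.

```rust
/// Trait-object interface used by the scheduler.
pub trait PartitionTaskOp {
    fn name(&self) -> &'static str;
    /// Converts the op into the closed enum so it can be serialized.
    fn to_type(&self) -> PartitionTaskOpType;
}

// Two illustrative ops; the field layouts are hypothetical.
#[derive(Clone, Debug, PartialEq)]
pub struct ProjectOp {
    pub columns: Vec<String>,
}

#[derive(Clone, Debug, PartialEq)]
pub struct LimitOp {
    pub limit: usize,
}

/// Closed set of op variants (hypothetical name).
#[derive(Clone, Debug, PartialEq)]
pub enum PartitionTaskOpType {
    Project(ProjectOp),
    Limit(LimitOp),
}

impl PartitionTaskOp for ProjectOp {
    fn name(&self) -> &'static str {
        "project"
    }
    fn to_type(&self) -> PartitionTaskOpType {
        PartitionTaskOpType::Project(self.clone())
    }
}

impl PartitionTaskOp for LimitOp {
    fn name(&self) -> &'static str {
        "limit"
    }
    fn to_type(&self) -> PartitionTaskOpType {
        PartitionTaskOpType::Limit(self.clone())
    }
}

impl PartitionTaskOpType {
    /// Toy text encoding standing in for a real serializer. It does not
    /// escape ',' or ':' inside column names.
    pub fn encode(&self) -> String {
        match self {
            Self::Project(op) => format!("project:{}", op.columns.join(",")),
            Self::Limit(op) => format!("limit:{}", op.limit),
        }
    }

    /// Inverse of `encode`; returns `None` for unknown tags or bad payloads.
    pub fn decode(s: &str) -> Option<Self> {
        let (tag, body) = s.split_once(':')?;
        match tag {
            "project" => {
                let columns = if body.is_empty() {
                    Vec::new()
                } else {
                    body.split(',').map(String::from).collect()
                };
                Some(Self::Project(ProjectOp { columns }))
            }
            "limit" => {
                let limit = body.parse::<usize>().ok()?;
                Some(Self::Limit(LimitOp { limit }))
            }
            _ => None,
        }
    }

    /// Turns the decoded variant back into a trait object on the remote side.
    pub fn into_op(self) -> Box<dyn PartitionTaskOp> {
        match self {
            Self::Project(op) => Box::new(op),
            Self::Limit(op) => Box::new(op),
        }
    }
}
```

A sender would call `op.to_type().encode()` on each boxed op, and a receiver would call `PartitionTaskOpType::decode(..)` followed by `into_op()`. The cost of this design is that every new op needs a new enum variant, which is the usual trade-off of a closed enum against open trait objects.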

@clarkzinzow changed the title from "[WIP] [Execution] Execution model prototype" to "[FEAT] [WIP] [Execution] Execution model prototype" May 15, 2024
@github-actions bot added the enhancement (New feature or request) label May 15, 2024
@clarkzinzow force-pushed the clark/execution-model-prototype branch 2 times, most recently from a214af0 to a278d60 June 4, 2024 21:56
@clarkzinzow force-pushed the clark/execution-model-prototype branch from a278d60 to ae19a1f June 4, 2024 23:15
@jaychia closed this Sep 17, 2024
Labels
enhancement New feature or request
2 participants