Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] Adaptive Query Execution #2176

Merged
merged 35 commits into from
May 7, 2024
Merged

[PERF] Adaptive Query Execution #2176

merged 35 commits into from
May 7, 2024

Conversation

samster25
Copy link
Member

@samster25 samster25 commented Apr 24, 2024

  • Implements AQE at query stage boundaries such as HashJoin, SMJ, Repartition, GroupedAgg, etc
  • In the case of binary ops, we rank the two paths and choose the one with lower cost
  • Also implements a better Approximate Statistics to rank the partial plans at forks
  • Implements AQE for both PyRunner and RayRunner
  • Fix bug in RayRunner build_partitions where it didn't forward partial metadata
  • Flag to enable AQE via DaftExecutionConfig or env variable DAFT_ENABLE_AQE=1
  • Follow on:
  • Turn on AQE testing in CI
  • Implement AQE Rules such as dynamic coalesce or DPP

Copy link

codecov bot commented Apr 26, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 85.08%. Comparing base (252721e) to head (0628788).
Report is 2 commits behind head on main.

❗ Current head 0628788 differs from pull request most recent head 0efd84b. Consider uploading reports for the commit 0efd84b to get more accurate results

Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #2176      +/-   ##
==========================================
- Coverage   85.64%   85.08%   -0.56%     
==========================================
  Files          71       68       -3     
  Lines        7661     7367     -294     
==========================================
- Hits         6561     6268     -293     
+ Misses       1100     1099       -1     

see 22 files with indirect coverage changes

@samster25 samster25 changed the title Query Stages [PERF] Query Stages Apr 26, 2024
@samster25 samster25 changed the title [PERF] Query Stages [PERF] Adaptive Query Execution May 6, 2024
@samster25 samster25 marked this pull request as ready for review May 6, 2024 23:35
Copy link
Contributor

@clarkzinzow clarkzinzow left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! 🚀 🚀 🚀

@@ -302,6 +302,9 @@ def num_partitions(self) -> int | None:
def size_bytes(self) -> int | None:
return self.value.size_bytes() if self.value is not None else None

def num_rows(self) -> int | None:
return len(self.value) if self.value is not None else None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have an assertion in the AdaptivePhysicalPlanScheduler that cache_entry.num_rows() is not None, is there any case in which that won't be true (i.e. will cache_entry.value ever be None at that point)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, I was just following the convention above

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discussed offline!

adaptive_planner = builder.to_adaptive_physical_plan_scheduler(daft_execution_config)
while not adaptive_planner.is_done():
source_id, plan_scheduler = adaptive_planner.next()
# don't store partition sets in variable to avoid reference
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, good call.

daft/runners/pyrunner.py Outdated Show resolved Hide resolved
metadatas = [PartitionMetadata.from_table(p) for p in partitions]
assert len(partial_metadatas) == len(partitions), f"{len(partial_metadatas)} vs {len(partitions)}"

metadatas = [PartitionMetadata.from_table(p).merge_with_partial(m) for p, m in zip(partitions, partial_metadatas)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I was just doing something similar in the executor branch. 😄

src/daft-plan/src/physical_plan.rs Outdated Show resolved Hide resolved
}
Self::Project(Project { input, .. })
| Self::MonotonicallyIncreasingId(MonotonicallyIncreasingId { input, .. }) => {
// TODO(sammy), we need the schema to estimate the new size per row
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, we can always tweak logical->physical translation to add the schema to the physical Project and MontonicallyIncreasingId structs for this case! Obviously not a blocking issue, though.

src/daft-plan/src/physical_plan.rs Outdated Show resolved Hide resolved
@samster25 samster25 enabled auto-merge (squash) May 7, 2024 22:19
@samster25 samster25 merged commit b61461f into main May 7, 2024
27 checks passed
@samster25 samster25 deleted the sammy/query-stage-emitter branch May 7, 2024 22:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants