Skip to content

[WIP] feat(aqe): delay join decision & introduce broadcast join in AQE#1752

Draft
milenkovicm wants to merge 4 commits into
apache:mainfrom
milenkovicm:feat_dynamic_join_selection
Draft

[WIP] feat(aqe): delay join decision & introduce broadcast join in AQE#1752
milenkovicm wants to merge 4 commits into
apache:mainfrom
milenkovicm:feat_dynamic_join_selection

Conversation

@milenkovicm
Copy link
Copy Markdown
Contributor

@milenkovicm milenkovicm commented May 23, 2026

Overview

This commit implements dynamic join strategy selection in Ballista's Adaptive Query Execution (AQE) pipeline. Instead of choosing a join strategy at planning time (when data sizes are unknown), the plan now inserts a placeholder DynamicJoinSelectionExec node that defers the decision until runtime statistics are available.

Before (planning time)
  HashJoinExec / SortMergeJoinExec
      ↑ chosen by DataFusion optimizer (no runtime stats)

After (AQE flow)
  Step 1 – plan creation:
    DelayJoinSelectionRule → DynamicJoinSelectionExec  (placeholder)

  Step 2 – stage completes, stats available:
    SelectJoinRule → CollectLeft | LateCollectLeft | Hash | Sort | Repartition
                          ↑ decided using actual byte / row counts

The change means Ballista can now broadcast small tables or swap join sides based on real data sizes observed during execution, not just estimates from the logical plan.

Introduced planner rules consider few configuration values to decide join type:

  • datafusion.optimizer.hash_join_single_partition_threshold_rows, to select broadcast join or not
  • datafusion.optimizer.hash_join_single_partition_threshold, to select broadcast join or not
  • datafusion.optimizer.prefer_hash_join to select actual preferred join implementation

with dynamic join selection some TPCH jobs have less stages created

Screenshot 2026-05-23 at 20 51 03

compared to static scheduling

Screenshot 2026-05-23 at 20 55 20

TODO

  • broadcast join gives bad results for TPCH 2, 13, 16
  • configuration to disable rule ballista.planner.adaptive_join.enabled
  • test with sort join
  • decision should we stick with ballista.optimizer.broadcast_join_threshold_bytes or go with datafusion configuration values
  • more testing
  • TPCDS Q72 test
  • code clean up

Context

this PR is part of #1359

@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from 2f4e895 to 4a69e05 Compare May 23, 2026 20:27
Comment thread ballista/scheduler/src/state/aqe/execution_plan/dynamic_join.rs Outdated
Comment thread ballista/scheduler/src/state/aqe/mod.rs Outdated
Comment thread ballista/scheduler/src/state/aqe/mod.rs Outdated
@milenkovicm
Copy link
Copy Markdown
Contributor Author

Q2 (Static Plan)

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Job 0buY2Mw physical plan:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SortPreservingMergeExec: [s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], fetch=100
  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], preserve_partitioning=[true]
    ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, s_acctbal@5, s_comment@6, n_name@8]
        CoalescePartitionsExec
          HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@9)], projection=[p_partkey@1, p_mfgr@2, s_name@3, s_address@4, s_phone@5, s_acctbal@6, s_comment@7, ps_supplycost@8, n_name@9]
            FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
            ProjectionExec: expr=[p_partkey@2 as p_partkey, p_mfgr@3 as p_mfgr, s_name@4 as s_name, s_address@5 as s_address, s_phone@6 as s_phone, s_acctbal@7 as s_acctbal, s_comment@8 as s_comment, ps_supplycost@9 as ps_supplycost, n_name@0 as n_name, n_regionkey@1 as n_regionkey]
              HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@4)], projection=[n_name@1, n_regionkey@2, p_partkey@3, p_mfgr@4, s_name@5, s_address@6, s_phone@8, s_acctbal@9, s_comment@10, ps_supplycost@11]
                DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_name, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ]
                ProjectionExec: expr=[p_partkey@6 as p_partkey, p_mfgr@7 as p_mfgr, s_name@0 as s_name, s_address@1 as s_address, s_nationkey@2 as s_nationkey, s_phone@3 as s_phone, s_acctbal@4 as s_acctbal, s_comment@5 as s_comment, ps_supplycost@8 as ps_supplycost]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_suppkey@0, ps_suppkey@2)], projection=[s_name@1, s_address@2, s_nationkey@3, s_phone@4, s_acctbal@5, s_comment@6, p_partkey@7, p_mfgr@8, ps_supplycost@10]
                    DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment], file_type=parquet, predicate=DynamicFilter [ empty ]
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@0)], projection=[p_partkey@0, p_mfgr@1, ps_suppkey@3, ps_supplycost@4]
                      CoalescePartitionsExec
                        FilterExec: p_size@3 = 15 AND p_type@2 LIKE %BRASS, projection=[p_partkey@0, p_mfgr@1]
                          DataSourceExec: file_groups={7 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-0.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-1.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-10.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-11.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-12.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-13.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-2.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-3.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-4.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-5.parquet], ...]}, projection=[p_partkey, p_mfgr, p_type, p_size], file_type=parquet, predicate=p_size@5 = 15 AND p_type@4 LIKE %BRASS, pruning_predicate=p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1, required_guarantees=[p_size in (15)]
                      DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        ProjectionExec: expr=[min(partsupp.ps_supplycost)@1 as min(partsupp.ps_supplycost), ps_partkey@0 as ps_partkey]
          AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
            RepartitionExec: partitioning=Hash([ps_partkey@0], 8), input_partitions=8
              AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@2)], projection=[ps_partkey@1, ps_supplycost@2]
                  FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
                    DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
                  ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, n_regionkey@0 as n_regionkey]
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@2)], projection=[n_regionkey@1, ps_partkey@2, ps_supplycost@3]
                      DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ]
                      ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, s_nationkey@0 as s_nationkey]
                        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(s_suppkey@0, ps_suppkey@1)], projection=[s_nationkey@1, ps_partkey@2, ps_supplycost@4]
                          DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_nationkey], file_type=parquet, predicate=DynamicFilter [ empty ]
                          DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet, predicate=DynamicFilter [ empty ]

Q2 (AQE)

-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Job V3vvoKt physical plan:
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
AdaptiveDatafusionExec: is_final=true, plan_id=3, stage_id=6, stage_resolved=true
  SortExec: TopK(fetch=100), expr=[s_acctbal@0 DESC, n_name@2 ASC NULLS LAST, s_name@1 ASC NULLS LAST, p_partkey@3 ASC NULLS LAST], preserve_partitioning=[false]
    ProjectionExec: expr=[s_acctbal@5 as s_acctbal, s_name@2 as s_name, n_name@7 as n_name, p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_address@3 as s_address, s_phone@4 as s_phone, s_comment@6 as s_comment]
      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@1), (ps_supplycost@7, min(partsupp.ps_supplycost)@0)], projection=[p_partkey@0, p_mfgr@1, s_name@2, s_address@3, s_phone@4, s_acctbal@5, s_comment@6, n_name@8]
        HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@9)], projection=[p_partkey@1, p_mfgr@2, s_name@3, s_address@4, s_phone@5, s_acctbal@6, s_comment@7, ps_supplycost@8, n_name@9]
          FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
            DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
          ProjectionExec: expr=[p_partkey@2 as p_partkey, p_mfgr@3 as p_mfgr, s_name@4 as s_name, s_address@5 as s_address, s_phone@6 as s_phone, s_acctbal@7 as s_acctbal, s_comment@8 as s_comment, ps_supplycost@9 as ps_supplycost, n_name@0 as n_name, n_regionkey@1 as n_regionkey]
            HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@4)], projection=[n_name@1, n_regionkey@2, p_partkey@3, p_mfgr@4, s_name@5, s_address@6, s_phone@8, s_acctbal@9, s_comment@10, ps_supplycost@11]
              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_name, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
              ProjectionExec: expr=[p_partkey@0 as p_partkey, p_mfgr@1 as p_mfgr, s_name@3 as s_name, s_address@4 as s_address, s_nationkey@5 as s_nationkey, s_phone@6 as s_phone, s_acctbal@7 as s_acctbal, s_comment@8 as s_comment, ps_supplycost@2 as ps_supplycost]
                HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(ps_suppkey@2, s_suppkey@0)], projection=[p_partkey@0, p_mfgr@1, ps_supplycost@3, s_name@5, s_address@6, s_nationkey@7, s_phone@8, s_acctbal@9, s_comment@10]
                  ExchangeExec: partitioning=None, plan_id=4, stage_id=3, stage_resolved=true, broadcast=true
                    HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(p_partkey@0, ps_partkey@0)], projection=[p_partkey@0, p_mfgr@1, ps_suppkey@3, ps_supplycost@4]
                      ExchangeExec: partitioning=None, plan_id=0, stage_id=0, stage_resolved=true, broadcast=true
                        FilterExec: p_size@3 = 15 AND p_type@2 LIKE %BRASS, projection=[p_partkey@0, p_mfgr@1]
                          DataSourceExec: file_groups={7 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-0.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-1.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-10.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-11.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-12.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-13.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-2.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-3.parquet], [Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-4.parquet, Users/marko/TMP/tpch_data/tpch-data-sf1/part/part-5.parquet], ...]}, projection=[p_partkey, p_mfgr, p_type, p_size], file_type=parquet, predicate=p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS AND p_size@5 = 15 AND p_type@4 LIKE %BRASS, pruning_predicate=p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1 AND p_size_null_count@2 != row_count@3 AND p_size_min@0 <= 15 AND 15 <= p_size_max@1, required_guarantees=[p_size in (15)]
                      DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
                  DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
        ExchangeExec: partitioning=None, plan_id=5, stage_id=5, stage_resolved=true, broadcast=true
          ProjectionExec: expr=[min(partsupp.ps_supplycost)@1 as min(partsupp.ps_supplycost), ps_partkey@0 as ps_partkey]
            AggregateExec: mode=FinalPartitioned, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
              ExchangeExec: partitioning=Hash([ps_partkey@0], 8), plan_id=6, stage_id=4, stage_resolved=true
                AggregateExec: mode=Partial, gby=[ps_partkey@0 as ps_partkey], aggr=[min(partsupp.ps_supplycost)]
                  HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(r_regionkey@0, n_regionkey@2)], projection=[ps_partkey@1, ps_supplycost@2]
                    FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]
                      DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/region/part-0.parquet]]}, projection=[r_regionkey, r_name], file_type=parquet, predicate=r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE AND r_name@1 = EUROPE, pruning_predicate=r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1 AND r_name_null_count@2 != row_count@3 AND r_name_min@0 <= EUROPE AND EUROPE <= r_name_max@1, required_guarantees=[r_name in (EUROPE)]
                    ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, n_regionkey@0 as n_regionkey]
                      HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(n_nationkey@0, s_nationkey@2)], projection=[n_regionkey@1, ps_partkey@2, ps_supplycost@3]
                        DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/nation/part-0.parquet]]}, projection=[n_nationkey, n_regionkey], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ] AND DynamicFilter [ empty ]
                        ProjectionExec: expr=[ps_partkey@1 as ps_partkey, ps_supplycost@2 as ps_supplycost, s_nationkey@0 as s_nationkey]
                          HashJoinExec: mode=Partitioned, join_type=Inner, on=[(s_suppkey@0, ps_suppkey@1)], projection=[s_nationkey@1, ps_partkey@2, ps_supplycost@4]
                            ExchangeExec: partitioning=Hash([s_suppkey@0], 8), plan_id=2, stage_id=2, stage_resolved=true
                              DataSourceExec: file_groups={1 group: [[Users/marko/TMP/tpch_data/tpch-data-sf1/supplier/part-0.parquet]]}, projection=[s_suppkey, s_nationkey], file_type=parquet
                            ExchangeExec: partitioning=Hash([ps_suppkey@1], 8), plan_id=1, stage_id=1, stage_resolved=true
                              DataSourceExec: file_groups={8 groups: [[Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-0.parquet:0..2385925, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:0..1749648], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-1.parquet:1749648..2367980, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-10.parquet:0..2354492, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:0..1162749], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-11.parquet:1162749..2356504, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-12.parquet:0..2355805, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:0..586013], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-13.parquet:586013..2352367, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:0..2369219], [Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-2.parquet:2369219..2369944, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-3.parquet:0..2369786, Users/marko/TMP/tpch_data/tpch-data-sf1/partsupp/part-4.parquet:0..1765062], ...]}, projection=[ps_partkey, ps_suppkey, ps_supplycost], file_type=parquet

@milenkovicm
Copy link
Copy Markdown
Contributor Author

milenkovicm commented May 24, 2026

Partitions get messed up in the last stage

@milenkovicm
Copy link
Copy Markdown
Contributor Author

Note to myself: check #1643

#1643 mentions pushing down

FilterExec: r_name@1 = EUROPE, projection=[r_regionkey@0]

And doing broadcast join, this is true for aqe implementation

@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from 4a69e05 to dbfacfa Compare May 25, 2026 13:21
@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from dbfacfa to 6335c37 Compare May 25, 2026 15:09
@milenkovicm milenkovicm force-pushed the feat_dynamic_join_selection branch from 6335c37 to d803f06 Compare May 25, 2026 16:57
@milenkovicm milenkovicm changed the title WIP feat(aqe): delay join decision & introduce broadcast join in AQE [WIP] feat(aqe): delay join decision & introduce broadcast join in AQE May 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant