
[multistage]partition assignment refactor #12079

Merged: 11 commits merged into apache:master on Dec 7, 2023

Conversation

@walterddr (Contributor) commented Dec 1, 2023

Step 2 in #12015.

This PR:

  1. loosens the criteria for "co-assigning workers to the same partition to avoid shuffling" so it also applies to non-leaf children;
  2. reshapes the rules for partition assignment and mailbox assignment:
    • At logical plan time:
      • ExchangeNode now knows whether an exchange is "pre-partitioned", i.e. the input partitioning already matches what the exchange desires (an exchange might still be needed if the keys are the same but the hash functions differ).
    • At worker assignment time:
      • For an intermediate stage, an ExchangeNode marked "pre-partitioned" is required in order to assign the same set of workers as the 1st child;
      • For a leaf stage, worker assignment is controlled only by tableOptions, e.g. assigned following the table partitioning.
    • At mailbox assignment time:
      • For all mailboxes: determine whether a "direct" exchange can apply (see the sketch after this list).
        • check partition_parallelism:
          • if both sender and receiver are leaf stages (semi-join), partition_parallelism shouldn't apply: num_worker_sender == num_worker_receiver == num_partition_of_table (for both tables);
          • if both sender and receiver are intermediate stages, partition_parallelism shouldn't apply: num_worker_sender == num_worker_receiver;
          • if the sender is a leaf stage and the receiver is an intermediate stage: num_worker_receiver == num_worker_sender * partition_parallelism, and num_worker_sender == num_partition_of_table.
        • check partition_function:
          • the sender stage plan metadata's partition function must equal the receiving stage plan metadata's partition function.
      • NOTE:
        • Exchange "pre-partitioned" is necessary and sufficient for assigning a direct exchange between the 1st child and the intermediate stage;
        • for the partition function check, it is also necessary to verify that the mailbox sender uses the same partition function as the receiving stage metadata indicates (tracked in TODO).
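
Below is a minimal sketch of the direct-exchange eligibility rules above. It assumes a simplified stage-metadata shape; names like StageInfo, isLeafStage, and partitionParallelism are illustrative stand-ins, not the actual Pinot classes.

// Illustrative only: condenses the partition_parallelism and partition_function
// checks described above into one predicate; not the actual Pinot implementation.
final class DirectExchangeRules {
  static boolean canUseDirectExchange(StageInfo sender, StageInfo receiver) {
    int numSenders = sender.numWorkers();
    int numReceivers = receiver.numWorkers();
    boolean parallelismOk;
    if (sender.isLeafStage() && receiver.isLeafStage()) {
      // semi-join case: partition_parallelism shouldn't apply; both sides equal the table partition count
      parallelismOk = numSenders == numReceivers;
    } else if (!sender.isLeafStage() && !receiver.isLeafStage()) {
      // intermediate-to-intermediate: partition_parallelism shouldn't apply
      parallelismOk = numSenders == numReceivers;
    } else {
      // leaf-to-intermediate: the receiver side fans out by partition_parallelism
      parallelismOk = numSenders * sender.partitionParallelism() == numReceivers;
    }
    // partition_function check (compared ignoring case, per the review thread below)
    boolean functionOk = sender.partitionFunction() != null
        && sender.partitionFunction().equalsIgnoreCase(receiver.partitionFunction());
    return parallelismOk && functionOk;
  }
}

// Minimal stand-in for the stage metadata this sketch assumes.
record StageInfo(int numWorkers, boolean isLeafStage, int partitionParallelism, String partitionFunction) {}

Note that the leaf-to-leaf and intermediate-to-intermediate branches collapse to the same worker-count equality; they are kept separate here only to mirror the three cases in the list above.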

Work List

  • redefine partition info in DispatchablePlanMetadata and draw a clear boundary between server/worker assignment and the partitioned/non-partitioned optimization;
  • add partition function info as part of the validation step during worker assignment.

TODO:

  • allow selecting the partition function via config for all exchanges; currently it is hardcoded to the "table hash function" for leaf stages and to the "abs_hash_order_invariant" hash for non-leaf stages (when not using direct exchange);
    • once allowed, the condition for pre-partition assignment can be loosened further
  • throw when a colocated hint is given but the pre-partition assignment condition check fails
  • allow utilizing the same hash function to recompute hashes when doing parallelism fan-out (see the sketch below)
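
A purely hypothetical sketch of that last item: a row that already hashed into a table partition could be sub-partitioned among the fan-out receivers by reusing the same hash value, avoiding a full reshuffle. The slot derivation and all names here are assumptions for illustration, not the planned implementation.

// Hypothetical: derive a receiver index from the hash the table partitioner
// already computed, instead of reshuffling with a different function.
static int fanOutReceiver(int rowHash, int numPartitions, int parallelism) {
  int tablePartition = Math.floorMod(rowHash, numPartitions);      // partition the row already lives in
  int slot = Math.floorMod(rowHash / numPartitions, parallelism);  // reuse the same hash for the sub-split
  return tablePartition * parallelism + slot;                      // index among numPartitions * parallelism receivers
}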

@codecov-commenter commented Dec 1, 2023

Codecov Report

Attention: 40 lines in your changes are missing coverage. Please review.

Comparison is base (3474db4) 61.68% compared to head (154f6ef) 61.48%.
Report is 5 commits behind head on master.

Files Patch % Lines
.../org/apache/pinot/query/routing/WorkerManager.java 75.88% 24 Missing and 10 partials ⚠️
...ery/planner/physical/MailboxAssignmentVisitor.java 58.33% 0 Missing and 5 partials ⚠️
.../query/planner/logical/RelToPlanNodeConverter.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #12079      +/-   ##
============================================
- Coverage     61.68%   61.48%   -0.20%     
- Complexity     1151     1153       +2     
============================================
  Files          2391     2406      +15     
  Lines        130211   130851     +640     
  Branches      20141    20218      +77     
============================================
+ Hits          80317    80457     +140     
- Misses        44046    44513     +467     
- Partials       5848     5881      +33     
Flag Coverage Δ
custom-integration1 ?
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 27.63% <0.00%> (-34.02%) ⬇️
java-21 34.85% <76.19%> (-0.11%) ⬇️
skip-bytebuffers-false 61.46% <76.19%> (-0.22%) ⬇️
skip-bytebuffers-true 34.83% <76.19%> (?)
temurin 61.48% <76.19%> (-0.20%) ⬇️
unittests 61.48% <76.19%> (-0.20%) ⬇️
unittests1 46.56% <76.19%> (-0.39%) ⬇️
unittests2 27.63% <0.00%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown.


@walterddr changed the title from [multistage][draft] partition assignment refactor to [multistage]partition assignment refactor on Dec 4, 2023
@walterddr marked this pull request as ready for review on December 4, 2023 23:31
@walterddr force-pushed the pr_partition_assignment_refactor branch 2 times, most recently from ad29058 to 78f9d7a on December 4, 2023 23:36
@walterddr (Contributor, Author) left a comment:

CC @Jackie-Jiang to take a look

@@ -58,15 +58,22 @@
 public class WorkerManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(WorkerManager.class);
   private static final Random RANDOM = new Random();
+  private static final String DEFAULT_PARTITION_FUNCTION = "Hashcode";
@walterddr (Contributor, Author):

Not sure if this is the best way to go. I am also OK with requiring this to be passed in as mandatory.

   }

-  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions) {
+  private ColocatedTableInfo getColocatedTableInfo(String tableName, String partitionKey, int numPartitions,
@walterddr (Contributor, Author):

Technically the class should be named "PartitionTableInfo"; what it does doesn't indicate "co-locate" at all.

Contributor:

Either change it or add a TODO

@walterddr (Contributor, Author):

fixed

{
"description": "hint table without partitioning should throw exception",
"sql": "EXPLAIN PLAN FOR SELECT * FROM d /*+ tableOptions(partition_key='col1', partition_size='4') */ LIMIT 10",
"expectedException": "Error composing query plan for:.*"
@walterddr (Contributor, Author):

TODO: fix this test; it should always check the nested reason, because the error is always wrapped in a parser throw or planner throw. It doesn't make sense to check just the top-level message.

Comment on lines 86 to 87
"ignored": true,
"comment": "partition parallelism mismatched in hint, this query shouldn't work at all",
@walterddr (Contributor, Author):

Previously we decided to allow this, but I think we should not. Putting an ignore here for now, but we should not allow this and should throw an exception instead.

@walterddr (Contributor, Author):

Note: allowed here, but it will revert back to generic assignment.

@Jackie-Jiang added the refactor and multi-stage (Related to the multi-stage query engine) labels on Dec 5, 2023
Rong Rong added 6 commits on December 6, 2023 15:21, including:

  • reword isPartition to isPrePartitioned: this field indicates whether the exchange (or mailbox) needs to reshuffle the data; isPartition semantically means the data is already partitioned, which can be true while still not matching the partitioning the mailbox/exchange expects
  • fix several rebase errors
@walterddr force-pushed the pr_partition_assignment_refactor branch from a63a686 to 325c1b2 on December 6, 2023 23:22
Rong Rong added 2 commits on December 6, 2023 15:53:

1. we don't support rehashing via the pre-hash method (the exchange doesn't support the same function as the table), so check all children for pre-partitioning instead of checking a single child
2. set the default for exchange and the default for table separately
3. fix tests

Preconditions.checkState(numPartitions > 0, "'%s' must be positive, got: %s",
PinotHintOptions.TableHintOptions.PARTITION_SIZE, numPartitions);

String partitionFunction = tableOptions.getOrDefault(PinotHintOptions.TableHintOptions.PARTITION_FUNCTION,
Contributor:

Don't add a default. In most cases this is Murmur, but we should not assume it.

@walterddr (Contributor, Author):

This would be backward incompatible; all existing queries with table hints that omit the hash function would fail.

Contributor:

Understood. Currently we assume all table options use the same partition function. Putting Murmur as the default might be safer.

@walterddr (Contributor, Author):

done
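
For reference, a minimal sketch of what the thread converged on, defaulting to "Murmur" when the hint omits a partition function; the surrounding code is illustrative, not the exact committed change:

// Fall back to Murmur when the table hint doesn't specify partition_function,
// so existing hinted queries keep working (see discussion above).
String partitionFunction = tableOptions.getOrDefault(
    PinotHintOptions.TableHintOptions.PARTITION_FUNCTION, "Murmur");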

// leaf-to-intermediate condition
return numSenders * sender.getPartitionParallelism() == numReceivers
&& sender.getPartitionFunction() != null
&& sender.getPartitionFunction().equals(receiver.getPartitionFunction());
Contributor:

We should compare ignoring case.

@walterddr (Contributor, Author):

done
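
Presumably the fix swaps equals for a case-insensitive comparison along these lines (a sketch, not the exact committed diff):

// leaf-to-intermediate condition, partition function compared ignoring case
return numSenders * sender.getPartitionParallelism() == numReceivers
    && sender.getPartitionFunction() != null
    && sender.getPartitionFunction().equalsIgnoreCase(receiver.getPartitionFunction());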

@walterddr merged commit e207844 into apache:master on Dec 7, 2023
19 checks passed
saurabhd336 pushed a commit to saurabhd336/pinot that referenced this pull request Feb 9, 2024
* reword isPartition to isPrePartitioned
  this field indicates whether the exchange (or mailbox) needs to reshuffle the data;
  isPartition semantically meant the data is already partitioned, which can be true while still not matching the partitioning the mailbox/exchange expects
* allow partition info to pull all the way through
* add the hash function as part of the physical plan info
  only 2 partition functions are supported: one from the table and one from the shuffle (the shuffle one is non-configurable at the moment)
* limitation: we don't support rehashing via the pre-hash method (the exchange doesn't support the same function as the table), so checking all children for pre-partitioning is necessary instead of checking a single child
* clean-ups: rename colocated to partition table info; compare partition functions in a case-insensitive manner; change the default to Murmur and update all tests

---------

Co-authored-by: Rong Rong <rongr@startree.ai>
Labels: multi-stage (Related to the multi-stage query engine), refactor

3 participants