
Maintain pool selection for the minimizeDataMovement instance partition assignment strategy #11953

Merged

Conversation

@jackjlli (Contributor) commented Nov 5, 2023

This PR maintains the existing pool selection for the minimizeDataMovement instance partition assignment strategy.

This is useful when a new pool is added and a Pinot admin wants to keep the current replica group assignment on the existing pool while leveraging the new pool for the new replica group.

Incompatible (public API change)

The constructor of InstanceAssignmentConfig is changed.
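
A minimal sketch of the shape of the breaking change, assuming the new flag is exposed as an extra constructor parameter; everything except the minimizeDataMovement flag itself is illustrative, with the existing parameters elided:

// Illustrative only: existing constructor parameters are elided, and the
// class name is a stand-in for InstanceAssignmentConfig.
public class InstanceAssignmentConfigShape {
  private final boolean _minimizeDataMovement;  // newly added field

  // Existing callers must now pass the extra flag, which is why the PR is
  // marked as a public API change.
  public InstanceAssignmentConfigShape(boolean minimizeDataMovement) {
    _minimizeDataMovement = minimizeDataMovement;
  }

  public boolean isMinimizeDataMovement() {
    return _minimizeDataMovement;
  }
}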

@codecov-commenter commented Nov 5, 2023

Codecov Report

Attention: 40 lines in your changes are missing coverage. Please review.

Comparison is base (17db0fd) 61.69% compared to head (a286bd8) 61.72%.

Files Patch % Lines
...e/assignment/instance/InstanceTagPoolSelector.java 32.35% 22 Missing and 1 partial ⚠️
...nstance/InstanceReplicaGroupPartitionSelector.java 94.58% 9 Missing and 6 partials ⚠️
...mmon/assignment/InstanceAssignmentConfigUtils.java 0.00% 1 Missing ⚠️
...assignment/instance/InstancePartitionSelector.java 50.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #11953      +/-   ##
============================================
+ Coverage     61.69%   61.72%   +0.02%     
  Complexity      207      207              
============================================
  Files          2424     2424              
  Lines        132340   132484     +144     
  Branches      20436    20473      +37     
============================================
+ Hits          81651    81777     +126     
- Misses        44693    44702       +9     
- Partials       5996     6005       +9     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.68% <87.57%> (+26.65%) ⬆️
java-21 61.61% <87.57%> (+0.03%) ⬆️
skip-bytebuffers-false 61.70% <87.57%> (+0.01%) ⬆️
skip-bytebuffers-true 61.59% <87.57%> (+0.03%) ⬆️
temurin 61.72% <87.57%> (+0.02%) ⬆️
unittests 61.72% <87.57%> (+0.02%) ⬆️
unittests1 46.90% <75.00%> (-0.01%) ⬇️
unittests2 27.69% <86.64%> (+0.06%) ⬆️

Flags with carried forward coverage won't be shown.


@Jackie-Jiang (Contributor) left a comment

We should not access InstanceReplicaGroupPartitionConfig in InstanceTagPoolSelector. We can consider moving the minimizeDataMovement flag into InstanceAssignmentConfig.

@jackjlli (Contributor, Author) commented Nov 7, 2023

> We should not access InstanceReplicaGroupPartitionConfig in InstanceTagPoolSelector. We can consider moving minimizeDataMovement flag into InstanceAssignmentConfig

That's a good point! How about we just pass the boolean _minimizeDataMovement parameter down to the InstanceTagPoolSelector class, since that's the only parameter it needs from InstanceReplicaGroupPartitionConfig?
Moving the boolean config up to InstanceAssignmentConfig is a good idea, but that would be a backward-incompatible change, which is not trivial.

@jackjlli force-pushed the maintain-pool-selection-for-minimizeDataMovement branch from 50e0c45 to 50cda35 on November 8, 2023 04:37
@jackjlli (Contributor, Author) commented Nov 8, 2023

Discussed with @Jackie-Jiang offline. The _minimizeDataMovement config is moved up to the InstanceAssignmentConfig level so that it applies to both pool selection and instance selection. The _minimizeDataMovement config inside the InstanceReplicaGroupPartitionConfig class is marked as deprecated and will be removed in the next official release.

@jackjlli force-pushed the maintain-pool-selection-for-minimizeDataMovement branch 3 times, most recently from 993aa6b to 95ed621 on November 8, 2023 05:36
@Jackie-Jiang added the feature, release-notes, and Configuration labels on Nov 13, 2023
@Jackie-Jiang (Contributor) left a comment

We want to keep the current config still supported

Let me know if you want me to review the implementation details
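
A minimal sketch of one way to keep the current config supported during deprecation (an assumed illustration, not the merged code): the new InstanceAssignmentConfig-level flag takes effect, and a table config that only sets the deprecated InstanceReplicaGroupPartitionConfig-level flag still behaves as before.

// Sketch only: field and class names are stand-ins for the real configs.
class MinimizeDataMovementCompatSketch {
  private final boolean _configLevelFlag;            // new location
  private final boolean _deprecatedReplicaGroupFlag; // old, deprecated location

  MinimizeDataMovementCompatSketch(boolean configLevelFlag, boolean deprecatedReplicaGroupFlag) {
    _configLevelFlag = configLevelFlag;
    _deprecatedReplicaGroupFlag = deprecatedReplicaGroupFlag;
  }

  // Either flag enables the behavior, so old table configs keep working.
  boolean isMinimizeDataMovement() {
    return _configLevelFlag || _deprecatedReplicaGroupFlag;
  }
}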

@jackjlli force-pushed the maintain-pool-selection-for-minimizeDataMovement branch from bb38609 to d9ae2e9 on November 15, 2023 06:02
@jackjlli (Contributor, Author) commented:

Thanks @Jackie-Jiang! The PR is ready for review now. It'd be great if you could help review it.

// Keep the same pool for the replica group if it's already been used for the table.
int existingNumPartitions = _existingInstancePartitions.getNumPartitions();
int existingNumReplicaGroups = _existingInstancePartitions.getNumReplicaGroups();
int numCommonReplicaGroups = Math.min(numReplicaGroups, existingNumReplicaGroups);
Contributor:

Why are we only checking the common replica groups? We should try to match the pool to existing replica groups, then assign the remaining pools.
There are several scenarios to cover:

  • Same pool and replica groups
  • pools < replica groups (or single pool)
  • pools > replica groups

Contributor Author:

The reason for checking only the common replica groups is that numReplicaGroups doesn't always get incremented. If the number of replica groups is reduced, we don't actually care which pool was used for the stale RG that is no longer needed. That's why the common count is used here.

The scenarios you mentioned are covered by the min heap in Line 112, which gathers each pool number along with the number of times it has been chosen. The pool with the least frequent usage is always chosen next for a RG. Keep in mind that 1 RG can only have 1 pool (RG -> pool), while 1 pool may serve more than 1 RG (pool -> [RG]).
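
A simplified, self-contained sketch of that min-heap idea (an illustration under assumed names, not the PR's actual code): each entry pairs a usage count with a pool number, the least-used pool is polled for the next RG, and the entry is re-offered with an incremented count, so unused pools are always drained before any pool is reused.

import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

public class PoolPickSketch {
  public static void main(String[] args) {
    int numReplicaGroups = 3;
    int[] pools = {0, 1};

    // Each entry is {timesChosen, poolNumber}, ordered by usage, then pool.
    PriorityQueue<int[]> minHeap = new PriorityQueue<>((a, b) ->
        a[0] != b[0] ? Integer.compare(a[0], b[0]) : Integer.compare(a[1], b[1]));
    for (int pool : pools) {
      minHeap.offer(new int[]{0, pool});
    }

    List<Integer> rgToPool = new ArrayList<>();
    for (int rg = 0; rg < numReplicaGroups; rg++) {
      int[] leastUsed = minHeap.poll();
      rgToPool.add(leastUsed[1]);  // 1 RG -> exactly 1 pool
      leastUsed[0]++;              // 1 pool may serve multiple RGs
      minHeap.offer(leastUsed);
    }
    System.out.println("RG -> pool: " + rgToPool);  // prints [0, 1, 0]
  }
}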

Contributor:

Say we used to have 2 RGs and now reduce it to 1 RG; we should still check all existing RGs and pick the pool with the most common instances so that we keep minimizing movement.

Contributor Author:

Let me put the analysis for RG reduction and RG increment here, respectively.

For the RG reduction scenario, it is always the RG(s) with the highest numbers that get removed. Say numRG is reduced from 2 to 1 (i.e. one of two replica groups is removed); it is always RG1, not RG0, that gets removed. In this case, when calculating the new number of instances per RG, we should always use the updated number of RGs in a pool (which comes from poolToReplicaGroupIdsMap); the detailed logic is in Line 164 of this class.

For the RG increment scenario, it is always the RG(s) with higher numbers that get added. In this case, the pool(s) with the least previous usage are chosen: either 1) a pool that has never been used at all (an unused pool), or 2) the least frequently chosen existing pool. The order is maintained by a min heap, so an unused pool is always chosen before any existing pool.

Contributor:

What I meant is that we don't really pick the optimal pool to minimize data movement. Currently we pick a pool as soon as any existing server shows up in it, even if more servers are shared with another pool. This algorithm works well when servers are not moved from one pool to another, but might not return the best pool otherwise.
An algorithm that always gives the best pool is to track the shared server count within all pools and pick the pool with the most shared servers.

More importantly, the current algorithm could produce a wrong assignment in the following scenario:
Existing instance partitions: RG0: [s0, s1], RG1: [s2, s3]
Pools: P0: [s0, s1, s2], P1: [s3, s4, s5]
The current algorithm will assign both RGs to P0, but P0 doesn't have enough servers to hold both RGs.
To make it work the same way as the default assignment, we need to track the maximum number of RGs assigned to a pool, and stop assigning RGs to a pool once it reaches that max.

I'd suggest adding a test to catch such a scenario.
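
A self-contained sketch of the suggested "most shared servers, capped per pool" selection, replaying the exact scenario above (an illustration under assumed names, not the merged implementation):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class SharedServerPoolPickSketch {
  public static void main(String[] args) {
    Map<Integer, Set<String>> rgToServers = new TreeMap<>();
    rgToServers.put(0, new HashSet<>(Arrays.asList("s0", "s1")));
    rgToServers.put(1, new HashSet<>(Arrays.asList("s2", "s3")));
    Map<Integer, Set<String>> poolToServers = new TreeMap<>();
    poolToServers.put(0, new HashSet<>(Arrays.asList("s0", "s1", "s2")));
    poolToServers.put(1, new HashSet<>(Arrays.asList("s3", "s4", "s5")));

    // Cap each pool at ceil(numReplicaGroups / numPools) RGs, mirroring the
    // balance of the default assignment.
    int maxRgsPerPool =
        (rgToServers.size() + poolToServers.size() - 1) / poolToServers.size();
    Map<Integer, Integer> rgCountPerPool = new HashMap<>();

    for (Map.Entry<Integer, Set<String>> rg : rgToServers.entrySet()) {
      int bestPool = -1;
      int bestShared = -1;
      for (Map.Entry<Integer, Set<String>> pool : poolToServers.entrySet()) {
        if (rgCountPerPool.getOrDefault(pool.getKey(), 0) >= maxRgsPerPool) {
          continue;  // pool already holds the maximum number of RGs
        }
        Set<String> shared = new HashSet<>(rg.getValue());
        shared.retainAll(pool.getValue());
        if (shared.size() > bestShared) {
          bestShared = shared.size();
          bestPool = pool.getKey();
        }
      }
      rgCountPerPool.merge(bestPool, 1, Integer::sum);
      System.out.println("RG" + rg.getKey() + " -> P" + bestPool
          + " (" + bestShared + " shared)");
    }
    // Prints: RG0 -> P0 (2 shared), RG1 -> P1 (1 shared); the cap prevents
    // both RGs from landing on P0.
  }
}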

Contributor Author:

This is a good scenario! I've adjusted the logic to cover the case where instances are moved across pools, which breaks the assumption that 1 RG maps to only 1 pool (1 RG -> 1 pool) in the existing instance partition mapping.
A unit test called testSelectPoolsWhenExistingReplicaGroupMapsToMultiplePools has also been added in this PR to capture this scenario. PTAL.

@jackjlli force-pushed the maintain-pool-selection-for-minimizeDataMovement branch from ff4b642 to 83911ab on December 7, 2023 23:20
@Ferix9288 (Contributor) commented:

Hello~ I've been lurking for the past few months, haha. Any chance this can be merged soon? Our company really dislikes our current workaround for this problem. Thanks so much again @jackjlli and @Jackie-Jiang 🙏

@jackjlli (Contributor, Author) commented:

Hey @Ferix9288, sorry for the late reply! I've updated the logic in this PR, and hopefully it can be merged soon to meet your needs.

@somandal (Contributor) left a comment

yet to review the tests

@@ -79,13 +83,26 @@ public boolean equals(Object obj) {
}

public static class AscendingIntPairComparator implements Comparator<IntPair> {
private boolean _ascending;

public AscendingIntPairComparator(boolean ascending) {
Contributor:

Recommend renaming this class, since it is no longer strictly an "Ascending" comparator; the boolean you added allows both ascending and descending comparisons.
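
For illustration, a direction-aware comparator along the lines the diff suggests, under a neutral name (a sketch: the name IntPairComparator and the int[] pair representation are assumptions, not the project's code):

import java.util.Comparator;

public class IntPairComparator implements Comparator<int[]> {
  private final boolean _ascending;

  public IntPairComparator(boolean ascending) {
    _ascending = ascending;
  }

  @Override
  public int compare(int[] left, int[] right) {
    // Compare on the first element; flip the sign for descending order.
    int result = Integer.compare(left[0], right[0]);
    return _ascending ? result : -result;
  }
}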

@@ -56,6 +56,8 @@ public class InstanceReplicaGroupPartitionConfig extends BaseJsonConfig {
"Name of the column used for partition, if not provided table level replica group will be used")
private final String _partitionColumn;

// TODO: remove this config in the next official release
@Deprecated
Contributor:

Just a question: we'll have to update all table configs on our end to remove this once it is removed, right? Will we see failures for existing tables if this is deleted in the next release but we still have table configs setting it in InstanceReplicaGroupPartitionConfig?

List<InstanceConfig> instanceConfigsInPool = poolToInstanceConfigsMap.get(pool);
instanceConfigsInPool.forEach(k -> candidateInstances.add(k.getInstanceName()));
// Get the maximum number of replica groups per pool.
int maxNumberOfReplicaGroupPerPool = numReplicaGroups / pools.size();
Contributor:

The comment here is confusing. Should this use the ceil() of the division? What if numReplicaGroups isn't a multiple of the number of pools, e.g. 3 replica groups across 2 pools? This will set the max to 1 instead of 2.

Or is this intentionally the floor? In that case, can you update the comment and variable name to reflect that this is the minimum number of RGs per pool?
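
For reference, the ceiling variant is a one-line change (a standard Java integer idiom reusing the variables from the diff above, not code from this PR); with 3 replica groups across 2 pools it yields 2 where integer division yields 1:

// Ceiling division without floating point: ceil(a / b) == (a + b - 1) / b.
int maxNumberOfReplicaGroupPerPool = (numReplicaGroups + pools.size() - 1) / pools.size();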

while (!maxHeap.isEmpty()) {
Pairs.IntPair pair = maxHeap.remove();
int poolNumber = pair.getRight();
for (int i = 0; i < maxNumberOfReplicaGroupPerPool; i++) {
Contributor:

Just wondering if there is a code simplification opportunity here. Instead of running this outer loop, can you just extract the relevant group ids from existingReplicaGroupIdToExistingInstancesMap, sort them by size ascending, and assign the top maxNumberOfReplicaGroupPerPool target groups if larger than 0?

Also, if you do want to keep this for loop, you can move it to be after the following, right?

 Set<Integer> existingReplicaGroups = existingPoolToExistingReplicaGroupIdsMap.get(poolNumber);
            if (existingReplicaGroups == null || existingReplicaGroups.isEmpty()) {
              continue;
            }

I don't see how the above will change on each iteration.

Comment on lines 138 to 140
if (!existingPoolsToExistingInstancesMap.containsKey(existingPool)) {
existingPoolsToExistingInstancesMap.put(existingPool, new HashSet<>());
}
Contributor:

You don't need this; you're already doing a computeIfAbsent on the next line.
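
For illustration, the guard collapses into the computeIfAbsent call itself (standard java.util.Map behavior; the trailing add(instanceName) is an assumed stand-in for whatever the next line in the diff does):

// computeIfAbsent creates the set on first access, so the
// containsKey/put guard above is redundant.
existingPoolsToExistingInstancesMap
    .computeIfAbsent(existingPool, k -> new HashSet<>())
    .add(instanceName);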

@Jackie-Jiang force-pushed the maintain-pool-selection-for-minimizeDataMovement branch from 08efc8c to a286bd8 on February 1, 2024 07:17
@Jackie-Jiang merged commit a8b1fa3 into master on Feb 1, 2024
18 of 19 checks passed
@Jackie-Jiang deleted the maintain-pool-selection-for-minimizeDataMovement branch on February 1, 2024 17:00
@Jackie-Jiang added the incompatible label on Feb 1, 2024
suyashpatel98 pushed a commit to suyashpatel98/pinot that referenced this pull request Feb 28, 2024