New Coordinator segment balancing/loading algorithm #7458

Open
leventov opened this issue Apr 12, 2019 · 4 comments

leventov commented Apr 12, 2019

Motivation and goal

Currently, there is no segment balancing/loading algorithm. There is just code that happens to behave in some way, and likely nobody understands exactly how. This leads to incoherent/repeated configurations (#7159) and defects (#7202, #7383, #6329). It should also be possible to significantly speed up balancing/loading by not waiting until all segment movements from the previous balancing burst are done (#7159, #7344).

The goal of this proposal is to establish terminology, introduce a coherent and more operationally-friendly set of configuration parameters, and define the balancing/loading algorithm.

Terminology

First, let's establish terminology.

There are two groups of segments:

  • Replicated segments: governed by LoadRules.
  • Broadcasted segments: governed by BroadcastDistributionRules.

Unreplicated segments are those replicated segments that should be loaded but are not currently loaded by any historical server.

Underreplicated segments (in a tier) are those replicated segments whose target replication count (replicants) in the tier is higher than the number of historical servers in the tier loading the segment at the moment. In the algorithm below, unreplicated segments and underreplicated (or fully replicated) segments are considered disjoint groups: an unreplicated segment is not called underreplicated in each tier where zero servers currently load it. But once a segment is loaded by at least one server in any tier, it becomes underreplicated in all other tiers where it should be loaded.

Fully replicated segments (in a tier) are those replicated segments whose target replication count in the tier is equal to the number of historical servers in the tier loading the segment at the moment.

Overreplicated segments (in a tier) are those replicated segments whose target replication count in the tier is lower than the number of historical servers in the tier loading the segment at the moment.

Underbroadcasted segments (in a tier) are those broadcasted segments that are not loaded on some servers where they should be loaded. These servers should include decommissioning servers, except for the decommissioning servers that only hold broadcasted segments at the moment (see #7383).

Configurations

Existing configuration, still used

  • maxSegmentsInNodeLoadingQueue.

New configurations

  • numBestServersChoosingFunction, a JavaScript function. Defaults to function (numServersInTier, targetReplicants) { return targetReplicants + Math.floor(Math.log(numServersInTier) / Math.log(numBestServersChoosingFunctionDefaultLogBase)); }
  • numBestServersChoosingFunctionDefaultLogBase, defaults to 5. Determines the specific shape of the default numBestServersChoosingFunction which determines the number of best servers (according to CostBalancerStrategy) in a tier to consider loading a segment on. The single best server may be unavailable for loading if its loading queue is already full (with many different definitions of "full" which will be explained below). In the current algorithm, numBestServersChoosingFunction is implicitly return targetReplicants; (if we were to define it), in other words, we always consider only targetReplicants "best" servers in a tier for loading.

numBestServersChoosingFunction (or numBestServersChoosingFunctionDefaultLogBase if used to adjust the default function) is the single parameter that trades off loading speed against precision: a higher number allows higher balancing throughput, but balancing decisions may be less optimal.

With the default function equivalent to return targetReplicants + Math.floor(Math.log(numServersInTier) / Math.log(5));, the behavior will be as "precise" as the current behavior in tiers of fewer than 5 servers, with a +1 server "imprecision allowance" in tiers of 5 to 24 servers, +2 servers in tiers of 25 to 124 servers, etc.
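The growth of the default function with tier size can be checked with a small Python sketch. This is not the proposed JavaScript itself: `floor_log` and `num_best_servers` are hypothetical names, and the logarithm is computed with integer arithmetic (assuming an integer base of at least 2) so that exact powers of the base are not affected by floating-point rounding.

```python
def floor_log(n: int, base: int) -> int:
    """Return the largest k with base**k <= n (requires n >= 1, base >= 2)."""
    if n < 1 or base < 2:
        raise ValueError("n must be >= 1 and base must be >= 2")
    k = 0
    power = base
    while power <= n:
        k += 1
        power *= base
    return k


def num_best_servers(num_servers_in_tier: int, target_replicants: int,
                     log_base: int = 5) -> int:
    """Python rendering of the default numBestServersChoosingFunction."""
    return target_replicants + floor_log(num_servers_in_tier, log_base)


# Extra servers considered beyond targetReplicants (here 2), per tier size.
allowance = {n: num_best_servers(n, 2) - 2 for n in (1, 4, 5, 24, 25, 124, 125)}
```

With base 5, the allowance is 0 for tiers of 1 to 4 servers, 1 for 5 to 24 servers, 2 for 25 to 124 servers, and 3 starting at 125 servers.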

  • loadingMinPercent, defaults to 70.
  • decommissioningMinPercent, defaults to 10.
  • decommissioningMaxPercent, defaults to 70, or decommissioningMaxPercentOfMaxSegmentsToMove if this legacy parameter is specified.
  • balancingMinPercent, defaults to 20.

There are no "loadingMaxPercent" and "balancingMaxPercent" because there is no reason to cap loading and balancing (as long as all "minPercents" are satisfied).

loadingMinPercent + decommissioningMinPercent + balancingMinPercent must be equal to 100. If only one of these three parameters is specified, the other two are determined by applying their default proportions to 100 - theSpecifiedMinPercent. If two of these three parameters are specified, the third one is determined by complementing to 100.
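One possible reading of this defaulting rule, as a Python sketch. The function name `resolve_min_percents` and the short keys are hypothetical, and "applying their default proportions" is interpreted here as splitting the remainder in the ratio of the unspecified parameters' defaults.

```python
# Defaults from the proposal: loadingMinPercent, decommissioningMinPercent,
# balancingMinPercent.
DEFAULTS = {"loading": 70.0, "decommissioning": 10.0, "balancing": 20.0}


def resolve_min_percents(**specified: float) -> dict:
    """Fill in unspecified minPercents so that all three add up to 100."""
    unknown = set(specified) - set(DEFAULTS)
    if unknown:
        raise ValueError(f"unknown parameters: {sorted(unknown)}")
    missing = [name for name in DEFAULTS if name not in specified]
    remainder = 100.0 - sum(specified.values())
    if remainder < 0 or (not missing and remainder != 0):
        raise ValueError("minPercents must add up to exactly 100")
    resolved = dict(specified)
    # Split the remainder in proportion to the defaults of the missing ones.
    default_total = sum(DEFAULTS[name] for name in missing)
    for name in missing:
        resolved[name] = remainder * DEFAULTS[name] / default_total
    return resolved
```

For example, specifying only a loading share of 40 leaves 60 to split 10:20 between decommissioning and balancing, giving 20 and 40.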

These "minPercents" and "maxPercents" are percentages of maxSegmentsInNodeLoadingQueue, not of maxSegmentsToMove. This is not reflected in the names of the parameters to avoid over-specifying the algorithm (as with decommissioningMaxPercentOfMaxSegmentsToMove, a parameter name that should become obsolete very soon after its introduction).

  • segmentLoadingMaxSecondsBeforeAlerting, defaults to 900 (15 minutes), or replicantLifetime * druid.coordinator.period if the legacy replicantLifetime parameter is specified.

Retired configurations

  • maxSegmentsToMove
  • replicationThrottleLimit
  • decommissioningMaxPercentOfMaxSegmentsToMove, unless used to default decommissioningMaxPercent for backward compatibility.
  • replicantLifetime, unless used to default segmentLoadingMaxSecondsBeforeAlerting for backward compatibility.

Proposed algorithm

1. Unreplicated segments

A "loading" stage.

  1. Use CostBalancerStrategy to determine top B "best" servers in every tier where the segment should be loaded, excluding decommissioning servers, where B is determined by numBestServersChoosingFunction (in each tier differently).
  2. Determine the "primary" tier (see Priority on loading for primary replica #4757)
  3. Schedule loading of the segment on one server in the primary tier.

Limit: on this stage, the loading queue of any server can't contain more than maxSegmentsInNodeLoadingQueue * loadingMinPercent / 100 segments that were scheduled to be loaded on the server during one of the "loading" stages of the algorithm, including this stage. This may include segment loading requests hanging since the previous Coordinator's run (see #7159). Below, this kind of "typed" node loading queue limit is denoted briefly as (relative).

If a segment can't be loaded on any server in the "primary" tier because the loading queues of all B "best" servers are already sufficiently full with "loading" requests, step 3 is repeated for non-primary tiers.
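A minimal sketch of how such a "typed" (relative) limit could be checked for one server's queue. The function `has_room` and the use of a `Counter` keyed by request type are assumptions for illustration only, not the LoadQueuePeon API. The check also enforces the overall maxSegmentsInNodeLoadingQueue cap, which the proposal says is implicitly in place during every stage.

```python
from collections import Counter


def has_room(queue: Counter, request_type: str,
             max_segments_in_queue: int, type_percent: float) -> bool:
    """Can one more request of `request_type` be queued on this server?

    `queue` maps a request type ("loading", "decommissioning", "balancing")
    to the number of requests of that type currently in the server's queue.
    """
    typed_cap = max_segments_in_queue * type_percent / 100
    within_typed_cap = queue[request_type] < typed_cap
    within_full_cap = sum(queue.values()) < max_segments_in_queue
    return within_typed_cap and within_full_cap
```

With maxSegmentsInNodeLoadingQueue = 100 and loadingMinPercent = 70, a server holding 69 "loading" requests can accept one more, while a server holding 70 cannot, regardless of how many "balancing" requests it holds.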

After this stage, unreplicated segments are split into the following groups:

  • ToBecomeFullyReplicated (per tier)
  • ToBecomeUnderreplicated (per tier)
  • StillUnreplicated
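Stage 1 and the resulting grouping can be sketched as follows. All names are hypothetical: `best_servers` stands for the top B servers per tier as ranked by CostBalancerStrategy, `can_accept` stands for the queue limit check, and the per-tier classification is an interpretation of the terminology section (a tier becomes fully replicated only if its single scheduled replica already meets the target).

```python
from typing import Callable, Dict, List, Optional, Tuple


def schedule_first_replica(
    tiers_by_priority: List[str],
    best_servers: Dict[str, List[str]],
    can_accept: Callable[[str], bool],
) -> Optional[Tuple[str, str]]:
    """Pick (tier, server) for the first replica, primary tier first."""
    for tier in tiers_by_priority:
        for server in best_servers[tier]:
            if can_accept(server):
                return tier, server
    return None  # the segment stays unreplicated for now


def groups_after_stage_1(
    target_replicants: Dict[str, int],
    scheduled_tier: Optional[str],
) -> Dict[str, str]:
    """Map each tier where the segment should be loaded to its new group."""
    if scheduled_tier is None:
        return {tier: "StillUnreplicated" for tier in target_replicants}
    groups = {}
    for tier, target in target_replicants.items():
        scheduled_here = 1 if tier == scheduled_tier else 0
        groups[tier] = ("ToBecomeFullyReplicated" if scheduled_here == target
                        else "ToBecomeUnderreplicated")
    return groups
```

Here `tiers_by_priority` is assumed to list the primary tier first, so falling through to later tiers corresponds to repeating step 3 for non-primary tiers.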

2. Underbroadcasted segments

A "loading" stage.

In every tier where the segment is underbroadcasted, schedule loading of the segment on all servers in a tier where the segment is not currently loaded.

Limit: loadingMinPercent (relative).

After this stage, underbroadcasted segments are split into the following groups:

  • ToBecomeFullyBroadcasted (per tier)
  • StillUnderbroadcasted (per tier)

3. Underreplicated segments

A "loading" stage.

The following steps are performed in every tier where a segment is underreplicated:

  1. Determine top B "best" servers in the tier for the segment.
  2. Schedule loading of the segment on as many servers (among those determined on the previous step) as needed to match the target replication count in the tier. If the segment is also overreplicated in some other tier, add a callback to schedule unloading the segment from one of the servers where it is currently loaded in the other tier, beyond the target replication count in that tier, upon successful loading in the tier where the segment is underreplicated. (If there is more than one server where the segment is loaded in the tier where it is overreplicated and the target replication count in that tier is greater than zero, determining the server to unload the segment from may require consulting CostBalancerStrategy.)

Limit: loadingMinPercent (relative).

After this stage, underreplicated segments are split into the following groups:

  • ToBecomeFullyReplicated (per tier). These per-tier groups are initialized during stage 1; segments are added to the existing groups.
  • StillUnderreplicated (per tier)

4. ToBecomeUnderreplicated segments

A "loading" stage.

In every tier where a segment is underreplicated, schedule loading of the segment on as many servers (among the top B "best" servers determined for the segment during stage 1) as needed to match the target replication count in the tier.

Limit: loadingMinPercent (relative).

After this stage, ToBecomeUnderreplicated segments are split into the following groups:

  • ToBecomeFullyReplicated (per tier). These per-tier groups are initialized during stage 1; segments are added to the existing groups.
  • ToBecomeUnderreplicated (per tier). In other words, segments that weren't scheduled for loading on enough servers in the tier due to the loadingMinPercent limit remain in the group where they were at the beginning of this stage.

5. Move replicated segments away from decommissioning servers

A "decommissioning" stage.

The following steps are performed in every tier, for every replicated segment loaded on one of the decommissioning servers in the tier:

  1. Determine top B "best" servers in the tier for the segment, or reuse the top B servers already computed during stage 3 if the segment is underreplicated in the tier.
  2. Schedule loading of the segment on the servers determined on the previous step. Add a callback to schedule unloading the segment from one of the decommissioning servers upon successful loading. (This type of operation is currently implemented in DruidCoordinator.moveSegment().)

Limit: decommissioningMinPercent (relative).
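The "load, then unload on success" pattern of step 2 can be sketched with a callback. `FakeLoadQueue` is a hypothetical stand-in that only records operations; it is not LoadQueuePeon and does not reproduce DruidCoordinator.moveSegment().

```python
from typing import Callable, List, Tuple


class FakeLoadQueue:
    """Stand-in for the loading machinery; records requested operations."""

    def __init__(self) -> None:
        self.pending: List[Tuple[str, str, Callable[[], None]]] = []
        self.log: List[str] = []

    def load(self, server: str, segment: str,
             on_success: Callable[[], None]) -> None:
        self.pending.append((server, segment, on_success))

    def drop(self, server: str, segment: str) -> None:
        self.log.append(f"drop {segment} from {server}")

    def complete_all(self) -> None:
        """Pretend every pending load succeeded and fire its callback."""
        pending, self.pending = self.pending, []
        for server, segment, on_success in pending:
            self.log.append(f"load {segment} on {server}")
            on_success()


def move_off_decommissioning(queue: FakeLoadQueue, segment: str,
                             source: str, destination: str) -> None:
    # The unload from the decommissioning server is requested only after
    # the load on the destination has succeeded.
    queue.load(destination, segment,
               on_success=lambda: queue.drop(source, segment))
```

Because the drop is deferred to the callback, the segment is never unloaded from the decommissioning server before its replacement replica is available.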

After this stage, ToMoveAwayFromDecommServers segments are split into the following groups:

  • ToBecomeNotLoadedOnDecommServers (per tier)
  • StillLoadedOnDecommServers (per tier)

6. First revisit of loading segments

A "loading" stage.

Repeat:

  1. Stage 1 for StillUnreplicated segments,
  2. Stage 2 for StillUnderbroadcasted segments,
  3. Stage 3 for StillUnderreplicated segments,
  4. Stage 4 for ToBecomeUnderreplicated segments,

With the difference that the computed top B "best" servers are reused from the previous stages, and with a different limit (otherwise this stage wouldn't make much sense):

Limit: on this stage, the loading queue of any server can't contain more than maxSegmentsInNodeLoadingQueue * (loadingMinPercent + decommissioningMinPercent) / 100 segments. This may include segment loading requests hanging since the previous Coordinator's run.

7. Segment balancing

A "balancing" stage.

There is a loop performed for every tier in the cluster. Steps of the loop:

  1. Choose a segment randomly among fully replicated and underreplicated segments (as of the beginning of the algorithm run) that need to be loaded in the tier.
  2. If the chosen segment has already been visited during this loop for the tier, mark this iteration as "failed" and proceed to the next iteration.
  3. Determine top B "best" servers in the tier for the segment, or reuse the top B servers already computed during stage 3 or 5.
  4. If the segment shouldn't be moved because the servers where it is currently loaded are all among the top B "best" servers for the segment, mark this iteration as "failed" and proceed to the next iteration.

Alternative design: we may decide that a segment movement is worthwhile if any server among the top B where the segment is not loaded is "better" than any server where the segment is currently loaded, rather than only if one of the servers where the segment is currently loaded is outside the top B "best" servers. However, avoiding "too precise" balancing movements, such as moving a segment from the server ranked second to the server ranked first by cost (according to CostBalancerStrategy), leaves more room for segment movements of relatively higher importance before the loading queues of the servers are sufficiently filled up.
  5. If the segment can't be moved because the loading queues of all destination servers (those among the top B, while some servers where the segment is currently loaded are out of the top B) are already sufficiently full (balancingMinPercent (relative)), add the segment to the ToBeBalanced group (for the tier), mark the iteration as "failed", and proceed to the next iteration.
  6. Mark the iteration as "successful" and schedule a segment movement (like on stage 5, step 2).

The loop exits when at least 80 of the last 100 iterations are "failed".

Limit: balancingMinPercent (relative).

Comment: the 80/100 "failed" iterations condition replaces the static bound maxSegmentsToMove. Its advantage is that this condition is independent of the tier size. It should work for all tiers, from those containing a single server (then all 100 first iterations will fail and the loop will exit) to those containing hundreds of servers. I don't know if 80 is a good threshold (and if it should be different in tiers of different sizes) because we don't have #5987 running in our clusters yet, so we don't have good visibility into real "moved" and "unmoved" counts now. I need some input from the community to determine that.
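The exit condition can be sketched as a sliding window over iteration outcomes. The function name `run_balancing_loop` is hypothetical, and the sketch assumes the condition is only evaluated once 100 iterations have been performed, which matches the single-server example above but is otherwise an assumption.

```python
from collections import deque
from typing import Callable


def run_balancing_loop(try_one_iteration: Callable[[], bool],
                       window: int = 100, max_failed: int = 80) -> int:
    """Iterate until at least `max_failed` of the last `window` iterations
    failed. `try_one_iteration` returns True on a "successful" iteration.
    Returns the number of iterations executed.
    """
    recent_failures = deque(maxlen=window)  # old outcomes fall off the left
    iterations = 0
    while True:
        succeeded = try_one_iteration()
        iterations += 1
        recent_failures.append(not succeeded)
        if len(recent_failures) == window and sum(recent_failures) >= max_failed:
            return iterations
```

If every iteration fails, the loop stops after exactly 100 iterations. If the first 50 iterations succeed and all later ones fail, it stops at iteration 130, when the window first contains 80 failures.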

8. Second revisit of loading segments

A "loading" stage.

Repeat stage 6, with the difference that the loading queues of all servers can contain up to maxSegmentsInNodeLoadingQueue segments on this stage. This limit is also implicitly in place during all previous stages. Below, this type of limit is denoted briefly as full maxSegmentsInNodeLoadingQueue.

9. StillLoadedOnDecommServers segments

A "decommissioning" stage.

Repeat stage 5 for StillLoadedOnDecommServers segments and with a different limit: decommissioningMaxPercent (relative).

10. ToBeBalanced segments

A "balancing" stage.

In every tier, for every ToBeBalanced segment in that tier, schedule as many movements of the segment as possible from the servers where it is currently loaded that are outside the top B best servers in the tier for the segment to servers within the top B.

Limit: full maxSegmentsInNodeLoadingQueue.
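A minimal sketch of pairing sources with destinations for one ToBeBalanced segment in one tier. The name `plan_moves` and the `can_accept` predicate (standing for the queue limit check) are hypothetical, and pairing sources with destinations in rank order is an assumption; the proposal does not specify the pairing.

```python
from typing import Callable, List, Tuple


def plan_moves(current_servers: List[str], top_b: List[str],
               can_accept: Callable[[str], bool] = lambda server: True
               ) -> List[Tuple[str, str]]:
    """Return (source, destination) pairs for one segment in one tier.

    Sources are servers holding the segment that are outside the top B;
    destinations are top B servers that don't hold it and have queue room.
    """
    sources = [s for s in current_servers if s not in top_b]
    destinations = [s for s in top_b
                    if s not in current_servers and can_accept(s)]
    # zip() stops at the shorter list: as many movements as possible.
    return list(zip(sources, destinations))
```

For a segment loaded on `a`, `x`, and `y` with top B servers `a`, `b`, `c`, this yields the moves `x` to `b` and `y` to `c`; if `b` has no queue room, only `x` to `c` is scheduled.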

11. Repeat segment balancing

Repeat stage 7 with a different limit: full maxSegmentsInNodeLoadingQueue. A segment is considered already visited on step 2 if it was visited during stage 7. Also, during this stage, segments don't need to be added to the ToBeBalanced group.
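The queue limits of stages 1 through 11 can be tabulated in a short sketch. The function `stage_queue_caps` is hypothetical, the defaults are the ones listed under "New configurations", and the caps are returned as plain numbers without distinguishing "typed" (relative) limits from limits on the whole queue.

```python
from typing import Dict


def stage_queue_caps(max_segments_in_queue: int,
                     loading_min: float = 70,
                     decommissioning_min: float = 10,
                     decommissioning_max: float = 70,
                     balancing_min: float = 20) -> Dict[int, float]:
    """Per-server queue cap applied at each stage of the proposal."""

    def share(percent: float) -> float:
        return max_segments_in_queue * percent / 100

    loading = share(loading_min)
    return {
        1: loading, 2: loading, 3: loading, 4: loading,  # "loading" stages
        5: share(decommissioning_min),                   # decommissioning
        6: share(loading_min + decommissioning_min),     # first revisit
        7: share(balancing_min),                         # balancing
        8: share(100),                                   # second revisit: full
        9: share(decommissioning_max),                   # decommissioning max
        10: share(100),                                  # ToBeBalanced: full
        11: share(100),                                  # repeat balancing: full
    }
```

With maxSegmentsInNodeLoadingQueue = 100 and the default percentages, the caps are 70 for stages 1 to 4, 10 for stage 5, 80 for stage 6, 20 for stage 7, 70 for stage 9, and 100 for stages 8, 10, and 11.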

12. Unload excessively overreplicated segments

For every segment, if the total replication count in all tiers exceeds the sum of target replication counts in all tiers, schedule unloading of the segment from as many servers in the tiers where the segment is overreplicated as needed to match the total replication count with the sum of target replication counts. If there is more than one server where the segment is loaded in any tier where it is overreplicated and the target replication count in that tier is greater than zero, determining the servers to unload the segment from may require consulting CostBalancerStrategy.
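The counting part of this stage can be sketched as follows. The name `plan_unloads` is hypothetical, and visiting tiers in dictionary order is an assumption; the proposal does not say which overreplicated tier to unload from first, and choosing the concrete servers is left to CostBalancerStrategy.

```python
from typing import Dict


def plan_unloads(loaded: Dict[str, int], target: Dict[str, int]) -> Dict[str, int]:
    """Return how many replicas of one segment to unload from each tier.

    `loaded` maps tier -> current replica count, `target` maps tier ->
    target replication count (a missing tier means a target of zero).
    """
    excess = sum(loaded.values()) - sum(target.values())
    plan: Dict[str, int] = {}
    for tier, count in loaded.items():
        if excess <= 0:
            break
        over = count - target.get(tier, 0)
        if over > 0:
            to_unload = min(over, excess)
            plan[tier] = to_unload
            excess -= to_unload
    return plan
```

A segment with 3 replicas in one tier (target 2) and 1 replica in another (target 2) is not excessively overreplicated, because the totals match, so nothing is unloaded at this stage. A segment with an empty target map has all of its replicas unloaded.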

Note that the replicated segments to which a DropRule currently applies should be unloaded during this stage.

"Non-excessively overreplicated" segments should be scheduled for unloading in callbacks during stage 3.

13. Unload broadcasted segments from decommissioning servers

For every decommissioning server that is currently holding only broadcasted segments, schedule unloading all these segments from the server.

Rationale

Compared to the current behavior of Coordinator, the proposed algorithm prioritizes availability as much as possible in the face of abrupt changes in the cluster, for example, when a lot of Historical nodes become unavailable and a lot of segments become unreplicated because of that. During stage 1, only a single replica for every unreplicated segment is scheduled for loading, giving a chance for other unreplicated segments to be scheduled for loading before loading queues of all "good" servers are filled up.

Note that top B "best" servers are reused between the stages without recomputing, although some loading decisions that happen in between may affect the scores of the servers. This is done to avoid repeating expensive computations in CostBalancerStrategy many times for every segment: in the proposed algorithm, a segment may be visited up to five times during different stages in the context of a single tier (an underreplicated segment can be visited during stages 3, 6, 7, 8, and 10). I think this shouldn't affect the quality of cost-based balancing much; however, this opinion is not grounded in anything.

Implementation notes

I don't have a firm understanding of, or confidence in, what I write below in this section because I haven't advanced much in the implementation yet.

To handle many different groups of segments (up to dozens, depending on the number of tiers in the cluster), I plan to use bitsets. The index corresponds to the position of the segment in DruidCoordinatorRuntimeParams.availableSegments (to be renamed to usedSegments in #7306). To allow efficient indexing, this data structure is turned into a sorted array instead of a TreeSet. set.contains()-like operations are implemented via Arrays.binarySearch(). (This is pretty much how Guava's ImmutableSortedSet works.)
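The idea can be illustrated with a Python sketch, using `bisect` in place of Arrays.binarySearch() and an arbitrary-precision integer in place of a Java bitset. The class names `SegmentIndex` and `SegmentGroup` are hypothetical, and segments are represented by plain string identifiers.

```python
from bisect import bisect_left
from typing import Iterable, List


class SegmentIndex:
    """Sorted array of segment ids; a segment's position is its bit index."""

    def __init__(self, segment_ids: Iterable[str]) -> None:
        self._sorted: List[str] = sorted(set(segment_ids))

    def index_of(self, segment_id: str) -> int:
        """Binary search; returns -1 if the segment is not present."""
        i = bisect_left(self._sorted, segment_id)
        if i < len(self._sorted) and self._sorted[i] == segment_id:
            return i
        return -1

    def __contains__(self, segment_id: str) -> bool:
        return self.index_of(segment_id) >= 0


class SegmentGroup:
    """One group of segments (e.g. StillUnreplicated) stored as a bitset."""

    def __init__(self, index: SegmentIndex) -> None:
        self._index = index
        self._bits = 0

    def add(self, segment_id: str) -> None:
        i = self._index.index_of(segment_id)
        if i < 0:
            raise KeyError(segment_id)
        self._bits |= 1 << i

    def __contains__(self, segment_id: str) -> bool:
        i = self._index.index_of(segment_id)
        return i >= 0 and bool((self._bits >> i) & 1)
```

Each group then costs one bit per used segment, and many groups can share a single sorted array of segments.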

LoadQueuePeon implementations should internally handle three different types of segment loading requests: "loading", "decommissioning", and "balancing".

As I mentioned in #7159, SegmentReplicantLookup should become a concurrent data structure.

Operational impact

I think the new algorithm should become the default immediately, but I'll keep the old version of the code and provide an option to switch to the old implementation if the new algorithm fails in production because of unforeseen reasons or bugs.

It doesn't seem reasonable to me to keep the old implementation around for more than one Druid release.


FYI @egor-ryashin @gianm @clintropolis

FYI @jihoonson, see the parts of the algorithm related to BroadcastDistributionRules.

@drcrallen

There was discussion previously about what can/should be done in an extension vs what must be done in core druid. Can you highlight what is needed as far as changes to the core code vs what changes can be put in extensions?

Ideally there would be an extension point that showcases the new behavior, and if that ends up working well it becomes adopted as the core mechanism (see things like the caffeine cache extension and data-sketches as examples of this methodology).


leventov commented Apr 12, 2019

> There was discussion previously about what can/should be done in an extension vs what must be done in core druid.

Could you please link to that discussion?

> Can you highlight what is needed as far as changes to the core code vs what changes can be put in extensions?

The whole algorithm described in this proposal needs to be in core. Something that may not need to be core is query metrics-based balancing (as opposed to the current cost-based balancing). (Out of scope of this proposal.)

A fallback option to the old behavior should be present, but it should be off by default and removed in a timely manner because I don't see any point in supporting the old behavior. The old (current) implementation has many problems (see the issues referenced in this proposal, but there are many more; at some point during the development of the new algorithm I stopped putting them into separate issues), and I don't see why anybody would want to use it instead of the new algorithm, other than for the sake of stability in case some rough edges are found in the new algorithm. On the other hand, the new algorithm is not as different from the current implementation as, for example, metrics-based balancing is from cost-based balancing, and it's expected to work at least as well as the current implementation in all cases.


stale bot commented Jun 20, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Jun 20, 2019
@leventov leventov added the Evergreen Stalebot will ignore this issue label Jun 26, 2019

stale bot commented Jun 26, 2019

This issue is no longer marked as stale.
