
feat: Sharding allocation strategy based on slice ranges #32418

Open

patriknw wants to merge 23 commits into main from wip-sharding-by-slice-patriknw
Conversation

patriknw
Member

  • for database sharding we want to reduce the number of db connections when using many Akka nodes
  • allocate entity sharding by slice ranges, which are also used by the database sharding
  • thereby the db connections from one Akka node will go to one database, instead of to all of them

Early draft so far.
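For context, a minimal sketch of the slice/slice-range model this builds on, as exposed by the Persistence extension (the persistence id is just an example, and the exact usage here is my assumption, not part of this PR's diff):

import akka.actor.ActorSystem
import akka.persistence.Persistence

// Each persistence id maps to one of 1024 slices, and slices can be grouped into
// contiguous ranges; database sharding assigns one database per slice range.
val system = ActorSystem("example")
val persistence = Persistence(system)
val slice = persistence.sliceForPersistenceId("ShoppingCart|cart-1") // 0 to 1023
val ranges = persistence.sliceRanges(8)                              // 8 ranges of 128 slices each
val rangeForSlice = ranges.find(_.contains(slice))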

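// First naive approach from the diff: sort regions by member age and map each slice to a region by equal-sized slice ranges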
val regionsByMbr = regionsByMember(currentShardAllocations.keySet)
val regions = regionsByMbr.keysIterator.toIndexedSeq.sorted(Member.ageOrdering).map(regionsByMbr(_))
val rangeSize = NumberOfSlices / regions.size
val i = slice / rangeSize
Member Author

A problem with this first naive approach is that adding/removing nodes will reshuffle many shards. Consistent hashing would be nice, but since these are (dynamic) ranges I don't see how it can be used. My idea is that it could instead find existing adjacent shards and prefer allocating to the same region.
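To illustrate the reshuffle with the naive mapping above, a small sketch (the counts are my own arithmetic, not from the PR):

// With the naive slice / rangeSize mapping, growing the cluster changes the region
// index for far more slices than necessary.
val NumberOfSlices = 1024
def naiveRegion(slice: Int, numberOfRegions: Int): Int =
  slice / (NumberOfSlices / numberOfRegions)

// Going from 4 to 5 regions: 520 of the 1024 slices get a different region index,
// while only about 1024 / 5 = 205 would need to move to the new node.
val moved = (0 until NumberOfSlices).count(s => naiveRegion(s, 4) != naiveRegion(s, 5))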

Contributor

Haven't tracked what the updated algorithm is yet, but it feels like this issue could always be there in some form.

The optimal allocation (however it ends up there) will have all the shards for a slice range together. When these need to be reallocated, the optimum is to allocate as many as possible to one node again, but that either leads to reshuffles, requires accepting unbalanced distributions, or means redistributing them over the other nodes, creating fragmentation for the slice range.

But maybe in practice the find-neighbours approach naturally ends up balancing the tradeoff over time?

@patriknw
Member Author

@johanandren @pvlugter I have something with decent results for the simulations. Can you take a look before I continue with tuning and real tests?

@johanandren (Member) left a comment

Looking good so far.

Comment on lines 121 to 122
// This covers the rounding case for the last region, which we just distribute over all regions.
// May also happen if member for that region has been removed, but that should be a rare case.
Member

Suggested change
- // This covers the rounding case for the last region, which we just distribute over all regions.
- // May also happen if member for that region has been removed, but that should be a rare case.
+ // This covers the rounding case for the last slice, which we just distribute over all regions.
+ // May also happen if member for that region has been removed, but that should be a rare case.

val overfill = 2
val maxShards = (NumberOfSlices / currentShardAllocations.size) + overfill

// FIXME take a look at ShardSuitabilityOrdering for member status and appVersion preference
Member

This seems quite important, or else I think it will stick to allocating to old nodes until quite a number of cluster nodes have rolled, since it prefers neighbours. Maybe I'm missing something about maxShards protecting against that?
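For a sense of scale (my own arithmetic, assuming 1024 slices and a 20-node cluster): maxShards = 1024/20 + 2 = 53 slices per region, so a region can only exceed its fair share of roughly 51 by the overfill of 2.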

emptyRebalanceResult
} else {
// this is the number of slices per region that we are aiming for
val overfill = 1
Member

Should it align with the overfill on allocation? (It's just the 1 in the diff, but it seems strange.)

val overfill = 1
val targetSize = NumberOfSlices / sortedRegionEntries.size + overfill
val selected = Vector.newBuilder[ShardId]
// FIXME ShardSuitabilityOrdering isn't used, but it seems better to use most shards first, combine them?
Member

Missing the handling of leaving regions and app version from there because there if we don't want to use it

Member

Lol, that sentence was not parseable. I just wanted to note that ShardSuitabilityOrdering looks at member leaving and app version, so if we don't want to use it we should probably do something around those properties as well.

@patriknw force-pushed the wip-sharding-by-slice-patriknw branch from 1634bec to e67e5c1 on May 21, 2024 09:39

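// Prefer a region that already has a neighboring slice allocated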
findRegionWithNeighbor(slice, sortedRegionEntries) match {
case Some(regionWithNeighbor) =>
Future.successful(regionWithNeighbor)
@patriknw (Member Author), May 21, 2024

I had an idea that I thought would be good, and implemented it in 1d5d05b. Reverted because "try distributions" didn't show an improvement in the reduction of connections. The idea was to also look at the already allocated range from min to max slice in that region, and if the slice is outside of the optimal range, try to find a region with a lower/upper slice neighbor instead.
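To make that reverted idea concrete, a hypothetical sketch (my reconstruction, not the actual 1d5d05b code):

// Keep a region's allocated slices within a span of at most optimalRangeSize:
// if adding this slice would stretch the region's min..max span beyond that,
// look for another region with a lower/upper neighbor instead.
def withinOptimalRange(slice: Int, allocatedSlices: Set[Int], optimalRangeSize: Int): Boolean =
  allocatedSlices.isEmpty || {
    val min = allocatedSlices.min
    val max = allocatedSlices.max
    slice >= max - optimalRangeSize + 1 && slice <= min + optimalRangeSize - 1
  }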

Member Author

I'm still puzzled that this didn't work; maybe I made some mistake in the implementation. I might debug it a little more.

// These are not real tests, but can be useful for exploring the algorithm and tuning
"SliceRangeShardAllocationStrategy simulations" must {

"try distributions" ignore {
Member Author

A few examples of the result:

total of 73 connections from 20 nodes to 8 backend ranges, reduction by 87
total of 88 connections from 20 nodes to 16 backend ranges, reduction by 232

total of 84 connections from 50 nodes to 8 backend ranges, reduction by 316
total of 98 connections from 50 nodes to 16 backend ranges, reduction by 702

total of 125 connections from 100 nodes to 8 backend ranges, reduction by 675
total of 141 connections from 100 nodes to 16 backend ranges, reduction by 1459

And a connection here should be read as a db connection pool, so multiply by a factor of 10, or the size of the pool.
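For reading the numbers: the reduction appears to be relative to every node connecting to every backend range, e.g. 20 nodes * 8 ranges = 160 possible connections, and 160 - 73 = 87 is the reported reduction.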

Contributor

Ok, so for the numbers we've been testing with (around 25 nodes and 8 slice ranges) it's a decent improvement, but still over 3x the optimal number of connections. And for higher numbers of nodes it's easier to get affinity.

Member Author

Let's say a connection pool of max 20, then 73*20/8 ≈ 183 connections per db should still be fine.
I'd say that it's more important for the larger clusters, so it's good that it looks better there.

Member Author

One idea that I have been pondering is whether we should have a pre-allocation phase that would allocate all shards in order 0-1023. Then there would be near-perfect affinity. It would have to be triggered from outside the allocation strategy itself, in the coordinator, or by something asking the coordinator for shard homes. It could be triggered when reaching min-nr-of-members.

However, it would only help for the initial allocation and not for rebalance.

Member Author

I made a small change that had a significant improvement for the smaller clusters. I adjusted how far to look for neighbors depending on the optimal range size; previously it was hardcoded to 10. bc9f86d

New results:

total of 46 connections from 20 nodes to 8 backend ranges, reduction by 114
total of 67 connections from 20 nodes to 16 backend ranges, reduction by 253

total of 84 connections from 50 nodes to 8 backend ranges, reduction by 316
total of 98 connections from 50 nodes to 16 backend ranges, reduction by 702

total of 124 connections from 100 nodes to 8 backend ranges, reduction by 676
total of 138 connections from 100 nodes to 16 backend ranges, reduction by 1462
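As an illustration only (the actual formula in bc9f86d may differ), making the neighbor search window depend on the optimal range size could look like:

def neighborSearchLimit(numberOfSlices: Int, numberOfRegions: Int): Int = {
  // Hypothetical: scale the search window with the optimal range size,
  // instead of the previously hardcoded distance of 10.
  val optimalRangeSize = numberOfSlices / numberOfRegions
  math.max(10, optimalRangeSize / 2)
}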

Contributor

Cool. Makes sense that it has more opportunity to coalesce when looking further.

@johanandren (Member) left a comment

Looking pretty good, especially for the bigger cluster sizes when running the "try distributions" case.

override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId

override def shardId(entityId: String): String = {
// FIXME shall we have the Persistence extension dependency here, or re-implement sliceForPersistenceId?
Member

Optional dependency on persistence seems fine to me.

Member Author

I was thinking that this is only useful together with database sharding, i.e. persistence included. I was even thinking that it should only be documented in https://doc.akka.io/docs/akka-persistence-r2dbc/current/data-partition.html

Member

I agree, that makes sense.
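For reference, a minimal sketch of what such an extractor could look like with the Persistence extension dependency. The class name here is hypothetical (the PR mentions a ShardBySliceMessageExtractor), and it assumes the entity id is the persistence id used for slicing, which may not hold in a real setup:

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.persistence.Persistence

// Sketch: the shard id is the slice of the persistence id, so shards group naturally
// into the same slice ranges that database sharding uses.
final class SliceMessageExtractor[M](system: ActorSystem[_])
    extends ShardingMessageExtractor[ShardingEnvelope[M], M] {
  private val persistence = Persistence(system.classicSystem)

  override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId

  override def shardId(entityId: String): String =
    persistence.sliceForPersistenceId(entityId).toString

  override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
}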

if (slice >= numberOfSlices)
throw new IllegalArgumentException("slice must be between 0 and 1023. Use `ShardBySliceMessageExtractor`.")

val sortedRegionEntries = regionEntriesFor(currentShardAllocations).toVector.sorted(shardSuitabilityOrdering)
Member

👍 nice, covers leaving and app version without repeating that logic

Member Author

Yes, I had to add a parameter to the ordering for whether to use least shards or not.

// prefer the node with the least allocated shards
JInteger.compare(allocatedShardsX.size, allocatedShardsY.size)
} else if (x.member.upNumber != y.member.upNumber) {
// prefer older
Member.ageOrdering.compare(x.member, y.member)
Member

Why prefer older? Not saying younger or ignoring age would be better, but I want to understand the rationale. (I understand this is the case where nodes have the same version, so not a rolling update.)

Member Author

No strong preference, I mostly just wanted a deterministic order. Possibly a younger node could be more "cold" and shouldn't be overloaded with many new shards immediately.

val currentNumberOfShards = sortedRegionEntries.map(_.shardIds.size).sum
val limitedResult = result.take(limit(currentNumberOfShards)).toSet
previousRebalance = previousRebalance.union(limitedResult)
if (previousRebalance.size >= numberOfSlices / 4)
Member

Did it trigger a rebalance loop with the previous lower value (100, I think)?

Member Author

No exact science around that choice. In some simulations I could see loops with 100; it looked better with 256.

@patriknw
Member Author

Some API docs and maybe some cleanup remaining, but marking as ready.

@patriknw marked this pull request as ready for review on May 21, 2024 13:50
@pvlugter (Contributor) left a comment

LGTM. Good starting point to improve things for the number of connections 👍🏼

The tension between distribution and affinity feels fundamental to the problem here, and there are likely always some tradeoffs to make. Maybe it's useful to have an option to trigger more reshuffling to get closer to optimal affinity, if cluster membership changes are not expected to happen often and more movement is acceptable during changes before settling again? Or to retain optimal affinity allocation after a rolling update (compared with cluster size changes).

Some extra thoughts (not for now): I wonder if there's potential for this to become a general affinity-based allocation strategy, where some affinity function is provided. And I'm thinking about whether there's a hashing + affinity/preference approach that would be useful here (compared with searching for neighbours / high-affinity shards).

@johanandren (Member) left a comment

LGTM!
