From 27f86654ded27ee0cf884e3cabe5292cfd11a8e0 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Sun, 28 Apr 2024 22:15:18 +0800 Subject: [PATCH] Improve the operation of slot in SlotRanges Currently, the continuous slot ranges won't be merged while the new slot is added which is expected in Redis behavior. For example, we now have below slot ranges in one shard: [1,100], [102, 200], [202, 300] and the slot `101` was added to this shard, we would like to become: [1, 200], [202, 300] instead of [1, 101], [102, 200], [202, 300]. --- controller/cluster.go | 8 +- store/cluster.go | 7 +- store/slot.go | 132 +++++++++++++++++++++------------ store/slot_test.go | 168 +++++++++++++++++------------------------- 4 files changed, 157 insertions(+), 158 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index b086419a..27fcc6bb 100644 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -298,11 +298,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster * log.Error("Failed to set the slot", zap.Error(err)) return } - cluster.Shards[i].SlotRanges = store.RemoveSlotRanges(cluster.Shards[i].SlotRanges, - []store.SlotRange{{Start: shard.MigratingSlot, Stop: shard.MigratingSlot}}) - cluster.Shards[shard.TargetShardIndex].SlotRanges = store.MergeSlotRanges( - cluster.Shards[shard.TargetShardIndex].SlotRanges, - []store.SlotRange{{Start: shard.MigratingSlot, Stop: shard.MigratingSlot}}) + cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot) + cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( + cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot) cluster.Shards[i].ClearMigrateState() if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) diff --git a/store/cluster.go b/store/cluster.go index 75f6cef2..ec725861 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -53,7 +53,7 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) { } shardCount := len(nodes) / replicas shards := make([]*Shard, 0) - slotRanges := SpiltSlotRange(shardCount) + slotRanges := CalculateSlotRanges(shardCount) for i := 0; i < shardCount; i++ { shard := NewShard() shard.Nodes = make([]Node, 0) @@ -186,9 +186,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId return consts.ErrShardIsSame } if slotOnly { - migrateSlot := SlotRange{Start: slot, Stop: slot} - cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, []SlotRange{migrateSlot}) - cluster.Shards[targetShardIdx].SlotRanges = MergeSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, []SlotRange{migrateSlot}) + cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot) + cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot) return nil } diff --git a/store/slot.go b/store/slot.go index db088db7..8544aa6c 100644 --- a/store/slot.go +++ b/store/slot.go @@ -39,6 +39,8 @@ type SlotRange struct { Stop int `json:"stop"` } +type SlotRanges []SlotRange + func NewSlotRange(start, stop int) (*SlotRange, error) { if start > stop { return nil, errors.New("start was larger than Shutdown") @@ -122,69 +124,103 @@ func ParseSlotRange(s string) (*SlotRange, error) { }, nil } -func MergeSlotRanges(source []SlotRange, target []SlotRange) []SlotRange { - source = append(source, target...) - if len(source) == 1 { - return source +func (SlotRanges *SlotRanges) Contains(slot int) bool { + for _, slotRange := range *SlotRanges { + if slotRange.Contains(slot) { + return true + } } + return false +} + +func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges { sort.Slice(source, func(i, j int) bool { return source[i].Start < source[j].Start }) - merged := make([]SlotRange, 0) - start := source[0].Start - stop := source[0].Stop - for i := 1; i < len(source); i++ { - if stop+1 < source[i].Start { - merged = append(merged, SlotRange{Start: start, Stop: stop}) - start = source[i].Start - stop = source[i].Stop - } else if stop < source[i].Stop { - stop = source[i].Stop + if len(source) == 0 { + return append(source, SlotRange{Start: slot, Stop: slot}) + } + if source[0].Start-1 > slot { + return append([]SlotRange{{Start: slot, Stop: slot}}, source...) + } + if source[len(source)-1].Stop+1 < slot { + return append(source, SlotRange{Start: slot, Stop: slot}) + } + + // first run is to find the fittest slot range and create a new one if necessary + for i, slotRange := range source { + if slotRange.Contains(slot) { + return source + } + // check next slot range, it won't be the last one since we have checked it before + if slotRange.Stop+1 < slot { + continue + } + if slotRange.Start == slot+1 { + source[i].Start = slot + } else if slotRange.Stop == slot-1 { + source[i].Stop = slot + } else if slotRange.Start > slot { + // no suitable slot range, create a new one before the current slot range + tmp := make(SlotRanges, len(source)+1) + copy(tmp, source[0:i]) + tmp[i] = SlotRange{Start: slot, Stop: slot} + copy(tmp[i+1:], source[i:]) + source = tmp + } else { + // should not reach here + panic("should not reach here") } + break } - merged = append(merged, SlotRange{Start: start, Stop: stop}) - return merged + // merge the slot ranges if necessary + for i := 0; i < len(source)-1; i++ { + if source[i].Stop+1 == source[i+1].Start { + source[i].Stop = source[i+1].Stop + if i+1 == len(source)-1 { + // remove the last slot range + source = source[:i+1] + } else { + source = append(source[:i+1], source[i+2:]...) + } + } + } + return source } -func RemoveSlotRanges(source []SlotRange, target []SlotRange) []SlotRange { - for delIdx := 0; delIdx < len(target); { - deleteSlotRange := target[delIdx] - sort.Slice(source, func(i, j int) bool { - return source[i].Start < source[j].Start - }) - skip := true - for i, slotRange := range source { - if !slotRange.HasOverlap(&deleteSlotRange) { - continue - } - skip = false - source = append(source[0:i], source[i+1:]...) - if deleteSlotRange.Start == slotRange.Start && deleteSlotRange.Stop < slotRange.Stop { - source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1, Stop: slotRange.Stop}) - } else if deleteSlotRange.Stop == slotRange.Stop && deleteSlotRange.Start > slotRange.Start { - source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1}) - } else if deleteSlotRange.Start < slotRange.Start && deleteSlotRange.Stop < slotRange.Stop { - source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1, Stop: slotRange.Stop}) - } else if deleteSlotRange.Start > slotRange.Start && deleteSlotRange.Stop > slotRange.Stop { - source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1}) - } else if deleteSlotRange.Start > slotRange.Start && deleteSlotRange.Stop < slotRange.Stop { - source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1}) - source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1., Stop: slotRange.Stop}) +func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges { + sort.Slice(source, func(i, j int) bool { + return source[i].Start < source[j].Start + }) + if !source.Contains(slot) { + return source + } + for i, slotRange := range source { + if slotRange.Contains(slot) { + if slotRange.Start == slot && slotRange.Stop == slot { + source = append(source[0:i], source[i+1:]...) + } else if slotRange.Start == slot { + source[i].Start = slot + 1 + } else if slotRange.Stop == slot { + source[i].Stop = slot - 1 + } else { + tmp := make(SlotRanges, len(source)+1) + copy(tmp, source[0:i]) + tmp[i] = SlotRange{Start: slotRange.Start, Stop: slot - 1} + tmp[i+1] = SlotRange{Start: slot + 1, Stop: slotRange.Stop} + copy(tmp[i+2:], source[i+1:]) + source = tmp } - break - } - if skip { - delIdx++ } } return source } -func SpiltSlotRange(number int) []SlotRange { +func CalculateSlotRanges(n int) SlotRanges { var slots []SlotRange - rangeSize := (MaxSlotID + 1) / number - for i := 0; i < number; i++ { - if i != number-1 { + rangeSize := (MaxSlotID + 1) / n + for i := 0; i < n; i++ { + if i != n-1 { slots = append(slots, SlotRange{Start: i * rangeSize, Stop: (i+1)*rangeSize - 1}) } else { slots = append(slots, SlotRange{Start: i * rangeSize, Stop: MaxSlotID}) diff --git a/store/slot_test.go b/store/slot_test.go index e5d2ce22..f5740af5 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -57,113 +57,79 @@ func TestSlotRange_Parse(t *testing.T) { assert.NotNil(t, err) } -func TestSlotRange_MergeSlotRanges(t *testing.T) { - range1 := SlotRange{ - Start: 0, - Stop: 8191, - } - range2 := SlotRange{ - Start: 8192, - Stop: 16383, - } - newSlot := MergeSlotRanges([]SlotRange{range1}, []SlotRange{range2}) - assert.Equal(t, 1, len(newSlot)) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 16383, newSlot[0].Stop) - - range3 := SlotRange{ - Start: 0, - Stop: 8199, - } - range4 := SlotRange{ - Start: 8192, - Stop: 16383, - } - newSlot = MergeSlotRanges([]SlotRange{range3}, []SlotRange{range4}) - assert.Equal(t, 1, len(newSlot)) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 16383, newSlot[0].Stop) +func TestAddSlotToSlotRanges(t *testing.T) { + slotRanges := SlotRanges{ + {Start: 1, Stop: 20}, + {Start: 101, Stop: 199}, + {Start: 201, Stop: 300}, + } + slotRanges = AddSlotToSlotRanges(slotRanges, 0) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 21) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 50) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 200) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 400) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3]) } -func TestSlotRange_RemoveSlotRanges(t *testing.T) { - range1 := SlotRange{ - Start: 0, - Stop: 8191, - } - range2 := SlotRange{ - Start: 0, - Stop: 0, - } - range3 := SlotRange{ - Start: 8191, - Stop: 8191, +func TestRemoveSlotRanges(t *testing.T) { + slotRanges := SlotRanges{ + {Start: 1, Stop: 20}, + {Start: 101, Stop: 199}, + {Start: 201, Stop: 300}, } - newSlot := RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range2}) - assert.Equal(t, 1, newSlot[0].Start) - assert.Equal(t, 8191, newSlot[0].Stop) - newSlot = RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range3}) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 8190, newSlot[0].Stop) - - range1 = SlotRange{ - Start: 0, - Stop: 8191, - } - range2 = SlotRange{ - Start: 8192, - Stop: 16383, - } - range3 = SlotRange{ - Start: 0, - Stop: 8192, - } - newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3}) - assert.Equal(t, 8193, newSlot[0].Start) - assert.Equal(t, 16383, newSlot[0].Stop) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 0) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0]) - range1 = SlotRange{ - Start: 0, - Stop: 8191, - } - range2 = SlotRange{ - Start: 8192, - Stop: 16383, - } - range3 = SlotRange{ - Start: 1, - Stop: 8192, - } - newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3}) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 0, newSlot[0].Stop) - assert.Equal(t, 8193, newSlot[1].Start) - assert.Equal(t, 16383, newSlot[1].Stop) - - range1 = SlotRange{ - Start: 0, - Stop: 8191, - } - range2 = SlotRange{ - Start: 8192, - Stop: 16383, - } - range3 = SlotRange{ - Start: 1, - Stop: 8192, - } - range4 := SlotRange{ - Start: 8194, - Stop: 16383, - } - newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3, range4}) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 0, newSlot[0].Stop) - assert.Equal(t, 8193, newSlot[1].Start) - assert.Equal(t, 8193, newSlot[1].Stop) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 21) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 20) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 150) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 101) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 199) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 300) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 298) + require.Equal(t, 5, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3]) + require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 299) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3]) } -func TestSlotRange_SpiltSlotRange(t *testing.T) { - slots := SpiltSlotRange(5) +func TestCalculateSlotRanges(t *testing.T) { + slots := CalculateSlotRanges(5) assert.Equal(t, 0, slots[0].Start) assert.Equal(t, 3275, slots[0].Stop) assert.Equal(t, 13104, slots[4].Start)