diff --git a/controller/cluster.go b/controller/cluster.go index b086419a..27fcc6bb 100644 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -298,11 +298,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster * log.Error("Failed to set the slot", zap.Error(err)) return } - cluster.Shards[i].SlotRanges = store.RemoveSlotRanges(cluster.Shards[i].SlotRanges, - []store.SlotRange{{Start: shard.MigratingSlot, Stop: shard.MigratingSlot}}) - cluster.Shards[shard.TargetShardIndex].SlotRanges = store.MergeSlotRanges( - cluster.Shards[shard.TargetShardIndex].SlotRanges, - []store.SlotRange{{Start: shard.MigratingSlot, Stop: shard.MigratingSlot}}) + cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot) + cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( + cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot) cluster.Shards[i].ClearMigrateState() if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) diff --git a/store/cluster.go b/store/cluster.go index 75f6cef2..ec725861 100644 --- a/store/cluster.go +++ b/store/cluster.go @@ -53,7 +53,7 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) { } shardCount := len(nodes) / replicas shards := make([]*Shard, 0) - slotRanges := SpiltSlotRange(shardCount) + slotRanges := CalculateSlotRanges(shardCount) for i := 0; i < shardCount; i++ { shard := NewShard() shard.Nodes = make([]Node, 0) @@ -186,9 +186,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId return consts.ErrShardIsSame } if slotOnly { - migrateSlot := SlotRange{Start: slot, Stop: slot} - cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, []SlotRange{migrateSlot}) - cluster.Shards[targetShardIdx].SlotRanges = MergeSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, []SlotRange{migrateSlot}) + cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot) + cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot) return nil } diff --git a/store/slot.go b/store/slot.go index db088db7..8544aa6c 100644 --- a/store/slot.go +++ b/store/slot.go @@ -39,6 +39,8 @@ type SlotRange struct { Stop int `json:"stop"` } +type SlotRanges []SlotRange + func NewSlotRange(start, stop int) (*SlotRange, error) { if start > stop { return nil, errors.New("start was larger than Shutdown") @@ -122,69 +124,103 @@ func ParseSlotRange(s string) (*SlotRange, error) { }, nil } -func MergeSlotRanges(source []SlotRange, target []SlotRange) []SlotRange { - source = append(source, target...) - if len(source) == 1 { - return source +func (SlotRanges *SlotRanges) Contains(slot int) bool { + for _, slotRange := range *SlotRanges { + if slotRange.Contains(slot) { + return true + } } + return false +} + +func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges { sort.Slice(source, func(i, j int) bool { return source[i].Start < source[j].Start }) - merged := make([]SlotRange, 0) - start := source[0].Start - stop := source[0].Stop - for i := 1; i < len(source); i++ { - if stop+1 < source[i].Start { - merged = append(merged, SlotRange{Start: start, Stop: stop}) - start = source[i].Start - stop = source[i].Stop - } else if stop < source[i].Stop { - stop = source[i].Stop + if len(source) == 0 { + return append(source, SlotRange{Start: slot, Stop: slot}) + } + if source[0].Start-1 > slot { + return append([]SlotRange{{Start: slot, Stop: slot}}, source...) + } + if source[len(source)-1].Stop+1 < slot { + return append(source, SlotRange{Start: slot, Stop: slot}) + } + + // first run is to find the fittest slot range and create a new one if necessary + for i, slotRange := range source { + if slotRange.Contains(slot) { + return source + } + // check next slot range, it won't be the last one since we have checked it before + if slotRange.Stop+1 < slot { + continue + } + if slotRange.Start == slot+1 { + source[i].Start = slot + } else if slotRange.Stop == slot-1 { + source[i].Stop = slot + } else if slotRange.Start > slot { + // no suitable slot range, create a new one before the current slot range + tmp := make(SlotRanges, len(source)+1) + copy(tmp, source[0:i]) + tmp[i] = SlotRange{Start: slot, Stop: slot} + copy(tmp[i+1:], source[i:]) + source = tmp + } else { + // should not reach here + panic("should not reach here") } + break } - merged = append(merged, SlotRange{Start: start, Stop: stop}) - return merged + // merge the slot ranges if necessary + for i := 0; i < len(source)-1; i++ { + if source[i].Stop+1 == source[i+1].Start { + source[i].Stop = source[i+1].Stop + if i+1 == len(source)-1 { + // remove the last slot range + source = source[:i+1] + } else { + source = append(source[:i+1], source[i+2:]...) + } + } + } + return source } -func RemoveSlotRanges(source []SlotRange, target []SlotRange) []SlotRange { - for delIdx := 0; delIdx < len(target); { - deleteSlotRange := target[delIdx] - sort.Slice(source, func(i, j int) bool { - return source[i].Start < source[j].Start - }) - skip := true - for i, slotRange := range source { - if !slotRange.HasOverlap(&deleteSlotRange) { - continue - } - skip = false - source = append(source[0:i], source[i+1:]...) - if deleteSlotRange.Start == slotRange.Start && deleteSlotRange.Stop < slotRange.Stop { - source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1, Stop: slotRange.Stop}) - } else if deleteSlotRange.Stop == slotRange.Stop && deleteSlotRange.Start > slotRange.Start { - source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1}) - } else if deleteSlotRange.Start < slotRange.Start && deleteSlotRange.Stop < slotRange.Stop { - source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1, Stop: slotRange.Stop}) - } else if deleteSlotRange.Start > slotRange.Start && deleteSlotRange.Stop > slotRange.Stop { - source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1}) - } else if deleteSlotRange.Start > slotRange.Start && deleteSlotRange.Stop < slotRange.Stop { - source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1}) - source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1., Stop: slotRange.Stop}) +func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges { + sort.Slice(source, func(i, j int) bool { + return source[i].Start < source[j].Start + }) + if !source.Contains(slot) { + return source + } + for i, slotRange := range source { + if slotRange.Contains(slot) { + if slotRange.Start == slot && slotRange.Stop == slot { + source = append(source[0:i], source[i+1:]...) + } else if slotRange.Start == slot { + source[i].Start = slot + 1 + } else if slotRange.Stop == slot { + source[i].Stop = slot - 1 + } else { + tmp := make(SlotRanges, len(source)+1) + copy(tmp, source[0:i]) + tmp[i] = SlotRange{Start: slotRange.Start, Stop: slot - 1} + tmp[i+1] = SlotRange{Start: slot + 1, Stop: slotRange.Stop} + copy(tmp[i+2:], source[i+1:]) + source = tmp } - break - } - if skip { - delIdx++ } } return source } -func SpiltSlotRange(number int) []SlotRange { +func CalculateSlotRanges(n int) SlotRanges { var slots []SlotRange - rangeSize := (MaxSlotID + 1) / number - for i := 0; i < number; i++ { - if i != number-1 { + rangeSize := (MaxSlotID + 1) / n + for i := 0; i < n; i++ { + if i != n-1 { slots = append(slots, SlotRange{Start: i * rangeSize, Stop: (i+1)*rangeSize - 1}) } else { slots = append(slots, SlotRange{Start: i * rangeSize, Stop: MaxSlotID}) diff --git a/store/slot_test.go b/store/slot_test.go index e5d2ce22..f5740af5 100644 --- a/store/slot_test.go +++ b/store/slot_test.go @@ -57,113 +57,79 @@ func TestSlotRange_Parse(t *testing.T) { assert.NotNil(t, err) } -func TestSlotRange_MergeSlotRanges(t *testing.T) { - range1 := SlotRange{ - Start: 0, - Stop: 8191, - } - range2 := SlotRange{ - Start: 8192, - Stop: 16383, - } - newSlot := MergeSlotRanges([]SlotRange{range1}, []SlotRange{range2}) - assert.Equal(t, 1, len(newSlot)) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 16383, newSlot[0].Stop) - - range3 := SlotRange{ - Start: 0, - Stop: 8199, - } - range4 := SlotRange{ - Start: 8192, - Stop: 16383, - } - newSlot = MergeSlotRanges([]SlotRange{range3}, []SlotRange{range4}) - assert.Equal(t, 1, len(newSlot)) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 16383, newSlot[0].Stop) +func TestAddSlotToSlotRanges(t *testing.T) { + slotRanges := SlotRanges{ + {Start: 1, Stop: 20}, + {Start: 101, Stop: 199}, + {Start: 201, Stop: 300}, + } + slotRanges = AddSlotToSlotRanges(slotRanges, 0) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 21) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 50) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 200) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2]) + + slotRanges = AddSlotToSlotRanges(slotRanges, 400) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3]) } -func TestSlotRange_RemoveSlotRanges(t *testing.T) { - range1 := SlotRange{ - Start: 0, - Stop: 8191, - } - range2 := SlotRange{ - Start: 0, - Stop: 0, - } - range3 := SlotRange{ - Start: 8191, - Stop: 8191, +func TestRemoveSlotRanges(t *testing.T) { + slotRanges := SlotRanges{ + {Start: 1, Stop: 20}, + {Start: 101, Stop: 199}, + {Start: 201, Stop: 300}, } - newSlot := RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range2}) - assert.Equal(t, 1, newSlot[0].Start) - assert.Equal(t, 8191, newSlot[0].Stop) - newSlot = RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range3}) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 8190, newSlot[0].Stop) - - range1 = SlotRange{ - Start: 0, - Stop: 8191, - } - range2 = SlotRange{ - Start: 8192, - Stop: 16383, - } - range3 = SlotRange{ - Start: 0, - Stop: 8192, - } - newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3}) - assert.Equal(t, 8193, newSlot[0].Start) - assert.Equal(t, 16383, newSlot[0].Stop) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 0) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0]) - range1 = SlotRange{ - Start: 0, - Stop: 8191, - } - range2 = SlotRange{ - Start: 8192, - Stop: 16383, - } - range3 = SlotRange{ - Start: 1, - Stop: 8192, - } - newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3}) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 0, newSlot[0].Stop) - assert.Equal(t, 8193, newSlot[1].Start) - assert.Equal(t, 16383, newSlot[1].Stop) - - range1 = SlotRange{ - Start: 0, - Stop: 8191, - } - range2 = SlotRange{ - Start: 8192, - Stop: 16383, - } - range3 = SlotRange{ - Start: 1, - Stop: 8192, - } - range4 := SlotRange{ - Start: 8194, - Stop: 16383, - } - newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3, range4}) - assert.Equal(t, 0, newSlot[0].Start) - assert.Equal(t, 0, newSlot[0].Stop) - assert.Equal(t, 8193, newSlot[1].Start) - assert.Equal(t, 8193, newSlot[1].Stop) + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 21) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 20) + require.Equal(t, 3, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 150) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 101) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 199) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 300) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 298) + require.Equal(t, 5, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3]) + require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4]) + + slotRanges = RemoveSlotFromSlotRanges(slotRanges, 299) + require.Equal(t, 4, len(slotRanges)) + require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3]) } -func TestSlotRange_SpiltSlotRange(t *testing.T) { - slots := SpiltSlotRange(5) +func TestCalculateSlotRanges(t *testing.T) { + slots := CalculateSlotRanges(5) assert.Equal(t, 0, slots[0].Start) assert.Equal(t, 3275, slots[0].Stop) assert.Equal(t, 13104, slots[4].Start)