Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions controller/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,9 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, cluster *
log.Error("Failed to set the slot", zap.Error(err))
return
}
cluster.Shards[i].SlotRanges = store.RemoveSlotRanges(cluster.Shards[i].SlotRanges,
[]store.SlotRange{{Start: shard.MigratingSlot, Stop: shard.MigratingSlot}})
cluster.Shards[shard.TargetShardIndex].SlotRanges = store.MergeSlotRanges(
cluster.Shards[shard.TargetShardIndex].SlotRanges,
[]store.SlotRange{{Start: shard.MigratingSlot, Stop: shard.MigratingSlot}})
cluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(cluster.Shards[i].SlotRanges, shard.MigratingSlot)
cluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges(
cluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot)
cluster.Shards[i].ClearMigrateState()
if err := c.clusterStore.SetCluster(ctx, c.namespace, cluster); err != nil {
log.Error("Failed to update the cluster", zap.Error(err))
Expand Down
7 changes: 3 additions & 4 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewCluster(name string, nodes []string, replicas int) (*Cluster, error) {
}
shardCount := len(nodes) / replicas
shards := make([]*Shard, 0)
slotRanges := SpiltSlotRange(shardCount)
slotRanges := CalculateSlotRanges(shardCount)
for i := 0; i < shardCount; i++ {
shard := NewShard()
shard.Nodes = make([]Node, 0)
Expand Down Expand Up @@ -186,9 +186,8 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId
return consts.ErrShardIsSame
}
if slotOnly {
migrateSlot := SlotRange{Start: slot, Stop: slot}
cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, []SlotRange{migrateSlot})
cluster.Shards[targetShardIdx].SlotRanges = MergeSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, []SlotRange{migrateSlot})
cluster.Shards[sourceShardIdx].SlotRanges = RemoveSlotFromSlotRanges(cluster.Shards[sourceShardIdx].SlotRanges, slot)
cluster.Shards[targetShardIdx].SlotRanges = AddSlotToSlotRanges(cluster.Shards[targetShardIdx].SlotRanges, slot)
return nil
}

Expand Down
132 changes: 84 additions & 48 deletions store/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type SlotRange struct {
Stop int `json:"stop"`
}

type SlotRanges []SlotRange

func NewSlotRange(start, stop int) (*SlotRange, error) {
if start > stop {
return nil, errors.New("start was larger than Shutdown")
Expand Down Expand Up @@ -122,69 +124,103 @@ func ParseSlotRange(s string) (*SlotRange, error) {
}, nil
}

func MergeSlotRanges(source []SlotRange, target []SlotRange) []SlotRange {
source = append(source, target...)
if len(source) == 1 {
return source
func (SlotRanges *SlotRanges) Contains(slot int) bool {
for _, slotRange := range *SlotRanges {
if slotRange.Contains(slot) {
return true
}
}
return false
}

func AddSlotToSlotRanges(source SlotRanges, slot int) SlotRanges {
sort.Slice(source, func(i, j int) bool {
return source[i].Start < source[j].Start
})
merged := make([]SlotRange, 0)
start := source[0].Start
stop := source[0].Stop
for i := 1; i < len(source); i++ {
if stop+1 < source[i].Start {
merged = append(merged, SlotRange{Start: start, Stop: stop})
start = source[i].Start
stop = source[i].Stop
} else if stop < source[i].Stop {
stop = source[i].Stop
if len(source) == 0 {
return append(source, SlotRange{Start: slot, Stop: slot})
}
if source[0].Start-1 > slot {
return append([]SlotRange{{Start: slot, Stop: slot}}, source...)
}
if source[len(source)-1].Stop+1 < slot {
return append(source, SlotRange{Start: slot, Stop: slot})
}

// first run is to find the fittest slot range and create a new one if necessary
for i, slotRange := range source {
if slotRange.Contains(slot) {
return source
}
// check next slot range, it won't be the last one since we have checked it before
if slotRange.Stop+1 < slot {
continue
}
if slotRange.Start == slot+1 {
source[i].Start = slot
} else if slotRange.Stop == slot-1 {
source[i].Stop = slot
} else if slotRange.Start > slot {
// no suitable slot range, create a new one before the current slot range
tmp := make(SlotRanges, len(source)+1)
copy(tmp, source[0:i])
tmp[i] = SlotRange{Start: slot, Stop: slot}
copy(tmp[i+1:], source[i:])
source = tmp
} else {
// should not reach here
panic("should not reach here")
}
break
}
merged = append(merged, SlotRange{Start: start, Stop: stop})
return merged
// merge the slot ranges if necessary
for i := 0; i < len(source)-1; i++ {
if source[i].Stop+1 == source[i+1].Start {
source[i].Stop = source[i+1].Stop
if i+1 == len(source)-1 {
// remove the last slot range
source = source[:i+1]
} else {
source = append(source[:i+1], source[i+2:]...)
}
}
}
return source
}

func RemoveSlotRanges(source []SlotRange, target []SlotRange) []SlotRange {
for delIdx := 0; delIdx < len(target); {
deleteSlotRange := target[delIdx]
sort.Slice(source, func(i, j int) bool {
return source[i].Start < source[j].Start
})
skip := true
for i, slotRange := range source {
if !slotRange.HasOverlap(&deleteSlotRange) {
continue
}
skip = false
source = append(source[0:i], source[i+1:]...)
if deleteSlotRange.Start == slotRange.Start && deleteSlotRange.Stop < slotRange.Stop {
source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1, Stop: slotRange.Stop})
} else if deleteSlotRange.Stop == slotRange.Stop && deleteSlotRange.Start > slotRange.Start {
source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1})
} else if deleteSlotRange.Start < slotRange.Start && deleteSlotRange.Stop < slotRange.Stop {
source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1, Stop: slotRange.Stop})
} else if deleteSlotRange.Start > slotRange.Start && deleteSlotRange.Stop > slotRange.Stop {
source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1})
} else if deleteSlotRange.Start > slotRange.Start && deleteSlotRange.Stop < slotRange.Stop {
source = append(source, SlotRange{Start: slotRange.Start, Stop: deleteSlotRange.Start - 1})
source = append(source, SlotRange{Start: deleteSlotRange.Stop + 1., Stop: slotRange.Stop})
func RemoveSlotFromSlotRanges(source SlotRanges, slot int) SlotRanges {
sort.Slice(source, func(i, j int) bool {
return source[i].Start < source[j].Start
})
if !source.Contains(slot) {
return source
}
for i, slotRange := range source {
if slotRange.Contains(slot) {
if slotRange.Start == slot && slotRange.Stop == slot {
source = append(source[0:i], source[i+1:]...)
} else if slotRange.Start == slot {
source[i].Start = slot + 1
} else if slotRange.Stop == slot {
source[i].Stop = slot - 1
} else {
tmp := make(SlotRanges, len(source)+1)
copy(tmp, source[0:i])
tmp[i] = SlotRange{Start: slotRange.Start, Stop: slot - 1}
tmp[i+1] = SlotRange{Start: slot + 1, Stop: slotRange.Stop}
copy(tmp[i+2:], source[i+1:])
source = tmp
}
break
}
if skip {
delIdx++
}
}
return source
}

func SpiltSlotRange(number int) []SlotRange {
func CalculateSlotRanges(n int) SlotRanges {
var slots []SlotRange
rangeSize := (MaxSlotID + 1) / number
for i := 0; i < number; i++ {
if i != number-1 {
rangeSize := (MaxSlotID + 1) / n
for i := 0; i < n; i++ {
if i != n-1 {
slots = append(slots, SlotRange{Start: i * rangeSize, Stop: (i+1)*rangeSize - 1})
} else {
slots = append(slots, SlotRange{Start: i * rangeSize, Stop: MaxSlotID})
Expand Down
168 changes: 67 additions & 101 deletions store/slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,113 +57,79 @@ func TestSlotRange_Parse(t *testing.T) {
assert.NotNil(t, err)
}

func TestSlotRange_MergeSlotRanges(t *testing.T) {
range1 := SlotRange{
Start: 0,
Stop: 8191,
}
range2 := SlotRange{
Start: 8192,
Stop: 16383,
}
newSlot := MergeSlotRanges([]SlotRange{range1}, []SlotRange{range2})
assert.Equal(t, 1, len(newSlot))
assert.Equal(t, 0, newSlot[0].Start)
assert.Equal(t, 16383, newSlot[0].Stop)

range3 := SlotRange{
Start: 0,
Stop: 8199,
}
range4 := SlotRange{
Start: 8192,
Stop: 16383,
}
newSlot = MergeSlotRanges([]SlotRange{range3}, []SlotRange{range4})
assert.Equal(t, 1, len(newSlot))
assert.Equal(t, 0, newSlot[0].Start)
assert.Equal(t, 16383, newSlot[0].Stop)
func TestAddSlotToSlotRanges(t *testing.T) {
slotRanges := SlotRanges{
{Start: 1, Stop: 20},
{Start: 101, Stop: 199},
{Start: 201, Stop: 300},
}
slotRanges = AddSlotToSlotRanges(slotRanges, 0)
require.Equal(t, 3, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 0, Stop: 20}, slotRanges[0])

slotRanges = AddSlotToSlotRanges(slotRanges, 21)
require.Equal(t, 3, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 0, Stop: 21}, slotRanges[0])

slotRanges = AddSlotToSlotRanges(slotRanges, 50)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 50, Stop: 50}, slotRanges[1])

slotRanges = AddSlotToSlotRanges(slotRanges, 200)
require.Equal(t, 3, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 101, Stop: 300}, slotRanges[2])

slotRanges = AddSlotToSlotRanges(slotRanges, 400)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 400, Stop: 400}, slotRanges[3])
}

func TestSlotRange_RemoveSlotRanges(t *testing.T) {
range1 := SlotRange{
Start: 0,
Stop: 8191,
}
range2 := SlotRange{
Start: 0,
Stop: 0,
}
range3 := SlotRange{
Start: 8191,
Stop: 8191,
func TestRemoveSlotRanges(t *testing.T) {
slotRanges := SlotRanges{
{Start: 1, Stop: 20},
{Start: 101, Stop: 199},
{Start: 201, Stop: 300},
}
newSlot := RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range2})
assert.Equal(t, 1, newSlot[0].Start)
assert.Equal(t, 8191, newSlot[0].Stop)
newSlot = RemoveSlotRanges([]SlotRange{range1}, []SlotRange{range3})
assert.Equal(t, 0, newSlot[0].Start)
assert.Equal(t, 8190, newSlot[0].Stop)

range1 = SlotRange{
Start: 0,
Stop: 8191,
}
range2 = SlotRange{
Start: 8192,
Stop: 16383,
}
range3 = SlotRange{
Start: 0,
Stop: 8192,
}
newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3})
assert.Equal(t, 8193, newSlot[0].Start)
assert.Equal(t, 16383, newSlot[0].Stop)
slotRanges = RemoveSlotFromSlotRanges(slotRanges, 0)
require.Equal(t, 3, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0])

range1 = SlotRange{
Start: 0,
Stop: 8191,
}
range2 = SlotRange{
Start: 8192,
Stop: 16383,
}
range3 = SlotRange{
Start: 1,
Stop: 8192,
}
newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3})
assert.Equal(t, 0, newSlot[0].Start)
assert.Equal(t, 0, newSlot[0].Stop)
assert.Equal(t, 8193, newSlot[1].Start)
assert.Equal(t, 16383, newSlot[1].Stop)

range1 = SlotRange{
Start: 0,
Stop: 8191,
}
range2 = SlotRange{
Start: 8192,
Stop: 16383,
}
range3 = SlotRange{
Start: 1,
Stop: 8192,
}
range4 := SlotRange{
Start: 8194,
Stop: 16383,
}
newSlot = RemoveSlotRanges([]SlotRange{range1, range2}, []SlotRange{range3, range4})
assert.Equal(t, 0, newSlot[0].Start)
assert.Equal(t, 0, newSlot[0].Stop)
assert.Equal(t, 8193, newSlot[1].Start)
assert.Equal(t, 8193, newSlot[1].Stop)
slotRanges = RemoveSlotFromSlotRanges(slotRanges, 21)
require.Equal(t, 3, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 1, Stop: 20}, slotRanges[0])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 20)
require.Equal(t, 3, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 1, Stop: 19}, slotRanges[0])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 150)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 101, Stop: 149}, slotRanges[1])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 101)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 102, Stop: 149}, slotRanges[1])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 199)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 151, Stop: 198}, slotRanges[2])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 300)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 201, Stop: 299}, slotRanges[3])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 298)
require.Equal(t, 5, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3])
require.EqualValues(t, SlotRange{Start: 299, Stop: 299}, slotRanges[4])

slotRanges = RemoveSlotFromSlotRanges(slotRanges, 299)
require.Equal(t, 4, len(slotRanges))
require.EqualValues(t, SlotRange{Start: 201, Stop: 297}, slotRanges[3])
}

func TestSlotRange_SpiltSlotRange(t *testing.T) {
slots := SpiltSlotRange(5)
func TestCalculateSlotRanges(t *testing.T) {
slots := CalculateSlotRanges(5)
assert.Equal(t, 0, slots[0].Start)
assert.Equal(t, 3275, slots[0].Stop)
assert.Equal(t, 13104, slots[4].Start)
Expand Down