-
Notifications
You must be signed in to change notification settings - Fork 30
/
cluster_shard_follow.go
101 lines (83 loc) · 2.35 KB
/
cluster_shard_follow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package topology
import "fmt"
// ClusterShard has the tuple of server id and shard id in a cluster.
type ClusterShard struct {
ShardId int
ServerId int
}
func (shard ClusterShard) String() string {
return fmt.Sprintf("%d.%d", shard.ServerId, shard.ShardId)
}
// PeerShards list peer shards that are on other cluster nodes
func PeerShards(selfServerId int, selfShardId int, clusterSize int, replicationFactor int) (peers []ClusterShard) {
if selfShardId >= clusterSize {
return
}
for i := 0; i < replicationFactor && i < clusterSize; i++ {
serverId := selfShardId + i
if serverId >= clusterSize {
serverId -= clusterSize
}
if serverId == selfServerId {
continue
}
peers = append(peers, ClusterShard{
ShardId: selfShardId,
ServerId: serverId,
})
}
return
}
// PartitionShards list shards that belongs to the same partition
func PartitionShards(selfServerId int, selfShardId int, clusterSize int, replicationFactor int) (shards []ClusterShard) {
if selfShardId >= clusterSize {
return
}
for i := 0; i < replicationFactor && i < clusterSize; i++ {
serverId := selfShardId + i
if serverId >= clusterSize {
serverId -= clusterSize
}
shards = append(shards, ClusterShard{
ShardId: selfShardId,
ServerId: serverId,
})
}
return
}
// LocalShards list shards that local node should have
func LocalShards(selfServerId int, clusterSize int, replicationFactor int) (shards []ClusterShard) {
if selfServerId >= clusterSize {
return
}
for i := 0; i < replicationFactor && i < clusterSize; i++ {
shardId := selfServerId - i
if shardId < 0 {
shardId += clusterSize
}
shards = append(shards, ClusterShard{
ShardId: shardId,
ServerId: selfServerId,
})
}
return
}
// IsShardInLocal returns true if the tuple if shard should be on current server
func IsShardInLocal(shardId int, selfServerId int, clusterSize int, replicationFactor int) bool {
shards := LocalShards(selfServerId, clusterSize, replicationFactor)
for _, shard := range shards {
if shardId == shard.ShardId {
return true
}
}
return false
}
// ShardListContains check whether shards contains one target shard
func ShardListContains(shards []ClusterShard, targetShard ClusterShard) bool {
for _, shard := range shards {
if shard.ShardId == targetShard.ShardId && shard.ServerId == targetShard.ServerId {
return true
}
}
return false
}