Skip to content

Commit fbe5aa3

Browse files
committed
Final touch
1 parent 182bb3a commit fbe5aa3

22 files changed

+444
-219
lines changed

pkg/deployment/agency/generator_collection_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package agency
2222

2323
type CollectionGeneratorInterface interface {
2424
WithWriteConcern(wc int) CollectionGeneratorInterface
25+
WithReplicationFactor(rf int) CollectionGeneratorInterface
2526
WithShard() ShardGeneratorInterface
2627
Add() DatabaseGeneratorInterface
2728
}
@@ -31,6 +32,7 @@ type collectionGenerator struct {
3132
col string
3233

3334
wc *int
35+
rf *int
3436
shards map[int]shardGenerator
3537
}
3638

@@ -55,3 +57,8 @@ func (c collectionGenerator) WithWriteConcern(wc int) CollectionGeneratorInterfa
5557
c.wc = &wc
5658
return c
5759
}
60+
61+
func (c collectionGenerator) WithReplicationFactor(rf int) CollectionGeneratorInterface {
62+
c.rf = &rf
63+
return c
64+
}

pkg/deployment/agency/generator_database_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,10 @@ func (d databaseGenerator) Add() StateGenerator {
9494
}
9595

9696
planCol := StatePlanCollection{
97-
Name: util.NewString(col),
98-
Shards: planShards,
99-
WriteConcern: colDet.wc,
97+
Name: util.NewString(col),
98+
Shards: planShards,
99+
WriteConcern: colDet.wc,
100+
ReplicationFactor: colDet.rf,
100101
}
101102

102103
plan[col] = planCol

pkg/deployment/agency/plan_collections.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ func (a StatePlanDBCollections) IsDBServerInCollections(name string) bool {
4242
return false
4343
}
4444

45+
func (a StatePlanDBCollections) CountShards() int {
46+
count := 0
47+
48+
for _, d := range a {
49+
count += len(d.Shards)
50+
}
51+
52+
return count
53+
}
54+
4555
type StatePlanCollection struct {
4656
Name *string `json:"name"`
4757
Shards Shards `json:"shards"`
@@ -63,9 +73,9 @@ func (a *StatePlanCollection) GetReplicationFactor(shard string) int {
6373
return l
6474
} else {
6575
if v := *z; v > l {
66-
return l
67-
} else {
6876
return v
77+
} else {
78+
return l
6979
}
7080
}
7181
}
@@ -98,7 +108,11 @@ func (a StatePlanCollection) GetName(d string) string {
98108
return *a.Name
99109
}
100110

101-
func (a StatePlanCollection) IsDBServerInShards(name string) bool {
111+
func (a *StatePlanCollection) IsDBServerInShards(name string) bool {
112+
if a == nil {
113+
return false
114+
}
115+
102116
for _, planShards := range a.Shards {
103117
if planShards.Contains(name) {
104118
return true

pkg/deployment/agency/shards.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,15 @@ func (s ShardServers) Contains(server string) bool {
3333

3434
return false
3535
}
36+
37+
func (s ShardServers) FilterBy(b ShardServers) ShardServers {
38+
q := make(ShardServers, 0, len(s))
39+
40+
for _, i := range s {
41+
if b.Contains(i) {
42+
q = append(q, i)
43+
}
44+
}
45+
46+
return q
47+
}

pkg/deployment/agency/state.go

Lines changed: 126 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -120,64 +120,151 @@ func (d *StateExists) UnmarshalJSON(bytes []byte) error {
120120
return nil
121121
}
122122

123-
type CollectionIterator func(db, col string, planCollection *StatePlanCollection, shard string, plan ShardServers, current ShardServers)
123+
func (s State) CountShards() int {
124+
count := 0
125+
126+
for _, collections := range s.Plan.Collections {
127+
count += collections.CountShards()
128+
}
129+
130+
return count
131+
}
132+
133+
func (s State) PlanServers() []string {
134+
q := map[string]bool{}
135+
136+
for _, db := range s.Plan.Collections {
137+
for _, col := range db {
138+
for _, shards := range col.Shards {
139+
for _, shard := range shards {
140+
q[shard] = true
141+
}
142+
}
143+
}
144+
}
145+
146+
r := make([]string, 0, len(q))
147+
148+
for k := range q {
149+
r = append(r, k)
150+
}
151+
152+
return r
153+
}
154+
155+
type CollectionShardDetails []CollectionShardDetail
156+
157+
type CollectionShardDetail struct {
158+
Database string
159+
Collection string
160+
Shard string
161+
}
162+
163+
type StateShardFilter func(s State, db, col, shard string) bool
164+
165+
func NegateFilter(in StateShardFilter) StateShardFilter {
166+
return func(s State, db, col, shard string) bool {
167+
return !in(s, db, col, shard)
168+
}
169+
}
170+
171+
func (s State) Filter(f StateShardFilter) CollectionShardDetails {
172+
shards := make(CollectionShardDetails, s.CountShards())
173+
size := 0
124174

125-
func (s State) IterateOverCollections(i CollectionIterator) {
126175
for db, collections := range s.Plan.Collections {
127176
for collection, details := range collections {
128-
for shard, shardDetails := range details.Shards {
129-
if currShard, ok := s.Current.Collections[db][collection][shard]; ok {
130-
i(db, collection, &details, shard, shardDetails, currShard.Servers)
177+
for shard := range details.Shards {
178+
if f(s, db, collection, shard) {
179+
shards[size] = CollectionShardDetail{
180+
Database: db,
181+
Collection: collection,
182+
Shard: shard,
183+
}
184+
size++
131185
}
132186
}
133187
}
134188
}
189+
190+
if size == 0 {
191+
return nil
192+
}
193+
194+
return shards[0:size]
135195
}
136196

137-
func (s State) IsDBServerInSync(serverID string) bool {
138-
isInSync := true
139-
s.IterateOverCollections(func(db, col string, planCollection *StatePlanCollection, shard string, plan ShardServers, current ShardServers) {
140-
if plan.Contains(serverID) {
141-
serverIsNotInSync := !current.Contains(serverID)
142-
wc := planCollection.GetWriteConcern(1)
143-
if serverIsNotInSync || wc > len(current) {
144-
isInSync = false
145-
}
197+
func GetDBServerBlockingRestartShards(s State, serverID string) CollectionShardDetails {
198+
return s.Filter(FilterDBServerShardRestart(serverID))
199+
}
200+
201+
func FilterDBServerShardRestart(serverID string) StateShardFilter {
202+
return NegateFilter(func(s State, db, col, shard string) bool {
203+
// Filter all shards which are not blocking restart of server
204+
plan := s.Plan.Collections[db][col]
205+
planShard := plan.Shards[shard]
206+
207+
if !planShard.Contains(serverID) {
208+
// This DBServer is not even in plan, restart possible
209+
return true
210+
}
211+
212+
current := s.Current.Collections[db][col][shard]
213+
currentShard := current.Servers.FilterBy(planShard)
214+
215+
serverInSync := currentShard.Contains(serverID)
216+
217+
if len(planShard) == 1 && serverInSync {
218+
// The requested server is the only one in the plan, restart possible
219+
return true
220+
}
221+
222+
// If WriteConcern equals replicationFactor then downtime is always there
223+
wc := plan.GetWriteConcern(1)
224+
if rf := plan.GetReplicationFactor(shard); wc >= rf {
225+
wc = rf - 1
226+
}
227+
228+
if len(currentShard) >= wc && !serverInSync {
229+
// Current shard is not in sync, but it does not matter - we have enough replicas in sync
230+
// Restart of this DBServer won't affect WC
231+
return true
146232
}
233+
234+
if len(currentShard) > wc {
235+
// We are in plan, but restart is possible
236+
return true
237+
}
238+
239+
// If we restart this server, write concern won't be satisfied
240+
return false
147241
})
242+
}
148243

149-
return isInSync
244+
func GetDBServerShardsNotInSync(s State, serverID string) CollectionShardDetails {
245+
return s.Filter(FilterDBServerShardsNotInSync(serverID))
150246
}
151247

152-
func (s State) IsDBServerReadyToRestart(serverID string) bool {
153-
readyToRestart := true
248+
func FilterDBServerShardsNotInSync(serverID string) StateShardFilter {
249+
return NegateFilter(func(s State, db, col, shard string) bool {
250+
planShard := s.Plan.Collections[db][col].Shards[shard]
154251

155-
s.IterateOverCollections(func(db, col string, planCollection *StatePlanCollection, shard string, plan ShardServers, current ShardServers) {
156-
if plan.Contains(serverID) {
157-
serverInSync := current.Contains(serverID)
158-
if len(plan) == 1 && serverInSync {
159-
// The requested server is the only one in the plan
160-
return
161-
}
252+
if serverID != "*" && !planShard.Contains(serverID) {
253+
return true
254+
}
162255

163-
// If WriteConcern equals replicationFactor then downtime is always there
164-
wc := planCollection.GetWriteConcern(1)
165-
if rf := planCollection.GetReplicationFactor(shard); wc >= rf {
166-
wc = rf - 1
167-
}
256+
currentShard := s.Current.Collections[db][col][shard]
168257

169-
if len(current) >= wc && !serverInSync {
170-
// Current shard is not in sync, but it does not matter - we have enough replicas in sync
171-
// Restart of this DBServer won't affect WC
172-
return
173-
}
258+
if len(planShard) != len(currentShard.Servers) {
259+
return false
260+
}
174261

175-
if len(current) <= wc {
176-
// If we restart this server, write concern won't be satisfied
177-
readyToRestart = false
262+
for _, s := range planShard {
263+
if !currentShard.Servers.Contains(s) {
264+
return false
178265
}
179266
}
180-
})
181267

182-
return readyToRestart
268+
return true
269+
})
183270
}

0 commit comments

Comments
 (0)