Skip to content

Commit b159fa0

Browse files
committed
Final touch
1 parent e5b6b65 commit b159fa0

24 files changed

+461
-355
lines changed

pkg/deployment/agency/generator_collection_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package agency
2222

2323
type CollectionGeneratorInterface interface {
2424
WithWriteConcern(wc int) CollectionGeneratorInterface
25+
WithReplicationFactor(rf int) CollectionGeneratorInterface
2526
WithShard() ShardGeneratorInterface
2627
Add() DatabaseGeneratorInterface
2728
}
@@ -31,6 +32,7 @@ type collectionGenerator struct {
3132
col string
3233

3334
wc *int
35+
rf *int
3436
shards map[int]shardGenerator
3537
}
3638

@@ -55,3 +57,8 @@ func (c collectionGenerator) WithWriteConcern(wc int) CollectionGeneratorInterfa
5557
c.wc = &wc
5658
return c
5759
}
60+
61+
func (c collectionGenerator) WithReplicationFactor(rf int) CollectionGeneratorInterface {
62+
c.rf = &rf
63+
return c
64+
}

pkg/deployment/agency/generator_database_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,10 @@ func (d databaseGenerator) Add() StateGenerator {
9494
}
9595

9696
planCol := StatePlanCollection{
97-
Name: util.NewString(col),
98-
Shards: planShards,
99-
WriteConcern: colDet.wc,
97+
Name: util.NewString(col),
98+
Shards: planShards,
99+
WriteConcern: colDet.wc,
100+
ReplicationFactor: colDet.rf,
100101
}
101102

102103
plan[col] = planCol

pkg/deployment/agency/plan_collections.go

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,42 @@ func (a StatePlanDBCollections) IsDBServerInCollections(name string) bool {
4242
return false
4343
}
4444

45+
func (a StatePlanDBCollections) CountShards() int {
46+
count := 0
47+
48+
for _, d := range a {
49+
count += len(d.Shards)
50+
}
51+
52+
return count
53+
}
54+
4555
type StatePlanCollection struct {
4656
Name *string `json:"name"`
4757
Shards Shards `json:"shards"`
4858
// deprecated
4959
// MinReplicationFactor is deprecated, but we have to support it for backward compatibility
5060
MinReplicationFactor *int `json:"minReplicationFactor,omitempty"`
5161
WriteConcern *int `json:"writeConcern,omitempty"`
62+
ReplicationFactor *int `json:"replicationFactor,omitempty"`
63+
}
64+
65+
func (a *StatePlanCollection) GetReplicationFactor(shard string) int {
66+
if a == nil {
67+
return 0
68+
}
69+
70+
l := len(a.Shards[shard])
71+
72+
if z := a.ReplicationFactor; z == nil {
73+
return l
74+
} else {
75+
if v := *z; v > l {
76+
return v
77+
} else {
78+
return l
79+
}
80+
}
5281
}
5382

5483
func (a *StatePlanCollection) GetWriteConcern(def int) int {
@@ -79,7 +108,11 @@ func (a StatePlanCollection) GetName(d string) string {
79108
return *a.Name
80109
}
81110

82-
func (a StatePlanCollection) IsDBServerInShards(name string) bool {
111+
func (a *StatePlanCollection) IsDBServerInShards(name string) bool {
112+
if a == nil {
113+
return false
114+
}
115+
83116
for _, planShards := range a.Shards {
84117
if planShards.Contains(name) {
85118
return true

pkg/deployment/agency/shards.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,15 @@ func (s ShardServers) Contains(server string) bool {
3333

3434
return false
3535
}
36+
37+
func (s ShardServers) FilterBy(b ShardServers) ShardServers {
38+
q := make(ShardServers, 0, len(s))
39+
40+
for _, i := range s {
41+
if b.Contains(i) {
42+
q = append(q, i)
43+
}
44+
}
45+
46+
return q
47+
}

pkg/deployment/agency/state.go

Lines changed: 117 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -120,75 +120,151 @@ func (d *StateExists) UnmarshalJSON(bytes []byte) error {
120120
return nil
121121
}
122122

123-
type CollectionIterator func(db, col string, planCollection *StatePlanCollection, shard string, plan ShardServers, current ShardServers) (skipRest bool)
123+
func (s State) CountShards() int {
124+
count := 0
125+
126+
for _, collections := range s.Plan.Collections {
127+
count += collections.CountShards()
128+
}
129+
130+
return count
131+
}
132+
133+
func (s State) PlanServers() []string {
134+
q := map[string]bool{}
135+
136+
for _, db := range s.Plan.Collections {
137+
for _, col := range db {
138+
for _, shards := range col.Shards {
139+
for _, shard := range shards {
140+
q[shard] = true
141+
}
142+
}
143+
}
144+
}
145+
146+
r := make([]string, 0, len(q))
147+
148+
for k := range q {
149+
r = append(r, k)
150+
}
151+
152+
return r
153+
}
154+
155+
type CollectionShardDetails []CollectionShardDetail
156+
157+
type CollectionShardDetail struct {
158+
Database string
159+
Collection string
160+
Shard string
161+
}
162+
163+
type StateShardFilter func(s State, db, col, shard string) bool
164+
165+
func NegateFilter(in StateShardFilter) StateShardFilter {
166+
return func(s State, db, col, shard string) bool {
167+
return !in(s, db, col, shard)
168+
}
169+
}
170+
171+
func (s State) Filter(f StateShardFilter) CollectionShardDetails {
172+
shards := make(CollectionShardDetails, s.CountShards())
173+
size := 0
124174

125-
func (s State) IterateOverCollections(i CollectionIterator) {
126-
dbsLoop:
127175
for db, collections := range s.Plan.Collections {
128176
for collection, details := range collections {
129-
for shard, shardDetails := range details.Shards {
130-
if currShard, ok := s.Current.Collections[db][collection][shard]; ok {
131-
if skipRest := i(db, collection, &details, shard, shardDetails, currShard.Servers); skipRest {
132-
break dbsLoop
177+
for shard := range details.Shards {
178+
if f(s, db, collection, shard) {
179+
shards[size] = CollectionShardDetail{
180+
Database: db,
181+
Collection: collection,
182+
Shard: shard,
133183
}
184+
size++
134185
}
135186
}
136187
}
137188
}
138-
}
139189

140-
func (s State) IsDBServerInSync(serverID string) bool {
141-
isInSync := true
142-
s.IterateOverCollections(func(db, col string, planCollection *StatePlanCollection, shard string, plan ShardServers, current ShardServers) bool {
143-
if !plan.Contains(serverID) {
144-
return false
145-
}
190+
if size == 0 {
191+
return nil
192+
}
146193

147-
serverIsNotInSync := !current.Contains(serverID)
148-
wc := planCollection.GetWriteConcern(1)
149-
if serverIsNotInSync || wc > len(current) {
150-
isInSync = false
151-
return true
152-
}
153-
return false
154-
})
194+
return shards[0:size]
195+
}
155196

156-
return isInSync
197+
func GetDBServerBlockingRestartShards(s State, serverID string) CollectionShardDetails {
198+
return s.Filter(FilterDBServerShardRestart(serverID))
157199
}
158200

159-
func (s State) IsDBServerReadyToRestart(serverID string) bool {
160-
readyToRestart := true
201+
func FilterDBServerShardRestart(serverID string) StateShardFilter {
202+
return NegateFilter(func(s State, db, col, shard string) bool {
203+
// Filter all shards which are not blocking restart of server
204+
plan := s.Plan.Collections[db][col]
205+
planShard := plan.Shards[shard]
161206

162-
s.IterateOverCollections(func(db, col string, planCollection *StatePlanCollection, shard string, plan ShardServers, current ShardServers) bool {
163-
if !plan.Contains(serverID) {
164-
return false
207+
if !planShard.Contains(serverID) {
208+
// This DBServer is not even in plan, restart possible
209+
return true
165210
}
166211

167-
serverInSync := current.Contains(serverID)
168-
if len(plan) == 1 && serverInSync {
169-
// The requested server is the only one in the plan
170-
return false
212+
current := s.Current.Collections[db][col][shard]
213+
currentShard := current.Servers.FilterBy(planShard)
214+
215+
serverInSync := currentShard.Contains(serverID)
216+
217+
if len(planShard) == 1 && serverInSync {
218+
// The requested server is the only one in the plan, restart possible
219+
return true
171220
}
172221

173-
wc := planCollection.GetWriteConcern(1)
174-
if wc >= len(plan) && len(plan) != 0 {
175-
wc = len(plan) - 1
222+
// If WriteConcern equals replicationFactor then downtime is always there
223+
wc := plan.GetWriteConcern(1)
224+
if rf := plan.GetReplicationFactor(shard); wc >= rf {
225+
wc = rf - 1
176226
}
177227

178-
if len(current) >= wc && !serverInSync {
228+
if len(currentShard) >= wc && !serverInSync {
179229
// Current shard is not in sync, but it does not matter - we have enough replicas in sync
180230
// Restart of this DBServer won't affect WC
181-
return false
231+
return true
182232
}
183233

184-
if len(current) <= wc {
185-
// If we restart this server, write concern won't be satisfied
186-
readyToRestart = false
234+
if len(currentShard) > wc {
235+
// We are in plan, but restart is possible
187236
return true
188237
}
189238

239+
// If we restart this server, write concern won't be satisfied
190240
return false
191241
})
242+
}
192243

193-
return readyToRestart
244+
func GetDBServerShardsNotInSync(s State, serverID string) CollectionShardDetails {
245+
return s.Filter(FilterDBServerShardsNotInSync(serverID))
246+
}
247+
248+
func FilterDBServerShardsNotInSync(serverID string) StateShardFilter {
249+
return NegateFilter(func(s State, db, col, shard string) bool {
250+
planShard := s.Plan.Collections[db][col].Shards[shard]
251+
252+
if serverID != "*" && !planShard.Contains(serverID) {
253+
return true
254+
}
255+
256+
currentShard := s.Current.Collections[db][col][shard]
257+
258+
if len(planShard) != len(currentShard.Servers) {
259+
return false
260+
}
261+
262+
for _, s := range planShard {
263+
if !currentShard.Servers.Contains(s) {
264+
return false
265+
}
266+
}
267+
268+
return true
269+
})
194270
}

0 commit comments

Comments
 (0)