// @copyright 2016-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package indexer
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/couchbase/cbauth/metakv"
"github.com/couchbase/cbauth/service"
"github.com/couchbase/indexing/secondary/common"
c "github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
l "github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/manager"
"github.com/couchbase/indexing/secondary/manager/client"
"github.com/couchbase/indexing/secondary/planner"
)
type DoneCallback func(err error, cancel <-chan struct{})
type ProgressCallback func(progress float64, cancel <-chan struct{})
type Callbacks struct {
progress ProgressCallback
done DoneCallback
}
type Rebalancer struct {
// Transfer token maps from ttid to token
transferTokens map[string]*c.TransferToken // all TTs for this rebalance
acceptedTokens map[string]*c.TransferToken // accepted TTs
sourceTokens map[string]*c.TransferToken // TTs for which a source index exists (move case)
mu sync.RWMutex // for transferTokens, acceptedTokens, sourceTokens, currBatchTokens
currBatchTokens []string // ttids of all TTs in current batch
transferTokenBatches [][]string // slice of all TT batches (2nd dimension is ttid)
drop map[string]bool // ttids of source indexes already queued or launched for drop
pendingBuild int32 // number of dest index builds initiated but not yet complete
dropQueue chan string // queue of ttids whose source indexes are ready to drop
rebalToken *RebalanceToken
nodeId string
master bool
cb Callbacks
cancel chan struct{} // closed to signal rebalance canceled
done chan struct{} // closed to signal rebalance done (not canceled)
isDone int32
metakvCancel chan struct{} // close this to end metakv.RunObserveChildren callbacks
muCleanup sync.RWMutex // for metakvCancel
supvMsgch MsgChannel
localaddr string
wg sync.WaitGroup
cleanupOnce sync.Once
waitForTokenPublish chan struct{}
retErr error
config c.ConfigHolder
lastKnownProgress map[c.IndexInstId]float64
change *service.TopologyChange
runPlanner bool // should this rebalance run the planner?
runParam *runParams
}
// NewRebalancer creates the Rebalancer object for a rebalance on this node and
// starts go routines that perform it asynchronously. If this is a worker node it
// launches an observeRebalance go routine that does the token processing for this
// node. If this is the master, that go routine will be launched later by
// initRebalAsync --> doRebalance.
func NewRebalancer(transferTokens map[string]*c.TransferToken, rebalToken *RebalanceToken,
nodeId string, master bool, progress ProgressCallback, done DoneCallback,
supvMsgch MsgChannel, localaddr string, config c.Config, change *service.TopologyChange,
runPlanner bool, runParam *runParams) *Rebalancer {
l.Infof("NewRebalancer nodeId %v rebalToken %v master %v localaddr %v runPlanner %v runParam %v", nodeId,
rebalToken, master, localaddr, runPlanner, runParam)
r := &Rebalancer{
transferTokens: transferTokens,
rebalToken: rebalToken,
master: master,
nodeId: nodeId,
cb: Callbacks{progress, done},
cancel: make(chan struct{}),
done: make(chan struct{}),
metakvCancel: make(chan struct{}),
supvMsgch: supvMsgch,
acceptedTokens: make(map[string]*c.TransferToken),
sourceTokens: make(map[string]*c.TransferToken),
drop: make(map[string]bool),
dropQueue: make(chan string, 10000),
localaddr: localaddr,
waitForTokenPublish: make(chan struct{}),
lastKnownProgress: make(map[c.IndexInstId]float64),
change: change,
runPlanner: runPlanner,
transferTokenBatches: make([][]string, 0),
runParam: runParam,
}
r.config.Store(config)
if master {
go r.initRebalAsync()
} else {
close(r.waitForTokenPublish)
go r.observeRebalance()
}
go r.processDropIndexQueue()
return r
}
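// A minimal caller sketch (hypothetical wiring, for illustration only; the real
// caller passes tokens, callbacks, and config from the rebalance service layer):
//
//	r := NewRebalancer(transferTokens, rebalToken, nodeId, true /*master*/,
//		progressFn, doneFn, supvMsgch, localaddr, config, &change,
//		true /*runPlanner*/, runParam)
//	// ... later, if ns_server cancels the rebalance:
//	r.Cancel()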
// initRebalAsync runs as a helper go routine for NewRebalancer on the rebalance
// master. It calls the planner if needed, then launches a separate go routine,
// doRebalance, to manage the fine-grained steps of the rebalance.
func (r *Rebalancer) initRebalAsync() {
// short circuit: no transfer tokens and no planner to run
if len(r.transferTokens) == 0 && !r.runPlanner {
r.cb.progress(1.0, r.cancel)
r.finish(nil)
return
}
// Launch the progress updater goroutine
if r.cb.progress != nil {
go r.updateProgress()
}
if r.runPlanner {
cfg := r.config.Load()
loop:
for {
select {
case <-r.cancel:
l.Infof("Rebalancer::initRebalAsync Cancel Received")
return
case <-r.done:
l.Infof("Rebalancer::initRebalAsync Done Received")
return
default:
allWarmedup, _ := checkAllIndexersWarmedup(cfg["clusterAddr"].String())
if allWarmedup {
topology, err := getGlobalTopology(r.localaddr)
if err != nil {
l.Errorf("Rebalancer::initRebalAsync Error Fetching Topology %v", err)
go r.finish(err)
return
}
l.Infof("Rebalancer::initRebalAsync Global Topology %v", topology)
onEjectOnly := cfg["rebalance.node_eject_only"].Bool()
optimizePlacement := cfg["settings.rebalance.redistribute_indexes"].Bool()
disableReplicaRepair := cfg["rebalance.disable_replica_repair"].Bool()
timeout := cfg["planner.timeout"].Int()
threshold := cfg["planner.variationThreshold"].Float64()
cpuProfile := cfg["planner.cpuProfile"].Bool()
minIterPerTemp := cfg["planner.internal.minIterPerTemp"].Int()
maxIterPerTemp := cfg["planner.internal.maxIterPerTemp"].Int()
//user setting redistribute_indexes overrides the internal setting
//onEjectOnly. onEjectOnly is not expected to be used in production
//as this is not documented.
if optimizePlacement {
onEjectOnly = false
} else {
onEjectOnly = true
}
start := time.Now()
r.transferTokens, err = planner.ExecuteRebalance(cfg["clusterAddr"].String(), *r.change,
r.nodeId, onEjectOnly, disableReplicaRepair, threshold, timeout, cpuProfile,
minIterPerTemp, maxIterPerTemp)
if err != nil {
l.Errorf("Rebalancer::initRebalAsync Planner Error %v", err)
go r.finish(err)
return
}
if len(r.transferTokens) == 0 {
r.transferTokens = nil
}
elapsed := time.Since(start)
l.Infof("Rebalancer::initRebalAsync Planner Time Taken %v", elapsed)
break loop
}
}
l.Errorf("Rebalancer::initRebalAsync All Indexers Not Active. Waiting...")
time.Sleep(5 * time.Second)
}
}
go r.doRebalance()
}
// Cancel cancels a currently running rebalance or failover and waits
// for its go routines to finish.
func (r *Rebalancer) Cancel() {
l.Infof("Rebalancer::Cancel Exiting")
r.cancelMetakv()
close(r.cancel)
r.wg.Wait()
}
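// finish records the final error, if any, and triggers the run-once cleanup in
// doFinish. On a successful rebalance the master additionally transfers the
// ownership of schedule tokens away from nodes that are not being kept.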
func (r *Rebalancer) finish(err error) {
if err == nil && r.master && r.change != nil {
// Note that this function transfers the ownership of only those
// tokens, which are not owned by keep nodes. Ownership of other
// tokens remains unchanged.
keepNodes := make(map[string]bool)
for _, node := range r.change.KeepNodes {
keepNodes[string(node.NodeInfo.NodeID)] = true
}
cfg := r.config.Load()
// Suppress the error, if any. The background failover-handling task
// will retry the ownership transfer as necessary.
_ = transferScheduleTokens(keepNodes, cfg["clusterAddr"].String())
}
r.retErr = err
r.cleanupOnce.Do(r.doFinish)
}
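// doFinish is the run-once cleanup at the end of a rebalance. It marks the
// rebalance done, stops the metakv observer, waits for outstanding go routines,
// and invokes the done callback with the final error, if any.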
func (r *Rebalancer) doFinish() {
l.Infof("Rebalancer::doFinish Cleanup %v", r.retErr)
atomic.StoreInt32(&r.isDone, 1)
close(r.done)
r.cancelMetakv()
r.wg.Wait()
r.cb.done(r.retErr, r.cancel)
}
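// isFinish returns true once doFinish has run, i.e. the rebalance is complete
// (successfully or not).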
func (r *Rebalancer) isFinish() bool {
return atomic.LoadInt32(&r.isDone) == 1
}
// cancelMetakv closes the metakvCancel channel, thus terminating metakv's
// transfer token callback loop that was initiated by metakv.RunObserveChildren.
// Must be done when rebalance finishes (successful or not).
func (r *Rebalancer) cancelMetakv() {
r.muCleanup.Lock()
defer r.muCleanup.Unlock()
if r.metakvCancel != nil {
close(r.metakvCancel)
r.metakvCancel = nil
}
}
// addToWaitGroup adds caller to r.wg waitgroup and returns true if metakv.RunObserveChildren
// callbacks are active, else it does nothing and returns false.
func (r *Rebalancer) addToWaitGroup() bool {
r.muCleanup.Lock()
defer r.muCleanup.Unlock()
if r.metakvCancel != nil {
r.wg.Add(1)
return true
} else {
return false
}
}
// doRebalance runs as a go routine helper to initRebalAsync that manages
// the fine-grained steps of a rebalance. It creates the transfer token batches
// and publishes the first one. (If more than one batch exists the others will
// be published later by processTokenAsMaster.) Then it launches an
// observeRebalance go routine that does the real rebalance work for the master.
func (r *Rebalancer) doRebalance() {
if r.transferTokens != nil {
if ddl, err := r.checkDDLRunning(); ddl {
r.finish(err)
return
}
select {
case <-r.cancel:
l.Infof("Rebalancer::doRebalance Cancel Received. Skip Publishing Tokens.")
return
default:
r.createTransferBatches()
r.publishTransferTokenBatch()
close(r.waitForTokenPublish)
go r.observeRebalance()
}
} else {
r.cb.progress(1.0, r.cancel)
r.finish(nil)
return
}
}
// createTransferBatches is a helper for doRebalance (master node only) that creates
// all the transfer token batches.
func (r *Rebalancer) createTransferBatches() {
cfg := r.config.Load()
batchSize := cfg["rebalance.transferBatchSize"].Int()
var batch []string
for ttid := range r.transferTokens {
if batch == nil {
batch = make([]string, 0, batchSize)
}
batch = append(batch, ttid)
if len(batch) == batchSize {
r.transferTokenBatches = append(r.transferTokenBatches, batch)
batch = nil
}
}
if len(batch) != 0 {
r.transferTokenBatches = append(r.transferTokenBatches, batch)
}
l.Infof("Rebalancer::createTransferBatches Transfer Batches %v", r.transferTokenBatches)
}
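// For example, with 7 transfer tokens and rebalance.transferBatchSize = 3,
// createTransferBatches produces batches of sizes 3, 3, and 1. Since
// transferTokens is a map, batch membership is not deterministic across runs.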
// publishTransferTokenBatch publishes the next batch of transfer tokens into
// metakv so they will start being processed. For batches other than the first,
// it is only called once the prior batch completes.
func (r *Rebalancer) publishTransferTokenBatch() {
r.mu.Lock()
defer r.mu.Unlock()
r.currBatchTokens = r.transferTokenBatches[0]
r.transferTokenBatches = r.transferTokenBatches[1:]
l.Infof("Rebalancer::publishTransferTokenBatch Registered Transfer Token In Metakv %v", r.currBatchTokens)
for _, ttid := range r.currBatchTokens {
setTransferTokenInMetakv(ttid, r.transferTokens[ttid])
}
}
// observeRebalance runs as a go routine on both master and worker nodes of a rebalance.
// It registers the processTokens function as a callback in metakv on the rebalance
// transfer token (TT) directory. Metakv will call the callback function on each TT and
// on each mutation of a TT until an error occurs or its stop channel is closed. These
// callbacks trigger the individual index movement steps of the rebalance.
func (r *Rebalancer) observeRebalance() {
l.Infof("Rebalancer::observeRebalance %v master:%v", r.rebalToken, r.master)
<-r.waitForTokenPublish
err := metakv.RunObserveChildrenV2(RebalanceMetakvDir, r.processTokens, r.metakvCancel)
if err != nil {
l.Infof("Rebalancer::observeRebalance Exiting On Metakv Error %v", err)
r.finish(err)
}
l.Infof("Rebalancer::observeRebalance exiting err %v", err)
}
// processTokens is the callback registered on the metakv transfer token directory for
// a rebalance and gets called by metakv for each token and each change to a token.
// This decodes the token and hands it off to processTransferToken.
func (r *Rebalancer) processTokens(kve metakv.KVEntry) error {
if kve.Path == RebalanceTokenPath || kve.Path == MoveIndexTokenPath {
l.Infof("Rebalancer::processTokens RebalanceToken %v %v", kve.Path, kve.Value)
if kve.Value == nil {
l.Infof("Rebalancer::processTokens Rebalance Token Deleted. Mark Done.")
r.cancelMetakv()
r.finish(nil)
}
} else if strings.Contains(kve.Path, TransferTokenTag) {
if kve.Value != nil {
ttid, tt, err := r.decodeTransferToken(kve.Path, kve.Value)
if err != nil {
l.Errorf("Rebalancer::processTokens Unable to decode transfer token. Ignored")
return nil
}
r.processTransferToken(ttid, tt)
} else {
l.Infof("Rebalancer::processTokens Received empty or deleted transfer token %v", kve.Path)
}
}
return nil
}
// processTransferToken performs the work needed from this node, if any, for a
// transfer token. The work is split into three helpers for work specific to
// 1) the master (non-movement bookkeeping, including publishing the next token
// batch when the prior one completes), 2) source of an index move, 3) destination
// of a move.
func (r *Rebalancer) processTransferToken(ttid string, tt *c.TransferToken) {
if !r.addToWaitGroup() {
return
}
defer r.wg.Done()
if ddl, err := r.checkDDLRunning(); ddl {
r.setTransferTokenError(ttid, tt, err.Error())
return
}
// "processed" var ensures only the incoming token state gets processed by this
// call, as metakv will call parent processTokens again for each TT state change.
var processed bool
if tt.MasterId == r.nodeId {
processed = r.processTokenAsMaster(ttid, tt)
}
if tt.SourceId == r.nodeId && !processed {
processed = r.processTokenAsSource(ttid, tt)
}
if tt.DestId == r.nodeId && !processed {
processed = r.processTokenAsDest(ttid, tt)
}
}
// processTokenAsSource performs the work of the source node of an index move
// reflected by the transfer token, which is to queue up the source index drops
// for later processing. It handles only TT state TransferTokenReady. Returns
// true iff it is considered to have processed this token (including handling
// some error cases).
func (r *Rebalancer) processTokenAsSource(ttid string, tt *c.TransferToken) bool {
if tt.RebalId != r.rebalToken.RebalId {
l.Warnf("Rebalancer::processTokenAsSource Found TransferToken with Unknown "+
"RebalanceId. Local RId %v Token %v. Ignored.", r.rebalToken.RebalId, tt)
return true
}
switch tt.State {
case c.TransferTokenReady:
if !r.checkValidNotifyStateSource(ttid, tt) {
return true
}
r.mu.Lock()
defer r.mu.Unlock()
r.sourceTokens[ttid] = tt
logging.Infof("Rebalancer::processTokenAsSource Processing transfer token: %v", tt)
//TODO batch this rather than one per index
r.queueDropIndex(ttid)
default:
return false
}
return true
}
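// processDropIndexQueue runs as a go routine on every node, launched by
// NewRebalancer. It serializes the source index drops queued on dropQueue,
// waiting before the first drop so the query (cbq) nodes can learn about the
// destination index and redirect scans to it before the source is dropped.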
func (r *Rebalancer) processDropIndexQueue() {
notifych := make(chan bool, 2)
first := true
cfg := r.config.Load()
waitTime := cfg["rebalance.drop_index.wait_time"].Int()
for {
select {
case <-r.cancel:
l.Infof("Rebalancer::processDropIndexQueue Cancel Received")
return
case <-r.done:
l.Infof("Rebalancer::processDropIndexQueue Done Received")
return
case ttid := <-r.dropQueue:
var tt c.TransferToken
logging.Infof("Rebalancer::processDropIndexQueue processing drop index request for ttid: %v", ttid)
if first {
// If this is the first drop, wait to give the target's metadata a chance
// to synchronize with the cbq nodes. This ensures the cbq nodes can
// redirect scans to the target nodes before we start dropping the index on the source.
time.Sleep(time.Duration(waitTime) * time.Second)
first = false
}
r.mu.Lock()
tt1, ok := r.sourceTokens[ttid]
if ok {
tt = *tt1
}
r.mu.Unlock()
if !ok {
l.Warnf("Rebalancer::processDropIndexQueue: Cannot find token %v in r.sourceTokens. Skip drop index.", ttid)
continue
}
if tt.State == c.TransferTokenReady {
if !r.drop[ttid] {
if r.addToWaitGroup() {
notifych <- true
go r.dropIndexWhenIdle(ttid, &tt, notifych)
} else {
logging.Warnf("Rebalancer::processDropIndexQueue Skip processing drop index request for tt: %v as rebalancer can not add to wait group", tt)
}
} else {
logging.Infof("Rebalancer::processDropIndexQueue: Skip processing tt: %v as it is already added to drop list: %v", tt, r.drop)
}
} else {
logging.Warnf("Rebalancer::processDropIndexQueue Skipping drop index request for tt: %v", tt)
}
}
}
}
// queueDropIndex queues the source index of the given transfer token for drop
// on dropQueue; if the queue is full, it falls back to launching the drop directly.
// Must hold lock when calling this function.
func (r *Rebalancer) queueDropIndex(ttid string) {
select {
case <-r.cancel:
l.Warnf("Rebalancer::queueDropIndex: Cannot drop index when rebalance being cancel.")
return
case <-r.done:
l.Warnf("Rebalancer::queueDropIndex: Cannot drop index when rebalance is done.")
return
default:
if r.checkIndexReadyToDrop() {
select {
case r.dropQueue <- ttid:
logging.Infof("Rebalancer::queueDropIndex Successfully queued index for drop, ttid: %v", ttid)
default:
tt := r.sourceTokens[ttid]
if tt.State == c.TransferTokenReady {
if !r.drop[ttid] {
if r.addToWaitGroup() {
go r.dropIndexWhenIdle(ttid, tt, nil)
} else {
logging.Warnf("Rebalancer::queueDropIndex Could not add to wait group, hence not attempting drop, ttid: %v", ttid)
}
} else {
logging.Infof("Rebalancer::queueDropIndex Did not queue drop index as index is already in drop list, ttid: %v, drop list: %v", ttid, r.drop)
}
} else {
logging.Infof("Rebalancer::queueDropIndex Did not queue index for drop as tt state is: %v, ttid: %v", tt.State, ttid)
}
}
} else {
logging.Warnf("Rebalancer::queueDropIndex Failed to queue index for drop, ttid: %v", ttid)
}
}
}
// dropIndexWhenReady queues for drop every ready source token that is not
// already in the drop list. Must hold lock when calling this function.
func (r *Rebalancer) dropIndexWhenReady() {
if r.checkIndexReadyToDrop() {
for ttid, tt := range r.sourceTokens {
if tt.State == c.TransferTokenReady {
if !r.drop[ttid] {
r.queueDropIndex(ttid)
}
}
}
}
}
// isMissingBSC determines whether an error message is due to a bucket, scope,
// or collection not existing. These can be dropped after a TT referencing them
// was created, in which case we will set the TT forward to TransferTokenCommit
// state to abort the index move without failing the rebalance. In some cases
// the target error string will be contained inside brackets within a more
// detailed user-visible errMsg.
func isMissingBSC(errMsg string) bool {
return errMsg == common.ErrCollectionNotFound.Error() ||
errMsg == common.ErrScopeNotFound.Error() ||
errMsg == common.ErrBucketNotFound.Error() ||
strings.Contains(errMsg, "["+common.ErrCollectionNotFound.Error()+"]") ||
strings.Contains(errMsg, "["+common.ErrScopeNotFound.Error()+"]") ||
strings.Contains(errMsg, "["+common.ErrBucketNotFound.Error()+"]")
}
// isIndexNotFoundRebal checks whether a build error returned for rebalance is the
// special error ErrIndexNotFoundRebal indicating the key returned for this error
// is the originally submitted defnId instead of an instId because the metadata
// for defnId could not be found. This error should only be used for this purpose.
func isIndexNotFoundRebal(errMsg string) bool {
return errMsg == common.ErrIndexNotFoundRebal.Error()
}
// dropIndexWhenIdle performs the source index drop asynchronously. This is the last real
// action of an index move during rebalance; the remainder are token bookkeeping operations.
// Changes the TT state to TransferTokenCommit when the drop is completed.
func (r *Rebalancer) dropIndexWhenIdle(ttid string, tt *c.TransferToken, notifych chan bool) {
defer r.wg.Done()
defer func() {
if notifych != nil {
// Blocking wait to ensure indexes are dropped sequentially
<-notifych
}
}()
r.mu.Lock()
if r.drop[ttid] {
r.mu.Unlock()
return
}
r.drop[ttid] = true
r.mu.Unlock()
missingStatRetry := 0
loop:
for {
labelselect:
select {
case <-r.cancel:
l.Infof("Rebalancer::dropIndexWhenIdle Cancel Received")
break loop
case <-r.done:
l.Infof("Rebalancer::dropIndexWhenIdle Done Received")
break loop
default:
stats, err := getLocalStats(r.localaddr, true)
if err != nil {
l.Errorf("Rebalancer::dropIndexWhenIdle Error Fetching Local Stats %v %v", r.localaddr, err)
break
}
statsMap := stats.ToMap()
if statsMap == nil {
l.Infof("Rebalancer::dropIndexWhenIdle Nil Stats. Retrying...")
break
}
tt.IndexInst.Defn.SetCollectionDefaults()
pending := float64(0)
for _, partitionId := range tt.IndexInst.Defn.Partitions {
defn := tt.IndexInst.Defn
prefix := common.GetStatsPrefix(defn.Bucket, defn.Scope, defn.Collection,
defn.Name, tt.IndexInst.ReplicaId, int(partitionId), true)
sname_completed := common.GetIndexStatKey(prefix, "num_completed_requests")
sname_requests := common.GetIndexStatKey(prefix, "num_requests")
var num_completed, num_requests float64
if _, ok := statsMap[sname_completed]; ok {
num_completed = statsMap[sname_completed].(float64)
num_requests = statsMap[sname_requests].(float64)
} else {
l.Infof("Rebalancer::dropIndexWhenIdle Missing Stats %v %v. Retrying...", sname_completed, sname_requests)
missingStatRetry++
if missingStatRetry > 10 {
if r.needRetryForDrop(ttid, tt) {
break labelselect
} else {
break loop
}
}
break labelselect
}
pending += num_requests - num_completed
}
if pending > 0 {
l.Infof("Rebalancer::dropIndexWhenIdle Index %v:%v:%v:%v Pending Scan %v", tt.IndexInst.Defn.Bucket, tt.IndexInst.Defn.Scope, tt.IndexInst.Defn.Collection, tt.IndexInst.Defn.Name, pending)
break
}
defn := tt.IndexInst.Defn
defn.InstId = tt.InstId
defn.RealInstId = tt.RealInstId
req := manager.IndexRequest{Index: defn}
body, err := json.Marshal(&req)
if err != nil {
l.Errorf("Rebalancer::dropIndexWhenIdle Error marshal drop index %v", err)
r.setTransferTokenError(ttid, tt, err.Error())
return
}
bodybuf := bytes.NewBuffer(body)
url := "/dropIndex"
resp, err := postWithAuth(r.localaddr+url, "application/json", bodybuf)
if err != nil {
// Error from HTTP layer, not from index processing code
l.Errorf("Rebalancer::dropIndexWhenIdle Error drop index on %v %v", r.localaddr+url, err)
r.setTransferTokenError(ttid, tt, err.Error())
return
}
response := new(manager.IndexResponse)
if err := convertResponse(resp, response); err != nil {
l.Errorf("Rebalancer::dropIndexWhenIdle Error unmarshal response %v %v", r.localaddr+url, err)
r.setTransferTokenError(ttid, tt, err.Error())
return
}
if response.Code == manager.RESP_ERROR {
// Error from index processing code (e.g. the string from common.ErrCollectionNotFound)
if !isMissingBSC(response.Error) {
l.Errorf("Rebalancer::dropIndexWhenIdle Error dropping index %v %v", r.localaddr+url, response.Error)
r.setTransferTokenError(ttid, tt, response.Error)
return
}
// Ok: failed to drop source index because b/s/c was dropped. Continue to TransferTokenCommit state.
l.Infof("Rebalancer::dropIndexWhenIdle Source index already dropped due to bucket/scope/collection dropped. tt %v.", tt)
}
tt.State = c.TransferTokenCommit
setTransferTokenInMetakv(ttid, tt)
r.mu.Lock()
r.sourceTokens[ttid] = tt
r.mu.Unlock()
break loop
}
time.Sleep(5 * time.Second)
}
}
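// needRetryForDrop decides, after repeated missing-stats retries, whether
// dropIndexWhenIdle should keep retrying a source index drop. If the index is
// missing from local metadata, the drop most likely already succeeded, so the
// token is moved to TransferTokenCommit and false is returned to stop retrying.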
func (r *Rebalancer) needRetryForDrop(ttid string, tt *c.TransferToken) bool {
localMeta, err := getLocalMeta(r.localaddr)
if err != nil {
l.Errorf("Rebalancer::dropIndexWhenIdle Error Fetching Local Meta %v %v", r.localaddr, err)
return true
}
indexState, errStr := getIndexStatusFromMeta(tt, localMeta)
if errStr != "" {
l.Errorf("Rebalancer::dropIndexWhenIdle Error Fetching Index Status %v %v", r.localaddr, errStr)
return true
}
if indexState == c.INDEX_STATE_NIL {
//if index cannot be found in metadata, most likely its drop has already succeeded.
//instead of waiting indefinitely, it is better to assume success and proceed.
l.Infof("Rebalancer::dropIndexWhenIdle Missing Metadata for %v. Assume success and abort retry", tt.IndexInst)
tt.State = c.TransferTokenCommit
setTransferTokenInMetakv(ttid, tt)
r.mu.Lock()
r.sourceTokens[ttid] = tt
r.mu.Unlock()
return false
}
return true
}
// processTokenAsDest performs the work of the destination node of an index
// move reflected by the transfer token. It directly handles TT states
// TransferTokenCreated and TransferTokenInitiate, and indirectly (via the
// tokenMergeOrReady call here or in buildAcceptedIndexes --> waitForIndexBuild)
// states TransferTokenInProgress and TransferTokenMerge. Returns true iff it is
// considered to have processed this token (including handling some error cases).
func (r *Rebalancer) processTokenAsDest(ttid string, tt *c.TransferToken) bool {
if tt.RebalId != r.rebalToken.RebalId {
l.Warnf("Rebalancer::processTokenAsDest Found TransferToken with Unknown "+
"RebalanceId. Local RId %v Token %v. Ignored.", r.rebalToken.RebalId, tt)
return true
}
if !r.checkValidNotifyStateDest(ttid, tt) {
return true
}
switch tt.State {
case c.TransferTokenCreated:
indexDefn := tt.IndexInst.Defn
indexDefn.SetCollectionDefaults()
indexDefn.Nodes = nil
indexDefn.Deferred = true
indexDefn.InstId = tt.InstId
indexDefn.RealInstId = tt.RealInstId
getReqBody := func() (*bytes.Buffer, bool) {
ir := manager.IndexRequest{Index: indexDefn}
body, err := json.Marshal(&ir)
if err != nil {
l.Errorf("Rebalancer::processTokenAsDest Error marshal clone index %v", err)
r.setTransferTokenError(ttid, tt, err.Error())
return nil, true
}
bodybuf := bytes.NewBuffer(body)
return bodybuf, false
}
bodybuf, isErr := getReqBody()
if isErr {
return true
}
var resp *http.Response
var err error
url := "/createIndexRebalance"
resp, err = postWithAuth(r.localaddr+url, "application/json", bodybuf)
if err != nil {
// Error from HTTP layer, not from index processing code
l.Errorf("Rebalancer::processTokenAsDest Error register clone index on %v %v", r.localaddr+url, err)
// If the error is io.EOF, it is possible that the server side closed
// the connection while the client was about to send the request.
// Though this is extremely unlikely, it has been observed by multiple
// users in the golang community. See: https://github.com/golang/go/issues/19943,
// https://groups.google.com/g/golang-nuts/c/A46pBUjdgeM/m/jrn35_IxAgAJ for
// more details
//
// In such a case, instead of failing the rebalance with the io.EOF error,
// retry the POST request to reduce the probability of failure. Two scenarios
// exist here:
// (a) Server has received the request and closed the connection (very unlikely).
// In this case, the request will be processed by the server but the client
// will see an EOF error, and the retry will fail the rebalance because the
// index definition already exists.
// (b) Server has not received the request. Then the retry will work and the
// rebalance will not fail.
if strings.HasSuffix(err.Error(), ": EOF") {
bodybuf, isErr := getReqBody()
if isErr {
return true
}
resp, err = postWithAuth(r.localaddr+url, "application/json", bodybuf)
if err != nil {
l.Errorf("Rebalancer::processTokenAsDest Error register clone index during retry on %v %v", r.localaddr+url, err)
r.setTransferTokenError(ttid, tt, err.Error())
return true
} else {
l.Infof("Rebalancer::processTokenAsDest Successful POST of createIndexRebalance during retry on %v, defnId: %v, instId: %v",
r.localaddr+url, indexDefn.DefnId, indexDefn.InstId)
}
} else {
r.setTransferTokenError(ttid, tt, err.Error())
return true
}
}
response := new(manager.IndexResponse)
if err := convertResponse(resp, response); err != nil {
l.Errorf("Rebalancer::processTokenAsDest Error unmarshal response %v %v", r.localaddr+url, err)
r.setTransferTokenError(ttid, tt, err.Error())
return true
}
if response.Code == manager.RESP_ERROR {
// Error from index processing code (e.g. the string from common.ErrCollectionNotFound)
if !isMissingBSC(response.Error) {
l.Errorf("Rebalancer::processTokenAsDest Error cloning index %v %v", r.localaddr+url, response.Error)
r.setTransferTokenError(ttid, tt, response.Error)
return true
}
// Ok: failed to create dest index because b/s/c was dropped. Skip to TransferTokenCommit state.
l.Infof("Rebalancer::processTokenAsDest Create destination index failed due to bucket/scope/collection dropped. Skipping. tt %v.", tt)
tt.State = c.TransferTokenCommit
} else {
tt.State = c.TransferTokenAccepted
}
setTransferTokenInMetakv(ttid, tt)
r.mu.Lock()
r.acceptedTokens[ttid] = tt
r.mu.Unlock()
case c.TransferTokenInitiate:
r.mu.Lock()
defer r.mu.Unlock()
att, ok := r.acceptedTokens[ttid]
if !ok {
l.Errorf("Rebalancer::processTokenAsDest Unknown TransferToken for Initiate %v %v", ttid, tt)
r.setTransferTokenError(ttid, tt, "Unknown TransferToken For Initiate")
return true
}
if tt.IndexInst.Defn.Deferred && tt.IndexInst.State == c.INDEX_STATE_READY {
atomic.AddInt32(&r.pendingBuild, 1)
r.tokenMergeOrReady(ttid, tt)
att.State = tt.State
} else {
att.State = c.TransferTokenInProgress
tt.State = c.TransferTokenInProgress
setTransferTokenInMetakv(ttid, tt)
atomic.AddInt32(&r.pendingBuild, 1)
}
logging.Infof("Rebalancer::processTokenAsDest, Incremening pendingBuild due to ttid: %v, pendingBuild value: %v", ttid, atomic.LoadInt32(&r.pendingBuild))
if r.checkIndexReadyToBuild() {
if !r.addToWaitGroup() {
return true
}
go r.buildAcceptedIndexes()
}
case c.TransferTokenInProgress:
// Nothing to do here; TT state transitions done in tokenMergeOrReady
case c.TransferTokenMerge:
// Nothing to do here; TT state transitions done in tokenMergeOrReady
default:
return false
}
return true
}
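// checkValidNotifyStateDest verifies on the destination node that a transfer
// token state from a metakv notification advances past the locally cached copy
// in acceptedTokens, as TT states only move forward. Out-of-order or duplicate
// notifications return false and should be ignored by the caller.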
func (r *Rebalancer) checkValidNotifyStateDest(ttid string, tt *c.TransferToken) bool {
r.mu.Lock()
defer r.mu.Unlock()
if tto, ok := r.acceptedTokens[ttid]; ok {
if tt.State <= tto.State {
l.Warnf("Rebalancer::checkValidNotifyStateDest Detected Invalid State "+
"Change Notification. Token Id %v Local State %v Metakv State %v", ttid,
tto.State, tt.State)
return false
}
}
return true
}
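// checkValidNotifyStateSource is the source-node analog of
// checkValidNotifyStateDest, validating notified transfer token states against
// the locally cached sourceTokens entries.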
func (r *Rebalancer) checkValidNotifyStateSource(ttid string, tt *c.TransferToken) bool {
r.mu.Lock()
defer r.mu.Unlock()