cluster_info_lite.go
package common
import (
"errors"
"fmt"
"math"
"net"
"net/http"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/couchbase/indexing/secondary/audit"
"github.com/couchbase/indexing/secondary/common/collections"
couchbase "github.com/couchbase/indexing/secondary/dcp"
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/security"
)
/*
1. nodesInfo, collectionInfo and bucketInfo are the data structures formed from
the data fetched from ns_server.
2. ClusterInfoCacheLite caches all of this data in atomic holders. Pointers to
the structures above are swapped atomically on update via these holders.
3. ClusterInfoCacheLiteManager watches the streaming endpoints and the clients,
and updates the cache, fetching from ns_server again when needed.
4. ClusterInfoCacheLiteClient provides all the APIs to access and use the data,
with some user-level customization where needed.
5. Indices into the data, like NodeId, can become invalid on update, so they
must not be used across multiple instances. E.g. GetNodeInfo gives us a nodeInfo
pointer and nodeInfo.GetNodeIdsByServiceType gives us NodeIds; these NodeIds
should not be used with another instance of nodeInfo fetched later.
*/
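// An illustrative sketch of point 5 (method names per the description above;
// the service-type string is a placeholder): NodeIds taken from one nodeInfo
// snapshot must only be used with that same snapshot.
//
//	ninfo, _ := client.GetNodeInfo()                 // snapshot A
//	nids := ninfo.GetNodeIdsByServiceType("svcName") // valid only against snapshot A
//	// Fetching nodeInfo again yields snapshot B; do not index into B with nids.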
var singletonCICLContainer struct {
sync.Mutex
ciclMgr *clusterInfoCacheLiteManager
refCount int // RefCount of ciclMgr to close it when it is 0
}
var majorVersionCICL uint32
const MAJOR_VERSION_7 = 7
func SetMajorVersionCICL(ver uint32) {
logging.Infof("SetMajorVersionCICL: Setting cluster version to %v", ver)
atomic.StoreUint32(&majorVersionCICL, ver)
}
func GetMajorVersionCICL() uint32 {
return atomic.LoadUint32(&majorVersionCICL)
}
var ErrorEventWaitTimeout = errors.New("error event wait timeout")
var ErrorUnintializedNodesInfo = errors.New("error uninitialized nodesInfo")
var ErrorThisNodeNotFound = errors.New("error thisNode not found")
var ErrorSingletonCICLMgrNotFound = errors.New("singleton manager not found")
func SetCICLMgrTimeDiffToForceFetch(minutes uint32) {
singletonCICLContainer.Lock()
defer singletonCICLContainer.Unlock()
if mgr := singletonCICLContainer.ciclMgr; mgr != nil {
mgr.setTimeDiffToForceFetch(minutes)
} else {
logging.Warnf("SetCICLMgrTimeDiffToForceFetch: Singleton Manager in ClusterInfoCacheLite is not set")
}
}
func SetCICLMgrSleepTimeOnNotifierRestart(milliSeconds uint32) {
singletonCICLContainer.Lock()
defer singletonCICLContainer.Unlock()
if mgr := singletonCICLContainer.ciclMgr; mgr != nil {
mgr.setNotifierRetrySleep(milliSeconds)
} else {
logging.Warnf("SetCICLMgrSleepTimeOnNotifierRestart: Singleton Manager in ClusterInfoCacheLite is not set")
}
}
func GetCICLStats() (Statistics, error) {
singletonCICLContainer.Lock()
defer singletonCICLContainer.Unlock()
s := make(map[string]interface{})
mgr := singletonCICLContainer.ciclMgr
if mgr == nil {
s["ref_count"] = 0
s["status"] = ErrorSingletonCICLMgrNotFound.Error()
return NewStatistics(s)
}
s["ref_count"] = singletonCICLContainer.refCount
s["status"] = "singleton manager running"
// TODO: Add more stats later as needed
return NewStatistics(s)
}
func HandleCICLStats(w http.ResponseWriter, r *http.Request) {
_, valid, err := IsAuthValid(r)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(err.Error() + "\n"))
return
} else if !valid {
audit.Audit(AUDIT_UNAUTHORIZED, r, "StatsManager::handleCICLStats", "")
w.WriteHeader(http.StatusUnauthorized)
w.Write(HTTP_STATUS_UNAUTHORIZED)
return
}
if r.Method == "GET" {
stats, err := GetCICLStats()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
errStr := fmt.Sprintf("error while retrieving stats: %v", err)
w.Write([]byte(errStr))
return
}
data, err := stats.Encode()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
errStr := fmt.Sprintf("error while marshaling stats: %v", err)
w.Write([]byte(errStr))
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
} else {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("Unsupported method"))
}
}
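// Illustrative wiring (the endpoint path is hypothetical; the actual
// registration lives with the component that owns the HTTP mux):
//
//	mux.HandleFunc("/debug/cicl/stats", HandleCICLStats)
//
// A GET returns the encoded stats map; any other method gets a 400 response.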
//
// Nodes Info
//
type NodesInfo struct {
version uint32
minorVersion uint32
nodes []couchbase.Node
nodesExt []couchbase.NodeServices
addNodes []couchbase.Node
failedNodes []couchbase.Node
encryptedPortMap map[string]string
node2group map[NodeId]string
clusterURL string
bucketNames []couchbase.BucketName
bucketURLMap map[string]string
// Note: Static port information is populated from the command line and is
// only used to get the port information for the local node. It is needed
// because PoolChangeNotification does not carry indexer-specific port
// numbers until the service manager registers with ns_server, and we need
// these ports earlier in the boot process, before that registration.
useStaticPorts bool
servicePortMap map[string]string
valid bool
errList []error
lastUpdatedTs time.Time
nodeServicesHash string
StubRWMutex // Stub to make NodesInfo replaceable with ClusterInfoCache
}
func newNodesInfo(pool *couchbase.Pool, clusterURL, nodeSvsHash string) *NodesInfo {
var nodes []couchbase.Node
var failedNodes []couchbase.Node
var addNodes []couchbase.Node
version := uint32(math.MaxUint32)
minorVersion := uint32(math.MaxUint32)
for _, n := range pool.Nodes {
if n.ClusterMembership == "active" {
nodes = append(nodes, n)
} else if n.ClusterMembership == "inactiveFailed" {
// node being failed over
failedNodes = append(failedNodes, n)
} else if n.ClusterMembership == "inactiveAdded" {
// node being added (but not yet rebalanced in)
addNodes = append(addNodes, n)
} else {
logging.Warnf("newNodesInfo: unrecognized node membership %v", n.ClusterMembership)
}
// Find the minimum cluster compatibility
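// ClusterCompatibility encodes the version as major*65536 + minor; for
// example a 7.6 node reports 7*65536 + 6 = 458758, which decodes below to
// version 7 and minorVersion 6.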
v := uint32(n.ClusterCompatibility / 65536)
minorv := uint32(n.ClusterCompatibility) - (v * 65536)
if v < version || (v == version && minorv < minorVersion) {
version = v
minorVersion = minorv
}
}
if version == math.MaxUint32 {
version = 0
}
newNInfo := &NodesInfo{
nodes: nodes,
addNodes: addNodes,
failedNodes: failedNodes,
version: version,
minorVersion: minorVersion,
clusterURL: clusterURL,
nodeServicesHash: nodeSvsHash,
node2group: make(map[NodeId]string),
}
for i, node := range nodes {
newNInfo.node2group[NodeId(i)] = node.ServerGroup
}
if len(pool.BucketNames) != 0 {
bucketNames := make([]couchbase.BucketName, len(pool.BucketNames))
for i, bn := range pool.BucketNames {
bucketNames[i] = bn
}
newNInfo.bucketNames = bucketNames
}
bucketURLMap := make(map[string]string)
for k, v := range pool.BucketURL {
bucketURLMap[k] = v
}
newNInfo.bucketURLMap = bucketURLMap
if ServiceAddrMap != nil {
newNInfo.useStaticPorts = true
newNInfo.servicePortMap = ServiceAddrMap
}
return newNInfo
}
func newNodesInfoWithError(err error) *NodesInfo {
ni := &NodesInfo{}
ni.valid = false
ni.errList = append(ni.errList, err)
return ni
}
func (ni *NodesInfo) setNodesInfoWithError(err error) *NodesInfo {
ni.valid = false
ni.errList = append(ni.errList, err)
return ni
}
// clone does a shallow copy, so the returned *NodesInfo has fresh copies of the
// mutable valid and errList fields and can therefore be used to atomically
// replace the value held by nih in cicl.
func (ni *NodesInfo) clone() *NodesInfo {
newNInfo := &NodesInfo{
version: ni.version,
minorVersion: ni.minorVersion,
clusterURL: ni.clusterURL,
nodeServicesHash: ni.nodeServicesHash,
}
newNInfo.nodes = append(newNInfo.nodes, ni.nodes...)
newNInfo.addNodes = append(newNInfo.addNodes, ni.addNodes...)
newNInfo.failedNodes = append(newNInfo.failedNodes, ni.failedNodes...)
newNInfo.bucketNames = append(newNInfo.bucketNames, ni.bucketNames...)
newNInfo.node2group = make(map[NodeId]string)
for i, node := range ni.nodes {
newNInfo.node2group[NodeId(i)] = node.ServerGroup
}
newNInfo.bucketURLMap = make(map[string]string)
for k, v := range ni.bucketURLMap {
newNInfo.bucketURLMap[k] = v
}
if ServiceAddrMap != nil {
newNInfo.useStaticPorts = true
newNInfo.servicePortMap = ServiceAddrMap
}
return newNInfo
}
func (ni *NodesInfo) setNodesExt(nodesExt []couchbase.NodeServices) {
if nodesExt == nil {
logging.Warnf("setNodesExt: nodesExt is nil")
return
}
for _, ns := range nodesExt {
nns := couchbase.NodeServices{
ThisNode: ns.ThisNode,
Hostname: ns.Hostname,
Services: make(map[string]int),
}
for k, v := range ns.Services {
nns.Services[k] = v
}
ni.nodesExt = append(ni.nodesExt, nns)
}
ni.encryptedPortMap = buildEncryptPortMapping(ni.nodesExt)
}
func (ni *NodesInfo) validateNodesAndSvs(connHost string) {
found := false
for _, node := range ni.nodes {
if node.ThisNode {
found = true
}
}
if !found {
ni.valid = false
ni.errList = append(ni.errList, ErrorThisNodeNotFound)
logging.Warnf("ThisNode not found for any node in pool")
return
}
if len(ni.nodes) == 0 || len(ni.nodesExt) == 0 {
ni.valid = false
ni.errList = append(ni.errList, ErrValidationFailed)
return
}
// validation not required for single node setup (MB-16494)
if len(ni.nodes) == 1 && len(ni.nodesExt) == 1 {
ni.valid = true
ni.lastUpdatedTs = time.Now()
return
}
var hostsFromNodes []string
var hostsFromNodesExt []string
for _, n := range ni.nodes {
hostsFromNodes = append(hostsFromNodes, n.Hostname)
}
for _, svc := range ni.nodesExt {
h := svc.Hostname
if h == "" {
// 1. For nodeServices, if the configured hostname is 127.0.0.1, the
// hostname is not emitted; the client should use the hostname it is
// already using to access other ports on that node.
// 2. For pools/default, if the configured hostname is 127.0.0.1, the
// hostname is emitted as the interface on which the pools/default
// request was received.
h, _, _ = net.SplitHostPort(connHost)
}
p := svc.Services["mgmt"]
hp := net.JoinHostPort(h, fmt.Sprint(p))
hostsFromNodesExt = append(hostsFromNodesExt, hp)
}
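// The two slices are expected to line up positionally: a node advertising
// Hostname "10.0.0.1:8091" in pool.Nodes (a hypothetical address) should
// appear in nodesExt with Hostname "10.0.0.1" and Services["mgmt"] = 8091,
// so the joined host:port matches the corresponding hostsFromNodes entry.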
if len(ni.nodes) != len(ni.nodesExt) {
logging.Warnf("validateNodesAndSvs - Failed as len(nodes): %v != len(nodesExt): %v", len(ni.nodes),
len(ni.nodesExt))
logging.Warnf("HostNames Nodes: %v NodesExt: %v", hostsFromNodes, hostsFromNodesExt)
ni.valid = false
ni.errList = append(ni.errList, ErrValidationFailed)
return
}
for i, hn := range hostsFromNodesExt {
if hostsFromNodes[i] != hn {
logging.Warnf("validateNodesAndSvs - Failed as hostname in nodes: %s != the one from nodesExt: %s", hostsFromNodes[i], hn)
ni.valid = false
ni.errList = append(ni.errList, ErrValidationFailed)
return
}
}
ni.valid = true
ni.lastUpdatedTs = time.Now()
return
}
//
// Collection Info
//
type collectionInfo struct {
bucketName string
manifest *collections.CollectionManifest
valid bool
errList []error
lastUpdatedTs time.Time
StubRWMutex // Stub to make CollectionInfo replaceable with ClusterInfoCache
}
func newCollectionInfo(bucketName string, manifest *collections.CollectionManifest) *collectionInfo {
return &collectionInfo{
bucketName: bucketName,
manifest: manifest,
valid: true,
lastUpdatedTs: time.Now(),
}
}
func newCollectionInfoWithErr(bucketName string, err error) *collectionInfo {
ci := &collectionInfo{
bucketName: bucketName,
}
ci.valid = false
ci.errList = append(ci.errList, err)
return ci
}
//
// Bucket Info
//
type bucketInfo struct {
bucket *couchbase.Bucket
clusterURL string
valid bool
errList []error
lastUpdatedTs time.Time
StubRWMutex // Stub to make BucketInfo replaceable with ClusterInfoCache
}
func newBucketInfo(tb *couchbase.Bucket, connHost string) *bucketInfo {
bi := &bucketInfo{
bucket: tb,
}
bi.bucket.NormalizeHostnames(connHost)
bi.valid = true
bi.lastUpdatedTs = time.Now()
return bi
}
func newBucketInfoWithErr(bucketName string, err error) *bucketInfo {
bi := &bucketInfo{
bucket: &couchbase.Bucket{Name: bucketName},
}
bi.valid = false
bi.errList = append(bi.errList, err)
return bi
}
func (bi *bucketInfo) setClusterURL(u string) {
bi.clusterURL = u
}
func (bi *bucketInfo) String() string {
return fmt.Sprintf("bucket: %+v cluster: %v", bi.bucket, bi.clusterURL)
}
func NewBucketInfo(cluster, pooln, bucketn string) (*bucketInfo, error) {
if strings.HasPrefix(cluster, "http") {
u, err := url.Parse(cluster)
if err != nil {
return nil, err
}
cluster = u.Host
}
ah := &CbAuthHandler{
Hostport: cluster,
Bucket: bucketn,
}
client, err := couchbase.ConnectWithAuth("http://"+cluster, ah)
if err != nil {
return nil, err
}
client.SetUserAgent("NewBucketInfo")
var bucket *couchbase.Bucket
retryCount := 0
terseBucketsBase := fmt.Sprintf("/pools/%v/b/", pooln)
for retry := true; retry && retryCount <= 5; retryCount++ {
retry, bucket, err = client.GetTerseBucket(terseBucketsBase, bucketn)
if retry {
time.Sleep(5 * time.Millisecond)
}
}
if retryCount > 1 {
logging.Warnf("NewBucketInfo: Retried %v times due to out of sync for"+
" bucket %s. Final err: %v", retryCount, bucketn, err)
}
if err != nil {
return nil, err
}
bi := newBucketInfo(bucket, client.BaseURL.Host)
bi.setClusterURL(cluster)
return bi, err
}
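// Example call (cluster address, pool and bucket names are hypothetical):
//
//	bi, err := NewBucketInfo("127.0.0.1:8091", "default", "travel-sample")
//	if err == nil {
//		logging.Infof("%v", bi)
//	}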
//
// PoolInfo
//
type PoolInfo struct {
pool *couchbase.Pool
}
func newPoolInfo(p *couchbase.Pool) *PoolInfo {
return &PoolInfo{pool: p}
}
//
// Cluster Info Cache Lite
//
type clusterInfoCacheLite struct {
logPrefix string
nih nodesInfoHolder
cihm map[string]collectionInfoHolder
cihmLock sync.RWMutex
bihm map[string]bucketInfoHolder
bihmLock sync.RWMutex
pih poolInfoHolder
}
func newClusterInfoCacheLite(logPrefix string) *clusterInfoCacheLite {
c := &clusterInfoCacheLite{logPrefix: logPrefix}
c.cihm = make(map[string]collectionInfoHolder)
c.bihm = make(map[string]bucketInfoHolder)
c.nih.Init()
c.pih.Init()
return c
}
// nodesInfo is used by the manager during API calls; if the cached value is nil
// the API cannot use the data. It should not be nil, as the manager's
// constructor ensures at least one pool is fetched before returning a manager
// object.
func (cicl *clusterInfoCacheLite) nodesInfo() *NodesInfo {
if ptr := cicl.nih.Get(); ptr != nil {
return ptr
} else {
return newNodesInfoWithError(ErrorUnintializedNodesInfo)
}
}
// getNodesInfo is used by manager while handling the notifications
func (cicl *clusterInfoCacheLite) getNodesInfo() *NodesInfo {
return cicl.nih.Get()
}
// setNodesInfo is used by manager while handling the notifications
func (cicl *clusterInfoCacheLite) setNodesInfo(ni *NodesInfo) {
cicl.nih.Set(ni)
}
// TODO : Check log redaction and add other objects for printing.
func (cicl *clusterInfoCacheLite) String() string {
ni := cicl.nih.Get()
if ni != nil {
return fmt.Sprintf("%v", ni)
}
return ""
}
func (cicl *clusterInfoCacheLite) addCollnInfo(bucketName string,
ci *collectionInfo) error {
cicl.cihmLock.Lock()
defer cicl.cihmLock.Unlock()
if _, ok := cicl.cihm[bucketName]; ok {
return ErrBucketAlreadyExist
}
cih := collectionInfoHolder{}
cih.Init()
cih.Set(ci)
cicl.cihm[bucketName] = cih
return nil
}
func (cicl *clusterInfoCacheLite) deleteCollnInfo(bucketName string) error {
cicl.cihmLock.Lock()
defer cicl.cihmLock.Unlock()
if _, ok := cicl.cihm[bucketName]; !ok {
return ErrBucketNotFound
}
delete(cicl.cihm, bucketName)
return nil
}
func (cicl *clusterInfoCacheLite) updateCollnInfo(bucketName string,
ci *collectionInfo) error {
cicl.cihmLock.RLock()
defer cicl.cihmLock.RUnlock()
cih, ok := cicl.cihm[bucketName]
if !ok {
return ErrBucketNotFound
}
cih.Set(ci)
return nil
}
// getCollnInfo returns a *collectionInfo, which can be valid or invalid. The
// error returned relates to the availability of data in the cache and is used
// by the CICL Manager. When the data is not available in the cache, an invalid
// *collectionInfo is returned with errList[0] set to the error.
func (cicl *clusterInfoCacheLite) getCollnInfo(bucketName string) (*collectionInfo,
error) {
cicl.cihmLock.RLock()
defer cicl.cihmLock.RUnlock()
cih, ok := cicl.cihm[bucketName]
if !ok {
ci := newCollectionInfoWithErr(bucketName, ErrBucketNotFound)
return ci, ci.errList[0]
}
ci := cih.Get()
if ci == nil {
ci := newCollectionInfoWithErr(bucketName, ErrUnInitializedClusterInfo)
return ci, ci.errList[0]
}
return ci, nil
}
func (cicl *clusterInfoCacheLite) addBucketInfo(bucketName string, bi *bucketInfo) error {
cicl.bihmLock.Lock()
defer cicl.bihmLock.Unlock()
if _, ok := cicl.bihm[bucketName]; ok {
return ErrBucketAlreadyExist
}
bih := bucketInfoHolder{}
bih.Init()
bih.Set(bi)
cicl.bihm[bucketName] = bih
return nil
}
func (cicl *clusterInfoCacheLite) deleteBucketInfo(bucketName string) error {
cicl.bihmLock.Lock()
defer cicl.bihmLock.Unlock()
if _, ok := cicl.bihm[bucketName]; !ok {
return ErrBucketNotFound
}
delete(cicl.bihm, bucketName)
return nil
}
func (cicl *clusterInfoCacheLite) updateBucketInfo(bucketName string,
bi *bucketInfo) error {
cicl.bihmLock.RLock()
defer cicl.bihmLock.RUnlock()
bih, ok := cicl.bihm[bucketName]
if !ok {
return ErrBucketNotFound
}
bih.Set(bi)
return nil
}
// getBucketInfo returns bi, which can be valid or invalid. The error is used by
// the CICL manager to check whether the bucket exists in the cache; it does not
// indicate the validity of the returned bucketInfo, only the availability of bi
// in the cache. An invalid bi is returned with the error set when the data is
// not available in the cache.
func (cicl *clusterInfoCacheLite) getBucketInfo(bucketName string) (*bucketInfo,
error) {
cicl.bihmLock.RLock()
defer cicl.bihmLock.RUnlock()
bih, ok := cicl.bihm[bucketName]
if !ok {
bi := newBucketInfoWithErr(bucketName, ErrBucketNotFound)
return bi, bi.errList[0]
}
bi := bih.Get()
if bi == nil {
bi := newBucketInfoWithErr(bucketName, ErrUnInitializedClusterInfo)
return bi, bi.errList[0]
}
return bi, nil
}
func (cicl *clusterInfoCacheLite) setPoolInfo(pi *PoolInfo) {
cicl.pih.Set(pi)
}
func (cicl *clusterInfoCacheLite) getPoolInfo() *PoolInfo {
return cicl.pih.Get()
}
//
// Cluster Info Cache Lite Manager
//
type clusterInfoCacheLiteManager struct {
clusterURL string
poolName string
logPrefix string
cicl *clusterInfoCacheLite
// Used for making adhoc queries to ns_server
client couchbase.Client
ticker *time.Ticker
timeDiffToForceFetch uint32 // In minutes
poolsStreamingCh chan Notification
collnManifestCh chan Notification
perBucketCollnManifestCh map[string]chan Notification
collnBucketsHash string
bucketInfoCh chan Notification
bucketInfoChPerBucket map[string]chan Notification
bucketURLMap map[string]string
bInfoBucketsHash string
eventMgr *eventManager
eventCtr uint64
maxRetries uint32
retryInterval uint32
notifierRetrySleep uint32
closeCh chan bool
}
func newClusterInfoCacheLiteManager(cicl *clusterInfoCacheLite, clusterURL,
poolName, logPrefix string, config Config) (*clusterInfoCacheLiteManager,
error) {
cicm := &clusterInfoCacheLiteManager{
poolName: poolName,
logPrefix: logPrefix,
cicl: cicl,
timeDiffToForceFetch: config["force_after"].Uint32(), // In Minutes
poolsStreamingCh: make(chan Notification, 1000),
notifierRetrySleep: config["notifier_restart_sleep"].Uint32(), // In Milliseconds
retryInterval: uint32(CLUSTER_INFO_DEFAULT_RETRY_INTERVAL.Seconds()),
collnManifestCh: make(chan Notification, 10000),
perBucketCollnManifestCh: make(map[string]chan Notification),
bucketInfoCh: make(chan Notification, 10000),
bucketInfoChPerBucket: make(map[string]chan Notification),
closeCh: make(chan bool),
}
var err error
cicm.clusterURL, err = ClusterAuthUrl(clusterURL)
if err != nil {
return nil, err
}
cicm.client, err = couchbase.Connect(cicm.clusterURL)
if err != nil {
return nil, err
}
cicm.eventMgr, err = newEventManager(1, 500)
if err != nil {
return nil, err
}
cicm.client.SetUserAgent(logPrefix)
// Try fetching a few times in the constructor.
// With the default 2 second retry interval, try for 120 seconds and then give up.
cicm.maxRetries = 60
ni, p := cicm.FetchNodesInfo(nil)
if ni == nil {
// At least one successful pool call is needed
return nil, ErrUnInitializedClusterInfo
}
cicm.cicl.setNodesInfo(ni)
SetMajorVersionCICL(ni.version)
pi := newPoolInfo(p)
cicm.cicl.setPoolInfo(pi)
// Try fetching the default number of times elsewhere
cicm.maxRetries = CLUSTER_INFO_DEFAULT_RETRIES
cicm.bucketURLMap = make(map[string]string)
for k, v := range ni.bucketURLMap {
cicm.bucketURLMap[k] = v
}
go cicm.handlePoolsChangeNotifications()
for _, bn := range ni.bucketNames {
msg := &couchbase.Bucket{Name: bn.Name}
notif := Notification{Type: ForceUpdateNotification, Msg: msg}
if ni.version >= MAJOR_VERSION_7 {
ch := make(chan Notification, 100)
cicm.perBucketCollnManifestCh[bn.Name] = ch
go cicm.handlePerBucketCollectionManifest(bn.Name, ch)
ch <- notif
}
ch1 := make(chan Notification, 100)
cicm.bucketInfoChPerBucket[bn.Name] = ch1
go cicm.handleBucketInfoChangesPerBucket(bn.Name, ch1)
ch1 <- notif
}
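// Each bucket gets its own notification channel and handler goroutine above
// (a collection manifest handler only when the cluster is on 7.x or later),
// so per-bucket updates are processed independently of one another.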
go cicm.handleCollectionManifestChanges()
go cicm.handleBucketInfoChanges()
go cicm.watchClusterChanges(false)
go cicm.periodicUpdater()
logging.Infof("newClusterInfoCacheLiteManager: started New clusterInfoCacheManager")
return cicm, nil
}
func readWithTimeout(ch <-chan interface{}, timeout uint32) (interface{}, error) {
if timeout == 0 {
event := <-ch
return event, nil
}
select {
case msg := <-ch:
return msg, nil
case <-time.After(time.Duration(timeout) * time.Second):
return nil, ErrorEventWaitTimeout
}
}
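// For example, a caller that has registered an event channel with eventMgr can
// block on it for at most 30 seconds (a timeout of 0 blocks indefinitely):
//
//	msg, err := readWithTimeout(ch, 30)
//	if err == ErrorEventWaitTimeout {
//		// fall back to cached (possibly stale) data or retry
//	}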
func (cicm *clusterInfoCacheLiteManager) sendToCollnManifestCh(msg Notification) {
if GetMajorVersionCICL() >= MAJOR_VERSION_7 {
cicm.collnManifestCh <- msg
}
}
func (cicm *clusterInfoCacheLiteManager) close() {
close(cicm.closeCh)
cicm.ticker.Stop()
close(cicm.poolsStreamingCh)
close(cicm.collnManifestCh)
for _, ch := range cicm.perBucketCollnManifestCh {
close(ch)
}
close(cicm.bucketInfoCh)
for _, ch := range cicm.bucketInfoChPerBucket {
close(ch)
}
logging.Infof("closed clusterInfoCacheLiteManager")
}
func (cicm *clusterInfoCacheLiteManager) setTimeDiffToForceFetch(minutes uint32) {
logging.Infof("clusterInfoCacheLiteManager: Setting time difference interval in manager to force fetch to %d minutes", minutes)
atomic.StoreUint32(&cicm.timeDiffToForceFetch, minutes)
}
func (cicm *clusterInfoCacheLiteManager) setMaxRetries(maxRetries uint32) {
logging.Infof("clusterInfoCacheLiteManager: Setting max retries in manager to %d", maxRetries)
atomic.StoreUint32(&cicm.maxRetries, maxRetries)
}
func (cicm *clusterInfoCacheLiteManager) setRetryInterval(seconds uint32) {
logging.Infof("clusterInfoCacheLiteManager: Setting retry interval in manager to %d seconds", seconds)
atomic.StoreUint32(&cicm.retryInterval, seconds)
}
func (cicm *clusterInfoCacheLiteManager) setNotifierRetrySleep(milliSeconds uint32) {
logging.Infof("clusterInfoCacheLiteManager: Setting sleep interval upon notifier restart in manager to %d milliSeconds", milliSeconds)
atomic.StoreUint32(&cicm.notifierRetrySleep, milliSeconds)
}
func (cicm *clusterInfoCacheLiteManager) nodesInfo() (*NodesInfo, error) {
ni := cicm.cicl.nodesInfo()
if !ni.valid {
return ni, ni.errList[0]
} else {
return ni, nil
}
}
func (cicm *clusterInfoCacheLiteManager) nodesInfoSync(eventTimeoutSeconds uint32) (
*NodesInfo, error) {
ni := cicm.cicl.nodesInfo()
if !ni.valid {
id := fmt.Sprintf("%d", atomic.AddUint64(&cicm.eventCtr, 1))
evtCount := cicm.eventMgr.count(EVENT_NODEINFO_UPDATED)
ch, err := cicm.eventMgr.register(id, EVENT_NODEINFO_UPDATED)
if err != nil {
return nil, err
}
if len(cicm.poolsStreamingCh) == 0 && evtCount == 0 {
notif := Notification{
Type: ForceUpdateNotification,
Msg: &couchbase.Pool{},
}
cicm.poolsStreamingCh <- notif
}
msg, err := readWithTimeout(ch, eventTimeoutSeconds)
if err != nil {
return nil, err
}
// The NodesInfo event is notified only when it is valid. If the command
// channel goes empty and it is still invalid, the periodic check will
// restart the processing, or after the timeout the user can trigger the
// command again.
ni = msg.(*NodesInfo)
}
return ni, nil
}
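// The pattern above (register for EVENT_NODEINFO_UPDATED, push a
// ForceUpdateNotification only if no one is already waiting, then block in
// readWithTimeout) is reused below by bucketInfoSync and collectionInfoSync
// with their per-bucket event types.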
// bucketInfo returns a *bucketInfo object, which can be valid or invalid
func (cicm *clusterInfoCacheLiteManager) bucketInfo(bucketName string) *bucketInfo {
bi, _ := cicm.cicl.getBucketInfo(bucketName)
return bi
}
// bucketInfoSync will return (valid *bucketInfo, nil) when valid and (nil, err) when not valid
func (cicm *clusterInfoCacheLiteManager) bucketInfoSync(bucketName string, eventWaitTimeoutSeconds uint32) (
*bucketInfo, error) {
bi, _ := cicm.cicl.getBucketInfo(bucketName)
if bi.valid {
return bi, nil
}
id := fmt.Sprintf("%d", atomic.AddUint64(&cicm.eventCtr, 1))
evtType := getBucketInfoEventType(bucketName)
evtCount := cicm.eventMgr.count(evtType)
ch, err := cicm.eventMgr.register(id, evtType)
if err != nil {
return nil, err
}
if evtCount == 0 {
msg := Notification{
Type: ForceUpdateNotification,
Msg: &couchbase.Bucket{Name: bucketName},
}
cicm.bucketInfoCh <- msg
}
msg, err := readWithTimeout(ch, eventWaitTimeoutSeconds)
if err != nil {
return nil, err
}
bi = msg.(*bucketInfo)
if !bi.valid {
return nil, bi.errList[0]
}
return bi, nil
}
// collectionInfo returns a *collectionInfo, which can be valid or invalid
func (cicm *clusterInfoCacheLiteManager) collectionInfo(bucketName string) *collectionInfo {
ci, _ := cicm.cicl.getCollnInfo(bucketName)
return ci
}
// collectionInfoSync will return (*collectionInfo, nil) when valid and (nil, err) if not valid
func (cicm *clusterInfoCacheLiteManager) collectionInfoSync(bucketName string,
eventTimeoutSeconds uint32) (*collectionInfo, error) {
if GetMajorVersionCICL() < MAJOR_VERSION_7 {
return nil, ErrBucketNotFound
}
ci, _ := cicm.cicl.getCollnInfo(bucketName)
if ci.valid {
return ci, nil
}
id := fmt.Sprintf("%d", atomic.AddUint64(&cicm.eventCtr, 1))
evtType := getClusterInfoEventType(bucketName)
evtCount := cicm.eventMgr.count(evtType)
ch, err := cicm.eventMgr.register(id, evtType)