/
cbdatasource.go
2177 lines (1844 loc) · 66.4 KB
/
cbdatasource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.
// Package cbdatasource streams data from a Couchbase cluster. It is
// implemented using Couchbase DCP protocol and has auto-reconnecting
// and auto-restarting goroutines underneath the hood to provide a
// simple, high-level cluster-wide abstraction. By using
// cbdatasource, your application does not need to worry about
// connections or reconnections to individual server nodes or cluster
// topology changes, rebalance & failovers. The API starting point is
// NewBucketDataSource().
package cbdatasource
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/binary"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/couchbase/go-couchbase"
"github.com/couchbase/go-couchbase/trace"
"github.com/couchbase/gomemcached"
"github.com/couchbase/gomemcached/client"
)
// FlagOpenProducer is a flag value used during UPR/DCP stream open to
// request a producer-side connection.
const FlagOpenProducer = uint32(1)

// FlagOpenIncludeXattrs is a flag value used during UPR/DCP stream
// open to request that extended attributes (xattrs) be included.
const FlagOpenIncludeXattrs = uint32(4)

// Feature codes negotiated with the server during connection setup.
// NOTE(review): values presumably correspond to the memcached HELLO
// feature ids — confirm against the gomemcached definitions.
const FeatureEnabledDataType = uint16(0x01)
const FeatureEnabledXAttrs = uint16(0x06)
const FeatureEnabledXError = uint16(0x07)

// UpdateSecuritySettings identifies a security-settings refresh
// (presumably used as a Kick()/refresh reason string — confirm at
// call sites outside this chunk).
const UpdateSecuritySettings = "UpdateSecuritySettings"

// ErrXAttrsNotSupported is returned when xattrs were requested but
// the connected server does not support them.
var ErrXAttrsNotSupported = fmt.Errorf("xattrs not supported by server")
// SecurityConfig holds process-wide security settings used when
// establishing (optionally encrypted) connections to the cluster;
// see UpdateSecurityConfig and fetchGlobalTLSConfig.
type SecurityConfig struct {
	// EncryptData, when true, enables TLS for data connections
	// (provided RootCAs or Certificates are also set).
	EncryptData bool
	// DisableNonSSLPorts — NOTE(review): not referenced within this
	// chunk; confirm its effect at external call sites.
	DisableNonSSLPorts bool
	// Certificates are the client certificates offered during the
	// TLS handshake.
	Certificates []tls.Certificate
	// RootCAs is the CA pool used to verify server certificates.
	RootCAs *x509.CertPool
}
// currSecurityConfigMutex guards reads and writes of
// currSecurityConfig.
var currSecurityConfigMutex sync.RWMutex

// currSecurityConfig holds the process-wide security settings; it is
// never nil — initialized here at declaration (rather than in an
// init() func, which Go best practice reserves for work that cannot
// be expressed as a declaration) and replaced wholesale via
// UpdateSecurityConfig().
var currSecurityConfig = &SecurityConfig{}
// UpdateSecurityConfig replaces the process-wide security settings
// used for encrypted connections.  Passing nil is an error; the
// previous settings are kept in that case.
func UpdateSecurityConfig(newConfig *SecurityConfig) error {
	if newConfig == nil {
		return fmt.Errorf("security config provided is nil")
	}
	currSecurityConfigMutex.Lock()
	currSecurityConfig = newConfig
	currSecurityConfigMutex.Unlock()
	return nil
}
// fetchGlobalTLSConfig builds a *tls.Config from the current global
// security settings.  It returns nil when data encryption is off, or
// when neither root CAs nor client certificates have been provided.
func fetchGlobalTLSConfig() *tls.Config {
	currSecurityConfigMutex.RLock()
	defer currSecurityConfigMutex.RUnlock()

	cfg := currSecurityConfig
	if !cfg.EncryptData {
		return nil
	}
	if cfg.RootCAs == nil && cfg.Certificates == nil {
		return nil
	}
	return &tls.Config{
		RootCAs:      cfg.RootCAs,
		Certificates: cfg.Certificates,
	}
}
// BucketDataSource is the main control interface returned by
// NewBucketDataSource().
type BucketDataSource interface {
	// Use Start() to kickoff connectivity to a Couchbase cluster,
	// after which calls will be made to the Receiver's methods.
	Start() error

	// Kick asynchronously requests a cluster map refresh. A reason
	// string of "" is valid.
	Kick(reason string) error

	// Stats fills dest with an immutable snapshot of stats.
	Stats(dest *BucketDataSourceStats) error

	// Close stops the underlying goroutines.
	Close() error
}
// A Receiver interface is implemented by the application, or the
// receiver of data. Calls to methods on this interface will be made
// by the BucketDataSource using multiple, concurrent goroutines, so
// the application should implement its own Receiver-side
// synchronizations if needed.
type Receiver interface {
	// Invoked in advisory fashion by the BucketDataSource when it
	// encounters an error. The BucketDataSource will continue to try
	// to "heal" and restart connections, etc, as necessary. The
	// Receiver has a recourse during these error notifications of
	// simply Close()'ing the BucketDataSource.
	OnError(error)

	// Invoked by the BucketDataSource when it has received a mutation
	// from the data source. Receiver implementation is responsible
	// for making its own copies of the key and request.
	DataUpdate(vbucketID uint16, key []byte, seq uint64,
		r *gomemcached.MCRequest) error

	// Invoked by the BucketDataSource when it has received a deletion
	// or expiration from the data source. Receiver implementation is
	// responsible for making its own copies of the key and request.
	DataDelete(vbucketID uint16, key []byte, seq uint64,
		r *gomemcached.MCRequest) error

	// A callback invoked by the BucketDataSource when it has
	// received a start snapshot message from the data source. The
	// Receiver implementation, for example, might choose to optimize
	// persistence perhaps by preparing a batch write to
	// application-specific storage.
	SnapshotStart(vbucketID uint16, snapStart, snapEnd uint64, snapType uint32) error

	// The Receiver should persist the value parameter of
	// SetMetaData() for retrieval during some future call to
	// GetMetaData() by the BucketDataSource. The metadata value
	// should be considered "in-stream", or as part of the sequence
	// history of mutations. That is, a later Rollback() to some
	// previous sequence number for a particular vbucketID should
	// rollback both persisted metadata and regular data.
	SetMetaData(vbucketID uint16, value []byte) error

	// GetMetaData() should return the opaque value previously
	// provided by an earlier call to SetMetaData(). If there was no
	// previous call to SetMetaData(), such as in the case of a brand
	// new instance of a Receiver (as opposed to a restarted or
	// reloaded Receiver), the Receiver should return (nil, 0, nil)
	// for (value, lastSeq, err), respectively. The lastSeq should be
	// the last sequence number received and persisted during calls to
	// the Receiver's DataUpdate() & DataDelete() methods.
	GetMetaData(vbucketID uint16) (value []byte, lastSeq uint64, err error)

	// Invoked by the BucketDataSource when the datasource signals a
	// rollback during stream initialization. Note that both data and
	// metadata should be rolled back.
	Rollback(vbucketID uint16, rollbackSeq uint64) error
}
// A ReceiverEx interface is an advanced Receiver interface that's
// optionally implemented by the application, or the receiver of data.
// Calls to methods on this interface will be made by the
// BucketDataSource using multiple, concurrent goroutines, so the
// application should implement its own Receiver-side synchronizations
// if needed.
type ReceiverEx interface {
	Receiver

	// Invoked by the BucketDataSource when the datasource signals a
	// rollback during stream initialization. In contrast to
	// Receiver.Rollback(), the vbucketUUID is also provided. Note
	// that both data and metadata should be rolled back.
	RollbackEx(vbucketID uint16, vbucketUUID uint64, rollbackSeq uint64) error
}
// BucketDataSourceOptions allows the application to provide
// configuration settings to NewBucketDataSource().
type BucketDataSourceOptions struct {
	// Optional - used during UPR_OPEN stream start. If empty a
	// random name will be automatically generated.
	Name string

	// Factor (like 1.5) to increase sleep time between retries
	// in connecting to a cluster manager node.
	ClusterManagerBackoffFactor float32

	// Initial sleep time (millisecs) before first retry to cluster manager.
	ClusterManagerSleepInitMS int

	// Maximum sleep time (millisecs) between retries to cluster manager.
	ClusterManagerSleepMaxMS int

	// Factor (like 1.5) to increase sleep time between retries
	// in connecting to a data manager node.
	DataManagerBackoffFactor float32

	// Initial sleep time (millisecs) before first retry to data manager.
	DataManagerSleepInitMS int

	// Maximum sleep time (millisecs) between retries to data manager.
	DataManagerSleepMaxMS int

	// Buffer size in bytes provided for UPR flow control.
	FeedBufferSizeBytes uint32

	// Used for UPR flow control and buffer-ack messages when this
	// percentage of FeedBufferSizeBytes is reached.
	FeedBufferAckThreshold float32

	// Time interval in seconds of NO-OP messages for UPR flow control,
	// needs to be set to a non-zero value to enable no-ops.
	NoopTimeIntervalSecs uint32

	// Used for applications like backup which wish to control the
	// last sequence number provided. Key is vbucketID, value is seqEnd.
	SeqEnd map[uint16]uint64

	// Optional function to connect to a couchbase cluster manager bucket.
	// Defaults to ConnectBucket() function in this package.
	ConnectBucket func(serverURL, poolName, bucketName string,
		auth couchbase.AuthHandler) (Bucket, error)

	// Optional function to connect to a couchbase data manager node.
	// Defaults to memcached.Connect().
	Connect func(protocol, dest string) (*memcached.Client, error)

	// Optional function to connect to a couchbase data manager node, over SSL.
	// Defaults to memcached.ConnectTLS().
	ConnectTLS func(protocol, dest string, tlsConfig *tls.Config) (
		*memcached.Client, error)

	// Optional function to fetch tls.Config that would be used for the
	// ConnectTLS(..) function (above)
	TLSConfig func() *tls.Config

	// Optional function for logging diagnostic messages.
	Logf func(fmt string, v ...interface{})

	// Capacity of the captured message trace information, which is
	// reported via the Logf() callback.
	TraceCapacity int `json:"-"`

	// When there's been no send/receive activity for this many
	// milliseconds, then transmit a NOOP to the DCP source. When 0,
	// the DefaultBucketDataSourceOptions.PingTimeoutMS is used. Of
	// note, the NOOP itself counts as send/receive activity.
	PingTimeoutMS int

	// IncludeXAttrs is an optional flag which specifies whether
	// the clients are interested in the X Attributes values
	// during DCP connection set up.
	// Defaulted to false to keep it backward compatible.
	IncludeXAttrs bool
}
// AllServerURLsConnectBucketError is the error type passed to
// Receiver.OnError() after the BucketDataSource has failed to connect
// to every serverURL supplied to NewBucketDataSource().  The
// application may choose to BucketDataSource.Close() on seeing it;
// otherwise the BucketDataSource keeps backing off and retrying.
type AllServerURLsConnectBucketError struct {
	ServerURLs []string
}

// Error implements the error interface.
func (e *AllServerURLsConnectBucketError) Error() string {
	msg := fmt.Sprintf("could not connect to any serverURL: %#v", e.ServerURLs)
	return msg
}
// AuthFailError is the error type passed to Receiver.OnError() when
// an auth request to the Couchbase cluster or a server node fails.
type AuthFailError struct {
	ServerURL string
	User      string
}

// Error implements the error interface.
func (e *AuthFailError) Error() string {
	msg := fmt.Sprintf("auth fail, serverURL: %#v, user: %s", e.ServerURL, e.User)
	return msg
}
// A Bucket interface defines the set of methods that cbdatasource
// needs from an abstract couchbase.Bucket. This separate interface
// allows for easier testability.
type Bucket interface {
	// Close releases any resources held by the bucket connection.
	Close()

	// GetUUID returns the bucket's UUID.
	GetUUID() string

	// VBServerMap returns the bucket's vbucket-to-server map.
	VBServerMap() *couchbase.VBucketServerMap

	// GetPoolServices returns service/port information
	// (/pools/nodeServices) for the named pool.
	GetPoolServices(string) (*couchbase.PoolServices, error)
}
// DefaultBucketDataSourceOptions defines the default options that
// will be used if nil is provided to NewBucketDataSource().  These
// values also back-fill individual option fields that are zero or
// negative (see Start() / workerStart()).
var DefaultBucketDataSourceOptions = &BucketDataSourceOptions{
	ClusterManagerBackoffFactor: 1.5,
	ClusterManagerSleepInitMS:   100,
	ClusterManagerSleepMaxMS:    1000,

	DataManagerBackoffFactor: 1.5,
	DataManagerSleepInitMS:   100,
	DataManagerSleepMaxMS:    1000,

	FeedBufferSizeBytes:    20000000, // ~20MB; see UPR_CONTROL/connection_buffer_size.
	FeedBufferAckThreshold: 0.2,

	NoopTimeIntervalSecs: 120, // 120 seconds; see UPR_CONTROL/set_noop_interval

	TraceCapacity: 200,

	PingTimeoutMS: 30000,

	IncludeXAttrs: false,
}
// BucketDataSourceStats is filled by the BucketDataSource.Stats()
// method. All the metrics here prefixed with "Tot" are monotonic
// counters: they only increase.
type BucketDataSourceStats struct {
	TotStart uint64

	// Kick() counters.
	TotKick        uint64
	TotKickDeduped uint64
	TotKickOk      uint64

	// Cluster-map refresh counters; see refreshCluster().
	TotRefreshCluster                              uint64
	TotRefreshClusterConnectBucket                 uint64
	TotRefreshClusterConnectBucketErr              uint64
	TotRefreshClusterConnectBucketOk               uint64
	TotRefreshClusterBucketUUIDErr                 uint64
	TotRefreshClusterVBMNilErr                     uint64
	TotRefreshClusterKickWorkers                   uint64
	TotRefreshClusterKickWorkersClosed             uint64
	TotRefreshClusterKickWorkersStopped            uint64
	TotRefreshClusterKickWorkersOk                 uint64
	TotRefreshClusterStopped                       uint64
	TotRefreshClusterAwokenClosed                  uint64
	TotRefreshClusterAwokenStopped                 uint64
	TotRefreshClusterAwokenRestart                 uint64
	TotRefreshClusterAwoken                        uint64
	TotRefreshClusterAllServerURLsConnectBucketErr uint64
	TotRefreshClusterDone                          uint64

	// Worker-refresh counters; see refreshWorkers().
	TotRefreshWorkersStarted         uint64
	TotRefreshWorkers                uint64
	TotRefreshWorkersClusterChKicks  uint64
	TotRefreshWorkersSecurityUpdates uint64
	TotRefreshWorkersVBMNilErr       uint64
	TotRefreshWorkersVBucketIDErr    uint64
	TotRefreshWorkersServerIdxsErr   uint64
	TotRefreshWorkersMasterIdxErr    uint64
	TotRefreshWorkersMasterServerErr uint64
	TotRefreshWorkersRemoveWorker    uint64
	TotRefreshWorkersAddWorker       uint64
	TotRefreshWorkersKickWorker      uint64
	TotRefreshWorkersCloseWorker     uint64
	TotRefreshWorkersLoop            uint64
	TotRefreshWorkersLoopDone        uint64
	TotRefreshWorkersDone            uint64

	// Per-server worker lifecycle counters; see worker()/workerStart().
	TotWorkerStart                        uint64
	TotWorkerDone                         uint64
	TotWorkerBody                         uint64
	TotWorkerBodyKick                     uint64
	TotWorkerConnect                      uint64
	TotWorkerConnectErr                   uint64
	TotWorkerConnectOk                    uint64
	TotWorkerAuth                         uint64
	TotWorkerAuthErr                      uint64
	TotWorkerAuthFail                     uint64
	TotWorkerAuthOk                       uint64
	TotWorkerUPROpenErr                   uint64
	TotWorkerUPROpenOk                    uint64
	TotWorkerAuthenticateMemcachedConn    uint64
	TotWorkerAuthenticateMemcachedConnErr uint64
	TotWorkerAuthenticateMemcachedConnOk  uint64
	TotWorkerClientClose                  uint64
	TotWorkerClientCloseDone              uint64
	TotWorkerTransmitStart                uint64
	TotWorkerTransmit                     uint64
	TotWorkerTransmitErr                  uint64
	TotWorkerTransmitOk                   uint64
	TotWorkerTransmitDone                 uint64
	TotWorkerReceiveStart                 uint64
	TotWorkerReceive                      uint64
	TotWorkerReceiveErr                   uint64
	TotWorkerReceiveOk                    uint64
	TotWorkerReceiveDone                  uint64
	TotWorkerSendEndCh                    uint64
	TotWorkerRecvEndCh                    uint64
	TotWorkerHandleRecv                   uint64
	TotWorkerHandleRecvErr                uint64
	TotWorkerHandleRecvOk                 uint64
	TotWorkerCleanup                      uint64
	TotWorkerCleanupDone                  uint64
	TotRefreshWorker                      uint64
	TotRefreshWorkerDone                  uint64
	TotRefreshWorkerOk                    uint64

	// UPR/DCP protocol message counters.
	TotUPRDataChange                       uint64
	TotUPRDataChangeStateErr               uint64
	TotUPRDataChangeMutation               uint64
	TotUPRDataChangeDeletion               uint64
	TotUPRDataChangeExpiration             uint64
	TotUPRDataChangeErr                    uint64
	TotUPRDataChangeOk                     uint64
	TotUPRCloseStream                      uint64
	TotUPRCloseStreamRes                   uint64
	TotUPRCloseStreamResStateErr           uint64
	TotUPRCloseStreamResErr                uint64
	TotUPRCloseStreamResOk                 uint64
	TotUPRStreamReq                        uint64
	TotUPRStreamReqWant                    uint64
	TotUPRStreamReqRes                     uint64
	TotUPRStreamReqResStateErr             uint64
	TotUPRStreamReqResFail                 uint64
	TotUPRStreamReqResFailNotMyVBucket     uint64
	TotUPRStreamReqResFailERange           uint64
	TotUPRStreamReqResFailENoMem           uint64
	TotUPRStreamReqResRollback             uint64
	TotUPRStreamReqResRollbackStart        uint64
	TotUPRStreamReqResRollbackErr          uint64
	TotUPRStreamReqResWantAfterRollbackErr uint64
	TotUPRStreamReqResKick                 uint64
	TotUPRStreamReqResSuccess              uint64
	TotUPRStreamReqResSuccessOk            uint64
	TotUPRStreamReqResFLogErr              uint64
	TotUPRStreamEnd                        uint64
	TotUPRStreamEndStateErr                uint64
	TotUPRStreamEndKick                    uint64
	TotUPRSnapshot                         uint64
	TotUPRSnapshotStateErr                 uint64
	TotUPRSnapshotStart                    uint64
	TotUPRSnapshotStartErr                 uint64
	TotUPRSnapshotOk                       uint64
	TotUPRNoop                             uint64
	TotUPRControl                          uint64
	TotUPRControlErr                       uint64
	TotUPRBufferAck                        uint64

	// Miscellaneous error / metadata counters.
	TotWantCloseRequestedVBucketErr uint64
	TotWantClosingVBucketErr        uint64

	TotSelectBucketErr uint64
	TotHandShakeErr    uint64

	TotGetVBucketMetaData             uint64
	TotGetVBucketMetaDataUnmarshalErr uint64
	TotGetVBucketMetaDataErr          uint64
	TotGetVBucketMetaDataOk           uint64
	TotSetVBucketMetaData             uint64
	TotSetVBucketMetaDataMarshalErr   uint64
	TotSetVBucketMetaDataErr          uint64
	TotSetVBucketMetaDataOk           uint64

	// NOOP/ping keep-alive counters.
	TotPingTimeout uint64
	TotPingReq     uint64
	TotPingReqDone uint64
}
// --------------------------------------------------------

// VBucketMetaData is an internal struct that is exposed to enable
// json marshaling.  It captures the per-vbucket stream position
// (sequence numbers, snapshot range, and failover log) that is
// round-tripped through Receiver.SetMetaData()/GetMetaData().
type VBucketMetaData struct {
	SeqStart    uint64     `json:"seqStart"`
	SeqEnd      uint64     `json:"seqEnd"`
	SnapStart   uint64     `json:"snapStart"`
	SnapEnd     uint64     `json:"snapEnd"`
	FailOverLog [][]uint64 `json:"failOverLog"`
}
// bucketDataSource is the concrete implementation behind the
// BucketDataSource interface; see NewBucketDataSource().
type bucketDataSource struct {
	// Immutable after construction.
	serverURLs []string
	poolName   string
	bucketName string
	bucketUUID string
	vbucketIDs []uint16
	auth       couchbase.AuthHandler // Auth for couchbase.
	receiver   Receiver
	options    *BucketDataSourceOptions

	refreshClusterM       sync.Mutex // Protects the refreshClusterReasons field.
	refreshClusterReasons map[string]uint64
	// When refreshClusterReasons transitions from empty to non-empty,
	// then refreshClusterCh must be notified.

	stopCh           chan struct{} // Closed to signal all goroutines to stop.
	refreshClusterCh chan struct{} // Wakes the refreshCluster loop.
	refreshWorkersCh chan string   // Kicks refreshWorkers; carries a reason string.
	closedCh         chan bool     // Closed when refreshWorkers has fully shut down.

	stats BucketDataSourceStats

	m    sync.Mutex // Protects all the below fields.
	life string     // Valid life states: "" (unstarted); "running"; "closed".
	vbm  *couchbase.VBucketServerMap
	ps   *couchbase.PoolServices
}
// NewBucketDataSource is the main starting point for using the
// cbdatasource API. The application must supply an array of 1 or
// more serverURLs (or "seed" URL's) to Couchbase Server
// cluster-manager REST URL endpoints, like "http://localhost:8091".
// The BucketDataSource (after Start()'ing) will try each serverURL,
// in turn, until it can get a successful cluster map. Additionally,
// the application must supply a poolName & bucketName from where the
// BucketDataSource will retrieve data. The optional bucketUUID is
// double-checked by the BucketDataSource to ensure we have the
// correct bucket, and a bucketUUID of "" means skip the bucketUUID
// validation. An optional array of vbucketID numbers allows the
// application to specify which vbuckets to retrieve; and the
// vbucketIDs array can be nil which means all vbuckets are retrieved
// by the BucketDataSource. The optional auth parameter can be nil.
// The application must supply its own implementation of the Receiver
// interface (see the example program as a sample). The optional
// options parameter (which may be nil) allows the application to
// specify advanced parameters like backoff and retry-sleep values.
func NewBucketDataSource(
	serverURLs []string,
	poolName string,
	bucketName string,
	bucketUUID string,
	vbucketIDs []uint16,
	auth couchbase.AuthHandler,
	receiver Receiver,
	options *BucketDataSourceOptions) (BucketDataSource, error) {
	// Validate required arguments up front.
	switch {
	case len(serverURLs) < 1:
		return nil, fmt.Errorf("missing at least 1 serverURL")
	case poolName == "":
		return nil, fmt.Errorf("missing poolName")
	case bucketName == "":
		return nil, fmt.Errorf("missing bucketName")
	case receiver == nil:
		return nil, fmt.Errorf("missing receiver")
	}
	if options == nil {
		options = DefaultBucketDataSourceOptions
	}
	ds := &bucketDataSource{
		serverURLs: serverURLs,
		poolName:   poolName,
		bucketName: bucketName,
		bucketUUID: bucketUUID,
		vbucketIDs: vbucketIDs,
		auth:       auth,
		receiver:   receiver,
		options:    options,

		refreshClusterReasons: map[string]uint64{},

		stopCh:           make(chan struct{}),
		refreshClusterCh: make(chan struct{}),
		refreshWorkersCh: make(chan string, 1),
		closedCh:         make(chan bool),
	}
	return ds, nil
}
// Start kicks off connectivity to the Couchbase cluster; see
// BucketDataSource.Start().  It may be called only once per instance
// and returns an error if the instance is already running or closed.
func (d *bucketDataSource) Start() error {
	atomic.AddUint64(&d.stats.TotStart, 1)

	// Transition life state "" -> "running" exactly once.
	d.m.Lock()
	prev := d.life
	if prev == "" {
		d.life = "running"
	}
	d.m.Unlock()
	if prev != "" {
		return fmt.Errorf("call to Start() in wrong state: %s", prev)
	}

	// Back-fill zero/negative backoff settings from the defaults.
	factor := d.options.ClusterManagerBackoffFactor
	if factor <= 0.0 {
		factor = DefaultBucketDataSourceOptions.ClusterManagerBackoffFactor
	}
	initMS := d.options.ClusterManagerSleepInitMS
	if initMS <= 0 {
		initMS = DefaultBucketDataSourceOptions.ClusterManagerSleepInitMS
	}
	maxMS := d.options.ClusterManagerSleepMaxMS
	if maxMS <= 0 {
		maxMS = DefaultBucketDataSourceOptions.ClusterManagerSleepMaxMS
	}

	// The cluster-refresh goroutine owns refreshWorkersCh: closing it
	// on shutdown is what unblocks the refreshWorkers goroutine.
	go func() {
		ExponentialBackoffLoop("cbdatasource.refreshCluster",
			func() int { return d.refreshCluster() },
			initMS, factor, maxMS)

		// We reach here when we need to shutdown.
		close(d.refreshWorkersCh)
		atomic.AddUint64(&d.stats.TotRefreshClusterDone, 1)
	}()

	go d.refreshWorkers()

	return nil
}
// isRunning reports whether the datasource's life state is "running".
func (d *bucketDataSource) isRunning() bool {
	d.m.Lock()
	defer d.m.Unlock()
	return d.life == "running"
}
// refreshCluster tries each configured serverURL in turn to retrieve
// a fresh cluster map (vbucket server map plus pool services), then
// kicks the workers and blocks until the next refresh request.  The
// return value follows the ExponentialBackoffLoop convention used by
// Start(): -1 means stop, 0 means no progress (back off and retry),
// 1 means progress (retry immediately).
func (d *bucketDataSource) refreshCluster() int {
	atomic.AddUint64(&d.stats.TotRefreshCluster, 1)
	if !d.isRunning() {
		return -1
	}
	for _, serverURL := range d.serverURLs {
		atomic.AddUint64(&d.stats.TotRefreshClusterConnectBucket, 1)
		connectBucket := d.options.ConnectBucket
		if connectBucket == nil {
			connectBucket = ConnectBucket
		}
		bucket, err := connectBucket(serverURL, d.poolName, d.bucketName, d.auth)
		if err != nil {
			// add details only for errors other than BucketNotFoundError
			if _, ok := err.(*couchbase.BucketNotFoundError); !ok {
				err = fmt.Errorf("connectBucket failed for server: %s,"+
					" poolName: %s, bucketName: %s, err: %v",
					serverURL, d.poolName, d.bucketName, err)
			}
			atomic.AddUint64(&d.stats.TotRefreshClusterConnectBucketErr, 1)
			d.receiver.OnError(err)
			continue // Try another serverURL.
		}
		atomic.AddUint64(&d.stats.TotRefreshClusterConnectBucketOk, 1)
		// The optional bucketUUID guards against reaching a bucket
		// that was deleted and recreated under the same name.
		if d.bucketUUID != "" && d.bucketUUID != bucket.GetUUID() {
			bucket.Close()
			atomic.AddUint64(&d.stats.TotRefreshClusterBucketUUIDErr, 1)
			d.receiver.OnError(fmt.Errorf("mismatched bucket uuid,"+
				" serverURL: %s, bucketName: %s, bucketUUID: %s, bucket.UUID: %s",
				serverURL, d.bucketName, d.bucketUUID, bucket.GetUUID()))
			continue // Try another serverURL.
		}
		vbm := bucket.VBServerMap()
		if vbm == nil {
			bucket.Close()
			atomic.AddUint64(&d.stats.TotRefreshClusterVBMNilErr, 1)
			d.receiver.OnError(fmt.Errorf("refreshCluster got no vbm,"+
				" serverURL: %s, bucketName: %s, bucketUUID: %s, bucket.UUID: %s",
				serverURL, d.bucketName, d.bucketUUID, bucket.GetUUID()))
			continue // Try another serverURL.
		}
		// Pool services data is advisory (used later for SSL port
		// mapping); a failure here is reported but not fatal, and a
		// nil ps is tolerated downstream.
		ps, err := bucket.GetPoolServices(d.poolName)
		if err != nil {
			d.receiver.OnError(fmt.Errorf("refreshCluster got an error"+
				" when it requested for /pools/nodeServices, err: %v", err))
		}
		bucket.Close()

		// Publish the new cluster map for refreshWorkers to read.
		d.m.Lock()
		d.vbm = vbm
		d.ps = ps
		d.m.Unlock()

		for {
			atomic.AddUint64(&d.stats.TotRefreshClusterKickWorkers, 1)
			select {
			case <-d.stopCh:
				atomic.AddUint64(&d.stats.TotRefreshClusterKickWorkersStopped, 1)
				return -1
			case d.refreshWorkersCh <- "new-vbm": // Kick workers to refresh.
				// NO-OP.
			}
			atomic.AddUint64(&d.stats.TotRefreshClusterKickWorkersOk, 1)

			// Wait for refreshCluster kick.
			var refreshClusterReasons map[string]uint64
			for {
				// Take ownership of any accumulated refresh reasons,
				// swapping in a fresh, empty map.
				d.refreshClusterM.Lock()
				if len(d.refreshClusterReasons) > 0 {
					refreshClusterReasons = d.refreshClusterReasons
					d.refreshClusterReasons = map[string]uint64{}
				}
				d.refreshClusterM.Unlock()

				if len(refreshClusterReasons) > 0 {
					break
				}

				select {
				case <-d.stopCh:
					atomic.AddUint64(&d.stats.TotRefreshClusterStopped, 1)
					return -1
				case _, refreshAlive := <-d.refreshClusterCh:
					if !refreshAlive {
						atomic.AddUint64(&d.stats.TotRefreshClusterAwokenClosed, 1)
						return -1
					}
				}
			}

			if !d.isRunning() {
				atomic.AddUint64(&d.stats.TotRefreshClusterAwokenStopped, 1)
				return -1
			}

			// If it's only that new workers have appeared, then we
			// can keep with this inner loop and not have to restart
			// all the way at the top / retrieve a new cluster map, etc.
			wasNewWorkerOnly :=
				len(refreshClusterReasons) == 1 &&
					refreshClusterReasons["new-worker"] > 0
			if !wasNewWorkerOnly {
				atomic.AddUint64(&d.stats.TotRefreshClusterAwokenRestart, 1)
				return 1 // Assume progress, so restart at first serverURL.
			}

			atomic.AddUint64(&d.stats.TotRefreshClusterAwoken, 1)
		}
	}

	// Notify Receiver in case it wants to Close() down this
	// BucketDataSource after enough attempts. The typed interfaces
	// allow Receiver to have better error handling logic.
	atomic.AddUint64(&d.stats.TotRefreshClusterAllServerURLsConnectBucketErr, 1)
	d.receiver.OnError(&AllServerURLsConnectBucketError{ServerURLs: d.serverURLs})

	return 0 // Ran through all the serverURLs, so no progress.
}
// refreshWorkers runs in its own goroutine and owns the workers map.
// On each kick via refreshWorkersCh it regroups the wanted vbucketIDs
// by master server according to the latest vbucket map, then adds,
// removes, or re-kicks one worker per server.  It exits when stopCh
// fires or refreshWorkersCh is closed, closing every worker channel
// and then closedCh on the way out.
func (d *bucketDataSource) refreshWorkers() {
	atomic.AddUint64(&d.stats.TotRefreshWorkersStarted, 1)

	// Keyed by server, value is chan of array of vbucketID's that the
	// worker needs to provide.
	workers := make(map[string]chan []uint16)

	var totSecurityUpdates uint64

OUTER_LOOP:
	for {
		select {
		case <-d.refreshWorkersCh:
			// Wait for refresh kick
			atomic.AddUint64(&d.stats.TotRefreshWorkersClusterChKicks, 1)
		case <-d.stopCh:
			break OUTER_LOOP
		}

		// if the security settings refresh kick happens from a tls config update,
		// then this stat would have bumped.
		latestTotSecurityUpdates :=
			atomic.LoadUint64(&d.stats.TotRefreshWorkersSecurityUpdates)
		tlsConfigUpdated := (totSecurityUpdates != latestTotSecurityUpdates)
		totSecurityUpdates = latestTotSecurityUpdates

		// Prefer the per-datasource TLSConfig callback; otherwise
		// fall back to the process-wide security config.
		var tlsConfig *tls.Config
		if tlsConfigCB := d.options.TLSConfig; tlsConfigCB != nil {
			tlsConfig = tlsConfigCB()
		} else {
			tlsConfig = fetchGlobalTLSConfig()
		}

		atomic.AddUint64(&d.stats.TotRefreshWorkers, 1)

		// Snapshot the cluster map published by refreshCluster().
		d.m.Lock()
		vbm := d.vbm
		ps := d.ps
		d.m.Unlock()

		if vbm == nil {
			atomic.AddUint64(&d.stats.TotRefreshWorkersVBMNilErr, 1)
			continue
		}

		// If nil vbucketIDs, then default to all vbucketIDs.
		vbucketIDs := d.vbucketIDs
		if vbucketIDs == nil {
			vbucketIDs = make([]uint16, len(vbm.VBucketMap))
			for i := 0; i < len(vbucketIDs); i++ {
				vbucketIDs[i] = uint16(i)
			}
		}

		// Group the wanted vbucketIDs by server.
		vbucketIDsByServer := make(map[string][]uint16)

		for _, vbucketID := range vbucketIDs {
			if int(vbucketID) >= len(vbm.VBucketMap) {
				atomic.AddUint64(&d.stats.TotRefreshWorkersVBucketIDErr, 1)
				d.receiver.OnError(fmt.Errorf("refreshWorkers"+
					" saw bad vbucketID: %d",
					vbucketID))
				d.Kick("bad-vbm")
				continue
			}
			serverIdxs := vbm.VBucketMap[vbucketID]
			if serverIdxs == nil || len(serverIdxs) <= 0 {
				atomic.AddUint64(&d.stats.TotRefreshWorkersServerIdxsErr, 1)
				d.receiver.OnError(fmt.Errorf("refreshWorkers"+
					" no serverIdxs for vbucketID: %d",
					vbucketID))
				continue
			}
			// The first entry in serverIdxs is the master (active)
			// server for this vbucket.
			masterIdx := serverIdxs[0]
			if masterIdx < 0 || int(masterIdx) >= len(vbm.ServerList) {
				atomic.AddUint64(&d.stats.TotRefreshWorkersMasterIdxErr, 1)
				d.receiver.OnError(fmt.Errorf("refreshWorkers"+
					" bad masterIdx: %d, vbucketID: %d",
					masterIdx, vbucketID))
				continue
			}
			masterServer := vbm.ServerList[masterIdx]
			if masterServer == "" {
				atomic.AddUint64(&d.stats.TotRefreshWorkersMasterServerErr, 1)
				d.receiver.OnError(fmt.Errorf("refreshWorkers"+
					" no masterServer for vbucketID: %d",
					vbucketID))
				continue
			}
			v, exists := vbucketIDsByServer[masterServer]
			if !exists || v == nil {
				v = []uint16{}
			}
			vbucketIDsByServer[masterServer] = append(v, vbucketID)
		}

		// Remove any extraneous workers.
		for server, workerCh := range workers {
			if _, exists := vbucketIDsByServer[server]; !exists {
				atomic.AddUint64(&d.stats.TotRefreshWorkersRemoveWorker, 1)
				delete(workers, server)
				close(workerCh)
			}
		}

		// Use the data obtained from /pools/nodeServices to map the regular
		// memcached ("kv") port to the "kvSsl" port for SSL, when encryption
		// has been enabled. Note that if for some reason data isn't available
		// from nodeServices, proceed with non-encrypted DCP.
		if ps == nil {
			tlsConfig = nil
		}

		// Add any missing workers and update workers with their
		// latest vbucketIDs.
		for server, serverVBucketIDs := range vbucketIDsByServer {
			workerCh, exists := workers[server]
			if !exists || workerCh == nil || tlsConfigUpdated {
				if tlsConfigUpdated && workerCh != nil {
					// If tlsConfig was updated, close the older worker before
					// setting up a new one.
					atomic.AddUint64(&d.stats.TotRefreshWorkersRemoveWorker, 1)
					close(workerCh)
				}
				atomic.AddUint64(&d.stats.TotRefreshWorkersAddWorker, 1)
				workerCh = make(chan []uint16, 1)
				workers[server] = workerCh

				var conf *tls.Config
				if tlsConfig != nil {
					serverSSL, encrypted, err := couchbase.MapKVtoSSLExt(server, ps, true)
					// If the "kv" port for the selected server wasn't
					// successfully mapped to it's "kvSsl" port, silently
					// fall back to using non-encrypted DCP to support
					// mixed-version cluster scenarios.
					if err == nil {
						server = serverSSL
						if encrypted {
							conf = tlsConfig
						} else {
							conf = nil
						}
					} else if d.options.Logf != nil {
						d.options.Logf("cbdatasource: falling back to non"+
							" encrypted dcp as MapKVtoSSL err: %v for"+
							" server: %s", err, server)
					}
				}
				d.workerStart(server, workerCh, conf)
			}

			select {
			case <-d.stopCh:
				break OUTER_LOOP
			case workerCh <- serverVBucketIDs:
				// NOOP.
			}
			atomic.AddUint64(&d.stats.TotRefreshWorkersKickWorker, 1)
		}

		atomic.AddUint64(&d.stats.TotRefreshWorkersLoop, 1)
	}

	atomic.AddUint64(&d.stats.TotRefreshWorkersLoopDone, 1)

	// We reach here when we need to shutdown.
	for _, workerCh := range workers {
		atomic.AddUint64(&d.stats.TotRefreshWorkersCloseWorker, 1)
		close(workerCh)
	}

	close(d.closedCh)

	atomic.AddUint64(&d.stats.TotRefreshWorkersDone, 1)
}
// workerStart launches the goroutine for a worker that connects to
// one data manager server, retrying via an exponential backoff loop
// until the worker reports it should stop.
func (d *bucketDataSource) workerStart(server string, workerCh chan []uint16, tlsConfig *tls.Config) {
	// Back-fill zero/negative backoff settings from the defaults.
	factor := d.options.DataManagerBackoffFactor
	if factor <= 0.0 {
		factor = DefaultBucketDataSourceOptions.DataManagerBackoffFactor
	}
	initMS := d.options.DataManagerSleepInitMS
	if initMS <= 0 {
		initMS = DefaultBucketDataSourceOptions.DataManagerSleepInitMS
	}
	maxMS := d.options.DataManagerSleepMaxMS
	if maxMS <= 0 {
		maxMS = DefaultBucketDataSourceOptions.DataManagerSleepMaxMS
	}

	// Use exponential backoff loop to handle connect retries to the server.
	go func() {
		atomic.AddUint64(&d.stats.TotWorkerStart, 1)

		ExponentialBackoffLoop("cbdatasource.worker-"+server,
			func() int { return d.worker(server, workerCh, tlsConfig) },
			initMS, factor, maxMS)

		atomic.AddUint64(&d.stats.TotWorkerDone, 1)
	}()
}
// VBucketState tracks the per-vbucket stream lifecycle and the most
// recent snapshot range observed for that vbucket.
type VBucketState struct {
	// Valid values for state: "" (dead/closed/unknown), "requested",
	// "running", "closing".
	State       string
	SnapStart   uint64
	SnapEnd     uint64
	FailOverLog [][]uint64
	SnapSaved   bool // True when the snapStart/snapEnd have been persisted.
}
// Connect once to the server and work the UPR stream. If anything
// goes wrong, return our level of progress in order to let our caller
// control any potential retries.
func (d *bucketDataSource) worker(server string, workerCh chan []uint16, tlsConfig *tls.Config) int {
atomic.AddUint64(&d.stats.TotWorkerBody, 1)
if !d.isRunning() {
return -1
}
atomic.AddUint64(&d.stats.TotWorkerConnect, 1)
emptyWorkerCh := func() int {
for {
select {
case _, ok := <-workerCh:
if !ok {
return -1 // workerCh was closed
}
// Else, keep looping to consume workerCh.
default:
return 0 // Stop loop when workerCh is empty.
}
}
}
var client *memcached.Client
var err error
if tlsConfig == nil {
connect := d.options.Connect