-
Notifications
You must be signed in to change notification settings - Fork 13
/
vbucket_discovery.go
109 lines (86 loc) · 2.93 KB
/
vbucket_discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package stream
import (
"errors"
"github.com/asaskevich/EventBus"
"github.com/Trendyol/go-dcp/config"
"github.com/Trendyol/go-dcp/kubernetes"
"github.com/Trendyol/go-dcp/couchbase"
"github.com/Trendyol/go-dcp/helpers"
"github.com/Trendyol/go-dcp/logger"
"github.com/Trendyol/go-dcp/membership"
)
type VBucketDiscovery interface {
Get() []uint16
Close()
GetMetric() *VBucketDiscoveryMetric
}
type vBucketDiscovery struct {
membership membership.Membership
vBucketDiscoveryMetric *VBucketDiscoveryMetric
vBucketNumber int
}
type VBucketDiscoveryMetric struct {
Type string
TotalMembers int
MemberNumber int
VBucketCount int
VBucketRangeStart uint16
VBucketRangeEnd uint16
}
func (s *vBucketDiscovery) Get() []uint16 {
vBuckets := make([]uint16, 0, s.vBucketNumber)
for i := 0; i < s.vBucketNumber; i++ {
vBuckets = append(vBuckets, uint16(i))
}
receivedInfo := s.membership.GetInfo()
readyToStreamVBuckets := helpers.ChunkSlice[uint16](vBuckets, receivedInfo.TotalMembers)[receivedInfo.MemberNumber-1]
start := readyToStreamVBuckets[0]
end := readyToStreamVBuckets[len(readyToStreamVBuckets)-1]
logger.Log.Info(
"member: %v/%v, vbucket range: %v-%v",
receivedInfo.MemberNumber, receivedInfo.TotalMembers,
start, end,
)
s.vBucketDiscoveryMetric.TotalMembers = receivedInfo.TotalMembers
s.vBucketDiscoveryMetric.MemberNumber = receivedInfo.MemberNumber
s.vBucketDiscoveryMetric.VBucketRangeStart = start
s.vBucketDiscoveryMetric.VBucketRangeEnd = end
return readyToStreamVBuckets
}
func (s *vBucketDiscovery) Close() {
s.membership.Close()
logger.Log.Debug("vbucket discovery closed")
}
func (s *vBucketDiscovery) GetMetric() *VBucketDiscoveryMetric {
return s.vBucketDiscoveryMetric
}
func NewVBucketDiscovery(client couchbase.Client,
config *config.Dcp,
vBucketNumber int,
bus EventBus.Bus,
) VBucketDiscovery {
var ms membership.Membership
switch {
case config.Dcp.Group.Membership.Type == membership.StaticMembershipType:
ms = membership.NewStaticMembership(config)
case config.Dcp.Group.Membership.Type == membership.CouchbaseMembershipType:
ms = couchbase.NewCBMembership(config, client, bus)
case config.Dcp.Group.Membership.Type == membership.KubernetesStatefulSetMembershipType:
ms = kubernetes.NewStatefulSetMembership(config)
case config.Dcp.Group.Membership.Type == membership.KubernetesHaMembershipType:
ms = kubernetes.NewHaMembership(config, bus)
default:
err := errors.New("unknown membership")
logger.Log.Error("error while try to use membership: %s, err: %v", config.Dcp.Group.Membership.Type, err)
panic(err)
}
logger.Log.Debug("vbucket discovery opened with membership type: %s", config.Dcp.Group.Membership.Type)
return &vBucketDiscovery{
vBucketNumber: vBucketNumber,
membership: ms,
vBucketDiscoveryMetric: &VBucketDiscoveryMetric{
VBucketCount: vBucketNumber,
Type: config.Dcp.Group.Membership.Type,
},
}
}