-
Notifications
You must be signed in to change notification settings - Fork 19
/
routeconfig.go
191 lines (174 loc) · 5.29 KB
/
routeconfig.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package gocbcore
import (
"fmt"
"strings"
)
type routeConfig struct {
revId int64
uuid string
bktType bucketType
kvServerList []string
capiEpList []string
mgmtEpList []string
n1qlEpList []string
ftsEpList []string
cbasEpList []string
vbMap *vbucketMap
ketamaMap *ketamaContinuum
}
func (config *routeConfig) IsValid() bool {
if len(config.kvServerList) == 0 || len(config.mgmtEpList) == 0 {
return false
}
switch config.bktType {
case bktTypeCouchbase:
return config.vbMap != nil && config.vbMap.IsValid()
case bktTypeMemcached:
return config.ketamaMap != nil && config.ketamaMap.IsValid()
default:
return false
}
}
func buildRouteConfig(bk *cfgBucket, useSsl bool, networkType string, firstConnect bool) *routeConfig {
var kvServerList []string
var capiEpList []string
var mgmtEpList []string
var n1qlEpList []string
var ftsEpList []string
var cbasEpList []string
var bktType bucketType
switch bk.NodeLocator {
case "ketama":
bktType = bktTypeMemcached
case "vbucket":
bktType = bktTypeCouchbase
default:
logDebugf("Invalid nodeLocator %s", bk.NodeLocator)
bktType = bktTypeInvalid
}
if bk.NodesExt != nil {
lenNodes := len(bk.Nodes)
for i, node := range bk.NodesExt {
hostname := node.Hostname
ports := node.Services
if networkType != "default" {
if altAddr, ok := node.AltAddresses[networkType]; ok {
hostname = altAddr.Hostname
if altAddr.Ports != nil {
ports = *altAddr.Ports
}
} else {
if !firstConnect {
logDebugf("Invalid config network type %s", networkType)
}
continue
}
}
// Hostname blank means to use the same one as was connected to
if hostname == "" {
// Note that the SourceHostname will already be IPv6 wrapped
hostname = bk.SourceHostname
} else {
// We need to detect an IPv6 address here and wrap it in the appropriate
// [] block to indicate its IPv6 for the rest of the system.
if strings.Contains(hostname, ":") {
hostname = "[" + hostname + "]"
}
}
if !useSsl {
if ports.Kv > 0 {
if i >= lenNodes {
logDebugf("KV node present in nodesext but not in nodes for %s:%d", hostname, ports.Kv)
} else {
kvServerList = append(kvServerList, fmt.Sprintf("%s:%d", hostname, ports.Kv))
}
}
if ports.Capi > 0 {
capiEpList = append(capiEpList, fmt.Sprintf("http://%s:%d/%s", hostname, ports.Capi, bk.Name))
}
if ports.Mgmt > 0 {
mgmtEpList = append(mgmtEpList, fmt.Sprintf("http://%s:%d", hostname, ports.Mgmt))
}
if ports.N1ql > 0 {
n1qlEpList = append(n1qlEpList, fmt.Sprintf("http://%s:%d", hostname, ports.N1ql))
}
if ports.Fts > 0 {
ftsEpList = append(ftsEpList, fmt.Sprintf("http://%s:%d", hostname, ports.Fts))
}
if ports.Cbas > 0 {
cbasEpList = append(cbasEpList, fmt.Sprintf("http://%s:%d", hostname, ports.Cbas))
}
} else {
if ports.KvSsl > 0 {
if i >= lenNodes {
logDebugf("KV node present in nodesext but not in nodes for %s:%d", hostname, ports.KvSsl)
} else {
kvServerList = append(kvServerList, fmt.Sprintf("%s:%d", hostname, ports.KvSsl))
}
}
if ports.CapiSsl > 0 {
capiEpList = append(capiEpList, fmt.Sprintf("https://%s:%d/%s", hostname, ports.CapiSsl, bk.Name))
}
if ports.MgmtSsl > 0 {
mgmtEpList = append(mgmtEpList, fmt.Sprintf("https://%s:%d", hostname, ports.MgmtSsl))
}
if ports.N1qlSsl > 0 {
n1qlEpList = append(n1qlEpList, fmt.Sprintf("https://%s:%d", hostname, ports.N1qlSsl))
}
if ports.FtsSsl > 0 {
ftsEpList = append(ftsEpList, fmt.Sprintf("https://%s:%d", hostname, ports.FtsSsl))
}
if ports.CbasSsl > 0 {
cbasEpList = append(cbasEpList, fmt.Sprintf("https://%s:%d", hostname, ports.CbasSsl))
}
}
}
} else {
if useSsl {
logErrorf("Received config without nodesExt while SSL is enabled. Generating invalid config.")
return &routeConfig{}
}
if bktType == bktTypeCouchbase {
kvServerList = bk.VBucketServerMap.ServerList
}
for _, node := range bk.Nodes {
if node.CouchAPIBase != "" {
// Slice off the UUID as Go's HTTP client cannot handle being passed URL-Encoded path values.
capiEp := strings.SplitN(node.CouchAPIBase, "%2B", 2)[0]
capiEpList = append(capiEpList, capiEp)
}
if node.Hostname != "" {
mgmtEpList = append(mgmtEpList, fmt.Sprintf("http://%s", node.Hostname))
}
if bktType == bktTypeMemcached {
// Get the data port. No VBucketServerMap.
host, err := hostFromHostPort(node.Hostname)
if err != nil {
logErrorf("Encountered invalid memcached host/port string. Ignoring node.")
continue
}
curKvHost := fmt.Sprintf("%s:%d", host, node.Ports["direct"])
kvServerList = append(kvServerList, curKvHost)
}
}
}
rc := &routeConfig{
revId: bk.Rev,
uuid: bk.UUID,
kvServerList: kvServerList,
capiEpList: capiEpList,
mgmtEpList: mgmtEpList,
n1qlEpList: n1qlEpList,
ftsEpList: ftsEpList,
cbasEpList: cbasEpList,
bktType: bktType,
}
if bktType == bktTypeCouchbase {
vbMap := bk.VBucketServerMap.VBucketMap
numReplicas := bk.VBucketServerMap.NumReplicas
rc.vbMap = newVbucketMap(vbMap, numReplicas)
} else if bktType == bktTypeMemcached {
rc.ketamaMap = newKetamaContinuum(kvServerList)
}
return rc
}