/
cluster_config.go
348 lines (320 loc) · 10.3 KB
/
cluster_config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
//
// DISCLAIMER
//
// Copyright 2017-2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package service
import (
"context"
"crypto/tls"
"fmt"
"net"
"strconv"
"strings"
"time"
driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
driver_http "github.com/arangodb/go-driver/http"
"github.com/arangodb/go-driver/jwt"
"github.com/arangodb-helper/arangodb/pkg/definitions"
"github.com/arangodb-helper/arangodb/service/options"
)
// ClusterConfig contains all the information of a cluster from a starter's point of view.
// When this type (or any of the types used in here) is changed, increase `SetupConfigVersion`.
type ClusterConfig struct {
	// AllPeers holds all peers; it is stored under the JSON key "Peers".
	AllPeers []Peer `json:"Peers"`
	// AgencySize is the number of agents.
	AgencySize int
	// LastModified is the time of the last modification.
	LastModified *time.Time `json:"LastModified,omitempty"`
	// PortOffsetIncrement is the increment of port offsets for peers on the same address.
	PortOffsetIncrement int `json:"PortOffsetIncrement,omitempty"`
	// ServerStorageEngine is the storage engine being used.
	ServerStorageEngine string `json:"ServerStorageEngine,omitempty"`
	// PersistentOptions are the options that were used during the first start
	// of the DB and cannot be changed anymore.
	PersistentOptions options.PersistentOptions `json:"PersistentOptions,omitempty"`
}
// PeerByID looks up the first peer whose ID equals id.
// It returns that peer and true, or a zero Peer and false when no peer matches.
func (p ClusterConfig) PeerByID(id string) (Peer, bool) {
	for i := range p.AllPeers {
		if candidate := p.AllPeers[i]; candidate.ID == id {
			return candidate, true
		}
	}
	return Peer{}, false
}
// PeerByAddressAndPort returns the first peer whose address equals the given
// address (compared case-insensitively) and whose effective port
// (Port + PortOffset) equals the given port.
// The second result is true when such a peer exists; otherwise a zero Peer
// and false are returned.
func (p ClusterConfig) PeerByAddressAndPort(address string, port int) (Peer, bool) {
	for _, x := range p.AllPeers {
		// strings.EqualFold performs the case-insensitive comparison directly,
		// without building lower-cased copies of the strings for every peer.
		if x.Port+x.PortOffset == port && strings.EqualFold(x.Address, address) {
			return x, true
		}
	}
	return Peer{}, false
}
// AllAgents collects, in their original order, all peers for which HasAgent
// reports true. The result is nil when no peer has an agent.
func (p ClusterConfig) AllAgents() []Peer {
	var agents []Peer
	for _, peer := range p.AllPeers {
		if !peer.HasAgent() {
			continue
		}
		agents = append(agents, peer)
	}
	return agents
}
// Initialize sets up p as a new cluster configuration.
// The peer list is replaced by a list containing only initialPeer, the agency
// size, storage engine and persistent options are taken from the arguments,
// the port offset increment is set to definitions.PortOffsetIncrementNew, and
// finally the LastModified timestamp is refreshed.
func (p *ClusterConfig) Initialize(initialPeer Peer, agencySize int, storageEngine string, persistentOptions options.PersistentOptions) {
	// Values supplied by the caller.
	p.AgencySize = agencySize
	p.ServerStorageEngine = storageEngine
	p.PersistentOptions = persistentOptions

	// Values fixed for every freshly initialized configuration.
	p.PortOffsetIncrement = definitions.PortOffsetIncrementNew
	p.AllPeers = []Peer{initialPeer}

	p.updateLastModified()
}
// UpdatePeerByID replaces the first peer whose ID equals update.ID with update
// and refreshes the LastModified timestamp.
// It returns true when a peer was replaced and false when no peer has that ID.
func (p *ClusterConfig) UpdatePeerByID(update Peer) bool {
	for i := range p.AllPeers {
		if p.AllPeers[i].ID != update.ID {
			continue
		}
		p.AllPeers[i] = update
		p.updateLastModified()
		return true
	}
	return false
}
// ForEachPeer passes every peer, in order, to updateFunc and stores the
// returned value back at the same position in AllPeers.
// The LastModified timestamp is not touched by this method.
func (p *ClusterConfig) ForEachPeer(updateFunc func(p Peer) Peer) {
	// Iterate over a snapshot of the slice header so the set of visited
	// elements is fixed before the first callback runs.
	peers := p.AllPeers
	for i := 0; i < len(peers); i++ {
		current := peers[i]
		p.AllPeers[i] = updateFunc(current)
	}
}
// AddPeer appends newPeer to the list of all peers, but only if no existing
// peer already has the same ID. On success the LastModified timestamp is
// refreshed.
// It returns true on success and false when the ID is already in use.
func (p *ClusterConfig) AddPeer(newPeer Peer) bool {
	for i := range p.AllPeers {
		if p.AllPeers[i].ID == newPeer.ID {
			// A peer with this ID is already registered; leave the list untouched.
			return false
		}
	}
	p.AllPeers = append(p.AllPeers, newPeer)
	p.updateLastModified()
	return true
}
// RemovePeerByID drops every peer whose ID equals id.
// The remaining peers are copied into a newly allocated slice, so the
// previous backing array is not modified. When at least one peer was removed
// the LastModified timestamp is refreshed and true is returned; otherwise the
// configuration is left untouched and false is returned.
func (p *ClusterConfig) RemovePeerByID(id string) bool {
	kept := make([]Peer, 0, len(p.AllPeers))
	for _, peer := range p.AllPeers {
		if peer.ID != id {
			kept = append(kept, peer)
		}
	}
	// Nothing was filtered out, so there was no peer with the given ID.
	if len(kept) == len(p.AllPeers) {
		return false
	}
	p.AllPeers = kept
	p.updateLastModified()
	return true
}
// IDs returns the ID of every peer, in the order of AllPeers.
// The result is always a non-nil slice, even when there are no peers.
func (p ClusterConfig) IDs() []string {
	ids := make([]string, len(p.AllPeers))
	for i := range p.AllPeers {
		ids[i] = p.AllPeers[i].ID
	}
	return ids
}
// GetFreePortOffset returns the first unallocated port offset.
// Candidate offsets start at 0 and advance via NextPortOffset. An offset is
// considered taken when some peer's PortRangeOverlaps(basePort+offset, p)
// reports true and either allPortOffsetsUnique is set or that peer's
// normalized address equals the normalized peerAddress.
func (p ClusterConfig) GetFreePortOffset(peerAddress string, basePort int, allPortOffsetsUnique bool) int {
	wantHost := normalizeHostName(peerAddress)

	// taken reports whether any peer conflicts with the given candidate offset.
	taken := func(offset int) bool {
		for _, candidate := range p.AllPeers {
			if !candidate.PortRangeOverlaps(basePort+offset, p) {
				continue
			}
			if allPortOffsetsUnique || normalizeHostName(candidate.Address) == wantHost {
				return true
			}
		}
		return false
	}

	offset := 0
	for taken(offset) {
		offset = p.NextPortOffset(offset)
	}
	return offset
}
// NextPortOffset returns the port offset that follows portOffset.
// When PortOffsetIncrement is 0 the step is definitions.PortOffsetIncrementOld;
// for any other value the step is definitions.PortOffsetIncrementNew.
func (p ClusterConfig) NextPortOffset(portOffset int) int {
	switch p.PortOffsetIncrement {
	case 0:
		return portOffset + definitions.PortOffsetIncrementOld
	default:
		return portOffset + definitions.PortOffsetIncrementNew
	}
}
// HaveEnoughAgents reports whether the number of peers for which HasAgent
// returns true is at least AgencySize.
func (p ClusterConfig) HaveEnoughAgents() bool {
	agents := 0
	for _, peer := range p.AllPeers {
		if !peer.HasAgent() {
			continue
		}
		agents++
	}
	return p.AgencySize <= agents
}
// IsSecure reports whether at least one peer has its IsSecure flag set.
// It returns false when there are no peers.
func (p ClusterConfig) IsSecure() bool {
	for i := range p.AllPeers {
		if p.AllPeers[i].IsSecure {
			return true
		}
	}
	return false
}
// GetPeerEndpoints creates a list of URLs, one for every peer.
// Each URL uses the Browser scheme of NewURLSchemes(peer.IsSecure), the peer's
// address and the port Port + PortOffset. The result is nil when there are no
// peers; the returned error is always nil.
func (p ClusterConfig) GetPeerEndpoints() ([]string, error) {
	var urls []string
	for _, peer := range p.AllPeers {
		scheme := NewURLSchemes(peer.IsSecure).Browser
		// JoinHostPort brackets IPv6 literals as needed.
		hostPort := net.JoinHostPort(peer.Address, strconv.Itoa(peer.Port+peer.PortOffset))
		urls = append(urls, fmt.Sprintf("%s://%s", scheme, hostPort))
	}
	return urls, nil
}
// GetAgentEndpoints creates a list of URLs, one for every peer that has an agent.
// Each URL uses the Browser scheme of NewURLSchemes(peer.IsSecure), the peer's
// address and the port Port + PortOffset + the agent server type's port
// offset. The result is nil when no peer has an agent; the returned error is
// always nil.
func (p ClusterConfig) GetAgentEndpoints() ([]string, error) {
	var urls []string
	for _, peer := range p.AllPeers {
		if !peer.HasAgent() {
			continue
		}
		agentPort := peer.Port + peer.PortOffset + definitions.ServerType(definitions.ServerTypeAgent).PortOffset()
		scheme := NewURLSchemes(peer.IsSecure).Browser
		hostPort := net.JoinHostPort(peer.Address, strconv.Itoa(agentPort))
		urls = append(urls, fmt.Sprintf("%s://%s", scheme, hostPort))
	}
	return urls, nil
}
// GetDBServerEndpoints creates a list of URLs, one for every peer that has a dbserver.
// Each URL uses the Browser scheme of NewURLSchemes(peer.IsSecure), the peer's
// address and the port Port + PortOffset + the dbserver server type's port
// offset. The result is nil when no peer has a dbserver; the returned error
// is always nil.
func (p ClusterConfig) GetDBServerEndpoints() ([]string, error) {
	var urls []string
	for _, peer := range p.AllPeers {
		if !peer.HasDBServer() {
			continue
		}
		dbServerPort := peer.Port + peer.PortOffset + definitions.ServerType(definitions.ServerTypeDBServer).PortOffset()
		scheme := NewURLSchemes(peer.IsSecure).Browser
		hostPort := net.JoinHostPort(peer.Address, strconv.Itoa(dbServerPort))
		urls = append(urls, fmt.Sprintf("%s://%s", scheme, hostPort))
	}
	return urls, nil
}
// GetCoordinatorEndpoints creates a list of URLs, one for every peer that has a coordinator.
// Each URL uses the Browser scheme of NewURLSchemes(peer.IsSecure), the peer's
// address and the port Port + PortOffset + the coordinator server type's port
// offset. The result is nil when no peer has a coordinator; the returned
// error is always nil.
func (p ClusterConfig) GetCoordinatorEndpoints() ([]string, error) {
	var urls []string
	for _, peer := range p.AllPeers {
		if !peer.HasCoordinator() {
			continue
		}
		coordinatorPort := peer.Port + peer.PortOffset + definitions.ServerType(definitions.ServerTypeCoordinator).PortOffset()
		scheme := NewURLSchemes(peer.IsSecure).Browser
		hostPort := net.JoinHostPort(peer.Address, strconv.Itoa(coordinatorPort))
		urls = append(urls, fmt.Sprintf("%s://%s", scheme, hostPort))
	}
	return urls, nil
}
// GetSingleEndpoints creates a list of single-server URLs, one for every peer
// (no filtering on the peer's roles is applied).
// Each URL uses the Browser scheme of NewURLSchemes(peer.IsSecure), the peer's
// address and the port Port + PortOffset + the single server type's port
// offset. The result is nil when there are no peers; the returned error is
// always nil.
func (p ClusterConfig) GetSingleEndpoints() ([]string, error) {
	var urls []string
	for _, peer := range p.AllPeers {
		singlePort := peer.Port + peer.PortOffset + definitions.ServerType(definitions.ServerTypeSingle).PortOffset()
		scheme := NewURLSchemes(peer.IsSecure).Browser
		hostPort := net.JoinHostPort(peer.Address, strconv.Itoa(singlePort))
		urls = append(urls, fmt.Sprintf("%s://%s", scheme, hostPort))
	}
	return urls, nil
}
// CreateAgencyAPI creates a client for the agency.
// It asks clientBuilder for a client of type ConnectionTypeAgency targeting
// the agent endpoints of this configuration and wraps that client's
// connection in an agency.Agency. Any error along the way is returned wrapped
// by maskAny.
func (p ClusterConfig) CreateAgencyAPI(clientBuilder ClientBuilder) (agency.Agency, error) {
	agentURLs, err := p.GetAgentEndpoints()
	if err != nil {
		return nil, maskAny(err)
	}

	client, err := clientBuilder.CreateClient(agentURLs, ConnectionTypeAgency, definitions.ServerTypeUnknown)
	if err != nil {
		return nil, maskAny(err)
	}

	api, err := agency.NewAgency(client.Connection())
	if err != nil {
		return nil, maskAny(err)
	}
	return api, nil
}
// CreateClusterAPI creates a client for the cluster.
// It asks clientBuilder for a client of type ConnectionTypeDatabase targeting
// the coordinator endpoints of this configuration and requests the cluster
// API from that client using ctx. Any error along the way is returned wrapped
// by maskAny.
func (p ClusterConfig) CreateClusterAPI(ctx context.Context, clientBuilder ClientBuilder) (driver.Cluster, error) {
	coordinatorURLs, err := p.GetCoordinatorEndpoints()
	if err != nil {
		return nil, maskAny(err)
	}

	client, err := clientBuilder.CreateClient(coordinatorURLs, ConnectionTypeDatabase, definitions.ServerTypeUnknown)
	if err != nil {
		return nil, maskAny(err)
	}

	clusterAPI, err := client.Cluster(ctx)
	if err != nil {
		return nil, maskAny(err)
	}
	return clusterAPI, nil
}
// CreateCoordinatorsClient creates a go-driver client targeting the coordinators.
// The HTTP connection is built for the coordinator endpoints of this
// configuration with TLS certificate verification disabled
// (InsecureSkipVerify). When jwtSecret is non-empty, an authorization header
// is created from it via jwt.CreateArangodJwtAuthorizationHeader and used as
// raw authentication; otherwise the client is created without authentication.
// Any error along the way is returned wrapped by maskAny.
func (p ClusterConfig) CreateCoordinatorsClient(jwtSecret string) (driver.Client, error) {
	coordinatorURLs, err := p.GetCoordinatorEndpoints()
	if err != nil {
		return nil, maskAny(err)
	}

	connConfig := driver_http.ConnectionConfig{
		Endpoints: coordinatorURLs,
		TLSConfig: &tls.Config{InsecureSkipVerify: true},
	}
	conn, err := driver_http.NewConnection(connConfig)
	if err != nil {
		return nil, maskAny(err)
	}

	clientConfig := driver.ClientConfig{Connection: conn}
	if jwtSecret != "" {
		header, err := jwt.CreateArangodJwtAuthorizationHeader(jwtSecret, "starter")
		if err != nil {
			return nil, maskAny(err)
		}
		clientConfig.Authentication = driver.RawAuthentication(header)
	}

	client, err := driver.NewClient(clientConfig)
	if err != nil {
		return nil, maskAny(err)
	}
	return client, nil
}
// updateLastModified points LastModified at a fresh time.Time holding the
// current time.
func (p *ClusterConfig) updateLastModified() {
	now := time.Now()
	p.LastModified = &now
}