-
Notifications
You must be signed in to change notification settings - Fork 19
/
distributor.go
458 lines (416 loc) · 13.3 KB
/
distributor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
/* This file is part of VoltDB.
* Copyright (C) 2008-2022 Volt Active Data Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with VoltDB. If not, see <http://www.gnu.org/licenses/>.
*/
package voltdbclient
import (
"crypto/tls"
"database/sql/driver"
"errors"
"fmt"
"io/ioutil"
"log"
"math/rand"
"strings"
"sync/atomic"
"time"
)
const (
// DefaultQueryTimeout is the default time out for queries.
DefaultQueryTimeout time.Duration = 2 * time.Minute
// DefaultConnectionTimeout is the timeout that OpenConn,
// OpenConnWithLatencyTarget and OpenConnWithMaxOutstandingTxns pass along
// when they create the underlying node connections.
DefaultConnectionTimeout time.Duration = 1 * time.Minute
)
// handle is the package-level counter behind Conn.getNextHandle. It is
// incremented atomically and is shared by every Conn in the process.
var handle int64
// sHandle is the package-level counter behind Conn.getNextSystemHandle. It
// starts at -1 and is decremented atomically, so the values handed out are
// negative.
var sHandle int64 = -1
// ErrMissingServerArgument is returned by the Open* functions when the
// connection string is empty or consists only of whitespace.
var ErrMissingServerArgument = errors.New("voltdbclient: missing voltdb connection string")
// ProtocolVersion lists the version of the voltdb wire protocol to use.
// For VoltDB releases of version 5.2 and later use version 1. For releases
// prior to that use version 0.
var ProtocolVersion = 1
// Conn holds the set of currently active connections.
//
// A Conn is created by newConn or newTLSConn, which also start the loop
// goroutine that serves Close and Drain requests and consumes the topology and
// procedure-catalog responses.
type Conn struct {
// pemPath is the path of a PEM file. When non-empty, startWithTimeout reads
// the file and hands its bytes to newNodeTLSConn.
pemPath string
// tlsConfig, when non-nil, makes startWithTimeout create TLS node connections.
tlsConfig *tls.Config
// closeCh carries Close requests to the loop goroutine; the channel sent over
// it is signalled once the request has been handled.
closeCh chan chan bool
// open stores a bool: the constructors store true, setClosed stores false, and
// isClosed/assertOpen read it.
open atomic.Value
// rl is the rate limiter. The constructors install newTxnLimiter();
// OpenConnWithLatencyTarget and OpenConnWithMaxOutstandingTxns replace it.
rl rateLimiter
// drainCh carries Drain requests to the loop goroutine; the channel sent over
// it is signalled once every node connection has drained.
drainCh chan chan bool
// useClientAffinity is set to true by the constructors and gates the
// topology/procedure lookups issued from availableConn.
useClientAffinity bool
// sendReadsToReplicasBytDefaultIfCAEnabled is not referenced in this file.
// NOTE(review): the name looks like a typo of "...ByDefault..." — check other
// files before renaming.
sendReadsToReplicasBytDefaultIfCAEnabled bool
// subscribedConnection is the node connection most recently chosen by
// availableConn; loop resets it to nil when the topology subscription
// response is a VoltError.
subscribedConnection *nodeConn
// connected lists the node connections whose connect call succeeded in
// startWithTimeout.
connected []*nodeConn
// hasTopoStats is set once availableConn has requested topology statistics;
// loop clears it again when the response cannot be used.
hasTopoStats bool
// subTopoCh, topoStatsCh and prInfoCh deliver the responses to the topology
// subscription, topology statistics and procedure info requests; loop reads
// them and sets a channel to nil once its response has been consumed.
subTopoCh <-chan voltResponse
topoStatsCh <-chan voltResponse
prInfoCh <-chan voltResponse
// fetchedCatalog is set once availableConn has requested procedure info; loop
// clears it again when the response cannot be used.
fetchedCatalog bool
// hnator and partitionReplicas are filled in by loop from the result of
// updateAffinityTopology.
hnator hashinator
partitionReplicas *map[int][]*nodeConn
// procedureInfos is filled in by loop from the result of
// updateProcedurePartitioning.
procedureInfos *map[string]procedure
// partitionMasters is initialised to an empty map by the constructors; it is
// only referenced from commented-out code in this file.
partitionMasters map[int]*nodeConn
}
// newTLSConn builds a Conn that carries the PEM path and TLS configuration
// from clientConfig, marks it open and connects it to the servers in cis via
// startWithTimeout, using clientConfig.InsecureSkipVerify and
// clientConfig.ConnectTimeout.
//
// It returns the error from startWithTimeout, and no Conn, when start-up fails.
func newTLSConn(cis []string, clientConfig ClientConfig) (*Conn, error) {
	c := new(Conn)
	c.pemPath = clientConfig.PEMPath
	c.tlsConfig = clientConfig.TLSConfig
	c.closeCh = make(chan chan bool)
	c.rl = newTxnLimiter()
	c.drainCh = make(chan chan bool)
	c.useClientAffinity = true
	c.partitionMasters = make(map[int]*nodeConn)
	c.open.Store(true)

	err := c.startWithTimeout(cis, clientConfig.InsecureSkipVerify, clientConfig.ConnectTimeout)
	if err != nil {
		return nil, err
	}
	return c, nil
}
// newConn builds a non-TLS Conn, marks it open and connects it to the servers
// in cis via startWithTimeout, passing duration as the timeout.
//
// It returns the error from startWithTimeout, and no Conn, when start-up fails.
func newConn(cis []string, duration time.Duration) (*Conn, error) {
	c := new(Conn)
	c.closeCh = make(chan chan bool)
	c.rl = newTxnLimiter()
	c.drainCh = make(chan chan bool)
	c.useClientAffinity = true
	c.partitionMasters = make(map[int]*nodeConn)
	c.open.Store(true)

	err := c.startWithTimeout(cis, false, duration)
	if err != nil {
		return nil, err
	}
	return c, nil
}
// OpenConnWithTimeout returns a new connection to the VoltDB server. ci is a
// comma-separated list of connection strings in a driver-specific format, and
// duration is handed to every node connection as its timeout. The returned
// connection can be used by only one goroutine at a time.
//
// ErrMissingServerArgument is returned when ci is empty or only whitespace.
//
// By default voltdb doesn't require authentication; clients connecting to an
// unsecured database have access to everything. Supplying connection
// credentials has no effect for unsecured databases.
//
// Authentication is performed if a username and password are supplied;
// otherwise the connection is established without authentication.
//
// The connection string is similar to postgres; the default port is 21212:
//
//	voltdb://
//	voltdb://localhost
//	voltdb://localhost:21212
//	voltdb://user@localhost
//	voltdb://user:secret@localhost
//	voltdb://other@localhost?some_param=some_value
//
// You can omit the port, and the default port of 21212 will be added
// automatically.
//
// Additionally you can fine tune the behavior of connections in cluster mode
// using query parameters, for example
//
//	localhost:21212?max_retries=10&retry=true&retry_interval=1s
//
// retry - if true, try to reconnect with the node when the connection is lost.
//
// max_retries - the number of times to retry connecting to a node. This has no
// effect when retry is false.
//
// retry_interval - the duration of time to wait until the next retry.
func OpenConnWithTimeout(ci string, duration time.Duration) (*Conn, error) {
	servers := strings.TrimSpace(ci)
	if len(servers) == 0 {
		return nil, ErrMissingServerArgument
	}
	return newConn(strings.Split(servers, ","), duration)
}
// OpenConn returns a new connection to the VoltDB server(s) named in ci. It is
// shorthand for OpenConnWithTimeout(ci, DefaultConnectionTimeout); see
// OpenConnWithTimeout for the connection string format.
func OpenConn(ci string) (*Conn, error) {
return OpenConnWithTimeout(ci, DefaultConnectionTimeout)
}
// OpenTLSConn returns a new connection that uses TLS for its network
// connections. ci is a comma-separated list of connection strings.
//
// When clientConfig.TLSConfig is nil, a tls.Config carrying only
// clientConfig.InsecureSkipVerify is used in its place. clientConfig is
// received by value, so the caller's copy is not modified.
//
// ErrMissingServerArgument is returned when ci is empty or only whitespace.
func OpenTLSConn(ci string, clientConfig ClientConfig) (*Conn, error) {
	servers := strings.TrimSpace(ci)
	if len(servers) == 0 {
		return nil, ErrMissingServerArgument
	}
	if clientConfig.TLSConfig == nil {
		fallback := new(tls.Config)
		fallback.InsecureSkipVerify = clientConfig.InsecureSkipVerify
		clientConfig.TLSConfig = fallback
	}
	return newTLSConn(strings.Split(servers, ","), clientConfig)
}
// ClientConfig carries the options OpenTLSConn uses to set up its connections.
type ClientConfig struct {
// PEMPath is the path of a PEM file. When non-empty, the file's contents are
// read and passed to every TLS node connection.
PEMPath string
// TLSConfig is the TLS configuration to use. When nil, OpenTLSConn substitutes
// a tls.Config whose only setting is InsecureSkipVerify.
TLSConfig *tls.Config
// InsecureSkipVerify is copied into the fallback tls.Config and is also passed
// to every TLS node connection.
InsecureSkipVerify bool
// ConnectTimeout is passed to every node connection as its timeout.
// NOTE(review): no default is applied in this file when it is zero — confirm
// how newNodeTLSConn treats a zero duration.
ConnectTimeout time.Duration
}
// OpenConnWithLatencyTarget returns a new connection to the VoltDB server.
// This connection will try to meet the specified latency target, potentially by
// throttling the rate at which asynchronous transactions are submitted.
//
// The connection is opened with DefaultConnectionTimeout and its rate limiter
// is replaced by newLatencyLimiter(latencyTarget). ErrMissingServerArgument is
// returned when ci is empty or only whitespace.
func OpenConnWithLatencyTarget(ci string, latencyTarget int32) (*Conn, error) {
	servers := strings.TrimSpace(ci)
	if len(servers) == 0 {
		return nil, ErrMissingServerArgument
	}
	conn, err := newConn(strings.Split(servers, ","), DefaultConnectionTimeout)
	if err != nil {
		return nil, err
	}
	conn.rl = newLatencyLimiter(latencyTarget)
	return conn, nil
}
// OpenConnWithMaxOutstandingTxns returns a new connection to the VoltDB server.
// This connection will limit the number of outstanding transactions as
// indicated. An outstanding transaction is a transaction that has been sent to
// the server but for which no response has been received.
//
// The connection is opened with DefaultConnectionTimeout and its rate limiter
// is replaced by newTxnLimiterWithMaxOutTxns(maxOutTxns).
// ErrMissingServerArgument is returned when ci is empty or only whitespace.
func OpenConnWithMaxOutstandingTxns(ci string, maxOutTxns int) (*Conn, error) {
	servers := strings.TrimSpace(ci)
	if len(servers) == 0 {
		return nil, ErrMissingServerArgument
	}
	conn, err := newConn(strings.Split(servers, ","), DefaultConnectionTimeout)
	if err != nil {
		return nil, err
	}
	conn.rl = newTxnLimiterWithMaxOutTxns(maxOutTxns)
	return conn, nil
}
// startWithTimeout creates one node connection per entry in cis, connects each
// of them using ProtocolVersion, and starts the loop goroutine.
//
// TLS node connections are used when c.pemPath is non-empty or c.tlsConfig is
// non-nil; otherwise plain connections are used. duration is handed to every
// node connection as its timeout.
//
// The PEM file, if any, is read once before any connection is attempted, so a
// read failure is reported before anything has been opened. Entries that fail
// to connect are collected and passed to loop; the call only fails when no
// entry could be connected, in which case the returned error includes the last
// connect error.
func (c *Conn) startWithTimeout(cis []string, insecureSkipVerify bool, duration time.Duration) error {
	useTLS := len(c.pemPath) > 0 || c.tlsConfig != nil

	// The PEM bytes are the same for every node, so load them a single time.
	// pemBytes stays nil when only a tls.Config was supplied.
	var pemBytes []byte
	if len(c.pemPath) > 0 {
		var readErr error
		if pemBytes, readErr = ioutil.ReadFile(c.pemPath); readErr != nil {
			return readErr
		}
	}

	var (
		err                error
		disconnected       []*nodeConn
		hostIDToConnection = make(map[int]*nodeConn)
	)
	for _, ci := range cis {
		var nc *nodeConn
		if useTLS {
			nc = newNodeTLSConn(ci, insecureSkipVerify, c.tlsConfig, pemBytes, duration)
		} else {
			nc = newNodeConnWithTimeout(ci, duration)
		}
		if err = nc.connect(ProtocolVersion); err != nil {
			// Keep going: a single reachable node is enough to start.
			disconnected = append(disconnected, nc)
			continue
		}
		c.connected = append(c.connected, nc)
		if c.useClientAffinity {
			hostIDToConnection[int(nc.connData.HostID)] = nc
		}
	}
	if len(c.connected) == 0 {
		return fmt.Errorf("No valid connections %v", err)
	}
	go c.loop(disconnected, &hostIDToConnection)
	return nil
}
// getConn returns one of the node connections in c.connected, preferring one
// that is not closed.
//
// With a single connection, that connection is returned without checking
// whether it is closed. With several, a randomly chosen connection is tried
// first; if it is closed, the others are probed in slice order starting right
// after the random pick, and the first open one is returned.
//
// The probe visits every connection at most once, so the call always
// terminates. If every connection is closed, the random pick is returned
// anyway, as in the single-connection case. It returns nil when c.connected is
// empty.
func (c *Conn) getConn() *nodeConn {
	size := len(c.connected)
	switch size {
	case 0:
		// rand.Intn panics for n <= 0, so handle the empty set explicitly.
		return nil
	case 1:
		return c.connected[0]
	}

	start := rand.Intn(size)
	for offset := 0; offset < size; offset++ {
		nc := c.connected[(start+offset)%size]
		if !nc.isClosed() {
			return nc
		}
	}
	// Nothing is open; return the pick rather than spinning forever.
	return c.connected[start]
}
// availableConn picks a node connection through getConn, records it in
// c.subscribedConnection and returns it.
//
// When client affinity is enabled and at least one node is connected, it also
// issues, on the picked connection, the topology statistics request and the
// procedure info request if they have not been issued yet, storing the
// response channels for loop to consume.
func (c *Conn) availableConn() *nodeConn {
	picked := c.getConn()
	c.subscribedConnection = picked

	affinity := c.useClientAffinity && len(c.connected) > 0

	// NOTE(review): c.subscribedConnection was assigned from getConn just above,
	// so this branch can only run when getConn returned nil. Confirm whether the
	// nil check was meant to happen before the assignment.
	if affinity && c.subscribedConnection == nil {
		c.subTopoCh = c.subscribeTopo(picked)
	}
	if affinity && !c.hasTopoStats {
		c.topoStatsCh = c.getTopoStatistics(picked)
		c.hasTopoStats = true
	}
	if affinity && !c.fetchedCatalog {
		c.prInfoCh = c.getProcedureInfo(picked)
		c.fetchedCatalog = true
	}
	return c.subscribedConnection
}
// loop is the background goroutine started by startWithTimeout. It serves
// Close and Drain requests and consumes the responses to the topology
// subscription, topology statistics and procedure info requests.
//
// The disconnected and hostIDToConnection arguments are not used by the
// current body.
//
// Notes kept from the original author on the intended design:
//   - have the set of ncs, plus the data structures that go with client
//     affinity;
//   - each time through the loop, look for new pis and assign them to some nc;
//   - for reconnecting ncs, see if they have reconnected;
//   - check the error channel to see if any connections were lost.
func (c *Conn) loop(disconnected []*nodeConn, hostIDToConnection *map[int]*nodeConn) {
	// TODO: resubscribe when we lose the subscribed connection
	for {
		select {
		case closeRespCh := <-c.closeCh:
			if len(c.connected) > 0 {
				// Make sure every node connection managed by this Conn is closed.
				// This blocks, which is fine: the connection is being shut down
				// and nothing else should be happening.
				for _, nc := range c.connected {
					<-nc.close()
				}
				closeRespCh <- true
				return
			}
			// NOTE(review): with no connected nodes the request is acknowledged
			// but the goroutine keeps running — confirm this is intended.
			closeRespCh <- true

		case topoResp := <-c.subTopoCh:
			// Anything other than an error means the subscription succeeded.
			switch topoResp.(type) {
			case VoltError:
				if ResponseStatus(topoResp.GetStatus()) == ConnectionLost {
					// TODO: handle this. Move the connection out of connected, try again.
					// TODO: try to reconnect to the host in a separate go routine.
					// TODO: subscribe to topo a second time
				}
				c.subscribedConnection = nil
			default:
				c.subTopoCh = nil
			}

		case topoStatsResp := <-c.topoStatsCh:
			switch rows := topoStatsResp.(type) {
			case VoltRows:
				hnator, replicas, err := c.updateAffinityTopology(rows)
				if err == nil {
					c.hnator = hnator
					c.partitionReplicas = replicas
					c.topoStatsCh = nil
				} else if err.Error() != errLegacyHashinator.Error() {
					// Any failure other than the legacy-hashinator one clears
					// the flag so availableConn issues the request again.
					c.hasTopoStats = false
				}
			default:
				c.hasTopoStats = false
			}

		case prInfoResp := <-c.prInfoCh:
			switch rows := prInfoResp.(type) {
			case VoltRows:
				infos, err := c.updateProcedurePartitioning(rows)
				if err != nil {
					c.fetchedCatalog = false
				} else {
					c.procedureInfos = infos
					c.prInfoCh = nil
				}
			default:
				c.fetchedCatalog = false
			}

		case drainRespCh := <-c.drainCh:
			// Drain the node connections one after another, then acknowledge.
			// With no connections the range is empty and only the ack is sent.
			for _, nc := range c.connected {
				drained := make(chan bool, 1)
				nc.drain(drained)
				<-drained
			}
			drainRespCh <- true
		}
	}
}
// submit hands pi to the node connection chosen by availableConn and returns
// that connection's submit result unchanged.
func (c *Conn) submit(pi *procedureInvocation) (int, error) {
	// Routing by client affinity is not wired in; the original sketch is kept
	// below for reference.
	//
	// var nc *nodeConn
	// var backpressure = true
	// var err error
	// if c.useClientAffinity && c.hnator != nil && c.partitionReplicas != nil && c.procedureInfos != nil {
	// nc, backpressure, err =
	// c.getConnByCA(c.connected, c.hnator, &c.partitionMasters, c.partitionReplicas, c.procedureInfos, pi)
	// }
	// if err != nil && !backpressure && nc != nil {
	// // nc.submit(pi)
	// } else {
	// // c.allNcsPiCh <- pi
	// }
	return c.availableConn().submit(pi)
}
// Begin is present to satisfy the driver.Conn method set. No driver.Tx
// implementation is provided in this file, so Begin always fails with an
// error.
//
// It previously returned a nil driver.Tx together with a nil error. A caller
// that trusted the nil error would then hit a nil-interface panic on Commit or
// Rollback; returning an explicit error makes the lack of support visible at
// the call site instead.
func (c *Conn) Begin() (driver.Tx, error) {
	return nil, errors.New("voltdbclient: transactions are not supported")
}
// Close closes the connection to the VoltDB server. Connections to the server
// are meant to be long lived; it should not be necessary to continually close
// and reopen connections. Close would typically be called using a defer.
// Operations using a closed connection cause a panic.
//
// The request is handed to the loop goroutine and Close blocks until that
// goroutine has acknowledged it. The returned error is always nil.
//
// NOTE(review): after a successful close the loop goroutine exits, and the
// request channel is unbuffered, so a second call to Close (or a later Drain)
// would block forever — confirm callers never do that.
func (c *Conn) Close() error {
	done := make(chan bool)
	c.closeCh <- done
	<-done
	return nil
}
// Drain blocks until all outstanding asynchronous requests have been satisfied.
// Asynchronous requests are processed in a background thread; this call blocks
// the current thread until that background thread has finished with all
// asynchronous requests.
//
// The request is handed to the loop goroutine, which drains every connected
// node in turn before acknowledging.
func (c *Conn) Drain() {
	drained := make(chan bool, 1)
	c.drainCh <- drained
	<-drained
}
// assertOpen panics when the open flag stored in c.open is false and does
// nothing otherwise.
func (c *Conn) assertOpen() {
	if open := c.open.Load().(bool); open {
		return
	}
	panic("Tried to use closed connection pool")
}
// isClosed reports whether the open flag stored in c.open is false.
func (c *Conn) isClosed() bool {
	open := c.open.Load().(bool)
	return !open
}
// setClosed stores false in c.open, so that isClosed reports true and
// assertOpen panics from then on.
func (c *Conn) setClosed() {
c.open.Store(false)
}
// getNextHandle atomically increments the package-level handle counter and
// returns the new value. The counter is shared by all Conn values, not kept
// per connection.
func (c *Conn) getNextHandle() int64 {
return atomic.AddInt64(&handle, 1)
}
// getNextSystemHandle atomically decrements the package-level sHandle counter
// and returns the new value. The counter starts at -1, so the returned values
// are negative. It is shared by all Conn values, not kept per connection.
func (c *Conn) getNextSystemHandle() int64 {
return atomic.AddInt64(&sHandle, -1)
}
// procedure describes how a stored procedure is partitioned. The JSON tags
// give the field names used when a value is decoded from JSON.
type procedure struct {
	// SinglePartition tells whether the procedure is single-partition.
	SinglePartition bool `json:"singlePartition"`
	// ReadOnly tells whether the procedure is read-only.
	ReadOnly bool `json:"readOnly"`
	// PartitionParameter is reset to -1 by setDefaults for procedures that are
	// not single-partition.
	PartitionParameter int `json:"partitionParameter"`
	// PartitionParameterType is reset to -1 by setDefaults for procedures that
	// are not single-partition.
	PartitionParameterType int `json:"partitionParameterType"`
}

// setDefaults clears the partition parameter fields of a procedure that is not
// single-partition by setting both to -1. Single-partition procedures are left
// untouched.
func (proc *procedure) setDefaults() {
	if proc.SinglePartition {
		return
	}
	const noParameter = -1
	proc.PartitionParameter = noParameter
	proc.PartitionParameterType = noParameter
}
// panicIfnotNil does nothing when err is nil. Otherwise it calls log.Panic
// with str and err, which logs the message and then panics with it.
func panicIfnotNil(str string, err error) {
	if err == nil {
		return
	}
	log.Panic(str, err)
}