optimizing.go
package client
import (
"context"
"errors"
"fmt"
"math"
"sort"
"strings"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/drand/drand/chain"
"github.com/drand/drand/log"
)
const (
defaultRequestTimeout = time.Second * 5
defaultSpeedTestInterval = time.Minute * 5
// defaultRequestConcurrency controls how many clients are raced when `Get`
// is called for on-demand results, and how many watch clients are spun up
// (in addition to clients marked as passive) to provide results to `Watch`
// requests.
defaultRequestConcurrency = 2
// defaultWatchRetryInterval is the time after which a closed watch channel
// is re-opened when no context error has occurred.
defaultWatchRetryInterval = time.Second * 30
defaultChannelBuffer = 5
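// maxUnixTime and maxNanoSec are the largest second and nanosecond values
// representable by time.Unix (1<<63-1 seconds minus the 62135596800-second
// Unix-to-internal epoch offset); MarkPassive uses them to pin passive
// clients to the back of the speed-ordered list.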
maxUnixTime = 1<<63 - 62135596801
maxNanoSec = 999999999
)
type optimizingClient struct {
sync.RWMutex
clients []Client
passiveClients []Client
stats []*requestStat
requestTimeout time.Duration
requestConcurrency int
speedTestInterval time.Duration
watchRetryInterval time.Duration
log log.Logger
done chan struct{}
}
// newOptimizingClient creates a drand client that measures the speed of clients
// and uses the fastest ones.
//
// Clients passed to the optimizing client are ordered by speed and calls to
// `Get` race the 2 fastest clients (by default) for the result. If a client
// errors then it is moved to the back of the list.
//
// A speed test is performed in the background every 5 minutes (by default) to
// ensure we're still using the fastest clients. A negative speed test interval
// disables testing.
//
// Calls to `Get` actually iterate over the speed-ordered client list with a
// concurrency of 2 (by default) until a result is retrieved. This means the
// optimizing client will fall back to the other, slower clients in the event
// of failure(s).
//
// Additionally, calls to Get are given a timeout of 5 seconds (by default) to
// ensure no unbounded blocking occurs.
func newOptimizingClient(
clients []Client,
requestTimeout time.Duration,
requestConcurrency int,
speedTestInterval,
watchRetryInterval time.Duration,
) (*optimizingClient, error) {
if len(clients) == 0 {
return nil, errors.New("missing clients")
}
stats := make([]*requestStat, len(clients))
now := time.Now()
for i, c := range clients {
stats[i] = &requestStat{client: c, rtt: 0, startTime: now}
}
done := make(chan struct{})
if requestTimeout <= 0 {
requestTimeout = defaultRequestTimeout
}
if requestConcurrency <= 0 {
requestConcurrency = defaultRequestConcurrency
}
if speedTestInterval == 0 {
speedTestInterval = defaultSpeedTestInterval
}
if watchRetryInterval == 0 {
watchRetryInterval = defaultWatchRetryInterval
}
oc := &optimizingClient{
clients: clients,
stats: stats,
requestTimeout: requestTimeout,
requestConcurrency: requestConcurrency,
speedTestInterval: speedTestInterval,
watchRetryInterval: watchRetryInterval,
log: log.DefaultLogger(),
done: done,
}
return oc, nil
}
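// A minimal construction sketch (illustrative only; c1, c2 and c3 stand for
// any concrete Client implementations and are not defined in this file).
// Passing zero values selects the package defaults described above:
//
//	oc, err := newOptimizingClient([]Client{c1, c2, c3}, 0, 0, 0, 0)
//	if err != nil {
//		// handle the error (e.g. no clients were supplied)
//	}
//	oc.Start()
//	defer oc.Close()
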
// Start starts the background speed measurements of the optimizing client.
// SetLog should not be called after Start.
func (oc *optimizingClient) Start() {
if oc.speedTestInterval > 0 {
go oc.testSpeed()
}
}
// MarkPassive tags a client as 'passive': a generalization of the libp2p-style gossip client.
// These clients do not participate in the speed-test race, and are protected from
// being stopped by the optimizing watcher.
// Note: if a client marked as passive closes its results channel from a `Watch` call, the
// optimizing client will not re-open it, as it would for non-passive clients.
// MarkPassive must be called before `Start`.
func (oc *optimizingClient) MarkPassive(c Client) {
oc.passiveClients = append(oc.passiveClients, c)
// push passive clients to the back of the list for `Get`s
for _, s := range oc.stats {
if s.client == c {
s.rtt = math.MaxInt64
s.startTime = time.Unix(maxUnixTime, maxNanoSec)
}
}
}
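// Illustrative sketch (gossipRelay is a placeholder, not defined here): a
// client that only pushes results, such as a gossip relay, can be registered
// as passive so it is never raced or closed by the watcher:
//
//	oc.MarkPassive(gossipRelay)
//	oc.Start()
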
// String returns the name of this client.
func (oc *optimizingClient) String() string {
names := make([]string, len(oc.clients))
for i, c := range oc.clients {
names[i] = fmt.Sprint(c)
}
return fmt.Sprintf("OptimizingClient(%s)", strings.Join(names, ", "))
}
type requestStat struct {
// client is the client used to make the request.
client Client
// rtt is the time it took to make the request.
rtt time.Duration
// startTime is the time at which the request was started.
startTime time.Time
}
type requestResult struct {
// client is the client used to make the request.
client Client
// result is the return value from the call to Get.
result Result
// err is the error that occurred from a call to Get (not including context error).
err error
// stat is stats from the call to Get.
stat *requestStat
}
// markedPassive checks if a client should be treated as passive
func (oc *optimizingClient) markedPassive(c Client) bool {
for _, p := range oc.passiveClients {
if p == c {
return true
}
}
return false
}
func (oc *optimizingClient) testSpeed() {
clients := make([]Client, 0, len(oc.clients))
for _, c := range oc.clients {
if !oc.markedPassive(c) {
clients = append(clients, c)
}
}
for {
var stats []*requestStat
ctx, cancel := context.WithCancel(context.Background())
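// race every non-passive client for round 1 (the chain's first round, so it
// should always be retrievable) to collect a fresh rtt sample per client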
ch := parallelGet(ctx, clients, 1, oc.requestTimeout, oc.requestConcurrency)
LOOP:
for {
select {
case rr, ok := <-ch:
if !ok {
cancel()
break LOOP
}
if rr.err != nil {
oc.log.Infow("", "optimizing_client", "endpoint down when speed tested", "client", fmt.Sprintf("%s", rr.client), "err", rr.err)
}
stats = append(stats, rr.stat)
case <-oc.done:
cancel()
return
}
}
oc.updateStats(stats)
t := time.NewTimer(oc.speedTestInterval)
select {
case <-t.C:
case <-oc.done:
t.Stop()
return
}
}
}
// SetLog configures the client log output.
func (oc *optimizingClient) SetLog(l log.Logger) {
oc.log = l
}
// fastestClients returns an ordered slice of clients, fastest first.
func (oc *optimizingClient) fastestClients() []Client {
oc.RLock()
defer oc.RUnlock()
// copy the current ordered client list so we iterate over a stable slice
clients := make([]Client, 0, len(oc.stats))
for _, s := range oc.stats {
clients = append(clients, s.client)
}
return clients
}
// Get returns the randomness at `round` or an error.
func (oc *optimizingClient) Get(ctx context.Context, round uint64) (res Result, err error) {
clients := oc.fastestClients()
var stats []*requestStat
ch := raceGet(ctx, clients, round, oc.requestTimeout, oc.requestConcurrency)
err = errors.New("no valid clients")
LOOP:
for {
select {
case rr, ok := <-ch:
if !ok {
break LOOP
}
stats = append(stats, rr.stat)
res = rr.result
if rr.err != nil && !errors.Is(rr.err, errEmptyClientUnsupportedGet) {
//nolint:errorlint
err = fmt.Errorf("%v - %w", err, rr.err)
} else if rr.err == nil {
err = nil
}
case <-ctx.Done():
oc.updateStats(stats)
return nil, ctx.Err()
case <-oc.done:
oc.updateStats(stats)
return nil, errors.New("client closed")
}
}
oc.updateStats(stats)
return res, err
}
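// Caller-side sketch (the round number is arbitrary; assumes the Result
// interface exposes the usual Round/Randomness accessors):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
//	defer cancel()
//	res, err := oc.Get(ctx, 1234)
//	if err == nil {
//		fmt.Printf("round %d: %x\n", res.Round(), res.Randomness())
//	}
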
// get calls Get on the passed client and returns a requestResult or nil if the context was canceled.
func get(ctx context.Context, client Client, round uint64) *requestResult {
start := time.Now()
res, err := client.Get(ctx, round)
rtt := time.Since(start)
var stat requestStat
// client failure, set a large RTT so it is sent to the back of the list
if err != nil && !errors.Is(err, ctx.Err()) {
stat = requestStat{client, math.MaxInt64, start}
return &requestResult{client, res, err, &stat}
}
if ctx.Err() != nil {
return nil
}
stat = requestStat{client, rtt, start}
return &requestResult{client, res, err, &stat}
}
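// raceGet queries the clients for `round` in their given (speed) order with
// at most `concurrency` requests in flight, forwarding each result and
// closing the returned channel as soon as one client succeeds, every client
// has been tried, or the context is done.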
func raceGet(ctx context.Context, clients []Client, round uint64, timeout time.Duration, concurrency int) <-chan *requestResult {
results := make(chan *requestResult, len(clients))
go func() {
rctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(results)
ch := parallelGet(rctx, clients, round, timeout, concurrency)
for {
select {
case rr, ok := <-ch:
if !ok {
return
}
results <- rr
if rr.err == nil { // race is won
return
}
case <-rctx.Done():
return
}
}
}()
return results
}
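// parallelGet queries every client for `round`, using a token channel as a
// semaphore so that at most `concurrency` requests run concurrently; each
// request gets its own `timeout`, and the returned channel is closed once
// all clients have been tried (or the context is cancelled first).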
func parallelGet(ctx context.Context, clients []Client, round uint64, timeout time.Duration, concurrency int) <-chan *requestResult {
results := make(chan *requestResult, len(clients))
token := make(chan struct{}, concurrency)
for i := 0; i < concurrency; i++ {
token <- struct{}{}
}
go func() {
wg := sync.WaitGroup{}
LOOP:
for _, c := range clients {
c := c
select {
case <-token:
wg.Add(1)
go func(c Client) {
gctx, cancel := context.WithTimeout(ctx, timeout)
rr := get(gctx, c, round)
cancel()
if rr != nil {
results <- rr
}
token <- struct{}{}
wg.Done()
}(c)
case <-ctx.Done():
break LOOP
}
}
wg.Wait()
close(results)
}()
return results
}
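// updateStats merges fresh request samples into the recorded per-client
// stats, keeping only the newest sample for each client, and re-sorts the
// stats fastest-first. Failed clients carry an rtt of math.MaxInt64, so they
// naturally sink to the back of the ordering.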
func (oc *optimizingClient) updateStats(stats []*requestStat) {
oc.Lock()
defer oc.Unlock()
// update the round trip times with new samples
for _, next := range stats {
for _, curr := range oc.stats {
if curr.client == next.client {
if curr.startTime.Before(next.startTime) {
curr.rtt = next.rtt
curr.startTime = next.startTime
}
break
}
}
}
// sort by fastest
sort.Slice(oc.stats, func(i, j int) bool {
return oc.stats[i].rtt < oc.stats[j].rtt
})
}
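// watchResult pairs a result from Watch with the client that produced it so
// that trackWatchResults can attribute latency to the right client.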
type watchResult struct {
Result
Client
}
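// trackWatchResults forwards strictly newer rounds from `in` to `out`, and
// records the delay between a round's scheduled time and its arrival as that
// client's rtt sample. For example (assumed numbers), with a 30s period a
// round observed 2s after its scheduled time records an rtt of roughly 2s.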
func (oc *optimizingClient) trackWatchResults(info *chain.Info, in chan watchResult, out chan Result) {
defer close(out)
latest := uint64(0)
for r := range in {
round := r.Result.Round()
timeOfRound := time.Unix(chain.TimeOfRound(info.Period, info.GenesisTime, round), 0)
stat := requestStat{
client: r.Client,
rtt: time.Since(timeOfRound),
startTime: timeOfRound,
}
oc.updateStats([]*requestStat{&stat})
if round > latest {
latest = round
out <- r.Result
}
}
}
// Watch returns new randomness as it becomes available.
func (oc *optimizingClient) Watch(ctx context.Context) <-chan Result {
outChan := make(chan Result, defaultChannelBuffer)
inChan := make(chan watchResult, defaultChannelBuffer)
info, err := oc.Info(ctx)
if err != nil {
oc.log.Errorw("", "optimizing_client", "failed to learn info", "err", err)
close(outChan)
return outChan
}
state := watchState{
ctx: ctx,
optimizer: oc,
active: make([]watchingClient, 0),
protected: make([]watchingClient, 0),
failed: make([]failedClient, 0),
retryInterval: oc.watchRetryInterval,
}
closingClients := make(chan Client, 1)
for _, c := range oc.passiveClients {
c := c
go state.watchNext(ctx, c, inChan, closingClients)
state.protected = append(state.protected, watchingClient{c, nil})
}
go state.dispatchWatchingClients(inChan, closingClients)
go oc.trackWatchResults(info, inChan, outChan)
return outChan
}
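// Consumer sketch: the returned channel is closed when the context is
// cancelled or when no watching client remains, so ranging over it is safe:
//
//	for res := range oc.Watch(ctx) {
//		// rounds arrive in increasing order; stale rounds from slower
//		// watchers are dropped by trackWatchResults
//		handle(res) // handle is a placeholder for application code
//	}
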
type watchingClient struct {
Client
context.CancelFunc
}
type failedClient struct {
Client
backoffUntil time.Time
}
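// watchState tracks the watchers owned by the optimizing client: active
// watchers that may be rotated or closed, protected (passive) watchers that
// are never closed by the optimizer, and recently failed watchers that are
// backed off until retryInterval has elapsed.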
type watchState struct {
ctx context.Context
optimizer *optimizingClient
active []watchingClient
protected []watchingClient
failed []failedClient
retryInterval time.Duration
}
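// dispatchWatchingClients keeps up to requestConcurrency watchers running:
// it replaces watchers whose result streams close, periodically rotates onto
// the fastest known client, and tears everything down once the context is
// cancelled and the last watcher has finished.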
func (ws *watchState) dispatchWatchingClients(resultChan chan watchResult, closingClients chan Client) {
defer close(resultChan)
// spin up initial watcher(s)
ws.tryRepopulate(resultChan, closingClients)
ticker := time.NewTicker(ws.optimizer.watchRetryInterval)
defer ticker.Stop()
for {
select {
case c := <-closingClients:
// replace failed watchers
ws.done(c)
if ws.ctx.Err() == nil {
ws.tryRepopulate(resultChan, closingClients)
}
if len(ws.active) == 0 && len(ws.protected) == 0 {
return
}
case <-ticker.C:
// periodically cycle to fastest client.
clients := ws.optimizer.fastestClients()
if len(clients) == 0 {
continue
}
fastest := clients[0]
if ws.hasActive(fastest) == -1 && ws.hasProtected(fastest) == -1 {
ws.closeSlowest()
ws.tryRepopulate(resultChan, closingClients)
}
case <-ws.ctx.Done():
// trigger client close. Will return once len(ws.active) == 0
for _, c := range ws.active {
c.CancelFunc()
}
}
}
}
func (ws *watchState) tryRepopulate(results chan watchResult, done chan Client) {
ws.clean()
for {
if len(ws.active) >= ws.optimizer.requestConcurrency {
return
}
c := ws.nextUnwatched()
if c == nil {
return
}
cctx, cancel := context.WithCancel(ws.ctx)
ws.active = append(ws.active, watchingClient{c, cancel})
ws.optimizer.log.Infow("", "optimizing_client", "watching on client", "client", fmt.Sprintf("%s", c))
go ws.watchNext(cctx, c, results, done)
}
}
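// watchNext forwards results from a single client's Watch stream to `out`
// and signals `done` with the client once the stream closes.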
func (ws *watchState) watchNext(ctx context.Context, c Client, out chan watchResult, done chan Client) {
defer func() { done <- c }()
resultStream := c.Watch(ctx)
for r := range resultStream {
out <- watchResult{r, c}
}
ws.optimizer.log.Infow("", "optimizing_client", "watch ended", "client", fmt.Sprintf("%s", c))
}
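// clean drops entries from the failed list whose backoff has expired, making
// those clients eligible to be watched again.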
func (ws *watchState) clean() {
nf := make([]failedClient, 0, len(ws.failed))
for _, f := range ws.failed {
if f.backoffUntil.After(time.Now()) {
nf = append(nf, f)
}
}
ws.failed = nf
}
func (ws *watchState) close(clientIdx int) {
ws.active[clientIdx].CancelFunc()
ws.active[clientIdx] = ws.active[len(ws.active)-1]
ws.active[len(ws.active)-1] = watchingClient{}
ws.active = ws.active[:len(ws.active)-1]
}
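// done records that a client's watch has ended: an active client is moved to
// the failed list with a retry backoff, while a protected client is simply
// removed from the protected set.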
func (ws *watchState) done(c Client) {
idx := ws.hasActive(c)
if idx > -1 {
ws.close(idx)
ws.failed = append(ws.failed, failedClient{c, time.Now().Add(ws.retryInterval)})
} else if i := ws.hasProtected(c); i > -1 {
ws.protected[i] = ws.protected[len(ws.protected)-1]
ws.protected = ws.protected[:len(ws.protected)-1]
return
}
// note: it's expected that the client may already not be active.
// this happens when the optimizing client has closed it via `closeSlowest`
}
func (ws *watchState) hasActive(c Client) int {
for i, a := range ws.active {
if a.Client == c {
return i
}
}
return -1
}
func (ws *watchState) hasProtected(c Client) int {
for i, p := range ws.protected {
if p.Client == c {
return i
}
}
return -1
}
func (ws *watchState) closeSlowest() {
if len(ws.active) == 0 {
return
}
order := ws.optimizer.fastestClients()
idxs := make([]int, 0)
for _, c := range order {
if i := ws.hasActive(c); i > -1 {
idxs = append(idxs, i)
}
}
ws.close(idxs[len(idxs)-1])
}
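// nextUnwatched returns the fastest client that is not already watched,
// backing off after a failure, or protected; it returns nil when no
// candidate remains.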
func (ws *watchState) nextUnwatched() Client {
clients := ws.optimizer.fastestClients()
CLIENT_LOOP:
for _, c := range clients {
for _, a := range ws.active {
if c == a.Client {
continue CLIENT_LOOP
}
}
for _, f := range ws.failed {
if c == f.Client {
continue CLIENT_LOOP
}
}
for _, p := range ws.protected {
if c == p.Client {
continue CLIENT_LOOP
}
}
return c
}
return nil
}
// Info returns the parameters of the chain this client is connected to: the
// public key, when it started, and how frequently it updates.
func (oc *optimizingClient) Info(ctx context.Context) (chainInfo *chain.Info, err error) {
clients := oc.fastestClients()
for _, c := range clients {
ctx, cancel := context.WithTimeout(ctx, oc.requestTimeout)
chainInfo, err = c.Info(ctx)
cancel()
if err == nil {
break
}
}
return
}
// RoundAt returns the most recent round of randomness that will be available
// at the given time for the current client.
func (oc *optimizingClient) RoundAt(t time.Time) uint64 {
return oc.clients[0].RoundAt(t)
}
// Close stops the background speed tests and closes the client and its
// underlying clients. The client cannot be used after Close is called.
func (oc *optimizingClient) Close() error {
var errs *multierror.Error
for _, c := range oc.clients {
errs = multierror.Append(errs, c.Close())
}
close(oc.done)
return errs.ErrorOrNil()
}