This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 230
/
poller.go
130 lines (103 loc) · 2.56 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Package poller provides the core Poller struct.
package poller
import (
"context"
"sync"
"time"
"github.com/featurebasedb/featurebase/v3/dax"
"github.com/featurebasedb/featurebase/v3/logger"
)
// Poller periodically checks every node reported by a NodeService and asks an
// AddressManager to remove the addresses of the nodes that are not up.
type Poller struct {
// mu is a read/write mutex carried by the Poller.
// NOTE(review): confirm which fields it is intended to guard.
mu sync.RWMutex
// addressManager is told to remove the addresses that failed a poll.
addressManager dax.AddressManager
// nodeService supplies the current list of nodes to poll.
nodeService dax.NodeService
// nodePoller reports whether a single address is up.
nodePoller NodePoller
// pollInterval is the period of the ticker that drives each polling round.
pollInterval time.Duration
// stopping is created by Run and closed by Stop to end the polling loop.
stopping chan struct{}
// logger receives the Poller's error and debug output.
logger logger.Logger
}
// New returns a Poller configured from cfg.
//
// Every field of cfg is optional. A nil AddressManager, NodeService,
// NodePoller or Logger leaves the corresponding Nop default in place, and a
// PollInterval that is zero or negative leaves the one-second default in
// place.
func New(cfg Config) *Poller {
	p := &Poller{
		addressManager: dax.NewNopAddressManager(),
		nodeService:    dax.NewNopNodeService(),
		nodePoller:     NewNopNodePoller(),
		pollInterval:   time.Second,
		logger:         logger.NopLogger,
	}

	// Override the defaults with whatever the caller supplied.
	if cfg.AddressManager != nil {
		p.addressManager = cfg.AddressManager
	}
	if cfg.NodeService != nil {
		p.nodeService = cfg.NodeService
	}
	if cfg.NodePoller != nil {
		p.nodePoller = cfg.NodePoller
	}
	// Accept only a positive interval: the polling loop hands this value to
	// time.NewTicker, which panics when given a non-positive duration.
	if cfg.PollInterval > 0 {
		p.pollInterval = cfg.PollInterval
	}
	if cfg.Logger != nil {
		p.logger = cfg.Logger
	}

	return p
}
// Addresses returns the address of every node currently reported by the node
// service. A failure to fetch the nodes is logged rather than returned; the
// result is then built from whatever node list accompanied the error. The
// returned slice is never nil.
func (p *Poller) Addresses() []dax.Address {
	nodes, err := p.nodeService.Nodes(context.Background())
	if err != nil {
		p.logger.Errorf("POLLER: unable to get nodes from node service: %v", err)
	}

	// One slot per node, filled in place.
	out := make([]dax.Address, len(nodes))
	for i, n := range nodes {
		out[i] = n.Address
	}
	return out
}
// Run executes the polling loop on the calling goroutine; it does not start a
// goroutine itself. It blocks until the stopping channel is closed (see Stop)
// and then always returns nil.
func (p *Poller) Run() error {
// Set up the stopping channel here in case the controller restarts and runs
// the Poller again.
p.stopping = make(chan struct{})
// Blocks here: run polls on every tick until p.stopping is closed.
p.run()
return nil
}
// run is the polling loop: it calls pollAll once per pollInterval tick and
// returns as soon as the stopping channel is closed.
func (p *Poller) run() {
	tick := time.NewTicker(p.pollInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			// A tick elapsed; poll every known node.
			p.pollAll()
		case <-p.stopping:
			return
		}
	}
}
// Stop ends the polling loop by closing the stopping channel.
//
// Stop is a no-op when the stopping channel has not been created yet (Run was
// never called) or has already been closed by an earlier Stop, instead of
// panicking on a nil or already-closed channel. Stop does not itself
// synchronize with concurrent callers.
func (p *Poller) Stop() {
	// Closing a nil channel panics, so there is nothing to do before Run has
	// created it.
	if p.stopping == nil {
		return
	}

	select {
	case <-p.stopping:
		// Nothing is ever sent on stopping, so a successful receive means it
		// is already closed; closing it a second time would panic.
	default:
		close(p.stopping)
	}
}
// pollAll performs one polling round: it polls every address returned by
// Addresses, collects the ones that are not up, and asks the address manager
// to remove them in a single call. A removal error is logged, not returned.
func (p *Poller) pollAll() {
	var down []dax.Address
	for _, a := range p.Addresses() {
		if p.nodePoller.Poll(a) {
			continue
		}
		p.logger.Printf("poller removing %s", a)
		down = append(down, a)
	}

	// Every node answered; nothing to remove this round.
	if len(down) == 0 {
		return
	}

	p.logger.Debugf("POLLER: removing addresses: %v", down)
	began := time.Now()
	if err := p.addressManager.RemoveAddresses(context.Background(), down...); err != nil {
		p.logger.Printf("POLLER: error removing %s: %v", down, err)
	}
	// Logged whether or not the removal succeeded, with the elapsed time.
	p.logger.Debugf("POLLER removing %v complete: %s", down, time.Since(began))
}