/
poller.go
157 lines (130 loc) · 3.15 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package poller
import (
"context"
"fmt"
"sync"
"facette.io/facette/catalog"
"facette.io/facette/config"
"facette.io/facette/storage"
"facette.io/logger"
"github.com/pkg/errors"
)
// Poller represents a poller instance. It keeps a registry of workers, one
// per storage provider, and embeds a sync.RWMutex that its methods hold while
// accessing that registry.
type Poller struct {
sync.RWMutex
// ctx is the main context: Run blocks until it is cancelled.
ctx context.Context
// storage is queried by Run to list the enabled providers.
storage *storage.Storage
// searcher is the catalog searcher received by New. It is not used directly
// by the methods visible here; presumably consumed by workers (to confirm).
searcher *catalog.Searcher
// config is the configuration received by New. It is not used directly by
// the methods visible here; presumably consumed by workers (to confirm).
config *config.Config
// logger receives the poller messages and is used to derive one logging
// context per worker.
logger *logger.Logger
// workers holds the registered workers, keyed by provider identifier.
workers map[string]*worker
// errors holds the outcome of the latest worker initialization, keyed by
// provider identifier (nil after a successful start).
errors map[string]error
// wg is waited on by Run once shutdown has been requested; presumably the
// workers register themselves on it (confirm in the worker implementation).
wg *sync.WaitGroup
}
// New creates a new poller instance bound to the given context, storage,
// catalog searcher, configuration and logger. The returned poller starts with
// empty workers and errors registries; call Run to start polling.
func New(
	ctx context.Context,
	storage *storage.Storage,
	searcher *catalog.Searcher,
	config *config.Config,
	logger *logger.Logger,
) *Poller {
	p := new(Poller)

	// Dependencies handed over by the caller
	p.ctx = ctx
	p.storage = storage
	p.searcher = searcher
	p.config = config
	p.logger = logger

	// Internal state
	p.workers = map[string]*worker{}
	p.errors = map[string]error{}
	p.wg = new(sync.WaitGroup)

	return p
}
// Run starts polling the providers. It starts one worker per enabled provider
// found in storage, then blocks until the main context is cancelled, at which
// point it requests the workers shutdown and waits on the poller wait group.
// An error is returned only when the providers cannot be listed.
func (p *Poller) Run() error {
	var enabled []*storage.Provider

	p.logger.Info("started")

	// Fetch the enabled providers from storage
	filters := map[string]interface{}{"enabled": true}
	if _, err := p.storage.SQL().List(&enabled, filters, nil, 0, 0, false); err != nil {
		return errors.Wrap(err, "cannot list providers")
	}

	// Start one worker per provider
	for i := range enabled {
		p.StartWorker(enabled[i])
	}

	// Block until the main context gets cancelled, then stop everything
	<-p.ctx.Done()

	p.Shutdown()
	p.wg.Wait()

	p.logger.Info("stopped")

	return nil
}
// Shutdown stops the providers polling. Each registered worker is stopped
// from its own goroutine, so this method returns without waiting for the
// workers to actually terminate.
func (p *Poller) Shutdown() {
	p.RLock()
	defer p.RUnlock()

	for _, wk := range p.workers {
		if wk == nil {
			continue
		}

		// StopWorker needs the write lock, which only becomes available once
		// this method has released its read lock: hence the goroutine.
		go p.StopWorker(wk.provider, false)
	}
}
// StartWorker starts a new poller worker given a storage provider.
//
// Disabled providers are ignored, and a provider that already has a
// registered worker only triggers a warning. The outcome of the worker
// initialization is recorded per provider identifier and can be retrieved
// with WorkerError: the initialization error on failure, nil on success.
func (p *Poller) StartWorker(prov *storage.Provider) {
	p.Lock()
	defer p.Unlock()

	if !prov.Enabled {
		return
	}

	if _, ok := p.workers[prov.ID]; ok {
		p.logger.Warning("worker %q is already registered", prov.Name)
		return
	}

	// Initialize the new poller worker. It is registered only once its
	// initialization succeeded: registering the result of a failed
	// initialization would leave an unusable entry in the registry, making
	// later start attempts report the worker as "already registered" and
	// exposing that entry to the refresh methods.
	w, err := newWorker(p, prov, p.logger.Context(fmt.Sprintf("poller[%s]", prov.Name)))
	if err != nil {
		p.errors[prov.ID] = err
		p.logger.Error("failed to start %q worker: %s", prov.Name, err)
		return
	}

	p.errors[prov.ID] = nil
	p.workers[prov.ID] = w

	go w.Run()
}
// StopWorker stops an existing poller worker and removes it from the workers
// registry. When update is true, a new worker is started for the provider
// right after the removal.
func (p *Poller) StopWorker(prov *storage.Provider, update bool) {
	p.Lock()

	// A missing key yields a nil worker, and deleting a missing key is a
	// no-op, so a single nil check covers both unknown and empty entries.
	if current := p.workers[prov.ID]; current != nil {
		current.Shutdown()
	}
	delete(p.workers, prov.ID)

	p.Unlock()

	if !update {
		return
	}

	// Update mode: try to restart the provider instance. The lock has to be
	// released beforehand since StartWorker acquires it on its own.
	p.StartWorker(prov)
}
// WorkerError returns the error recorded on poller worker initialization for
// the given provider identifier. It returns nil when the latest
// initialization succeeded or when nothing has been recorded for that
// identifier.
//
// The poller lock is acquired for reading: this method must not be called by
// a goroutine already holding the poller lock.
func (p *Poller) WorkerError(id string) error {
	// StartWorker writes to the errors map while holding the write lock;
	// reading it without any lock would be a data race.
	p.RLock()
	defer p.RUnlock()

	return p.errors[id]
}
// RefreshAll triggers a refresh on all the registered poller workers. Each
// refresh runs in its own goroutine, so this method returns without waiting
// for the refreshes to complete.
func (p *Poller) RefreshAll() {
	p.RLock()
	defer p.RUnlock()

	for _, w := range p.workers {
		// Skip empty registry entries, as Shutdown and StopWorker do: there
		// is no worker to refresh behind them.
		if w == nil {
			continue
		}

		go w.Refresh()
	}
}
// Refresh triggers a refresh on an existing poller worker. The refresh runs
// in its own goroutine; nothing happens when no worker is registered for the
// given provider.
func (p *Poller) Refresh(prov storage.Provider) {
	p.RLock()
	defer p.RUnlock()

	// A missing key yields a nil worker, so a single nil check covers both
	// unknown providers and empty registry entries (which Shutdown and
	// StopWorker guard against as well).
	if w := p.workers[prov.ID]; w != nil {
		go w.Refresh()
	}
}