forked from projectcalico/confd
/
processor.go
165 lines (144 loc) · 4.24 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package template
import (
"fmt"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
var (
initialProcessRetryInterval = 250 * time.Millisecond
maxProcessRetryInterval = 5 * time.Second
)
type Processor interface {
Process()
}
func Process(config Config) error {
// Get the template resources.
ts, err := getTemplateResources(config)
if err != nil {
return err
}
// Configure the client with the set of prefixes.
if err := setClientPrefixes(config, ts); err != nil {
return err
}
var lastErr error
for _, t := range ts {
if err := t.process(); err != nil {
log.Error(err.Error())
lastErr = err
}
}
return lastErr
}
// Called to notify the client which prefixes will be monitored.
func setClientPrefixes(config Config, trs []*TemplateResource) error {
prefixes := []string{}
// Loop through the full set of template resources and get a complete set of
// unique prefixes that are being watched.
pmap := map[string]bool{}
for _, tr := range trs {
for _, pk := range tr.PrefixedKeys {
pmap[pk] = true
}
}
for p, _ := range pmap {
prefixes = append(prefixes, p)
}
// Tell the client the set of prefixes.
return config.StoreClient.SetPrefixes(prefixes)
}
type watchProcessor struct {
config Config
stopChan chan bool
doneChan chan bool
errChan chan error
wg sync.WaitGroup
}
func WatchProcessor(config Config, stopChan, doneChan chan bool, errChan chan error) Processor {
return &watchProcessor{config, stopChan, doneChan, errChan, sync.WaitGroup{}}
}
func (p *watchProcessor) Process() {
defer close(p.doneChan)
// Get the set of template resources.
ts, err := getTemplateResources(p.config)
if err != nil {
log.Fatal(err.Error())
return
}
// Configure the client with the set of prefixes.
if err := setClientPrefixes(p.config, ts); err != nil {
log.Fatal(err.Error())
return
}
// Start the individual watchers for each template.
for _, t := range ts {
t := t
p.wg.Add(1)
go p.monitorPrefix(t)
}
p.wg.Wait()
}
func (p *watchProcessor) monitorPrefix(t *TemplateResource) {
defer p.wg.Done()
var revision uint64
for {
// Watch from the last revision that we updated the templates with. This will exit it the
// data in the datastore for the requested prefixes has had updates since that revision.
err := t.storeClient.WatchPrefix(t.Prefix, t.PrefixedKeys, revision, p.stopChan)
if err != nil {
p.errChan <- err
// Prevent backend errors from consuming all resources.
time.Sleep(time.Second * 2)
continue
}
// Get the current datastore revision and then populate the template with the current settings.
// The templates will be populated with data that is at least as recent as the datastore
// revision.
retryInterval := initialProcessRetryInterval
for {
revision = t.storeClient.GetCurrentRevision()
if err = t.process(); err == nil {
break
}
// We hit an error processing the template - this means the template will not have been
// rendered. This may be because the rendered templates are interconnected and the
// check function for this template is dependent on the other templates being rendered.
// Rather than blocking on WatchPrefix, sleep for a short period and retry - we'll start
// with short retry intervals and increase up to 5s.
log.Debugf("Will retry processing the template in %s", retryInterval)
p.errChan <- err
time.Sleep(retryInterval)
retryInterval *= 2
if retryInterval > maxProcessRetryInterval {
retryInterval = maxProcessRetryInterval
}
}
}
}
func getTemplateResources(config Config) ([]*TemplateResource, error) {
var lastError error
templates := make([]*TemplateResource, 0)
log.Debug("Loading template resources from confdir " + config.ConfDir)
if !isFileExist(config.ConfDir) {
log.Warning(fmt.Sprintf("Cannot load template resources: confdir '%s' does not exist", config.ConfDir))
return nil, nil
}
paths, err := recursiveFindFiles(config.ConfigDir, "*toml")
if err != nil {
return nil, err
}
if len(paths) < 1 {
log.Warning("Found no templates")
}
for _, p := range paths {
log.Debug(fmt.Sprintf("Found template: %s", p))
t, err := NewTemplateResource(p, config)
if err != nil {
lastError = err
continue
}
templates = append(templates, t)
}
return templates, lastError
}