forked from traefik/traefik
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kv.go
129 lines (119 loc) · 3.81 KB
/
kv.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package kv
import (
"errors"
"fmt"
"strings"
"time"
"github.com/abronan/valkeyrie"
"github.com/abronan/valkeyrie/store"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
)
// Provider holds common configurations of key-value providers.
//
// The exported fields are user-facing settings (their struct tags carry the
// descriptions); the unexported fields are wired in through SetStoreType and
// SetKVClient.
type Provider struct {
	// Embedded base provider settings, squashed into this struct when decoded.
	// NOTE(review): Constraints and Watch used by Provide are presumably
	// promoted from this embedded type — confirm against provider.BaseProvider.
	provider.BaseProvider `mapstructure:",squash" export:"true"`

	// Endpoint is a comma separated list of server endpoints; CreateStore
	// splits it on "," before handing it to the store factory.
	Endpoint string `description:"Comma separated server endpoints"`

	// Prefix is the key prefix used for the KV store; it is the directory
	// watched by watchKv and the base of the connectivity probe in Provide.
	Prefix string `description:"Prefix used for KV store" export:"true"`

	// TLS, when non-nil, is converted to a TLS configuration in CreateStore.
	TLS *types.ClientTLS `description:"Enable TLS support" export:"true"`

	// Username is forwarded to the store configuration in CreateStore.
	Username string `description:"KV Username"`

	// Password is forwarded to the store configuration in CreateStore.
	Password string `description:"KV Password"`

	// storeType is the backend handed to valkeyrie.NewStore and used as the
	// provider name in configuration messages. Set via SetStoreType.
	storeType store.Backend

	// kvClient is the store client used by Provide and watchKv. Set via
	// SetKVClient.
	kvClient store.Store
}
// CreateStore builds a store client for the provider's backend type.
//
// The store configuration uses the provider's username and password, a fixed
// "traefik" bucket and a 30 second connection timeout. When TLS settings are
// present they are turned into a TLS configuration first; if that fails the
// error is returned unchanged and no store is created. Endpoint is split on
// commas to produce the list of server addresses.
func (p *Provider) CreateStore() (store.Store, error) {
	cfg := &store.Config{
		ConnectionTimeout: 30 * time.Second,
		Bucket:            "traefik",
		Username:          p.Username,
		Password:          p.Password,
	}

	if p.TLS != nil {
		tlsConfig, err := p.TLS.CreateTLSConfig()
		if err != nil {
			return nil, err
		}
		cfg.TLS = tlsConfig
	}

	endpoints := strings.Split(p.Endpoint, ",")
	return valkeyrie.NewStore(p.storeType, endpoints, cfg)
}
// SetStoreType sets the backend type of the provider. The value is passed to
// valkeyrie.NewStore by CreateStore and is used as the ProviderName of the
// configuration messages emitted by Provide and watchKv.
func (p *Provider) SetStoreType(storeType store.Backend) {
	p.storeType = storeType
}
// SetKVClient sets the store client used by the provider. Provide uses it to
// probe the connection and watchKv uses it to watch the prefix; neither of
// them creates a client on its own.
func (p *Provider) SetKVClient(kvClient store.Store) {
	p.kvClient = kvClient
}
// watchKv watches the provider's prefix in the KV store and, after every
// change event, rebuilds the configuration and sends it on configurationChan
// (nil configurations are skipped).
//
// The watch is run under an exponential backoff: when WatchTree fails or its
// event channel is closed, the error is logged and the watch is established
// again. A receive on stop ends the watch and makes the operation return nil.
// If the retry loop itself returns an error, it is wrapped and returned.
//
// prefix is kept for signature compatibility only: the watch is registered on
// p.Prefix, which is also the value the caller in Provide passes.
func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
	operation := func() error {
		// The store stops watching when this channel is closed. Closing it on
		// every return path keeps a previous watch from lingering after a
		// stop request or before a retry registers a new one.
		watchStop := make(chan struct{})
		defer close(watchStop)

		events, err := p.kvClient.WatchTree(p.Prefix, watchStop, nil)
		if err != nil {
			return fmt.Errorf("failed to KV WatchTree: %v", err)
		}

		for {
			select {
			case <-stop:
				return nil
			case _, ok := <-events:
				if !ok {
					// Returning an error hands control back to the backoff
					// loop, which re-establishes the watch.
					return errors.New("watchtree channel closed")
				}
				configuration := p.buildConfiguration()
				if configuration != nil {
					configurationChan <- types.ConfigMessage{
						ProviderName:  string(p.storeType),
						Configuration: configuration,
					}
				}
			}
		}
	}

	// The duration parameter is named delay so it does not shadow the time
	// package inside the closure.
	notify := func(err error, delay time.Duration) {
		log.Errorf("KV connection error: %+v, retrying in %s", err, delay)
	}

	err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
	if err != nil {
		return fmt.Errorf("cannot connect to KV server: %v", err)
	}
	return nil
}
// Provide connects the KV provider to traefik's configuration channel.
//
// The given constraints are appended to the provider's own. Then, under an
// exponential backoff, it probes the store with an Exists call on an
// arbitrary key below the prefix; once the probe succeeds it optionally
// starts the watcher in the pool (when Watch is enabled), builds the current
// configuration and sends it on configurationChan. Each failed attempt is
// logged before the next retry. If the retry loop returns an error, it is
// wrapped and returned.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
	p.Constraints = append(p.Constraints, constraints...)

	connectAndLoad := func() error {
		// Arbitrary key: only the error result matters, it tells whether the
		// store can be reached.
		const probeSuffix = "/qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"
		_, probeErr := p.kvClient.Exists(p.Prefix+probeSuffix, nil)
		if probeErr != nil {
			return fmt.Errorf("failed to test KV store connection: %v", probeErr)
		}

		if p.Watch {
			pool.Go(func(stop chan bool) {
				if watchErr := p.watchKv(configurationChan, p.Prefix, stop); watchErr != nil {
					log.Errorf("Cannot watch KV store: %v", watchErr)
				}
			})
		}

		current := p.buildConfiguration()
		message := types.ConfigMessage{
			ProviderName:  string(p.storeType),
			Configuration: current,
		}
		configurationChan <- message
		return nil
	}

	logRetry := func(err error, delay time.Duration) {
		log.Errorf("KV connection error: %+v, retrying in %s", err, delay)
	}

	guarded := safe.OperationWithRecover(connectAndLoad)
	retryPolicy := job.NewBackOff(backoff.NewExponentialBackOff())
	if err := backoff.RetryNotify(guarded, retryPolicy, logRetry); err != nil {
		return fmt.Errorf("cannot connect to KV server: %v", err)
	}
	return nil
}