forked from sensu/sensu-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcd.go
273 lines (233 loc) · 7.24 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
// Package etcd manages the embedded etcd server that Sensu uses for storing
// state consistently across sensu-backend processes.
//
// To use the embedded Etcd, you must first call NewEtcd(). This will configure
// and start Etcd and ensure that it starts correctly. The channel returned by
// Err() should be monitored--these are terminal/fatal errors for Etcd.
package etcd
import (
"context"
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/pkg/transport"
etcdTypes "github.com/coreos/etcd/pkg/types"
"github.com/coreos/pkg/capnslog"
"github.com/sensu/sensu-go/types"
"github.com/sensu/sensu-go/util/path"
"google.golang.org/grpc/grpclog"
)
// Fixed settings and defaults for the embedded etcd server.
const (
// ClusterStateNew specifies this is a new etcd cluster
ClusterStateNew = "new"
// EtcdStartupTimeout is the amount of time we give the embedded Etcd Server
// to start. It is an untyped number of seconds; NewEtcd multiplies it by
// time.Second before waiting on it.
EtcdStartupTimeout = 60 // seconds
// DefaultMaxRequestBytes is the default maximum request size for etcd
// requests (1.5 MiB, i.e. 1,572,864 bytes). The constant is untyped;
// NewConfig assigns it to Config.MaxRequestBytes.
DefaultMaxRequestBytes = 1.5 * (1 << 20)
// DefaultQuotaBackendBytes is the default database size limit for etcd
// databases (4 GiB, i.e. 4,294,967,296 bytes). NewConfig assigns it to
// Config.QuotaBackendBytes.
DefaultQuotaBackendBytes int64 = (1 << 32)
)
func init() {
clientv3.SetLogger(grpclog.NewLoggerV2(ioutil.Discard, ioutil.Discard, ioutil.Discard))
}
// Config is a configuration for the embedded etcd. NewEtcd copies these
// values into the etcd embed configuration before starting the server.
type Config struct {
// DataDir is the root directory for etcd state. NewEtcd stores data under
// <DataDir>/etcd/data and the write-ahead log under <DataDir>/etcd/wal,
// creating both directories if they do not exist.
DataDir string
Name string // Cluster Member Name
// AdvertiseClientURLs are the client URLs advertised by this member. They
// are also the endpoints used by NewClient, and Healthy queries the first
// one.
AdvertiseClientURLs []string
// ListenPeerURLs are the URLs to listen on for peer traffic.
ListenPeerURLs []string
// ListenClientURLs are the URLs to listen on for client traffic.
ListenClientURLs []string
// InitialCluster is passed through to etcd's InitialCluster setting.
InitialCluster string
// InitialClusterState is passed through to etcd's ClusterState setting
// (ClusterStateNew is one such value).
InitialClusterState string
// InitialClusterToken is passed through to etcd's InitialClusterToken
// setting.
InitialClusterToken string
// InitialAdvertisePeerURLs are the peer URLs advertised by this member.
InitialAdvertisePeerURLs []string
// ClientTLSInfo is the TLS configuration for client connections. NewClient
// reads its CertFile, KeyFile and TrustedCAFile to build the client's TLS
// configuration.
ClientTLSInfo TLSInfo
// PeerTLSInfo is the TLS configuration for peer connections.
PeerTLSInfo TLSInfo
// CipherSuites is passed through to etcd's CipherSuites setting.
CipherSuites []string
// MaxRequestBytes is passed through to etcd's MaxRequestBytes setting.
// NewConfig defaults it to DefaultMaxRequestBytes.
MaxRequestBytes uint
// QuotaBackendBytes is passed through to etcd's QuotaBackendBytes setting.
// NewConfig defaults it to DefaultQuotaBackendBytes.
QuotaBackendBytes int64
}
// TLSInfo wraps etcd transport TLSInfo. It is a defined type whose underlying
// type is transport.TLSInfo, so values convert back to transport.TLSInfo with
// a plain type conversion (as NewEtcd does).
type TLSInfo transport.TLSInfo
// NewConfig builds a Config pre-populated with defaults: the data directory
// is the system cache directory for "sensu-backend", and the request size and
// backend quota limits use DefaultMaxRequestBytes and
// DefaultQuotaBackendBytes. All other fields are left at their zero values.
func NewConfig() *Config {
	return &Config{
		DataDir:           path.SystemCacheDir("sensu-backend"),
		MaxRequestBytes:   DefaultMaxRequestBytes,
		QuotaBackendBytes: DefaultQuotaBackendBytes,
	}
}
// ensureDir makes sure that path exists and is a directory. A missing path is
// created (including parents) with mode 0700. An error is returned if the
// path cannot be inspected or created, or if it exists but is not a
// directory.
func ensureDir(path string) error {
	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		// Something already exists at path; its type is verified below.
	case os.IsNotExist(statErr):
		if err := os.MkdirAll(path, 0700); err != nil {
			return err
		}
	default:
		return statErr
	}

	// Stat again so the check below covers both the pre-existing and the
	// freshly created case.
	info, err := os.Stat(path)
	if err != nil {
		return err
	}
	if info.IsDir() {
		return nil
	}
	return fmt.Errorf("path exists and is not directory - %s", path)
}
// Etcd is a wrapper around github.com/coreos/etcd/embed.Etcd
type Etcd struct {
// cfg is the configuration that was passed to NewEtcd.
cfg *Config
// etcd is the running embedded server returned by embed.StartEtcd.
etcd *embed.Etcd
// clientURLs are the endpoints handed to clients created by NewClient;
// NewEtcd sets them from Config.AdvertiseClientURLs.
clientURLs []string
}
// BackendID reports the ID of the embedded etcd server, rendered as a string.
func (e *Etcd) BackendID() (result string) {
	id := e.etcd.Server.ID()
	result = id.String()
	return result
}
// GetClusterVersion returns the cluster version of the etcd server as a
// string.
//
// The etcd server reports the cluster version through a pointer that can be
// nil (for instance while the cluster version has not been decided yet).
// Calling String on that nil pointer would panic, so an empty string is
// returned in that case instead.
func (e *Etcd) GetClusterVersion() string {
	version := e.etcd.Server.ClusterVersion()
	if version == nil {
		return ""
	}
	return version.String()
}
// NewEtcd returns a new, configured, and running Etcd. The running Etcd will
// panic on error. The calling goroutine should recover() from the panic and
// shutdown accordingly. Callers must also ensure that the running Etcd is
// cleanly shutdown before the process terminates.
//
// Callers should monitor the Err() channel for the running etcd--these are
// terminal errors.
//
// NewEtcd validates the URL lists in config, makes sure the data and WAL
// directories under config.DataDir exist, starts the embedded server and
// waits up to EtcdStartupTimeout seconds for it to become ready. An error is
// returned if a URL list is invalid, a directory cannot be prepared, the
// server cannot be started, or it does not become ready in time. In the
// timeout case the partially started server is stopped and closed before
// returning.
func NewEtcd(config *Config) (*Etcd, error) {
	// Parse the various URLs
	lcURLs, err := etcdTypes.NewURLs(config.ListenClientURLs)
	if err != nil {
		return nil, fmt.Errorf("invalid listen client urls: %s", err)
	}

	acURLs, err := etcdTypes.NewURLs(config.AdvertiseClientURLs)
	if err != nil {
		return nil, fmt.Errorf("invalid advertise client urls: %s", err)
	}

	lpURLs, err := etcdTypes.NewURLs(config.ListenPeerURLs)
	if err != nil {
		return nil, fmt.Errorf("invalid listen peer urls: %s", err)
	}

	apURLs, err := etcdTypes.NewURLs(config.InitialAdvertisePeerURLs)
	if err != nil {
		return nil, fmt.Errorf("invalid initial advertise peer urls: %s", err)
	}

	cfg := embed.NewConfig()
	cfg.Name = config.Name

	// Data and write-ahead log live in sibling directories under DataDir.
	cfg.Dir = filepath.Join(config.DataDir, "etcd", "data")
	cfg.WalDir = filepath.Join(config.DataDir, "etcd", "wal")
	if err := ensureDir(cfg.Dir); err != nil {
		return nil, err
	}
	if err := ensureDir(cfg.WalDir); err != nil {
		return nil, err
	}

	// Client config
	cfg.ACUrls = acURLs
	cfg.LCUrls = lcURLs
	cfg.ClientTLSInfo = (transport.TLSInfo)(config.ClientTLSInfo)

	// Peer config
	cfg.APUrls = apURLs
	cfg.LPUrls = lpURLs
	cfg.PeerTLSInfo = (transport.TLSInfo)(config.PeerTLSInfo)
	cfg.CipherSuites = config.CipherSuites

	// Cluster config
	cfg.InitialClusterToken = config.InitialClusterToken
	cfg.InitialCluster = config.InitialCluster
	cfg.ClusterState = config.InitialClusterState

	// Revision-based auto compaction with a retention of 2. Per the etcd
	// documentation this mode runs every 5 minutes and compacts history
	// older than the most recent 2 revisions.
	cfg.AutoCompactionMode = "revision"
	cfg.AutoCompactionRetention = "2"
	cfg.QuotaBackendBytes = config.QuotaBackendBytes
	cfg.MaxRequestBytes = config.MaxRequestBytes

	capnslog.SetFormatter(NewLogrusFormatter())

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		return nil, err
	}

	select {
	case <-e.Server.ReadyNotify():
		logger.Info("Etcd ready to serve client connections")
	case <-time.After(EtcdStartupTimeout * time.Second):
		// Trigger a shutdown of the server, then close the embedded etcd so
		// that the listeners opened by StartEtcd are released as well;
		// stopping the server alone leaves them open.
		e.Server.Stop()
		e.Close()
		return nil, fmt.Errorf("Etcd failed to start in %d seconds", EtcdStartupTimeout)
	}

	return &Etcd{cfg: config, etcd: e, clientURLs: config.AdvertiseClientURLs}, nil
}
// Name reports the cluster member name taken from the configuration this
// Etcd was created with.
func (e *Etcd) Name() string {
	cfg := e.cfg
	return cfg.Name
}
// Err exposes the error channel of the wrapped embedded etcd. It forwards
// whatever channel the underlying embed.Etcd returns.
func (e *Etcd) Err() <-chan error {
	errs := e.etcd.Err()
	return errs
}
// Shutdown closes the running Etcd and blocks until the server signals that
// it has stopped. It always returns nil.
func (e *Etcd) Shutdown() error {
	embedded := e.etcd

	// Grab the stop notification channel before closing, then wait on it.
	stopped := embedded.Server.StopNotify()
	embedded.Close()
	<-stopped

	return nil
}
// NewClient creates an etcd v3 client that talks to this server's client
// URLs, using TLS settings derived from the server's client TLS
// configuration. Clients must be closed after use.
//
// An error is returned if the TLS configuration cannot be built, if the
// server has no client listeners, or if the client cannot be created.
func (e *Etcd) NewClient() (*clientv3.Client, error) {
	// Build the client's *tls.Config from the server's client TLS files.
	serverTLS := e.cfg.ClientTLSInfo
	tlsOptions := &types.TLSOptions{
		CertFile:      serverTLS.CertFile,
		KeyFile:       serverTLS.KeyFile,
		TrustedCAFile: serverTLS.TrustedCAFile,
	}
	tlsConfig, err := tlsOptions.ToClientTLSConfig()
	if err != nil {
		return nil, err
	}

	if len(e.etcd.Clients) == 0 {
		return nil, errors.New("no etcd client listeners found for server")
	}

	clientConfig := clientv3.Config{
		Endpoints:   e.clientURLs,
		DialTimeout: 5 * time.Second,
		TLS:         tlsConfig,
	}
	client, err := clientv3.New(clientConfig)
	if err != nil {
		return nil, err
	}
	return client, nil
}
// Healthy reports whether the embedded etcd answers a maintenance status
// request on its first advertised client URL.
//
// It returns false when no client URLs are advertised, when a client cannot
// be created, or when the status request fails. The client created for the
// check is closed before returning so that repeated health checks do not
// leak client connections.
func (e *Etcd) Healthy() bool {
	if len(e.cfg.AdvertiseClientURLs) == 0 {
		return false
	}

	client, err := e.NewClient()
	if err != nil {
		return false
	}
	// The close error is deliberately ignored: it does not change the
	// outcome of the health check.
	defer func() { _ = client.Close() }()

	mapi := clientv3.NewMaintenance(client)
	_, err = mapi.Status(context.TODO(), e.cfg.AdvertiseClientURLs[0])
	return err == nil
}
// ClientURLs returns a copy of the client URLs used for clients of this Etcd.
// The returned slice is always non-nil, and modifying it does not affect the
// Etcd.
func (e *Etcd) ClientURLs() []string {
	urls := make([]string, 0, len(e.clientURLs))
	return append(urls, e.clientURLs...)
}