forked from hashicorp/vault
-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcd.go
146 lines (127 loc) · 4.16 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package etcd
import (
"context"
"errors"
"fmt"
"net/url"
"os"
"strings"
"github.com/coreos/etcd/client"
"github.com/coreos/go-semver/semver"
"github.com/hashicorp/vault/physical"
log "github.com/mgutz/logxi/v1"
)
// Sentinel errors returned by the etcd physical backend. Compare against
// these values rather than matching on the message text.
var (
	// EtcdSyncConfigError reports that the etcd sync field in the config
	// could not be parsed during client setup.
	EtcdSyncConfigError = errors.New("client setup failed: unable to parse etcd sync field in config")
	// EtcdSyncClusterError reports that syncing the etcd cluster failed
	// during client setup.
	EtcdSyncClusterError = errors.New("client setup failed: unable to sync etcd cluster")
	// EtcdMultipleBootstrapError reports that both "address" and
	// "discovery_srv" were specified; only one of them may be used.
	EtcdMultipleBootstrapError = errors.New("client setup failed: multiple discovery or bootstrap flags specified, use either \"address\" or \"discovery_srv\"")
	// EtcdAddressError reports that a configured address is not a valid URL
	// of the form scheme://host:port.
	EtcdAddressError = errors.New("client setup failed: address must be valid URL (ex. 'scheme://host:port')")
	// EtcdSemaphoreKeysEmptyError reports that the lock queue is empty.
	EtcdSemaphoreKeysEmptyError = errors.New("lock queue is empty")
	// EtcdLockHeldError reports that the lock is already held.
	EtcdLockHeldError = errors.New("lock already held")
	// EtcdLockNotHeldError reports that the lock is not held.
	EtcdLockNotHeldError = errors.New("lock not held")
	// EtcdSemaphoreKeyRemovedError reports that the semaphore key was removed
	// before the lock could be acquired.
	EtcdSemaphoreKeyRemovedError = errors.New("semaphore key removed before lock acquisition")
	// EtcdVersionUnknown reports that the requested or detected etcd API
	// version is not one this backend recognizes.
	EtcdVersionUnknown = errors.New("etcd: unknown API version")
)
// NewEtcdBackend constructs an etcd-backed physical.Backend from conf.
//
// The API version is taken from conf["etcd_api"], falling back to the
// ETCD_API environment variable when the key is absent. If neither yields a
// non-empty value, the version is auto-detected: when the Vault path
// (conf["path"], default "/vault") already exists in the v2 keyspace the v2
// backend is kept, and when the key is missing the highest API version
// reported by the remote cluster is used.
//
// It returns EtcdVersionUnknown when the resulting version string is not one
// of the recognized spellings of "2" or "3", and an error when v3 is requested
// but the remote cluster only supports v2.
func NewEtcdBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) {
	// A v2 client can talk to both etcd2 and etcd3 through API v2, so it is
	// used for probing regardless of which backend is eventually chosen.
	probe, err := newEtcdV2Client(conf)
	if err != nil {
		return nil, errors.New("failed to create etcd client: " + err.Error())
	}

	remoteAPIVersion, err := getEtcdAPIVersion(probe)
	if err != nil {
		return nil, errors.New("failed to get etcd API version: " + err.Error())
	}

	// An explicit config entry wins (even if empty); the environment is only
	// consulted when the key is absent from conf.
	apiVersion, configured := conf["etcd_api"]
	if !configured {
		apiVersion = os.Getenv("ETCD_API")
	}

	if apiVersion == "" {
		vaultPath, hasPath := conf["path"]
		if !hasPath {
			vaultPath = "/vault"
		}

		// Keep using v2 if Vault data exists in v2 and the user did not
		// explicitly ask for v3.
		_, getErr := client.NewKeysAPI(probe).Get(context.Background(), vaultPath, &client.GetOptions{})
		switch {
		case errorIsMissingKey(getErr):
			apiVersion = remoteAPIVersion
		case getErr == nil:
			apiVersion = "2"
		default:
			return nil, errors.New("failed to check etcd status: " + getErr.Error())
		}
	}

	switch apiVersion {
	case "2", "etcd2", "v2":
		return newEtcd2Backend(conf, logger)
	case "3", "etcd3", "v3":
		if remoteAPIVersion != "2" {
			return newEtcd3Backend(conf, logger)
		}
		return nil, errors.New("etcd3 is required: etcd2 is running")
	}
	return nil, EtcdVersionUnknown
}
// getEtcdAPIVersion reports the latest API version supported by the cluster
// that c is connected to.
//
// It returns "3" when the cluster version is at least 3.1.0 and "2" when it
// is lower. A failure to fetch the version is returned as an error. If the
// reported cluster version cannot be parsed as a semantic version, an empty
// string is returned together with a nil error.
//
// NOTE(review): the empty-string/nil-error result on a parse failure is
// inherited behavior; confirm whether callers rely on it before changing it.
func getEtcdAPIVersion(c client.Client) (string, error) {
	remote, err := c.GetVersion(context.Background())
	if err != nil {
		return "", err
	}

	clusterVersion, parseErr := semver.NewVersion(remote.Cluster)
	if parseErr != nil {
		// Unparseable cluster version: report "no version" without an error.
		return "", nil
	}

	firstV3Capable := semver.Must(semver.NewVersion("3.1.0"))
	if !clusterVersion.LessThan(*firstV3Capable) {
		return "3", nil
	}
	return "2", nil
}
// getEtcdOption looks up a configuration option, preferring the environment
// over the config map:
//  1. the environment variable envVar, if it is set (even to an empty value)
//  2. the entry confKey in conf
//
// The boolean result reports whether the option was found in either place.
func getEtcdOption(conf map[string]string, confKey, envVar string) (string, bool) {
	if fromEnv, found := os.LookupEnv(envVar); found {
		return fromEnv, true
	}
	fromConf, found := conf[confKey]
	return fromConf, found
}
// getEtcdEndpoints resolves the list of etcd endpoints to connect to.
//
// Endpoints come from exactly one of two sources, each read via
// getEtcdOption:
//   - "address" / ETCD_ADDR: a list of URLs separated by
//     Etcd2MachineDelimiter; every entry must parse as a URL with a scheme,
//     otherwise EtcdAddressError is returned.
//   - "discovery_srv" / ETCD_DISCOVERY_SRV: a domain handed to the etcd
//     client's SRV discoverer.
//
// Specifying both yields EtcdMultipleBootstrapError. When neither is set, the
// single default endpoint http://127.0.0.1:2379 is returned.
func getEtcdEndpoints(conf map[string]string) ([]string, error) {
	addrList, haveAddrs := getEtcdOption(conf, "address", "ETCD_ADDR")
	srvDomain, haveSrv := getEtcdOption(conf, "discovery_srv", "ETCD_DISCOVERY_SRV")

	switch {
	case haveSrv && haveAddrs:
		return nil, EtcdMultipleBootstrapError

	case haveAddrs:
		machines := strings.Split(addrList, Etcd2MachineDelimiter)
		// Reject the whole list if any machine is not a URL with a scheme.
		for i := range machines {
			parsed, parseErr := url.Parse(machines[i])
			if parseErr != nil || parsed.Scheme == "" {
				return nil, EtcdAddressError
			}
		}
		return machines, nil

	case haveSrv:
		discovered, err := client.NewSRVDiscover().Discover(srvDomain)
		if err != nil {
			return nil, fmt.Errorf("failed to discover etcd endpoints through SRV discovery: %v", err)
		}
		return discovered, nil
	}

	// No option was set: fall back to the default local endpoint.
	return []string{"http://127.0.0.1:2379"}, nil
}