-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
consul.go
228 lines (195 loc) · 7.68 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package e2eutil
import (
"fmt"
"testing"
"time"
capi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/testutil"
"github.com/kr/pretty"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// RequireConsulStatus polls Consul, through testutil.WaitForResultRetries with
// a retry count of 30, until the status reported by serviceStatus for
// namespace/service equals expectedStatus. Every attempt is followed by a one
// second pause. If the wait gives up, the last mismatch error is reported
// through require.NoError.
func RequireConsulStatus(require *require.Assertions, client *capi.Client, namespace, service, expectedStatus string) {
	const attempts = 30

	poll := func() (bool, error) {
		// Pause after each attempt: killing tasks/clients needs a long time.
		defer time.Sleep(time.Second)

		_, current := serviceStatus(require, client, namespace, service)
		mismatch := fmt.Errorf("service %s/%s: expected %s but found %s", namespace, service, expectedStatus, current)
		return current == expectedStatus, mismatch
	}

	giveUp := func(err error) {
		require.NoError(err, "timedout waiting for consul status")
	}

	testutil.WaitForResultRetries(attempts, poll, giveUp)
}
// serviceStatus queries the Consul health endpoint for service in namespace
// (not restricted to passing instances) and returns the service entries along
// with the aggregated check status of the first entry. When no entries come
// back it returns nil and the placeholder "(unknown status)". A query error is
// reported through require.NoError.
func serviceStatus(require *require.Assertions, client *capi.Client, namespace, service string) ([]*capi.ServiceEntry, string) {
	query := &capi.QueryOptions{Namespace: namespace}
	entries, _, err := client.Health().Service(service, "", false, query)
	require.NoError(err, "expected no error for %s/%s, got %s", namespace, service, err)

	if len(entries) == 0 {
		return nil, "(unknown status)"
	}
	return entries, entries[0].Checks.AggregatedStatus()
}
// RequireConsulDeregistered waits, through testutil.WaitForResultRetries with
// a retry count of 5, for the Consul health endpoint to return no entries for
// service in namespace. Every attempt is followed by a one second pause. Query
// errors and a final give-up are both reported through require.NoError.
func RequireConsulDeregistered(require *require.Assertions, client *capi.Client, namespace, service string) {
	probe := func() (bool, error) {
		defer time.Sleep(time.Second)

		query := &capi.QueryOptions{Namespace: namespace}
		remaining, _, err := client.Health().Service(service, "", false, query)
		require.NoError(err)

		if n := len(remaining); n != 0 {
			return false, fmt.Errorf("service %v: expected empty services but found %v %v", service, n, pretty.Sprint(remaining))
		}
		return true, nil
	}

	testutil.WaitForResultRetries(5, probe, func(err error) {
		require.NoError(err)
	})
}
// RequireConsulRegistered waits, through testutil.WaitForResultRetries with a
// retry count of 10, for the Consul catalog to list exactly count entries for
// service in namespace. Every attempt is followed by a two second pause. Query
// errors and a final give-up are both reported through require.NoError.
func RequireConsulRegistered(require *require.Assertions, client *capi.Client, namespace, service string, count int) {
	probe := func() (bool, error) {
		defer time.Sleep(2 * time.Second)

		query := &capi.QueryOptions{Namespace: namespace}
		found, _, err := client.Catalog().Service(service, "", query)
		require.NoError(err)

		if n := len(found); n != count {
			return false, fmt.Errorf("service %v: expected %v services but found %v %v", service, count, n, pretty.Sprint(found))
		}
		return true, nil
	}

	giveUp := func(err error) {
		require.NoError(err)
	}

	testutil.WaitForResultRetries(10, probe, giveUp)
}
// CreateConsulNamespaces creates every namespace in namespaces, in order,
// giving each one a description that embeds its quoted name. A creation error
// is reported through require.NoError.
//
// Requires Consul Enterprise.
func CreateConsulNamespaces(t *testing.T, client *capi.Client, namespaces []string) {
	api := client.Namespaces()
	for i := range namespaces {
		name := namespaces[i]
		spec := &capi.Namespace{
			Name:        name,
			Description: fmt.Sprintf("An e2e namespace called %q", name),
		}
		_, _, err := api.Create(spec, nil)
		require.NoError(t, err)
	}
}
// DeleteConsulNamespaces removes every namespace in namespaces from Consul, in
// order. Errors are checked with assert rather than require: the helper is
// used during cleanup, so it is deliberately lenient.
//
// Requires Consul Enterprise.
func DeleteConsulNamespaces(t *testing.T, client *capi.Client, namespaces []string) {
	api := client.Namespaces()
	for i := 0; i < len(namespaces); i++ {
		_, err := api.Delete(namespaces[i], nil)
		assert.NoError(t, err)
	}
}
// ListConsulNamespaces returns the names of the namespaces Consul reports, in
// the order the API returned them. A listing error is reported through
// require.NoError.
//
// Requires Consul Enterprise.
func ListConsulNamespaces(t *testing.T, client *capi.Client) []string {
	listed, _, err := client.Namespaces().List(nil)
	require.NoError(t, err)

	// Start from a length-0 slice, as an empty result is then non-nil, and
	// extend it once to full size before filling by index.
	names := make([]string, 0, len(listed))
	names = names[:len(listed)]
	for i, ns := range listed {
		names[i] = ns.Name
	}
	return names
}
// PutConsulKey writes value under key in the Consul KV store, scoped to
// namespace. A write error is reported through require.NoError.
//
// Requires Consul Enterprise.
func PutConsulKey(t *testing.T, client *capi.Client, namespace, key, value string) {
	pair := &capi.KVPair{
		Key:   key,
		Value: []byte(value),
	}
	_, err := client.KV().Put(pair, &capi.WriteOptions{Namespace: namespace})
	require.NoError(t, err)
}
// DeleteConsulKey removes key from the Consul KV store, scoped to namespace.
// A delete error is reported through require.NoError.
//
// Requires Consul Enterprise.
func DeleteConsulKey(t *testing.T, client *capi.Client, namespace, key string) {
	_, err := client.KV().Delete(key, &capi.WriteOptions{Namespace: namespace})
	require.NoError(t, err)
}
// ReadConsulConfigEntry fetches and returns the config entry identified by
// kind and name, scoped to namespace. A lookup error is reported through
// require.NoError.
//
// Requires Consul Enterprise.
func ReadConsulConfigEntry(t *testing.T, client *capi.Client, namespace, kind, name string) capi.ConfigEntry {
	entry, _, err := client.ConfigEntries().Get(kind, name, &capi.QueryOptions{Namespace: namespace})
	require.NoError(t, err)
	return entry
}
// DeleteConsulConfigEntry removes the config entry identified by kind and
// name, scoped to namespace. A delete error is reported through
// require.NoError.
//
// Requires Consul Enterprise.
func DeleteConsulConfigEntry(t *testing.T, client *capi.Client, namespace, kind, name string) {
	_, err := client.ConfigEntries().Delete(kind, name, &capi.WriteOptions{Namespace: namespace})
	require.NoError(t, err)
}
// ConsulPolicy describes a Consul ACL policy to create; Consul ACL tokens can
// then be linked to the resulting policy.
type ConsulPolicy struct {
	// Name is the policy name, e.g. nomad-operator.
	Name string

	// Rules is the policy rule text, e.g. service "" { policy="write" }.
	Rules string
}
// CreateConsulPolicy creates a Consul ACL policy in namespace from the name
// and rules in policy, with a description that embeds the quoted policy name.
// It returns the ID of the created policy. A creation error is reported
// through require.NoError.
//
// Requires Consul Enterprise.
func CreateConsulPolicy(t *testing.T, client *capi.Client, namespace string, policy ConsulPolicy) string {
	spec := &capi.ACLPolicy{
		Name:        policy.Name,
		Rules:       policy.Rules,
		Description: fmt.Sprintf("An e2e test policy %q", policy.Name),
	}

	created, _, err := client.ACL().PolicyCreate(spec, &capi.WriteOptions{Namespace: namespace})
	require.NoError(t, err, "failed to create consul acl policy")
	return created.ID
}
// DeleteConsulPolicies deletes a set of Consul ACL policies. The policies map
// is keyed by namespace; each value lists the policy IDs to delete within that
// namespace. Errors are checked with assert.NoError, so one failed delete does
// not stop the remaining ones.
//
// Requires Consul Enterprise.
func DeleteConsulPolicies(t *testing.T, client *capi.Client, policies map[string][]string) {
	acl := client.ACL()
	for ns, ids := range policies {
		scope := &capi.WriteOptions{Namespace: ns}
		for i := range ids {
			_, err := acl.PolicyDelete(ids[i], scope)
			assert.NoError(t, err)
		}
	}
}
// CreateConsulToken creates a Consul ACL token in namespace, linked to the
// policy identified by policyID, and returns the token's SecretID. A creation
// error is reported through require.NoError.
//
// Requires Consul Enterprise.
func CreateConsulToken(t *testing.T, client *capi.Client, namespace, policyID string) string {
	link := &capi.ACLTokenPolicyLink{ID: policyID}
	spec := &capi.ACLToken{
		Policies:    []*capi.ACLTokenPolicyLink{link},
		Description: "An e2e test token",
	}

	created, _, err := client.ACL().TokenCreate(spec, &capi.WriteOptions{Namespace: namespace})
	require.NoError(t, err, "failed to create consul acl token")
	return created.SecretID
}
// DeleteConsulTokens deletes a set of Consul ACL tokens. The tokens map is
// keyed by namespace; each value lists the token IDs to delete within that
// namespace. Errors are checked with assert.NoError, so one failed delete does
// not stop the remaining ones.
//
// Requires Consul Enterprise.
func DeleteConsulTokens(t *testing.T, client *capi.Client, tokens map[string][]string) {
	acl := client.ACL()
	for ns, ids := range tokens {
		scope := &capi.WriteOptions{Namespace: ns}
		for i := range ids {
			_, err := acl.TokenDelete(ids[i], scope)
			assert.NoError(t, err)
		}
	}
}