package cachetype

import (
	"crypto/sha256"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/structs"
)

// ConnectCALeafName is the recommended name for registration.
const ConnectCALeafName = "connect-ca-leaf"
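
// A minimal sketch of how an agent might register this type (hypothetical
// wiring; assumes this vintage of the cache package's RegisterType and
// RegisterOptions API, with c a *cache.Cache and rpcClient an RPC client):
//
//	c.RegisterType(ConnectCALeafName, &ConnectCALeaf{
//		RPC:   rpcClient,
//		Cache: c,
//	}, &cache.RegisterOptions{
//		Refresh:        true,
//		RefreshTimeout: 10 * time.Minute,
//	})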

// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
type ConnectCALeaf struct {
	caIndex uint64 // Current index for CA roots

	issuedCertsLock sync.RWMutex
	issuedCerts     map[string]*structs.IssuedCert

	RPC   RPC          // RPC client for remote requests
	Cache *cache.Cache // Cache that has CA root certs via ConnectCARoot
}

// issuedKey returns the issuedCerts cache key for a given service and token.
// We use a hash rather than concatenating strings to be resilient against
// user input containing our separator - both the service name and the token
// ID can be freely chosen by the user, so they could contain any delimiter we
// pick. Hashing also has the benefit of not copying the ACL token into yet
// another place in memory where it might be accidentally dumped.
func issuedKey(service, token string) string {
	hash := sha256.New()
	hash.Write([]byte(service))
	hash.Write([]byte(token))
	return fmt.Sprintf("%x", hash.Sum(nil))
}
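
// For illustration (hypothetical values): the same service requested with two
// different tokens yields two distinct keys, so each token gets its own cert:
//
//	k1 := issuedKey("web", "token-a") // 64 hex chars, e.g. "3f5a..."
//	k2 := issuedKey("web", "token-b") // a different 64-char hex string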

// Fetch implements cache.Type. It blocks until a new leaf certificate should
// be issued - because the CA roots changed, the current cert is nearing
// expiry, or no cert exists yet - then generates a key/CSR, has the servers
// sign it, and returns the issued certificate.
func (c *ConnectCALeaf) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	var result cache.FetchResult

	// Get the correct type
	reqReal, ok := req.(*ConnectCALeafRequest)
	if !ok {
		return result, fmt.Errorf(
			"Internal cache failure: request wrong type: %T", req)
	}

	// This channel watches our overall timeout. The other goroutines
	// launched in this function should end all around the same time so
	// they clean themselves up.
	timeoutCh := time.After(opts.Timeout)

	// Kick off the goroutine that waits for new CA roots. The channel buffer
	// is so that the goroutine doesn't block forever if we return for other
	// reasons.
	newRootCACh := make(chan error, 1)
	go c.waitNewRootCA(reqReal.Datacenter, newRootCACh, opts.Timeout)

	// Generate a cache key to lookup/store the cert. We MUST generate a new
	// cert per token used to ensure revocation by ACL token is robust.
	issuedKey := issuedKey(reqReal.Service, reqReal.Token)

	// Get our prior cert (if we had one) and use that to determine our
	// expiration time. If no cert exists, we expire immediately since we
	// need to generate.
	c.issuedCertsLock.RLock()
	lastCert := c.issuedCerts[issuedKey]
	c.issuedCertsLock.RUnlock()

	var leafExpiryCh <-chan time.Time
	if lastCert != nil {
		// Determine how long we wait until triggering. If we've already
		// expired, we trigger immediately.
		if expiryDur := time.Until(lastCert.ValidBefore); expiryDur > 0 {
			// Renew an hour before expiry so there is a healthy overlap;
			// e.g. a cert valid until 13:00 triggers renewal at 12:00.
			leafExpiryCh = time.After(expiryDur - 1*time.Hour)
			// TODO(mitchellh): 1 hour buffer is hardcoded above

			// We should not depend on the cache package de-duplicating requests
			// for the same service/token (which is all we care about keying our
			// local issued cert cache on) since it might later make sense to
			// partition clients for other reasons too. So if the request has a
			// 0 MinIndex, and the cached cert is still valid, then the client
			// is expecting an immediate response and hasn't already seen the
			// cached cert, so return it now.
			if opts.MinIndex == 0 {
				result.Value = lastCert
				result.Index = lastCert.ModifyIndex
				return result, nil
			}
		}
	}

	if leafExpiryCh == nil {
		// If the channel is still nil then it means we need to generate
		// a cert no matter what: we either don't have an existing one or
		// it is expired.
		leafExpiryCh = time.After(0)
	}

	// Block on the events that wake us up.
	select {
	case <-timeoutCh:
		// On a timeout, we just return the empty result and no error.
		// It isn't an error to time out; it's just the limit of time the
		// caching system wants us to block for. By returning an empty result
		// the caching system will ignore it.
		return result, nil

	case err := <-newRootCACh:
		// A new root CA triggers us to refresh the leaf certificate.
		// If there was an error while getting the root CA then we return.
		// Otherwise, we leave the select statement and move to generation.
		if err != nil {
			return result, err
		}

	case <-leafExpiryCh:
		// The existing leaf certificate is expiring soon, so we generate a
		// new cert with a healthy overlapping validity period (determined
		// by the above channel).
	}

	// Need to lookup RootCAs response to discover trust domain. First just
	// lookup with no blocking info - this should be a cache hit most of the
	// time.
	rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
		Datacenter: reqReal.Datacenter,
	})
	if err != nil {
		return result, err
	}
	roots, ok := rawRoots.(*structs.IndexedCARoots)
	if !ok {
		return result, errors.New("invalid RootCA response type")
	}
	if roots.TrustDomain == "" {
		return result, errors.New("cluster has no CA bootstrapped yet")
	}

	// Build the service ID
	serviceID := &connect.SpiffeIDService{
		Host:       roots.TrustDomain,
		Datacenter: reqReal.Datacenter,
		Namespace:  "default",
		Service:    reqReal.Service,
	}
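
	// For a trust domain like "11111111-2222-3333-4444-555555555555.consul",
	// datacenter "dc1", and service "web" (illustrative values), the SPIFFE
	// ID above renders as:
	//
	//	spiffe://11111111-2222-3333-4444-555555555555.consul/ns/default/dc/dc1/svc/web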

	// Create a new private key
	pk, pkPEM, err := connect.GeneratePrivateKey()
	if err != nil {
		return result, err
	}

	// Create a CSR.
	csr, err := connect.CreateCSR(serviceID, pk)
	if err != nil {
		return result, err
	}
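
	// The CSR is PEM-encoded and carries the SPIFFE ID above as a URI SAN;
	// pkPEM holds the matching PEM-encoded private key.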

	// Request signing
	var reply structs.IssuedCert
	args := structs.CASignRequest{
		WriteRequest: structs.WriteRequest{Token: reqReal.Token},
		Datacenter:   reqReal.Datacenter,
		CSR:          csr,
	}
	if err := c.RPC.RPC("ConnectCA.Sign", &args, &reply); err != nil {
		return result, err
	}
	reply.PrivateKeyPEM = pkPEM
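
	// Only the CSR crossed the wire above; the private key was generated and
	// attached locally, so the servers never see the key material.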

	// Lock the issued certs map so we can insert it. We only insert if we
	// didn't happen to get a newer one. This should never happen since the
	// Cache should ensure only one Fetch per request key at a time, but we
	// sanity check just in case.
	c.issuedCertsLock.Lock()
	defer c.issuedCertsLock.Unlock()
	lastCert = c.issuedCerts[issuedKey]
	if lastCert == nil || lastCert.ModifyIndex < reply.ModifyIndex {
		if c.issuedCerts == nil {
			c.issuedCerts = make(map[string]*structs.IssuedCert)
		}
		c.issuedCerts[issuedKey] = &reply
		lastCert = &reply
	}

	result.Value = lastCert
	result.Index = lastCert.ModifyIndex
	return result, nil
}

// waitNewRootCA blocks until a new root CA is available or the timeout is
// reached (on timeout ErrTimeout is returned on the channel).
func (c *ConnectCALeaf) waitNewRootCA(datacenter string, ch chan<- error,
	timeout time.Duration) {
	// We always want to block on at least an initial value. If the index is
	// still zero (no roots seen yet), ask for index 1 so the query blocks
	// rather than returning immediately.
	minIndex := atomic.LoadUint64(&c.caIndex)
	if minIndex == 0 {
		minIndex = 1
	}

	// Fetch some new roots. This will block until our MinQueryIndex is
	// matched or the timeout is reached.
	rawRoots, _, err := c.Cache.Get(ConnectCARootName, &structs.DCSpecificRequest{
		Datacenter: datacenter,
		QueryOptions: structs.QueryOptions{
			MinQueryIndex: minIndex,
			MaxQueryTime:  timeout,
		},
	})
	if err != nil {
		ch <- err
		return
	}

	roots, ok := rawRoots.(*structs.IndexedCARoots)
	if !ok {
		// This should never happen but we don't want to even risk a panic
		ch <- fmt.Errorf(
			"internal error: CA root cache returned bad type: %T", rawRoots)
		return
	}

	// We do a loop here because there can be multiple waitNewRootCA calls
	// happening simultaneously. Each Fetch kicks off one call. These are
	// multiplexed through Cache.Get which should ensure we only ever
	// actually make a single RPC call. However, there is a race to set
	// the caIndex field so do a basic CAS loop here.
	for {
		// We only set our index if it's newer than what is previously set.
		old := atomic.LoadUint64(&c.caIndex)
		if old >= roots.Index {
			break
		}

		// Set the new index atomically. If the caIndex value changed
		// in the meantime, retry.
		if atomic.CompareAndSwapUint64(&c.caIndex, old, roots.Index) {
			break
		}
	}

	// Trigger the channel since we updated.
	ch <- nil
}

// SupportsBlocking implements cache.Type: Fetch blocks until the CA roots
// change, the current cert nears expiry, or the timeout is reached.
func (c *ConnectCALeaf) SupportsBlocking() bool {
	return true
}

// ConnectCALeafRequest is the cache.Request implementation for the
// ConnectCALeaf cache type. This is implemented here and not in structs
// since this is only used for cache-related requests and not forwarded
// directly to any Consul servers.
type ConnectCALeafRequest struct {
	Token         string
	Datacenter    string
	Service       string // Service name, not ID
	MinQueryIndex uint64
}

// CacheInfo implements cache.Request. Token, Key (the service name), and
// Datacenter together identify the cache entry.
func (r *ConnectCALeafRequest) CacheInfo() cache.RequestInfo {
	return cache.RequestInfo{
		Token:      r.Token,
		Key:        r.Service,
		Datacenter: r.Datacenter,
		MinIndex:   r.MinQueryIndex,
	}
}
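
// A minimal sketch of fetching a leaf certificate through the cache
// (hypothetical caller; "dc1", "web", and aclToken are example values):
//
//	raw, _, err := c.Get(ConnectCALeafName, &ConnectCALeafRequest{
//		Datacenter: "dc1",
//		Service:    "web",
//		Token:      aclToken,
//	})
//	if err != nil {
//		return err
//	}
//	cert := raw.(*structs.IssuedCert)
//	// cert.CertPEM and cert.PrivateKeyPEM can now be used to serve mTLS.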