package ca

import (
	"bytes"
	"context"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/cloudflare/cfssl/helpers"
	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/api/equality"
	"github.com/docker/swarmkit/log"
	"github.com/docker/swarmkit/manager/state/store"
	"github.com/pkg/errors"
)

// IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update
const IssuanceStateRotateMaxBatchSize = 30
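
// hasIssuer reports whether the node's certificate was issued by the given issuer, as determined
// by comparing the issuer subject and public key recorded in the node's TLS info.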
func hasIssuer(n *api.Node, info *IssuerInfo) bool {
	if n.Description == nil || n.Description.TLSInfo == nil {
		return false
	}
	return bytes.Equal(info.Subject, n.Description.TLSInfo.CertIssuerSubject) && bytes.Equal(info.PublicKey, n.Description.TLSInfo.CertIssuerPublicKey)
}

var errRootRotationChanged = errors.New("target root rotation has changed")

// rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated
// or the root CA is updated. This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the
// cluster's RootCA has changed and when a node is added, updated, or removed.
type rootRotationReconciler struct {
	mu                  sync.Mutex
	clusterID           string
	batchUpdateInterval time.Duration
	ctx                 context.Context
	store               *store.MemoryStore
	currentRootCA       *api.RootCA
	currentIssuer       IssuerInfo
	unconvergedNodes    map[string]*api.Node
	wg                  sync.WaitGroup
	cancel              func()
}

// IssuerFromAPIRootCA returns the desired issuer given an API root CA object
func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) {
	wantedIssuer := rootCA.CACert
	if rootCA.RootRotation != nil {
		wantedIssuer = rootCA.RootRotation.CACert
	}
	issuerCerts, err := helpers.ParseCertificatesPEM(wantedIssuer)
	if err != nil {
		return nil, errors.Wrap(err, "invalid certificate in cluster root CA object")
	}
	if len(issuerCerts) == 0 {
		return nil, errors.New("invalid certificate in cluster root CA object")
	}
	return &IssuerInfo{
		Subject:   issuerCerts[0].RawSubject,
		PublicKey: issuerCerts[0].RawSubjectPublicKeyInfo,
	}, nil
}

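// UpdateRootCA is called in response to a cluster update event with a possibly-new root CA. If the
// desired issuer has changed and a root rotation is in progress, it rebuilds the set of unconverged
// nodes and (re)starts the reconciliation loop.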
// assumption: UpdateRootCA will never be called with a `nil` root CA because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
	issuerInfo, err := IssuerFromAPIRootCA(newRootCA)
	if err != nil {
		log.G(r.ctx).WithError(err).Error("unable to process the updated root CA")
		return
	}

	var (
		shouldStartNewLoop, waitForPrevLoop bool
		loopCtx                             context.Context
	)
	r.mu.Lock()
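	// the deferred function starts the new reconciliation loop, if one is needed, only after the
	// lock has been released and any previous (now cancelled) loop has finished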
	defer func() {
		r.mu.Unlock()
		if shouldStartNewLoop {
			if waitForPrevLoop {
				r.wg.Wait()
			}
			r.wg.Add(1)
			go r.runReconcilerLoop(loopCtx, newRootCA)
		}
	}()

	// check if the issuer has changed first
	if reflect.DeepEqual(&r.currentIssuer, issuerInfo) {
		r.currentRootCA = newRootCA
		return
	}

	// If the issuer has changed, iterate through all the nodes to figure out which ones need rotation
	if newRootCA.RootRotation != nil {
		var nodes []*api.Node
		r.store.View(func(tx store.ReadTx) {
			nodes, err = store.FindNodes(tx, store.ByMembership(api.NodeMembershipAccepted))
		})
		if err != nil {
			log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
			return
		}

		// from here on out, there will be no more errors that cause us to have to abandon updating the Root CA,
		// so we can start making changes to r's fields
		r.unconvergedNodes = make(map[string]*api.Node)
		for _, n := range nodes {
			if !hasIssuer(n, issuerInfo) {
				r.unconvergedNodes[n.ID] = n
			}
		}
		shouldStartNewLoop = true
		if r.cancel != nil { // there's already a loop going, so cancel it
			r.cancel()
			waitForPrevLoop = true
		}
		loopCtx, r.cancel = context.WithCancel(r.ctx)
	} else {
		r.unconvergedNodes = nil
	}
	r.currentRootCA = newRootCA
	r.currentIssuer = *issuerInfo
}

// assumption: UpdateNode will never be called with a `nil` node because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
	r.mu.Lock()
	defer r.mu.Unlock()
	// if we're not in the middle of a root rotation, or if this node does not have membership, ignore it
	if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil || node.Spec.Membership != api.NodeMembershipAccepted {
		return
	}
	if hasIssuer(node, &r.currentIssuer) {
		delete(r.unconvergedNodes, node.ID)
	} else {
		r.unconvergedNodes[node.ID] = node
	}
}

// assumption: DeleteNode will never be called with a `nil` node because the caller will be acting in response to
// a store update event
func (r *rootRotationReconciler) DeleteNode(node *api.Node) {
	r.mu.Lock()
	delete(r.unconvergedNodes, node.ID)
	r.mu.Unlock()
}

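// runReconcilerLoop runs until the loop context is cancelled or the root rotation completes: on each
// tick it either requests certificate rotation for a batch of unconverged nodes or, once all nodes
// have converged on the new issuer, attempts to finish the root rotation.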
func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) {
	defer r.wg.Done()
	for {
		r.mu.Lock()
		if len(r.unconvergedNodes) == 0 {
			r.mu.Unlock()

			err := r.store.Update(func(tx store.Tx) error {
				return r.finishRootRotation(tx, loopRootCA)
			})
			if err == nil {
				log.G(r.ctx).Info("completed root rotation")
				return
			}
			log.G(r.ctx).WithError(err).Error("could not complete root rotation")
			if err == errRootRotationChanged {
				// if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early
				return
			}
		} else {
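			// request rotation only for nodes whose certificates are not already pending,
			// renewing, or rotating, up to the maximum batch size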
			var toUpdate []*api.Node
			for _, n := range r.unconvergedNodes {
				iState := n.Certificate.Status.State
				if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate {
					n = n.Copy()
					n.Certificate.Status.State = api.IssuanceStateRotate
					toUpdate = append(toUpdate, n)
					if len(toUpdate) >= IssuanceStateRotateMaxBatchSize {
						break
					}
				}
			}
			r.mu.Unlock()
			if err := r.batchUpdateNodes(toUpdate); err != nil {
				log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate))
			}
		}
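		// wait out the batch update interval, unless the loop is cancelled first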
		select {
		case <-ctx.Done():
			return
		case <-time.After(r.batchUpdateInterval):
		}
	}
}

// This function assumes that the expected root CA has root rotation. This is intended to be used by
// `reconcileNodeRootsAndCerts`, which uses the root CA from the `lastSeenClusterRootCA`, and checks
// that it has a root rotation before calling this function.
func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error {
	cluster := store.GetCluster(tx, r.clusterID)
	if cluster == nil {
		return fmt.Errorf("unable to get cluster %s", r.clusterID)
	}
	// If the RootCA object has changed (because another root rotation was started or because some other node
	// had finished the root rotation), we cannot finish the root rotation that we were working on.
	if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) {
		return errRootRotationChanged
	}
	var signerCert []byte
	if len(cluster.RootCA.RootRotation.CAKey) > 0 {
		signerCert = cluster.RootCA.RootRotation.CACert
	}
	// we don't actually have to parse out the default node expiration from the cluster - we are just using
	// the ca.RootCA object to generate new tokens and the digest
	updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey,
		DefaultNodeCertExpiration, nil)
	if err != nil {
		return errors.Wrap(err, "invalid cluster root rotation object")
	}
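	// promote the rotation CA to be the cluster's root CA, and generate fresh join tokens based on it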
	cluster.RootCA = api.RootCA{
		CACert:     cluster.RootCA.RootRotation.CACert,
		CAKey:      cluster.RootCA.RootRotation.CAKey,
		CACertHash: updatedRootCA.Digest.String(),
		JoinTokens: api.JoinTokens{
			Worker:  GenerateJoinToken(&updatedRootCA),
			Manager: GenerateJoinToken(&updatedRootCA),
		},
		LastForcedRotation: cluster.RootCA.LastForcedRotation,
	}
	return store.UpdateCluster(tx, cluster)
}

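// batchUpdateNodes writes the given nodes back to the store in a single batch, ignoring sequence
// conflicts (see the comment below for why that is safe).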
func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error {
	if len(toUpdate) == 0 {
		return nil
	}
	_, err := r.store.Batch(func(batch *store.Batch) error {
		// Directly update the nodes rather than get + update, and ignore version errors. Since
		// `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have
		// close to the latest versions of all the nodes. If not, the node will be updated later and the
		// next batch of updates should catch it.
		for _, n := range toUpdate {
			if err := batch.Update(func(tx store.Tx) error {
				return store.UpdateNode(tx, n)
			}); err != nil && err != store.ErrSequenceConflict {
				log.G(r.ctx).WithError(err).Errorf("unable to update node %s to request a certificate rotation", n.ID)
			}
		}
		return nil
	})
	return err
}