forked from moby/swarmkit
-
Notifications
You must be signed in to change notification settings - Fork 1
/
storage.go
265 lines (235 loc) · 7.68 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package raft
import (
"context"
"fmt"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/docker/go-metrics"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/encryption"
"github.com/docker/swarmkit/manager/state/raft/membership"
"github.com/docker/swarmkit/manager/state/raft/storage"
"github.com/docker/swarmkit/manager/state/store"
"github.com/pkg/errors"
)
var (
	// snapshotLatencyTimer records how long each raft snapshot creation
	// takes. It is registered under the "swarm"/"raft" namespace in init.
	snapshotLatencyTimer metrics.Timer
)

// init registers the raft metrics namespace and its snapshot latency timer
// with the global metrics registry.
func init() {
	ns := metrics.NewNamespace("swarm", "raft", nil)
	snapshotLatencyTimer = ns.NewTimer("snapshot_latency",
		"Raft snapshot create latency.")
	metrics.Register(ns)
}
// readFromDisk initializes the node's raft logger and bootstraps the latest
// snapshot and WAL data from disk, recovering from an interrupted DEK (data
// encryption key) rotation if one is pending.
//
// When a pending DEK exists there are two possibilities: the on-disk logs
// were already re-encrypted with the pending key before the rotation was
// marked complete (bootstrap with the pending key succeeds, and the rotation
// is finalized via the key rotator), or the re-encryption never happened
// (bootstrap fails with ErrCannotDecrypt, and we retry with the current key).
//
// It returns the most recent snapshot (nil if none exists) and the WAL data
// needed to catch the store up, or an error if neither key can read the logs.
func (n *Node) readFromDisk(ctx context.Context) (*raftpb.Snapshot, storage.WALData, error) {
	keys := n.keyRotator.GetKeys()

	n.raftLogger = &storage.EncryptedRaftLogger{
		StateDir:      n.opts.StateDir,
		EncryptionKey: keys.CurrentDEK,
		FIPS:          n.opts.FIPS,
	}
	// Prefer the pending key: if a previous rotation re-encrypted the logs
	// but died before recording completion, only the pending key can read
	// them.
	if keys.PendingDEK != nil {
		n.raftLogger.EncryptionKey = keys.PendingDEK
	}

	snap, walData, err := n.raftLogger.BootstrapFromDisk(ctx)

	if keys.PendingDEK != nil {
		switch errors.Cause(err).(type) {
		case nil:
			// The pending key decrypted the logs, so the earlier rotation
			// actually succeeded; promote the pending key to current.
			if err = n.keyRotator.UpdateKeys(EncryptionKeys{CurrentDEK: keys.PendingDEK}); err != nil {
				// Fixed message: was "unable mark rotation as complete".
				err = errors.Wrap(err, "previous key rotation was successful, but unable to mark rotation as complete")
			}
		case encryption.ErrCannotDecrypt:
			// The rotation never rewrote the logs; retry with the current
			// key (supplied as an additional decryption key).
			snap, walData, err = n.raftLogger.BootstrapFromDisk(ctx, keys.CurrentDEK)
		}
	}

	if err != nil {
		return nil, storage.WALData{}, err
	}
	return snap, walData, nil
}
// loadAndStart bootstraps a node's raft store from the raft logs and
// snapshots on disk. It restores the node's raft ID from the WAL metadata,
// rebuilds cluster membership from the snapshot and committed config-change
// entries, and replays the snapshot, hard state, and WAL entries into the
// in-memory raft store. When forceNewCluster is set, the log is rewritten so
// the resulting cluster contains only this node.
func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
	snapshot, waldata, err := n.readFromDisk(ctx)
	if err != nil {
		return err
	}

	// Read logs to fully catch up store
	var raftNode api.RaftMember
	if err := raftNode.Unmarshal(waldata.Metadata); err != nil {
		return errors.Wrap(err, "failed to unmarshal WAL metadata")
	}
	// The WAL metadata records this node's raft ID from its previous run.
	n.Config.ID = raftNode.RaftID

	if snapshot != nil {
		// Restores the store contents and yields the membership recorded
		// in the snapshot.
		snapCluster, err := n.clusterSnapshot(snapshot.Data)
		if err != nil {
			return err
		}
		var bootstrapMembers []*api.RaftMember
		if forceNewCluster {
			// Keep only this node: every other member from the snapshot is
			// marked removed so we never try to contact it.
			for _, m := range snapCluster.Members {
				if m.RaftID != n.Config.ID {
					n.cluster.RemoveMember(m.RaftID)
					continue
				}
				bootstrapMembers = append(bootstrapMembers, m)
			}
		} else {
			bootstrapMembers = snapCluster.Members
		}
		n.bootstrapMembers = bootstrapMembers
		for _, removedMember := range snapCluster.Removed {
			n.cluster.RemoveMember(removedMember)
		}
	}

	ents, st := waldata.Entries, waldata.HardState

	// All members that are no longer part of the cluster must be added to
	// the removed list right away, so that we don't try to connect to them
	// before processing the configuration change entries, which could make
	// us get stuck.
	for _, ent := range ents {
		// Only committed entries (index <= st.Commit) are considered here.
		if ent.Index <= st.Commit && ent.Type == raftpb.EntryConfChange {
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ent.Data); err != nil {
				return errors.Wrap(err, "failed to unmarshal config change")
			}
			if cc.Type == raftpb.ConfChangeRemoveNode {
				n.cluster.RemoveMember(cc.NodeID)
			}
		}
	}

	if forceNewCluster {
		// discard the previously uncommitted entries
		for i, ent := range ents {
			if ent.Index > st.Commit {
				log.G(ctx).Infof("discarding %d uncommitted WAL entries", len(ents)-i)
				ents = ents[:i]
				break
			}
		}

		// force append the configuration change entries
		toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), n.Config.ID, st.Term, st.Commit)

		// All members that are being removed as part of the
		// force-new-cluster process must be added to the
		// removed list right away, so that we don't try to
		// connect to them before processing the configuration
		// change entries, which could make us get stuck.
		for _, ccEnt := range toAppEnts {
			if ccEnt.Type == raftpb.EntryConfChange {
				var cc raftpb.ConfChange
				if err := cc.Unmarshal(ccEnt.Data); err != nil {
					return errors.Wrap(err, "error unmarshalling force-new-cluster config change")
				}
				if cc.Type == raftpb.ConfChangeRemoveNode {
					n.cluster.RemoveMember(cc.NodeID)
				}
			}
		}
		ents = append(ents, toAppEnts...)

		// force commit newly appended entries
		err := n.raftLogger.SaveEntries(st, toAppEnts)
		if err != nil {
			// Fatal: a WAL that cannot record the forced config changes
			// would leave the cluster in an unrecoverable state.
			log.G(ctx).WithError(err).Fatal("failed to save WAL while forcing new cluster")
		}
		// Advance the commit index past the entries we just forced in.
		if len(toAppEnts) != 0 {
			st.Commit = toAppEnts[len(toAppEnts)-1].Index
		}
	}

	// Replay everything into the in-memory raft store: snapshot first, then
	// hard state, then the remaining log entries.
	if snapshot != nil {
		if err := n.raftStore.ApplySnapshot(*snapshot); err != nil {
			return err
		}
	}
	if err := n.raftStore.SetHardState(st); err != nil {
		return err
	}
	return n.raftStore.Append(ents)
}
// newRaftLogs creates fresh, empty raft logs on disk for this node, seeded
// with the node's marshalled membership metadata, and registers the node
// itself as a cluster member. It returns the raft.Peer describing this node
// (raft ID plus the marshalled metadata used as the peer context).
func (n *Node) newRaftLogs(nodeID string) (raft.Peer, error) {
	member := &api.RaftMember{
		RaftID: n.Config.ID,
		NodeID: nodeID,
		Addr:   n.opts.Addr,
	}

	meta, marshalErr := member.Marshal()
	if marshalErr != nil {
		return raft.Peer{}, errors.Wrap(marshalErr, "error marshalling raft node")
	}

	if bootErr := n.raftLogger.BootstrapNew(meta); bootErr != nil {
		return raft.Peer{}, bootErr
	}

	n.cluster.AddMember(&membership.Member{RaftMember: member})

	return raft.Peer{ID: n.Config.ID, Context: meta}, nil
}
// triggerSnapshot kicks off an asynchronous raft snapshot at the node's
// current applied index. The snapshot captures the memory store contents
// plus the current cluster membership (active members and removed IDs). The
// heavy work — serializing the store, persisting the snapshot, and
// compacting the raft log — happens in a goroutine; this function returns
// once that goroutine has opened its read transaction, so the snapshot is
// guaranteed to reflect the store state as of this call. Completion is
// signaled by sending the resulting snapshot metadata on
// n.snapshotInProgress.
func (n *Node) triggerSnapshot(ctx context.Context, raftConfig api.RaftConfig) {
	snapshot := api.Snapshot{Version: api.Snapshot_V0}
	// Record the membership view before starting the goroutine so it is
	// consistent with the caller's view of the cluster.
	for _, member := range n.cluster.Members() {
		snapshot.Membership.Members = append(snapshot.Membership.Members,
			&api.RaftMember{
				NodeID: member.NodeID,
				RaftID: member.RaftID,
				Addr:   member.Addr,
			})
	}
	snapshot.Membership.Removed = n.cluster.Removed()

	viewStarted := make(chan struct{})

	n.asyncTasks.Add(1)
	n.snapshotInProgress = make(chan raftpb.SnapshotMetadata, 1) // buffered in case Shutdown is called during the snapshot
	go func(appliedIndex uint64, snapshotMeta raftpb.SnapshotMetadata) {
		// StartTimer runs now (defer evaluates its expression immediately);
		// the returned stop function fires when the goroutine exits,
		// capturing total snapshot latency.
		defer metrics.StartTimer(snapshotLatencyTimer)()
		defer func() {
			// Always report completion — snapshotMeta is unchanged from the
			// previous snapshot if this attempt failed — and release the
			// async-task slot.
			n.asyncTasks.Done()
			n.snapshotInProgress <- snapshotMeta
		}()
		var err error
		n.memoryStore.View(func(tx store.ReadTx) {
			// Unblock the caller: the read transaction is now established,
			// pinning the store state this snapshot will contain.
			close(viewStarted)

			var storeSnapshot *api.StoreSnapshot
			storeSnapshot, err = n.memoryStore.Save(tx)
			snapshot.Store = *storeSnapshot
		})
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to read snapshot from store")
			return
		}
		d, err := snapshot.Marshal()
		if err != nil {
			log.G(ctx).WithError(err).Error("failed to marshal snapshot")
			return
		}
		snap, err := n.raftStore.CreateSnapshot(appliedIndex, &n.confState, d)
		if err == nil {
			// Persist the snapshot to disk before advertising its metadata.
			if err := n.raftLogger.SaveSnapshot(snap); err != nil {
				log.G(ctx).WithError(err).Error("failed to save snapshot")
				return
			}
			snapshotMeta = snap.Metadata

			// Compact the in-memory raft log, keeping a trailing window of
			// entries so slow followers can catch up without needing a full
			// snapshot transfer. ErrCompacted just means it was already done.
			if appliedIndex > raftConfig.LogEntriesForSlowFollowers {
				err := n.raftStore.Compact(appliedIndex - raftConfig.LogEntriesForSlowFollowers)
				if err != nil && err != raft.ErrCompacted {
					log.G(ctx).WithError(err).Error("failed to compact snapshot")
				}
			}
		} else if err != raft.ErrSnapOutOfDate {
			// ErrSnapOutOfDate means a newer snapshot already exists — not
			// an error worth logging.
			log.G(ctx).WithError(err).Error("failed to create snapshot")
		}
	}(n.appliedIndex, n.snapshotMeta)

	// Wait for the goroutine to establish a read transaction, to make
	// sure it sees the state as of this moment.
	<-viewStarted
}
// clusterSnapshot decodes a serialized raft snapshot, restores its store
// contents into the node's memory store, and returns the cluster membership
// the snapshot recorded. The membership is returned even on error so callers
// can inspect whatever was decoded.
func (n *Node) clusterSnapshot(data []byte) (api.ClusterSnapshot, error) {
	var snap api.Snapshot
	err := snap.Unmarshal(data)
	if err != nil {
		return snap.Membership, err
	}

	// Only V0 snapshots are understood by this code path.
	if snap.Version != api.Snapshot_V0 {
		return snap.Membership, fmt.Errorf("unrecognized snapshot version %d", snap.Version)
	}

	err = n.memoryStore.Restore(&snap.Store)
	return snap.Membership, err
}