-
Notifications
You must be signed in to change notification settings - Fork 3.5k
/
manager.go
486 lines (423 loc) · 14.3 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
package snapshots
import (
"bytes"
"crypto/sha256"
"errors"
"fmt"
"io"
"math"
"sort"
"sync"
"github.com/tendermint/tendermint/libs/log"
"github.com/cosmos/cosmos-sdk/snapshots/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)
// Manager manages snapshot and restore operations for an app, making sure only a single
// long-running operation is in progress at any given time, and provides convenience methods
// mirroring the ABCI interface.
//
// Although the ABCI interface (and this manager) passes chunks as byte slices, the internal
// snapshot/restore APIs use IO streams (i.e. chan io.ReadCloser), for two reasons:
//
// 1. In the future, ABCI should support streaming. Consider e.g. InitChain during chain
// upgrades, which currently passes the entire chain state as an in-memory byte slice.
// https://github.com/tendermint/tendermint/issues/5184
//
// 2. io.ReadCloser streams automatically propagate IO errors, and can pass arbitrary
// errors via io.Pipe.CloseWithError().
type Manager struct {
extensions map[string]types.ExtensionSnapshotter
// store is the snapshot store where all completed snapshots are persisted.
store *Store
opts types.SnapshotOptions
// multistore is the store from which snapshots are taken.
multistore types.Snapshotter
logger log.Logger
mtx sync.Mutex
operation operation
chRestore chan<- io.ReadCloser
chRestoreDone <-chan restoreDone
restoreChunkHashes [][]byte
restoreChunkIndex uint32
}
// operation represents a Manager operation. Only one operation can be in progress at a time.
type operation string
// restoreDone represents the result of a restore operation.
type restoreDone struct {
complete bool // if true, restore completed successfully (not prematurely)
err error // if non-nil, restore errored
}
const (
opNone operation = ""
opSnapshot operation = "snapshot"
opPrune operation = "prune"
opRestore operation = "restore"
chunkBufferSize = 4
snapshotMaxItemSize = int(64e6) // SDK has no key/value size limit, so we set an arbitrary limit
)
var ErrOptsZeroSnapshotInterval = errors.New("snaphot-interval must not be 0")
// NewManager creates a new manager.
func NewManager(store *Store, opts types.SnapshotOptions, multistore types.Snapshotter, extensions map[string]types.ExtensionSnapshotter, logger log.Logger) *Manager {
if extensions == nil {
extensions = map[string]types.ExtensionSnapshotter{}
}
return &Manager{
store: store,
opts: opts,
multistore: multistore,
extensions: extensions,
logger: logger,
}
}
// RegisterExtensions register extension snapshotters to manager
func (m *Manager) RegisterExtensions(extensions ...types.ExtensionSnapshotter) error {
if m.extensions == nil {
m.extensions = make(map[string]types.ExtensionSnapshotter, len(extensions))
}
for _, extension := range extensions {
name := extension.SnapshotName()
if _, ok := m.extensions[name]; ok {
return fmt.Errorf("duplicated snapshotter name: %s", name)
}
if !IsFormatSupported(extension, extension.SnapshotFormat()) {
return fmt.Errorf("snapshotter don't support it's own snapshot format: %s %d", name, extension.SnapshotFormat())
}
m.extensions[name] = extension
}
return nil
}
// begin starts an operation, or errors if one is in progress. It manages the mutex itself.
func (m *Manager) begin(op operation) error {
m.mtx.Lock()
defer m.mtx.Unlock()
return m.beginLocked(op)
}
// beginLocked begins an operation while already holding the mutex.
func (m *Manager) beginLocked(op operation) error {
if op == opNone {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "can't begin a none operation")
}
if m.operation != opNone {
return sdkerrors.Wrapf(sdkerrors.ErrConflict, "a %v operation is in progress", m.operation)
}
m.operation = op
return nil
}
// end ends the current operation.
func (m *Manager) end() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.endLocked()
}
// endLocked ends the current operation while already holding the mutex.
func (m *Manager) endLocked() {
m.operation = opNone
if m.chRestore != nil {
close(m.chRestore)
m.chRestore = nil
}
m.chRestoreDone = nil
m.restoreChunkHashes = nil
m.restoreChunkIndex = 0
}
// GetInterval returns snapshot interval represented in heights.
func (m *Manager) GetInterval() uint64 {
return m.opts.Interval
}
// GetKeepRecent returns snapshot keep-recent represented in heights.
func (m *Manager) GetKeepRecent() uint32 {
return m.opts.KeepRecent
}
// GetSnapshotBlockRetentionHeights returns the number of heights needed
// for block retention. Blocks since the oldest available snapshot must be
// available for state sync nodes to catch up (oldest because a node may be
// restoring an old snapshot while a new snapshot was taken).
func (m *Manager) GetSnapshotBlockRetentionHeights() int64 {
return int64(m.opts.Interval * uint64(m.opts.KeepRecent))
}
// Create creates a snapshot and returns its metadata.
func (m *Manager) Create(height uint64) (*types.Snapshot, error) {
if m == nil {
return nil, sdkerrors.Wrap(sdkerrors.ErrLogic, "no snapshot store configured")
}
defer m.multistore.PruneSnapshotHeight(int64(height))
err := m.begin(opSnapshot)
if err != nil {
return nil, err
}
defer m.end()
latest, err := m.store.GetLatest()
if err != nil {
return nil, sdkerrors.Wrap(err, "failed to examine latest snapshot")
}
if latest != nil && latest.Height >= height {
return nil, sdkerrors.Wrapf(sdkerrors.ErrConflict,
"a more recent snapshot already exists at height %v", latest.Height)
}
// Spawn goroutine to generate snapshot chunks and pass their io.ReadClosers through a channel
ch := make(chan io.ReadCloser)
go m.createSnapshot(height, ch)
return m.store.Save(height, types.CurrentFormat, ch)
}
// createSnapshot do the heavy work of snapshotting after the validations of request are done
// the produced chunks are written to the channel.
func (m *Manager) createSnapshot(height uint64, ch chan<- io.ReadCloser) {
streamWriter := NewStreamWriter(ch)
if streamWriter == nil {
return
}
defer func() {
if err := streamWriter.Close(); err != nil {
streamWriter.CloseWithError(err)
}
}()
if err := m.multistore.Snapshot(height, streamWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
for _, name := range m.sortedExtensionNames() {
extension := m.extensions[name]
// write extension metadata
err := streamWriter.WriteMsg(&types.SnapshotItem{
Item: &types.SnapshotItem_Extension{
Extension: &types.SnapshotExtensionMeta{
Name: name,
Format: extension.SnapshotFormat(),
},
},
})
if err != nil {
streamWriter.CloseWithError(err)
return
}
payloadWriter := func(payload []byte) error {
return types.WriteExtensionPayload(streamWriter, payload)
}
if err := extension.SnapshotExtension(height, payloadWriter); err != nil {
streamWriter.CloseWithError(err)
return
}
}
}
// List lists snapshots, mirroring ABCI ListSnapshots. It can be concurrent with other operations.
func (m *Manager) List() ([]*types.Snapshot, error) {
return m.store.List()
}
// LoadChunk loads a chunk into a byte slice, mirroring ABCI LoadChunk. It can be called
// concurrently with other operations. If the chunk does not exist, nil is returned.
func (m *Manager) LoadChunk(height uint64, format uint32, chunk uint32) ([]byte, error) {
reader, err := m.store.LoadChunk(height, format, chunk)
if err != nil {
return nil, err
}
if reader == nil {
return nil, nil
}
defer reader.Close()
return io.ReadAll(reader)
}
// Prune prunes snapshots, if no other operations are in progress.
func (m *Manager) Prune(retain uint32) (uint64, error) {
err := m.begin(opPrune)
if err != nil {
return 0, err
}
defer m.end()
return m.store.Prune(retain)
}
// Restore begins an async snapshot restoration, mirroring ABCI OfferSnapshot. Chunks must be fed
// via RestoreChunk() until the restore is complete or a chunk fails.
func (m *Manager) Restore(snapshot types.Snapshot) error {
if snapshot.Chunks == 0 {
return sdkerrors.Wrap(types.ErrInvalidMetadata, "no chunks")
}
if uint32(len(snapshot.Metadata.ChunkHashes)) != snapshot.Chunks {
return sdkerrors.Wrapf(types.ErrInvalidMetadata, "snapshot has %v chunk hashes, but %v chunks",
uint32(len(snapshot.Metadata.ChunkHashes)),
snapshot.Chunks)
}
m.mtx.Lock()
defer m.mtx.Unlock()
// check multistore supported format preemptive
if snapshot.Format != types.CurrentFormat {
return sdkerrors.Wrapf(types.ErrUnknownFormat, "snapshot format %v", snapshot.Format)
}
if snapshot.Height == 0 {
return sdkerrors.Wrap(sdkerrors.ErrLogic, "cannot restore snapshot at height 0")
}
if snapshot.Height > uint64(math.MaxInt64) {
return sdkerrors.Wrapf(types.ErrInvalidMetadata,
"snapshot height %v cannot exceed %v", snapshot.Height, int64(math.MaxInt64))
}
err := m.beginLocked(opRestore)
if err != nil {
return err
}
// Start an asynchronous snapshot restoration, passing chunks and completion status via channels.
chChunks := make(chan io.ReadCloser, chunkBufferSize)
chDone := make(chan restoreDone, 1)
go func() {
err := m.restoreSnapshot(snapshot, chChunks)
chDone <- restoreDone{
complete: err == nil,
err: err,
}
close(chDone)
}()
m.chRestore = chChunks
m.chRestoreDone = chDone
m.restoreChunkHashes = snapshot.Metadata.ChunkHashes
m.restoreChunkIndex = 0
return nil
}
// restoreSnapshot do the heavy work of snapshot restoration after preliminary checks on request have passed.
func (m *Manager) restoreSnapshot(snapshot types.Snapshot, chChunks <-chan io.ReadCloser) error {
var nextItem types.SnapshotItem
streamReader, err := NewStreamReader(chChunks)
if err != nil {
return err
}
defer streamReader.Close()
// payloadReader reads an extension payload for extension snapshotter, it returns `io.EOF` at extension boundaries.
payloadReader := func() ([]byte, error) {
nextItem.Reset()
if err := streamReader.ReadMsg(&nextItem); err != nil {
return nil, err
}
payload := nextItem.GetExtensionPayload()
if payload == nil {
return nil, io.EOF
}
return payload.Payload, nil
}
nextItem, err = m.multistore.Restore(snapshot.Height, snapshot.Format, streamReader)
if err != nil {
return sdkerrors.Wrap(err, "multistore restore")
}
for {
if nextItem.Item == nil {
// end of stream
break
}
metadata := nextItem.GetExtension()
if metadata == nil {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown snapshot item %T", nextItem.Item)
}
extension, ok := m.extensions[metadata.Name]
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrLogic, "unknown extension snapshotter %s", metadata.Name)
}
if !IsFormatSupported(extension, metadata.Format) {
return sdkerrors.Wrapf(types.ErrUnknownFormat, "format %v for extension %s", metadata.Format, metadata.Name)
}
if err := extension.RestoreExtension(snapshot.Height, metadata.Format, payloadReader); err != nil {
return sdkerrors.Wrapf(err, "extension %s restore", metadata.Name)
}
if nextItem.GetExtensionPayload() != nil {
return sdkerrors.Wrapf(err, "extension %s don't exhausted payload stream", metadata.Name)
}
}
return nil
}
// RestoreChunk adds a chunk to an active snapshot restoration, mirroring ABCI ApplySnapshotChunk.
// Chunks must be given until the restore is complete, returning true, or a chunk errors.
func (m *Manager) RestoreChunk(chunk []byte) (bool, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
if m.operation != opRestore {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "no restore operation in progress")
}
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "received unexpected chunk")
}
// Check if any errors have occurred yet.
select {
case done := <-m.chRestoreDone:
m.endLocked()
if done.err != nil {
return false, done.err
}
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended unexpectedly")
default:
}
// Verify the chunk hash.
hash := sha256.Sum256(chunk)
expected := m.restoreChunkHashes[m.restoreChunkIndex]
if !bytes.Equal(hash[:], expected) {
return false, sdkerrors.Wrapf(types.ErrChunkHashMismatch,
"expected %x, got %x", hash, expected)
}
// Pass the chunk to the restore, and wait for completion if it was the final one.
m.chRestore <- io.NopCloser(bytes.NewReader(chunk))
m.restoreChunkIndex++
if int(m.restoreChunkIndex) >= len(m.restoreChunkHashes) {
close(m.chRestore)
m.chRestore = nil
done := <-m.chRestoreDone
m.endLocked()
if done.err != nil {
return false, done.err
}
if !done.complete {
return false, sdkerrors.Wrap(sdkerrors.ErrLogic, "restore ended prematurely")
}
return true, nil
}
return false, nil
}
// sortedExtensionNames sort extension names for deterministic iteration.
func (m *Manager) sortedExtensionNames() []string {
names := make([]string, 0, len(m.extensions))
for name := range m.extensions {
names = append(names, name)
}
sort.Strings(names)
return names
}
// IsFormatSupported returns if the snapshotter supports restoration from given format.
func IsFormatSupported(snapshotter types.ExtensionSnapshotter, format uint32) bool {
for _, i := range snapshotter.SupportedFormats() {
if i == format {
return true
}
}
return false
}
// SnapshotIfApplicable takes a snapshot of the current state if we are on a snapshot height.
// It also prunes any old snapshots.
func (m *Manager) SnapshotIfApplicable(height int64) {
if m == nil {
return
}
if !m.shouldTakeSnapshot(height) {
m.logger.Debug("snapshot is skipped", "height", height)
return
}
m.snapshot(height)
}
// shouldTakeSnapshot returns true is snapshot should be taken at height.
func (m *Manager) shouldTakeSnapshot(height int64) bool {
return m.opts.Interval > 0 && uint64(height)%m.opts.Interval == 0
}
func (m *Manager) snapshot(height int64) {
m.logger.Info("creating state snapshot", "height", height)
if height <= 0 {
m.logger.Error("snapshot height must be positive", "height", height)
return
}
snapshot, err := m.Create(uint64(height))
if err != nil {
m.logger.Error("failed to create state snapshot", "height", height, "err", err)
return
}
m.logger.Info("completed state snapshot", "height", height, "format", snapshot.Format)
if m.opts.KeepRecent > 0 {
m.logger.Debug("pruning state snapshots")
pruned, err := m.Prune(m.opts.KeepRecent)
if err != nil {
m.logger.Error("Failed to prune state snapshots", "err", err)
return
}
m.logger.Debug("pruned state snapshots", "pruned", pruned)
}
}