-
Notifications
You must be signed in to change notification settings - Fork 2k
/
snapshot.go
250 lines (215 loc) · 6.98 KB
/
snapshot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Package snapshot manages the interactions between Nomad and Raft in order to
// take and restore snapshots for disaster recovery. The internal format of a
// snapshot is a gzip-compressed tar file; the tar layout is described in archive.go.
package snapshot
import (
"compress/gzip"
"crypto/sha256"
"encoding/base64"
"fmt"
"io"
"io/ioutil"
"os"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/raft"
)
// Snapshot is a structure that holds state about a temporary file that is used
// to hold a snapshot. By using an intermediate file we avoid holding everything
// in memory.
type Snapshot struct {
	// file is the temporary file backing the snapshot. Read reads from it and
	// Close closes it and removes it from disk.
	file *os.File
	// index is the value reported by Index; New fills it from the Index field
	// of the Raft snapshot metadata.
	index uint64
	// checksum is the value reported by Checksum; New sets it to "sha-256="
	// followed by the base64 encoding of the SHA-256 digest of the compressed
	// bytes written to file.
	checksum string
}
// New takes a state snapshot of the given Raft instance into a temporary file
// and returns an object that gives access to the file as an io.Reader. You must
// arrange to call Close() on the returned object or else you will leak a
// temporary file.
//
// The returned Snapshot carries the Raft index from the snapshot metadata and
// a checksum of the form "sha-256=<base64>" computed over the gzip-compressed
// bytes written to the temporary file.
//
// If any step fails, a nil Snapshot and a descriptive error are returned, and
// the temporary file (if it was created) is closed and removed. Failures while
// cleaning up are reported through logger rather than returned.
func New(logger hclog.Logger, r *raft.Raft) (*Snapshot, error) {
	// Take the snapshot.
	future := r.Snapshot()
	if err := future.Error(); err != nil {
		return nil, fmt.Errorf("Raft error when taking snapshot: %v", err)
	}

	// Open up the snapshot.
	metadata, snap, err := future.Open()
	if err != nil {
		return nil, fmt.Errorf("failed to open snapshot: %v", err)
	}
	defer func() {
		if err := snap.Close(); err != nil {
			logger.Error("Failed to close Raft snapshot", "error", err)
		}
	}()

	// Make a scratch file to receive the contents so that we don't buffer
	// everything in memory. This gets deleted in Close() since we keep it
	// around for re-reading.
	archive, err := ioutil.TempFile("", "snapshot")
	if err != nil {
		return nil, fmt.Errorf("failed to create snapshot file: %v", err)
	}

	// If anything goes wrong after this point, we will attempt to clean up
	// the temp file. The happy path will disarm this.
	var keep bool
	defer func() {
		if keep {
			return
		}

		// Release the descriptor before unlinking; removing the file alone
		// would leave the handle open for the life of the process.
		if err := archive.Close(); err != nil {
			logger.Error("Failed to close temp snapshot", "error", err)
		}
		if err := os.Remove(archive.Name()); err != nil {
			logger.Error("Failed to clean up temp snapshot", "error", err)
		}
	}()

	// Everything the compressor emits goes both to the file and to the hash,
	// so the checksum covers exactly the bytes a caller will stream out.
	hash := sha256.New()
	out := io.MultiWriter(hash, archive)

	// Wrap the file writer in a gzip compressor.
	compressor := gzip.NewWriter(out)

	// Write the archive (write is defined elsewhere in this package).
	if err := write(compressor, metadata, snap); err != nil {
		return nil, fmt.Errorf("failed to write snapshot file: %v", err)
	}

	// Finish the compressed stream.
	if err := compressor.Close(); err != nil {
		return nil, fmt.Errorf("failed to compress snapshot file: %v", err)
	}

	// Sync the compressed file and rewind it so it's ready to be streamed
	// out by the caller.
	if err := archive.Sync(); err != nil {
		return nil, fmt.Errorf("failed to sync snapshot: %v", err)
	}
	if _, err := archive.Seek(0, io.SeekStart); err != nil {
		return nil, fmt.Errorf("failed to rewind snapshot: %v", err)
	}

	checksum := "sha-256=" + base64.StdEncoding.EncodeToString(hash.Sum(nil))

	// Success: ownership of the temp file passes to the returned Snapshot.
	keep = true
	return &Snapshot{file: archive, index: metadata.Index, checksum: checksum}, nil
}
// Index reports the index stored in the snapshot. A nil receiver is allowed
// and yields 0.
func (s *Snapshot) Index() uint64 {
	if s != nil {
		return s.index
	}
	return 0
}
// Checksum reports the checksum string stored in the snapshot. A nil receiver
// is allowed and yields the empty string.
func (s *Snapshot) Checksum() string {
	if s != nil {
		return s.checksum
	}
	return ""
}
// Read implements io.Reader by delegating to the underlying snapshot file. A
// nil receiver is allowed and behaves like an exhausted stream: it reads
// nothing and returns io.EOF.
func (s *Snapshot) Read(p []byte) (n int, err error) {
	if s != nil {
		return s.file.Read(p)
	}
	return 0, io.EOF
}
// Close closes the snapshot and removes any temporary storage associated with
// it. You must arrange to call this whenever New() has been called
// successfully. This is safe to call on a nil snapshot.
//
// The temporary file is removed even when closing it fails, so a close error
// does not leave the file behind on disk. If both steps fail, the close error
// is returned; otherwise whichever error occurred (or nil) is returned.
func (s *Snapshot) Close() error {
	if s == nil {
		return nil
	}

	// Always attempt both steps; returning early on a close failure would
	// leak the temp file.
	closeErr := s.file.Close()
	removeErr := os.Remove(s.file.Name())
	if closeErr != nil {
		return closeErr
	}
	return removeErr
}
// Verify reads a snapshot archive from in and checks it by running it through
// CopySnapshot with the snapshot data sent to ioutil.Discard. It returns the
// metadata and error exactly as CopySnapshot reports them.
func Verify(in io.Reader) (*raft.SnapshotMeta, error) {
	meta, err := CopySnapshot(in, ioutil.Discard)
	return meta, err
}
// CopySnapshot decompresses the snapshot archive read from in, copies the
// snapshot content to dest, and returns the metadata decoded from the archive.
// A nil metadata pointer is returned together with any error.
func CopySnapshot(in io.Reader, dest io.Writer) (*raft.SnapshotMeta, error) {
	// The archive is gzip-compressed, so everything is read through a
	// decompressor.
	gz, err := gzip.NewReader(in)
	if err != nil {
		return nil, fmt.Errorf("failed to decompress snapshot: %v", err)
	}
	defer gz.Close()

	// Decode the archive: metadata is filled in and dest is handed to read
	// (defined elsewhere in this package) to receive the snapshot data.
	meta := new(raft.SnapshotMeta)
	if err = read(gz, meta, dest); err != nil {
		return nil, fmt.Errorf("failed to read snapshot file: %v", err)
	}

	// Make sure the gzip stream ended cleanly with nothing left over.
	if err = concludeGzipRead(gz); err != nil {
		return nil, err
	}
	return meta, nil
}
// concludeGzipRead should be invoked after you think you've consumed all of
// the data from the gzip stream. It will error if the stream was corrupt.
//
// The docs for gzip.Reader say: "Clients should treat data returned by Read as
// tentative until they receive the io.EOF marking the end of the data."
//
// It returns nil when the stream ends cleanly with nothing left to read, the
// decompressor's error if draining fails, or an error stating how many
// uncompressed bytes were still unread.
func concludeGzipRead(decomp *gzip.Reader) error {
	// Drain into a discarding writer instead of buffering the remainder: only
	// the byte count is needed, and leftover data may be arbitrarily large.
	// io.Copy treats io.EOF as a normal end and reports nil for it.
	extra, err := io.Copy(ioutil.Discard, decomp)
	if err != nil {
		return err
	}
	if extra != 0 {
		return fmt.Errorf("%d unread uncompressed bytes remain", extra)
	}
	return nil
}
// readWrapper is an io.Reader that forwards reads to in while keeping a running
// count of the bytes read, so that a read failure can report how far into the
// stream it occurred.
type readWrapper struct {
	in io.Reader // reader being wrapped
	c  int       // total number of bytes read from in so far
}

// Read reads from the wrapped reader and adds the number of bytes returned to
// the running total. A nil error and io.EOF are passed through untouched so
// that consumers comparing against io.EOF keep working. Any other error is
// wrapped with the number of bytes read so far; the original error stays
// reachable through errors.Is and errors.As.
func (r *readWrapper) Read(b []byte) (int, error) {
	n, err := r.in.Read(b)
	r.c += n
	if err != nil && err != io.EOF {
		return n, fmt.Errorf("failed to read after %v: %w", r.c, err)
	}
	return n, err
}
// Restore decompresses the snapshot archive read from in, spools the snapshot
// data into a temporary file, and then asks the given Raft instance to restore
// from it. The temporary file is closed and removed before Restore returns;
// problems during that cleanup are reported through logger, not returned.
func Restore(logger hclog.Logger, in io.Reader, r *raft.Raft) error {
	// Decompress on the fly. The input goes through readWrapper so read
	// errors mention how many bytes had been consumed.
	gz, err := gzip.NewReader(&readWrapper{in: in})
	if err != nil {
		return fmt.Errorf("failed to decompress snapshot: %v", err)
	}
	defer func() {
		if cerr := gz.Close(); cerr != nil {
			logger.Error("Failed to close snapshot decompressor", "error", cerr)
		}
	}()

	// Spool the snapshot data to disk rather than holding it in memory.
	scratch, err := ioutil.TempFile("", "snapshot")
	if err != nil {
		return fmt.Errorf("failed to create temp snapshot file: %v", err)
	}
	defer func() {
		if cerr := scratch.Close(); cerr != nil {
			logger.Error("Failed to close temp snapshot", "error", cerr)
		}
		if rerr := os.Remove(scratch.Name()); rerr != nil {
			logger.Error("Failed to clean up temp snapshot", "error", rerr)
		}
	}()

	// Unpack the archive: metadata into meta, snapshot data into the scratch
	// file, then confirm the gzip stream ended cleanly.
	var meta raft.SnapshotMeta
	if err = read(gz, &meta, scratch); err != nil {
		return fmt.Errorf("failed to read snapshot file: %v", err)
	}
	if err = concludeGzipRead(gz); err != nil {
		return err
	}

	// Flush to disk and seek back to the start so Raft reads from the top.
	if err = scratch.Sync(); err != nil {
		return fmt.Errorf("failed to sync temp snapshot: %v", err)
	}
	if _, err = scratch.Seek(0, io.SeekStart); err != nil {
		return fmt.Errorf("failed to rewind temp snapshot: %v", err)
	}

	// Hand the metadata and data over to Raft.
	if err = r.Restore(&meta, scratch, 0); err != nil {
		return fmt.Errorf("Raft error when restoring snapshot: %v", err)
	}
	return nil
}