-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
snapshot.go
135 lines (121 loc) · 3.22 KB
/
snapshot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package snapshot
import (
"encoding/gob"
"net/url"
"os"
"sync"
"time"
"github.com/btccom/go-micro/v2/store"
"github.com/pkg/errors"
)
// Snapshot creates snapshots of a go-micro store
type Snapshot interface {
// Init validates the Snapshot options and returns an error if they are invalid.
// Init must be called before the Snapshot is used
Init(opts ...SnapshotOption) error
// Start opens a channel that receives *store.Record, adding any incoming records to a backup
// close() the channel to commit the results.
Start() (chan<- *store.Record, error)
// Wait waits for any operations to be committed to underlying storage
Wait()
}
// SnapshotOptions configure a snapshotter
type SnapshotOptions struct {
Destination string
}
// SnapshotOption is an individual option
type SnapshotOption func(s *SnapshotOptions)
// Destination is the URL to snapshot to, e.g. file:///path/to/file
func Destination(dest string) SnapshotOption {
return func(s *SnapshotOptions) {
s.Destination = dest
}
}
// FileSnapshot backs up incoming records to a File
type FileSnapshot struct {
Options SnapshotOptions
records chan *store.Record
path string
encoder *gob.Encoder
file *os.File
wg *sync.WaitGroup
}
// NewFileSnapshot returns a FileSnapshot
func NewFileSnapshot(opts ...SnapshotOption) Snapshot {
f := &FileSnapshot{wg: &sync.WaitGroup{}}
for _, o := range opts {
o(&f.Options)
}
return f
}
// Init validates the options
func (f *FileSnapshot) Init(opts ...SnapshotOption) error {
for _, o := range opts {
o(&f.Options)
}
u, err := url.Parse(f.Options.Destination)
if err != nil {
return errors.Wrap(err, "destination is invalid")
}
if u.Scheme != "file" {
return errors.Errorf("unsupported scheme %s (wanted file)", u.Scheme)
}
if f.wg == nil {
f.wg = &sync.WaitGroup{}
}
f.path = u.Path
return nil
}
// Start opens a channel which recieves *store.Record and writes them to storage
func (f *FileSnapshot) Start() (chan<- *store.Record, error) {
if f.records != nil || f.encoder != nil || f.file != nil {
return nil, errors.New("Snapshot is already in use")
}
fi, err := os.OpenFile(f.path, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0o600)
if err != nil {
return nil, errors.Wrapf(err, "couldn't open file %s", f.path)
}
f.encoder = gob.NewEncoder(fi)
f.file = fi
f.records = make(chan *store.Record)
go f.receiveRecords(f.records)
return f.records, nil
}
// Wait waits for the snapshotter to commit the backups to persistent storage
func (f *FileSnapshot) Wait() {
f.wg.Wait()
}
func (f *FileSnapshot) receiveRecords(rec <-chan *store.Record) {
f.wg.Add(1)
for {
r, more := <-rec
if !more {
println("Stopping FileSnapshot")
f.file.Close()
f.encoder = nil
f.file = nil
f.records = nil
break
}
ir := record{
Key: r.Key,
}
if r.Expiry != 0 {
ir.ExpiresAt = time.Now().Add(r.Expiry)
}
ir.Value = make([]byte, len(r.Value))
copy(ir.Value, r.Value)
if err := f.encoder.Encode(ir); err != nil {
// only thing to do here is panic
panic(errors.Wrap(err, "couldn't write to file"))
}
println("encoded", ir.Key)
}
f.wg.Done()
}
// record is a store.Record when serialised to persistent storage.
type record struct {
Key string
Value []byte
ExpiresAt time.Time
}