/
journalPersister.go
179 lines (158 loc) · 4.69 KB
/
journalPersister.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package persistence
import (
"encoding/gob"
"io"
"io/ioutil"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/syndtr/goleveldb/leveldb/journal"
)
// JournalPersister saves data in an embedded Journal store
type JournalPersister struct {
stream chan gob.GobEncoder // Internal stream so that all writes are ordered
storage Storage
writer *journal.Writer
// leveldb journal Writer sadly doesn't propage close to the underlying writer
// so we are keeping a reference here to close it on Finalize. We could probably
// move to using Gob streams directly on underlying storage
storeWriter io.Closer
}
// NewJournalPersister initializes a Journal backed persister
func NewJournalPersister(s Storage) Persister {
lp := &JournalPersister{
stream: make(chan gob.GobEncoder, 10),
storage: s,
writer: nil,
}
log.Info().Str("store", s.String()).Msg("Created Journal persister with store")
return lp
}
// ResetDataDir tells persister to *delete* everything in the datadir
func (lp *JournalPersister) ResetDataDir() error {
return lp.storage.Reset()
}
// Finalize tells persister that it can finalize and close writes
// It is an error to send new items to persist once Finalize has been called
func (lp *JournalPersister) Finalize() {
log.Info().Msg("JournalPersister:Finalize finalizing persister")
// close db
if lp.writer != nil {
log.Info().Msg("JournalPersister:Finalize closing writer db")
err := lp.writer.Flush()
if err != nil {
log.Error().Err(err).Msg("JournalPersister:Finalize error flushing journal")
}
err = lp.writer.Close()
if err != nil {
log.Error().Err(err).Msg("JournalPersister:Finalize error closing journal writer")
}
}
// close storage writer
if lp.storeWriter != nil {
err := lp.storeWriter.Close()
if err != nil {
log.Error().Err(err).Msg("JournalPersister:Finalize error closing store writer")
}
}
log.Info().Msg("JournalPersister:Finalize done")
}
// Persist stores an entry to given storage
func (lp *JournalPersister) Persist(enc gob.GobEncoder) error {
log.Debug().Msg("JournalPersister:Persist persisting an entry")
err := lp.write(enc)
if err != nil {
log.Error().Err(err).Send()
}
return err
}
// PersistStream listens to the input channel and persists entries to storage
func (lp *JournalPersister) PersistStream(encC chan gob.GobEncoder) chan error {
errC := make(chan error)
go func() {
defer close(errC)
for e := range encC {
log.Debug().Msg("JournalPersister:PersistStream persisting an entry")
if err := lp.write(e); err != nil {
errC <- err
continue
}
}
}()
return errC
}
// Recover reads back persisted data and emits entries
func (lp *JournalPersister) Recover() (chan []byte, error) {
log.Info().Msg("JournalPersister:Recover starting recovery")
// Get a reader from the store
sr, err := lp.storage.Reader()
if err != nil {
err = errors.Wrap(err, "Failed to open store")
log.Error().Err(err).Msg("JournalPersister:Recover")
return nil, err
}
// Get a new journal reader wrapping the store reader
r := journal.NewReader(sr, nil, false, true)
if err != nil {
err = errors.Wrap(err, "Failed to open peristence journal reader")
log.Error().Err(err).Msg("JournalPersister:Recover")
return nil, err
}
log.Info().Msg("JournalPersister:Recover streaming items for recovery")
bufC := make(chan []byte)
go func() {
defer close(bufC)
// Close storage reader
defer sr.Close()
for {
j, err := r.Next()
if err == io.EOF {
break
}
if err != nil {
err = errors.Wrap(err, "Failed to fetch next journal reader")
log.Error().Err(err).Msg("JournalPersister:Recover")
break
}
buf, err := ioutil.ReadAll(j)
if err != nil {
log.Debug().Err(err).Msg("JournalPersister:Recover error reading from journal")
continue
}
bufC <- buf
}
log.Info().Msg("JournalPersister:Recover finished recovery stream")
}()
return bufC, nil
}
func (lp *JournalPersister) write(enc gob.GobEncoder) error {
// lazy init journal writer
if lp.writer == nil {
var err error
log.Info().Msg("JournalPersister:writer starting persistence")
sw, err := lp.storage.Writer()
if err != nil {
err = errors.Wrap(err, "JournalPersister:writer Failed to open store")
log.Error().Err(err).Send()
return err
}
lp.storeWriter = sw
lp.writer = journal.NewWriter(sw)
}
w, err := lp.writer.Next()
if err != nil {
err = errors.Wrap(err, "JournalPersister:writer failed to get next journal writer")
log.Error().Err(err).Send()
return err
}
buf, err := enc.GobEncode()
if err != nil {
return err
}
_, err = w.Write(buf)
if err != nil {
err = errors.Wrap(err, "JournalPersister:writer failed to persist entry")
log.Error().Err(err).Send()
return err
}
return nil
}