package fragment

import (
	"crypto/sha1"
	"encoding"
	"fmt"
	"hash"
	"io"
	"time"

	"github.com/LiveRamp/gazette/v2/pkg/codecs"
	"github.com/LiveRamp/gazette/v2/pkg/metrics"
	pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
	log "github.com/sirupsen/logrus"
)

// Spool is a Fragment which is in the process of being created, backed by a
// local *os.File. As commits occur and the file extent is updated, the Spool
// Fragment is also updated to reflect the new committed extent. At all
// times, the Spool Fragment is a consistent, valid Fragment.
type Spool struct {
	// Fragment at time of last commit.
	Fragment
	// FirstAppendTime is the timestamp of the first append of the current fragment.
	FirstAppendTime time.Time
	// Compressed form of the Fragment, compressed under Fragment.CompressionCodec.
	compressedFile File
	// Length of compressed content written to |compressedFile|. Set only after
	// the compressor is finalized.
	compressedLength int64
	// Compressor of |compressedFile|.
	compressor codecs.Compressor

	delta    int64     // Delta offset of next byte to write, relative to Fragment.End.
	summer   hash.Hash // Running SHA1 of the Fragment.File, through |Fragment.End + delta|.
	sumState []byte    // SHA1 |summer| internal state at the last Fragment commit.
	observer SpoolObserver
}

// SpoolObserver is notified of important events in the Spool lifecycle.
type SpoolObserver interface {
	// SpoolCommit is called when the Spool Fragment is extended.
	SpoolCommit(Fragment)
	// SpoolComplete is called when the Spool has been completed.
	SpoolComplete(_ Spool, primary bool)
}

// NewSpool returns an empty Spool of |journal|.
func NewSpool(journal pb.Journal, observer SpoolObserver) Spool {
	return Spool{
		Fragment: Fragment{Fragment: pb.Fragment{
			Journal:          journal,
			CompressionCodec: pb.CompressionCodec_NONE,
		}},
		summer:   sha1.New(),
		sumState: zeroedSHA1State,
		observer: observer,
	}
}

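// A minimal usage sketch (illustrative only; |myObserver| is a hypothetical
// SpoolObserver implementation, not defined in this file):
//
//	var spool = NewSpool(pb.Journal("a/journal"), myObserver)
//
// The Spool is then driven entirely through Apply: first with content
// requests, and then with a commit proposal (see below).
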
// Apply the ReplicateRequest to the Spool, returning any encountered error.
func (s *Spool) Apply(r *pb.ReplicateRequest, primary bool) (pb.ReplicateResponse, error) {
	if r.Proposal != nil {
		return s.applyCommit(r, primary), nil
	} else {
		return pb.ReplicateResponse{}, s.applyContent(r)
	}
}

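// A sketch of the two request shapes Apply distinguishes (field values are
// illustrative, not drawn from a real replication session):
//
//	// Content request: appends |data| at ContentDelta beyond the last commit.
//	spool.Apply(&pb.ReplicateRequest{Content: data, ContentDelta: 0}, true)
//	// Commit request: proposes the extended Fragment produced by Next().
//	var next = spool.Next()
//	spool.Apply(&pb.ReplicateRequest{Proposal: &next}, true)
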
// MustApply applies the ReplicateRequest, and panics if a !OK status is
// returned or an error occurs. MustApply is a convenience for cases such as
// rollbacks, where the request is derived from the Spool itself and cannot
// reasonably fail.
func (s *Spool) MustApply(r *pb.ReplicateRequest) {
	if resp, err := s.Apply(r, false); err != nil {
		panic(err.Error())
	} else if resp.Status != pb.Status_OK {
		panic(resp.Status.String())
	}
}

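// A rollback sketch (hedged: one plausible use, not prescribed by this file):
// proposing the Spool's own committed Fragment resets |delta| and the running
// SHA1 back to the last commit, discarding any partial content.
//
//	spool.MustApply(&pb.ReplicateRequest{Proposal: &spool.Fragment.Fragment})
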
// Next returns the next Fragment which can be committed by the Spool.
func (s *Spool) Next() pb.Fragment {
	var f = s.Fragment.Fragment
	f.End += s.delta

	// Empty fragments are special-cased to have Sum of zero (as technically, SHA1('') != <zero>).
	if f.Begin == f.End {
		f.Sum = pb.SHA1Sum{}
	} else {
		f.Sum = pb.SHA1SumFromDigest(s.summer.Sum(nil))
	}
	return f
}

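// For example (offsets illustrative): a Spool committed through [100, 100)
// with 5 uncommitted bytes yields a Next() Fragment spanning [100, 105) whose
// Sum is the running SHA1; with no spooled bytes, Next() would span
// [100, 100) with a zeroed Sum, per the special case above.
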
// String returns a debugging representation of the Spool.
func (s Spool) String() string {
	return fmt.Sprintf("Spool<Fragment: %s, delta: %d>", s.Fragment.String(), s.delta)
}

func (s *Spool) applyCommit(r *pb.ReplicateRequest, primary bool) pb.ReplicateResponse {
	// Do we need to roll the Spool forward? We do this if the proposal:
	//
	// 1) References an offset strictly larger than any we're aware of. This case
	//    happens, eg, when a new peer is introduced to a route and must "catch
	//    up" with recently written content. It can also happen on recovery from
	//    network partitions, where some replicas believe a commit occurred and
	//    others don't (note the Append RPC itself will have failed in this case,
	//    forcing the client to retry).
	//
	// 2) Begins at our exact End and has length zero. This case allows a primary
	//    pipeline to direct replicas to synchronously roll their Spools to a new
	//    and empty Fragment.
	if r.Proposal.End > s.Fragment.End+s.delta ||
		(r.Proposal.End == s.Fragment.End && r.Proposal.ContentLength() == 0) {

		if s.compressor != nil {
			s.finishCompression()
		}
		if s.ContentLength() != 0 {
			s.observer.SpoolComplete(*s, primary)
		}
		*s = Spool{
			Fragment: Fragment{
				Fragment: pb.Fragment{
					Journal:          s.Fragment.Journal,
					Begin:            r.Proposal.End,
					End:              r.Proposal.End,
					CompressionCodec: r.Proposal.CompressionCodec,
				},
			},
			summer:   sha1.New(),
			sumState: zeroedSHA1State,
			observer: s.observer,
		}
	}

	// There are now two commit cases which can succeed:
	//  1) Exact commit of the current fragment.
	//  2) Exact commit of the current fragment, extended by |delta|.

	// Case 1? "Undo" any partial content, by rolling back |delta| and |summer|.
	if s.Fragment.Fragment == *r.Proposal {
		s.delta = 0
		s.restoreSumState()
		return pb.ReplicateResponse{Status: pb.Status_OK}
	}

	// Case 2? Apply the |delta| bytes spooled since the last commit.
	if next := s.Next(); next == *r.Proposal {
		if primary && s.CompressionCodec != pb.CompressionCodec_NONE {
			s.compressThrough(next.End)
		}

		s.Fragment.Fragment = next
		s.observer.SpoolCommit(s.Fragment)

		if s.FirstAppendTime.IsZero() {
			s.FirstAppendTime = timeNow()
		}
		metrics.CommittedBytesTotal.Add(float64(s.delta))
		s.delta = 0
		s.saveSumState()

		metrics.CommitsTotal.WithLabelValues(pb.Status_OK.String()).Inc()
		return pb.ReplicateResponse{Status: pb.Status_OK}
	}

	// This proposal cannot apply to our Spool; return an error to the primary.
	return pb.ReplicateResponse{
		Status:   pb.Status_FRAGMENT_MISMATCH,
		Fragment: &s.Fragment.Fragment,
	}
}

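// A roll-forward sketch of case 2 above (hedged: offsets and codec are
// illustrative). A primary whose Spool is committed through offset 1024 can
// direct replicas to begin a new, empty Fragment there by proposing:
//
//	spool.Apply(&pb.ReplicateRequest{
//		Proposal: &pb.Fragment{
//			Journal:          spool.Journal,
//			Begin:            1024,
//			End:              1024,
//			CompressionCodec: pb.CompressionCodec_SNAPPY,
//		},
//	}, false)
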
func (s *Spool) applyContent(r *pb.ReplicateRequest) error {
	if r.ContentDelta != s.delta {
		return pb.NewValidationError("invalid ContentDelta (%d; expected %d)", r.ContentDelta, s.delta)
	}

	// Create the Spool File (if it doesn't exist), and write content at the
	// current location. Retry indefinitely on filesystem errors.
	var err error
	for {
		if err != nil {
			log.WithField("err", err).Error("failed to applyContent (will retry)")
			time.Sleep(spoolRetryInterval)
		}

		if s.Fragment.File == nil {
			if s.ContentLength() != 0 {
				panic("Spool.Fragment not empty.")
			} else if s.Fragment.File, err = newSpoolFile(); err != nil {
				err = fmt.Errorf("creating spool file: %s", err)
				continue
			}
		}
		if _, err = s.Fragment.File.WriteAt(r.Content, s.ContentLength()+s.delta); err != nil {
			err = fmt.Errorf("writing spool content: %s", err)
			continue
		}
		break // Success.
	}

	if _, err := s.summer.Write(r.Content); err != nil {
		panic("SHA1.Write cannot fail: " + err.Error())
	}
	s.delta += int64(len(r.Content))

	return nil
}

func (s *Spool) compressThrough(end int64) {
	if s.CompressionCodec == pb.CompressionCodec_NONE {
		panic("expected CompressionCodec != NONE")
	}
	var err error

	// Garden path: we've already compressed all content of the current Fragment,
	// and now incrementally compress through |end|.
	if s.compressor != nil {
		var offset, delta = s.Fragment.ContentLength(), end - s.Fragment.End

		if _, err = io.Copy(s.compressor, io.NewSectionReader(s.File, offset, delta)); err == nil {
			return // Done.
		}
		err = fmt.Errorf("while incrementally compressing: %s", err)

		s.compressor.Close()
		s.compressor = nil
	}

	// We must build or rebuild compression of the Spool.
	for {
		if err != nil {
			log.WithFields(log.Fields{"err": err, "end": end}).Error("failed to compressThrough (will retry)")
			time.Sleep(spoolRetryInterval)
		}

		if s.compressedFile == nil {
			if s.compressedFile, err = newSpoolFile(); err != nil {
				err = fmt.Errorf("creating compressed spool file: %s", err)
				continue
			}
		}
		if _, err = s.compressedFile.Seek(0, io.SeekStart); err != nil {
			err = fmt.Errorf("seeking compressedFile to start: %s", err)
			continue
		}
		if s.compressor, err = codecs.NewCodecWriter(s.compressedFile, s.CompressionCodec); err != nil {
			err = fmt.Errorf("initializing compressor: %s", err)
			continue
		}
		if _, err = io.Copy(s.compressor, io.NewSectionReader(s.File, 0, end-s.Fragment.Begin)); err != nil {
			err = fmt.Errorf("while compressing: %s", err)
			s.compressor.Close()
			s.compressor = nil
			continue
		}
		break // Success.
	}
}

func (s *Spool) finishCompression() {
	if s.CompressionCodec == pb.CompressionCodec_NONE {
		panic("expected CompressionCodec != NONE")
	} else if s.compressedLength != 0 {
		return // Already finalized.
	}

	var err error
	if s.compressor == nil {
		s.compressThrough(s.Fragment.End)
	}

	for {
		if err != nil {
			log.WithField("err", err).Error("failed to finishCompression (will retry)")
			time.Sleep(spoolRetryInterval)
			// |compressor| has been invalidated, and must be rebuilt.
			s.compressThrough(s.Fragment.End)
		}

		err = s.compressor.Close()
		s.compressor = nil

		if err != nil {
			err = fmt.Errorf("closing compressor: %s", err)
			continue
		}
		if s.compressedLength, err = s.compressedFile.Seek(0, io.SeekCurrent); err != nil {
			err = fmt.Errorf("seeking compressedFile current: %s", err)
			continue
		}
		break // Success.
	}
}

// saveSumState marshals the internal state of |summer| into |sumState|.
func (s *Spool) saveSumState() {
	if state, err := s.summer.(encoding.BinaryMarshaler).MarshalBinary(); err != nil {
		panic(err.Error()) // Cannot fail.
	} else {
		s.sumState = state
	}
}

// restoreSumState unmarshals |sumState| into |summer|.
func (s *Spool) restoreSumState() {
	if err := s.summer.(encoding.BinaryUnmarshaler).UnmarshalBinary(s.sumState); err != nil {
		panic(err.Error()) // Cannot fail.
	}
}

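// A sketch of why this works (a standalone illustration, not code from this
// package): crypto/sha1's digest implements encoding.BinaryMarshaler and
// encoding.BinaryUnmarshaler, so its running state can be checkpointed at a
// commit and later restored to discard uncommitted writes.
//
//	var h = sha1.New()
//	h.Write([]byte("committed"))
//	state, _ := h.(encoding.BinaryMarshaler).MarshalBinary() // Checkpoint.
//	h.Write([]byte("uncommitted"))                           // Speculative.
//	h.(encoding.BinaryUnmarshaler).UnmarshalBinary(state)    // Roll back.
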
var (
	zeroedSHA1State, _ = sha1.New().(encoding.BinaryMarshaler).MarshalBinary()
	spoolRetryInterval = time.Second * 5
)