forked from asonawalla/gazette
/
fsm.go
321 lines (275 loc) · 9.93 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
package recoverylog
import (
"fmt"
"hash/crc32"
"sort"
pb "github.com/LiveRamp/gazette/v2/pkg/protocol"
)
// Sentinel errors returned by FSM.Apply and its apply*() helpers, along with
// the CRC table used to chain operation checksums.
var (
// ErrChecksumMismatch is returned by Apply when op.Checksum differs from FSM.NextChecksum.
ErrChecksumMismatch = fmt.Errorf("checksum mismatch")
// ErrExpectedHintedFnode is returned by Apply when the next hinted Fnode
// equals op.SeqNo, but the operation is not a create.
ErrExpectedHintedFnode = fmt.Errorf("op is not create, and an Fnode was hinted")
// ErrFnodeNotTracked is returned when a create is not the next hinted Fnode,
// or when a link, unlink or write references an Fnode absent from FSM.LiveNodes.
ErrFnodeNotTracked = fmt.Errorf("fnode not tracked")
// ErrLinkExists is returned when the target path of a create, link or
// property operation is already present in FSM.Links.
ErrLinkExists = fmt.Errorf("link exists")
// ErrNoSuchLink is returned when an unlink names a path which is not a link of its Fnode.
ErrNoSuchLink = fmt.Errorf("fnode has no such link")
// ErrNotHintedAuthor is returned by Apply when hinted Segments remain and
// op.Author differs from the Author of the next hinted Segment.
ErrNotHintedAuthor = fmt.Errorf("op author does not match the next hinted author")
// ErrPropertyExists is returned when a create or link targets a property
// path, or when a property operation would change an existing property's content.
ErrPropertyExists = fmt.Errorf("property exists")
// ErrWrongSeqNo is returned by Apply when op.SeqNo differs from FSM.NextSeqNo.
ErrWrongSeqNo = fmt.Errorf("wrong sequence number")
// crcTable is the CRC-32 (Castagnoli polynomial) table with which Apply
// folds each applied frame into FSM.NextChecksum.
crcTable = crc32.MakeTable(crc32.Castagnoli)
)
// An Fnode is an identifier which represents a file across its renames, links,
// and un-links within a file-system. When a file is created, it's assigned an
// Fnode value equal to the RecordedOp.SeqNo which created it (FSM.applyCreate
// derives it as Fnode(op.SeqNo)).
type Fnode int64
// Author is a random, unique ID which identifies a process that creates RecordedOps.
// It's used by FSM to allow for reconciliation of divergent histories in the recovery
// log, through hints as to which Authors produced which Segments in the final history.
type Author uint32
// fnodeState is the state of an individual Fnode as tracked by FSM.
type fnodeState struct {
// Links is the current set of filesystem paths (hard-links) of this Fnode.
// It is a set: keys are paths, and the empty-struct values carry no data.
Links map[string]struct{}
// Segments is the ordered set of log Segments containing the Fnode's operations.
// It's tracked only for the production of FSMHints, and is extended through
// FSM.extendSegments by each create, link, unlink or write applied to the Fnode.
Segments []Segment
}
// FSM implements a finite state machine over RecordedOp. In particular FSM
// applies RecordedOp in order, verifying the SeqNo and Checksum of each
// operation. This ensures that only operations which are linear and
// consistent are applied.
type FSM struct {
// Log is the recovery log whose operations this FSM tracks.
Log pb.Journal
// Expected sequence number and checksum of the next operation. NewFSM
// initializes these to 1 and 0 respectively, or to the FirstSeqNo and
// FirstChecksum of the first hinted Segment when hints provide Segments.
NextSeqNo int64
NextChecksum uint32
// Target paths and contents of small files which are managed outside of
// regular Fnode tracking. Property updates are triggered upon rename of
// a tracked Fnode to a well-known property file path.
//
// Property paths must be "sinks" which:
// * Are never directly written to.
// * Are never renamed or linked from.
// * Have exactly one hard-link (eg, only "rename" to the property path is
// supported; "link" is not as it would introduce a second hard-link).
Properties map[string]string
// LiveNodes maps from Fnode to the current state of the node. Entries are
// removed once the last link of an Fnode is unlinked.
LiveNodes map[Fnode]*fnodeState
// Links indexes the current target paths of LiveNodes, mapping each path
// to the Fnode it links.
Links map[string]Fnode
// Ordered, non-overlapping segments of log to process. The head Segment is
// popped by Apply once NextSeqNo passes its LastSeqNo.
hintedSegments []Segment
// Ordered Fnodes which are still live at |hintedSegments| completion. The
// head Fnode is popped when its creating operation is applied.
hintedFnodes []Fnode
}
// LiveLogSegments walks the hinted LiveNodes in order, returning their Fnodes
// together with a SegmentSet into which every Segment of those nodes has been
// added. It returns an error if a node has no Segments, if a node's Fnode does
// not equal the FirstSeqNo of its first Segment, if Fnodes are not strictly
// increasing, or if a Segment is rejected by SegmentSet.Add.
func (m FSMHints) LiveLogSegments() ([]Fnode, SegmentSet, error) {
	var (
		fnodes   []Fnode
		segments SegmentSet
	)
	for _, node := range m.LiveNodes {
		// A node's Fnode is the SeqNo of the operation which created it, and
		// must therefore begin the node's first Segment.
		if len(node.Segments) == 0 || Fnode(node.Segments[0].FirstSeqNo) != node.Fnode {
			return nil, nil, fmt.Errorf("expected Fnode to match Segment FirstSeqNo: %v", node)
		}
		// Compare against the previously accepted Fnode, if there is one.
		if last := len(fnodes) - 1; last >= 0 && fnodes[last] >= node.Fnode {
			return nil, nil, fmt.Errorf("expected monotonic Fnode ordering: %v vs %v", fnodes[last], node.Fnode)
		}
		fnodes = append(fnodes, node.Fnode)

		for j := range node.Segments {
			if err := segments.Add(node.Segments[j]); err != nil {
				return nil, nil, err
			}
		}
	}
	return fnodes, segments, nil
}
// NewFSM builds an FSM which is ready to apply operations described by |hints|.
// It returns an error if the hints name no Log, or if hints.LiveLogSegments
// rejects the hinted LiveNodes.
func NewFSM(hints FSMHints) (*FSM, error) {
	if hints.Log == "" {
		return nil, fmt.Errorf("hinted log not provided")
	}
	fnodes, segments, err := hints.LiveLogSegments()
	if err != nil {
		return nil, err
	}

	// Flatten hinted properties into a path => content index.
	properties := make(map[string]string)
	for i := range hints.Properties {
		properties[hints.Properties[i].Path] = hints.Properties[i].Content
	}

	// Absent hinted Segments, the FSM expects the very first operation of the
	// log: SeqNo 1 with a zero-valued checksum.
	fsm := &FSM{
		Log:          hints.Log,
		NextSeqNo:    1,
		NextChecksum: 0,
		Properties:   properties,
		LiveNodes:    make(map[Fnode]*fnodeState),
		Links:        make(map[string]Fnode),
		hintedFnodes: fnodes,
	}

	// Otherwise, begin at the first operation of the first hinted Segment.
	if len(segments) > 0 {
		first := segments[0]
		fsm.NextSeqNo = first.FirstSeqNo
		fsm.NextChecksum = first.FirstChecksum
		fsm.hintedSegments = []Segment(segments)
	}
	return fsm, nil
}
// Apply attempts to transition the FSM's state by |op| & |frame|. It either
// performs the transition, or returns an error describing how the operation is
// inconsistent with previously applied operations or with hints.
//
// A state transition occurs if and only if the returned error is nil, with one
// exception: ErrFnodeNotTracked may be returned to indicate that the operation
// is consistent and a transition did occur, but that FSMHints indicate the
// Fnode no longer exists at the point-in-time at which the hints were created.
// The caller may therefore want to skip corresponding local playback actions.
func (m *FSM) Apply(op *RecordedOp, frame []byte) error {
	// While hints remain, the operation must be written by the Author of the
	// next hinted Segment. An operation of another Author may be consistent,
	// but it represents a (likely dead) branch of recovery-log history
	// relative to the FSMHints being re-built.
	if len(m.hintedSegments) != 0 && op.Author != m.hintedSegments[0].Author {
		return ErrNotHintedAuthor
	}
	if m.NextSeqNo != op.SeqNo {
		return ErrWrongSeqNo
	}
	if m.NextChecksum != op.Checksum {
		return ErrChecksumMismatch
	}

	// Dispatch on the kind of operation. The apply*() helpers leave the FSM
	// unmodified when they return an error. An operation carrying none of
	// these kinds changes no file state, but still steps the FSM below.
	var err error
	switch {
	case op.Create != nil:
		err = m.applyCreate(op)
	case len(m.hintedFnodes) != 0 && int64(m.hintedFnodes[0]) == op.SeqNo:
		// Hints expect this SeqNo to create an Fnode, and it does not.
		err = ErrExpectedHintedFnode
	case op.Link != nil:
		err = m.applyLink(op)
	case op.Unlink != nil:
		err = m.applyUnlink(op)
	case op.Write != nil:
		err = m.applyWrite(op)
	case op.Property != nil:
		err = m.applyProperty(op.Property)
	}

	if err != nil && err != ErrFnodeNotTracked {
		// No state transition (or FSM mutation) occurred.
		return err
	}

	// Step the FSM to the next state.
	m.NextSeqNo++
	m.NextChecksum = crc32.Update(m.NextChecksum, crcTable, frame)

	// Nothing more to do unless the current hinted Segment is now exhausted.
	if len(m.hintedSegments) == 0 || m.hintedSegments[0].LastSeqNo >= m.NextSeqNo {
		return err
	}
	// Pop the exhausted Segment, and skip ahead to the start of the next one.
	m.hintedSegments = m.hintedSegments[1:]
	if len(m.hintedSegments) != 0 {
		next := m.hintedSegments[0]
		m.NextSeqNo, m.NextChecksum = next.FirstSeqNo, next.FirstChecksum
	}
	return err
}
// applyCreate begins tracking a new Fnode created at op.Create.Path. It
// returns ErrLinkExists or ErrPropertyExists if the path is already in use,
// and ErrFnodeNotTracked if hinted Fnodes remain and this is not the next of
// them. The FSM is not modified when an error is returned.
func (m *FSM) applyCreate(op *RecordedOp) error {
	path := op.Create.Path

	if _, linked := m.Links[path]; linked {
		return ErrLinkExists
	}
	if _, isProperty := m.Properties[path]; isProperty {
		return ErrPropertyExists
	}

	// The created Fnode is identified by the SeqNo of its creating operation.
	fnode := Fnode(op.SeqNo)

	// If hints remain, only the next hinted Fnode is to be tracked.
	if len(m.hintedFnodes) > 0 {
		if fnode != m.hintedFnodes[0] {
			return ErrFnodeNotTracked
		}
		m.hintedFnodes = m.hintedFnodes[1:] // Pop hint.
	}

	m.LiveNodes[fnode] = &fnodeState{
		Links:    map[string]struct{}{path: {}},
		Segments: m.extendSegments(nil, op),
	}
	m.Links[path] = fnode
	return nil
}
// applyLink adds op.Link.Path as a further hard-link of the tracked Fnode
// op.Link.Fnode. It returns ErrLinkExists or ErrPropertyExists if the path is
// already in use, and ErrFnodeNotTracked if the Fnode is not live. The FSM is
// not modified when an error is returned.
func (m *FSM) applyLink(op *RecordedOp) error {
	link := op.Link

	if _, linked := m.Links[link.Path]; linked {
		return ErrLinkExists
	}
	if _, isProperty := m.Properties[link.Path]; isProperty {
		return ErrPropertyExists
	}
	node, tracked := m.LiveNodes[link.Fnode]
	if !tracked {
		return ErrFnodeNotTracked
	}

	// Record the link in both the global path index and the node's own set.
	m.Links[link.Path] = link.Fnode
	node.Links[link.Path] = struct{}{}
	node.Segments = m.extendSegments(node.Segments, op)
	return nil
}
// applyUnlink removes the hard-link op.Unlink.Path from the tracked Fnode
// op.Unlink.Fnode, and stops tracking the Fnode once no links remain. It
// returns ErrFnodeNotTracked if the Fnode is not live, and ErrNoSuchLink if
// the path is not one of its links. The FSM is not modified when an error is
// returned.
func (m *FSM) applyUnlink(op *RecordedOp) error {
	unlink := op.Unlink

	node, tracked := m.LiveNodes[unlink.Fnode]
	if !tracked {
		return ErrFnodeNotTracked
	}
	if _, linked := node.Links[unlink.Path]; !linked {
		return ErrNoSuchLink
	}

	// Drop the link from the node's own set and from the global path index.
	delete(node.Links, unlink.Path)
	delete(m.Links, unlink.Path)
	node.Segments = m.extendSegments(node.Segments, op)

	if len(node.Links) == 0 {
		// Fnode is no longer live (all links are removed).
		delete(m.LiveNodes, unlink.Fnode)
	}
	return nil
}
// applyWrite extends the Segments of the tracked Fnode op.Write.Fnode to cover
// |op|. It returns ErrFnodeNotTracked, without modifying the FSM, if the Fnode
// is not live.
func (m *FSM) applyWrite(op *RecordedOp) error {
	if node, tracked := m.LiveNodes[op.Write.Fnode]; tracked {
		node.Segments = m.extendSegments(node.Segments, op)
		return nil
	}
	return ErrFnodeNotTracked
}
// applyProperty records the content of the property at op.Path. It returns
// ErrLinkExists if the path is a link of a tracked Fnode, and ErrPropertyExists
// if the property already exists with different content. Re-applying a property
// with identical content succeeds. The FSM is not modified when an error is
// returned.
func (m *FSM) applyProperty(op *Property) error {
	if _, linked := m.Links[op.Path]; linked {
		return ErrLinkExists
	}
	if existing, found := m.Properties[op.Path]; found && existing != op.Content {
		return ErrPropertyExists
	}

	// Properties may be nil if the FSM was not built by NewFSM.
	if m.Properties == nil {
		m.Properties = map[string]string{op.Path: op.Content}
		return nil
	}
	m.Properties[op.Path] = op.Content
	return nil
}
// BuildHints constructs FSMHints which enable a future FSM to rebuild this
// FSM's state. The returned hints share no Segment storage with the FSM.
//
// Output is deterministic for a given FSM state: LiveNodes are ordered on
// ascending Fnode, and Properties on ascending Path. Both are built by ranging
// over maps, whose iteration order is otherwise unspecified.
func (m *FSM) BuildHints() FSMHints {
	var hints = FSMHints{Log: m.Log}

	// Flatten LiveNodes into deep-copied FnodeSegments. The slice is left nil
	// (rather than empty) when there are no live nodes.
	if len(m.LiveNodes) != 0 {
		hints.LiveNodes = make([]FnodeSegments, 0, len(m.LiveNodes))
	}
	for fnode, state := range m.LiveNodes {
		hints.LiveNodes = append(hints.LiveNodes, FnodeSegments{
			Fnode:    fnode,
			Segments: append([]Segment(nil), state.Segments...),
		})
	}
	// Order LiveNodes on ascending Fnode ID, which is also the order LiveNodes will appear in the log.
	sort.Slice(hints.LiveNodes, func(i, j int) bool {
		return hints.LiveNodes[i].Fnode < hints.LiveNodes[j].Fnode
	})

	// Flatten properties, again leaving the slice nil when there are none.
	if len(m.Properties) != 0 {
		hints.Properties = make([]Property, 0, len(m.Properties))
	}
	for path, content := range m.Properties {
		hints.Properties = append(hints.Properties, Property{Path: path, Content: content})
	}
	// Order Properties on ascending Path. Paths are map keys and thus unique,
	// so this ordering is total and the result is stable across calls.
	sort.Slice(hints.Properties, func(i, j int) bool {
		return hints.Properties[i].Path < hints.Properties[j].Path
	})
	return hints
}
// hasRemainingHints reports whether any hinted Segments or hinted Fnodes have
// yet to be consumed by the FSM.
func (m *FSM) hasRemainingHints() bool {
	if len(m.hintedSegments) > 0 {
		return true
	}
	return len(m.hintedFnodes) > 0
}
// extendSegments returns |s| updated to cover |op|. If the final Segment of
// |s| has the Author of |op|, that Segment is extended in place through the
// SeqNo and LastOffset of |op|. Otherwise a new single-operation Segment
// describing |op| is appended.
func (m *FSM) extendSegments(s []Segment, op *RecordedOp) []Segment {
	if n := len(s); n > 0 {
		if tail := &s[n-1]; tail.Author == op.Author {
			tail.LastSeqNo, tail.LastOffset = op.SeqNo, op.LastOffset
			return s
		}
	}
	// Either there is no prior Segment, or it was written by another Author.
	started := Segment{
		Author:        op.Author,
		FirstSeqNo:    op.SeqNo,
		FirstOffset:   op.FirstOffset,
		FirstChecksum: op.Checksum,
		LastSeqNo:     op.SeqNo,
		LastOffset:    op.LastOffset,
	}
	return append(s, started)
}