forked from gazette/core
-
Notifications
You must be signed in to change notification settings - Fork 2
/
segment.go
182 lines (162 loc) · 6.13 KB
/
segment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package recoverylog
import (
"errors"
"fmt"
"sort"
)
// Validate checks the Segment's fields for internal consistency. It returns
// an error naming the first violated constraint, or nil if all hold:
//  * Author must be non-zero.
//  * FirstSeqNo must be positive, and FirstOffset non-negative.
//  * LastSeqNo must not precede FirstSeqNo.
//  * LastOffset, if set (non-zero), must be greater than FirstOffset.
func (s Segment) Validate() error {
	switch {
	case s.Author == 0:
		return errors.New("segment.Author is zero")
	case s.FirstSeqNo <= 0:
		return errors.New("segment.FirstSeqNo <= 0")
	case s.FirstOffset < 0:
		return errors.New("segment.FirstOffset < 0")
	case s.LastSeqNo < s.FirstSeqNo:
		return errors.New("segment.LastSeqNo < segment.FirstSeqNo")
	case s.LastOffset != 0 && s.LastOffset <= s.FirstOffset:
		// A zero LastOffset means "not set", and is exempt from this check.
		return errors.New("segment.LastOffset <= segment.FirstOffset")
	}
	return nil
}
// SegmentSet is a collection of Segment with the following invariants:
//  * Entries have strictly increasing FirstSeqNo, and are non-overlapping
//    (s[i].LastSeqNo < s[i+1].FirstSeqNo; note this implies a single author
//    for any covered SeqNo).
//  * Entries have monotonically increasing FirstOffset.
//  * Entries have monotonically increasing LastOffset, however a strict
//    suffix of entries are permitted to have a LastOffset of zero (implied
//    infinite LastOffset).
type SegmentSet []Segment
// Add merges |segment| into the SegmentSet. The Segment is first validated,
// then reduced with every existing entry it can be combined with, and the
// result replaces those entries in place. An error is returned, and the set
// is left unmodified, if |segment| is invalid or is inconsistent with an
// existing entry.
func (s *SegmentSet) Add(segment Segment) error {
	if err := segment.Validate(); err != nil {
		return err
	}
	// The set is not mutated until the final splice, so work on a local copy
	// of the slice header.
	var set = *s

	// Locate the first entry which does not begin before |segment|. Together
	// with |end| below, this bounds the range [begin, end) to be replaced.
	var begin = sort.Search(len(set), func(i int) bool {
		return set[i].FirstSeqNo >= segment.FirstSeqNo
	})

	// The entry just before |begin| starts ahead of |segment| and may cover
	// or abut it. Because FirstSeqNo is strictly increasing within the set,
	// it is the only preceding entry which could be reducible.
	if begin > 0 {
		switch merged, err := reduceSegment(set[begin-1], segment); err {
		case nil:
			segment, begin = merged, begin-1
		case errNotReducible:
			// Leave |segment| and |begin| as they are.
		default:
			return err
		}
	}

	// Fold in following entries until reaching one which cannot be reduced.
	var end = begin
scan:
	for end < len(set) {
		switch merged, err := reduceSegment(set[end], segment); err {
		case nil:
			segment = merged
			end++
		case errNotReducible:
			break scan
		default:
			return err
		}
	}

	// Replace entries [begin, end) with the single reduced |segment|.
	if begin == len(set) {
		*s = append(set, segment)
		return nil
	}
	// Keep one slot at |begin| for |segment|, and shift the tail to follow it.
	// When begin == end this grows the set by one; otherwise it shrinks it by
	// (end - begin - 1) entries.
	set = append(set[:begin+1], set[end:]...)
	set[begin] = segment
	*s = set
	return nil
}
// Intersect returns the sub-slice of this SegmentSet whose entries may overlap
// the offset range [firstOffset, lastOffset). The result aliases the receiver's
// backing array, with its capacity clipped to its length. Intersect panics if
// lastOffset is less than firstOffset.
func (s SegmentSet) Intersect(firstOffset, lastOffset int64) SegmentSet {
	if firstOffset > lastOffset {
		panic("invalid range")
	}

	// The exclusive upper bound is the first entry beginning at or beyond
	// |lastOffset| (or len(s), if there is no such entry).
	var end = sort.Search(len(s), func(k int) bool {
		return lastOffset <= s[k].FirstOffset
	})

	// Step backwards over entries which may still reach into the range: those
	// with an unset (zero) LastOffset, or a LastOffset beyond |firstOffset|.
	// The walk stops at the first entry known to end at or before |firstOffset|.
	// LastOffset is monotonic and only a suffix of entries may leave it unset,
	// so no earlier entry can overlap either.
	var begin = end
	for begin > 0 {
		var prior = s[begin-1].LastOffset
		if prior != 0 && prior <= firstOffset {
			break
		}
		begin--
	}
	return s[begin:end:end]
}
// reduceSegment attempts to combine Segments |a| and |b| into one. It returns:
//  * The combined Segment and a nil error, if they overlap or are adjacent
//    and share an Author.
//  * errNotReducible, if they are disjoint, or adjacent with different Authors.
//  * Another error, if the pair violates an offset-ordering, checksum, or
//    authorship constraint.
// The Segment returned alongside a non-nil error should not be relied upon.
func reduceSegment(a, b Segment) (Segment, error) {
	// Order the arguments such that:
	//  * a.FirstSeqNo <= b.FirstSeqNo.
	//  * If FirstSeqNo's are equal, then a.LastSeqNo >= b.LastSeqNo.
	//
	// This leaves the remaining possible cases:
	//
	//   Overlap:            Covered:          Disjoint:
	//   |---- a ----|       |---- a ----|     |-- a --|
	//        |---- b ----|    |-- b --|                  |-- b --|
	//
	//   Equal-right:        Equal-left:
	//   |---- a ----|       |---- a ----|
	//      |-- b ---|       |-- b ---|
	//
	//   Identity:           Adjacent:
	//   |---- a ----|       |-- a --|
	//   |---- b ----|                |-- b --|
	var swap = a.FirstSeqNo > b.FirstSeqNo
	if a.FirstSeqNo == b.FirstSeqNo && a.LastSeqNo < b.LastSeqNo {
		swap = true
	}
	if swap {
		a, b = b, a
	}

	var startsEarlier = a.FirstSeqNo < b.FirstSeqNo

	// Offset ordering constraints. These are checked before reducibility, so
	// that inconsistent offsets are reported even for disjoint Segments.
	if startsEarlier && a.FirstOffset > b.FirstOffset {
		return a, fmt.Errorf("expected monotonic FirstOffset: %#v vs %#v", a, b)
	}
	if startsEarlier && a.LastOffset == 0 && b.LastOffset != 0 {
		return a, fmt.Errorf("expected preceding Segment to also include LastOffset: %#v vs %#v", a, b)
	}
	// Whichever Segment ends later (or equally) must not have a set LastOffset
	// which is less than that of the other.
	if (a.LastSeqNo <= b.LastSeqNo && nonZeroAndLess(b.LastOffset, a.LastOffset)) ||
		(a.LastSeqNo >= b.LastSeqNo && nonZeroAndLess(a.LastOffset, b.LastOffset)) {
		return a, fmt.Errorf("expected monotonic LastOffset: %#v vs %#v", a, b)
	}

	// Checksum constraint: Segments beginning at the same SeqNo must agree.
	if a.FirstSeqNo == b.FirstSeqNo && a.FirstChecksum != b.FirstChecksum {
		return a, fmt.Errorf("expected FirstChecksum equality: %#v vs %#v", a, b)
	}

	// Cases where no reduction is possible.
	if a.LastSeqNo+1 < b.FirstSeqNo {
		return a, errNotReducible // Disjoint.
	}
	if a.LastSeqNo+1 == b.FirstSeqNo && a.Author != b.Author {
		return a, errNotReducible // Adjacent, but of different authors.
	}

	// Remaining cases have overlap, and therefore must have been written by
	// the same Author.
	if a.Author != b.Author {
		return a, fmt.Errorf("expected Segment Author equality: %#v vs %#v", a, b)
	}
	if a.FirstSeqNo > b.FirstSeqNo {
		// Precluded by the ordering established at the top of this function.
		panic("not reached")
	}

	// FirstOffset is a lower-bound. Prefer the tighter bound.
	if a.FirstSeqNo == b.FirstSeqNo && a.FirstOffset < b.FirstOffset {
		a.FirstOffset = b.FirstOffset
	}
	switch {
	case a.LastSeqNo == b.LastSeqNo:
		// LastOffset is optional. Prefer that it be set. Monotonicity has
		// already been verified above.
		if a.LastOffset == 0 {
			a.LastOffset = b.LastOffset
		}
	case a.LastSeqNo < b.LastSeqNo:
		// Overlap & Adjacent cases: extend |a| to the end of |b|.
		a.LastSeqNo, a.LastOffset = b.LastSeqNo, b.LastOffset
	}
	return a, nil
}
// nonZeroAndLess reports whether |a| is both non-zero and strictly less
// than |b|. A zero |a| always yields false, whatever the value of |b|.
func nonZeroAndLess(a, b int64) bool {
	if a == 0 {
		return false
	}
	return a < b
}
// errNotReducible is the sentinel error returned by reduceSegment when its
// arguments are disjoint, or are adjacent but have different Authors.
// SegmentSet.Add compares against it to tell "nothing to merge" apart from
// an actual inconsistency.
var errNotReducible = errors.New("not reducible")