/
pages.go
253 lines (215 loc) · 5.55 KB
/
pages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
package store
import (
"github.com/Symantec/scotty/store/btreepq"
"github.com/google/btree"
"reflect"
"sort"
)
// This file contains all the code related to an individual page in scotty.
// kTsAndValueSize caches the result of tsAndValueSize() so that it is
// computed only once, when the package is initialised.
var kTsAndValueSize = tsAndValueSize()
// tsAndValueSize returns the size in bytes, as reported by the reflect
// package, of a single element of a pageType slice.
func tsAndValueSize() uint {
	var page pageType
	// Elem() steps from the slice type to the type of its elements.
	elemType := reflect.TypeOf(page).Elem()
	return uint(elemType.Size())
}
// basicPageType is the interface that all page data must implement.
// The per-method notes below describe what the implementations in this
// file (tsPageType and pageType) do.
type basicPageType interface {
	// Len returns the number of entries currently stored in the page.
	Len() int
	// IsFull reports whether the page has no room for another entry.
	IsFull() bool
	// Clear removes all entries from the page.
	Clear()

	// FindGreaterOrEqual returns the index of the first entry whose
	// timestamp is >= ts, or Len() if there is no such entry.
	FindGreaterOrEqual(ts float64) int
	// FindGreater returns the index of the first entry whose timestamp
	// is > ts, or Len() if there is no such entry.
	FindGreater(ts float64) int

	// StoreIndexToRecord copies the entry at index idx into record.
	StoreIndexToRecord(idx int, record *Record)
}
// tsValueType is the element type of every page. The same struct backs both
// pages of timestamp/value pairs and pages of bare timestamps, so a given
// page can serve as either kind of data structure without resorting to the
// "unsafe" package.
type tsValueType struct {
	// TimeStamp is the time of this entry.
	TimeStamp float64
	// Value is the value recorded at TimeStamp. It is wasted space when
	// the page stores only timestamps.
	Value interface{}
}
// tsPageType is a single page of timestamps. It uses the same element type
// as pageType, but its methods only ever touch the TimeStamp field.
type tsPageType []tsValueType

// IsFull reports whether the page has used up all of its capacity.
func (p tsPageType) IsFull() bool {
	return cap(p) == len(p)
}

// Clear truncates the page to zero length, keeping its backing storage.
func (p *tsPageType) Clear() {
	*p = (*p)[0:0]
}

// Latest returns the timestamp of the last entry in the page. If the page
// is empty, ok is false and latest is zero.
func (p tsPageType) Latest() (latest float64, ok bool) {
	if n := len(p); n > 0 {
		return p[n-1].TimeStamp, true
	}
	return 0, false
}

// Len returns the number of timestamps in the page.
func (p tsPageType) Len() int {
	return len(p)
}

// Add stores ts in the next free slot of the page. The page is extended by
// reslicing within its existing capacity, so no allocation takes place and
// Add panics if the page is already full. Only the TimeStamp field of the
// slot is written; its Value field is left as it was.
func (p *tsPageType) Add(ts float64) {
	page := *p
	n := len(page)
	page = page[:n+1]
	page[n].TimeStamp = ts
	*p = page
}

// StoreIndexToRecord copies the timestamp at index idx into record. No other
// field of record is touched.
func (p tsPageType) StoreIndexToRecord(idx int, record *Record) {
	record.TimeStamp = p[idx].TimeStamp
}

// FindGreaterOrEqual returns the index of the first entry whose timestamp is
// >= ts, or Len() if there is none. It uses a binary search, which is only
// meaningful when the timestamps are in ascending order.
func (p tsPageType) FindGreaterOrEqual(ts float64) int {
	atOrAfter := func(i int) bool { return p[i].TimeStamp >= ts }
	return sort.Search(len(p), atOrAfter)
}

// FindGreater returns the index of the first entry whose timestamp is > ts,
// or Len() if there is none. It uses a binary search, which is only
// meaningful when the timestamps are in ascending order.
func (p tsPageType) FindGreater(ts float64) int {
	after := func(i int) bool { return p[i].TimeStamp > ts }
	return sort.Search(len(p), after)
}
// pageType is a single page of timestamps with values.
type pageType []tsValueType

// Len returns the number of timestamp/value pairs in the page.
func (p pageType) Len() int {
	return len(p)
}

// Add stores val in the next free slot of the page. The page is extended by
// reslicing within its existing capacity, so no allocation takes place and
// Add panics if the page is already full.
func (p *pageType) Add(val tsValueType) {
	page := *p
	n := len(page)
	page = page[:n+1]
	page[n] = val
	*p = page
}

// Clear truncates the page to zero length, keeping its backing storage.
func (p *pageType) Clear() {
	*p = (*p)[0:0]
}

// Latest returns the timestamp of the last entry in the page. If the page
// is empty, ok is false and latest is zero.
func (p pageType) Latest() (latest float64, ok bool) {
	if n := len(p); n > 0 {
		return p[n-1].TimeStamp, true
	}
	return 0, false
}

// IsFull reports whether the page has used up all of its capacity.
func (p pageType) IsFull() bool {
	return cap(p) == len(p)
}

// StoreIndexToRecord copies the entry at index idx into record: the timestamp
// is assigned directly and the value is handed to record.setValue.
func (p pageType) StoreIndexToRecord(idx int, record *Record) {
	entry := p[idx]
	record.TimeStamp = entry.TimeStamp
	record.setValue(entry.Value)
}

// FindGreaterOrEqual returns the index of the first entry whose timestamp is
// >= ts, or Len() if there is none. It uses a binary search, which is only
// meaningful when the timestamps are in ascending order.
func (p pageType) FindGreaterOrEqual(ts float64) int {
	atOrAfter := func(i int) bool { return p[i].TimeStamp >= ts }
	return sort.Search(len(p), atOrAfter)
}

// FindGreater returns the index of the first entry whose timestamp is > ts,
// or Len() if there is none. It uses a binary search, which is only
// meaningful when the timestamps are in ascending order.
func (p pageType) FindGreater(ts float64) int {
	after := func(i int) bool { return p[i].TimeStamp > ts }
	return sort.Search(len(p), after)
}
// pageMetaDataType is the metadata kept for a page: a sequence number, a
// timestamp and the page's owner.
type pageMetaDataType struct {
	seqNo uint64
	ts    float64
	owner pageOwnerType
}

// SetSeqNo sets the sequence number to i.
func (md *pageMetaDataType) SetSeqNo(i uint64) {
	md.seqNo = i
}

// SeqNo returns the sequence number.
func (md *pageMetaDataType) SeqNo() uint64 {
	return md.seqNo
}

// SetTS sets the timestamp to ts.
func (md *pageMetaDataType) SetTS(ts float64) {
	md.ts = ts
}

// TS returns the timestamp.
func (md *pageMetaDataType) TS() float64 {
	return md.ts
}
// pageWithMetaDataType represents an actual page in scotty. A page can hold
// either timestamp/value pairs or just timestamps.
// It implements github.com/google/btree.Item.
type pageWithMetaDataType struct {
	// The page queue lock protects the embedded metadata.
	pageMetaDataType
	// The lock of the current page owner protects the page contents.
	values []tsValueType
}

// newPageWithMetaDataType returns an empty page whose capacity is the number
// of whole entries that fit in bytesPerPage bytes.
func newPageWithMetaDataType(bytesPerPage uint) *pageWithMetaDataType {
	capacity := bytesPerPage / kTsAndValueSize
	page := new(pageWithMetaDataType)
	page.values = make(pageType, 0, capacity)
	return page
}

// Values returns a view of this page as timestamp/value pairs. The returned
// pointer aliases the page's own storage, so changes made through it are
// changes to the page.
func (p *pageWithMetaDataType) Values() *pageType {
	return (*pageType)(&p.values)
}

// Times returns a view of this page as bare timestamps. The returned pointer
// aliases the page's own storage, so changes made through it are changes to
// the page.
func (p *pageWithMetaDataType) Times() *tsPageType {
	return (*tsPageType)(&p.values)
}

// ValuePage returns the timestamp/value view of this page as a basicPageType.
func (p *pageWithMetaDataType) ValuePage() basicPageType {
	return (*pageType)(&p.values)
}

// TimePage returns the timestamp-only view of this page as a basicPageType.
func (p *pageWithMetaDataType) TimePage() basicPageType {
	return (*tsPageType)(&p.values)
}

// Less implements github.com/google/btree.Item by delegating to
// btreepq.IsPageLessThanThat. It panics if than is not a btreepq.Page.
func (p *pageWithMetaDataType) Less(than btree.Item) bool {
	return btreepq.IsPageLessThanThat(p, than.(btreepq.Page))
}
// Fetch walks the data in page p that falls between time start and time end
// and appends it to result, newest first.
//
// The walk begins with the entry just before end and finishes with the last
// entry on or before start. If the page has no entry on or before start, the
// walk finishes with the page's first entry. Each entry is copied into record,
// which is then passed to result.Append.
// start and end are seconds after Jan 1 1970.
//
// Fetch returns true if the caller should carry on with earlier pages because
// this page does not reach far enough back in time. It returns false
// otherwise, including whenever result.Append returns false.
func Fetch(
	p basicPageType,
	start, end float64,
	record *Record,
	result Appender) (keepGoing bool) {
	upper := p.FindGreaterOrEqual(end)
	if upper == 0 {
		// No entry in this page precedes end; look at earlier pages.
		return true
	}
	lower := p.FindGreater(start) - 1
	if lower < 0 {
		// No entry on or before start here, so earlier pages are needed
		// as well.
		lower = 0
		keepGoing = true
	}
	// i runs one past the index being emitted: indices upper-1 down to lower.
	for i := upper; i > lower; i-- {
		p.StoreIndexToRecord(i-1, record)
		if !result.Append(record) {
			return false
		}
	}
	return keepGoing
}
// FetchForward works like Fetch but appends data to result oldest first.
//
// The walk begins with the last entry on or before start, or with the page's
// first entry if there is none, and finishes with the entry just before end.
// Each entry is copied into record, which is then passed to result.Append.
//
// FetchForward returns true if the caller should continue with subsequent
// pages, which is the case when no entry in this page is at or after end.
// It returns false otherwise, including whenever result.Append returns false.
//
// With Fetch the caller can simply start at the last page. With FetchForward
// it is the caller's responsibility to back up to the right page before the
// first call.
func FetchForward(
	p basicPageType,
	start, end float64,
	record *Record,
	result Appender) (keepGoing bool) {
	from := p.FindGreater(start) - 1
	if from < 0 {
		from = 0
	}
	to := p.FindGreaterOrEqual(end)
	// Reaching the end of the page without meeting end means later pages
	// may still hold data in range.
	keepGoing = to == p.Len()
	for i := from; i < to; i++ {
		p.StoreIndexToRecord(i, record)
		if !result.Append(record) {
			return false
		}
	}
	return keepGoing
}