forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cursor.go
154 lines (128 loc) · 3.67 KB
/
cursor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package tsdb
import (
"bytes"
"container/heap"
)
// Direction represents a cursor navigation direction.
type Direction bool

const (
	// Forward indicates that a cursor will move forward over its values.
	Forward Direction = true

	// Reverse indicates that a cursor will move backwards over its values.
	Reverse Direction = false
)

// String returns "forward" for Forward and "reverse" for Reverse.
func (d Direction) String() string {
	switch d {
	case Reverse:
		return "reverse"
	default:
		return "forward"
	}
}

// Forward reports whether the direction is Forward.
func (d Direction) Forward() bool {
	// Forward is defined as true, so the underlying bool is the answer.
	return bool(d)
}

// Reverse reports whether the direction is Reverse.
func (d Direction) Reverse() bool {
	// Reverse is defined as false, so it is the negation of the underlying bool.
	return !bool(d)
}
// MultiCursor returns a single cursor that merges the results of all the
// given cursors in order.
//
// If the same key is returned from multiple cursors then the first cursor
// specified will take precedence. A key will only be returned once from the
// returned cursor.
func MultiCursor(d Direction, cursors ...Cursor) Cursor {
	mc := new(multiCursor)
	mc.cursors = cursors
	mc.direction = d
	return mc
}
// multiCursor represents a cursor that combines multiple cursors into one.
type multiCursor struct {
// cursors are the underlying cursors being merged, in the order they were
// passed to MultiCursor. Earlier cursors win when keys collide.
cursors []Cursor
// heap holds one pending item for each cursor that still has data.
// It is rebuilt from scratch on every Seek.
heap cursorHeap
// prev is the key most recently returned by pop; it is used to drop
// duplicate keys and is reset to nil by Seek.
prev []byte
// direction is the value reported by Direction(). Heap ordering itself
// consults the Direction() of the underlying cursors, not this field.
direction Direction
}
// Seek positions every underlying cursor at the given key, rebuilds the
// heap from the cursors that produced a key, and returns the first merged
// key/value pair (nil, nil if no cursor has data at or past seek).
func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
	cursors := mc.cursors
	total := len(cursors)

	items := make(cursorHeap, 0, total)
	for idx := 0; idx < total; idx++ {
		cur := cursors[idx]

		// A nil key means this cursor has nothing to contribute.
		k, v := cur.Seek(seek)
		if k != nil {
			items = append(items, &cursorHeapItem{
				key:    k,
				value:  v,
				cursor: cur,
				// Earlier cursors get the larger priority so they win ties.
				priority: total - idx,
			})
		}
	}
	heap.Init(&items)

	// Install the fresh heap and forget the previously returned key.
	mc.heap, mc.prev = items, nil
	return mc.pop()
}
// Direction returns the direction stored on the cursor.
func (mc *multiCursor) Direction() Direction {
	return mc.direction
}
// Next returns the next key/value pair from the merged cursors, or nil, nil
// once the heap is empty.
func (mc *multiCursor) Next() (key, value []byte) {
	key, value = mc.pop()
	return key, value
}
// pop returns the next key/value pair from the heap, skipping keys that
// duplicate the one most recently returned. It returns nil, nil once the
// heap is empty.
//
// After an item is taken off the heap its cursor is advanced, and the item
// is pushed back if the cursor produced another key. When several cursors
// hold the same key, the highest priority one is read first and the
// remaining occurrences are dropped.
func (mc *multiCursor) pop() (key, value []byte) {
	for {
		// Return nil if there are no more items left.
		if len(mc.heap) == 0 {
			return nil, nil
		}

		// Read the next item from the heap and save its key/value for return.
		item := heap.Pop(&mc.heap).(*cursorHeapItem)
		key, value = item.key, item.value

		// Advance the item's cursor. Push it back on the heap if it has more.
		if item.key, item.value = item.cursor.Next(); item.key != nil {
			heap.Push(&mc.heap, item)
		}

		// Skip if this key matches the previously returned one. A nil prev
		// means nothing has been returned since the last Seek; it must be
		// checked explicitly because bytes.Equal treats nil and an empty
		// slice as equal, which would otherwise drop a legitimate empty key.
		if mc.prev != nil && bytes.Equal(mc.prev, key) {
			continue
		}

		mc.prev = key
		return key, value
	}
}
// cursorHeap represents a heap of cursorHeapItems.
//
// It provides the Len, Less, Swap, Push and Pop methods required by
// container/heap. Items are ordered by key, and ties between equal keys are
// broken in favour of the item with the larger priority.
type cursorHeap []*cursorHeapItem

// Len returns the number of items on the heap.
func (h cursorHeap) Len() int { return len(h) }

// Swap exchanges the items at positions i and j.
func (h cursorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Less reports whether item i must be popped before item j.
//
// The direction is read from item i's cursor: for a forward cursor the
// smaller key sorts first, for a reverse cursor the larger key does. Equal
// keys are ordered by descending priority.
func (h cursorHeap) Less(i, j int) bool {
	// want is the bytes.Compare result that places i ahead of j.
	want := -1
	if !h[i].cursor.Direction() {
		want = 1
	}

	switch cmp := bytes.Compare(h[i].key, h[j].key); cmp {
	case want:
		return true
	case 0:
		return h[i].priority > h[j].priority
	default:
		return false
	}
}

// Push appends x, which must be a *cursorHeapItem, to the heap's slice.
// It is intended to be called through heap.Push, not directly.
func (h *cursorHeap) Push(x interface{}) {
	*h = append(*h, x.(*cursorHeapItem))
}

// Pop removes and returns the last item of the heap's slice.
// It is intended to be called through heap.Pop, not directly.
func (h *cursorHeap) Pop() interface{} {
	old := *h
	last := len(old) - 1
	item := old[last]
	// Clear the vacated slot so the backing array does not keep the item
	// (and the cursor it references) reachable after removal.
	old[last] = nil
	*h = old[:last]
	return item
}
// cursorHeapItem is something we manage in a priority queue.
// Each item pairs an underlying cursor with the key/value it last produced.
type cursorHeapItem struct {
// key is the key most recently read from cursor; it determines the item's
// position in the heap.
key []byte
// value is the value that was read together with key.
value []byte
// cursor is the source the item's next key/value is read from.
cursor Cursor
// priority breaks ties between items with equal keys: the item with the
// larger priority sorts first.
priority int
}