-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
iterator.go
173 lines (154 loc) · 4.22 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package statedb
import (
"bytes"
"fmt"
"k8s.io/apimachinery/pkg/util/sets"
)
// Collect creates a slice of objects out of the iterator.
// The iterator is consumed in the process.
func Collect[Obj any](iter Iterator[Obj]) []Obj {
objs := []Obj{}
for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
objs = append(objs, obj)
}
return objs
}
// CollectSet creates a set of objects out of the iterator.
// The iterator is consumed in the process.
func CollectSet[Obj comparable](iter Iterator[Obj]) sets.Set[Obj] {
objs := sets.New[Obj]()
for obj, _, ok := iter.Next(); ok; obj, _, ok = iter.Next() {
objs.Insert(obj)
}
return objs
}
// ProcessEach invokes the given function for each object provided by the iterator.
func ProcessEach[Obj any, It Iterator[Obj]](iter It, fn func(Obj, Revision) error) (err error) {
for obj, rev, ok := iter.Next(); ok; obj, rev, ok = iter.Next() {
err = fn(obj, rev)
if err != nil {
return
}
}
return
}
// iterator adapts the "any" object iterator to a typed object.
type iterator[Obj any] struct {
iter interface{ Next() ([]byte, object, bool) }
}
func (it *iterator[Obj]) Next() (obj Obj, revision uint64, ok bool) {
_, iobj, ok := it.iter.Next()
if ok {
obj = iobj.data.(Obj)
revision = iobj.revision
}
return
}
// uniqueIterator iterates over objects in a unique index. Since
// we find the node by prefix search, we may see a key that shares
// the search prefix but is longer. We skip those objects.
type uniqueIterator[Obj any] struct {
iter interface{ Next() ([]byte, object, bool) }
key []byte
}
func (it *uniqueIterator[Obj]) Next() (obj Obj, revision uint64, ok bool) {
var iobj object
for {
var key []byte
key, iobj, ok = it.iter.Next()
if !ok || bytes.Equal(key, it.key) {
break
}
}
if ok {
obj = iobj.data.(Obj)
revision = iobj.revision
}
return
}
// nonUniqueIterator iterates over a non-unique index. Since we seek by prefix and don't
// require that indexers terminate the keys, the iterator checks that the prefix
// has the right length.
type nonUniqueIterator[Obj any] struct {
iter interface{ Next() ([]byte, object, bool) }
key []byte
}
func (it *nonUniqueIterator[Obj]) Next() (obj Obj, revision uint64, ok bool) {
var iobj object
for {
var key []byte
key, iobj, ok = it.iter.Next()
if !ok {
return
}
_, secondary := decodeNonUniqueKey(key)
// Equal length implies equal key since we got here via
// prefix search and all child nodes share the same prefix.
if len(secondary) == len(it.key) {
break
}
// This node has a longer secondary key that shares our search
// prefix, skip it.
}
if ok {
obj = iobj.data.(Obj)
revision = iobj.revision
}
return
}
func NewDualIterator[Obj any](left, right Iterator[Obj]) *DualIterator[Obj] {
return &DualIterator[Obj]{
left: iterState[Obj]{iter: left},
right: iterState[Obj]{iter: right},
}
}
type iterState[Obj any] struct {
iter Iterator[Obj]
obj Obj
rev Revision
ok bool
}
// DualIterator allows iterating over two iterators in revision order.
// Meant to be used for combined iteration of LowerBound(ByRevision)
// and Deleted().
type DualIterator[Obj any] struct {
left iterState[Obj]
right iterState[Obj]
}
func (it *DualIterator[Obj]) Next() (obj Obj, revision uint64, fromLeft, ok bool) {
// Advance the iterators
if !it.left.ok && it.left.iter != nil {
it.left.obj, it.left.rev, it.left.ok = it.left.iter.Next()
if !it.left.ok {
it.left.iter = nil
}
}
if !it.right.ok && it.right.iter != nil {
it.right.obj, it.right.rev, it.right.ok = it.right.iter.Next()
if !it.right.ok {
it.right.iter = nil
}
}
// Find the lowest revision object
switch {
case !it.left.ok && !it.right.ok:
ok = false
return
case it.left.ok && !it.right.ok:
it.left.ok = false
return it.left.obj, it.left.rev, true, true
case it.right.ok && !it.left.ok:
it.right.ok = false
return it.right.obj, it.right.rev, false, true
case it.left.rev <= it.right.rev:
it.left.ok = false
return it.left.obj, it.left.rev, true, true
case it.right.rev <= it.left.rev:
it.right.ok = false
return it.right.obj, it.right.rev, false, true
default:
panic(fmt.Sprintf("BUG: Unhandled case: %+v", it))
}
}