-
Notifications
You must be signed in to change notification settings - Fork 109
/
query.go
214 lines (195 loc) · 6.66 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package db
import (
"encoding/json"
"fmt"
"reflect"
"github.com/syndtr/goleveldb/leveldb"
"github.com/albrow/stringset"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/util"
)
// Query is used to return certain results from the database. A Query is
// created from a Filter and can be refined with the chainable Max, Offset, and
// Reverse methods before being executed with Run or Count.
type Query struct {
// colInfo describes the collection being queried. It is used to validate the
// destination passed to Run and to determine the model type to decode into.
colInfo *colInfo
// reader provides the iterators and point lookups used to execute the query.
reader dbReader
// filter selects the index and the range of index keys to iterate over.
filter *Filter
// max is the maximum number of results to return. Zero means no limit.
max int
// offset is the number of leading index keys to skip while iterating.
offset int
// reverse makes Run iterate over the index keys in descending order.
reverse bool
}
// Filter determines which models to return in the query and what order to
// return them in. Filters are created through the methods on Index
// (ValueFilter, RangeFilter, PrefixFilter, and All).
type Filter struct {
// index is the index whose keys are iterated; it is also used to extract the
// primary key of a model from each index key.
index *Index
// slice is the range of index keys handed to the iterator.
slice *util.Range
}
// newQuery builds a Query that reads from reader, restricted by filter, for
// the collection described by colInfo. The returned Query has no max, no
// offset, and iterates in ascending order until configured otherwise.
func newQuery(colInfo *colInfo, reader dbReader, filter *Filter) *Query {
	q := new(Query)
	q.colInfo = colInfo
	q.reader = reader
	q.filter = filter
	return q
}
// Max causes the query to only return up to max results. A max of 0 (the value
// a new Query starts with) means that no limit is applied. It is the analog of
// the LIMIT keyword in SQL:
// https://www.postgresql.org/docs/current/queries-limit.html
//
// Max modifies q in place and returns it so that calls can be chained.
func (q *Query) Max(max int) *Query {
q.max = max
return q
}
// Reverse causes the query to return models in descending byte order according
// to their index values instead of the default (ascending byte order). Only
// Run consults this setting; Count always iterates in ascending order.
//
// Reverse modifies q in place and returns it so that calls can be chained.
func (q *Query) Reverse() *Query {
q.reverse = true
return q
}
// Offset causes the query to skip offset models when iterating through models
// that match the query. The offset is applied to index keys as they are
// iterated, before duplicate primary keys are filtered out. Note that queries
// which use an offset have a runtime of O(max(K, offset) + N), where N is the
// number of models returned by the query and K is the total number of keys in
// the corresponding index. Queries with a high offset can take a long time to
// run, regardless of the number of models returned. This is due to limitations
// of the underlying database. Offset is the analog of the OFFSET keyword in
// SQL: https://www.postgresql.org/docs/current/queries-limit.html
//
// Offset modifies q in place and returns it so that calls can be chained.
func (q *Query) Offset(offset int) *Query {
q.offset = offset
return q
}
// ValueFilter returns a Filter that matches every model whose value for this
// index is exactly val. The trailing ':' in the key prefix terminates the
// value segment, so longer values that merely start with val are excluded.
func (index *Index) ValueFilter(val []byte) *Filter {
	keyPrefix := fmt.Sprintf("%s:%s:", index.prefix(), escape(val))
	matching := util.BytesPrefix([]byte(keyPrefix))
	return &Filter{index: index, slice: matching}
}
// RangeFilter returns a Filter that matches every model whose value for this
// index falls in the half-open interval [start, limit): values >= start and
// < limit.
func (index *Index) RangeFilter(start []byte, limit []byte) *Filter {
	// withPrefix turns a raw index value into the corresponding bound in the
	// index's key space.
	withPrefix := func(bound []byte) []byte {
		return []byte(fmt.Sprintf("%s:%s", index.prefix(), escape(bound)))
	}
	lower := withPrefix(start)
	upper := withPrefix(limit)
	return &Filter{
		index: index,
		slice: &util.Range{Start: lower, Limit: upper},
	}
}
// PrefixFilter returns a Filter that matches every model whose value for this
// index begins with prefix.
func (index *Index) PrefixFilter(prefix []byte) *Filter {
	key := fmt.Sprintf("%s:%s", index.prefix(), escape(prefix))
	matching := util.BytesPrefix([]byte(key))
	return &Filter{index: index, slice: matching}
}
// All returns a Filter that matches every model in the index. Use it to
// retrieve models in index order without excluding any of them.
func (index *Index) All() *Filter {
	// An empty prefix is a prefix of every index value.
	everything := []byte{}
	return index.PrefixFilter(everything)
}
// Run executes the query and scans the matching models into models, which
// should be a pointer to an empty slice of a concrete model type (e.g.
// *[]myModelType). An error is returned if models has the wrong type or if
// reading from the database fails. A query that matches no models is not an
// error.
func (q *Query) Run(models interface{}) error {
	err := q.colInfo.checkModelsType(models)
	if err != nil {
		return err
	}

	iter := q.reader.NewIterator(q.filter.slice, nil)
	defer iter.Release()

	if !q.reverse {
		return q.getModelsWithIteratorForward(iter, models)
	}
	return q.getModelsWithIteratorReverse(iter, models)
}
// Count returns the number of unique models that match the query. A query that
// matches no models yields a count of 0 and no error. Count *does* respect
// q.Max: once q.Max unique models have been seen it stops counting and returns
// q.Max.
func (q *Query) Count() (int, error) {
	iter := q.reader.NewIterator(q.filter.slice, nil)
	defer iter.Release()

	// The same model can appear under several index keys, so count distinct
	// primary keys rather than index keys.
	seen := stringset.New()
	skipped := 0
	for iter.Next() && iter.Error() == nil {
		if skipped < q.offset {
			skipped++
			continue
		}
		pk := q.filter.index.primaryKeyFromIndexKey(iter.Key())
		seen.Add(string(pk))
		if q.max != 0 && len(seen) >= q.max {
			break
		}
	}
	if err := iter.Error(); err != nil {
		return 0, err
	}
	return len(seen), nil
}
// getModelsWithIteratorForward advances iter with Next, skips the first
// q.offset index keys, and appends the model behind each remaining key to the
// slice that models points to. When q.max is non-zero it stops as soon as the
// slice holds q.max models. It returns any error reported by the iterator or
// encountered while loading a model.
func (q *Query) getModelsWithIteratorForward(iter iterator.Iterator, models interface{}) error {
	// A MultiIndex can list the same model under more than one key. Remember
	// the primary keys already handled so each model is included only once.
	seen := stringset.New()
	results := reflect.ValueOf(models).Elem()
	skipped := 0
	for iter.Next() && iter.Error() == nil {
		if skipped < q.offset {
			skipped++
			continue
		}
		err := q.getAndAppendModelIfUnique(q.filter.index, seen, iter.Key(), results)
		if err != nil {
			return err
		}
		if q.max != 0 && results.Len() >= q.max {
			break
		}
	}
	return iter.Error()
}
// getModelsWithIteratorReverse is the descending counterpart of
// getModelsWithIteratorForward: it starts from the end of iter and steps
// backwards with Prev, skipping the first q.offset index keys it encounters
// and appending the model behind each remaining key to the slice that models
// points to. When q.max is non-zero it stops as soon as the slice holds q.max
// models. It returns any error reported by the iterator or encountered while
// loading a model.
func (q *Query) getModelsWithIteratorReverse(iter iterator.Iterator, models interface{}) error {
	// Track handled primary keys so a model reachable through several index
	// keys is only included once.
	seen := stringset.New()
	results := reflect.ValueOf(models).Elem()

	// Jump to the last key and then step once past it, so that the first call
	// to Prev in the loop condition lands back on the last key.
	iter.Last()
	iter.Next()

	skipped := 0
	for iter.Prev() && iter.Error() == nil {
		if skipped < q.offset {
			skipped++
			continue
		}
		err := q.getAndAppendModelIfUnique(q.filter.index, seen, iter.Key(), results)
		if err != nil {
			return err
		}
		if q.max != 0 && results.Len() >= q.max {
			break
		}
	}
	return iter.Error()
}
// getAndAppendModelIfUnique loads the model referenced by the index key key
// and appends it to modelsVal, unless a model with the same primary key has
// already been handled.
//
// index is used to extract the primary key from key. pkSet holds the primary
// keys seen so far and is updated in place. modelsVal must be a settable
// reflect.Value holding a slice of q.colInfo.modelType.
//
// It returns an error if reading from the database fails for any reason other
// than the model not being found, or if the stored data cannot be decoded.
func (q *Query) getAndAppendModelIfUnique(index *Index, pkSet stringset.Set, key []byte, modelsVal reflect.Value) error {
	// We assume that each key in the iterator consists of an index prefix, the
	// value for a particular model, and the model ID. We can extract a primary
	// key from this key and use it to get the encoded data for the model
	// itself.
	pk := index.primaryKeyFromIndexKey(key)
	pkStr := string(pk)
	if pkSet.Contains(pkStr) {
		return nil
	}
	pkSet.Add(pkStr)

	data, err := q.reader.Get(pk, nil)
	if err == leveldb.ErrNotFound {
		// It is possible that a separate goroutine deleted the model while we were
		// iterating through the keys in the index. This is not considered an error.
		// We simply don't include this model in the final results.
		return nil
	}
	if err != nil {
		// This check must come before the data == nil check below: a failed Get
		// typically returns nil data, and treating that as "model missing" would
		// silently swallow real read errors.
		return err
	}
	if data == nil {
		// No error but no data either; treat it the same as a missing model.
		return nil
	}

	model := reflect.New(q.colInfo.modelType)
	if err := json.Unmarshal(data, model.Interface()); err != nil {
		return err
	}
	modelsVal.Set(reflect.Append(modelsVal, model.Elem()))
	return nil
}