-
Notifications
You must be signed in to change notification settings - Fork 245
/
iterator.go
120 lines (102 loc) · 2.49 KB
/
iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package pagination
import (
"context"
"errors"
"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)
// NewPaginatedIterator builds a datastore.RelationshipIterator that reads
// relationships matching filter from reader in pages of at most pageSize,
// sorted by order and beginning after startCursor.
//
// The first page is requested before returning. The iterator is returned
// together with any error produced by that first query.
func NewPaginatedIterator(
	ctx context.Context,
	reader datastore.Reader,
	filter datastore.RelationshipsFilter,
	pageSize uint64,
	order options.SortOrder,
	startCursor options.Cursor,
) (datastore.RelationshipIterator, error) {
	// Seed the delegate with an empty iterator so that startNewBatch always
	// has a previous delegate to close.
	placeholder := common.NewSliceRelationshipIterator(nil, options.ByResource)

	iter := &paginatedIterator{
		ctx:      ctx,
		reader:   reader,
		filter:   filter,
		pageSize: pageSize,
		order:    order,
		delegate: placeholder,
	}

	iter.startNewBatch(startCursor)
	if iter.err != nil {
		return iter, iter.err
	}
	return iter, nil
}
// paginatedIterator is the iterator returned by NewPaginatedIterator. It
// wraps a delegate iterator holding the current page and swaps it for a new
// one, queried from reader, as pages are consumed.
type paginatedIterator struct {
	// Query parameters, reused for every page request.
	ctx      context.Context
	reader   datastore.Reader
	filter   datastore.RelationshipsFilter
	pageSize uint64
	order    options.SortOrder

	// delegate iterates over the page currently being consumed.
	delegate datastore.RelationshipIterator
	// returnedFromBatch counts the tuples handed out from the current page.
	returnedFromBatch uint64
	// err holds the first error recorded by Next or startNewBatch.
	err error
	// closed is set once Close has been called.
	closed bool
}
// Next returns the next relationship, fetching a further page from the reader
// when the current one is used up. It returns nil when the iterator is in an
// error state (see Err), when the delegate reports an error, or when there
// are no more results.
func (pi *paginatedIterator) Next() *core.RelationTuple {
	if pi.Err() != nil {
		return nil
	}

	for {
		if tpl := pi.delegate.Next(); tpl != nil {
			pi.returnedFromBatch++
			return tpl
		}

		// The current page yielded nothing: work out whether that is due to a
		// failure, the end of the data, or merely the end of this page.
		if delegateErr := pi.delegate.Err(); delegateErr != nil {
			pi.err = delegateErr
			return nil
		}

		if pi.returnedFromBatch < pi.pageSize {
			// A short page means there are no more tuples to get.
			return nil
		}

		after, cursorErr := pi.delegate.Cursor()
		if cursorErr != nil {
			// An empty cursor means the last batch had no data, which is a
			// normal end of iteration; anything else is recorded as an error.
			if !errors.Is(cursorErr, datastore.ErrCursorEmpty) {
				pi.err = cursorErr
			}
			return nil
		}

		pi.startNewBatch(after)
		if pi.err != nil {
			return nil
		}
	}
}
// startNewBatch closes the current delegate, resets the per-page counter and
// queries the reader for the next page of relationships positioned after
// cursor. The query error (or nil) is stored in pi.err.
//
// pi.delegate is never left nil: if the reader hands back a nil iterator
// (as may happen alongside an error), an empty slice iterator is installed
// instead so that later Close and Cursor calls do not dereference a nil
// interface.
func (pi *paginatedIterator) startNewBatch(cursor options.Cursor) {
	pi.delegate.Close()
	pi.returnedFromBatch = 0

	next, err := pi.reader.QueryRelationships(
		pi.ctx,
		pi.filter,
		options.WithSort(pi.order),
		options.WithLimit(&pi.pageSize),
		options.WithAfter(cursor),
	)
	pi.err = err

	if next == nil {
		// Same empty placeholder that NewPaginatedIterator seeds the
		// iterator with.
		next = common.NewSliceRelationshipIterator(nil, options.ByResource)
	}
	pi.delegate = next
}
// Cursor reports the cursor of the page currently being iterated by
// forwarding to the delegate iterator.
func (pi *paginatedIterator) Cursor() (options.Cursor, error) {
	position, err := pi.delegate.Cursor()
	return position, err
}
// Err reports the iterator's error state. In order of precedence it returns
// datastore.ErrClosedIterator once Close has been called, then any error
// recorded while iterating or fetching a page, then the context's error.
// It returns nil when none of these apply.
func (pi *paginatedIterator) Err() error {
	if pi.closed {
		return datastore.ErrClosedIterator
	}
	if pi.err != nil {
		return pi.err
	}
	if ctxErr := pi.ctx.Err(); ctxErr != nil {
		return ctxErr
	}
	return nil
}
// Close marks the iterator as closed, after which Err reports
// datastore.ErrClosedIterator, and closes the delegate for the current page.
func (pi *paginatedIterator) Close() {
	current := pi.delegate
	pi.closed = true
	current.Close()
}