-
Notifications
You must be signed in to change notification settings - Fork 8
/
pagination.go
291 lines (233 loc) · 8.88 KB
/
pagination.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
package api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"reflect"
"github.com/go-logr/logr"
"go.anx.io/go-anxcloud/pkg/api/types"
)
// maxPageFetchRetry is the maximum number of retries to fetch a single page.
// Every failed Next() call increments the iterator's retry counter and every successful one resets it to 0.
// Once the counter has reached this value, ResetError() no longer clears the stored error.
const maxPageFetchRetry = 10
// pageFetcher returns the raw, still encoded response body for the page with the given number.
// Next() calls it with the number of the page it wants to process and decodes the returned body itself.
type pageFetcher func(page uint) (json.RawMessage, error)
// pageIter holds the state for iterating over the pages of a list response. It is created by newPageIter
// and handed out as types.PageInfo.
type pageIter struct {
// currentPage is advanced by one on every successful Next(); Next() always asks for currentPage + 1.
currentPage uint
// totalPages is taken from the initial response; it stays 0 if the response did not carry it.
totalPages uint
// totalItems is taken from the initial response; it stays 0 if the response did not carry it.
totalItems uint
// itemsPerPage is the page size limit; Next() passes it on as EntriesPerPage when decoding further pages.
itemsPerPage uint
// err is the error stored by a failed Next(). While it is set, Next() returns false without doing anything.
err error
// errRetryCounter counts failed Next() calls since the last successful one; compared to maxPageFetchRetry
// in ResetError().
errRetryCounter uint
// pageFetcher delivers the raw response body of a page to Next().
pageFetcher pageFetcher
// singlePageMode makes Next() return false as soon as currentPage is 1.
singlePageMode bool
// fullObjects makes Next() call api.Get for every entry of a page when decoding into Objects.
fullObjects bool
// ctx is passed to decodeResponse and api.Get and is the source of the logger used in Next().
ctx context.Context
// api is used to retrieve full objects when fullObjects is set.
api API
}
// CurrentPage reports the number of the page that was processed by the most recent call to Next.
func (p *pageIter) CurrentPage() uint {
	return p.currentPage
}
// TotalPages reports how many pages exist in total.
// Note: some APIs do not provide this information, in which case 0 is returned.
func (p *pageIter) TotalPages() uint {
	return p.totalPages
}
// TotalItems reports how many items exist in total.
// Note: some APIs do not provide this information, in which case 0 is returned.
func (p *pageIter) TotalItems() uint {
	return p.totalItems
}
// ItemsPerPage reports the maximum number of entries a single page holds. This corresponds to the
// Limit parameter given to the Paged attribute.
func (p *pageIter) ItemsPerPage() uint {
	return p.itemsPerPage
}
// Next retrieves the next page of objects to process and stores it in the slice objects points to.
// On the first call, it gives the exact same objects as api.List() returned to allow iterating over
// all pages easily.
//
// objects must be a non-nil pointer to a slice []T where *T implements types.Object or T is
// json.RawMessage. For anything else (including nil and pointers to arrays) an error wrapping
// ErrTypeNotSupported is stored and false is returned.
//
// It returns true when it has received another page containing at least one object and false on
// completion or error. Errors can be retrieved by calling PageInfo.Error(); while an error is stored,
// Next does nothing and returns false.
func (p *pageIter) Next(objects interface{}) bool {
	if p.err != nil {
		return false
	}

	if p.currentPage == 1 && p.singlePageMode {
		return false
	}

	// fail stores err for Error(), counts the failure against the retry limit checked by ResetError()
	// and provides the value Next() has to return.
	fail := func(err error) bool {
		p.errRetryCounter++
		p.err = err
		return false
	}

	val := reflect.ValueOf(objects)

	// Only a non-nil pointer to a slice can receive a page. Arrays are rejected because the page is
	// built with reflect.MakeSlice, which panics for non-slice types. The short-circuit keeps
	// val.Elem() from being called on the zero Value we get for a nil objects argument.
	isPointerToSlice := val.Kind() == reflect.Ptr && val.Elem().Kind() == reflect.Slice

	isObjects := false
	isRawMessages := false

	if isPointerToSlice {
		objectType := reflect.TypeOf((*types.Object)(nil)).Elem()
		elementType := val.Elem().Type().Elem()

		isObjects = reflect.PtrTo(elementType).Implements(objectType)
		isRawMessages = elementType == reflect.TypeOf((*json.RawMessage)(nil)).Elem()
	}

	// the check for isObjects || isRawMessages isn't actually required, but is kept to prevent users decoding their
	// page of objects into something completely different by accident. I currently don't see a valid reason to do
	// that, but if one comes up, this can probably be removed. -- Mara @LittleFox94 Grosch, 2021-10-16
	// json.RawMessage is allowed for retrieving objects via channel, where the page is decoded into an array of
	// json.RawMessage and every entry of that is decoded into the target object as soon as it is needed.
	if !isPointerToSlice || (!isObjects && !isRawMessages) {
		// reflect.TypeOf is used for the message since val.Type() panics when objects is nil.
		// This is a usage error and deliberately not counted as a retry.
		p.err = fmt.Errorf("%w: the argument given to PageInfo.Next() must be a pointer to []T where *T implements types.Object or T is json.RawMessage; expected *[]T, you gave %v", ErrTypeNotSupported, reflect.TypeOf(objects))
		return false
	}

	pageData, err := p.pageFetcher(p.currentPage + 1)
	if err != nil {
		return fail(err)
	}

	_, _, _, _, data, err := decodePaginationResponseBody(pageData, types.ListOptions{Page: p.currentPage + 1, EntriesPerPage: p.itemsPerPage})
	if err != nil {
		return fail(err)
	}

	// allocate new slice of the target type for the already known number of objects
	newVal := reflect.MakeSlice(val.Type().Elem(), len(data), len(data))

	// decode every page response object into its index in the target slice
	for i, e := range data {
		decodeInto := newVal.Index(i).Addr().Interface()

		if err := decodeResponse(p.ctx, "application/json", bytes.NewBuffer(e), decodeInto); err != nil {
			return fail(err)
		}
	}

	// If decoding into Object's (and not json.RawMessage), optionally retrieve the full object.
	// We could do this in the loop above, but this way we already know the entries on the page are all valid.
	if isObjects && p.fullObjects {
		for i := 0; i < newVal.Len(); i++ {
			retrieveInto := newVal.Index(i).Addr().Interface()

			if err := p.api.Get(p.ctx, retrieveInto.(types.IdentifiedObject)); err != nil {
				return fail(err)
			}
		}
	}

	// the caller's slice is only replaced once the whole page was processed successfully
	val.Elem().Set(newVal)

	log := logr.FromContextOrDiscard(p.ctx)

	retrievedElements := uint(val.Elem().Len())
	if retrievedElements > p.itemsPerPage && p.itemsPerPage > 0 {
		log.Info("Retrieved more elements in one Next() than wanted", "wanted", p.itemsPerPage, "retrieved", retrievedElements)
	} else {
		log.V(1).Info("Retrieved elements from engine", "limit", p.itemsPerPage, "retrieved", retrievedElements)
	}

	p.errRetryCounter = 0
	p.currentPage++

	// an empty page marks the end of the iteration
	return retrievedElements > 0
}
// Error returns the error stored by the last failed Next() call, or nil if there is none.
// Once Next() has returned false, call Error() to tell the two possible reasons apart: a nil result
// means all pages were iterated successfully, a non-nil result means retrieving a page failed.
func (p *pageIter) Error() error {
	return p.err
}
// ResetError drops the stored error so the iterator can be resumed. This is refused when the retry
// counter for the current page has reached the package-defined maximum: the error then stays in place
// and Error() keeps returning it even after ResetError() was called, so callers have to check for that.
func (p *pageIter) ResetError() {
	if p.errRetryCounter >= maxPageFetchRetry {
		// retry budget for this page is used up, the error is permanent
		return
	}

	p.err = nil
}
// newPageIter builds the page iterator for a list request from the already received response.
//
// responseBody is the body of the initial request; its pagination metadata is decoded with opts and
// used to initialize the iterator. fetcher is used for every page after the initial one,
// singlePageMode and opts.FullObjects are passed on to the iterator unchanged.
//
// It returns the error of decodePaginationResponseBody if the initial response cannot be decoded.
func newPageIter(ctx context.Context, api API, responseBody json.RawMessage, opts types.ListOptions, fetcher pageFetcher, singlePageMode bool) (types.PageInfo, error) {
	// give log messages of the iterator their own name, if the context carries a logger at all
	if parentLogger, loggerErr := logr.FromContext(ctx); loggerErr == nil {
		ctx = logr.NewContext(ctx, parentLogger.WithName("pagination"))
	}

	firstPage, perPage, pages, items, _, err := decodePaginationResponseBody(responseBody, opts)
	if err != nil {
		return nil, err
	}

	// Next() always asks for currentPage + 1, so a response for page 1 has to start the iterator at 0
	// to have the first Next() hand out page 1.
	// NOTE(review): only page 1 is rewound. When starting at a page N > 1, the first Next() hands out
	// the initial response while asking for N+1, so the following call asks for N+2 - confirm whether
	// starting at later pages is meant to be supported.
	if firstPage == 1 {
		firstPage = 0
	}

	iter := &pageIter{
		ctx:            ctx,
		api:            api,
		singlePageMode: singlePageMode,
		fullObjects:    opts.FullObjects,
		currentPage:    firstPage,
		itemsPerPage:   perPage,
		totalPages:     pages,
		totalItems:     items,
	}

	// The first fetch hands out the data we already got with the initial request and swaps itself
	// for the fetcher given as argument, which is then used for all following pages.
	iter.pageFetcher = func(uint) (json.RawMessage, error) {
		iter.pageFetcher = fetcher
		return responseBody, nil
	}

	return iter, nil
}
// decodePaginationResponseBody extracts the pagination metadata and the still encoded entries from a
// list response body.
//
// Three response formats are tried in this order: metadata and entries wrapped in an outer "data"
// object, metadata and entries on the top level and a plain array of entries. For the plain array the
// page and limit are taken from opts, total pages and total items stay 0.
//
// ErrPageResponseNotSupported is returned when data matches none of the formats.
func decodePaginationResponseBody(data json.RawMessage, opts types.ListOptions) (page, limit, totalPages, totalItems uint, ret []json.RawMessage, err error) {
	// TODO(LittleFox94): this is not the same for every API and we need a way to override this or
	// find the X ways it's done and have options for that. Currently we support those two types and
	// "plain data array".
	type dataResponse struct {
		CurrentPage    uint              `json:"page"`
		TotalPages     uint              `json:"total_pages"`
		TotalItems     uint              `json:"total_items"`
		EntriesPerPage uint              `json:"limit"`
		Data           []json.RawMessage `json:"data"`
	}

	type dataDataResponse struct {
		State    string       `json:"state"`
		Messages []string     `json:"messages"`
		Data     dataResponse `json:"data"`
	}

	// matches reports whether data decodes into target. Unknown fields are rejected, since a
	// completely different response must not be decodable into one of the supported formats by
	// accident.
	matches := func(target interface{}) bool {
		strict := json.NewDecoder(bytes.NewReader(data))
		strict.DisallowUnknownFields()

		return strict.Decode(target) == nil
	}

	var (
		wrapped dataDataResponse
		paged   dataResponse
		plain   = []json.RawMessage{}
	)

	// The order of the cases is important: the wrapped format has to be tried first, as it only
	// unwraps its payload and falls through to the handling of the top-level format.
	//
	// TODO(@LittleFox94): are there actually paginated APIs returning only an Array without any page metadata?
	// I was sure there was one, but cannot find one right now and maybe "plain array returned" is already the
	// info "don't even try to get the next page".
	switch {
	case matches(&wrapped):
		paged = wrapped.Data
		fallthrough
	case matches(&paged):
		return paged.CurrentPage, paged.EntriesPerPage, paged.TotalPages, paged.TotalItems, paged.Data, nil
	case matches(&plain):
		return opts.Page, opts.EntriesPerPage, 0, 0, plain, nil
	default:
		return 0, 0, 0, 0, nil, ErrPageResponseNotSupported
	}
}