-
Notifications
You must be signed in to change notification settings - Fork 46
/
streamer.go
45 lines (42 loc) · 1.24 KB
/
streamer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package candihelper
import (
"context"
"math"
)
// StreamAllBatch helper func for stream data
func StreamAllBatch[T any, F FilterStreamer](ctx context.Context, totalData int, filter F, fetchAllFunc func(context.Context, F) ([]T, error), handleFunc func(idx int, data *T) error) error {
totalPages := int(math.Ceil(float64(totalData) / float64(filter.GetLimit())))
for filter.GetPage() <= totalPages {
list, err := fetchAllFunc(ctx, filter)
if err != nil {
return err
}
for i, data := range list {
offset := (filter.GetPage() - 1) * filter.GetLimit()
if err := handleFunc(offset+i, &data); err != nil {
return err
}
}
filter.IncrPage()
}
return nil
}
// StreamAllBatchDynamic helper func for stream data with dynamic source changes
func StreamAllBatchDynamic[T any, F FilterStreamer](ctx context.Context, filter F, fetchAllFunc func(context.Context, F) ([]T, error), handleFunc func(idx int, data *T) error) error {
for {
list, err := fetchAllFunc(ctx, filter)
if err != nil {
return err
}
if len(list) == 0 {
return nil
}
for i, data := range list {
offset := (filter.GetPage() - 1) * filter.GetLimit()
if err := handleFunc(offset+i, &data); err != nil {
return err
}
}
filter.IncrPage()
}
}