-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
57 lines (50 loc) · 1.06 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package do
// BatchRun splits s into consecutive batches of at most batchNum items and
// calls handler once per batch, in order.
//
// Every batch handed to handler is a freshly allocated copy, so handler may
// keep or modify it without affecting s or any other batch. The last batch
// holds whatever is left over and may be shorter than batchNum. handler is
// never called for an empty s.
//
// A batchNum below 1 is treated as 1, so each item is handled on its own.
//
// BatchRun stops at the first non-nil error from handler and returns it
// unchanged; the remaining items are not processed.
func BatchRun[T any](s []T, batchNum int, handler func([]T) error) (err error) {
	size := batchNum
	if size < 1 {
		// A negative capacity would panic in make; fall back to one item per call.
		size = 1
	}
	for start := 0; start < len(s); {
		// Compare against the remaining length instead of computing start+size,
		// which could overflow for a very large batchNum.
		n := len(s) - start
		if n > size {
			n = size
		}
		batch := make([]T, n)
		copy(batch, s[start:start+n])
		if err = handler(batch); err != nil {
			return err
		}
		start += n
	}
	return nil
}
// StreamRun consumes values from s until it is closed and passes them to
// handler.
//
// When batchNum is greater than 0, values are accumulated and handler is
// called each time batchNum values have been collected; whatever is left when
// s is closed is delivered as a final, shorter batch. When batchNum is 0 or
// negative, handler is called once per value with a one-element slice.
//
// Every slice handed to handler is freshly allocated, so handler may keep it.
//
// StreamRun stops at the first non-nil error from handler and returns it
// unchanged. In that case it returns immediately and any values still in s
// are left unread.
func StreamRun[T any](s chan T, batchNum int, handler func([]T) error) (err error) {
	if batchNum <= 0 {
		// Non-batching mode: nothing to accumulate, and a negative batchNum
		// must never reach make, which would panic.
		for e := range s {
			if err = handler([]T{e}); err != nil {
				return err
			}
		}
		return nil
	}

	batch := make([]T, 0, batchNum)
	for e := range s {
		batch = append(batch, e)
		if len(batch) < batchNum {
			continue
		}
		// A full batch has been collected: hand it over.
		if err = handler(batch); err != nil {
			return err
		}
		// Start a new backing array so the batch just delivered is never overwritten.
		batch = make([]T, 0, batchNum)
	}
	// Deliver the leftover values that did not fill a whole batch.
	if len(batch) > 0 {
		return handler(batch)
	}
	return nil
}