-
Notifications
You must be signed in to change notification settings - Fork 6
/
Batch.go
129 lines (106 loc) · 2.41 KB
/
Batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package iterators
import (
"context"
"sync"
"time"
"github.com/adamluzsi/frameless"
)
// Batch wraps the iterator i in a BatchIter that hands out the values of i
// grouped into slices, using c for the batch size and timeout settings.
//
// Batch only builds the wrapper; nothing is read from i until the returned
// iterator is initialised (see BatchIter.Init and BatchIter.Next).
func Batch[T any](i frameless.Iterator[T], c BatchConfig) *BatchIter[T] {
	iter := new(BatchIter[T])
	iter.Iterator = i
	iter.Config = c
	return iter
}
// BatchConfig holds the tunables of the batching process.
// The zero value is usable: every non-positive field falls back to its default.
type BatchConfig struct {
	// Size is the maximum number of elements a single batch may contain.
	// A value of zero or less selects the default batch size of 100.
	Size int
	// Timeout is the longest the batching process is willing to wait for a
	// further element before it stops filling the current batch.
	// A value of zero or less selects the default timeout of 100 milliseconds.
	Timeout time.Duration
}

// getTimeout returns the configured Timeout, or 100ms when Timeout is not positive.
func (c BatchConfig) getTimeout() time.Duration {
	const fallback = 100 * time.Millisecond
	if 0 < c.Timeout {
		return c.Timeout
	}
	return fallback
}

// getSize returns the configured Size, or 100 when Size is not positive.
func (c BatchConfig) getSize() int {
	const fallback = 100
	if 0 < c.Size {
		return c.Size
	}
	return fallback
}
// BatchIter is an iterator over slices ([]T) built from the values of the
// wrapped Iterator. The exported fields are the inputs; the unexported fields
// hold state that is set up lazily by Init.
type BatchIter[T any] struct {
// Iterator is the source iterator whose values are grouped into batches.
Iterator frameless.Iterator[T]
// Config carries the batch size and timeout settings.
Config BatchConfig
// init guards the one-time setup done by Init (and is consumed by Close).
init sync.Once
// stream carries values from the fetch goroutine to Next; nil until Init has run.
stream chan T
// cancel cancels the context handed to the fetch goroutine; nil until Init has run.
cancel func()
// batch is the slice assembled by the most recent Next call and returned by Value.
batch []T
}
// Init performs the one-time setup of the iterator: it creates the unbuffered
// stream channel, stores the cancel function of a fresh background-derived
// context, and starts the fetch goroutine with that context.
//
// The setup is guarded by a sync.Once, so repeated calls are no-ops.
func (i *BatchIter[T]) Init() {
	start := func() {
		i.stream = make(chan T)
		ctx, stop := context.WithCancel(context.Background())
		i.cancel = stop
		go i.fetch(ctx)
	}
	i.init.Do(start)
}
// fetch pumps values from the wrapped Iterator into i.stream. It is started
// as a goroutine by Init and stops when either the Iterator reports that it
// has no more values or ctx is cancelled.
//
// fetch is the only sender on i.stream, so it closes the channel on the way
// out. This lets the receiving side see the end of the stream immediately
// (the `open == false` result of the receive) rather than only after a
// timeout has elapsed.
func (i *BatchIter[T]) fetch(ctx context.Context) {
	defer close(i.stream)
	for i.Iterator.Next() {
		select {
		case <-ctx.Done():
			// The consumer went away; drop the current value and stop.
			return
		case i.stream <- i.Iterator.Value():
		}
	}
}
// Close stops the background fetching, if it was started, and closes the
// wrapped Iterator, returning the error of that Close call.
func (i *BatchIter[T]) Close() error {
	// Consume the sync.Once with a no-op: if Init has not run yet, it never
	// will, so no fetch goroutine can be started after Close. If Init is
	// running right now, Do waits for it, so i.cancel is set before it is read.
	i.init.Do(func() {})
	if stop := i.cancel; stop != nil {
		stop()
	}
	return i.Iterator.Close()
}
// Err reports the error of the wrapped Iterator, if any.
// It is the place to look for the cause when Next keeps returning false.
func (i *BatchIter[T]) Err() error {
	err := i.Iterator.Err()
	return err
}
// Next assembles the next batch and reports whether it holds at least one
// element. The batch is available through Value afterwards.
//
// A fresh slice with capacity Config.getSize() is allocated on every call.
// Elements are received from the stream until the batch is full, the stream
// is closed, or no element arrives within Config.getTimeout(). The timer is
// re-armed before every receive, so the timeout applies per element, not per
// batch.
//
// NOTE(review): when the timeout elapses before the first element arrives,
// Next returns false even though the source may simply be slow rather than
// exhausted — confirm that callers expect this.
func (i *BatchIter[T]) Next() bool {
	i.Init()

	limit := i.Config.getSize()
	i.batch = make([]T, 0, limit)

	timer := time.NewTimer(i.Config.getTimeout())
	defer stopTimer(timer)

	for len(i.batch) < limit {
		// Every element gets a full timeout window of its own.
		resetTimer(timer, i.Config.getTimeout())

		select {
		case <-timer.C:
			return len(i.batch) > 0
		case item, ok := <-i.stream:
			if !ok {
				return len(i.batch) > 0
			}
			i.batch = append(i.batch, item)
		}
	}
	return len(i.batch) > 0
}
// Value returns the batch assembled by the most recent Next call.
// It has no side effects and may be called repeatedly; the slice is returned
// as is, without copying.
func (i *BatchIter[T]) Value() []T {
	current := i.batch
	return current
}
// stopTimer stops timer and then discards a tick that may already be waiting
// in its channel, without blocking when there is none.
func stopTimer(timer *time.Timer) {
	_ = timer.Stop()
	select {
	default:
		// Nothing pending in the channel.
	case <-timer.C:
		// Dropped a tick that fired before the timer was stopped.
	}
}
// resetTimer re-arms timer to fire after timeout. The timer is stopped and
// its channel drained via stopTimer first, so Reset is called on a timer that
// is not running and has no pending tick.
func resetTimer(timer *time.Timer, timeout time.Duration) {
	stopTimer(timer)
	_ = timer.Reset(timeout)
}