/
batch_event.go
154 lines (137 loc) · 2.82 KB
/
batch_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Create and maintain by Chaiyapong Lapliengtrakul (chaiyapong@3dsinteractive.com), All right reserved (2021 - Present)
package main
import (
"sync/atomic"
"time"
"github.com/3dsinteractive/go-queue/queue"
)
// Batch is collection of items
type Batch struct {
q *queue.Queue
}
// NewBatch return new buffer
func NewBatch() *Batch {
return &Batch{
q: queue.New(),
}
}
// Add item into buffer
func (b *Batch) Add(item interface{}) {
b.q.PushBack(item)
}
// Read item from buffer
func (b *Batch) Read() interface{} {
v := b.q.PopFront()
return v
}
// Reset clear the buffer
func (b *Batch) Reset() {
b.q.Init()
}
// BatchFillFunc is the handler for fill the batch
type BatchFillFunc func(b *Batch, payload interface{}) error
// BatchExecFunc is the handler for execute the batch
type BatchExecFunc func(b *Batch) error
// BatchEvent is struct to manage batch life cycle event
type BatchEvent struct {
batchSize int
timeout time.Duration
fill BatchFillFunc
execute BatchExecFunc
done BatchExecFunc
payload chan interface{}
errc chan error
}
// NewBatchEvent return new BatchEvent
func NewBatchEvent(
batchSize int,
timeout time.Duration,
fill BatchFillFunc,
execute BatchExecFunc,
payload chan interface{},
errc chan error,
) *BatchEvent {
return &BatchEvent{
batchSize: batchSize,
timeout: timeout,
fill: fill,
execute: execute,
payload: payload,
errc: errc,
}
}
// Start start the batch event
// Loop will exit when payload has closed close(payload)
func (be *BatchEvent) Start() {
fill := make(chan interface{})
exec := make(chan bool)
done := make(chan bool, 1)
stop := make(chan bool, 1) // stop timer channel
defer func() {
close(stop)
close(done)
close(exec)
close(fill)
}()
var n int32
if be.timeout > 0 {
go func(timeout time.Duration) {
for {
timer := time.NewTimer(timeout)
select {
case <-stop:
timer.Stop()
return
case <-timer.C:
i := atomic.LoadInt32(&n)
if i > 0 {
atomic.StoreInt32(&n, 0)
exec <- true
}
}
}
}(be.timeout)
}
go func(payload chan interface{}, batchSize int, timeout time.Duration) {
for {
p, ok := <-payload
if ok {
fill <- p
i := atomic.AddInt32(&n, 1)
if i >= int32(batchSize) {
atomic.StoreInt32(&n, 0)
exec <- true
}
} else {
// close everything
if timeout > 0 {
// exit timer
stop <- true
}
// execute last batch
exec <- true
// exit from executor
done <- true
return
}
}
}(be.payload, be.batchSize, be.timeout)
batch := NewBatch()
for {
select {
case payload := <-fill:
err := be.fill(batch, payload)
if err != nil {
be.errc <- err
}
case <-exec:
err := be.execute(batch)
batch.Reset()
if err != nil {
be.errc <- err
}
case <-done:
return
}
}
}