-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
66 lines (59 loc) · 1.19 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package pool
import (
"log"
"runtime"
"sync"
)
type worker[T any] struct {
id int
done *sync.WaitGroup
readyPool chan chan Task[T]
work chan Task[T]
quit chan bool
callback func(T)
}
func NewWorker[T any](id int, readyPool chan chan Task[T], done *sync.WaitGroup) *worker[T] {
return &worker[T]{
id: id,
done: done,
readyPool: readyPool,
work: make(chan Task[T]),
quit: make(chan bool),
}
}
func (w *worker[T]) Process(task Task[T]) {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Printf("panic running process: %v\n%s\n", r, buf)
}
}()
result := task.Run()
// If callback defined, invoke callback
if w.callback != nil {
go w.callback(result)
}
}
// Start wait for tasks with optional
func (w *worker[T]) Start(callback func(T)) {
w.callback = callback
go func() {
w.done.Add(1)
for {
w.readyPool <- w.work
select {
case work := <-w.work:
w.Process(work)
case <-w.quit:
w.done.Done()
return
}
}
}()
}
// Stop notify worker to stop after current process
func (w *worker[T]) Stop() {
w.quit <- true
}