-
Notifications
You must be signed in to change notification settings - Fork 7
/
taskqueue.go
160 lines (143 loc) · 3.62 KB
/
taskqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package taskqueue
import (
"container/ring"
"fmt"
"math"
"sync"
"time"
)
const defaultCapacity = 8
var (
ErrTaskQueue = fmt.Errorf("task queue error")
ErrTaskQueueCapacity = fmt.Errorf("%w: capacity is exhausted", ErrTaskQueue)
ErrTaskQueueUndefined = fmt.Errorf("%w: undefined", ErrTaskQueue)
)
// Task defines queued task
type Task struct {
done chan error
Args []interface{}
Idx uint8
Subject Subject
}
// Done returns channel for result
func (task Task) Done() chan error {
return task.done
}
// Handler defines task handler
type Handler func(*Task) error
// Subject defines task subject
// used as key in map and requires comparable type
type Subject interface{}
// TaskQueue defines task queue
type TaskQueue struct {
mu sync.Mutex
alarm time.Duration
alarmHandler Handler
capacity uint8
debugger func([]Task)
handlers map[Subject]Handler
idx uint8
queue chan *Task
ring *ring.Ring
}
// PushAsync adds task into queue and returns immediately
func (q *TaskQueue) PushAsync(subj Subject, args ...interface{}) (*Task, error) {
if _, ok := q.handlers[subj]; !ok {
return nil, fmt.Errorf("%w: %v", ErrTaskQueueUndefined, subj)
}
done := make(chan error, 1)
task := &Task{done: done, Args: args, Idx: q.idx + 1, Subject: subj}
/* put task into queue */
q.mu.Lock()
defer q.mu.Unlock()
select {
case q.queue <- task:
q.idx = task.Idx
if q.idx > math.MaxUint8-1 {
q.idx = 0
}
/* put task into ring buffer for debug */
q.ring.Value = *task
q.ring = q.ring.Next()
return task, nil
default:
if q.debugger != nil {
lastTasks := []Task{}
q.ring.Do(func(p interface{}) {
if p != nil {
lastTasks = append(lastTasks, p.(Task))
}
})
q.debugger(lastTasks)
}
return nil, fmt.Errorf("%w: %v", ErrTaskQueueCapacity, q.capacity)
}
}
// PushSync adds task into queue and returns after task processing
func (q *TaskQueue) PushSync(subj Subject, args ...interface{}) error {
if task, err := q.PushAsync(subj, args...); err != nil {
return err
} else {
return <-task.Done()
}
}
func (q *TaskQueue) runQueue() {
for task := range q.queue {
var alarmTimer *time.Timer
if q.alarm != 0 && q.alarmHandler != nil {
/* create closure to prevent the race condition
as loop var can be updated in time between timer trigger and handler call */
func(task *Task) {
alarmTimer = time.AfterFunc(q.alarm, func() {
_ = q.alarmHandler(task)
})
}(task)
}
handler := q.handlers[task.Subject]
err := handler(task)
if alarmTimer != nil {
alarmTimer.Stop()
}
task.done <- err
close(task.done)
}
}
// TaskQueueOption defines task queue option
type TaskQueueOption func(*TaskQueue)
// NewTaskQueue creates task queue
func NewTaskQueue(opts ...TaskQueueOption) *TaskQueue {
q := &TaskQueue{capacity: defaultCapacity}
for _, optFn := range opts {
optFn(q)
}
q.ring = ring.New(int(q.capacity))
q.queue = make(chan *Task, q.capacity)
go q.runQueue()
return q
}
// WithAlarm defines handler invoked on timeout after task start
// useful for log the long executed task
func WithAlarm(d time.Duration, h Handler) TaskQueueOption {
return func(q *TaskQueue) {
q.alarm = d
q.alarmHandler = h
}
}
// WithCapacity defines capacity of task queue
func WithCapacity(c uint8) TaskQueueOption {
return func(q *TaskQueue) {
q.capacity = c
}
}
// WithHandlers defines tasks
func WithHandlers(m map[Subject]Handler) TaskQueueOption {
return func(q *TaskQueue) {
q.handlers = m
}
}
// WithDebugger defines debug
func WithDebugger(fn func([]Task)) TaskQueueOption {
return func(q *TaskQueue) {
q.debugger = fn
}
}