-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
120 lines (96 loc) · 2.22 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package twitchchat
import (
"container/list"
"context"
"fmt"
"sync"
"golang.org/x/time/rate"
)
// ErrBucketClosed is returned by AddEvent when the bucket has already been
// closed.
var ErrBucketClosed = fmt.Errorf("Error: Sink Closed")
// Event is an opaque value that is queued in a Bucket and later passed to
// Emitter.Emit.
//
// inspiration from github.com/Docker/go-events
type Event interface{}
// Emitter accepts and emits events. A Bucket hands each queued event to its
// Emitter from the bucket's drip loop.
type Emitter interface {
// Emit delivers a single event. A non-nil error returned here is passed to
// OnError by the drip loop.
Emit(event Event) error
// OnError is called with the error returned by a failed Emit.
OnError(err error)
// Close is called by Bucket.Close; its error is returned to the caller of
// Bucket.Close.
Close() error
}
// Bucket controls the flow of events into the sink. Events are queued with
// AddEvent and handed to the emitter one at a time, each after waiting on the
// rate limiter.
type Bucket struct {
// emitter receives every event taken off the queue.
emitter Emitter
// events is the pending queue; high priority events are pushed to the front.
events *list.List
// cond is bound to mutex and is signalled when an event is added or the
// bucket is closed.
cond *sync.Cond
// mutex guards events and closed.
mutex sync.Mutex
// limiter is waited on before each event is emitted.
limiter *rate.Limiter
// context is passed to limiter.Wait; NewBucket sets it to
// context.Background().
context context.Context
// closed is set by Close; once set, AddEvent rejects new events.
closed bool
}
// NewBucket creates a Bucket that forwards queued events to emitter and
// starts the goroutine that drains it. Events are released at tokenRate with
// bursts of up to burstLimit; pass rate.Inf as tokenRate for no rate limit.
func NewBucket(emitter Emitter, tokenRate rate.Limit, burstLimit int) *Bucket {
	b := &Bucket{
		context: context.Background(),
		limiter: rate.NewLimiter(tokenRate, burstLimit),
		events:  list.New(),
		emitter: emitter,
	}
	// The condition variable shares the bucket's own mutex, so it can only be
	// wired up once the struct exists.
	b.cond = sync.NewCond(&b.mutex)

	go b.drip()
	return b
}
// AddEvent queues event for delivery. High priority events are placed at the
// front of the queue, all others at the back. It returns ErrBucketClosed if
// the bucket has been closed, and nil otherwise.
func (bucket *Bucket) AddEvent(event Event, highPriority bool) error {
	bucket.mutex.Lock()
	defer bucket.mutex.Unlock()

	if bucket.closed {
		return ErrBucketClosed
	}

	// Pick the insertion end first, then enqueue.
	enqueue := bucket.events.PushBack
	if highPriority {
		enqueue = bucket.events.PushFront
	}
	enqueue(event)

	// Wake a goroutine waiting for an event to arrive.
	bucket.cond.Signal()
	return nil
}
// Close marks the bucket as closed so that AddEvent rejects further events,
// then closes the emitter and returns the emitter's error. Closing an already
// closed bucket is a no-op that returns nil.
//
// After setting the closed flag, Close signals the condition variable and
// then waits on it; it does not proceed to close the emitter until another
// goroutine signals the condition variable in turn.
func (bucket *Bucket) Close() error {
	bucket.mutex.Lock()
	defer bucket.mutex.Unlock()

	if !bucket.closed {
		bucket.closed = true
		// Nudge a waiter so it can observe the closed flag...
		bucket.cond.Signal()
		// ...and block until we are signalled back.
		bucket.cond.Wait()
		return bucket.emitter.Close()
	}
	return nil
}
// drip is the bucket's delivery loop; NewBucket runs it on its own goroutine.
// It repeatedly takes the next event, waits for the rate limiter, and emits
// the event, reporting emit failures through the emitter's OnError. The loop
// ends when next returns nil or when waiting on the limiter fails.
func (bucket *Bucket) drip() {
	for event := bucket.next(); event != nil; event = bucket.next() {
		// A token must be obtained before the event may be emitted. If the
		// wait fails the loop stops and the event in hand is not emitted.
		if err := bucket.limiter.Wait(bucket.context); err != nil {
			return
		}

		emitErr := bucket.emitter.Emit(event)
		if emitErr != nil {
			bucket.emitter.OnError(emitErr)
		}
	}
}
// next blocks until an event is available, removes it from the front of the
// queue and returns it. Once the bucket has been closed and the queue is
// empty it returns nil, which tells drip to stop.
func (bucket *Bucket) next() Event {
	bucket.mutex.Lock()
	defer bucket.mutex.Unlock()

	for bucket.events.Len() < 1 {
		if bucket.closed {
			// The queue is drained and AddEvent no longer accepts events.
			// Close is blocked waiting on this condition variable, so wake it
			// and report shutdown; without this check both this goroutine
			// and Close would wait forever.
			bucket.cond.Broadcast()
			return nil
		}
		bucket.cond.Wait()
	}

	front := bucket.events.Front()
	// NOTE(review): a nil event queued through AddEvent makes this assertion
	// panic, since a type assertion on a nil interface value always fails.
	event := front.Value.(Event)
	bucket.events.Remove(front)
	return event
}