-
Notifications
You must be signed in to change notification settings - Fork 18.7k
/
events.go
142 lines (123 loc) · 3.83 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package events
import (
"sync"
"time"
"github.com/docker/docker/pkg/pubsub"
eventtypes "github.com/docker/engine-api/types/events"
)
const (
// eventsLimit is the capacity of the in-memory buffer of recent events
// allocated in New.
eventsLimit = 64
// bufferSize is the buffer argument handed to pubsub.NewPublisher in New.
// NOTE(review): presumably the per-subscriber channel buffer size; confirm
// against the pubsub package.
bufferSize = 1024
)
// Events is a pubsub channel for events generated by the engine. It pairs a
// bounded buffer of the most recently logged messages with a publisher that
// delivers new messages to subscribed listeners.
type Events struct {
// mu is locked around accesses to events in Subscribe, SubscribeTopic and Log.
mu sync.Mutex
// events holds the most recently logged messages, oldest first.
events []eventtypes.Message
// pub is the publisher every logged message is published through.
pub *pubsub.Publisher
}
// New returns a new *Events instance with an empty event buffer of capacity
// eventsLimit and a publisher created with a 100 millisecond timeout and a
// buffer of bufferSize.
func New() *Events {
	e := new(Events)
	e.events = make([]eventtypes.Message, 0, eventsLimit)
	e.pub = pubsub.NewPublisher(100*time.Millisecond, bufferSize)
	return e
}
// Subscribe adds a new listener to events. It returns:
//   - a copy of the currently stored events (at most eventsLimit of them when
//     the instance was created by New), oldest first;
//   - a channel on which new events are delivered (as interface{}, so a type
//     assertion is needed);
//   - a function that evicts the listener to stop the stream of events.
//
// The snapshot is taken and the listener registered under the same lock that
// Log holds while storing an event.
func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
	e.mu.Lock()
	snapshot := make([]eventtypes.Message, len(e.events))
	copy(snapshot, e.events)
	listener := e.pub.Subscribe()
	e.mu.Unlock()

	return snapshot, listener, func() { e.Evict(listener) }
}
// SubscribeTopic adds a new listener to events. It returns the buffered
// events selected by loadBufferedEvents for since/sinceNano (none when since
// is -1) that pass the filter, and a channel on which new events are
// delivered (as interface{}, so a type assertion is needed).
//
// When ef is nil or contains no filter entries the listener receives every
// event; otherwise it only receives events for which ef.Include reports true.
// No cancel function is returned: callers stop the stream by passing the
// channel to Evict.
func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]eventtypes.Message, chan interface{}) {
	e.mu.Lock()
	// Deferred so the mutex is released even if the filter callback (or its
	// type assertion) panics while the buffered events are being scanned;
	// otherwise every later Log/Subscribe call would block forever.
	defer e.mu.Unlock()

	var topic func(m interface{}) bool
	if ef != nil && ef.filter.Len() > 0 {
		// The assertion assumes only eventtypes.Message values are published,
		// which is what Log does.
		topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
	}

	buffered := e.loadBufferedEvents(since, sinceNano, topic)

	if topic == nil {
		// Subscribe to all events if there are no filters.
		return buffered, e.pub.Subscribe()
	}
	return buffered, e.pub.SubscribeTopic(topic)
}
// Evict evicts the listener l from the pubsub publisher by delegating to the
// publisher's Evict method. The cancel function returned by Subscribe calls
// this with its own channel.
func (e *Events) Evict(l chan interface{}) {
e.pub.Evict(l)
}
// Log records an event and broadcasts it to listeners. Each listener has
// 100 milliseconds to receive the event or it will be skipped.
//
// The message is stamped with the current UTC time, stored in the buffer of
// recent events (dropping the oldest one when the buffer is full) and then
// published. Publishing happens after the buffer lock has been released.
func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
	ts := time.Now().UTC()
	msg := eventtypes.Message{
		Action:   action,
		Type:     eventType,
		Actor:    actor,
		Time:     ts.Unix(),
		TimeNano: ts.UnixNano(),
	}

	// Container and image events still carry the deprecated ID/Status
	// fields; container events additionally carry the deprecated From field.
	isContainer := eventType == eventtypes.ContainerEventType
	if isContainer || eventType == eventtypes.ImageEventType {
		msg.ID = actor.ID
		msg.Status = action
	}
	if isContainer {
		msg.From = actor.Attributes["image"]
	}

	e.mu.Lock()
	if n := len(e.events); n < cap(e.events) {
		e.events = append(e.events, msg)
	} else {
		// Buffer is full: shift everything left by one to discard the
		// oldest event and reuse the last slot for the new one.
		copy(e.events, e.events[1:])
		e.events[n-1] = msg
	}
	e.mu.Unlock()

	e.pub.Publish(msg)
}
// SubscribersCount returns the number of event listeners, as reported by the
// publisher's Len method.
func (e *Events) SubscribersCount() int {
return e.pub.Len()
}
// loadBufferedEvents returns the cached events that were emitted at or after
// a specific date, in the order they are stored in the buffer (oldest first).
//
// The date is split in two values:
//   - since is a Unix timestamp in seconds, or -1 to return an empty slice.
//   - sinceNano is the nanosecond offset from that timestamp.
//
// The two are combined with time.Unix(since, sinceNano). Scanning starts from
// the newest event and stops at the first one older than the date, so older
// entries in front of it are never considered. If topic is not nil, only
// events for which it returns true are included; otherwise all of them are.
//
// The method does not lock e.mu itself; SubscribeTopic calls it with the lock
// already held.
func (e *Events) loadBufferedEvents(since, sinceNano int64, topic func(interface{}) bool) []eventtypes.Message {
	var buffered []eventtypes.Message
	if since == -1 {
		return buffered
	}

	sinceNanoUnix := time.Unix(since, sinceNano).UnixNano()

	// Walk backwards to find where the trailing run of recent-enough events
	// begins, instead of prepending each match to the result (which would
	// reallocate and copy the whole slice on every hit).
	start := len(e.events)
	for start > 0 && e.events[start-1].TimeNano >= sinceNanoUnix {
		start--
	}

	for _, ev := range e.events[start:] {
		if topic == nil || topic(ev) {
			buffered = append(buffered, ev)
		}
	}
	return buffered
}