-
Notifications
You must be signed in to change notification settings - Fork 0
/
events.go
154 lines (130 loc) · 3.81 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package events
import (
"sync"
"time"
"github.com/docker/docker/pkg/pubsub"
eventtypes "github.com/docker/engine-api/types/events"
)
const (
eventsLimit = 64
bufferSize = 1024
)
// Events is pubsub channel for events generated by the engine.
type Events struct {
mu sync.Mutex
events []eventtypes.Message
pub *pubsub.Publisher
}
// New returns new *Events instance
func New() *Events {
return &Events{
events: make([]eventtypes.Message, 0, eventsLimit),
pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize),
}
}
// Subscribe adds new listener to events, returns slice of 64 stored
// last events, a channel in which you can expect new events (in form
// of interface{}, so you need type assertion), and a function to call
// to stop the stream of events.
func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
e.mu.Lock()
current := make([]eventtypes.Message, len(e.events))
copy(current, e.events)
l := e.pub.Subscribe()
e.mu.Unlock()
cancel := func() {
e.Evict(l)
}
return current, l, cancel
}
// SubscribeTopic adds new listener to events, returns slice of 64 stored
// last events, a channel in which you can expect new events (in form
// of interface{}, so you need type assertion).
func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
e.mu.Lock()
var topic func(m interface{}) bool
if ef != nil && ef.filter.Len() > 0 {
topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
}
buffered := e.loadBufferedEvents(since, until, topic)
var ch chan interface{}
if topic != nil {
ch = e.pub.SubscribeTopic(topic)
} else {
// Subscribe to all events if there are no filters
ch = e.pub.Subscribe()
}
e.mu.Unlock()
return buffered, ch
}
// Evict evicts listener from pubsub
func (e *Events) Evict(l chan interface{}) {
e.pub.Evict(l)
}
// Log broadcasts event to listeners. Each listener has 100 millisecond for
// receiving event or it will be skipped.
func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
now := time.Now().UTC()
jm := eventtypes.Message{
Action: action,
Type: eventType,
Actor: actor,
Time: now.Unix(),
TimeNano: now.UnixNano(),
}
// fill deprecated fields for container and images
switch eventType {
case eventtypes.ContainerEventType:
jm.ID = actor.ID
jm.Status = action
jm.From = actor.Attributes["image"]
case eventtypes.ImageEventType:
jm.ID = actor.ID
jm.Status = action
}
e.mu.Lock()
if len(e.events) == cap(e.events) {
// discard oldest event
copy(e.events, e.events[1:])
e.events[len(e.events)-1] = jm
} else {
e.events = append(e.events, jm)
}
e.mu.Unlock()
e.pub.Publish(jm)
}
// SubscribersCount returns number of event listeners
func (e *Events) SubscribersCount() int {
return e.pub.Len()
}
// loadBufferedEvents iterates over the cached events in the buffer
// and returns those that were emitted between two specific dates.
// It uses `time.Unix(seconds, nanoseconds)` to generate valid dates with those arguments.
// It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages.
func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
var buffered []eventtypes.Message
if since.IsZero() && until.IsZero() {
return buffered
}
var sinceNanoUnix int64
if !since.IsZero() {
sinceNanoUnix = since.UnixNano()
}
var untilNanoUnix int64
if !until.IsZero() {
untilNanoUnix = until.UnixNano()
}
for i := len(e.events) - 1; i >= 0; i-- {
ev := e.events[i]
if ev.TimeNano < sinceNanoUnix {
break
}
if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
continue
}
if topic == nil || topic(ev) {
buffered = append([]eventtypes.Message{ev}, buffered...)
}
}
return buffered
}