-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnoaa_message_queue.go
50 lines (37 loc) · 1 KB
/
noaa_message_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package logs
import (
"sort"
"sync"
"github.com/cloudfoundry/sonde-go/events"
)
// NoaaMessageQueue buffers log messages until they are drained, in timestamp
// order, by EnumerateAndClear.
//
// It also implements sort.Interface (Len, Less, Swap) over the buffered
// messages. Those three methods do not take the mutex themselves.
type NoaaMessageQueue struct {
	// mutex is held by PushMessage and EnumerateAndClear while they touch
	// messages.
	mutex sync.Mutex

	// messages holds the buffered log messages in arrival order until the
	// next EnumerateAndClear sorts and drains them.
	messages []*events.LogMessage
}
// NewNoaaMessageQueue returns a pointer to an empty, ready-to-use queue.
func NewNoaaMessageQueue() *NoaaMessageQueue {
	return new(NoaaMessageQueue)
}
// PushMessage appends message to the end of the queue while holding the
// queue's mutex. No sorting happens here; messages are ordered later, when
// EnumerateAndClear drains the queue.
func (pq *NoaaMessageQueue) PushMessage(message *events.LogMessage) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	grown := append(pq.messages, message)
	pq.messages = grown
}
// Less, Swap and Len implement sort.Interface so that EnumerateAndClear can
// sort the buffered messages by timestamp before handing them out.

// Less reports whether the message at index i has a strictly earlier
// timestamp than the message at index j.
//
// The timestamps are read through the generated GetTimestamp accessor rather
// than by dereferencing the Timestamp field directly: the accessor yields 0
// for a nil message or an unset Timestamp, so such entries sort first instead
// of causing a nil-pointer panic in the middle of a sort.
//
// Less does not lock the queue; the caller must already hold the mutex.
func (pq *NoaaMessageQueue) Less(i, j int) bool {
	return pq.messages[i].GetTimestamp() < pq.messages[j].GetTimestamp()
}
// Swap exchanges the messages at indices i and j. It is part of the
// sort.Interface implementation and does not lock the queue itself.
func (pq *NoaaMessageQueue) Swap(i, j int) {
	// msgs shares its backing array with pq.messages, so swapping through
	// it updates the queue in place.
	msgs := pq.messages
	msgs[i], msgs[j] = msgs[j], msgs[i]
}
// Len returns the number of messages currently buffered. It is part of the
// sort.Interface implementation and does not lock the queue itself.
func (pq *NoaaMessageQueue) Len() int {
	count := len(pq.messages)
	return count
}
// EnumerateAndClear stably sorts the buffered messages by timestamp, calls
// onMessage once per message in that order, and then empties the queue.
//
// The queue's mutex is held for the whole call, including while onMessage
// runs. onMessage must therefore not call PushMessage or EnumerateAndClear on
// the same queue (the mutex is not reentrant), and a slow callback delays
// concurrent PushMessage callers.
func (pq *NoaaMessageQueue) EnumerateAndClear(onMessage func(*events.LogMessage)) {
	pq.mutex.Lock()
	defer pq.mutex.Unlock()

	// Stable sort keeps arrival order for messages with equal timestamps.
	sort.Stable(pq)

	// Iterate over a snapshot of the slice header taken after sorting.
	pending := pq.messages
	for idx := 0; idx < len(pending); idx++ {
		onMessage(pending[idx])
	}

	// Start over with a fresh empty slice so the drained messages are no
	// longer referenced by the queue.
	pq.messages = make([]*events.LogMessage, 0)
}