-
Notifications
You must be signed in to change notification settings - Fork 2
/
partitionedlossyqueue.go
64 lines (52 loc) · 1.71 KB
/
partitionedlossyqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package partitionedlossyqueue
import (
"sync"
)
// Queue implements topic-based partitioning for possibly slow consumers,
// for situations where we are only interested in the latest value of each partition.
// The lower the ratio partition_count/sent_messages_across_partitions is,
// the more we benefit from discarding superseded messages, which would otherwise
// cause unnecessary buffering or let the buffers' RAM usage grow unbounded.
//
// This queue guarantees that the latest message for a topic is always delivered.
type Queue struct {
	// ReceiveAvailable signals the consumer that data is waiting. It is
	// buffered with capacity one, so at most one notification is pending at
	// any time; after receiving from it, call ReceiveAndClear to fetch the
	// data. Close closes this channel.
	ReceiveAvailable chan bool

	// mu guards partitionStore and closed, and serializes sends on
	// ReceiveAvailable against Close.
	mu *sync.Mutex
	// partitionStore maps each partition key to its most recent message.
	partitionStore map[string]string
	// closed is set by Close so that Put never sends on a closed channel.
	closed bool
}

// New returns an empty, open Queue.
func New() *Queue {
	return &Queue{
		ReceiveAvailable: make(chan bool, 1),
		mu:               &sync.Mutex{},
		partitionStore:   make(map[string]string),
	}
}

// ReceiveAndClear returns the stored messages, keyed by partition key, and
// resets the queue to empty. Only the latest message per partition is present.
// The returned map is handed over to the caller; the queue no longer
// references it.
func (p *Queue) ReceiveAndClear() map[string]string {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Hand the current map to the caller and start over with a fresh one.
	// This "clears" the store without copying or deleting entries.
	pending := p.partitionStore
	p.partitionStore = make(map[string]string)
	return pending
}

// Put stores message as the latest value for partitionKey, replacing any
// previous value for that partition, and notifies the consumer through
// ReceiveAvailable without blocking.
//
// After Close, Put still stores the message (it can be fetched with
// ReceiveAndClear) but sends no notification, since the channel is closed.
func (p *Queue) Put(partitionKey string, message string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// If the partition had an entry, only the latest message survives.
	p.partitionStore[partitionKey] = message

	if p.closed {
		// Sending on a closed channel would panic.
		return
	}

	// Notify the consumer without blocking. The send is done while holding
	// mu so it cannot race with Close; it never blocks, so the lock is held
	// only briefly.
	select {
	case p.ReceiveAvailable <- true:
	default:
		// Notification dropped. Since the channel is buffered, there already
		// is one notification queued; the consumer will react to it and get
		// the latest data.
	}
}

// Close closes ReceiveAvailable. A notification already buffered on the
// channel can still be received by the consumer. Close is idempotent: calls
// after the first do nothing.
func (p *Queue) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		// Closing an already closed channel would panic.
		return
	}
	p.closed = true
	close(p.ReceiveAvailable)
}