/
spreader.go
95 lines (78 loc) · 1.56 KB
/
spreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package simulator
import (
"sync"
"github.com/Gimulator/protobuf/go/api"
"github.com/sirupsen/logrus"
)
// Channel pairs a message channel with a mutex-guarded "closed" flag.
type Channel struct {
	// mux is held by Close and IsClose while they touch IsClosed.
	mux sync.Mutex
	// Ch is the channel on which messages are delivered.
	Ch chan *api.Message
	// IsClosed is set to true by Close. Use IsClose for a read that takes
	// the mutex.
	IsClosed bool
}
// NewChannel returns an open Channel whose message channel buffers up to 128
// messages.
func NewChannel() *Channel {
	// The zero value already provides an unlocked mutex and IsClosed == false,
	// so only the message channel needs explicit initialisation.
	c := new(Channel)
	c.Ch = make(chan *api.Message, 128)
	return c
}
// Close marks the channel as closed by setting IsClosed while holding the
// mutex. It does not close the underlying Ch.
func (c *Channel) Close() {
	c.mux.Lock()
	c.IsClosed = true
	c.mux.Unlock()
}
// IsClose reports whether the channel has been marked closed, reading the
// IsClosed flag while holding the mutex.
func (c *Channel) IsClose() bool {
	c.mux.Lock()
	closed := c.IsClosed
	c.mux.Unlock()
	return closed
}
// watcher binds a key filter to the Channel that receives matching messages.
type watcher struct {
	// key is the filter compared against each message's key.
	key *api.Key
	// channel is where matching messages are sent.
	channel *Channel
}
// spreader holds the registered watchers and fans messages out to them.
type spreader struct {
	// watchers is the current registration list; Spread prunes entries whose
	// channel has been marked closed.
	watchers []watcher
	// log is the logger entry assigned by NewSpreader.
	log *logrus.Entry
}
// NewSpreader returns a spreader with an empty, non-nil watcher list and a
// logger entry carrying the field entity=spreader.
func NewSpreader() *spreader {
	s := &spreader{watchers: []watcher{}}
	s.log = logrus.WithField("entity", "spreader")
	return s
}
// watcherError is a string-backed error type used for AddWatcher's validation
// failures.
type watcherError string

// Error implements the error interface.
func (e watcherError) Error() string { return string(e) }

// AddWatcher registers ch to receive the messages whose key matches key.
//
// It returns a non-nil error, and registers nothing, when key or ch is nil:
// Spread dereferences both for every watcher, so a nil value stored here
// would make every later Spread call panic. On success it returns nil.
func (s *spreader) AddWatcher(key *api.Key, ch *Channel) error {
	if key == nil {
		return watcherError("watcher key must not be nil")
	}
	if ch == nil {
		return watcherError("watcher channel must not be nil")
	}

	s.watchers = append(s.watchers, watcher{key: key, channel: ch})
	return nil
}
// Spread delivers message to every registered watcher whose key matches the
// message's key.
//
// Watchers whose channel has been marked closed are removed during the walk.
// Removal swaps the last watcher into the vacated position, so the order of
// the remaining watchers is not preserved. Delivery is non-blocking: if a
// watcher's buffer is full, the message is dropped for that watcher.
//
// A nil message, or a message without a key, cannot be matched against any
// watcher and is ignored.
func (s *spreader) Spread(message *api.Message) {
	if message == nil || message.Key == nil {
		return
	}

	for i := 0; i < len(s.watchers); {
		w := s.watchers[i]

		if w.channel.IsClose() {
			last := len(s.watchers) - 1
			s.watchers[i] = s.watchers[last]
			// Zero the vacated tail slot so the backing array no longer keeps
			// the removed watcher's key and channel reachable.
			s.watchers[last] = watcher{}
			s.watchers = s.watchers[:last]
			// Do not advance: slot i now holds a watcher not yet examined.
			continue
		}

		if s.match(w.key, message.Key) {
			select {
			case w.channel.Ch <- message:
			default:
				// Buffer full: drop rather than block the other watchers.
			}
		}
		i++
	}
}
// match reports whether check satisfies the filter base. Each of Type, Name
// and Namespace is compared in that order; an empty field in base acts as a
// wildcard, while a non-empty field must equal the corresponding field of
// check.
func (s *spreader) match(base, check *api.Key) bool {
	return (base.Type == "" || base.Type == check.Type) &&
		(base.Name == "" || base.Name == check.Name) &&
		(base.Namespace == "" || base.Namespace == check.Namespace)
}