-
Notifications
You must be signed in to change notification settings - Fork 0
/
router.go
133 lines (122 loc) · 3.08 KB
/
router.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package auramq
import (
"sync"
"github.com/aurawing/auramq/msg"
"github.com/fatih/set"
)
// Router routes published messages to the subscribers registered for the
// message's topic.
type Router struct {
	// rtable maps a topic name to the set of Subscribers registered for it.
	rtable map[string]set.Interface
	// rrtable is the reverse index: it maps a Subscriber to the set of topic
	// names that subscriber is registered for.
	rrtable map[Subscriber]set.Interface
	// broadcast carries messages from Publish to the Run loop.
	broadcast chan *msg.Message
	// done is sent to by Close to make the Run loop exit.
	done chan struct{}
	// lock is held by Register, Unregister and UnregisterSubscriber while
	// they modify rtable and rrtable.
	lock sync.RWMutex
}
// NewRouter returns a Router with empty routing tables. bufferSize is the
// capacity of the channel that queues published messages until Run picks
// them up; the shutdown channel is unbuffered.
func NewRouter(bufferSize int) *Router {
	r := new(Router)
	r.rtable = map[string]set.Interface{}
	r.rrtable = map[Subscriber]set.Interface{}
	r.broadcast = make(chan *msg.Message, bufferSize)
	r.done = make(chan struct{})
	return r
}
// Register subscribes client to each of the given topics that it is not
// already subscribed to. Both routing tables are updated under the router's
// write lock. A client registered with no topics still gets an (empty) entry
// in the reverse table.
func (router *Router) Register(client Subscriber, topics []string) {
	router.lock.Lock()
	defer router.lock.Unlock()

	// Make sure the client has a topic set in the reverse table.
	subscribed := router.rrtable[client]
	if subscribed == nil {
		subscribed = set.New(set.NonThreadSafe)
		router.rrtable[client] = subscribed
	}

	// Collapse duplicates in the request.
	requested := set.New(set.NonThreadSafe)
	for i := range topics {
		requested.Add(topics[i])
	}

	// Only the topics the client does not hold yet need to be recorded.
	for _, item := range set.Difference(requested, subscribed).List() {
		name := item.(string)
		subscribed.Add(name)

		members := router.rtable[name]
		if members == nil {
			members = set.New(set.NonThreadSafe)
			router.rtable[name] = members
		}
		members.Add(client)
	}
}
// UnregisterSubscriber removes client from every topic it is registered for
// and drops its entry from the reverse table. It does nothing when the
// client is unknown to the router.
func (router *Router) UnregisterSubscriber(client Subscriber) {
	router.lock.Lock()
	defer router.lock.Unlock()

	held, found := router.rrtable[client]
	switch {
	case !found:
		return
	case held.Size() == 0:
		// Known client without subscriptions: only the reverse entry exists.
		delete(router.rrtable, client)
		return
	}

	// Convert the client's topic set into a string slice for unregister.
	items := held.List()
	names := make([]string, len(items))
	for i, item := range items {
		names[i] = item.(string)
	}
	router.unregister(client, names)
}
// Unregister removes client's subscription to each of the given topics.
// It takes the router's write lock and delegates the table updates to
// unregister.
func (router *Router) Unregister(client Subscriber, topics []string) {
	router.lock.Lock()
	defer router.lock.Unlock()
	router.unregister(client, topics)
}
// unregister removes client from the given topics in both routing tables.
// Topic and client entries that become empty are deleted so the tables do
// not accumulate empty sets. Topics that have no entry in the topic table
// are skipped for that table instead of dereferencing a missing set.
//
// It does not lock: both callers in this file (Unregister and
// UnregisterSubscriber) acquire router.lock before calling it.
func (router *Router) unregister(client Subscriber, topics []string) {
	subscribed, ok := router.rrtable[client]
	if !ok {
		return
	}
	if subscribed.Size() == 0 {
		delete(router.rrtable, client)
		return
	}

	for _, topic := range topics {
		// A topic nobody is registered for has no set in rtable; calling
		// Remove on the missing (nil) entry would panic.
		if members, found := router.rtable[topic]; found {
			members.Remove(client)
			if members.Size() == 0 {
				delete(router.rtable, topic)
			}
		}
		subscribed.Remove(topic)
	}

	if subscribed.Size() == 0 {
		delete(router.rrtable, client)
	}
}
// Publish queues msg on the router's broadcast channel; the Run loop
// delivers it to the subscribers of msg.Topic. The send blocks while the
// channel buffer (sized by NewRouter's bufferSize) is full.
//
// NOTE(review): Run closes the broadcast channel when it exits, so calling
// Publish after the router has shut down panics (send on closed channel).
func (router *Router) Publish(msg *msg.Message) {
	router.broadcast <- msg
}
// Run is the router's delivery loop. It blocks, forwarding every message
// received from the broadcast channel to the subscribers currently
// registered for the message's topic, until Close signals the done channel.
// On exit it closes both the broadcast and done channels and then calls
// Close on every subscriber still present in the reverse table.
//
// The routing tables are shared with Register/Unregister, so they are only
// read here while holding the read lock. The lock is released before Send
// or Close is invoked on a subscriber, so those calls may safely call back
// into the router (for example to unregister themselves).
func (router *Router) Run() {
OUT:
	for {
		select {
		case m := <-router.broadcast:
			if m == nil {
				// Nothing to route; avoid dereferencing a nil message.
				continue
			}

			// Snapshot the subscriber list under the read lock; List
			// returns a fresh slice, so it stays valid after unlocking.
			var targets []interface{}
			router.lock.RLock()
			if subs := router.rtable[m.Topic]; subs != nil {
				targets = subs.List()
			}
			router.lock.RUnlock()

			for _, target := range targets {
				cli := target.(Subscriber)
				cli.Send(m)
				// if !cli.Send(m) {
				// 	cli.Close()
				// 	router.UnregisterSubscriber(cli)
				// }
			}
		case <-router.done:
			break OUT
		}
	}

	close(router.broadcast)
	close(router.done)

	// Collect the remaining subscribers under the lock, close them outside it.
	router.lock.RLock()
	clients := make([]Subscriber, 0, len(router.rrtable))
	for cli := range router.rrtable {
		clients = append(clients, cli)
	}
	router.lock.RUnlock()

	for _, cli := range clients {
		cli.Close()
	}
}
// Close asks the Run loop to stop by sending on the done channel. The
// channel is unbuffered, so Close blocks until Run receives the signal
// (and blocks indefinitely if Run is not running).
//
// NOTE(review): Run closes the done channel on exit, so calling Close a
// second time after shutdown panics (send on closed channel).
func (router *Router) Close() {
	router.done <- struct{}{}
}