-
Notifications
You must be signed in to change notification settings - Fork 1
/
route.go
122 lines (105 loc) · 2.46 KB
/
route.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package ktmt
import (
"container/list"
"sync"
"github.com/eleztian/ktmt/packets"
)
// Customer is implemented by types that can produce a callback for a received
// publish packet.
type Customer interface {
	// GetAckFunc returns a parameterless function tied to msg.
	// NOTE(review): judging by the name, calling the returned function
	// acknowledges msg — confirm against the implementations.
	GetAckFunc(msg *packets.PublishPacket) func()
}
// MessageHandler is a callback that processes a single Message. The router in
// this file calls Ack on the message only when the handler returns true.
type MessageHandler func(msg Message) bool
// Router maps topics to message handlers and dispatches incoming messages to
// them.
type Router interface {
	// AddRoute registers callback as the handler for topic.
	AddRoute(topic string, callback MessageHandler)

	// DeleteRoute removes the route registered for topic.
	DeleteRoute(topic string)

	// SetDefaultHandler sets the handler used for messages whose topic has no
	// registered route.
	SetDefaultHandler(handler MessageHandler)

	// MatchAndDispatch reads messages from the channel, matches each one
	// against the registered routes and runs the matching handlers. When order
	// is true the handlers are executed sequentially.
	MatchAndDispatch(messages <-chan Message, order bool)
}
// route associates one topic string with the handler to run for it.
type route struct {
	// topic is the exact topic string this route is compared against.
	topic string
	// callback is the handler stored for this topic.
	callback MessageHandler
}
// match reports whether topic is exactly equal to the route's topic. The
// comparison is plain string equality; no wildcard handling is performed.
func (rt *route) match(topic string) bool {
	matched := topic == rt.topic
	return matched
}
// router is the list-backed route table used by the methods in this file.
type router struct {
	// RWMutex is taken by the methods below around every access to routes and
	// defaultHandler.
	sync.RWMutex

	// routes holds *route elements in the order they were added.
	routes *list.List
	// defaultHandler is used when no route matches a message; it may be nil.
	defaultHandler MessageHandler
	// messages is created by NewRouter.
	// NOTE(review): no method visible in this file reads or writes it —
	// confirm whether it is still needed.
	messages chan *packets.PublishPacket
}
// AddRoute registers callback as the handler for topic. If a route with the
// same topic already exists its callback is replaced in place; otherwise a new
// route is appended to the end of the list. The write lock is held for the
// whole operation.
func (r *router) AddRoute(topic string, callback MessageHandler) {
	r.Lock()
	defer r.Unlock()

	for elem := r.routes.Front(); elem != nil; elem = elem.Next() {
		existing := elem.Value.(*route)
		if existing.topic != topic {
			continue
		}
		// Topic already registered: swap the handler, keep the list position.
		existing.callback = callback
		return
	}

	r.routes.PushBack(&route{topic: topic, callback: callback})
}
// DeleteRoute removes the first route whose topic equals topic. It does
// nothing when no such route exists. The write lock is held for the whole
// operation.
func (r *router) DeleteRoute(topic string) {
	r.Lock()
	defer r.Unlock()

	// Advance until the matching element is found or the list is exhausted.
	elem := r.routes.Front()
	for elem != nil && elem.Value.(*route).topic != topic {
		elem = elem.Next()
	}
	if elem != nil {
		r.routes.Remove(elem)
	}
}
// SetDefaultHandler stores handler as the fallback used by MatchAndDispatch
// when no route matches a message. Passing nil clears the fallback. The
// assignment is made under the write lock.
func (r *router) SetDefaultHandler(handler MessageHandler) {
	r.Lock()
	r.defaultHandler = handler
	r.Unlock()
}
// MatchAndDispatch consumes messages until the channel is closed. For every
// message it collects the callbacks of all routes whose topic matches, or the
// default handler when nothing matched and one is set, and runs them. The
// message is acknowledged (Ack) after each handler that returns true.
//
// When order is true the handlers run synchronously on the calling goroutine,
// so messages are processed one after another. When order is false each
// handler runs in its own goroutine; those goroutines are not waited for.
//
// The route table and the default handler are read only while the read lock is
// held. The handlers themselves run after the lock is released, so a handler
// may call AddRoute, DeleteRoute or SetDefaultHandler.
func (r *router) MatchAndDispatch(messages <-chan Message, order bool) {
	// run invokes one handler and acknowledges the message on success. The
	// message is passed as an argument rather than captured, so goroutines
	// never share the range variable (which is reused across iterations before
	// Go 1.22).
	run := func(handle MessageHandler, msg Message) {
		if handle(msg) {
			msg.Ack()
		}
	}

	for message := range messages {
		topic := message.Topic()

		// Snapshot every handler under the read lock. In particular the
		// default handler is copied here, not read later from a goroutine,
		// which would race with SetDefaultHandler and could call a nil func.
		var handlers []MessageHandler
		r.RLock()
		for e := r.routes.Front(); e != nil; e = e.Next() {
			if rt := e.Value.(*route); rt.match(topic) {
				handlers = append(handlers, rt.callback)
			}
		}
		if len(handlers) == 0 && r.defaultHandler != nil {
			// No route matched: fall back to the default handler.
			handlers = append(handlers, r.defaultHandler)
		}
		r.RUnlock()

		for _, handle := range handlers {
			if order {
				run(handle, message)
				continue
			}
			// Arguments of a go statement are evaluated immediately, so each
			// goroutine is bound to this iteration's handler and message.
			go run(handle, message)
		}
	}
}
// NewRouter returns a router with an empty route list, no default handler and
// a freshly created unbuffered messages channel.
func NewRouter() *router {
	return &router{
		routes:   list.New(),
		messages: make(chan *packets.PublishPacket),
	}
}