-
Notifications
You must be signed in to change notification settings - Fork 320
/
ruler.go
81 lines (75 loc) · 1.79 KB
/
ruler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package main
import (
"github.com/256dpi/gomqtt/packet"
"github.com/baidu/openedge/logger"
"github.com/baidu/openedge/protocol/mqtt"
)
type ruler struct {
rule *Rule
hub *mqtt.Dispatcher
remote *mqtt.Dispatcher
log logger.Logger
}
func create(rule Rule, hub, remote mqtt.ClientInfo) *ruler {
defaults(&rule, &hub, &remote)
log := logger.WithField("rule", rule.Remote.Name)
return &ruler{
rule: &rule,
hub: mqtt.NewDispatcher(hub, log),
remote: mqtt.NewDispatcher(remote, log),
log: log,
}
}
func (rr *ruler) start() error {
hubHandler := mqtt.NewHandlerWrapper(
func(p *packet.Publish) error {
return rr.remote.Send(p)
},
func(p *packet.Puback) error {
return rr.remote.Send(p)
},
func(e error) {
rr.log.Errorln("hub error:", e.Error())
},
)
if err := rr.hub.Start(hubHandler); err != nil {
return err
}
remoteHandler := mqtt.NewHandlerWrapper(
func(p *packet.Publish) error {
return rr.hub.Send(p)
},
func(p *packet.Puback) error {
return rr.hub.Send(p)
},
func(e error) {
rr.log.Errorln("remote error:", e.Error())
},
)
if err := rr.remote.Start(remoteHandler); err != nil {
return err
}
return nil
}
func (rr *ruler) close() {
rr.hub.Close()
rr.remote.Close()
}
func defaults(rule *Rule, hub, remote *mqtt.ClientInfo) {
// set remote client id
// rules[].remote.clientid > remotes[].clientid > rules[].remote.name
if rule.Remote.ClientID != "" {
remote.ClientID = rule.Remote.ClientID
} else if remote.ClientID == "" {
remote.ClientID = rule.Remote.Name
}
// set hub client id
// rules[].hub.clientid > remote.ClientID
if rule.Hub.ClientID != "" {
hub.ClientID = rule.Hub.ClientID
} else {
hub.ClientID = remote.ClientID
}
hub.Subscriptions = rule.Hub.Subscriptions
remote.Subscriptions = rule.Remote.Subscriptions
}