/
nats.go
152 lines (133 loc) · 3.89 KB
/
nats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package nats
import (
"errors"
"fmt"
"strings"
"time"
"github.com/Bendomey/nucleo-go"
"github.com/Bendomey/nucleo-go/serializer"
"github.com/Bendomey/nucleo-go/transit"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// NatsTransporter holds the state shared by this file's NATS transport
// methods (Connect, Disconnect, Subscribe, Publish).
type NatsTransporter struct {
// prefix is the first dot-separated segment of every topic built by topicName.
prefix string
// opts are the NATS client options Connect uses to open the connection.
opts *nats.Options
// conn is the live connection; it is nil until Connect succeeds and is
// reset to nil by Disconnect.
conn *nats.Conn
// logger receives all log output produced by the transporter.
logger *log.Entry
// serializer converts between raw message bytes and payloads.
serializer serializer.Serializer
// subscriptions records every subscription created by Subscribe so that
// Disconnect can unsubscribe them.
subscriptions []*nats.Subscription
}
// NATSOptions is the configuration accepted by CreateNatsTransporter.
type NATSOptions struct {
// URL is copied to the NATS client options as the server URL.
URL string
// Name is copied to the NATS client options as the connection name.
Name string
// Logger is the log entry the transporter writes to.
Logger *log.Entry
// Serializer converts between message bytes and payloads.
Serializer serializer.Serializer
// ValidateMsg is not read by any code in this file.
// NOTE(review): presumably consumed elsewhere or left over — confirm.
ValidateMsg transit.ValidateMsgFunc
// AllowReconnect is always copied to the NATS client options, so leaving
// it at its zero value (false) overrides whatever the client default is.
AllowReconnect bool
// ReconnectWait overrides the NATS client default only when non-zero.
ReconnectWait time.Duration
// MaxReconnect overrides the NATS client default only when non-zero.
MaxReconnect int
}
// natsOptions translates the transporter-level NATSOptions into a
// *nats.Options value, starting from the NATS client defaults.
//
// Name, URL and AllowReconnect are always copied across. ReconnectWait and
// MaxReconnect replace the client defaults only when they are non-zero.
func natsOptions(options NATSOptions) *nats.Options {
	cfg := nats.GetDefaultOptions()

	// Zero means "not configured": keep the client default in that case.
	if wait := options.ReconnectWait; wait != 0 {
		cfg.ReconnectWait = wait
	}
	if limit := options.MaxReconnect; limit != 0 {
		cfg.MaxReconnect = limit
	}

	cfg.AllowReconnect = options.AllowReconnect
	cfg.Url = options.URL
	cfg.Name = options.Name
	return &cfg
}
// CreateNatsTransporter builds a NATS-backed transit.Transport from options.
//
// The returned transporter is not connected yet: Subscribe and Publish panic
// until Connect has succeeded. When options.Logger is nil the logrus standard
// logger is used instead, so the transporter's logging calls never
// dereference a nil entry. options.Serializer is stored as-is, and
// options.ValidateMsg is not used by this constructor.
func CreateNatsTransporter(options NATSOptions) transit.Transport {
	logger := options.Logger
	if logger == nil {
		// A nil *log.Entry panics on first use; fall back to the global logger.
		logger = log.NewEntry(log.StandardLogger())
	}
	return &NatsTransporter{
		opts:          natsOptions(options),
		logger:        logger,
		serializer:    options.Serializer,
		subscriptions: []*nats.Subscription{},
	}
}
// Connect opens the NATS connection in a background goroutine.
//
// The returned channel delivers exactly one value: nil once the connection
// has been established and stored on the transporter, or an error that wraps
// the underlying NATS failure (so errors.Is / errors.As can still inspect
// it). The channel has a buffer of one so the goroutine always finishes, even
// if the caller never reads the result.
func (t *NatsTransporter) Connect() chan error {
	endChan := make(chan error, 1)
	go func() {
		t.logger.Debugln("NATS Connect() - url: ", t.opts.Url, " Name: ", t.opts.Name)
		conn, err := t.opts.Connect()
		if err != nil {
			t.logger.Errorln("NATS Connect() - Error: ", err, " url: ", t.opts.Url, " Name: ", t.opts.Name)
			// Message text is kept unchanged for callers that match on it;
			// %w additionally preserves the cause.
			endChan <- fmt.Errorf("Error connection to NATS. error: %w url: %s", err, t.opts.Url)
			return
		}
		t.logger.Infoln("Connected to ", t.opts.Url)
		t.conn = conn
		endChan <- nil
	}()
	return endChan
}
// Disconnect tears down the NATS connection in a background goroutine.
//
// Every subscription recorded by Subscribe is unsubscribed (failures are
// logged, not returned), the connection is closed, and both the connection
// and the subscription list are reset. When the transporter is not connected
// nothing is done. The returned channel always delivers a single nil; it has
// a buffer of one so the goroutine finishes even if the caller never reads it.
func (t *NatsTransporter) Disconnect() chan error {
	endChan := make(chan error, 1)
	go func() {
		if t.conn == nil {
			endChan <- nil
			return
		}
		for _, sub := range t.subscriptions {
			if err := sub.Unsubscribe(); err != nil {
				t.logger.Errorln(err)
			}
		}
		// Forget the stale handles so a later Connect/Disconnect cycle does
		// not try to unsubscribe them a second time.
		t.subscriptions = nil
		t.conn.Close()
		t.conn = nil
		endChan <- nil
	}()
	return endChan
}
// topicName builds the dot-separated topic for command in the form
// "<prefix>.<command>", with ".<nodeID>" appended when nodeID is non-empty.
func (t *NatsTransporter) topicName(command string, nodeID string) string {
	if nodeID == "" {
		return strings.Join([]string{t.prefix, command}, ".")
	}
	return strings.Join([]string{t.prefix, command, nodeID}, ".")
}
// Subscribe registers handler for the topic derived from command and nodeID.
//
// Each incoming NATS message is decoded with the transporter's serializer,
// logged at debug level together with its "sender" field, and passed to
// handler. Subscribe panics when the transporter has no connection. A failed
// subscription is only logged, and nothing is recorded in that case.
func (t *NatsTransporter) Subscribe(command, nodeID string, handler transit.TransportHandler) {
	if t.conn == nil {
		reason := fmt.Sprint("nats.Subscribe() No connection :( -> command: ", command, " nodeID: ", nodeID)
		t.logger.Warnln(reason)
		panic(errors.New(reason))
	}

	topic := t.topicName(command, nodeID)
	onMessage := func(m *nats.Msg) {
		payload := t.serializer.BytesToPayload(&m.Data)
		sender := payload.Get("sender").String()
		line := fmt.Sprintf("Incoming %s packet from '%s'", topic, sender)
		t.logger.Debugln(line)
		handler(payload)
	}

	sub, err := t.conn.Subscribe(topic, onMessage)
	if err != nil {
		t.logger.Errorln("Cannot subscribe: ", topic, " error: ", err)
		return
	}
	// Remember the handle so Disconnect can unsubscribe it later.
	t.subscriptions = append(t.subscriptions, sub)
}
// Publish serializes message and sends it on the topic derived from command
// and nodeID.
//
// It panics when the transporter has no connection, and also panics with the
// underlying error when the NATS publish call fails.
func (t *NatsTransporter) Publish(command, nodeID string, message nucleo.Payload) {
	if t.conn == nil {
		reason := fmt.Sprint("nats.Publish() No connection :( -> command: ", command, " nodeID: ", nodeID)
		t.logger.Warnln(reason)
		panic(errors.New(reason))
	}

	topic := t.topicName(command, nodeID)
	t.logger.Debugln("nats.Publish() command: ", command, " topic: ", topic, " nodeID: ", nodeID)
	t.logger.Traceln("message: \n", message, "\n - end")

	data := t.serializer.PayloadToBytes(message)
	if err := t.conn.Publish(topic, data); err != nil {
		t.logger.Errorln("Error on publish: error: ", err, " command: ", command, " topic: ", topic)
		panic(err)
	}
}
// SetPrefix stores prefix as the leading segment used by topicName when
// building topics for Subscribe and Publish.
func (t *NatsTransporter) SetPrefix(prefix string) {
t.prefix = prefix
}
// SetNodeID is a no-op: this transporter does not store the node ID.
// NOTE(review): presumably present only to satisfy transit.Transport — confirm.
func (t *NatsTransporter) SetNodeID(nodeID string) {
}
// SetSerializer is a no-op: the argument is ignored and the serializer given
// to CreateNatsTransporter through NATSOptions stays in effect.
func (t *NatsTransporter) SetSerializer(serializer serializer.Serializer) {
// Intentionally ignored: the transporter already receives its serializer
// when it is initialized (the original note says this happens in the
// pubsub function — not visible in this file, confirm).
}