/
nats_adaptor.go
128 lines (108 loc) · 3.02 KB
/
nats_adaptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package nats
import (
"net/url"
"strings"
"github.com/nats-io/nats.go"
"gobot.io/x/gobot"
)
// Adaptor is a configuration struct for interacting with a NATS server.
// Name is a logical name for the adaptor/nats server connection.
// Host is in the form "localhost:4222" which is the hostname/ip and port of the nats server.
// ClientID is a unique identifier integer that specifies the identity of the client.
type Adaptor struct {
name string
Host string
clientID int
username string
password string
client *nats.Conn
connect func() (*nats.Conn, error)
}
// Message is a message received from the server.
type Message *nats.Msg
// NewAdaptor populates a new NATS Adaptor.
func NewAdaptor(host string, clientID int, options ...nats.Option) *Adaptor {
hosts, err := processHostString(host)
return &Adaptor{
name: gobot.DefaultName("NATS"),
Host: hosts,
clientID: clientID,
connect: func() (*nats.Conn, error) {
if err != nil {
return nil, err
}
return nats.Connect(hosts, options...)
},
}
}
// NewAdaptorWithAuth populates a NATS Adaptor including username and password.
func NewAdaptorWithAuth(host string, clientID int, username string, password string, options ...nats.Option) *Adaptor {
hosts, err := processHostString(host)
return &Adaptor{
Host: hosts,
clientID: clientID,
username: username,
password: password,
connect: func() (*nats.Conn, error) {
if err != nil {
return nil, err
}
return nats.Connect(hosts, append(options, nats.UserInfo(username, password))...)
},
}
}
func processHostString(host string) (string, error) {
urls := strings.Split(host, ",")
for i, s := range urls {
s = strings.TrimSpace(s)
if !strings.HasPrefix(s, "tls://") && !strings.HasPrefix(s, "nats://") {
s = "nats://" + s
}
u, err := url.Parse(s)
if err != nil {
return "", err
}
urls[i] = u.String()
}
return strings.Join(urls, ","), nil
}
// Name returns the logical client name.
func (a *Adaptor) Name() string { return a.name }
// SetName sets the logical client name.
func (a *Adaptor) SetName(n string) { a.name = n }
// Connect makes a connection to the Nats server.
func (a *Adaptor) Connect() (err error) {
a.client, err = a.connect()
return
}
// Disconnect from the nats server.
func (a *Adaptor) Disconnect() (err error) {
if a.client != nil {
a.client.Close()
}
return
}
// Finalize is simply a helper method for the disconnect.
func (a *Adaptor) Finalize() (err error) {
a.Disconnect()
return
}
// Publish sends a message with the particular topic to the nats server.
func (a *Adaptor) Publish(topic string, message []byte) bool {
if a.client == nil {
return false
}
a.client.Publish(topic, message)
return true
}
// On is an event-handler style subscriber to a particular topic (named event).
// Supply a handler function to use the bytes returned by the server.
func (a *Adaptor) On(event string, f func(msg Message)) bool {
if a.client == nil {
return false
}
a.client.Subscribe(event, func(msg *nats.Msg) {
f(msg)
})
return true
}