/
nats_client_runner.go
59 lines (51 loc) · 1.31 KB
/
nats_client_runner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package diegonats
import (
"errors"
"net/url"
"os"
"strings"
"code.cloudfoundry.org/lager/v3"
)
// NATSClientRunner holds everything needed to connect a NATSClient to a set
// of NATS servers and to supervise that connection via its Run method.
type NATSClientRunner struct {
	// addresses is a comma-separated list of server hosts; username and
	// password are the credentials embedded in every server URL built by Run.
	addresses, username, password string

	// logger receives connection lifecycle events.
	logger lager.Logger

	// client is the NATS client that Run connects and, on shutdown, closes.
	client NATSClient
}
// NewClientRunner returns a NATSClientRunner for the given comma-separated
// server addresses and credentials. The supplied logger is wrapped in a
// "nats-runner" session, and client is the NATSClient the runner will drive.
func NewClientRunner(addresses, username, password string, logger lager.Logger, client NATSClient) NATSClientRunner {
	var runner NATSClientRunner

	runner.addresses = addresses
	runner.username = username
	runner.password = password
	runner.logger = logger.Session("nats-runner")
	runner.client = client

	return runner
}
// Run connects the client to the configured NATS servers, signals readiness,
// and then blocks until it is told to stop or the connection goes away.
//
// Every comma-separated entry in the runner's addresses is trimmed of
// surrounding whitespace and turned into a "nats://username:password@host"
// URL; the resulting list is handed to the client's Connect method.
//
// Parameters:
//   - signals: receiving any value triggers a shutdown; the client is closed
//     and Run returns nil.
//   - ready: closed once Connect has succeeded.
//
// Returns the error from Connect if connecting fails, a "nats closed
// unexpectedly" error if the channel returned by Connect fires, and nil on a
// signal-driven shutdown.
func (runner NATSClientRunner) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
	addrs := strings.Split(runner.addresses, ",")
	natsMembers := make([]string, 0, len(addrs))
	for _, addr := range addrs {
		// Lists written as "a:4222, b:4222" would otherwise leave whitespace
		// inside the URL host and produce a malformed server URL.
		host := strings.TrimSpace(addr)
		uri := url.URL{
			Scheme: "nats",
			User:   url.UserPassword(runner.username, runner.password),
			Host:   host,
		}
		natsMembers = append(natsMembers, uri.String())
	}

	unexpectedConnClosed, err := runner.client.Connect(natsMembers)
	if err != nil {
		runner.logger.Error("connecting-to-nats-failed", err)
		return err
	}

	// The spelling of this log key is kept as-is so that anything matching on
	// the existing message continues to work.
	runner.logger.Info("connecting-to-nats-succeeeded")
	close(ready)

	select {
	case <-signals:
		runner.client.Close()
		runner.logger.Info("shutting-down")
		return nil
	case <-unexpectedConnClosed:
		// Log the same error that is returned, rather than a nil error, so the
		// log entry carries the failure reason.
		err = errors.New("nats closed unexpectedly")
		runner.logger.Error("unexpected-nats-close", err)
		return err
	}
}