/
eventbus.go
71 lines (58 loc) · 1.52 KB
/
eventbus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package eventbus
import (
"time"
"github.com/BrobridgeOrg/gravity-data-handler/pkg/app"
nats "github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// Options carries the tunables that Connect forwards to the NATS client.
type Options struct {
	// PingInterval is multiplied by time.Second inside Connect before it is
	// handed to nats.PingInterval, so callers effectively supply a count of
	// seconds (for example 10, not 10*time.Second).
	PingInterval time.Duration

	// MaxPingsOutstanding is passed unchanged to nats.MaxPingsOutstanding.
	MaxPingsOutstanding int

	// MaxReconnects is passed unchanged to nats.MaxReconnects.
	MaxReconnects int
}
// EventBusHandler bundles the connection-state callbacks that Connect
// registers with the NATS client.
type EventBusHandler struct {
	// Reconnect is installed through nats.ReconnectHandler.
	Reconnect func(natsConn *nats.Conn)

	// Disconnect is installed through nats.DisconnectHandler.
	Disconnect func(natsConn *nats.Conn)
}
// EventBus pairs a NATS connection with the host, callbacks and options
// used to establish it.
type EventBus struct {
	// app is the application handle supplied to NewEventBus.
	app app.App

	// connection is nil until Connect succeeds.
	connection *nats.Conn

	// host is the server address passed to nats.Connect.
	host string

	// handler holds the reconnect/disconnect callbacks registered on connect.
	handler *EventBusHandler

	// options holds the ping and reconnect settings applied on connect.
	options *Options
}
// NewEventBus returns an EventBus configured for host. No network activity
// happens here; the connection stays nil until Connect is called.
//
// handler and options are received by value, so the EventBus keeps pointers
// to its own copies rather than to the caller's variables.
func NewEventBus(a app.App, host string, handler EventBusHandler, options Options) *EventBus {
	bus := new(EventBus) // connection starts out as the nil zero value
	bus.app = a
	bus.host = host
	bus.handler = &handler
	bus.options = &options
	return bus
}
// Connect dials the configured host and, on success, stores the resulting
// connection on the EventBus. The settings in use are logged at Info level
// before dialing.
//
// The configured PingInterval is scaled by time.Second before use, so it is
// treated as a number of seconds. Any error from nats.Connect is returned
// unchanged and the stored connection is left as it was.
func (eb *EventBus) Connect() error {
	settings := eb.options
	pingEvery := settings.PingInterval * time.Second

	fields := log.Fields{
		"host":                eb.host,
		"PingInterval":        pingEvery,
		"MaxPingsOutstanding": settings.MaxPingsOutstanding,
		"MaxReconnects":       settings.MaxReconnects,
	}
	log.WithFields(fields).Info("Connecting to event server")

	natsOpts := []nats.Option{
		nats.PingInterval(pingEvery),
		nats.MaxPingsOutstanding(settings.MaxPingsOutstanding),
		nats.MaxReconnects(settings.MaxReconnects),
		nats.ReconnectHandler(eb.handler.Reconnect),
		nats.DisconnectHandler(eb.handler.Disconnect),
	}

	conn, err := nats.Connect(eb.host, natsOpts...)
	if err != nil {
		return err
	}

	eb.connection = conn
	return nil
}
// Close shuts down the underlying NATS connection, if there is one.
//
// The connection is nil until Connect succeeds, so calling Close before
// that (or after a failed Connect) is a no-op. Without the guard the call
// would invoke Close on a nil *nats.Conn, which dereferences the nil
// pointer in nats.go releases that do not perform their own nil check.
func (eb *EventBus) Close() {
	if eb.connection == nil {
		return
	}
	eb.connection.Close()
}
// GetConnection returns the NATS connection held by the EventBus. The
// result is nil if Connect has not yet succeeded.
func (eb *EventBus) GetConnection() *nats.Conn {
	conn := eb.connection
	return conn
}