/
producer.go
119 lines (104 loc) 路 2.22 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package nats
import (
"log"
"time"
"github.com/nats-io/nats.go"
)
// Producer define a nats producer
type Producer struct {
addr string
conn *nats.Conn
connClose chan bool
quit chan struct{}
}
// NewProducer create a producer
func NewProducer(addr string) *Producer {
p := &Producer{
addr: addr,
connClose: make(chan bool),
quit: make(chan struct{}),
}
if err := p.Start(); err != nil {
log.Println("nats start producer err: ", err)
}
return p
}
// Start .
func (p *Producer) Start() error {
if err := p.Run(); err != nil {
return err
}
log.Println("nats producer connected and running!")
go p.ReConnect()
return nil
}
// Stop .
func (p *Producer) Stop() {
close(p.quit)
if !p.conn.IsClosed() {
p.conn.Close()
}
}
// Run .
func (p *Producer) Run() error {
var err error
opts := nats.Options{
MaxReconnect: -1,
ClosedCB: func(conn *nats.Conn) {
p.connClose <- true
log.Println("nats producer - connection closed cb")
},
DisconnectedErrCB: func(conn *nats.Conn, err error) {
log.Println("nats producer - connection disconnected err cb")
},
ReconnectedCB: func(conn *nats.Conn) {
log.Println("nats producer - connection reconnected cb")
},
AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) {
log.Println("nats producer - connection async err cb")
},
}
p.conn, err = opts.Connect()
return err
}
// ReConnect .
func (p *Producer) ReConnect() {
for {
select {
case closed := <-p.connClose:
if closed {
log.Println("nats producer - connection closed")
}
case <-p.quit:
return
}
if !p.conn.IsClosed() {
p.conn.Close()
}
quit:
for {
select {
case <-p.quit:
return
default:
log.Println("nats producer - reconnect")
if err := p.Run(); err != nil {
log.Println("nats producer - failCheck: ", err)
// sleep 5s reconnect
time.Sleep(time.Second * 5)
continue
}
log.Println("nats producer connected and running!")
break quit
}
}
}
}
// Publish push data to queue
func (p *Producer) Publish(topic string, data interface{}) error {
encodeConn, err := nats.NewEncodedConn(p.conn, nats.JSON_ENCODER)
if err != nil {
return err
}
return encodeConn.Publish(topic, data)
}