-
Notifications
You must be signed in to change notification settings - Fork 36
/
rabbitmq.go
99 lines (90 loc) · 2.04 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package storage
import (
"fmt"
"github.com/streadway/amqp"
"github.com/xfyun/xsf/utils"
"strings"
"sync"
)
// rabbitSep is the scheme separator of an AMQP URL. RabInit looks for it in
// the host string and splices the credentials in directly after it.
const rabbitSep = "://"
// Package-level RabbitMQ state. RabInit populates it; rabRecon replaces the
// connection and channel when a publish fails.
var (
	// rabUrl is the URL handed to amqp.Dial. An empty value means RabbitMQ is
	// not configured, in which case RabPublish and RabFini do nothing.
	rabUrl string
	// rabQueue is the queue name declared by RabInit and used as the routing
	// key by RabPublish.
	rabQueue string
	// rabLog receives the error reports of every function in this file.
	rabLog *utils.Logger
	// con is the current broker connection.
	con *amqp.Connection
	// channel is the channel opened on con that RabPublish publishes through.
	channel *amqp.Channel
	// queue is the result of the QueueDeclare call in RabInit. No function
	// visible in this part of the file reads it.
	queue amqp.Queue
	// lock is held by RabPublish around each publish attempt and the
	// reconnect that follows a failed one.
	lock sync.Mutex
)
// TODO: pre-initialize channels for multiple RabbitMQ instances.
// TODO: retry publishing on failure.

// RabInit connects to the RabbitMQ broker at host, opens a channel and
// declares the durable queue called name, which RabPublish later sends to.
//
// host is the broker URL. When it contains "://", usr and pass are inserted
// right after the scheme as "usr:pass@"; otherwise host is dialed unchanged.
// An empty host leaves the package unconfigured and returns nil. logger is
// stored as the package logger and receives all error reports.
//
// The connection, channel, queue and URL are committed to package state only
// after every step has succeeded. On failure the error is logged and
// returned, whatever was opened so far is closed, and the previous package
// state (empty on a first call) is left untouched, so RabPublish and RabFini
// never see a half-initialized connection.
//
// NOTE(review): usr and pass are inserted verbatim (not URL-escaped), and the
// resulting URL, password included, is written to the error log.
func RabInit(host string, usr string, pass string, name string, logger *utils.Logger) (err error) {
	if len(host) == 0 {
		return nil
	}
	rabLog = logger

	dialURL := host
	if index := strings.Index(host, rabbitSep); index != -1 {
		at := index + len(rabbitSep)
		dialURL = host[:at] + usr + ":" + pass + "@" + host[at:]
	}

	conn, err := amqp.Dial(dialURL)
	if err != nil {
		rabLog.Errorw("rabbitmq init Dial fail", "err", err.Error(), "url", dialURL)
		return err
	}

	ch, err := conn.Channel()
	if err != nil {
		rabLog.Errorw("rabbitmq init Channel fail", "err", err.Error(), "url", dialURL)
		// Best-effort cleanup so the dialed connection is not leaked; the
		// Channel error is the one worth reporting.
		_ = conn.Close()
		return err
	}

	// Arguments after the name: durable=true, autoDelete=false,
	// exclusive=false, noWait=false, no extra arguments.
	q, err := ch.QueueDeclare(name, true, false, false, false, nil)
	if err != nil {
		rabLog.Errorw("rabbitmq init QueueDeclare fail", "err", err.Error(), "url", dialURL)
		// Best-effort cleanup; closing the connection is expected to tear
		// down the channel opened on it as well.
		_ = conn.Close()
		return err
	}

	rabUrl, rabQueue = dialURL, name
	con, channel, queue = conn, ch, q
	return nil
}
// RabPublish sends body to the queue declared by RabInit, using the default
// exchange with the queue name as routing key.
//
// retry is the number of additional attempts made after a failed one, so
// retry == 0 means a single attempt. A negative retry publishes nothing and
// returns nil. When RabbitMQ is not configured (rabUrl is empty) the call is
// a no-op that returns nil.
//
// It returns nil as soon as one attempt succeeds, otherwise the error of the
// last failed attempt. Every failed attempt is logged and triggers a
// reconnect before the next one.
func RabPublish(body []byte, retry int) (err error) {
	for len(rabUrl) > 0 && retry >= 0 {
		if err = rabPublishOnce(body); err == nil {
			return nil
		}
		retry--
	}
	return err
}

// rabPublishOnce performs one publish attempt under lock and reconnects when
// the attempt fails.
func rabPublishOnce(body []byte) error {
	lock.Lock()
	// Deferred so the mutex is released even if the AMQP client panics.
	defer lock.Unlock()

	// A failed init or reconnect can leave no channel at all; publishing on a
	// nil channel would panic, so try to reconnect first.
	if channel == nil {
		rabRecon()
	}
	if channel == nil {
		var err error = amqp.ErrClosed
		rabLog.Errorw("rabbitmq publish fail", "err", err.Error(), "url", rabUrl)
		return err
	}

	err := channel.Publish(
		"",       // default exchange
		rabQueue, // routing key
		false,    // mandatory
		false,    // immediate
		amqp.Publishing{
			Body: body,
		})
	if err != nil {
		rabRecon() // reconnect so the next attempt gets a fresh channel
		rabLog.Errorw("rabbitmq publish fail", "err", err.Error(), "url", rabUrl)
	}
	return err
}
// RabFini closes the RabbitMQ channel and connection. It does nothing when
// RabbitMQ is not configured (rabUrl is empty).
//
// The channel and the connection are each closed only if present, since a
// failed init or reconnect can leave either one nil. Close errors are logged
// rather than discarded, and the success message is printed only when both
// closes succeeded.
func RabFini() {
	if len(rabUrl) == 0 {
		return
	}

	clean := true
	if channel != nil {
		if err := channel.Close(); err != nil {
			clean = false
			rabLog.Errorw("rabbitmq fini Channel close fail", "err", err.Error(), "url", rabUrl)
		}
	}
	if con != nil {
		if err := con.Close(); err != nil {
			clean = false
			rabLog.Errorw("rabbitmq fini Connection close fail", "err", err.Error(), "url", rabUrl)
		}
	}

	if clean {
		fmt.Println("aiService.Finit: fini rabbit success!")
	}
}
// rabRecon dials rabUrl again and opens a fresh channel, replacing the
// package-level con and channel.
//
// The replacement happens only when both the dial and the channel open
// succeed; on failure the error is logged and the existing con and channel
// are left as they were, so a failed reconnect never overwrites them with
// nil. After a successful swap the previous connection is closed so that it
// is not leaked.
//
// RabPublish calls this while holding lock; rabRecon does not take the lock
// itself.
func rabRecon() {
	conn, err := amqp.Dial(rabUrl)
	if err != nil {
		rabLog.Errorw("rabbitmq recon Dial fail", "err", err.Error(), "url", rabUrl)
		return
	}

	ch, err := conn.Channel()
	if err != nil {
		rabLog.Errorw("rabbitmq recon Channel fail", "err", err.Error(), "url", rabUrl)
		// Best-effort cleanup: a connection without a channel is of no use.
		_ = conn.Close()
		return
	}

	old := con
	con, channel = conn, ch
	if old != nil {
		// The old connection is usually already broken, so its close error
		// carries no useful information.
		_ = old.Close()
	}
}