-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
136 lines (114 loc) · 2.42 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package ramqp
import (
"fmt"
"github.com/streadway/amqp"
"log"
"time"
)
// publishing is the unit handed from Publish to the handleMsg loop: one
// message together with the exchange and key it should be published with.
type publishing struct {
// content is the message passed as the final argument of the channel publish.
content amqp.Publishing
// key is passed as the key (second) argument of the channel publish.
key string
// exchange is passed as the exchange (first) argument of the channel publish.
exchange string
}
// POpt is a configuration function for a Publisher. dealOptions calls every
// supplied POpt with the Publisher being set up.
type POpt func(p *Publisher)
// Publisher publishes messages through a single AMQP channel. Publish hands
// each message to the handleMsg loop over msgChan and waits on retErr for the
// result of the channel publish.
type Publisher struct {
// ExchangeType and ExchangeName describe the exchange that createChannel
// declares; the declaration is skipped when either one is empty.
ExchangeType string
ExchangeName string
// NotifyReturn, when non-nil, is called in a new goroutine for every
// amqp.Return that handleMsg receives from the channel.
NotifyReturn func(msg amqp.Return)
// channel is the channel currently in use; createChannel overwrites it.
channel *amqp.Channel
// msgChan carries messages from Publish to handleMsg.
msgChan chan publishing
// retErr carries the result of each channel publish back to Publish.
retErr chan error
// Exchange declare parameters, passed to ExchangeDeclare in this order
// after the exchange name and type.
exchangeDurable bool
exchangeAutoDel bool
exchangeInternal bool
exchangeNoWait bool
exchangeArgs amqp.Table
// publishMandatory is passed as the third argument of every channel publish.
publishMandatory bool
}
// init prepares p for use: it allocates the two unbuffered channels that
// connect Publish with the handleMsg loop and then applies the supplied
// options. It currently always returns nil.
func (p *Publisher) init(options []POpt) error {
	// Unbuffered on purpose: every send on either channel is a direct
	// hand-off to the goroutine on the other side.
	p.retErr, p.msgChan = make(chan error), make(chan publishing)
	p.dealOptions(options)
	return nil
}
// dealOptions applies every option in options to p, in slice order.
//
// A nil or empty slice is a no-op. Nil entries are skipped, because calling
// a nil function value panics and a missing option should not prevent the
// Publisher from being configured.
func (p *Publisher) dealOptions(options []POpt) {
	for _, opt := range options {
		if opt == nil {
			continue
		}
		opt(p)
	}
}
// createChannel opens a new channel on conn and stores it in p.channel. When
// both ExchangeName and ExchangeType are non-empty it also declares that
// exchange, using the exchange* fields as the declaration parameters.
//
// An error from conn.Channel is returned unchanged. An error from
// ExchangeDeclare is returned wrapped (so errors.Is/errors.As can still reach
// it) with the prefix "exchange declare err: "; in that case the channel that
// was just opened is closed before returning.
func (p *Publisher) createChannel(conn *amqp.Connection) error {
	var err error
	p.channel, err = conn.Channel()
	if err != nil {
		return err
	}

	// No exchange configured: the bare channel is all that is needed.
	if p.ExchangeName == "" || p.ExchangeType == "" {
		return nil
	}

	err = p.channel.ExchangeDeclare(
		p.ExchangeName,
		p.ExchangeType,
		p.exchangeDurable,
		p.exchangeAutoDel,
		p.exchangeInternal,
		p.exchangeNoWait,
		p.exchangeArgs,
	)
	if err != nil {
		// The caller retries by opening another channel, so release this one
		// instead of leaving it open on the connection. The Close error is
		// deliberately ignored: the channel may already have been closed as
		// a result of the failed declare, and the declare error is the one
		// worth reporting.
		_ = p.channel.Close()
		return fmt.Errorf("exchange declare err: %w", err)
	}
	return nil
}
// listen runs one publishing session on conn. It sets up the channel, serves
// messages until handleMsg returns, and then hands over to retry. A setup
// failure is logged and also followed by retry.
func (p *Publisher) listen(conn *amqp.Connection) {
	if setupErr := p.createChannel(conn); setupErr != nil {
		log.Println("create msg queue err: ", setupErr)
	} else {
		// Blocks here until the channel signals that it has closed.
		p.handleMsg()
	}

	// Whether setup failed or the session ended, schedule the next attempt.
	p.retry(conn)
}
// handleMsg is the publisher's event loop for the current channel. It runs
// until the channel reports that it has closed, and on each iteration handles
// one of:
//
//   - a message queued by Publish: it is published on the channel and the
//     result (nil or an error) is sent back on retErr;
//   - a returned message from the broker: if NotifyReturn is set, it is
//     invoked in its own goroutine so a slow callback cannot stall the loop;
//   - a close notification: the loop returns.
func (p *Publisher) handleMsg() {
	// Capacity 1 rather than unbuffered: the library delivers the close error
	// with a synchronous send. If this loop is busy inside Publish at that
	// moment, an unbuffered channel can leave both sides waiting on each
	// other forever. One slot lets the notification be parked until the loop
	// gets back to the select.
	closeErr := make(chan *amqp.Error, 1)
	p.channel.NotifyClose(closeErr)

	returned := make(chan amqp.Return)
	p.channel.NotifyReturn(returned)

	for {
		select {
		case msg, valid := <-p.msgChan:
			if valid {
				err := p.channel.Publish(msg.exchange, msg.key, p.publishMandatory, false, msg.content)
				// Publish is blocked waiting for exactly this value.
				p.retErr <- err
			}
		case ret, ok := <-returned:
			// ok is false once the library has closed the notification
			// channel; there is nothing to forward in that case.
			if p.NotifyReturn != nil && ok {
				go p.NotifyReturn(ret)
			}
		case <-closeErr:
			return
		}
	}
}
// retry schedules another listen attempt on conn in a new goroutine and
// returns immediately. If conn is already closed at the moment the goroutine
// starts, nothing is scheduled; the check is made before the delay, not
// after it.
func (p *Publisher) retry(conn *amqp.Connection) {
	go func() {
		if conn.IsClosed() {
			return
		}
		// todo exponential backoff
		time.Sleep(5 * time.Second)
		log.Println("publisher retry to listen ...")
		p.listen(conn)
	}()
}
// Publish queues msg for delivery to exchange with the given key and returns
// the result of the channel publish performed by the handleMsg loop.
//
// Both hand-offs use the unbuffered channels created in init, so the call
// blocks until the loop has taken the message and reported back.
// NOTE(review): as far as this file shows, nothing else receives from
// msgChan, so a call made while no handleMsg loop is running would block
// until one starts. Confirm against the callers.
func (p *Publisher) Publish(exchange, key string, msg amqp.Publishing) (err error) {
	p.msgChan <- publishing{
		content:  msg,
		key:      key,
		exchange: exchange,
	}
	return <-p.retErr
}