Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
460 lines (384 sloc) 10.1 KB
package rmq
import (
"fmt"
"log"
"math/rand"
"os"
"runtime"
"sync/atomic"
"time"
"github.com/satori/go.uuid"
"github.com/streadway/amqp"
)
// ExchangeOptions holds the arguments that a binding's setup forwards,
// unchanged, to amqp.Channel.ExchangeDeclare.
type ExchangeOptions struct {
	// Name of the exchange; also the exchange that Send and SendWithConfig publish to.
	Name string
	// Type is the exchange type string handed to ExchangeDeclare as-is.
	Type string
	Durable bool
	AutoDelete bool
	Internal bool
	NoWait bool
	Args amqp.Table
}
// QueueOptions holds the arguments that a binding's setup forwards to
// amqp.Channel.QueueDeclare.
type QueueOptions struct {
	// QueueName is the queue to declare. When it is empty, setup forces the
	// auto-delete flag to true and the consumer tag gets a freshly generated
	// UUID suffix.
	QueueName string
	// NOTE(review): an earlier comment said Durable is set to false when the
	// queue name is empty; the setup code in this file passes Durable through
	// unchanged and forces AutoDelete instead — confirm which was intended.
	Durable bool
	// AutoDelete is overridden to true by setup when QueueName is empty.
	AutoDelete bool
	Exclusive bool
	NoWait bool
	Args amqp.Table
}
// QueueBindOptions holds the arguments that a binding's setup forwards to
// amqp.Channel.QueueBind. setup returns an error when a queue is configured
// but these options are nil.
type QueueBindOptions struct {
	// RoutingKey is the key used to bind the declared queue to the exchange.
	RoutingKey string
	NoWait bool
	Args amqp.Table
}
// ConsumeOptions holds the arguments that a binding's setup forwards to
// amqp.Channel.Consume, plus the callback that receives each message.
type ConsumeOptions struct {
	// ClientTag is combined with the local hostname in AddBinding to build
	// the consumer tag.
	ClientTag string
	// NoAck is passed to Consume; when it is false, each delivery is
	// acknowledged by this package after FnCallback returns.
	NoAck bool
	Exclusive bool
	NoWait bool
	Args amqp.Table
	// FnCallback is invoked with the body of every delivery. setup returns
	// an error if consume options are set but FnCallback is nil.
	FnCallback func([]byte)
}
// exchangeQueueBinding is the broker's internal record of one registered
// binding: the user-supplied exchange/queue/bind/consume options plus the
// state of the background queue checker. Instances are created in AddBinding
// and (re)applied to a channel by setup.
type exchangeQueueBinding struct {
	exitOnErr bool // passed from parent
	// NOTE(review): within this file isConsumer is only ever stored as false,
	// so the "own consumer not detected" check in setup never fires — confirm.
	isConsumer atomic.Value // binding is consumer
	// NOTE(review): autoReconnect is stored by AddBinding but not read anywhere
	// in this file.
	autoReconnect bool // should be passed from parent
	checker atomic.Value // queue inspect is running?
	checkquit chan bool // `checker` terminate channel
	queueThreshold int // passed from parent
	tagPrefix string // this + generated uuid as tag when queue name is generated
	tempTag string // this as tag when queue name is provided
	exchangeOpt *ExchangeOptions
	queueOpt *QueueOptions
	queueBindOpt *QueueBindOptions
	consumeOpt *ConsumeOptions
}
// setup applies the binding to ch. In order, it:
//
//  1. declares the exchange (exchange options are mandatory);
//  2. if queue options are present, declares the queue and binds it to the
//     exchange (bind options are then mandatory);
//  3. starts the periodic queue checker, unless one is already running;
//  4. if consume options are present, starts consuming and dispatches every
//     delivery body to the configured callback in a separate goroutine.
//
// It returns the first error reported by the amqp calls, or a descriptive
// error when a required option (or the channel itself) is missing.
//
// NOTE(review): the checker keeps using the channel it was started with. When
// setup is invoked again with a new channel (as Connect does on reconnect) a
// running checker is not restarted on the new channel.
func (e *exchangeQueueBinding) setup(ch *amqp.Channel) error {
	if ch == nil {
		// Calling into a nil *amqp.Channel would panic; report it instead.
		return fmt.Errorf("channel is nil")
	}

	// At the very least, we should have an exchange option to be able to send to an exchange.
	if e.exchangeOpt == nil {
		return fmt.Errorf("exchange option not provided")
	}

	err := ch.ExchangeDeclare(
		e.exchangeOpt.Name,       // name of the exchange
		e.exchangeOpt.Type,       // type
		e.exchangeOpt.Durable,    // durable
		e.exchangeOpt.AutoDelete, // delete when complete
		e.exchangeOpt.Internal,   // internal
		e.exchangeOpt.NoWait,     // no wait
		e.exchangeOpt.Args,       // arguments
	)
	if err != nil {
		return err
	}

	// Publish-only binding: nothing else to set up.
	if e.queueOpt == nil {
		return nil
	}

	// Queues with a server-generated name are always auto-deleted.
	autoDelete := e.queueOpt.AutoDelete || e.queueOpt.QueueName == ""

	queue, err := ch.QueueDeclare(
		e.queueOpt.QueueName, // name of the queue
		e.queueOpt.Durable,   // durable
		autoDelete,           // delete when unused
		e.queueOpt.Exclusive, // exclusive
		e.queueOpt.NoWait,    // no wait
		e.queueOpt.Args,      // arguments
	)
	if err != nil {
		return err
	}

	if e.queueBindOpt == nil {
		return fmt.Errorf("queue bind option not provided")
	}

	log.Printf("[info] queue (%q %d messages, %d consumers), binding to exchange (key %q)",
		queue.Name, queue.Messages, queue.Consumers, e.queueBindOpt.RoutingKey)

	err = ch.QueueBind(
		queue.Name,                // name of the queue
		e.queueBindOpt.RoutingKey, // routing key
		e.exchangeOpt.Name,        // source exchange
		e.queueBindOpt.NoWait,     // no wait
		e.queueBindOpt.Args,       // arguments
	)
	if err != nil {
		return err
	}

	// Start the checker only once per binding. The comma-ok assertion keeps a
	// never-initialised flag from panicking.
	if running, _ := e.checker.Load().(bool); !running {
		e.checker.Store(true)
		go e.watchQueue(ch, queue.Name)
	}

	if e.consumeOpt == nil {
		return nil
	}
	if e.consumeOpt.FnCallback == nil {
		return fmt.Errorf("no callback function for consume")
	}

	// A named queue uses the fixed tag; a generated queue gets a fresh one.
	tag := e.tempTag
	if e.queueOpt.QueueName == "" {
		tag = e.tagPrefix + fmt.Sprintf("_%s", uuid.NewV4())
	}

	d, err := ch.Consume(
		queue.Name,             // name
		tag,                    // client tag,
		e.consumeOpt.NoAck,     // no ack
		e.consumeOpt.Exclusive, // exclusive
		false,                  // noLocal (not supported in RabbitMQ)
		e.consumeOpt.NoWait,    // no wait
		e.consumeOpt.Args,      // arguments
	)
	if err != nil {
		return err
	}

	// The loop ends when the delivery channel d is closed.
	go func() {
		for m := range d {
			if e.consumeOpt.FnCallback != nil {
				e.consumeOpt.FnCallback(m.Body)
			}

			// manual ack
			if !e.consumeOpt.NoAck {
				if err := m.Ack(false); err != nil {
					log.Printf("[error] ack failed: %v", err)
				}
			}
		}
	}()

	return nil
}

// watchQueue periodically inspects queueName on ch and logs its state until a
// value is received on (or the close of) e.checkquit.
func (e *exchangeQueueBinding) watchQueue(ch *amqp.Channel, queueName string) {
	// Mark the checker as stopped on exit so a later setup may start a new one.
	defer e.checker.Store(false)

	ticker := time.NewTicker(time.Second * 2)
	defer ticker.Stop()

	// Initial grace period before the first inspection, interruptible by quit.
	grace := time.NewTimer(time.Second * 20)
	select {
	case <-grace.C:
	case <-e.checkquit:
		grace.Stop()
		return
	}

	thresholdErrCount := 0
	for {
		select {
		case <-e.checkquit:
			// return (not break): break would only leave the select and the
			// goroutine would never terminate.
			return
		case <-ticker.C:
		}

		q, err := ch.QueueInspect(queueName)
		if err != nil {
			if e.exitOnErr {
				log.Fatalf("[error] queue inspect failed: %v, quit", err)
			}
			log.Printf("[error] queue inspect failed: %v", err)
			continue
		}

		log.Printf("[check] queue %q (%d messages, %d consumers)",
			q.Name,
			q.Messages,
			q.Consumers)

		if q.Consumers != 0 {
			thresholdErrCount = 0
			continue
		}

		// if we are consumer and 0 consumers, something is wrong with this queue
		if isConsumer, _ := e.isConsumer.Load().(bool); isConsumer {
			log.Fatalf("[error] own consumer (self) not detected, quit")
		}

		if q.Messages >= e.queueThreshold {
			thresholdErrCount++
			// Warn only after several consecutive over-threshold checks.
			if thresholdErrCount >= 5 {
				log.Printf("[warn] queue has %d messages, 0 consumers", q.Messages)
			}
		}
	}
}
// Config carries the connection parameters and behaviour switches for a
// RabbitMqBroker. Host, Port, Username, Password and Vhost are used by
// Connect to build an "amqp" scheme URI.
type Config struct {
	Host string
	Port int
	Username string
	Password string
	Vhost string
	// AutoReconnect, if true, will attempt to listen to connection state channels and
	// will reconnect to server using current configurations.
	AutoReconnect bool
	// QueueThreshold is the max number of messages in the queue in which we consider
	// something is wrong, i.e. no consumer(s) is(are) draining on the other side.
	// New replaces a zero value with the default of 100.
	QueueThreshold int
	// ExitOnErr, if true, will cause this lib's client(s) to terminate using log.Fatal(f).
	// This is useful when one doesn't need to utilize the autoreconnection routine and
	// rely on the environment (i.e. Kubernetes) to restart the application.
	ExitOnErr bool
}
// RabbitMqBroker wraps a single AMQP connection and channel together with
// the set of exchange/queue bindings registered through AddBinding.
type RabbitMqBroker struct {
	config *Config
	conn *amqp.Connection // set by Connect, reset to nil by Close
	channel *amqp.Channel // set by Connect, reset to nil by Close
	// NOTE(review): done is allocated in New but not used anywhere in this file.
	done chan bool
	// lastRecoverTime is initialised in New with the current Unix time (seconds).
	lastRecoverTime int64
	// currentStatus holds a bool: true after Connect opened the channel,
	// false initially and after Close.
	currentStatus atomic.Value
	// bindings maps the id returned by AddBinding to its binding; Connect
	// re-runs setup for every entry.
	bindings map[string]*exchangeQueueBinding
}
// New creates a broker that will use c for its connection settings. The
// broker starts in the "down" state; call Connect to open the connection.
//
// When c.QueueThreshold is zero it is set, on c itself, to the default of 100.
// A nil c is accepted and stored as-is; Connect then reports "config is nil"
// instead of New panicking.
func New(c *Config) *RabbitMqBroker {
	if c != nil && c.QueueThreshold == 0 {
		// our default threshold if not specified
		c.QueueThreshold = 100
	}

	broker := &RabbitMqBroker{
		config:          c,
		done:            make(chan bool),
		lastRecoverTime: time.Now().Unix(),
		bindings:        make(map[string]*exchangeQueueBinding),
	}
	broker.currentStatus.Store(false)

	return broker
}
// isUp reports the broker's current status flag: true once Connect has
// opened a channel, false after Close.
func (b *RabbitMqBroker) isUp() bool {
	status := b.currentStatus.Load()
	return status.(bool)
}
// Connect dials the server described by the broker's config, opens a channel
// and re-applies every registered binding on it.
//
// When Config.AutoReconnect is set, a background goroutine waits for the
// connection's close notification and, on an abnormal close, either exits the
// process (Config.ExitOnErr) or keeps retrying Connect with a growing,
// randomised delay.
//
// It returns an error when the config is nil, when dialing or opening the
// channel fails, or when a binding cannot be set up. On any failure after a
// successful dial the connection is closed again so it is not leaked.
func (b *RabbitMqBroker) Connect() error {
	if b.config == nil {
		return fmt.Errorf("config is nil")
	}

	uri := amqp.URI{
		Scheme:   "amqp",
		Host:     b.config.Host,
		Port:     b.config.Port,
		Username: b.config.Username,
		Password: b.config.Password,
		Vhost:    b.config.Vhost,
	}.String()

	var err error
	b.conn, err = amqp.Dial(uri)
	if err != nil {
		return err
	}

	if b.config.AutoReconnect {
		// Register the listener here rather than inside the goroutine: b.conn
		// may already have been replaced or set to nil by the time the
		// goroutine runs. The buffer lets the close notification be delivered
		// without waiting for the receiver.
		closed := b.conn.NotifyClose(make(chan *amqp.Error, 1))
		go b.reconnectOnClose(closed)
	}

	b.channel, err = b.conn.Channel()
	if err != nil {
		// Do not leave a half-initialised connection behind.
		b.Close()
		return err
	}

	log.Println("[info] connection established")
	b.currentStatus.Store(true)

	// re-establish exchange-queue bindings
	for k, v := range b.bindings {
		log.Printf("[info] resetup binding: %v, %+v", k, v)
		if err := v.setup(b.channel); err != nil {
			b.Close()
			return err
		}
	}

	return nil
}

// reconnectOnClose waits for the close notification of one connection and
// drives the reconnect (or exit) policy described on Connect.
func (b *RabbitMqBroker) reconnectOnClose(closed <-chan *amqp.Error) {
	v, ok := <-closed
	if v == nil && !ok {
		// Channel closed without an error value: treated as a normal close.
		log.Println("[info] normal notifyclose, quit goroutine")
		runtime.Goexit()
	}

	log.Printf("[error] notifyclose error: %v", v)
	if b.config.ExitOnErr {
		time.Sleep(time.Second * 5)
		log.Fatalf("[error] notifyclose error: %v", v)
	}

	retry := 1
	for {
		b.Close()
		// Delay of 15-74s plus 2s per attempt.
		time.Sleep(time.Duration(15+rand.Intn(60)+2*retry) * time.Second)
		log.Println("[info] try reconnect:", retry)

		// try to reconnect; a successful Connect installs its own listener,
		// so this goroutine is done afterwards.
		if err := b.Connect(); err != nil {
			log.Println("[error] retry:", retry, err)
			retry++
			continue
		}

		log.Println("[info] reconnect successful")
		return
	}
}
// BindConfig groups the options AddBinding uses to create one binding.
// ExchangeOpt is required. QueueOpt is optional; when it is set,
// QueueBindOpt is required as well. ConsumeOpt is optional and, when set,
// turns the binding into a consumer of the declared queue.
type BindConfig struct {
	ExchangeOpt *ExchangeOptions
	QueueOpt *QueueOptions
	QueueBindOpt *QueueBindOptions
	ConsumeOpt *ConsumeOptions
}
// AddBinding registers a new exchange/queue binding described by bc, sets it
// up on the broker's current channel and returns the generated binding id to
// be used with Send and SendWithConfig.
//
// The broker must be connected. An error is returned (and nothing is
// registered) when bc or the broker config is nil, when there is no open
// channel, or when setting up the binding fails.
func (b *RabbitMqBroker) AddBinding(bc *BindConfig) (string, error) {
	if bc == nil {
		return "", fmt.Errorf("bind config is nil")
	}
	if b.config == nil {
		return "", fmt.Errorf("config is nil")
	}
	if b.channel == nil {
		// setup on a nil channel would panic inside the amqp library.
		return "", fmt.Errorf("connection is closed")
	}
	if b.bindings == nil {
		// Broker not created through New; writing to a nil map would panic.
		b.bindings = make(map[string]*exchangeQueueBinding)
	}

	name, err := os.Hostname()
	if err != nil {
		// Fallback host part of the consumer tag.
		name = "sim"
	}

	id := fmt.Sprintf("%s", uuid.NewV4())

	// Consumer tags are only needed when the binding consumes.
	var tagPrefix, tempTag string
	if bc.ConsumeOpt != nil {
		tagPrefix = fmt.Sprintf("%s_%s", bc.ConsumeOpt.ClientTag, name)
		tempTag = fmt.Sprintf("%s_%s", tagPrefix, id)
	}

	binding := &exchangeQueueBinding{
		exitOnErr:      b.config.ExitOnErr,
		autoReconnect:  b.config.AutoReconnect,
		checkquit:      make(chan bool),
		queueThreshold: b.config.QueueThreshold,
		exchangeOpt:    bc.ExchangeOpt,
		queueOpt:       bc.QueueOpt,
		queueBindOpt:   bc.QueueBindOpt,
		consumeOpt:     bc.ConsumeOpt,
		tagPrefix:      tagPrefix,
		tempTag:        tempTag,
	}
	binding.checker.Store(false)
	binding.isConsumer.Store(false)

	// Register only after a successful setup: a binding that failed would
	// otherwise stay in the map without an id and break every later Connect.
	if err := binding.setup(b.channel); err != nil {
		return "", err
	}
	b.bindings[id] = binding

	return id, nil
}
// Send publishes a message payload using the binding indicated by `id`. An optional
// content type mime can be provided with the default value of "text/plain".
//
// key is the routing key. The message is published to the binding's exchange
// with the mandatory and immediate flags off. An error is returned when the
// binding id is unknown, when the binding has no exchange options, when the
// broker has no open channel (before Connect, after Close, or while a
// reconnect is in progress), or when the publish itself fails.
func (b *RabbitMqBroker) Send(id, key string, payload []byte, ct ...string) error {
	bind, ok := b.bindings[id]
	if !ok {
		return fmt.Errorf("binding not found")
	}
	if bind == nil || bind.exchangeOpt == nil {
		return fmt.Errorf("exchange option not provided")
	}

	// Close sets the channel to nil; publishing on it would panic.
	ch := b.channel
	if ch == nil {
		return fmt.Errorf("connection is closed")
	}

	contentType := "text/plain"
	if len(ct) > 0 {
		contentType = ct[0]
	}

	return ch.Publish(
		bind.exchangeOpt.Name, // exchange
		key,                   // routing key
		false,                 // mandatory
		false,                 // immediate
		amqp.Publishing{
			ContentType: contentType,
			Body:        payload,
		},
	)
}
// SendConfig carries the publish parameters for SendWithConfig.
type SendConfig struct {
	// Mandatory and Immediate are passed straight to amqp.Channel.Publish.
	Mandatory bool
	Immediate bool
	// PublishConf is the message to publish; SendWithConfig rejects a nil value.
	PublishConf *amqp.Publishing
}
// SendWithConfig publishes sc.PublishConf to the exchange of the binding
// indicated by id, using key as the routing key and the mandatory/immediate
// flags from sc.
//
// An error is returned when the binding id is unknown, when sc.PublishConf is
// nil, when the binding has no exchange options, when the broker has no open
// channel (before Connect, after Close, or while a reconnect is in progress),
// or when the publish itself fails.
func (b *RabbitMqBroker) SendWithConfig(id, key string, sc SendConfig) error {
	bind, ok := b.bindings[id]
	if !ok {
		return fmt.Errorf("binding not found")
	}

	if sc.PublishConf == nil {
		return fmt.Errorf("publish config should not be nil")
	}

	if bind == nil || bind.exchangeOpt == nil {
		return fmt.Errorf("exchange option not provided")
	}

	// Close sets the channel to nil; publishing on it would panic.
	ch := b.channel
	if ch == nil {
		return fmt.Errorf("connection is closed")
	}

	return ch.Publish(bind.exchangeOpt.Name, key, sc.Mandatory, sc.Immediate, *sc.PublishConf)
}
// Close shuts down the broker's channel and connection, if present, clears
// both references and marks the broker as down. Errors reported by the
// underlying Close calls are ignored. It is safe to call on a broker that is
// already closed.
func (b *RabbitMqBroker) Close() {
	// Channel first, then the connection that owns it.
	if ch := b.channel; ch != nil {
		ch.Close()
		b.channel = nil
	}

	if conn := b.conn; conn != nil {
		conn.Close()
		b.conn = nil
	}

	b.currentStatus.Store(false)
}
You can’t perform that action at this time.