Skip to content

Commit

Permalink
fix: consume and publish blocked after rabbitmq reconnecting
Browse files Browse the repository at this point in the history
  • Loading branch information
maxinglun committed Apr 28, 2022
1 parent fe827c4 commit 9f64710
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
61 changes: 27 additions & 34 deletions v4/broker/rabbitmq/connection.go
Expand Up @@ -129,42 +129,35 @@ func (r *rabbitMQConn) reconnect(secure bool, config *amqp.Config) {
chanNotifyClose := make(chan *amqp.Error)
channel := r.ExchangeChannel.channel
channel.NotifyClose(chanNotifyClose)
channelNotifyReturn := make(chan amqp.Return)
channel.NotifyReturn(channelNotifyReturn)

// block until closed
select {
case result, ok := <-channelNotifyReturn:
if !ok {
// Channel closed, probably also the channel or connection.
// To avoid deadlocks it is necessary to consume the messages from all channels.
for notifyClose != nil || chanNotifyClose != nil {
// block until closed
select {
case err := <-chanNotifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
chanNotifyClose = nil
case err := <-notifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
notifyClose = nil
case <-r.close:
return
}
// Do what you need with messageFailing.
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Errorf("notify error reason: %s, description: %s", result.ReplyText, result.Exchange)
}
case err := <-chanNotifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
case err := <-notifyClose:
if logger.V(logger.ErrorLevel, logger.DefaultLogger) {
logger.Error(err)
}
// block all resubscribe attempt - they are useless because there is no connection to rabbitmq
// create channel 'waitConnection' (at this point channel is nil or closed, create it without unnecessary checks)
r.Lock()
r.connected = false
r.waitConnection = make(chan struct{})
r.Unlock()
case <-r.close:
return
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions v4/broker/rabbitmq/rabbitmq.go
Expand Up @@ -102,6 +102,10 @@ func (s *subscriber) resubscribe() {
return
//wait until we reconect to rabbit
case <-s.r.conn.waitConnection:
// When the connection is disconnected, the waitConnection will be re-assigned, so '<-s.r.conn.waitConnection' maybe blocked.
// Here, it returns once a second, and then the latest waitconnection will be used
case <-time.After(time.Second):
continue
}

// it may crash (panic) in case of Consume without connection, so recheck it
Expand Down Expand Up @@ -267,6 +271,15 @@ func (r *rbroker) Subscribe(topic string, handler broker.Handler, opts ...broker
for k, v := range msg.Headers {
header[k], _ = v.(string)
}

// Messages sent from other frameworks to rabbitmq do not have this header.
// The 'RoutingKey' in the message can be used as this header.
// Then the message can be transfered to the subscriber which bind this topic.
msgTopic := header["Micro-Topic"]
if msgTopic == "" {
header["Micro-Topic"] = msg.RoutingKey
}

m := &broker.Message{
Header: header,
Body: msg.Body,
Expand Down

0 comments on commit 9f64710

Please sign in to comment.