Skip to content

Commit

Permalink
event_rabbitmq: reconnect to rabbit-mq server on "Socket error"
Browse files Browse the repository at this point in the history
 - closes #535
  • Loading branch information
ovidiusas committed Jun 17, 2015
1 parent 1a89f57 commit 237b771
Showing 1 changed file with 28 additions and 5 deletions.
33 changes: 28 additions & 5 deletions modules/event_rabbitmq/rabbitmq_send.c
Expand Up @@ -298,7 +298,7 @@ static int rmq_reconnect(evi_reply_sock *sock)
}

#ifdef AMQP_VERSION_v04
static inline int amqp_check_status(rmq_params_t *rmqp, int r)
static inline int amqp_check_status(rmq_params_t *rmqp, int r, int* re_publish)
{
switch (r) {
case AMQP_STATUS_OK:
Expand Down Expand Up @@ -331,7 +331,8 @@ static inline int amqp_check_status(rmq_params_t *rmqp, int r)

/* This is happening on rabbitmq server restart */
case AMQP_STATUS_SOCKET_ERROR:
LM_ERR("Socket error\n");
LM_WARN("Socket error\n");
if (*re_publish == 0) *re_publish = 1;
break;

default:
Expand All @@ -344,7 +345,7 @@ static inline int amqp_check_status(rmq_params_t *rmqp, int r)
return r;
}
#else
static inline int amqp_check_status(rmq_params_t *rmqp, int r)
static inline int amqp_check_status(rmq_params_t *rmqp, int r, int* re_publish)
{
if (r != 0) {
LM_ERR("Unknown AMQP error [%d] while sending\n", r);
Expand All @@ -360,7 +361,8 @@ static inline int amqp_check_status(rmq_params_t *rmqp, int r)
static int rmq_sendmsg(rmq_send_t *rmqs)
{
rmq_params_t * rmqp = (rmq_params_t *)rmqs->sock->params;
int ret;
int ret,rtrn;
int re_publish = 0;

if (!(rmqp->flags & RMQ_PARAM_CONN))
return 0;
Expand All @@ -377,7 +379,28 @@ static int rmq_sendmsg(rmq_send_t *rmqs)
0,
amqp_cstring_bytes(rmqs->msg));

return amqp_check_status(rmqp, ret);
rtrn = amqp_check_status(rmqp, ret, &re_publish);

if (rtrn != 0 && re_publish != 0) {
if (rmq_reconnect(rmqs->sock) < 0) {
LM_ERR("cannot reconnect socket\n");
return rtrn;
}
/* all checks should be already done */
ret = amqp_basic_publish(rmqp->conn,
rmqp->channel,
rmqp->flags&RMQ_PARAM_EKEY?
amqp_cstring_bytes(rmqp->exchange.s) :
AMQP_EMPTY_BYTES ,
amqp_cstring_bytes(rmqp->routing_key.s),
0,
0,
0,
amqp_cstring_bytes(rmqs->msg));
rtrn = amqp_check_status(rmqp, ret, &re_publish);
}

return rtrn;
}

void rmq_process(int rank)
Expand Down

0 comments on commit 237b771

Please sign in to comment.