diff --git a/modules/rabbitmq/rmq_servers.c b/modules/rabbitmq/rmq_servers.c index 0f3972d267b..fff89819943 100644 --- a/modules/rabbitmq/rmq_servers.c +++ b/modules/rabbitmq/rmq_servers.c @@ -32,6 +32,7 @@ #include "../../ut.h" #include "rmq_servers.h" +#include #if defined AMQP_VERSION && AMQP_VERSION >= 0x00040000 #define AMQP_VERSION_v04 @@ -520,3 +521,111 @@ void rmq_connect_servers(void) srv->uri.host, srv->uri.port); } } + +static inline int amqp_check_status(struct rmq_server *srv, int r, int* retry) +{ +#ifndef AMQP_VERSION_v04 + if (r != 0) { + LM_ERR("[%.*s] unknown AMQP error [%d] while sending\n", + srv->cid.len, srv->cid.s, r); + /* we close the connection here to be able to re-connect later */ + /* TODO: close the connection */ + return r; + } + return 0; +#else + switch (r) { + case AMQP_STATUS_OK: + return 0; + + case AMQP_STATUS_TIMER_FAILURE: + LM_ERR("[%.*s] timer failure\n", srv->cid.len, srv->cid.s); + goto no_close; + + case AMQP_STATUS_NO_MEMORY: + LM_ERR("[%.*s] no more memory\n", srv->cid.len, srv->cid.s); + goto no_close; + + case AMQP_STATUS_TABLE_TOO_BIG: + LM_ERR("[%.*s] a table in the properties was too large to fit in " + "a single frame\n", srv->cid.len, srv->cid.s); + goto no_close; + + case AMQP_STATUS_HEARTBEAT_TIMEOUT: + LM_ERR("[%.*s] heartbeat timeout\n", srv->cid.len, srv->cid.s); + break; + + case AMQP_STATUS_CONNECTION_CLOSED: + LM_ERR("[%.*s] connection closed\n", srv->cid.len, srv->cid.s); + break; + + /* this should not happened since we do not use ssl */ + case AMQP_STATUS_SSL_ERROR: + LM_ERR("[%.*s] SSL error\n", srv->cid.len, srv->cid.s); + break; + + case AMQP_STATUS_TCP_ERROR: + LM_ERR("[%.*s] TCP error: %s(%d)\n", srv->cid.len, srv->cid.s, + strerror(errno), errno); + break; + + /* This is happening on rabbitmq server restart */ + case AMQP_STATUS_SOCKET_ERROR: + LM_WARN("[%.*s] socket error\n", srv->cid.len, srv->cid.s); + break; + + default: + LM_ERR("[%.*s] unknown AMQP error[%d]: %s(%d)\n", + srv->cid.len, srv->cid.s, r, strerror(errno), errno); + break; + } + /* we close the connection here to be able to re-connect later */ + rmq_close_server(srv); +no_close: + if (retry && *retry > 0) { + (*retry)--; + return 1; + } + return r; +#endif +} + +int rmq_send(struct rmq_server *srv, str *rkey, str *body, str *ctype) +{ + int ret; + amqp_bytes_t akey; + amqp_bytes_t abody; + amqp_basic_properties_t props; + int retries = srv->retries; + + akey.len = rkey->len; + akey.bytes = rkey->s; + abody.len = body->len; + abody.bytes = body->s; + memset(&props, 0, sizeof props); + + if (ctype) { + props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG; + props.content_type.len = ctype->len; + props.content_type.bytes = ctype->s; + } + if (!(srv->flags & RMQF_NOPER)) { + props.delivery_mode = 2; + props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG; + } + + do { + if (rmq_reconnect(srv) < 0) { + LM_ERR("[%.*s] cannot send RabbitMQ message\n", + srv->cid.len, srv->cid.s); + return -1; + } + + ret = amqp_basic_publish(srv->conn, 1, srv->exchange, akey, \ + (srv->flags & RMQF_MAND), (srv->flags & RMQF_IMM), + &props, abody); + ret = amqp_check_status(srv, ret, &retries); + } while (ret > 0); + + return ret; +}