Skip to content

Commit

Permalink
rabbitmq: add send command
Browse files Browse the repository at this point in the history
  • Loading branch information
razvancrainea committed Jan 27, 2017
1 parent 5334434 commit 3096757
Showing 1 changed file with 109 additions and 0 deletions.
109 changes: 109 additions & 0 deletions modules/rabbitmq/rmq_servers.c
Expand Up @@ -32,6 +32,7 @@
#include "../../ut.h"

#include "rmq_servers.h"
#include <amqp_framing.h>

#if defined AMQP_VERSION && AMQP_VERSION >= 0x00040000
#define AMQP_VERSION_v04
Expand Down Expand Up @@ -520,3 +521,111 @@ void rmq_connect_servers(void)
srv->uri.host, srv->uri.port);
}
}

static inline int amqp_check_status(struct rmq_server *srv, int r, int* retry)
{
#ifndef AMQP_VERSION_v04
if (r != 0) {
LM_ERR("[%.*s] unknown AMQP error [%d] while sending\n",
srv->cid.len, srv->cid.s, r);
/* we close the connection here to be able to re-connect later */
/* TODO: close the connection */
return r;
}
return 0;
#else
switch (r) {
case AMQP_STATUS_OK:
return 0;

case AMQP_STATUS_TIMER_FAILURE:
LM_ERR("[%.*s] timer failure\n", srv->cid.len, srv->cid.s);
goto no_close;

case AMQP_STATUS_NO_MEMORY:
LM_ERR("[%.*s] no more memory\n", srv->cid.len, srv->cid.s);
goto no_close;

case AMQP_STATUS_TABLE_TOO_BIG:
LM_ERR("[%.*s] a table in the properties was too large to fit in "
"a single frame\n", srv->cid.len, srv->cid.s);
goto no_close;

case AMQP_STATUS_HEARTBEAT_TIMEOUT:
LM_ERR("[%.*s] heartbeat timeout\n", srv->cid.len, srv->cid.s);
break;

case AMQP_STATUS_CONNECTION_CLOSED:
LM_ERR("[%.*s] connection closed\n", srv->cid.len, srv->cid.s);
break;

/* this should not happened since we do not use ssl */
case AMQP_STATUS_SSL_ERROR:
LM_ERR("[%.*s] SSL error\n", srv->cid.len, srv->cid.s);
break;

case AMQP_STATUS_TCP_ERROR:
LM_ERR("[%.*s] TCP error: %s(%d)\n", srv->cid.len, srv->cid.s,
strerror(errno), errno);
break;

/* This is happening on rabbitmq server restart */
case AMQP_STATUS_SOCKET_ERROR:
LM_WARN("[%.*s] socket error\n", srv->cid.len, srv->cid.s);
break;

default:
LM_ERR("[%.*s] unknown AMQP error[%d]: %s(%d)\n",
srv->cid.len, srv->cid.s, r, strerror(errno), errno);
break;
}
/* we close the connection here to be able to re-connect later */
rmq_close_server(srv);
no_close:
if (retry && *retry > 0) {
(*retry)--;
return 1;
}
return r;
#endif
}

int rmq_send(struct rmq_server *srv, str *rkey, str *body, str *ctype)
{
int ret;
amqp_bytes_t akey;
amqp_bytes_t abody;
amqp_basic_properties_t props;
int retries = srv->retries;

akey.len = rkey->len;
akey.bytes = rkey->s;
abody.len = body->len;
abody.bytes = body->s;
memset(&props, 0, sizeof props);

if (ctype) {
props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type.len = ctype->len;
props.content_type.bytes = ctype->s;
}
if (!(srv->flags & RMQF_NOPER)) {
props.delivery_mode = 2;
props._flags |= AMQP_BASIC_DELIVERY_MODE_FLAG;
}

do {
if (rmq_reconnect(srv) < 0) {
LM_ERR("[%.*s] cannot send RabbitMQ message\n",
srv->cid.len, srv->cid.s);
return -1;
}

ret = amqp_basic_publish(srv->conn, 1, srv->exchange, akey, \
(srv->flags & RMQF_MAND), (srv->flags & RMQF_IMM),
&props, abody);
ret = amqp_check_status(srv, ret, &retries);
} while (ret > 0);

return ret;
}

0 comments on commit 3096757

Please sign in to comment.