Skip to content

Commit

Permalink
rabbitmq: add publish command
Browse files Browse the repository at this point in the history
Also allow one to add extra headers in the AMQP properties
  • Loading branch information
razvancrainea committed Feb 8, 2017
1 parent 6bc8b67 commit 87b0d4e
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 16 deletions.
87 changes: 79 additions & 8 deletions modules/rabbitmq/rabbitmq.c
Expand Up @@ -37,8 +37,9 @@ static int mod_init(void);
static int child_init(int);
static void mod_destroy(void);

static int rmq_publish(struct sip_msg *msg, char *cid);
static int fixup_rmq(void **param, int param_no);
static int rmq_publish(struct sip_msg *msg, char *siv, char *rkey, char *body,
char *ctype, char *hnames, char *hvals);

static param_export_t params[]={
{ "server_id", STR_PARAM|USE_FUNC_PARAM,
Expand All @@ -58,7 +59,11 @@ static dep_export_t deps = {

/* exported commands */
static cmd_export_t cmds[] = {
{"rabbitmq_publish",(cmd_function)rmq_publish, 1,
{"rabbitmq_publish",(cmd_function)rmq_publish, 3,
fixup_rmq, 0, ALL_ROUTES },
{"rabbitmq_publish",(cmd_function)rmq_publish, 4,
fixup_rmq, 0, ALL_ROUTES },
{"rabbitmq_publish",(cmd_function)rmq_publish, 6,
fixup_rmq, 0, ALL_ROUTES },
{0, 0, 0, 0, 0, 0}
};
Expand Down Expand Up @@ -114,27 +119,93 @@ static void mod_destroy(void)
*/
static int fixup_rmq(void **param, int param_no)
{
str name;
pv_spec_t *e;

if (param_no == 1)
return fixup_rmq_server(param);
/* TODO: check if this is needed */
if (param_no < 2)
return fixup_sgp(param);
if (param_no < 5)
return fixup_spve(param);
if (param_no < 7) {
name.s = (char *)*param;
name.len = strlen(name.s);
e = pkg_malloc(sizeof *e);
if (!e) {
LM_ERR("out of mem!\n");
return E_OUT_OF_MEM;
}
if (pv_parse_spec(&name, e) < 0) {
LM_ERR("invalid spec %s\n", name.s);
return E_SCRIPT;
}
if (e->type != PVT_AVP) {
LM_ERR("invalid pvar type %s - only AVPs are allowed!\n", name.s);
return E_SCRIPT;
}
*param = (void *)e;
return 0;
}
LM_ERR("Unsupported parameter %d\n", param_no);
return E_CFG;
}

/*
* function that simply prints the parameters passed
*/
static int rmq_publish(struct sip_msg *msg, char *cid)
static int rmq_publish(struct sip_msg *msg, char *sid, char *rkey, char *body,
char *ctype, char *hnames, char *hvals)
{
struct rmq_server *srv;
str srkey, sbody, sctype;
int aname, avals;
unsigned short type;

srv = rmq_resolve_server(msg, cid);
srv = rmq_resolve_server(msg, sid);
if (!srv) {
LM_ERR("cannot find a RabbitMQ server connection\n");
return -1;
}
return -1;
}

if (fixup_get_svalue(msg, (gparam_p)rkey, &srkey) < 0) {
LM_ERR("cannot get routing key!\n");
return -1;
}

if (fixup_get_svalue(msg, (gparam_p)body, &sbody) < 0) {
LM_ERR("cannot get body value!\n");
return -1;
}

if (ctype && fixup_get_svalue(msg, (gparam_p)ctype, &sctype) < 0) {
LM_ERR("cannot get content-type value\n");
return -1;
}

if (hnames && !hvals) {
LM_ERR("header names without values!\n");
return -1;
}
if (!hnames && hvals) {
LM_ERR("header values without names!\n");
return -1;
}

if (hnames &&
pv_get_avp_name(msg, &((pv_spec_p)hnames)->pvp, &aname, &type) < 0) {
LM_ERR("cannot resolve names AVP\n");
return -1;
}

if (hvals &&
pv_get_avp_name(msg, &((pv_spec_p)hvals)->pvp, &avals, &type) < 0) {
LM_ERR("cannot resolve values AVP\n");
return -1;
}

/* resolve the AVP */
return rmq_send(srv, &srkey, &sbody,
(ctype ? &sctype : NULL),
(hnames ? &aname : NULL),
(hvals ? &avals : NULL));
}
74 changes: 66 additions & 8 deletions modules/rabbitmq/rmq_servers.c
Expand Up @@ -106,31 +106,31 @@ static int rmq_error(char const *context, amqp_rpc_reply_t x)
return 0;

case AMQP_RESPONSE_NONE:
LM_ERR("%s: missing RPC reply type!", context);
LM_ERR("%s: missing RPC reply type!\n", context);
break;

case AMQP_RESPONSE_LIBRARY_EXCEPTION:
LM_ERR("%s: %s\n", context, "(end-of-stream)");
LM_ERR("%s: (end-of-stream)\n", context);
break;

case AMQP_RESPONSE_SERVER_EXCEPTION:
switch (x.reply.id) {
case AMQP_CONNECTION_CLOSE_METHOD:
mconn = (amqp_connection_close_t *)x.reply.decoded;
LM_ERR("%s: server connection error %d, message: %.*s",
LM_ERR("%s: server connection error %d, message: %.*s\n",
context, mconn->reply_code,
(int)mconn->reply_text.len,
(char *)mconn->reply_text.bytes);
break;
case AMQP_CHANNEL_CLOSE_METHOD:
mchan = (amqp_channel_close_t *)x.reply.decoded;
LM_ERR("%s: server channel error %d, message: %.*s",
LM_ERR("%s: server channel error %d, message: %.*s\n",
context, mchan->reply_code,
(int)mchan->reply_text.len,
(char *)mchan->reply_text.bytes);
break;
default:
LM_ERR("%s: unknown server error, method id 0x%08X",
LM_ERR("%s: unknown server error, method id 0x%08X\n",
context, x.reply.id);
break;
}
Expand Down Expand Up @@ -254,6 +254,7 @@ int rmq_reconnect(struct rmq_server *srv)
if (rmq_error("Opening channel", amqp_get_rpc_reply(srv->conn)))
goto clean_rmq_server;
LM_DBG("[%.*s] successfully connected!\n", srv->cid.len, srv->cid.s);
srv->state = RMQS_ON;
case RMQS_ON:
return 0;
default:
Expand Down Expand Up @@ -638,20 +639,77 @@ static inline int amqp_check_status(struct rmq_server *srv, int r, int* retry)
#endif
}

int rmq_send(struct rmq_server *srv, str *rkey, str *body, str *ctype)
#define RMQ_ALLOC_STEP 2

int rmq_send(struct rmq_server *srv, str *rkey, str *body, str *ctype,
int *names, int *values)
{
int ret;
int nr;
int_str v;
int ret = -1;
amqp_bytes_t akey;
amqp_bytes_t abody;
amqp_basic_properties_t props;
int retries = srv->retries;
static int htable_allocated = 0;
static amqp_table_entry_t *htable = NULL;
struct usr_avp *aname = NULL, *aval = NULL;
amqp_table_entry_t *htmp = NULL;

akey.len = rkey->len;
akey.bytes = rkey->s;
abody.len = body->len;
abody.bytes = body->s;
memset(&props, 0, sizeof props);

/* populates props based on the names and values */
if (names && values) {
/* count the number of avps */
nr = 0;
for (;;) {
aname = search_first_avp(0, *names, &v, aname);
if (!aname)
break;
if (nr >= htable_allocated) {
htmp = pkg_realloc(htable, (htable_allocated + RMQ_ALLOC_STEP) *
sizeof(amqp_table_entry_t));
if (!htmp) {
LM_ERR("out of pkg memory for headers!\n");
return -1;
}
htable_allocated += RMQ_ALLOC_STEP;
htable = htmp;
}
if (aname->flags & AVP_VAL_STR) {
htable[nr].key.len = v.s.len;
htable[nr].key.bytes = v.s.s;
} else {
htable[nr].key.bytes = int2str(v.n, (int *)&htable[nr].key.len);
}
aval = search_first_avp(0, *values, &v, aval);
if (!aval) {
LM_ERR("names and values number mismatch!\n");
break;
}
if (aval->flags & AVP_VAL_STR) {
htable[nr].value.kind = AMQP_FIELD_KIND_UTF8;
htable[nr].value.value.bytes.bytes = v.s.s;
htable[nr].value.value.bytes.len = v.s.len;
} else {
htable[nr].value.kind = AMQP_FIELD_KIND_I32;
htable[nr].value.value.i32 = v.n;
}
LM_DBG("added key no. %d %.*s type %s\n", nr + 1,
(int)htable[nr].key.len, (char *)htable[nr].key.bytes,
(htable[nr].value.kind == AMQP_FIELD_KIND_UTF8 ? "string":"int"));
nr++;
}
LM_DBG("doing a rabbitmq query with %d headers\n", nr);
props.headers.entries = htable;
props.headers.num_entries = nr;
props._flags |= AMQP_BASIC_HEADERS_FLAG;
}

if (ctype) {
props._flags |= AMQP_BASIC_CONTENT_TYPE_FLAG;
props.content_type.len = ctype->len;
Expand All @@ -666,7 +724,7 @@ int rmq_send(struct rmq_server *srv, str *rkey, str *body, str *ctype)
if (rmq_reconnect(srv) < 0) {
LM_ERR("[%.*s] cannot send RabbitMQ message\n",
srv->cid.len, srv->cid.s);
return -1;
return ret;
}

ret = amqp_basic_publish(srv->conn, 1, srv->exchange, akey, \
Expand Down
3 changes: 3 additions & 0 deletions modules/rabbitmq/rmq_servers.h
Expand Up @@ -84,4 +84,7 @@ struct rmq_server *rmq_get_server(str *cid);
struct rmq_server *rmq_resolve_server(struct sip_msg *msg, char *param);
void rmq_connect_servers(void);

int rmq_send(struct rmq_server *srv, str *rkey, str *body, str *ctype,
int *names, int *values);

#endif /* _RMQ_SERVERS_H_ */

0 comments on commit 87b0d4e

Please sign in to comment.