Skip to content

Commit

Permalink
Merge pull request #252 from ionutrazvanionita/master
Browse files Browse the repository at this point in the history
Add exchange support for event_rabbitmq
  • Loading branch information
razvancrainea committed Jun 18, 2014
2 parents 53231eb + e7886d5 commit 56a01fa
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 28 deletions.
4 changes: 3 additions & 1 deletion modules/event_rabbitmq/README
Expand Up @@ -78,7 +78,7 @@ Chapter 1. Admin Guide
1.3. RabbitMQ socket syntax

'rabbitmq:' [user[':'password] '@' host [':' port] '/'
routing_key
[exchange '?'] routing_key

Meanings:
* 'rabbitmq:' - informs the Event Interface that the events
Expand All @@ -91,6 +91,8 @@ Chapter 1. Admin Guide
* host - host name of the RabbitMQ server.
* port - port of the RabbitMQ server. The default value is
'5672'.
* exchange - exchange of the RabbitMQ server. The default
value is ''.
* routing_key - this is the routing key used by the AMQP
protocol and it is used to identify the queue where the
event should be sent.
Expand Down
6 changes: 5 additions & 1 deletion modules/event_rabbitmq/doc/event_rabbitmq_admin.xml
Expand Up @@ -60,7 +60,7 @@
<section>
<title>RabbitMQ socket syntax</title>
<para>
<para><emphasis>'rabbitmq:' [user[':'password] '@' host [':' port] '/' routing_key</emphasis></para>
<para><emphasis>'rabbitmq:' [user[':'password] '@' host [':' port] '/' [exchange '?'] routing_key</emphasis></para>
</para>
<para>
Meanings:
Expand All @@ -85,6 +85,10 @@
<emphasis>port</emphasis> - port of the RabbitMQ server. The
default value is '5672'.
</para> </listitem>
<listitem><para>
<emphasis>exchange</emphasis> - exchange of the RabbitMQ server. The
default value is ''.
</para> </listitem>
<listitem><para>
<emphasis>routing_key</emphasis> - this is the routing key
used by the AMQP protocol and it is used to identify the queue
Expand Down
67 changes: 43 additions & 24 deletions modules/event_rabbitmq/event_rabbitmq.c
Expand Up @@ -155,18 +155,18 @@ static int rmq_match(evi_reply_sock *sock1, evi_reply_sock *sock2)
p1 = (rmq_params_t *)sock1->params;
p2 = (rmq_params_t *)sock2->params;
if (!p1 || !p2 ||
!(p1->flags & RMQ_PARAM_RKEY) || !(p2->flags & RMQ_PARAM_RKEY) ||
!(p1->flags & RMQ_PARAM_USER) || !(p2->flags & RMQ_PARAM_USER))
!(p1->flags & RMQ_PARAM_RKEY) || !(p2->flags & RMQ_PARAM_RKEY))
return 0;

if (sock1->port == sock2->port &&
sock1->address.len == sock2->address.len &&
p1->routing_key.len == p2->routing_key.len &&
p1->user.len == p2->user.len &&
p1->user.len == p2->user.len && p1->exchange.len == p2->exchange.len &&
(p1->user.s == p2->user.s || /* trying the static values */
!memcmp(p1->user.s, p2->user.s, p1->user.len)) &&
!memcmp(p1->user.s, p2->user.s, p1->user.len)) &&
!memcmp(sock1->address.s, sock2->address.s, sock1->address.len) &&
!memcmp(p1->routing_key.s, p2->routing_key.s, p1->routing_key.len)) {
!memcmp(p1->routing_key.s, p2->routing_key.s, p1->routing_key.len) &&
!memcmp(p1->exchange.s, p2->exchange.s, p1->exchange.len)) {
LM_DBG("socket matched: %s@%s:%hu/%s\n",
p1->user.s, sock1->address.s, sock2->port, p1->routing_key.s);
return 1;
Expand Down Expand Up @@ -202,7 +202,7 @@ static inline int dupl_string(str* dst, const char* begin, const char* end)
/*
* This is the parsing function
* The socket grammar should be:
* [user [':' password] '@'] ip [':' port] '/' routing_key
* [user [':' password] '@'] ip [':' port] '/' [ exchange ?] routing_key
*/
static evi_reply_sock* rmq_parse(str socket)
{
Expand All @@ -217,6 +217,7 @@ static evi_reply_sock* rmq_parse(str socket)
ST_PASS_PORT, /* Password or port part */
ST_HOST, /* Hostname part */
ST_PORT, /* Port part */
ST_ROUTE_OR_EXPORT /* Routing or export key */
} st;

if (!socket.len || !socket.s) {
Expand Down Expand Up @@ -261,11 +262,8 @@ static evi_reply_sock* rmq_parse(str socket)
if (dupl_string(&sock->address, begin, socket.s + i) < 0)
goto err;
sock->flags |= EVI_ADDRESS;
if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;
goto success;
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

Expand Down Expand Up @@ -296,11 +294,8 @@ static evi_reply_sock* rmq_parse(str socket)
goto err;
}
sock->flags |= EVI_PORT;
if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;
goto success;
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

Expand All @@ -319,11 +314,8 @@ static evi_reply_sock* rmq_parse(str socket)
goto err;
sock->flags |= EVI_ADDRESS;

if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;
goto success;
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

Expand All @@ -338,13 +330,34 @@ static evi_reply_sock* rmq_parse(str socket)
}
sock->flags |= EVI_PORT;

if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

case ST_ROUTE_OR_EXPORT:
switch(socket.s[i]) {
case '?':

if (dupl_string(&param->exchange, begin, socket.s + i) < 0)
goto err;
param->flags |= RMQ_PARAM_EKEY;

if (dupl_string(&param->routing_key, socket.s + i + 1, socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;

goto success;
}
if(i == len - 1){
if (dupl_string(&param->routing_key, begin, socket.s + len) < 0)
goto err;

param->flags |= RMQ_PARAM_RKEY;
goto success;
}
break;

}
}
LM_WARN("not implemented %.*s\n", socket.len, socket.s);
Expand Down Expand Up @@ -417,8 +430,14 @@ static str rmq_print(evi_reply_sock *sock)
if (sock->flags & EVI_ADDRESS)
DO_PRINT(sock->address.s, sock->address.len - 1);

DO_PRINT("/", 1); /* needs to be changed if it can print a key without RMQ_PARAM_RKEY */

if (param->flags & RMQ_PARAM_EKEY) {
DO_PRINT(param->exchange.s, param->exchange.len - 1);
DO_PRINT("?", 1);
}

if (param->flags & RMQ_PARAM_RKEY) {
DO_PRINT("/", 1);
DO_PRINT(param->routing_key.s, param->routing_key.len - 1);
}
end:
Expand Down
2 changes: 2 additions & 0 deletions modules/event_rabbitmq/event_rabbitmq.h
Expand Up @@ -59,9 +59,11 @@
#define RMQ_PARAM_CHAN (1 << 3)
#define RMQ_PARAM_USER (1 << 4)
#define RMQ_PARAM_PASS (1 << 5)
#define RMQ_PARAM_EKEY (1 << 6)

typedef struct _rmq_params {
str routing_key;
str exchange;
str user;
str pass;
amqp_connection_state_t conn;
Expand Down
6 changes: 4 additions & 2 deletions modules/event_rabbitmq/rabbitmq_send.c
Expand Up @@ -300,11 +300,13 @@ static int rmq_sendmsg(rmq_send_t *rmqs)
{
rmq_params_t * rmqp = (rmq_params_t *)rmqs->sock->params;
int ret;

/* all checks should be already done */
ret = amqp_basic_publish(rmqp->conn,
rmqp->channel,
AMQP_EMPTY_BYTES,
rmqp->flags&RMQ_PARAM_EKEY?
amqp_cstring_bytes(rmqp->exchange.s) :
AMQP_EMPTY_BYTES ,
amqp_cstring_bytes(rmqp->routing_key.s),
0,
0,
Expand Down

0 comments on commit 56a01fa

Please sign in to comment.