Skip to content

Commit

Permalink
Add exchange support for rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
Ionut Ionita committed Jun 18, 2014
1 parent f43bde4 commit 73678aa
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 26 deletions.
67 changes: 43 additions & 24 deletions modules/event_rabbitmq/event_rabbitmq.c
Expand Up @@ -155,18 +155,18 @@ static int rmq_match(evi_reply_sock *sock1, evi_reply_sock *sock2)
p1 = (rmq_params_t *)sock1->params;
p2 = (rmq_params_t *)sock2->params;
if (!p1 || !p2 ||
!(p1->flags & RMQ_PARAM_RKEY) || !(p2->flags & RMQ_PARAM_RKEY) ||
!(p1->flags & RMQ_PARAM_USER) || !(p2->flags & RMQ_PARAM_USER))
!(p1->flags & RMQ_PARAM_RKEY) || !(p2->flags & RMQ_PARAM_RKEY))
return 0;

if (sock1->port == sock2->port &&
sock1->address.len == sock2->address.len &&
p1->routing_key.len == p2->routing_key.len &&
p1->user.len == p2->user.len &&
p1->user.len == p2->user.len && p1->exchange.len == p2->exchange.len &&
(p1->user.s == p2->user.s || /* trying the static values */
!memcmp(p1->user.s, p2->user.s, p1->user.len)) &&
!memcmp(p1->user.s, p2->user.s, p1->user.len)) &&
!memcmp(sock1->address.s, sock2->address.s, sock1->address.len) &&
!memcmp(p1->routing_key.s, p2->routing_key.s, p1->routing_key.len)) {
!memcmp(p1->routing_key.s, p2->routing_key.s, p1->routing_key.len) &&
!memcmp(p1->exchange.s, p2->exchange.s, p1->exchange.len)) {
LM_DBG("socket matched: %s@%s:%hu/%s\n",
p1->user.s, sock1->address.s, sock2->port, p1->routing_key.s);
return 1;
Expand Down Expand Up @@ -202,7 +202,7 @@ static inline int dupl_string(str* dst, const char* begin, const char* end)
/*
* This is the parsing function
* The socket grammar should be:
* [user [':' password] '@'] ip [':' port] '/' routing_key
* [user [':' password] '@'] ip [':' port] '/' [ exchange ?] routing_key
*/
static evi_reply_sock* rmq_parse(str socket)
{
Expand All @@ -217,6 +217,7 @@ static evi_reply_sock* rmq_parse(str socket)
ST_PASS_PORT, /* Password or port part */
ST_HOST, /* Hostname part */
ST_PORT, /* Port part */
ST_ROUTE_OR_EXPORT /* Routing or export key */
} st;

if (!socket.len || !socket.s) {
Expand Down Expand Up @@ -261,11 +262,8 @@ static evi_reply_sock* rmq_parse(str socket)
if (dupl_string(&sock->address, begin, socket.s + i) < 0)
goto err;
sock->flags |= EVI_ADDRESS;
if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;
goto success;
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

Expand Down Expand Up @@ -296,11 +294,8 @@ static evi_reply_sock* rmq_parse(str socket)
goto err;
}
sock->flags |= EVI_PORT;
if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;
goto success;
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

Expand All @@ -319,11 +314,8 @@ static evi_reply_sock* rmq_parse(str socket)
goto err;
sock->flags |= EVI_ADDRESS;

if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;
goto success;
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

Expand All @@ -338,13 +330,34 @@ static evi_reply_sock* rmq_parse(str socket)
}
sock->flags |= EVI_PORT;

if (dupl_string(&param->routing_key, socket.s + i + 1,
socket.s + len) < 0)
st = ST_ROUTE_OR_EXPORT;
begin = socket.s + i + 1;
}
break;

case ST_ROUTE_OR_EXPORT:
switch(socket.s[i]) {
case '?':

if (dupl_string(&param->exchange, begin, socket.s + i) < 0)
goto err;
param->flags |= RMQ_PARAM_EKEY;

if (dupl_string(&param->routing_key, socket.s + i + 1, socket.s + len) < 0)
goto err;
param->flags |= RMQ_PARAM_RKEY;

goto success;
}
if(i == len - 1){
if (dupl_string(&param->routing_key, begin, socket.s + len) < 0)
goto err;

param->flags |= RMQ_PARAM_RKEY;
goto success;
}
break;

}
}
LM_WARN("not implemented %.*s\n", socket.len, socket.s);
Expand Down Expand Up @@ -417,8 +430,14 @@ static str rmq_print(evi_reply_sock *sock)
if (sock->flags & EVI_ADDRESS)
DO_PRINT(sock->address.s, sock->address.len - 1);

DO_PRINT("/", 1); /* needs to be changed if it can print a key without RMQ_PARAM_RKEY */

if (param->flags & RMQ_PARAM_EKEY) {
DO_PRINT(param->exchange.s, param->exchange.len - 1);
DO_PRINT("?", 1);
}

if (param->flags & RMQ_PARAM_RKEY) {
DO_PRINT("/", 1);
DO_PRINT(param->routing_key.s, param->routing_key.len - 1);
}
end:
Expand Down
2 changes: 2 additions & 0 deletions modules/event_rabbitmq/event_rabbitmq.h
Expand Up @@ -59,9 +59,11 @@
#define RMQ_PARAM_CHAN (1 << 3)
#define RMQ_PARAM_USER (1 << 4)
#define RMQ_PARAM_PASS (1 << 5)
#define RMQ_PARAM_EKEY (1 << 6)

typedef struct _rmq_params {
str routing_key;
str exchange;
str user;
str pass;
amqp_connection_state_t conn;
Expand Down
6 changes: 4 additions & 2 deletions modules/event_rabbitmq/rabbitmq_send.c
Expand Up @@ -300,11 +300,13 @@ static int rmq_sendmsg(rmq_send_t *rmqs)
{
rmq_params_t * rmqp = (rmq_params_t *)rmqs->sock->params;
int ret;

/* all checks should be already done */
ret = amqp_basic_publish(rmqp->conn,
rmqp->channel,
AMQP_EMPTY_BYTES,
rmqp->flags&RMQ_PARAM_EKEY?
amqp_cstring_bytes(rmqp->exchange.s) :
AMQP_EMPTY_BYTES ,
amqp_cstring_bytes(rmqp->routing_key.s),
0,
0,
Expand Down

0 comments on commit 73678aa

Please sign in to comment.