diff --git a/modules/event_rabbitmq/README b/modules/event_rabbitmq/README index 6b4cb95f21..82dbb2de59 100644 --- a/modules/event_rabbitmq/README +++ b/modules/event_rabbitmq/README @@ -78,7 +78,7 @@ Chapter 1. Admin Guide 1.3. RabbitMQ socket syntax 'rabbitmq:' [user[':'password] '@' host [':' port] '/' - routing_key + [exchange '?'] routing_key Meanings: * 'rabbitmq:' - informs the Event Interface that the events @@ -91,6 +91,8 @@ Chapter 1. Admin Guide * host - host name of the RabbitMQ server. * port - port of the RabbitMQ server. The default value is '5672'. + * exchange - exchange of the RabbitMQ server. The default + value is ''. * routing_key - this is the routing key used by the AMQP protocol and it is used to identify the queue where the event should be sent. diff --git a/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml b/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml index 273c16b457..88463d03a2 100644 --- a/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml +++ b/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml @@ -60,7 +60,7 @@
RabbitMQ socket syntax - 'rabbitmq:' [user[':'password] '@' host [':' port] '/' routing_key + 'rabbitmq:' [user[':'password] '@' host [':' port] '/' [exchange '?'] routing_key Meanings: @@ -85,6 +85,10 @@ port - port of the RabbitMQ server. The default value is '5672'. + + exchange - exchange of the RabbitMQ server. The + default value is ''. + routing_key - this is the routing key used by the AMQP protocol and it is used to identify the queue diff --git a/modules/event_rabbitmq/event_rabbitmq.c b/modules/event_rabbitmq/event_rabbitmq.c index d71fd96988..ae6f8e6634 100644 --- a/modules/event_rabbitmq/event_rabbitmq.c +++ b/modules/event_rabbitmq/event_rabbitmq.c @@ -155,18 +155,18 @@ static int rmq_match(evi_reply_sock *sock1, evi_reply_sock *sock2) p1 = (rmq_params_t *)sock1->params; p2 = (rmq_params_t *)sock2->params; if (!p1 || !p2 || - !(p1->flags & RMQ_PARAM_RKEY) || !(p2->flags & RMQ_PARAM_RKEY) || - !(p1->flags & RMQ_PARAM_USER) || !(p2->flags & RMQ_PARAM_USER)) + !(p1->flags & RMQ_PARAM_RKEY) || !(p2->flags & RMQ_PARAM_RKEY)) return 0; if (sock1->port == sock2->port && sock1->address.len == sock2->address.len && p1->routing_key.len == p2->routing_key.len && - p1->user.len == p2->user.len && + p1->user.len == p2->user.len && p1->exchange.len == p2->exchange.len && (p1->user.s == p2->user.s || /* trying the static values */ - !memcmp(p1->user.s, p2->user.s, p1->user.len)) && + !memcmp(p1->user.s, p2->user.s, p1->user.len)) && !memcmp(sock1->address.s, sock2->address.s, sock1->address.len) && - !memcmp(p1->routing_key.s, p2->routing_key.s, p1->routing_key.len)) { + !memcmp(p1->routing_key.s, p2->routing_key.s, p1->routing_key.len) && + !memcmp(p1->exchange.s, p2->exchange.s, p1->exchange.len)) { LM_DBG("socket matched: %s@%s:%hu/%s\n", p1->user.s, sock1->address.s, sock2->port, p1->routing_key.s); return 1; @@ -202,7 +202,7 @@ static inline int dupl_string(str* dst, const char* begin, const char* end) /* * This is the parsing function * The socket grammar should be: - * [user [':' password] '@'] ip [':' port] '/' routing_key + * [user [':' password] '@'] ip [':' port] '/' [ exchange ?] routing_key */ static evi_reply_sock* rmq_parse(str socket) { @@ -217,6 +217,7 @@ static evi_reply_sock* rmq_parse(str socket) ST_PASS_PORT, /* Password or port part */ ST_HOST, /* Hostname part */ ST_PORT, /* Port part */ + ST_ROUTE_OR_EXPORT /* Routing or export key */ } st; if (!socket.len || !socket.s) { @@ -261,11 +262,8 @@ static evi_reply_sock* rmq_parse(str socket) if (dupl_string(&sock->address, begin, socket.s + i) < 0) goto err; sock->flags |= EVI_ADDRESS; - if (dupl_string(¶m->routing_key, socket.s + i + 1, - socket.s + len) < 0) - goto err; - param->flags |= RMQ_PARAM_RKEY; - goto success; + st = ST_ROUTE_OR_EXPORT; + begin = socket.s + i + 1; } break; @@ -296,11 +294,8 @@ static evi_reply_sock* rmq_parse(str socket) goto err; } sock->flags |= EVI_PORT; - if (dupl_string(¶m->routing_key, socket.s + i + 1, - socket.s + len) < 0) - goto err; - param->flags |= RMQ_PARAM_RKEY; - goto success; + st = ST_ROUTE_OR_EXPORT; + begin = socket.s + i + 1; } break; @@ -319,11 +314,8 @@ static evi_reply_sock* rmq_parse(str socket) goto err; sock->flags |= EVI_ADDRESS; - if (dupl_string(¶m->routing_key, socket.s + i + 1, - socket.s + len) < 0) - goto err; - param->flags |= RMQ_PARAM_RKEY; - goto success; + st = ST_ROUTE_OR_EXPORT; + begin = socket.s + i + 1; } break; @@ -338,13 +330,34 @@ static evi_reply_sock* rmq_parse(str socket) } sock->flags |= EVI_PORT; - if (dupl_string(¶m->routing_key, socket.s + i + 1, - socket.s + len) < 0) + st = ST_ROUTE_OR_EXPORT; + begin = socket.s + i + 1; + } + break; + + case ST_ROUTE_OR_EXPORT: + switch(socket.s[i]) { + case '?': + + if (dupl_string(¶m->exchange, begin, socket.s + i) < 0) + goto err; + param->flags |= RMQ_PARAM_EKEY; + + if (dupl_string(¶m->routing_key, socket.s + i + 1, socket.s + len) < 0) + goto err; + param->flags |= RMQ_PARAM_RKEY; + + goto success; + } + if(i == len - 1){ + if (dupl_string(¶m->routing_key, begin, socket.s + len) < 0) goto err; + param->flags |= RMQ_PARAM_RKEY; goto success; } break; + } } LM_WARN("not implemented %.*s\n", socket.len, socket.s); @@ -417,8 +430,14 @@ static str rmq_print(evi_reply_sock *sock) if (sock->flags & EVI_ADDRESS) DO_PRINT(sock->address.s, sock->address.len - 1); + DO_PRINT("/", 1); /* needs to be changed if it can print a key without RMQ_PARAM_RKEY */ + + if (param->flags & RMQ_PARAM_EKEY) { + DO_PRINT(param->exchange.s, param->exchange.len - 1); + DO_PRINT("?", 1); + } + if (param->flags & RMQ_PARAM_RKEY) { - DO_PRINT("/", 1); DO_PRINT(param->routing_key.s, param->routing_key.len - 1); } end: diff --git a/modules/event_rabbitmq/event_rabbitmq.h b/modules/event_rabbitmq/event_rabbitmq.h index b396e52d6d..58baa76ab5 100644 --- a/modules/event_rabbitmq/event_rabbitmq.h +++ b/modules/event_rabbitmq/event_rabbitmq.h @@ -59,9 +59,11 @@ #define RMQ_PARAM_CHAN (1 << 3) #define RMQ_PARAM_USER (1 << 4) #define RMQ_PARAM_PASS (1 << 5) +#define RMQ_PARAM_EKEY (1 << 6) typedef struct _rmq_params { str routing_key; + str exchange; str user; str pass; amqp_connection_state_t conn; diff --git a/modules/event_rabbitmq/rabbitmq_send.c b/modules/event_rabbitmq/rabbitmq_send.c index 4ed2556c6f..704dfb3960 100644 --- a/modules/event_rabbitmq/rabbitmq_send.c +++ b/modules/event_rabbitmq/rabbitmq_send.c @@ -300,11 +300,13 @@ static int rmq_sendmsg(rmq_send_t *rmqs) { rmq_params_t * rmqp = (rmq_params_t *)rmqs->sock->params; int ret; - + /* all checks should be already done */ ret = amqp_basic_publish(rmqp->conn, rmqp->channel, - AMQP_EMPTY_BYTES, + rmqp->flags&RMQ_PARAM_EKEY? + amqp_cstring_bytes(rmqp->exchange.s) : + AMQP_EMPTY_BYTES , amqp_cstring_bytes(rmqp->routing_key.s), 0, 0,