Skip to content

Commit

Permalink
rabbitmq: add compatibility code for older versions
Browse files Browse the repository at this point in the history
Although this is highly inefficient, it is the most elegant way to cope
with the latest code.
  • Loading branch information
razvancrainea committed Jan 27, 2017
1 parent c55c4b0 commit 6b28c6c
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 6 deletions.
58 changes: 53 additions & 5 deletions modules/rabbitmq/rmq_servers.c
Expand Up @@ -25,7 +25,6 @@

#include "../../mem/shm_mem.h"
#include "../../sr_module.h"
#include "../../db/db_id.h"
#include "../../lib/list.h"
#include "../../mod_fix.h"
#include "../../dprint.h"
Expand All @@ -35,8 +34,57 @@
#include <amqp_framing.h>

#if defined AMQP_VERSION && AMQP_VERSION >= 0x00040000
#define AMQP_VERSION_v04
#include <amqp_tcp_socket.h>
#define rmq_parse amqp_parse_url
#else
#include "../../db/db_id.h"
#warning "You are using an old, unsupported RabbitMQ library version - compile on your own risk!"
/* ugly hack to move ptr from id to rmq_uri */
#define PTR_MOVE(_from, _to) \
do { \
(_to) = (_from); \
(_from) = NULL; \
} while(0)
static inline int rmq_parse(char *url, rmq_uri *uri)
{
str surl;
struct db_id *id;

surl.s = url;
surl.len = strlen(url);

if ((id = new_db_id(&surl)) == NULL)
return -1;

if (strcmp(id->scheme, "amqps") == 0)
uri->ssl = 1;

/* there might me a pkg leak compared to the newer version, but parsing
* only happends at startup, so we should not worry about this now */
if (id->username)
PTR_MOVE(id->username, uri->user);
else
uri->user = "guest";
if (id->password)
PTR_MOVE(id->password, uri->password);
else
uri->password = "guest";
if (id->host)
PTR_MOVE(id->host, uri->host);
else
uri->host = "localhost";
if (id->database)
PTR_MOVE(id->database, uri->vhost);
else
uri->host = "/";
if (id->port)
uri->port = id->port;
else if (uri->ssl)
uri->port = 5671;
else
uri->port = 5672;
free_db_id(id);
return 0;
}
#endif

static LIST_HEAD(rmq_servers);
Expand Down Expand Up @@ -426,7 +474,7 @@ int rmq_server_add(modparam_t type, void * val)
memcpy(uri, suri.s, suri.len);
uri[suri.len] = 0;

if (amqp_parse_url(uri, &srv->uri) != 0) {
if (rmq_parse(uri, &srv->uri) != 0) {
LM_ERR("[%.*s] cannot parse rabbitmq uri: %s\n", cid.len, cid.s, uri);
goto free;
}
Expand All @@ -444,7 +492,7 @@ int rmq_server_add(modparam_t type, void * val)
}
memcpy(srv->exchange.bytes, exchange.s, exchange.len);
} else
srv->exchange = amqp_empty_bytes;
srv->exchange = RMQ_EMPTY;

srv->state = RMQS_OFF;
srv->cid = cid;
Expand Down
26 changes: 25 additions & 1 deletion modules/rabbitmq/rmq_servers.h
Expand Up @@ -33,6 +33,30 @@

#include <amqp.h>

/* AMQP_VERSION was only added in v0.4.0 - there is no way to check the
* version of the library before this, so we consider everything beyond v0.4.0
* as old and inneficient */
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00040000
#define AMQP_VERSION_v04
#include <amqp_tcp_socket.h>
#define rmq_uri struct amqp_connection_info
#define RMQ_EMPTY amqp_empty_bytes
#else
/* although struct amqp_connection_info was added in v0.2.0, there is no way
* to check against that version, so we assume it does not exist until v0.4.0
*/
typedef struct _rmq_uri {
char *user;
char *password;
char *host;
char *vhost;
int port;
int ssl;
} rmq_uri;
#define RMQ_EMPTY AMQP_EMPTY_BYTES
#endif


enum rmq_server_state { RMQS_OFF, RMQS_INIT, RMQS_CONN, RMQS_ON };

#define RMQF_IMM (1<<0) /* message MUST be delivered to a consumer immediately. */
Expand All @@ -44,13 +68,13 @@ struct rmq_server {
str cid; /* connection id */
struct list_head list;

rmq_uri uri;
unsigned flags;
int retries;
int heartbeat;
int max_frames;
amqp_bytes_t exchange;
amqp_connection_state_t conn;
struct amqp_connection_info uri;
};

int rmq_server_add(modparam_t type, void * val);
Expand Down

0 comments on commit 6b28c6c

Please sign in to comment.