Skip to content

Commit

Permalink
rabbitmq: add parameters to server_id
Browse files Browse the repository at this point in the history
  • Loading branch information
razvancrainea committed Jan 27, 2017
1 parent 3e21f92 commit 5334434
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 46 deletions.
204 changes: 164 additions & 40 deletions modules/rabbitmq/rmq_servers.c
Expand Up @@ -119,26 +119,32 @@ struct rmq_server *rmq_resolve_server(struct sip_msg *msg, char *param)
return rmq_get_server(&cid);
}

static void rmq_clean_server(struct rmq_server *srv)
static void rmq_close_server(struct rmq_server *srv)
{
switch (srv->state) {
case RMQS_ON:
case RMQS_CONN:
rmq_error("closing channel",
amqp_channel_close(srv->conn, 1, AMQP_REPLY_SUCCESS));
case RMQS_INIT:
rmq_error("closing connection",
amqp_connection_close(srv->conn, AMQP_REPLY_SUCCESS));
if (amqp_destroy_connection(srv->conn) < 0)
LM_ERR("cannot destroy connection\n");
case RMQS_NONE:
case RMQS_OFF:
break;
default:
LM_WARN("Unknown rmq server state %d\n", srv->state);
}
srv->state = RMQS_OFF;
}

#if 0
static void rmq_destroy_server(struct rmq_server *srv)
{
rmq_clean_server(srv);
rmq_close_server(srv);
if (srv->exchange.bytes)
amqp_bytes_free(srv->exchange);
pkg_free(srv);
}
#endif
Expand All @@ -154,7 +160,7 @@ int rmq_reconnect(struct rmq_server *srv)
int socket;

switch (srv->state) {
case RMQS_NONE:
case RMQS_OFF:
srv->conn = amqp_new_connection();
if (!srv) {
LM_ERR("cannot create amqp connection!\n");
Expand Down Expand Up @@ -184,7 +190,7 @@ int rmq_reconnect(struct rmq_server *srv)
if (rmq_error("Logging in", amqp_login(
srv->conn,
srv->uri.vhost,
srv->max_channels,
0,
srv->max_frames,
srv->heartbeat,
AMQP_SASL_METHOD_PLAIN,
Expand All @@ -194,16 +200,19 @@ int rmq_reconnect(struct rmq_server *srv)
/* all good - return success */
srv->state = RMQS_CONN;
case RMQS_CONN:
/* don't use more than 1 channel */
amqp_channel_open(srv->conn, 1);
if (rmq_error("Opening channel", amqp_get_rpc_reply(srv->conn)))
goto clean_rmq_server;
LM_DBG("[%.*s] successfully connected!\n", srv->cid.len, srv->cid.s);
case RMQS_ON:
return 0;
default:
LM_WARN("Unknown rmq server state %d\n", srv->state);
return -1;
}
clean_rmq_server:
rmq_clean_server(srv);
rmq_close_server(srv);
return -2;
clean_rmq_conn:
if (amqp_destroy_connection(srv->conn) < 0)
Expand All @@ -219,14 +228,17 @@ int rmq_reconnect(struct rmq_server *srv)
int rmq_server_add(modparam_t type, void * val)
{
struct rmq_server *srv;
str s;
str cid;
str param, s, cid;
str suri = {0, 0};
char uri_pending = 0;
char *uri;
int max_channels = RMQ_DEFAULT_MAX_CHANNELS;
int max_frames = RMQ_DEFAULT_MAX_FRAMES;
unsigned flags;
int retries;
int max_frames = RMQ_DEFAULT_FRAMES;
int heartbeat = RMQ_DEFAULT_HEARTBEAT;
str exchange = {0, 0};
enum rmq_parse_param { RMQP_NONE, RMQP_URI, RMQP_FRAME, RMQP_HBEAT, RMQP_IMM,
RMQP_MAND, RMQP_EXCH, RMQP_RETRY, RMQP_NOPER } state;

if (type != STR_PARAM) {
LM_ERR("invalid parameter type %d\n", type);
Expand Down Expand Up @@ -258,79 +270,192 @@ int rmq_server_add(modparam_t type, void * val)
"names for different connections!\n", cid.len, cid.s);
return -1;
}
#define IF_IS_PARAM(_p, _s, _l) \
do { \
if (s.len >= (sizeof(_p) - 1) && strncasecmp(s.s, (_p), sizeof(_p) - 1) == 0) { \
LM_DBG("[%.*s] found parameter %s\n", cid.len, cid.s, (_p)); \
s.s += (sizeof(_p) - 1); \
s.len -= (sizeof(_p) - 1); \
state = _s; \
goto _l; \
} \
} while (0)

/* server not found - parse this one */
for (s.s++, s.len--; s.len > 0; s.s++, s.len--) {
if (IS_WS(*s.s))
continue;
if (s.len > 4 && strncasecmp(s.s, "uri", 3) == 0) {
/* skip spaces before = */
for (s.len -= 3, s.s += 3; s.len > 0; s.s++, s.len--)
if (!IS_WS(*s.s))
break;
if (s.len <= 0 || *s.s != '=') {
LM_ERR("cannot find uri equal: %.*s\n", s.len, s.s);
return -1;
}
s.s++;
s.len--;
param = s;
state = RMQP_NONE;
IF_IS_PARAM("uri", RMQP_URI, value);
IF_IS_PARAM("frames", RMQP_FRAME, value);
IF_IS_PARAM("retries", RMQP_RETRY, value);
IF_IS_PARAM("exchange", RMQP_EXCH, value);
IF_IS_PARAM("heartbeat", RMQP_HBEAT, value);
IF_IS_PARAM("immediate", RMQP_IMM, no_value);
IF_IS_PARAM("mandatory", RMQP_MAND, no_value);
IF_IS_PARAM("non-persistent", RMQP_NOPER, no_value);

/* there is no known parameter here */
goto no_value;

#undef IF_IS_PARAM

value:
/* found a valid parameter - if uri has started, it should be ended */
if (uri_pending) {
suri.len -= param.len;
uri_pending = 0;
}
/* skip spaces before = */
for (; s.len > 0; s.s++, s.len--)
if (!IS_WS(*s.s))
break;
if (s.len <= 0 || *s.s != '=') {
LM_ERR("[%.*s] cannot find uri equal: %.*s\n", cid.len, cid.s,
param.len, param.s);
return -1;
}
s.s++;
s.len--;
param = s; /* start of the parameter */

no_value:
/* search for the next ';' */
for (; s.len > 0; s.s++, s.len--)
if (*s.s == ';')
break;
if (state != RMQP_URI)
param.len -= s.len;
trim_len(param.len, param.s, param);

/* here is the end of parameter - handle it */
switch (state) {
case RMQP_URI:
/* remember where the uri starts */
suri = s;
suri = param;
uri_pending = 1;
} else {
break;
case RMQP_NONE:
/* we eneded up in a place that has ';' - if we haven't found
* the end of the uri, this is also part of the uri. otherwise it
* is an error and we shall report it */
if (!uri_pending) {
LM_ERR("Unknown parameter: %.*s\n", s.len, s.s);
LM_ERR("[%.*s] Unknown parameter: %.*s\n", cid.len, cid.s,
param.len, param.s);
return -1;
}
break;
case RMQP_FRAME:
if (str2int(&param, (unsigned int *)&max_frames) < 0) {
LM_ERR("[%.*s] frames must be a number: %.*s\n",
cid.len, cid.s, param.len, param.s);
return -1;
}
if (max_frames < RMQ_MIN_FRAMES) {
LM_WARN("[%.*s] number of frames is %d - less than expected %d! "
"setting to expected\n", cid.len, cid.s, max_frames, RMQ_MIN_FRAMES);
max_frames = RMQ_MIN_FRAMES;
} else {
LM_DBG("[%.*s] setting frames to %d\n", cid.len, cid.s, max_frames);
}
break;
case RMQP_HBEAT:
if (str2int(&param, (unsigned int *)&heartbeat) < 0) {
LM_ERR("[%.*s] heartbeat must be the number of seconds, not %.*s\n",
cid.len, cid.s, param.len, param.s);
return -1;
}
if (heartbeat < 0) {
LM_WARN("[%.*s] invalid number of heartbeat seconds %d! Using default!\n",
cid.len, cid.s, heartbeat);
heartbeat = RMQ_DEFAULT_HEARTBEAT;
} else {
LM_DBG("[%.*s] setting heartbeat to %d\n", cid.len, cid.s, heartbeat);
}
break;
case RMQP_RETRY:
if (str2int(&param, (unsigned int *)&retries) < 0) {
LM_ERR("[%.*s] reties must be a number, not %.*s\n",
cid.len, cid.s, param.len, param.s);
return -1;
}
if (retries < 0) {
LM_WARN("[%.*s] invalid number of retries %d! Using default!\n",
cid.len, cid.s, retries);
retries = RMQ_DEFAULT_RETRIES;
} else {
LM_DBG("[%.*s] %d number of retries in case of error\n",
cid.len, cid.s, heartbeat);
}
break;
case RMQP_IMM:
flags &= RMQF_IMM;
break;
case RMQP_MAND:
flags &= RMQF_MAND;
break;
case RMQP_NOPER:
flags &= RMQF_NOPER;
break;
case RMQP_EXCH:
exchange = param;
LM_DBG("[%.*s] setting exchange '%.*s'\n", cid.len, cid.s,
exchange.len, exchange.s);
break;
}
/* search for the next ';' */
for (; s.len > 0; s.s++, s.len--)
if (*s.s == ';')
break;
}
/* if we don't have an uri, we forfeit */
if (!suri.s) {
LM_ERR("cannot find an uri!");
LM_ERR("[%.*s] cannot find an uri!", cid.len, cid.s);
return -1;
}
/* if still pending, remove the last ';' */
trim_spaces_lr(suri);
if (uri_pending && suri.s[suri.len - 1] == ';')
/* trim the last spaces and ';' of the uri */
trim_len(suri.len, suri.s, suri);
if (suri.s[suri.len - 1] == ';')
suri.len--;
trim_spaces_lr(suri);
trim_len(suri.len, suri.s, suri);

if ((srv = pkg_malloc(sizeof *srv + suri.len + 1)) == NULL) {
LM_ERR("cannot alloc memory for rabbitmq server\n");
return -1;
}
memset(srv, 0, sizeof *srv);
uri = ((char *)srv) + sizeof *srv;
memcpy(uri, suri.s, suri.len);
uri[suri.len] = 0;

if (amqp_parse_url(uri, &srv->uri) != 0) {
LM_ERR("cannot parse rabbitmq uri: %s\n", uri);
LM_ERR("[%.*s] cannot parse rabbitmq uri: %s\n", cid.len, cid.s, uri);
goto free;
}

if (srv->uri.ssl) {
LM_WARN("we currently do not support ssl connections!\n");
LM_WARN("[%.*s] we currently do not support ssl connections!\n", cid.len, cid.s);
goto free;
}

srv->state = RMQS_NONE;
if (exchange.len) {
srv->exchange = amqp_bytes_malloc(exchange.len);
if (!srv->exchange.bytes) {
LM_ERR("[%.*s] cannot allocate echange buffer!\n", cid.len, cid.s);
goto free;
}
memcpy(srv->exchange.bytes, exchange.s, exchange.len);
} else
srv->exchange = amqp_empty_bytes;

srv->state = RMQS_OFF;
srv->cid = cid;

srv->flags = flags;
srv->retries = retries;
srv->max_frames = max_frames;
srv->max_channels = max_channels;
srv->heartbeat = heartbeat;

list_add(&srv->list, &rmq_servers);
LM_DBG("new AMQP host=%s:%u with cid=%.*s\n",
srv->uri.host, srv->uri.port, srv->cid.len, srv->cid.s);
LM_DBG("[%.*s] new AMQP host=%s:%u\n", srv->cid.len, srv->cid.s,
srv->uri.host, srv->uri.port);

/* parse the url */
return 0;
Expand Down Expand Up @@ -394,5 +519,4 @@ void rmq_connect_servers(void)
LM_ERR("cannot connect to RabbitMQ server %s:%u\n",
srv->uri.host, srv->uri.port);
}

}
19 changes: 13 additions & 6 deletions modules/rabbitmq/rmq_servers.h
Expand Up @@ -26,22 +26,29 @@
#ifndef _RMQ_SERVERS_H_
#define _RMQ_SERVERS_H_

#define RMQ_DEFAULT_HEARTBEAT 0 /* 0 seconds - disabled */
#define RMQ_DEFAULT_MAX_CHANNELS 0 /* unlimited */
#define RMQ_DEFAULT_MAX_FRAMES 131072
#define RMQ_DEFAULT_HEARTBEAT 0 /* 0 seconds - disabled */
#define RMQ_DEFAULT_RETRIES 0 /* 0 times - do not retry */
#define RMQ_MIN_FRAMES 4096
#define RMQ_DEFAULT_FRAMES 131072

#include <amqp.h>

enum rmq_server_state { RMQS_NONE, RMQS_INIT, RMQS_CONN };
enum rmq_server_state { RMQS_OFF, RMQS_INIT, RMQS_CONN, RMQS_ON };

#define RMQF_IMM (1<<0) /* message MUST be delivered to a consumer immediately. */
#define RMQF_MAND (1<<1) /* message MUST be routed to a queue */
#define RMQF_NOPER (1<<2) /* message must not be persistent */

struct rmq_server {
enum rmq_server_state state;
str cid; /* connection id */
struct list_head list;

int max_frames;
int max_channels;
unsigned flags;
int retries;
int heartbeat;
int max_frames;
amqp_bytes_t exchange;
amqp_connection_state_t conn;
struct amqp_connection_info uri;
};
Expand Down

0 comments on commit 5334434

Please sign in to comment.