Skip to content

Commit

Permalink
rabbitmq modules: add TLS support
Browse files Browse the repository at this point in the history
Closes #2306
  • Loading branch information
rvlad-patrascu committed Dec 11, 2020
1 parent 113ecf4 commit 472ae54
Show file tree
Hide file tree
Showing 12 changed files with 686 additions and 52 deletions.
72 changes: 60 additions & 12 deletions modules/event_rabbitmq/doc/event_rabbitmq_admin.xml
Expand Up @@ -35,10 +35,10 @@
</para>
</section>

<section>
<section id="socket_syntax" xreflabel="RabbitMQ socket syntax">
<title>RabbitMQ socket syntax</title>
<para>
<para><emphasis>'rabbitmq:' [user[':'password] '@' host [':' port] '/' [exchange '?'] routing_key</emphasis></para>
<para><emphasis>'rabbitmq:' [user[':'password] '@' host [':' port] '/' [params '?'] routing_key</emphasis></para>
</para>
<para>
Meanings:
Expand All @@ -64,9 +64,21 @@
default value is '5672'.
</para> </listitem>
<listitem><para>
<emphasis>exchange</emphasis> - exchange of the RabbitMQ server. The
default value is ''.
</para> </listitem>
<emphasis>params</emphasis> - extra parameters specified as
<emphasis>key</emphasis>=<emphasis>value</emphasis>, separated by ';':
<itemizedlist>
<listitem><para>
<emphasis>exchange</emphasis> - exchange of the RabbitMQ server.
The default value is ''.
</para></listitem>
<listitem><para>
<emphasis>tls_domain</emphasis> - indicates which TLS domain (as
defined using the <emphasis>tls_mgm</emphasis> module) to use for
this connection. The <xref linkend="param_use_tls"/> module parameter
must be enabled.
</para></listitem>
</itemizedlist>
</para></listitem>
<listitem><para>
<emphasis>routing_key</emphasis> - this is the routing key
used by the AMQP protocol and it is used to identify the queue
Expand All @@ -84,13 +96,13 @@
<title>&osips; Modules</title>
<para>
The following modules must be loaded before this module:
<itemizedlist>
<listitem>
<para>
<emphasis>No dependencies on other &osips; modules</emphasis>.
</para>
</listitem>
</itemizedlist>
<itemizedlist>
<listitem>
<para>
<emphasis>tls_mgm</emphasis> if <xref linkend="param_use_tls"/> is enabled.
</para>
</listitem>
</itemizedlist>
</para>
</section>
<section>
Expand Down Expand Up @@ -174,6 +186,40 @@ modparam("event_rabbitmq", "connect_timeout", 1000)
</programlisting>
</example>
</section>

<section id="param_use_tls" xreflabel="use_tls">
<title><varname>use_tls</varname> (integer)</title>
<para>
Setting this parameter will allow you to use TLS for broker connections.
In order to enable TLS for a specific connection, you can use the
"tls_domain=<emphasis>dom_name</emphasis>" parameter in the configuration
specified through the <xref linkend="socket_syntax"/>.
</para>
<para>
When using this parameter, you must also ensure that
<emphasis>tls_mgm</emphasis> is loaded and properly configured. Refer to
the the module for additional info regarding TLS client domains.
</para>
<para>
<emphasis>
Default value is <emphasis role='bold'>0</emphasis> (not enabled)
</emphasis>
</para>
<example>
<title>Set the <varname>use_tls</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("tls_mgm", "client_domain", "rmq")
modparam("tls_mgm", "certificate", "[rmq]/etc/pki/tls/certs/rmq.pem")
modparam("tls_mgm", "private_key", "[rmq]/etc/pki/tls/private/rmq.key")
modparam("tls_mgm", "ca_list", "[rmq]/etc/pki/tls/certs/ca.pem")
...
modparam("event_rabbitmq", "use_tls", 1)
...
</programlisting>
</example>
</section>

</section>

<section id="exported_functions" xreflabel="exported_functions">
Expand Down Expand Up @@ -213,6 +259,8 @@ modparam("event_rabbitmq", "connect_timeout", 1000)
# same socket can be written as
rabbitmq:127.0.0.1/pike

# TLS broker connection
rabbitmq:127.0.0.1/tls_domain=rmq?pike
</programlisting>
</example>

Expand Down
93 changes: 83 additions & 10 deletions modules/event_rabbitmq/event_rabbitmq.c
Expand Up @@ -26,6 +26,7 @@
#include "../../sr_module.h"
#include "../../evi/evi_transport.h"
#include "../../ut.h"
#include "../../lib/csv.h"
#include "event_rabbitmq.h"
#include "rabbitmq_send.h"
#include <string.h>
Expand All @@ -46,6 +47,9 @@ static unsigned int heartbeat = 0;
extern unsigned rmq_sync_mode;
static int rmq_connect_timeout = RMQ_DEFAULT_CONNECT_TIMEOUT;
struct timeval conn_timeout_tv;
int use_tls;

struct tls_mgm_binds tls_api;

/**
* exported functions
Expand All @@ -68,9 +72,29 @@ static param_export_t mod_params[] = {
{"heartbeat", INT_PARAM, &heartbeat},
{"sync_mode", INT_PARAM, &rmq_sync_mode},
{"connect_timeout", INT_PARAM, &rmq_connect_timeout},
{"use_tls", INT_PARAM, &use_tls},
{0,0,0}
};

static module_dependency_t *get_deps_use_tls(param_export_t *param)
{
if (*(int *)param->param_pointer == 0)
return NULL;

return alloc_module_dep(MOD_TYPE_DEFAULT, "tls_mgm", DEP_ABORT);
}

/* modules dependencies */
static dep_export_t deps = {
{ /* OpenSIPS module dependencies */
{ MOD_TYPE_NULL, NULL, 0 },
},
{ /* modparam dependencies */
{ "use_tls", get_deps_use_tls },
{ NULL, NULL },
},
};

/**
* module exports
*/
Expand All @@ -80,7 +104,7 @@ struct module_exports exports= {
MODULE_VERSION,
DEFAULT_DLFLAGS, /* dlopen flags */
0, /* load function */
NULL, /* OpenSIPS module dependencies */
&deps, /* OpenSIPS module dependencies */
0, /* exported functions */
0, /* exported async functions */
mod_params, /* exported parameters */
Expand Down Expand Up @@ -138,6 +162,20 @@ static int mod_init(void)
conn_timeout_tv.tv_sec = rmq_connect_timeout/1000;
conn_timeout_tv.tv_usec = (rmq_connect_timeout%1000)*1000;

if (use_tls) {
#ifndef AMQP_VERSION_v04
LM_ERR("TLS not supported for librabbitmq version lower than 0.4.0\n");
return -1;
#endif

if (load_tls_mgm_api(&tls_api) != 0) {
LM_ERR("failed to load tls_mgm API!\n");
return -1;
}

amqp_set_initialize_ssl_library(0);
}

return 0;
}

Expand Down Expand Up @@ -221,14 +259,16 @@ static evi_reply_sock* rmq_parse(str socket)
rmq_params_t *param;
unsigned int len, i;
const char* begin;
str s;
csv_record *p_list = NULL, *it;
str prev_token;

enum state {
ST_USER_HOST, /* Username or hostname */
ST_PASS_PORT, /* Password or port part */
ST_HOST, /* Hostname part */
ST_PORT, /* Port part */
ST_ROUTE_OR_EXPORT /* Routing or export key */
ST_ROUTE_OR_PARAMS /* Routing key or extra params */
} st;

if (!socket.len || !socket.s) {
Expand Down Expand Up @@ -273,7 +313,7 @@ static evi_reply_sock* rmq_parse(str socket)
if (dupl_string(&sock->address, begin, socket.s + i) < 0)
goto err;
sock->flags |= EVI_ADDRESS;
st = ST_ROUTE_OR_EXPORT;
st = ST_ROUTE_OR_PARAMS;
begin = socket.s + i + 1;
}
break;
Expand Down Expand Up @@ -305,7 +345,7 @@ static evi_reply_sock* rmq_parse(str socket)
goto err;
}
sock->flags |= EVI_PORT;
st = ST_ROUTE_OR_EXPORT;
st = ST_ROUTE_OR_PARAMS;
begin = socket.s + i + 1;
}
break;
Expand All @@ -325,7 +365,7 @@ static evi_reply_sock* rmq_parse(str socket)
goto err;
sock->flags |= EVI_ADDRESS;

st = ST_ROUTE_OR_EXPORT;
st = ST_ROUTE_OR_PARAMS;
begin = socket.s + i + 1;
}
break;
Expand All @@ -341,18 +381,42 @@ static evi_reply_sock* rmq_parse(str socket)
}
sock->flags |= EVI_PORT;

st = ST_ROUTE_OR_EXPORT;
st = ST_ROUTE_OR_PARAMS;
begin = socket.s + i + 1;
}
break;

case ST_ROUTE_OR_EXPORT:
case ST_ROUTE_OR_PARAMS:
switch(socket.s[i]) {
case '?':
s.s = (char*)begin;
s.len = socket.s + i - begin;

if (dupl_string(&param->exchange, begin, socket.s + i) < 0)
p_list = __parse_csv_record(&s, 0, ';');
if (!p_list) {
LM_ERR("bad extra parameters: %.*s\n", s.len, s.s);
goto err;
param->flags |= RMQ_PARAM_EKEY;
}
for (it = p_list; it; it = it->next)
if (it->s.len > RMQ_EXCHANGE_LEN &&
!memcmp(it->s.s, RMQ_EXCHANGE_S, RMQ_EXCHANGE_LEN)) {
if (dupl_string(&param->exchange, it->s.s+RMQ_EXCHANGE_LEN,
it->s.s + it->s.len) < 0)
goto err;
param->flags |= RMQ_PARAM_EKEY;
} else if (it->s.len > RMQ_TLS_DOM_LEN &&
!memcmp(it->s.s, RMQ_TLS_DOM_S, RMQ_TLS_DOM_LEN)) {
if (dupl_string(&param->tls_dom_name,
it->s.s+RMQ_TLS_DOM_LEN, it->s.s + it->s.len) < 0)
goto err;
param->tls_dom_name.len--;
param->flags |= RMQ_PARAM_TLS;
} else {
LM_WARN("unknown extra parameter: '%.*s'\n", it->s.len, it->s.s);
goto err;
}

free_csv_record(p_list);

if (dupl_string(&param->routing_key, socket.s + i + 1, socket.s + len) < 0)
goto err;
Expand All @@ -376,7 +440,10 @@ static evi_reply_sock* rmq_parse(str socket)

success:
if (!(sock->flags & EVI_PORT) || !sock->port) {
sock->port = RMQ_DEFAULT_PORT;
if (param->flags & RMQ_PARAM_TLS)
sock->port = RMQ_DEFAULT_TLS_PORT;
else
sock->port = RMQ_DEFAULT_PORT;
sock->flags |= EVI_PORT;
}
if (!(param->flags & RMQ_PARAM_USER) || !param->user.s) {
Expand All @@ -385,6 +452,11 @@ static evi_reply_sock* rmq_parse(str socket)
param->flags |= RMQ_PARAM_USER|RMQ_PARAM_PASS;
}

if ((param->flags & RMQ_PARAM_TLS) && !use_tls) {
LM_ERR("'use_tls' module parameter required for TLS support\n");
goto err;
}

param->heartbeat = heartbeat;
sock->params = param;
sock->flags |= EVI_PARAMS | RMQ_FLAG;
Expand All @@ -395,6 +467,7 @@ static evi_reply_sock* rmq_parse(str socket)
if (prev_token.s)
shm_free(prev_token.s);
rmq_free_param(param);
free_csv_record(p_list);
if (sock->address.s)
shm_free(sock->address.s);
shm_free(sock);
Expand Down
14 changes: 14 additions & 0 deletions modules/event_rabbitmq/event_rabbitmq.h
Expand Up @@ -32,8 +32,10 @@
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00040000
#define AMQP_VERSION_v04
#include <amqp_tcp_socket.h>
#include <amqp_ssl_socket.h>
#endif

#include "../tls_mgm/api.h"

/* transport protocols name */
#define RMQ_NAME "rabbitmq"
Expand All @@ -48,24 +50,36 @@
#define RMQ_DEFAULT_MAX 131072
#define RMQ_DEFAULT_VHOST "/"
#define RMQ_DEFAULT_PORT 5672
#define RMQ_DEFAULT_TLS_PORT 5671

#define RMQ_PARAM_RKEY (1 << 1)
#define RMQ_PARAM_CONN (1 << 2)
#define RMQ_PARAM_CHAN (1 << 3)
#define RMQ_PARAM_USER (1 << 4)
#define RMQ_PARAM_PASS (1 << 5)
#define RMQ_PARAM_EKEY (1 << 6)
#define RMQ_PARAM_TLS (1 << 7)

#define RMQ_EXCHANGE_S "exchange="
#define RMQ_EXCHANGE_LEN (sizeof(RMQ_EXCHANGE_S)-1)
#define RMQ_TLS_DOM_S "tls_domain="
#define RMQ_TLS_DOM_LEN (sizeof(RMQ_TLS_DOM_S)-1)

typedef struct _rmq_params {
str routing_key;
str exchange;
str user;
str pass;
str tls_dom_name;
struct tls_domain *tls_dom;
amqp_connection_state_t conn;
int channel;
int flags;
int heartbeat;
} rmq_params_t;

extern int use_tls;
extern struct tls_mgm_binds tls_api;

#endif

0 comments on commit 472ae54

Please sign in to comment.