Skip to content

Commit

Permalink
event_rabbitmq: add timeout support
Browse files Browse the repository at this point in the history
Add RPC timeout support for any command sent to the rabbitmq server
  • Loading branch information
razvancrainea authored and bogdan-iancu committed Apr 18, 2024
1 parent 273e604 commit b530aa6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 0 deletions.
27 changes: 27 additions & 0 deletions modules/event_rabbitmq/doc/event_rabbitmq_admin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,33 @@ modparam("event_rabbitmq", "use_tls", 1)
</example>
</section>

<section id="param_timeout" xreflabel="timeout">
<title><varname>timeout</varname> (integer)</title>
<para>
Indicates the timeout (in milliseconds) of any command (i.e. publish)
sent to the RabbitMQ server.
</para>
<para>
<emphasis>NOTE</emphasis> that this parameter is available only starting with
RabbitMQ library version <emphasis>0.9.0</emphasis>; setting it when using an
earlier version will have no effect, and the publish command will run in
blocking mode.
</para>
<para>
<emphasis>
Default value is <emphasis role='bold'>0</emphasis> (no timeout - blocking mode)
</emphasis>
</para>
<example>
<title>Set the <varname>timeout</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("event_rabbitmq", "timeout", 1000) # timeout after 1s
...
</programlisting>
</example>
</section>

</section>

<section id="exported_functions" xreflabel="exported_functions">
Expand Down
17 changes: 17 additions & 0 deletions modules/event_rabbitmq/event_rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ static void destroy(void);
*/
static unsigned int heartbeat = 0;
static int rmq_connect_timeout = RMQ_DEFAULT_CONNECT_TIMEOUT;
static int rmq_timeout = 0;
struct timeval conn_timeout_tv;
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
struct timeval rpc_timeout_tv;
#endif
int use_tls;

struct tls_mgm_binds tls_api;
Expand All @@ -70,6 +74,7 @@ static const proc_export_t procs[] = {
static const param_export_t mod_params[] = {
{"heartbeat", INT_PARAM, &heartbeat},
{"connect_timeout", INT_PARAM, &rmq_connect_timeout},
{"timeout", INT_PARAM, &rmq_timeout},
{"use_tls", INT_PARAM, &use_tls},
{0,0,0}
};
Expand Down Expand Up @@ -160,6 +165,18 @@ static int mod_init(void)
conn_timeout_tv.tv_sec = rmq_connect_timeout/1000;
conn_timeout_tv.tv_usec = (rmq_connect_timeout%1000)*1000;

#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
if (rmq_timeout < 0) {
LM_WARN("invalid value for 'timeout' %d; fallback to blocking mode\n", rmq_timeout);
rmq_timeout = 0;
}
rpc_timeout_tv.tv_sec = rmq_timeout/1000;
rpc_timeout_tv.tv_usec = (rmq_timeout%1000)*1000;
#else
if (rmq_timeout != 0)
LM_WARN("setting the timeout without support for it; fallback to blocking mode\n");
#endif

if (use_tls) {
#ifndef AMQP_VERSION_v04
LM_ERR("TLS not supported for librabbitmq version lower than 0.4.0\n");
Expand Down
5 changes: 5 additions & 0 deletions modules/event_rabbitmq/rabbitmq_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ static int rmq_reconnect(evi_reply_sock *sock)
LM_ERR("cannot open AMQP socket: %d\n", socket);
goto destroy_rmqp;
}
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
if (rpc_timeout_tv.tv_sec > 0 &&
amqp_set_rpc_timeout(rmqp->conn, &rpc_timeout_tv) < 0)
LM_ERR("setting RPC timeout - going blocking\n");
#endif
#else
socket = amqp_open_socket_noblock(sock->address.s, sock->port,
&conn_timeout_tv);
Expand Down
3 changes: 3 additions & 0 deletions modules/event_rabbitmq/rabbitmq_send.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ void rmq_free_param(rmq_params_t *rmqp);
void rmq_destroy(evi_reply_sock *sock);

extern struct timeval conn_timeout_tv;
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
extern struct timeval rpc_timeout_tv;
#endif
extern str rmq_static_holder;

#endif

0 comments on commit b530aa6

Please sign in to comment.