Skip to content

Commit

Permalink
event_rabbitmq: add timeout support
Browse files Browse the repository at this point in the history
Add RPC timeout support for any command sent to the rabbitmq server
  • Loading branch information
razvancrainea committed Mar 28, 2024
1 parent a889ac9 commit 4b23a80
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 0 deletions.
27 changes: 27 additions & 0 deletions modules/rabbitmq/doc/rabbitmq_admin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,33 @@ modparam("rabbitmq", "use_tls", 1)

aram("rabbitmq", "connect_timeout", 1000)

</programlisting>
</example>
</section>

<section id="param_timeout" xreflabel="timeout">
<title><varname>timeout</varname> (integer)</title>
<para>
Indicates the timeout (in milliseconds) of any command (i.e. publish)
sent to the RabbitMQ server.
</para>
<para>
<emphasis>NOTE</emphasis> that this parameter is available only starting with
RabbitMQ library version <emphasis>0.9.0</emphasis>; setting it when using an
earlier version will have no effect, and the publish command will run in
blocking mode.
</para>
<para>
<emphasis>
Default value is <emphasis role='bold'>0</emphasis> (no timeout - blocking mode)
</emphasis>
</para>
<example>
<title>Set the <varname>timeout</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("rabbitmq", "timeout", 1000) # timeout after 1s
...
</programlisting>
</example>
</section>
Expand Down
17 changes: 17 additions & 0 deletions modules/rabbitmq/rabbitmq.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ int use_tls;
struct openssl_binds openssl_api;
struct tls_mgm_binds tls_api;
static int rmq_connect_timeout = RMQ_DEFAULT_CONNECT_TIMEOUT;
static int rmq_timeout = 0;

struct timeval conn_timeout_tv;
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
struct timeval rpc_timeout_tv;
#endif

#if AMQP_VERSION < AMQP_VERSION_CODE(0, 10, 0, 0)
gen_lock_t *ssl_lock;
Expand All @@ -57,6 +61,7 @@ static const param_export_t params[]={
(void *)rmq_server_add},
{"use_tls", INT_PARAM, &use_tls},
{"connect_timeout", INT_PARAM, &rmq_connect_timeout},
{"timeout", INT_PARAM, &rmq_timeout},
{0,0,0}
};

Expand Down Expand Up @@ -165,6 +170,18 @@ static int mod_init(void)
conn_timeout_tv.tv_sec = rmq_connect_timeout/1000;
conn_timeout_tv.tv_usec = (rmq_connect_timeout%1000)*1000;

#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
if (rmq_timeout < 0) {
LM_WARN("invalid value for 'timeout' %d; fallback to blocking mode\n", rmq_timeout);
rmq_timeout = 0;
}
rpc_timeout_tv.tv_sec = rmq_timeout/1000;
rpc_timeout_tv.tv_usec = (rmq_timeout%1000)*1000;
#else
if (rmq_timeout != 0)
LM_WARN("setting the timeout without support for it; fallback to blocking mode\n");
#endif

return 0;
}

Expand Down
5 changes: 5 additions & 0 deletions modules/rabbitmq/rmq_servers.c
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,11 @@ int rmq_reconnect(struct rmq_server *srv)
LM_ERR("cannot open AMQP socket\n");
goto clean_rmq_conn;
}
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
if (rpc_timeout_tv.tv_sec > 0 &&
amqp_set_rpc_timeout(srv->conn, &rpc_timeout_tv) < 0)
LM_ERR("setting RPC timeout - going blocking\n");
#endif

#else
socket = amqp_open_socket_noblock(srv->uri.host, srv->uri.port,
Expand Down
3 changes: 3 additions & 0 deletions modules/rabbitmq/rmq_servers.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,8 @@ extern int use_tls;
extern struct openssl_binds openssl_api;
extern struct tls_mgm_binds tls_api;
extern struct timeval conn_timeout_tv;
#if defined AMQP_VERSION && AMQP_VERSION >= 0x00090000
extern struct timeval rpc_timeout_tv;
#endif

#endif /* _RMQ_SERVERS_H_ */

0 comments on commit 4b23a80

Please sign in to comment.