Skip to content

Commit

Permalink
Add support for async status reporting for the event interface
Browse files Browse the repository at this point in the history
In order to properly report the status of a raise operation, transport
modules no longer have to implement a synchronous mode, where the triggering
process has to block and wait for the dedicated module process to return back
the status. As such, the "sync_mode" module parameter has been dropped from
event_rabbitmq, event_xmlrpc and event_stream modules.

At the moment, only the event_virtual module actually registers an async
status callback. The evi raise function used by all other modules always
returns success if the raise "job" has been "pushed" succesfully.
  • Loading branch information
rvlad-patrascu committed Feb 25, 2021
1 parent fac2247 commit 495380c
Show file tree
Hide file tree
Showing 21 changed files with 386 additions and 280 deletions.
3 changes: 2 additions & 1 deletion evi/event_interface.c
Expand Up @@ -140,6 +140,7 @@ int evi_raise_event(event_id_t id, evi_params_t* params)
int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params)
{
evi_subs_p subs, prev;
evi_async_ctx_t async_status = {NULL, NULL};
long now;
int flags, pflags = 0;
int ret = 0;
Expand Down Expand Up @@ -206,7 +207,7 @@ int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params
lock_release(events[id].lock);

ret += (subs->trans_mod->raise)(msg, &events[id].name,
subs->reply_sock, params);
subs->reply_sock, params, &async_status);

lock_get(events[id].lock);
subs->reply_sock->flags = flags;
Expand Down
17 changes: 15 additions & 2 deletions evi/evi_transport.h
Expand Up @@ -41,6 +41,7 @@
#define EVI_PARAMS (1 << 4)
#define EVI_EXPIRE (1 << 8) // indicates that the socket may expire
#define EVI_PENDING (1 << 9) // indicates that the socket is in use
#define EVI_ASYNC_STATUS (1<<10)

/* sockets */
typedef union {
Expand All @@ -59,9 +60,21 @@ typedef struct ev_reply_sock_ {
void *params;
} evi_reply_sock;

enum evi_status {
EVI_STATUS_FAIL = -1,
EVI_STATUS_SUCCESS
};

typedef void (*evi_status_cb)(void *param, enum evi_status status);

typedef struct evi_async_ctx {
evi_status_cb status_cb;
void *cb_param;
} evi_async_ctx_t;

/* event raise function */
typedef int (raise_f)(struct sip_msg *msg, str *ev_name,
evi_reply_sock *sock, evi_params_t * params);
typedef int (raise_f)(struct sip_msg *msg, str *ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx);
/* socket parse function */
typedef evi_reply_sock* (parse_f)(str);
/* tries to match two sockets */
Expand Down
6 changes: 3 additions & 3 deletions modules/event_datagram/event_datagram.c
Expand Up @@ -57,7 +57,7 @@ static int child_init(int);
static evi_reply_sock* datagram_parse_udp(str socket);
static evi_reply_sock* datagram_parse_unix(str socket);
static int datagram_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t * params);
evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx);
static int datagram_match(evi_reply_sock *sock1, evi_reply_sock *sock2);
static str datagram_print(evi_reply_sock *sock);

Expand Down Expand Up @@ -295,8 +295,8 @@ static str datagram_print(evi_reply_sock *sock)
}
#undef DO_PRINT

static int datagram_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t *params)
static int datagram_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx)
{
int ret;
str buf;
Expand Down
8 changes: 4 additions & 4 deletions modules/event_flatstore/event_flatstore.c
Expand Up @@ -49,8 +49,8 @@ static int flat_match(evi_reply_sock *sock1, evi_reply_sock *sock2);
static evi_reply_sock* flat_parse(str socket);
mi_response_t *mi_rotate(const mi_params_t *params,
struct mi_handler *async_hdl);
static int flat_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t * params);
static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx);

static int *opened_fds;
static int *rotate_version;
Expand Down Expand Up @@ -543,8 +543,8 @@ static void rotating(struct flat_file *file){
lock_release(global_lock);
}

static int flat_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t *params) {
static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx) {

int idx = 0, offset_buff = 0, len, required_length = 0, nwritten;
evi_param_p param;
Expand Down
19 changes: 0 additions & 19 deletions modules/event_rabbitmq/doc/event_rabbitmq_admin.xml
Expand Up @@ -144,25 +144,6 @@
...
modparam("event_rabbitmq", "heartbeat", 3)
...
</programlisting>
</example>
</section>
<section id="param_sync_mode" xreflabel="sync_mode">
<title><varname>sync_mode</varname> (integer)</title>
<para>
Specifies whether an event raise operates synchronous or asynchronous relative to the process where the raise is triggered.In synchronous mode the process waits for the status of the raise from the actual worker process.In asynchronous mode the process continues its operation without receiving any confirmation.
</para>
<para>
<emphasis>
Default value is <quote>0 (asynchronous)</quote>.
</emphasis>
</para>
<example>
<title>Set <varname>sync_mode</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("event_rabbitmq", "sync_mode", 1)
...
</programlisting>
</example>
</section>
Expand Down
13 changes: 6 additions & 7 deletions modules/event_rabbitmq/event_rabbitmq.c
Expand Up @@ -44,7 +44,6 @@ static void destroy(void);
* module parameters
*/
static unsigned int heartbeat = 0;
extern unsigned rmq_sync_mode;
static int rmq_connect_timeout = RMQ_DEFAULT_CONNECT_TIMEOUT;
struct timeval conn_timeout_tv;
int use_tls;
Expand All @@ -55,8 +54,8 @@ struct tls_mgm_binds tls_api;
* exported functions
*/
static evi_reply_sock* rmq_parse(str socket);
static int rmq_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t * params);
static int rmq_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx);
static int rmq_match(evi_reply_sock *sock1, evi_reply_sock *sock2);
static void rmq_free(evi_reply_sock *sock);
static str rmq_print(evi_reply_sock *sock);
Expand All @@ -70,7 +69,6 @@ static proc_export_t procs[] = {
/* module parameters */
static param_export_t mod_params[] = {
{"heartbeat", INT_PARAM, &heartbeat},
{"sync_mode", INT_PARAM, &rmq_sync_mode},
{"connect_timeout", INT_PARAM, &rmq_connect_timeout},
{"use_tls", INT_PARAM, &use_tls},
{0,0,0}
Expand Down Expand Up @@ -459,7 +457,7 @@ static evi_reply_sock* rmq_parse(str socket)

param->heartbeat = heartbeat;
sock->params = param;
sock->flags |= EVI_PARAMS | RMQ_FLAG;
sock->flags |= EVI_PARAMS | RMQ_FLAG | EVI_ASYNC_STATUS;

return sock;
err:
Expand Down Expand Up @@ -529,8 +527,8 @@ static str rmq_print(evi_reply_sock *sock)
}
#undef DO_PRINT

static int rmq_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t * params)
static int rmq_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx)
{
rmq_send_t *rmqs;
str buf;
Expand Down Expand Up @@ -564,6 +562,7 @@ static int rmq_raise(struct sip_msg *msg, str* ev_name,
evi_free_payload(buf.s);

rmqs->sock = sock;
rmqs->async_ctx = *async_ctx;

if (rmq_send(rmqs) < 0) {
LM_ERR("cannot send message\n");
Expand Down
73 changes: 38 additions & 35 deletions modules/event_rabbitmq/rabbitmq_send.c
Expand Up @@ -34,16 +34,6 @@
#define RMQ_SIZE (sizeof(rmq_send_t *))
#define IS_ERR(_err) (errno == _err)

#ifdef HAVE_SCHED_YIELD
#include <sched.h>
#else
#include <unistd.h>
/** Fake sched_yield if no unistd.h include is available */
#define sched_yield() sleep(0)
#endif

unsigned rmq_sync_mode = 0;

/* used to communicate with the sending process */
static int rmq_pipe[2];

Expand Down Expand Up @@ -77,9 +67,6 @@ int rmq_send(rmq_send_t* rmqs)
{
int rc;
int retries = RMQ_SEND_RETRY;
long send_status;

rmqs->process_idx = process_no;

do {
rc = write(rmq_pipe[1], &rmqs, RMQ_SIZE);
Expand All @@ -88,20 +75,10 @@ int rmq_send(rmq_send_t* rmqs)
if (rc < 0) {
LM_ERR("unable to send rmq send struct to worker\n");
shm_free(rmqs);
return RMQ_SEND_FAIL;
return -1;
}
/* give a change to the writer :) */
sched_yield();

if (rmq_sync_mode) {
if (ipc_recv_sync_reply((void **)(long *)&send_status) < 0) {
LM_ERR("cannot receive send status\n");
send_status = RMQ_SEND_FAIL;
}

return (int)send_status;
} else
return RMQ_SEND_SUCCESS;
return 0;
}

static rmq_send_t * rmq_receive(void)
Expand Down Expand Up @@ -524,9 +501,37 @@ static int rmq_sendmsg(rmq_send_t *rmqs)
return rtrn;
}

void rmq_run_status_cb(int sender, void *param)
{
struct rmq_cb_ipc_param *cb_ipc_param =
(struct rmq_cb_ipc_param *)param;

cb_ipc_param->async_ctx.status_cb(cb_ipc_param->async_ctx.cb_param,
cb_ipc_param->status);

shm_free(cb_ipc_param);
}

static void rmq_dispatch_status_cb(evi_async_ctx_t *async_ctx,
enum evi_status status)
{
struct rmq_cb_ipc_param *cb_ipc_param;

cb_ipc_param = shm_malloc(sizeof *cb_ipc_param);
if (!cb_ipc_param) {
LM_ERR("oom!\n");
return;
}

cb_ipc_param->async_ctx = *async_ctx;
cb_ipc_param->status = status;

ipc_dispatch_rpc(rmq_run_status_cb, cb_ipc_param);
}

void rmq_process(int rank)
{
int send_status;
enum evi_status status;

/* init blocking reader */
rmq_init_reader();
Expand All @@ -548,23 +553,21 @@ void rmq_process(int rank)
/* check if we should reconnect */
if (rmq_reconnect(rmqs->sock) < 0) {
LM_ERR("cannot reconnect socket\n");
send_status = RMQ_SEND_FAIL;
goto send_status_reply;
if (rmqs->async_ctx.status_cb)
rmq_dispatch_status_cb(&rmqs->async_ctx, EVI_STATUS_FAIL);
goto end;
}

/* send msg */
if (rmq_sendmsg(rmqs)) {
LM_ERR("cannot send message\n");
send_status = RMQ_SEND_FAIL;
status = EVI_STATUS_FAIL;
} else {
send_status = RMQ_SEND_SUCCESS;
status = EVI_STATUS_SUCCESS;
}

send_status_reply:
if (rmq_sync_mode) {
if (ipc_send_sync_reply(rmqs->process_idx, (void *)(long)send_status) < 0)
LM_ERR("cannot send status back to requesting process\n");
}
if (rmqs->async_ctx.status_cb)
rmq_dispatch_status_cb(&rmqs->async_ctx, status);
end:
if (rmqs)
shm_free(rmqs);
Expand Down
9 changes: 6 additions & 3 deletions modules/event_rabbitmq/rabbitmq_send.h
Expand Up @@ -30,15 +30,18 @@
#include "event_rabbitmq.h"

#define RMQ_SEND_RETRY 3
#define RMQ_SEND_SUCCESS 0
#define RMQ_SEND_FAIL -1

typedef struct _rmq_send {
evi_reply_sock *sock;
int process_idx;
evi_async_ctx_t async_ctx;
char msg[0];
} rmq_send_t;

struct rmq_cb_ipc_param {
evi_async_ctx_t async_ctx;
enum evi_status status;
};

void rmq_process(int rank);
int rmq_create_pipe(void);
void rmq_destroy_pipe(void);
Expand Down
4 changes: 2 additions & 2 deletions modules/event_route/event_route.c
Expand Up @@ -46,7 +46,7 @@ static int child_init(int rank);
*/
static evi_reply_sock* scriptroute_parse(str socket);
static int scriptroute_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t * params);
evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx);
static int scriptroute_match(evi_reply_sock *sock1, evi_reply_sock *sock2);
static str scriptroute_print(evi_reply_sock *sock);

Expand Down Expand Up @@ -347,7 +347,7 @@ void route_run(struct script_route route, struct sip_msg* msg,
}

static int scriptroute_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t *params)
evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx)
{
route_send_t *buf = NULL;

Expand Down
8 changes: 4 additions & 4 deletions modules/event_routing/event_routing.c
Expand Up @@ -44,8 +44,8 @@ static int wait_for_event(struct sip_msg* msg, async_ctx *ctx,


/* EVI transport API */
static int ebr_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t *params);
static int ebr_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx);
static evi_reply_sock* ebr_parse(str socket);
static int ebr_match(evi_reply_sock *sock1, evi_reply_sock *sock2);
static str ebr_print(evi_reply_sock *sock);
Expand Down Expand Up @@ -471,8 +471,8 @@ static str ebr_print(evi_reply_sock *sock)
}


static int ebr_raise(struct sip_msg *msg, str* ev_name,
evi_reply_sock *sock, evi_params_t *params)
static int ebr_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock,
evi_params_t *params, evi_async_ctx_t *async_ctx)
{
if (!sock || !(sock->flags & EVI_PARAMS)) {
LM_ERR("no socket found\n");
Expand Down

0 comments on commit 495380c

Please sign in to comment.