diff --git a/evi/event_interface.c b/evi/event_interface.c index ea9d4a89220..d9e596a679d 100644 --- a/evi/event_interface.c +++ b/evi/event_interface.c @@ -140,6 +140,7 @@ int evi_raise_event(event_id_t id, evi_params_t* params) int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params) { evi_subs_p subs, prev; + evi_async_ctx_t async_status = {NULL, NULL}; long now; int flags, pflags = 0; int ret = 0; @@ -206,7 +207,7 @@ int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params lock_release(events[id].lock); ret += (subs->trans_mod->raise)(msg, &events[id].name, - subs->reply_sock, params); + subs->reply_sock, params, &async_status); lock_get(events[id].lock); subs->reply_sock->flags = flags; diff --git a/evi/evi_transport.h b/evi/evi_transport.h index e62cd39501e..30546809369 100644 --- a/evi/evi_transport.h +++ b/evi/evi_transport.h @@ -41,6 +41,7 @@ #define EVI_PARAMS (1 << 4) #define EVI_EXPIRE (1 << 8) // indicates that the socket may expire #define EVI_PENDING (1 << 9) // indicates that the socket is in use +#define EVI_ASYNC_STATUS (1<<10) /* sockets */ typedef union { @@ -59,9 +60,21 @@ typedef struct ev_reply_sock_ { void *params; } evi_reply_sock; +enum evi_status { + EVI_STATUS_FAIL = -1, + EVI_STATUS_SUCCESS +}; + +typedef void (*evi_status_cb)(void *param, enum evi_status status); + +typedef struct evi_async_ctx { + evi_status_cb status_cb; + void *cb_param; +} evi_async_ctx_t; + /* event raise function */ -typedef int (raise_f)(struct sip_msg *msg, str *ev_name, - evi_reply_sock *sock, evi_params_t * params); +typedef int (raise_f)(struct sip_msg *msg, str *ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx); /* socket parse function */ typedef evi_reply_sock* (parse_f)(str); /* tries to match two sockets */ diff --git a/modules/event_datagram/event_datagram.c b/modules/event_datagram/event_datagram.c index d0a0d838584..028e9a2e6d5 100644 --- a/modules/event_datagram/event_datagram.c +++ b/modules/event_datagram/event_datagram.c @@ -57,7 +57,7 @@ static int child_init(int); static evi_reply_sock* datagram_parse_udp(str socket); static evi_reply_sock* datagram_parse_unix(str socket); static int datagram_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params); + evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx); static int datagram_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static str datagram_print(evi_reply_sock *sock); @@ -295,8 +295,8 @@ static str datagram_print(evi_reply_sock *sock) } #undef DO_PRINT -static int datagram_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t *params) +static int datagram_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx) { int ret; str buf; diff --git a/modules/event_flatstore/event_flatstore.c b/modules/event_flatstore/event_flatstore.c index 1c4da99abeb..db5818e09d6 100644 --- a/modules/event_flatstore/event_flatstore.c +++ b/modules/event_flatstore/event_flatstore.c @@ -49,8 +49,8 @@ static int flat_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static evi_reply_sock* flat_parse(str socket); mi_response_t *mi_rotate(const mi_params_t *params, struct mi_handler *async_hdl); -static int flat_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params); +static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx); static int *opened_fds; static int *rotate_version; @@ -543,8 +543,8 @@ static void rotating(struct flat_file *file){ lock_release(global_lock); } -static int flat_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t *params) { +static int flat_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx) { int idx = 0, offset_buff = 0, len, required_length = 0, nwritten; evi_param_p param; diff --git a/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml b/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml index 16416931ff8..72eabf09997 100644 --- a/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml +++ b/modules/event_rabbitmq/doc/event_rabbitmq_admin.xml @@ -144,25 +144,6 @@ ... modparam("event_rabbitmq", "heartbeat", 3) ... - - - -
- <varname>sync_mode</varname> (integer) - - Specifies whether an event raise operates synchronous or asynchronous relative to the process where the raise is triggered.In synchronous mode the process waits for the status of the raise from the actual worker process.In asynchronous mode the process continues its operation without receiving any confirmation. - - - - Default value is 0 (asynchronous). - - - - Set <varname>sync_mode</varname> parameter - -... -modparam("event_rabbitmq", "sync_mode", 1) -...
diff --git a/modules/event_rabbitmq/event_rabbitmq.c b/modules/event_rabbitmq/event_rabbitmq.c index 545e8a0adfc..1012b4b01f7 100644 --- a/modules/event_rabbitmq/event_rabbitmq.c +++ b/modules/event_rabbitmq/event_rabbitmq.c @@ -44,7 +44,6 @@ static void destroy(void); * module parameters */ static unsigned int heartbeat = 0; -extern unsigned rmq_sync_mode; static int rmq_connect_timeout = RMQ_DEFAULT_CONNECT_TIMEOUT; struct timeval conn_timeout_tv; int use_tls; @@ -55,8 +54,8 @@ struct tls_mgm_binds tls_api; * exported functions */ static evi_reply_sock* rmq_parse(str socket); -static int rmq_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params); +static int rmq_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx); static int rmq_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static void rmq_free(evi_reply_sock *sock); static str rmq_print(evi_reply_sock *sock); @@ -70,7 +69,6 @@ static proc_export_t procs[] = { /* module parameters */ static param_export_t mod_params[] = { {"heartbeat", INT_PARAM, &heartbeat}, - {"sync_mode", INT_PARAM, &rmq_sync_mode}, {"connect_timeout", INT_PARAM, &rmq_connect_timeout}, {"use_tls", INT_PARAM, &use_tls}, {0,0,0} @@ -459,7 +457,7 @@ static evi_reply_sock* rmq_parse(str socket) param->heartbeat = heartbeat; sock->params = param; - sock->flags |= EVI_PARAMS | RMQ_FLAG; + sock->flags |= EVI_PARAMS | RMQ_FLAG | EVI_ASYNC_STATUS; return sock; err: @@ -529,8 +527,8 @@ static str rmq_print(evi_reply_sock *sock) } #undef DO_PRINT -static int rmq_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params) +static int rmq_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx) { rmq_send_t *rmqs; str buf; @@ -564,6 +562,7 @@ static int rmq_raise(struct sip_msg *msg, str* ev_name, evi_free_payload(buf.s); rmqs->sock = sock; + rmqs->async_ctx = *async_ctx; if (rmq_send(rmqs) < 0) { LM_ERR("cannot send message\n"); diff --git a/modules/event_rabbitmq/rabbitmq_send.c b/modules/event_rabbitmq/rabbitmq_send.c index a01e72ed01f..9fdb2619bea 100644 --- a/modules/event_rabbitmq/rabbitmq_send.c +++ b/modules/event_rabbitmq/rabbitmq_send.c @@ -34,16 +34,6 @@ #define RMQ_SIZE (sizeof(rmq_send_t *)) #define IS_ERR(_err) (errno == _err) -#ifdef HAVE_SCHED_YIELD -#include -#else -#include -/** Fake sched_yield if no unistd.h include is available */ - #define sched_yield() sleep(0) -#endif - -unsigned rmq_sync_mode = 0; - /* used to communicate with the sending process */ static int rmq_pipe[2]; @@ -77,9 +67,6 @@ int rmq_send(rmq_send_t* rmqs) { int rc; int retries = RMQ_SEND_RETRY; - long send_status; - - rmqs->process_idx = process_no; do { rc = write(rmq_pipe[1], &rmqs, RMQ_SIZE); @@ -88,20 +75,10 @@ int rmq_send(rmq_send_t* rmqs) if (rc < 0) { LM_ERR("unable to send rmq send struct to worker\n"); shm_free(rmqs); - return RMQ_SEND_FAIL; + return -1; } - /* give a change to the writer :) */ - sched_yield(); - if (rmq_sync_mode) { - if (ipc_recv_sync_reply((void **)(long *)&send_status) < 0) { - LM_ERR("cannot receive send status\n"); - send_status = RMQ_SEND_FAIL; - } - - return (int)send_status; - } else - return RMQ_SEND_SUCCESS; + return 0; } static rmq_send_t * rmq_receive(void) @@ -524,9 +501,37 @@ static int rmq_sendmsg(rmq_send_t *rmqs) return rtrn; } +void rmq_run_status_cb(int sender, void *param) +{ + struct rmq_cb_ipc_param *cb_ipc_param = + (struct rmq_cb_ipc_param *)param; + + cb_ipc_param->async_ctx.status_cb(cb_ipc_param->async_ctx.cb_param, + cb_ipc_param->status); + + shm_free(cb_ipc_param); +} + +static void rmq_dispatch_status_cb(evi_async_ctx_t *async_ctx, + enum evi_status status) +{ + struct rmq_cb_ipc_param *cb_ipc_param; + + cb_ipc_param = shm_malloc(sizeof *cb_ipc_param); + if (!cb_ipc_param) { + LM_ERR("oom!\n"); + return; + } + + cb_ipc_param->async_ctx = *async_ctx; + cb_ipc_param->status = status; + + ipc_dispatch_rpc(rmq_run_status_cb, cb_ipc_param); +} + void rmq_process(int rank) { - int send_status; + enum evi_status status; /* init blocking reader */ rmq_init_reader(); @@ -548,23 +553,21 @@ void rmq_process(int rank) /* check if we should reconnect */ if (rmq_reconnect(rmqs->sock) < 0) { LM_ERR("cannot reconnect socket\n"); - send_status = RMQ_SEND_FAIL; - goto send_status_reply; + if (rmqs->async_ctx.status_cb) + rmq_dispatch_status_cb(&rmqs->async_ctx, EVI_STATUS_FAIL); + goto end; } /* send msg */ if (rmq_sendmsg(rmqs)) { LM_ERR("cannot send message\n"); - send_status = RMQ_SEND_FAIL; + status = EVI_STATUS_FAIL; } else { - send_status = RMQ_SEND_SUCCESS; + status = EVI_STATUS_SUCCESS; } -send_status_reply: - if (rmq_sync_mode) { - if (ipc_send_sync_reply(rmqs->process_idx, (void *)(long)send_status) < 0) - LM_ERR("cannot send status back to requesting process\n"); - } + if (rmqs->async_ctx.status_cb) + rmq_dispatch_status_cb(&rmqs->async_ctx, status); end: if (rmqs) shm_free(rmqs); diff --git a/modules/event_rabbitmq/rabbitmq_send.h b/modules/event_rabbitmq/rabbitmq_send.h index bd707fb6683..dbe4e62f50b 100644 --- a/modules/event_rabbitmq/rabbitmq_send.h +++ b/modules/event_rabbitmq/rabbitmq_send.h @@ -30,15 +30,18 @@ #include "event_rabbitmq.h" #define RMQ_SEND_RETRY 3 -#define RMQ_SEND_SUCCESS 0 -#define RMQ_SEND_FAIL -1 typedef struct _rmq_send { evi_reply_sock *sock; - int process_idx; + evi_async_ctx_t async_ctx; char msg[0]; } rmq_send_t; +struct rmq_cb_ipc_param { + evi_async_ctx_t async_ctx; + enum evi_status status; +}; + void rmq_process(int rank); int rmq_create_pipe(void); void rmq_destroy_pipe(void); diff --git a/modules/event_route/event_route.c b/modules/event_route/event_route.c index 91e30073d72..7ccf78efe5e 100644 --- a/modules/event_route/event_route.c +++ b/modules/event_route/event_route.c @@ -46,7 +46,7 @@ static int child_init(int rank); */ static evi_reply_sock* scriptroute_parse(str socket); static int scriptroute_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params); + evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx); static int scriptroute_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static str scriptroute_print(evi_reply_sock *sock); @@ -347,7 +347,7 @@ void route_run(struct script_route route, struct sip_msg* msg, } static int scriptroute_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t *params) + evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx) { route_send_t *buf = NULL; diff --git a/modules/event_routing/event_routing.c b/modules/event_routing/event_routing.c index fd5b3873ae8..4286e2f1b70 100644 --- a/modules/event_routing/event_routing.c +++ b/modules/event_routing/event_routing.c @@ -44,8 +44,8 @@ static int wait_for_event(struct sip_msg* msg, async_ctx *ctx, /* EVI transport API */ -static int ebr_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t *params); +static int ebr_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx); static evi_reply_sock* ebr_parse(str socket); static int ebr_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static str ebr_print(evi_reply_sock *sock); @@ -471,8 +471,8 @@ static str ebr_print(evi_reply_sock *sock) } -static int ebr_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t *params) +static int ebr_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx) { if (!sock || !(sock->flags & EVI_PARAMS)) { LM_ERR("no socket found\n"); diff --git a/modules/event_stream/doc/event_stream_admin.xml b/modules/event_stream/doc/event_stream_admin.xml index 442e152f8d6..34052eadf31 100644 --- a/modules/event_stream/doc/event_stream_admin.xml +++ b/modules/event_stream/doc/event_stream_admin.xml @@ -10,7 +10,7 @@ This module provides a TCP transport layer implementation for the Event Interface. The module can either send a JSON-RPC notification or a standard request and wait for the response (when used in - sync_mode). + reliable_mode). @@ -68,7 +68,7 @@ id - uniquly generated if - sync_mode is used, otherwise (for + reliable_mode is used, otherwise (for notifications) null. @@ -119,13 +119,13 @@
Exported Parameters -
- <varname>sync_mode</varname> (integer) +
+ <varname>reliable_mode</varname> (integer) This parameter controls the way the event_stream module communicates with the JSON-RPC server. If enabled, (set to - yes), each event is translated to + 1), each event is translated to a JSON-RPC request. If disabled, each event will be sent as a JSON-RPC notification - there will be no reply expected by our client. @@ -133,11 +133,11 @@ Note that if you need a reliable communication with the JSON-RPC server, where each event sent needs to be - confirmed, you must set this parameter to - 1/yes. Also, if you are using this + confirmed (by a JSON-RPC response), you must set this parameter + to 1/yes. If you are using this module in a failover setup (using the - event_virtual module), you should - also set this parameter to 1/yes. + event_virtual module), it is recommended + to set this parameter to 1/yes. @@ -145,10 +145,10 @@ - Set <varname>sync_mode</varname> parameter + Set <varname>reliable_mode</varname> parameter ... -modparam("event_stream", "sync_mode", yes) +modparam("event_stream", "reliable_mode", yes) ... @@ -158,9 +158,9 @@ modparam("event_stream", "sync_mode", yes) Specified the amount of milliseconds the module waits for a command to complete. In - sync_mode, it specifies the time + reliable_mode, it specifies the time module waits the request to be sent and a reply received. - In non-sync_mode, it represents + In non-reliable_mode, it represents only the time opensips takes to send the JSON-RPC notification. @@ -237,7 +237,7 @@ modparam("event_stream", "event_param", "opensips_event") JSON-RPC notification This is an example of an event raised when - sync_mode is disabled + reliable_mode is disabled by the pike module when it decides an ip should be blocked: @@ -260,11 +260,11 @@ modparam("event_stream", "event_param", "opensips_event") JSON-RPC Request This is an example of an event raised in - sync_mode by the pike module + reliable_mode by the pike module when it decides an ip should be blocked: - E_PIKE_BLOCKED JSON-RPC request (sync_mode) + E_PIKE_BLOCKED JSON-RPC request (reliable_mode) flags |= EVI_SOCKET; + sock->flags |= EVI_SOCKET|EVI_ASYNC_STATUS; /* address */ sock->address.s = (char*)(sock+1); @@ -336,7 +336,7 @@ static str stream_print(evi_reply_sock *sock) static int stream_raise(struct sip_msg *dummy_msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params) + evi_reply_sock *sock, evi_params_t *params, evi_async_ctx_t *async_ctx) { stream_send_t *msg = NULL; str socket; @@ -368,6 +368,8 @@ static int stream_raise(struct sip_msg *dummy_msg, str* ev_name, goto error; } + msg->async_ctx = *async_ctx; + if (stream_send(msg) < 0) { err_msg = "raising event"; goto error; diff --git a/modules/event_stream/stream_send.c b/modules/event_stream/stream_send.c index 74e9c430369..21163d5d1e7 100644 --- a/modules/event_stream/stream_send.c +++ b/modules/event_stream/stream_send.c @@ -38,8 +38,6 @@ #define JSONRPC_REQ_NEW 0 #define JSONRPC_REQ_SENT 1 #define STREAM_REACTOR_TIMEOUT 1 /* sec */ -#define STREAM_SEND_SUCCESS 0 -#define STREAM_SEND_FAIL -1 #define STREAM_MAX_PENDING_READS 4 #define IS_ERR(_err) (errno == _err) #define STREAM_ADDR(con) \ @@ -47,7 +45,7 @@ int stream_timeout = STREAM_DEFAULT_TIMEOUT; char *stream_event_param; -unsigned stream_sync_mode = 0; +unsigned stream_reliable_mode = 0; static int jrpc_id_index = 0; /* used to communicate with the sending process */ @@ -109,9 +107,6 @@ void stream_destroy_pipe(void) int stream_send(stream_send_t* streams) { int rc, retries = STREAM_SEND_RETRY; - long send_status; - - streams->process_idx = process_no; do { rc = write(stream_pipe[1], &streams, sizeof(stream_send_t *)); @@ -120,20 +115,10 @@ int stream_send(stream_send_t* streams) if (rc < 0) { LM_ERR("unable to send jsonrpc send struct to worker\n"); shm_free(streams); - return STREAM_SEND_FAIL; + return -1; } - /* give a chance to the writer :) */ - sched_yield(); - if (stream_sync_mode) { - if (ipc_recv_sync_reply((void **)(long *)&send_status) < 0) { - LM_ERR("cannot receive send status\n"); - send_status = STREAM_SEND_FAIL; - } - - return (int)send_status; - } else - return STREAM_SEND_SUCCESS; + return 0; } static stream_send_t * stream_receive(void) @@ -156,6 +141,34 @@ static stream_send_t * stream_receive(void) return recv; } +void stream_run_status_cb(int sender, void *param) +{ + struct stream_cb_ipc_param *cb_ipc_param = + (struct stream_cb_ipc_param *)param; + + cb_ipc_param->async_ctx.status_cb(cb_ipc_param->async_ctx.cb_param, + cb_ipc_param->status); + + shm_free(cb_ipc_param); +} + +static void stream_dispatch_status_cb(evi_async_ctx_t *async_ctx, + enum evi_status status) +{ + struct stream_cb_ipc_param *cb_ipc_param; + + cb_ipc_param = shm_malloc(sizeof *cb_ipc_param); + if (!cb_ipc_param) { + LM_ERR("oom!\n"); + return; + } + + cb_ipc_param->async_ctx = *async_ctx; + cb_ipc_param->status = status; + + ipc_dispatch_rpc(stream_run_status_cb, cb_ipc_param); +} + int stream_init_writer(void) { int flags; @@ -165,7 +178,7 @@ int stream_init_writer(void) stream_pipe[0] = -1; } - if (stream_sync_mode) { + if (stream_reliable_mode) { /* initilize indexes */ jrpc_id_index = my_pid() & USHRT_MAX; jrpc_id_index |= rand() << sizeof(unsigned short); @@ -200,7 +213,7 @@ static void jsonrpc_init_reader(void) static inline int jsonrpc_unique_id(void) { - if (!stream_sync_mode) + if (!stream_reliable_mode) return 0; /* * the format is 'rand | my_pid' @@ -231,7 +244,6 @@ static stream_send_t *stream_build_send_t(evi_reply_sock *sock, msg->message.len = jlen; msg->id = id; - msg->process_idx = process_no; gettimeofday(&msg->time, NULL); /* finally add the socket info */ @@ -251,7 +263,7 @@ int stream_build_buffer(str *event_name, evi_reply_sock *sock, if (stream_event_param) init_str(&extra_param, stream_event_param); - s = evi_build_payload(params, method, stream_sync_mode ? id : 0, + s = evi_build_payload(params, method, stream_reliable_mode ? id : 0, extra_param.s ? &extra_param : NULL, extra_param.s ? event_name : NULL); if (!s) { LM_ERR("Failed to build event payload %.*s\n", event_name->len, event_name->s); @@ -358,21 +370,6 @@ static void jsonrpc_cmd_free(struct jsonrpc_cmd *cmd) pkg_free(cmd); } -static void jsonrpc_cmd_write(int process_idx, int send_status) -{ - if (ipc_send_sync_reply(process_idx, (void *)(long)send_status) < 0) - LM_ERR("cannot send status back to requesting process\n"); -} - -static void jsonrpc_cmd_reply(struct jsonrpc_cmd *cmd, int send_status) -{ - - if (!stream_sync_mode) - return; - - jsonrpc_cmd_write(cmd->job->process_idx, send_status); -} - static void stream_con_free(struct stream_con *con) { struct list_head *it, *tmp; @@ -385,15 +382,14 @@ static void stream_con_free(struct stream_con *con) if (con->pending_buffer.len) pkg_free(con->pending_buffer.s); - if (stream_sync_mode) { - /* in sync mode, we need to send back error */ - list_for_each_safe(it, tmp, &con->cmds) { - cmd = list_entry(it, struct jsonrpc_cmd, list); - jsonrpc_cmd_reply(cmd, STREAM_SEND_FAIL); - list_del(&cmd->list); - jsonrpc_cmd_free(cmd); - } + list_for_each_safe(it, tmp, &con->cmds) { + cmd = list_entry(it, struct jsonrpc_cmd, list); + if (cmd->job->async_ctx.status_cb) + stream_dispatch_status_cb(&cmd->job->async_ctx, EVI_STATUS_FAIL); + list_del(&cmd->list); + jsonrpc_cmd_free(cmd); } + shutdown(con->fd, SHUT_RDWR); close(con->fd); /* remove from the list */ @@ -437,10 +433,8 @@ static void handle_new_stream(stream_send_t *stream) } error: - if (stream_sync_mode) { - /* we need to notify the process that the connection failed! */ - jsonrpc_cmd_write(stream->process_idx, STREAM_SEND_FAIL); - } + if (stream->async_ctx.status_cb) + stream_dispatch_status_cb(&stream->async_ctx, EVI_STATUS_FAIL); } static int handle_cmd_reply(struct stream_con *con, cJSON *reply) @@ -465,7 +459,7 @@ static int handle_cmd_reply(struct stream_con *con, cJSON *reply) /* now check if there is an error */ aux = cJSON_GetObjectItem(reply, "error"); - ret = (aux ? STREAM_SEND_FAIL : STREAM_SEND_SUCCESS); + ret = (aux ? EVI_STATUS_FAIL : EVI_STATUS_SUCCESS); /* XXX: should we check the version too?! */ @@ -474,7 +468,8 @@ static int handle_cmd_reply(struct stream_con *con, cJSON *reply) cmd = list_entry(it, struct jsonrpc_cmd, list); if (id != cmd->job->id) continue; - jsonrpc_cmd_reply(cmd, ret); + if (cmd->job->async_ctx.status_cb) + stream_dispatch_status_cb(&cmd->job->async_ctx, ret); list_del(&cmd->list); jsonrpc_cmd_free(cmd); /* all good */ @@ -505,8 +500,8 @@ static void handle_reply_jsonrpc(struct stream_con *con) goto error; } - /* if not in sync mode, no one listens for the reply */ - if (stream_sync_mode == 0) + /* if not in reliable mode, we are not interested in the reply */ + if (stream_reliable_mode == 0) return; /* got a reply - parse it and match a command */ @@ -632,9 +627,9 @@ static void handle_write_jsonrpc(struct stream_con *con) cmd->state = JSONRPC_REQ_SENT; con->pending_writes--; - /* if sync mode was not used, we don't really care about the reply, + /* if reliable mode was not used, we don't really care about the reply, * so we simply discard the job right here */ - if (!stream_sync_mode) { + if (!stream_reliable_mode) { list_del(&cmd->list); jsonrpc_cmd_free(cmd); } @@ -700,8 +695,8 @@ static void stream_cleanup_old(void) list_for_each_safe(it_cmd, tmp, &con->cmds) { cmd = list_entry(it_cmd, struct jsonrpc_cmd, list); if (get_time_diff(&cmd->job->time) > stream_timeout * 1000) { - if (stream_sync_mode) - jsonrpc_cmd_reply(cmd, STREAM_SEND_FAIL); + if (cmd->job->async_ctx.status_cb) + stream_dispatch_status_cb(&cmd->job->async_ctx, EVI_STATUS_FAIL); list_del(&cmd->list); LM_INFO("Handling JSON-RPC command [%.*s] timed out!\n", cmd->job->message.len, cmd->job->message.s); diff --git a/modules/event_stream/stream_send.h b/modules/event_stream/stream_send.h index 4f91347d9a7..15b4bb1560a 100644 --- a/modules/event_stream/stream_send.h +++ b/modules/event_stream/stream_send.h @@ -29,11 +29,16 @@ typedef struct _stream_send { union sockaddr_union addr; struct timeval time; - int process_idx; + evi_async_ctx_t async_ctx; str message; int id; } stream_send_t; +struct stream_cb_ipc_param { + evi_async_ctx_t async_ctx; + enum evi_status status; +}; + void stream_process(int rank); int stream_init_process(void); void stream_destroy_pipe(void); @@ -49,7 +54,7 @@ int stream_build_buffer(str *, #define JSONRPC_VERSION "2.0" extern int stream_timeout; -extern unsigned stream_sync_mode; +extern unsigned stream_reliable_mode; extern char *stream_event_param; #endif diff --git a/modules/event_virtual/doc/event_virtual_admin.xml b/modules/event_virtual/doc/event_virtual_admin.xml index 8ab0597f2bb..ee4aace88bd 100644 --- a/modules/event_virtual/doc/event_virtual_admin.xml +++ b/modules/event_virtual/doc/event_virtual_admin.xml @@ -18,12 +18,7 @@ notify the subscribers, in the order in which they are given, until the first successful notification. A failed subscriber is skipped for further notifications until the - passes. For some transport modules - (event_stream, event_xmlrpc, - event_rabbitmq), in order to properly - detect a failure when raising an event, you have to configure - synchronous operation through the sync_mode - module parameter of the respective module. + passes. ROUND-ROBIN - for every event raised, notify the subscribers alternatively, in the order in which they are given (for each raised event notify a different subscriber) diff --git a/modules/event_virtual/event_virtual.c b/modules/event_virtual/event_virtual.c index d1f8b924fbd..339e18857cf 100644 --- a/modules/event_virtual/event_virtual.c +++ b/modules/event_virtual/event_virtual.c @@ -37,8 +37,10 @@ static void virtual_free(evi_reply_sock *sock); static str virtual_print(evi_reply_sock *sock); static int virtual_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static evi_reply_sock* virtual_parse(str socket); -static int virtual_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params); +static int virtual_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *async_ctx); + +static void virtual_status_cb(void *param, enum evi_status status); static struct virtual_socket **list_sockets; @@ -417,9 +419,153 @@ static int parse_socket(struct sub_socket *socket) { return 1; } -static int virtual_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, evi_params_t *params) { +static int failover_raise(struct sip_msg *msg, str *ev_name, + evi_params_t *params, struct sub_socket *cur_sock, + struct virtual_cb_param *cb_param) +{ + evi_async_ctx_t async_status; + int is_cb = (cb_param != NULL); + + while (cur_sock) { + lock_get(cur_sock->lock); + + if (cur_sock->last_failed && + (get_ticks() - cur_sock->last_failed <= failover_timeout)) { + lock_release(cur_sock->lock); + LM_DBG("skipping already failed socket %.*s\n", + cur_sock->sock_str.len, cur_sock->sock_str.s); + cur_sock = cur_sock->next; + continue; + } + + if (!cur_sock->trans_mod && !parse_socket(cur_sock)) { + cur_sock->last_failed = get_ticks(); + lock_release(cur_sock->lock); + LM_DBG("unable to parse socket %.*s trying next socket\n", + cur_sock->sock_str.len, cur_sock->sock_str.s); + cur_sock = cur_sock->next; + continue; + } + + if (cur_sock->sock->flags & EVI_ASYNC_STATUS) { + if (!cb_param) { + cb_param = shm_malloc(sizeof *cb_param + msg->len + ev_name->len); + if (!cb_param) { + lock_release(cur_sock->lock); + LM_ERR("oom!\n"); + return -1; + } + + cb_param->sip_msg_buf.len = msg->len; + cb_param->sip_msg_buf.s = (char *)(cb_param+1); + memcpy(cb_param->sip_msg_buf.s, msg->buf, msg->len); + + cb_param->ev_name.len = ev_name->len; + cb_param->ev_name.s = (char *)(cb_param+1) + msg->len; + memcpy(cb_param->ev_name.s, ev_name->s, ev_name->len); + + cb_param->evi_params = evi_dup_shm_params(params); + if (!cb_param->evi_params) { + LM_ERR("Failed to dup evi params in shm\n"); + shm_free(cb_param); + return -1; + } + } + + cb_param->current_sock = cur_sock; + + async_status.status_cb = virtual_status_cb; + } else { + async_status.cb_param = NULL; + } + + async_status.cb_param = cb_param; + + cur_sock->last_failed = 0; + + if (cur_sock->trans_mod->raise(msg, ev_name, cur_sock->sock, params, + &async_status)) { + cur_sock->last_failed = get_ticks(); + lock_release(cur_sock->lock); + LM_DBG("unable to raise socket %.*s trying next socket\n", + cur_sock->sock_str.len, cur_sock->sock_str.s); + cur_sock = cur_sock->next; + continue; + } + + lock_release(cur_sock->lock); + + break; + } + + if (!cur_sock) { + if (!is_cb && cb_param) { + evi_free_shm_params(cb_param->evi_params); + shm_free(cb_param); + } + return -1; + } else { + return 0; + } +} + +static void virtual_status_cb(void *param, enum evi_status status) +{ + struct virtual_cb_param *cbp = (struct virtual_cb_param *)param; + struct sub_socket *cur_sock; + struct sip_msg msg; + int free_cbp = 1; + + cur_sock = cbp->current_sock; + + if (status == EVI_STATUS_FAIL) { + LM_DBG("unable to raise socket %.*s trying next socket\n", + cur_sock->sock_str.len, cur_sock->sock_str.s); + + lock_get(cur_sock->lock); + cur_sock->last_failed = get_ticks(); + lock_release(cur_sock->lock); + + cur_sock = cbp->current_sock->next; + + memset(&msg, 0, sizeof(struct sip_msg)); + msg.buf = cbp->sip_msg_buf.s; + msg.len = cbp->sip_msg_buf.len; + if (parse_msg(msg.buf, msg.len, &msg)!=0) { + LM_ERR("Invalid SIP msg\n"); + goto end_free; + } + + if (!cur_sock || failover_raise(&msg, &cbp->ev_name, cbp->evi_params, + cur_sock, cbp) < 0) { + LM_ERR("unable to raise any socket for event: %.*s\n", + cbp->ev_name.len, cbp->ev_name.s); + free_sip_msg(&msg); + goto end_free; + } + + free_cbp = 0; + + free_sip_msg(&msg); + } else { + lock_get(cur_sock->lock); + cur_sock->last_failed = 0; + lock_release(cur_sock->lock); + } + +end_free: + if (free_cbp) { + evi_free_shm_params(cbp->evi_params); + shm_free(cbp); + } +} + +static int virtual_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock, + evi_params_t *params, evi_async_ctx_t *_) +{ struct virtual_socket *vsock; - struct sub_socket *h_list; + struct sub_socket *cur_sock; + evi_async_ctx_t async_status = {NULL, NULL}; if (!sock || !(sock->params)) { LM_ERR("invalid socket\n"); @@ -427,72 +573,34 @@ static int virtual_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock } vsock = (struct virtual_socket *)sock->params; - h_list = vsock->list_sockets; + cur_sock = vsock->list_sockets; switch (vsock->type) { /* raise all the sockets at once*/ case PARALLEL_TYPE : { - while (h_list) { - if (!h_list->trans_mod && !parse_socket(h_list)) { + while (cur_sock) { + if (!cur_sock->trans_mod && !parse_socket(cur_sock)) { LM_ERR("unable to parse socket %.*s\n", - h_list->sock_str.len, h_list->sock_str.s); + cur_sock->sock_str.len, cur_sock->sock_str.s); return -1; } - if (h_list->trans_mod->raise(msg, ev_name, h_list->sock, params)) { + if (cur_sock->trans_mod->raise(msg, ev_name, cur_sock->sock, + params, &async_status)) { LM_ERR("unable to raise socket %.*s\n", - h_list->sock_str.len, h_list->sock_str.s); + cur_sock->sock_str.len, cur_sock->sock_str.s); return -1; } - h_list = h_list->next; + cur_sock = cur_sock->next; } break; } /* try to raise all sockets until first successful raise*/ case FAILOVER_TYPE : { - while (h_list) { - lock_get(h_list->lock); - - if (h_list->last_failed && - (get_ticks() - h_list->last_failed <= failover_timeout)) { - lock_release(h_list->lock); - - LM_DBG("skipping already failed socket %.*s\n", - h_list->sock_str.len, h_list->sock_str.s); - h_list = h_list->next; - continue; - } - - if (!h_list->trans_mod && !parse_socket(h_list)) { - h_list->last_failed = get_ticks(); - lock_release(h_list->lock); - - LM_DBG("unable to parse socket %.*s trying next socket\n", - h_list->sock_str.len, h_list->sock_str.s); - h_list = h_list->next; - continue; - } - - if (h_list->trans_mod->raise(msg, ev_name, h_list->sock, params)) { - h_list->last_failed = get_ticks(); - lock_release(h_list->lock); - - LM_DBG("unable to raise socket %.*s trying next socket\n", - h_list->sock_str.len, h_list->sock_str.s); - h_list = h_list->next; - continue; - } - - h_list->last_failed = 0; - - lock_release(h_list->lock); - - break; - } - - if (!h_list) { - LM_ERR("unable to raise any socket\n"); + if (failover_raise(msg, ev_name, params, cur_sock, NULL) < 0) { + LM_ERR("unable to raise any socket for event: %.*s\n", + ev_name->len, ev_name->s); return -1; } break; @@ -502,20 +610,20 @@ static int virtual_raise(struct sip_msg *msg, str* ev_name, evi_reply_sock *sock lock_get(rrobin_lock); if (!vsock->current_sock) - vsock->current_sock = h_list; + vsock->current_sock = cur_sock; if (!vsock->current_sock->trans_mod && !parse_socket(vsock->current_sock)) { - LM_ERR("unable to parse socket %.*s\n", - vsock->current_sock->sock_str.len, - vsock->current_sock->sock_str.s); - return -1; + LM_ERR("unable to parse socket %.*s\n", + vsock->current_sock->sock_str.len, + vsock->current_sock->sock_str.s); + return -1; } if (vsock->current_sock->trans_mod->raise(msg, ev_name, - vsock->current_sock->sock, params)) { - LM_ERR("unable to raise socket %.*s\n", - vsock->current_sock->sock_str.len, vsock->current_sock->sock_str.s); - return -1; + vsock->current_sock->sock, params, &async_status)) { + LM_ERR("unable to raise socket %.*s\n", + vsock->current_sock->sock_str.len, vsock->current_sock->sock_str.s); + return -1; } vsock->current_sock = vsock->current_sock->next; diff --git a/modules/event_virtual/event_virtual.h b/modules/event_virtual/event_virtual.h index 385dac89e8a..30c5dab1ab2 100644 --- a/modules/event_virtual/event_virtual.h +++ b/modules/event_virtual/event_virtual.h @@ -69,4 +69,11 @@ struct sub_socket { struct sub_socket *next; }; +struct virtual_cb_param { + struct sub_socket *current_sock; + str sip_msg_buf; + str ev_name; + evi_params_t *evi_params; +}; + #endif diff --git a/modules/event_xmlrpc/doc/event_xmlrpc_admin.xml b/modules/event_xmlrpc/doc/event_xmlrpc_admin.xml index 7e6b178faa1..77a5c233993 100644 --- a/modules/event_xmlrpc/doc/event_xmlrpc_admin.xml +++ b/modules/event_xmlrpc/doc/event_xmlrpc_admin.xml @@ -108,25 +108,6 @@ ... modparam("event_xmlrpc", "use_struct_param", 1) ... - - -
-
- <varname>sync_mode</varname> (integer) - - Specifies whether an event raise operates synchronous or asynchronous relative to the process where the raise is triggered.In synchronous mode the process waits for the status of the raise from the actual worker process.In asynchronous mode the process continues its operation without receiving any confirmation. - - - - Default value is 0 (asynchronous). - - - - Set <varname>sync_mode</varname> parameter - -... -modparam("event_xmlrpc", "sync_mode", 1) -...
diff --git a/modules/event_xmlrpc/event_xmlrpc.c b/modules/event_xmlrpc/event_xmlrpc.c index 1e9286b0eda..5f255be13f6 100644 --- a/modules/event_xmlrpc/event_xmlrpc.c +++ b/modules/event_xmlrpc/event_xmlrpc.c @@ -35,7 +35,6 @@ #include extern unsigned xmlrpc_struct_on; -extern unsigned xmlrpc_sync_mode; /** * module functions @@ -48,8 +47,8 @@ static int child_init(int); * exported functions */ static evi_reply_sock* xmlrpc_parse(str socket); -static int xmlrpc_raise(struct sip_msg *msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params); +static int xmlrpc_raise(struct sip_msg *dummy_msg, str* ev_name, + evi_reply_sock *sock, evi_params_t * params, evi_async_ctx_t *async_ctx); static int xmlrpc_match(evi_reply_sock *sock1, evi_reply_sock *sock2); static void xmlrpc_free(evi_reply_sock *sock); static str xmlrpc_print(evi_reply_sock *sock); @@ -65,7 +64,6 @@ static proc_export_t procs[] = { /* module parameters */ static param_export_t mod_params[] = { {"use_struct_param", INT_PARAM, &xmlrpc_struct_on}, - {"sync_mode", INT_PARAM, &xmlrpc_sync_mode}, {0,0,0} }; @@ -328,7 +326,7 @@ static evi_reply_sock* xmlrpc_parse(str socket) sock->flags |= EVI_PARAMS; /* needs expire */ - sock->flags |= EVI_EXPIRE|XMLRPC_FLAG; + sock->flags |= EVI_EXPIRE|XMLRPC_FLAG|EVI_ASYNC_STATUS; sock->params= params; @@ -392,7 +390,7 @@ static str xmlrpc_print(evi_reply_sock *sock) static int xmlrpc_raise(struct sip_msg *dummy_msg, str* ev_name, - evi_reply_sock *sock, evi_params_t * params) + evi_reply_sock *sock, evi_params_t * params, evi_async_ctx_t *async_ctx) { xmlrpc_send_t * msg = NULL; @@ -426,6 +424,8 @@ static int xmlrpc_raise(struct sip_msg *dummy_msg, str* ev_name, return -1; } + msg->async_ctx = *async_ctx; + if (xmlrpc_send(msg) < 0) { LM_ERR("cannot send message\n"); return -1; diff --git a/modules/event_xmlrpc/xmlrpc_send.c b/modules/event_xmlrpc/xmlrpc_send.c index fcac0ab0b13..3bf29c31cc8 100644 --- a/modules/event_xmlrpc/xmlrpc_send.c +++ b/modules/event_xmlrpc/xmlrpc_send.c @@ -37,7 +37,6 @@ #define IS_ERR(_err) (errno == _err) unsigned xmlrpc_struct_on = 0; -unsigned xmlrpc_sync_mode = 0; static char * xmlrpc_body_buf = 0; static struct iovec xmlrpc_iov[XMLRPC_IOVEC_MAX_SIZE]; static unsigned xmlrpc_iov_len = 0; @@ -94,9 +93,6 @@ void xmlrpc_destroy_pipe(void) int xmlrpc_send(xmlrpc_send_t* xmlrpcs) { int rc, retries = XMLRPC_SEND_RETRY; - long send_status; - - xmlrpcs->process_idx = process_no; do { rc = write(xmlrpc_pipe[1], &xmlrpcs, sizeof(xmlrpc_send_t *)); @@ -105,21 +101,10 @@ int xmlrpc_send(xmlrpc_send_t* xmlrpcs) if (rc < 0) { LM_ERR("unable to send xmlrpc send struct to worker\n"); shm_free(xmlrpcs); - return XMLRPC_SEND_FAIL; + return -1; } - /* give a change to the writer :) */ - sched_yield(); - - if (xmlrpc_sync_mode) { - - if (ipc_recv_sync_reply((void **)(long *)&send_status) < 0) { - LM_ERR("cannot receive send status\n"); - send_status = XMLRPC_SEND_FAIL; - } - return (int)send_status; - } else - return XMLRPC_SEND_SUCCESS; + return 0; } static xmlrpc_send_t * xmlrpc_receive(void) @@ -497,10 +482,37 @@ static void xmlrpc_init_send_buf(void) } +void xmlrpc_run_status_cb(int sender, void *param) +{ + struct xmlrpc_cb_ipc_param *cb_ipc_param = + (struct xmlrpc_cb_ipc_param *)param; + + cb_ipc_param->async_ctx.status_cb(cb_ipc_param->async_ctx.cb_param, + cb_ipc_param->status); + + shm_free(cb_ipc_param); +} + +static void xmlrpc_dispatch_status_cb(evi_async_ctx_t *async_ctx, + enum evi_status status) +{ + struct xmlrpc_cb_ipc_param *cb_ipc_param; + + cb_ipc_param = shm_malloc(sizeof *cb_ipc_param); + if (!cb_ipc_param) { + LM_ERR("oom!\n"); + return; + } + + cb_ipc_param->async_ctx = *async_ctx; + cb_ipc_param->status = status; + + ipc_dispatch_rpc(xmlrpc_run_status_cb, cb_ipc_param); +} void xmlrpc_process(int rank) { - int send_status; + enum evi_status status; /* init blocking reader */ xmlrpc_init_reader(); @@ -518,14 +530,12 @@ void xmlrpc_process(int rank) /* send msg */ if (xmlrpc_sendmsg(xmlrpcs)) { LM_ERR("cannot send message\n"); - send_status = XMLRPC_SEND_FAIL; + status = EVI_STATUS_FAIL; } else - send_status = XMLRPC_SEND_SUCCESS; + status = EVI_STATUS_SUCCESS; - if (xmlrpc_sync_mode) { - if (ipc_send_sync_reply(xmlrpcs->process_idx, (void *)(long)send_status) < 0) - LM_ERR("cannot send status back to requesting process\n"); - } + if (xmlrpcs->async_ctx.status_cb) + xmlrpc_dispatch_status_cb(&xmlrpcs->async_ctx, status); end: if (xmlrpcs) shm_free(xmlrpcs); diff --git a/modules/event_xmlrpc/xmlrpc_send.h b/modules/event_xmlrpc/xmlrpc_send.h index a708a324f98..de87ad583e8 100644 --- a/modules/event_xmlrpc/xmlrpc_send.h +++ b/modules/event_xmlrpc/xmlrpc_send.h @@ -32,14 +32,19 @@ typedef struct _xmlrpc_send { union sockaddr_union addr; + evi_async_ctx_t async_ctx; str body; str method; str host; str first_line; str event; - int process_idx; } xmlrpc_send_t; +struct xmlrpc_cb_ipc_param { + evi_async_ctx_t async_ctx; + enum evi_status status; +}; + void xmlrpc_process(int rank); int xmlrpc_create_pipe(void); void xmlrpc_destroy_pipe(void); @@ -54,8 +59,6 @@ int xmlrpc_build_buffer(str *, #define XMLRPC_DEFAULT_BUFFER_SIZE 8192 #define XMLRPC_IOVEC_MAX_SIZE 32 #define XMLRPC_DEFAULT_PORT 8080 -#define XMLRPC_SEND_SUCCESS 0 -#define XMLRPC_SEND_FAIL -1 /* string macros */ /* computes a macro len */