diff --git a/modules/event_routing/ebr_data.c b/modules/event_routing/ebr_data.c index 96920c16618..a085dd58666 100644 --- a/modules/event_routing/ebr_data.c +++ b/modules/event_routing/ebr_data.c @@ -59,7 +59,6 @@ extern struct tm_binds ebr_tmb; static ebr_event *ebr_events = NULL; - ebr_event* search_ebr_event( const str *name ) { ebr_event *ev; @@ -466,8 +465,11 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params) lock_get( &(ev->lock) ); + ev->last_timeout_check = my_time; + /* check the EBR subscription on this event and apply the filters */ sub_prev = NULL; + sub_next = NULL; for ( sub=ev->subs ; sub ; sub_prev=sub, sub=sub_next?sub_next:(sub?sub->next:NULL) ) { @@ -479,6 +481,26 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params) sub->proc_no, pt[sub->proc_no].pid, sub->event->event_name.len, sub->event->event_name.s, sub->expire ); + /* fire the job, if we deal with an WAIT */ + if (sub->flags&EBR_SUBS_TYPE_WAIT) { + job =(ebr_ipc_job*)shm_malloc( sizeof(ebr_ipc_job) ); + if (job==NULL) { + LM_ERR("failed to allocated new IPC job, skipping..\n"); + continue; /* with the next subscription */ + } + job->ev = ev; + job->data = sub->data; + job->flags = sub->flags; + job->tm = sub->tm; + job->avps = NULL; + /* sent the event notification via IPC to resume on the + * subscribing process */ + if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) { + LM_ERR("failed to send job via IPC, skipping...\n"); + shm_free(job); + continue; /* keep it and try next time */ + } + } /* remove the subscription */ sub_next = sub->next; /* unlink it */ @@ -579,7 +601,89 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params) if (avps!=(void*)-1) destroy_avp_list( &avps ); - return 0; + return 0; /* sent the event notification via IPC to resume on the + * subscribing process */ + if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) { + LM_ERR("failed to send job via IPC, skipping...\n"); + shm_free(job); + } + +} + + +void ebr_timeout(unsigned int ticks, void* param) +{ + ebr_event *ev; + ebr_subscription *sub, *sub_next, *sub_prev; + ebr_ipc_job *job; + unsigned int my_time; + + /* iterate all events */ + for( ev=ebr_events ; ev ; ev=ev->next) { + + /* see if the expire check was done within this second */ + if ( ev->last_timeout_check >= get_ticks() ) + continue; + + //LM_DBG("expiring for event %.*s\n", + // ev->event_name.len, ev->event_name.s); + + my_time = get_ticks(); + + lock_get( &(ev->lock) ); + + ev->last_timeout_check = my_time; + + /* check the EBR subscriptions on this event */ + sub_prev = NULL; + sub_next = NULL; + for ( sub=ev->subs ; sub ; sub_prev=sub, + sub=sub_next?sub_next:(sub?sub->next:NULL) ) { + + /* skip valid and non WAIT subscriptions */ + if ( (sub->flags&EBR_SUBS_TYPE_WAIT)==0 || sub->expire>my_time ) + continue; + + LM_DBG("subscription type [%s] from process %d(pid %d) on " + "event <%.*s> expired at %d, now %d\n", + (sub->flags&EBR_SUBS_TYPE_WAIT)?"WAIT":"NOTIFY", + sub->proc_no, pt[sub->proc_no].pid, + sub->event->event_name.len, sub->event->event_name.s, + sub->expire, my_time ); + + /* fire the job */ + job =(ebr_ipc_job*)shm_malloc( sizeof(ebr_ipc_job) ); + if (job==NULL) { + LM_ERR("failed to allocated new IPC job, skipping..\n"); + continue; /* with the next subscription */ + } + job->ev = ev; + job->data = sub->data; + job->flags = sub->flags; + job->tm = sub->tm; + job->avps = NULL; + /* sent the event notification via IPC to resume on the + * subscribing process */ + if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) { + LM_ERR("failed to send job via IPC, skipping...\n"); + shm_free(job); + continue; /* with the next subscription */ + } + + /* remove the subscription */ + sub_next = sub->next; + /* unlink it */ + if (sub_prev) sub_prev->next = sub_next; + else ev->subs = sub_next; + /* free it */ + free_ebr_subscription(sub); + /* do not count us as prev, as we are removed */ + sub = sub_prev; + } + + lock_release( &(ev->lock) ); + + } } diff --git a/modules/event_routing/ebr_data.h b/modules/event_routing/ebr_data.h index fcaf4f4ac6d..9a18f2c5a20 100644 --- a/modules/event_routing/ebr_data.h +++ b/modules/event_routing/ebr_data.h @@ -65,6 +65,7 @@ typedef struct _ebr_event { str event_name; int event_id; gen_lock_t lock; + unsigned int last_timeout_check; ebr_subscription *subs; struct _ebr_event *next; } ebr_event; @@ -89,5 +90,7 @@ void handle_ebr_ipc(int sender, void *payload); int ebr_resume_from_wait(int *fd, struct sip_msg *msg, void *param); +void ebr_timeout(unsigned int ticks, void* param); + #endif diff --git a/modules/event_routing/event_routing.c b/modules/event_routing/event_routing.c index fd5b3873ae8..5abf07ec505 100644 --- a/modules/event_routing/event_routing.c +++ b/modules/event_routing/event_routing.c @@ -200,6 +200,12 @@ static int mod_init(void) LM_NOTICE("unable to load TM API, so TM context will not be " "available in notification routes\n"); + if (register_timer( "EBR timeout", ebr_timeout, NULL, 1, + TIMER_FLAG_SKIP_ON_DELAY)<0 ) { + LM_ERR("failed to register timer, halting..."); + return -1; + } + return 0; }