Skip to content

Commit

Permalink
Fix the timeout for wait_for_event()
Browse files Browse the repository at this point in the history
Even if a timeout is provided, it was not taken into account, leading to a potential permanent blocking of a request execution thread if there was no event ever delivered to it.
  • Loading branch information
bogdan-iancu committed Sep 23, 2021
1 parent c38c590 commit 7917c42
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
108 changes: 106 additions & 2 deletions modules/event_routing/ebr_data.c
Expand Up @@ -59,7 +59,6 @@ extern struct tm_binds ebr_tmb;
static ebr_event *ebr_events = NULL;



ebr_event* search_ebr_event( const str *name )
{
ebr_event *ev;
Expand Down Expand Up @@ -466,8 +465,11 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)

lock_get( &(ev->lock) );

ev->last_timeout_check = my_time;

/* check the EBR subscription on this event and apply the filters */
sub_prev = NULL;
sub_next = NULL;
for ( sub=ev->subs ; sub ; sub_prev=sub,
sub=sub_next?sub_next:(sub?sub->next:NULL) ) {

Expand All @@ -479,6 +481,26 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)
sub->proc_no, pt[sub->proc_no].pid,
sub->event->event_name.len, sub->event->event_name.s,
sub->expire );
/* fire the job, if we deal with an WAIT */
if (sub->flags&EBR_SUBS_TYPE_WAIT) {
job =(ebr_ipc_job*)shm_malloc( sizeof(ebr_ipc_job) );
if (job==NULL) {
LM_ERR("failed to allocated new IPC job, skipping..\n");
continue; /* with the next subscription */
}
job->ev = ev;
job->data = sub->data;
job->flags = sub->flags;
job->tm = sub->tm;
job->avps = NULL;
/* sent the event notification via IPC to resume on the
* subscribing process */
if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) {
LM_ERR("failed to send job via IPC, skipping...\n");
shm_free(job);
continue; /* keep it and try next time */
}
}
/* remove the subscription */
sub_next = sub->next;
/* unlink it */
Expand Down Expand Up @@ -579,7 +601,89 @@ int notify_ebr_subscriptions( ebr_event *ev, evi_params_t *params)
if (avps!=(void*)-1)
destroy_avp_list( &avps );

return 0;
return 0; /* sent the event notification via IPC to resume on the
* subscribing process */
if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) {
LM_ERR("failed to send job via IPC, skipping...\n");
shm_free(job);
}

}


void ebr_timeout(unsigned int ticks, void* param)
{
ebr_event *ev;
ebr_subscription *sub, *sub_next, *sub_prev;
ebr_ipc_job *job;
unsigned int my_time;

/* iterate all events */
for( ev=ebr_events ; ev ; ev=ev->next) {

/* see if the expire check was done within this second */
if ( ev->last_timeout_check >= get_ticks() )
continue;

//LM_DBG("expiring for event %.*s\n",
// ev->event_name.len, ev->event_name.s);

my_time = get_ticks();

lock_get( &(ev->lock) );

ev->last_timeout_check = my_time;

/* check the EBR subscriptions on this event */
sub_prev = NULL;
sub_next = NULL;
for ( sub=ev->subs ; sub ; sub_prev=sub,
sub=sub_next?sub_next:(sub?sub->next:NULL) ) {

/* skip valid and non WAIT subscriptions */
if ( (sub->flags&EBR_SUBS_TYPE_WAIT)==0 || sub->expire>my_time )
continue;

LM_DBG("subscription type [%s] from process %d(pid %d) on "
"event <%.*s> expired at %d, now %d\n",
(sub->flags&EBR_SUBS_TYPE_WAIT)?"WAIT":"NOTIFY",
sub->proc_no, pt[sub->proc_no].pid,
sub->event->event_name.len, sub->event->event_name.s,
sub->expire, my_time );

/* fire the job */
job =(ebr_ipc_job*)shm_malloc( sizeof(ebr_ipc_job) );
if (job==NULL) {
LM_ERR("failed to allocated new IPC job, skipping..\n");
continue; /* with the next subscription */
}
job->ev = ev;
job->data = sub->data;
job->flags = sub->flags;
job->tm = sub->tm;
job->avps = NULL;
/* sent the event notification via IPC to resume on the
* subscribing process */
if (ipc_send_job( sub->proc_no, ebr_ipc_type , (void*)job)<0) {
LM_ERR("failed to send job via IPC, skipping...\n");
shm_free(job);
continue; /* with the next subscription */
}

/* remove the subscription */
sub_next = sub->next;
/* unlink it */
if (sub_prev) sub_prev->next = sub_next;
else ev->subs = sub_next;
/* free it */
free_ebr_subscription(sub);
/* do not count us as prev, as we are removed */
sub = sub_prev;
}

lock_release( &(ev->lock) );

}
}


Expand Down
3 changes: 3 additions & 0 deletions modules/event_routing/ebr_data.h
Expand Up @@ -65,6 +65,7 @@ typedef struct _ebr_event {
str event_name;
int event_id;
gen_lock_t lock;
unsigned int last_timeout_check;
ebr_subscription *subs;
struct _ebr_event *next;
} ebr_event;
Expand All @@ -89,5 +90,7 @@ void handle_ebr_ipc(int sender, void *payload);

int ebr_resume_from_wait(int *fd, struct sip_msg *msg, void *param);

void ebr_timeout(unsigned int ticks, void* param);

#endif

6 changes: 6 additions & 0 deletions modules/event_routing/event_routing.c
Expand Up @@ -200,6 +200,12 @@ static int mod_init(void)
LM_NOTICE("unable to load TM API, so TM context will not be "
"available in notification routes\n");

if (register_timer( "EBR timeout", ebr_timeout, NULL, 1,
TIMER_FLAG_SKIP_ON_DELAY)<0 ) {
LM_ERR("failed to register timer, halting...");
return -1;
}

return 0;
}

Expand Down

0 comments on commit 7917c42

Please sign in to comment.