Skip to content

Commit

Permalink
Merge pull request #2919 from goharahmed/eventExpiry
Browse files Browse the repository at this point in the history
EVI: Event subscribers expiry enhancements
(cherry picked from commit 3ddd19e)
  • Loading branch information
razvancrainea committed Nov 3, 2022
1 parent cd034c8 commit ad6a2b7
Showing 1 changed file with 67 additions and 4 deletions.
71 changes: 67 additions & 4 deletions evi/event_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,49 @@ int evi_raise_event(event_id_t id, evi_params_t* params)

return status;
}
/* this function checks the subscribers of an event and remove them if
they are past their expiry - dont want to print expired events as well */
void evi_remove_expired_subs(event_id_t id) {
evi_subs_p subs, prev;
long now;

lock_get(events[id].lock);
now = time(0);
subs = events[id].subscribers;
prev = NULL;
while (subs) {
if (!subs->reply_sock) {
LM_ERR("unknown destination\n");
continue;
}
/* check expire */
if (!(subs->reply_sock->flags & EVI_PENDING) &&
subs->reply_sock->flags & EVI_EXPIRE &&
subs->reply_sock->subscription_time +
subs->reply_sock->expire < now) {
LM_DBG("removing expired subscriber %.*s:%.*s:%d for event %.*s\n",
subs->trans_mod->proto.len,subs->trans_mod->proto.s,
subs->reply_sock->address.len, subs->reply_sock->address.s,
subs->reply_sock->port,events[id].name.len,events[id].name.s);
if (subs->trans_mod && subs->trans_mod->free)
subs->trans_mod->free(subs->reply_sock);
else
shm_free(subs->reply_sock);
if (!prev) {
events[id].subscribers = subs->next;
shm_free(subs);
subs = events[id].subscribers;
} else {
prev->next = subs->next;
shm_free(subs);
subs = prev->next;
}
continue;
}
subs = subs->next;
}
lock_release(events[id].lock);
}
/* XXX: this function should release its parameters before exiting */
int evi_raise_event_msg(struct sip_msg *msg, event_id_t id, evi_params_t* params)
{
Expand Down Expand Up @@ -511,6 +553,8 @@ static int evi_print_subscriber(mi_item_t *subs_obj, evi_subs_p subs)
{
evi_reply_sock *sock;
str socket;
long now;
int expiry_countdown;

if (!subs || !subs->trans_mod || !subs->trans_mod->print) {
LM_ERR("subscriber does not have a print method exported\n");
Expand All @@ -533,10 +577,21 @@ static int evi_print_subscriber(mi_item_t *subs_obj, evi_subs_p subs)
subs->trans_mod->proto.len, subs->trans_mod->proto.s,
socket.len, socket.s) < 0)
return -1;

if (sock->flags & EVI_EXPIRE) {
/* indicate the expiration time used for the event*/
if (add_mi_number(subs_obj, MI_SSTR("expire"), sock->expire) < 0)
return -1;
return -1;
now = time(0);
/* calculate the remaining time for the subscriber to be expired */
expiry_countdown = sock->expire - (now - subs->reply_sock->subscription_time);
if (expiry_countdown > 0) {
if (add_mi_number(subs_obj, MI_SSTR("ttl"), expiry_countdown) < 0)
return -1;
}else{
/* Mark event as expired if time-to-expire is reached*/
if (add_mi_string(subs_obj, MI_SSTR("ttl"), MI_SSTR("expired")) < 0)
return -1;
}
} else {
if (add_mi_string(subs_obj, MI_SSTR("expire"), MI_SSTR("never")) < 0)
return -1;
Expand Down Expand Up @@ -677,6 +732,8 @@ mi_response_t *w_mi_subscribers_list(const mi_params_t *params,
goto error;

for (i = 0; i < events_no; i++) {
/* before printing the subs list check for any expired subscribers and remove them*/
evi_remove_expired_subs(events[i].id);
if (!events[i].subscribers)
continue;

Expand All @@ -703,13 +760,16 @@ mi_response_t *w_mi_subscribers_list_1(const mi_params_t *params,
{
str event_s;
evi_event_p event;

int evid;
if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
return init_mi_param_error();

event = evi_get_event(&event_s);
if (!event)
return init_mi_error(404, MI_SSTR("Event not published"));
/* get the event id & before printing the subs list check for any expired subscribers and remove them*/
evid = evi_get_id(&event_s);
evi_remove_expired_subs(evid);

return mi_subscribers_list(event, NULL);
}
Expand All @@ -721,13 +781,16 @@ mi_response_t *w_mi_subscribers_list_2(const mi_params_t *params,
str subs_s;
evi_event_p event;
evi_subs_p subs;

int evid;
if (get_mi_string_param(params, "event", &event_s.s, &event_s.len) < 0)
return init_mi_param_error();

event = evi_get_event(&event_s);
if (!event)
return init_mi_error(404, MI_SSTR("Event not published"));
/* get the event id & before printing the subs list check for any expired subscribers and remove them*/
evid = evi_get_id(&event_s);
evi_remove_expired_subs(evid);

if (get_mi_string_param(params, "socket", &subs_s.s, &subs_s.len) < 0)
return init_mi_param_error();
Expand Down

0 comments on commit ad6a2b7

Please sign in to comment.