Skip to content

Commit

Permalink
rtpengine: refactor of node probing
Browse files Browse the repository at this point in the history
Probing of disabled rtpengine nodes is now done in timer routine instead of SIP context.
  • Loading branch information
john08burke committed Aug 11, 2021
1 parent 0d2dac3 commit 9ccd25f
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
20 changes: 20 additions & 0 deletions modules/rtpengine/doc/rtpengine_admin.xml
Expand Up @@ -185,6 +185,26 @@ modparam("rtpengine", "rtpengine_tout", 2)
...
modparam("rtpengine", "rtpengine_retr", 2)
...
</programlisting>
</example>
</section>
<section id="param_rtpengine_timer_interval" xreflabel="rtpengine_timer_interval">
<title><varname>rtpengine_timer_interval</varname> (integer)</title>
<para>
Frequency to scan rtpengine sets for disabled node probing. Probing is done
outside the SIP processing context and in a separate timer routine. Disabled nodes
are probed for re-enablement after rtpengine_disable_tout seconds. Setting this value
too high can lead to unexpectedly large disabled interval as the max interval
before probing is (rtpengine_timer_interval + rtpengine_disable_tout) seconds.
</para><para>
Default value is <quote>5</quote>.
</para>
<example>
<title>Set <varname>rtpengine_timer_interval</varname> parameter</title>
<programlisting format="linespecific">
...
modparam("rtpengine", "rtpengine_timer_interval", 1)
...
</programlisting>
</example>
</section>
Expand Down
61 changes: 32 additions & 29 deletions modules/rtpengine/rtpengine.c
Expand Up @@ -296,7 +296,7 @@ static int fixup_set_id(void ** param);
static int fixup_free_set_id(void ** param);
static int set_rtpengine_set_f(struct sip_msg * msg, rtpe_set_link_t *set_param);
static struct rtpe_set * select_rtpe_set(int id_set);
static struct rtpe_node *select_rtpe_node(str, int, struct rtpe_set *);
static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *);
static char *send_rtpe_command(struct rtpe_node *, bencode_item_t *, int *);
static int get_extra_id(struct sip_msg* msg, str *id_str);

Expand Down Expand Up @@ -330,6 +330,7 @@ static int rtpengine_stats_used = 0;
static int rtpengine_disable_tout = 60;
static int rtpengine_retr = 5;
static int rtpengine_tout = 1;
static int rtpengine_timer_interval = 5;
static pid_t mypid;
static int myrand = 0;
static unsigned int myseqn = 0;
Expand Down Expand Up @@ -637,6 +638,7 @@ static param_export_t params[] = {
{"rtpengine_disable_tout", INT_PARAM, &rtpengine_disable_tout },
{"rtpengine_retr", INT_PARAM, &rtpengine_retr },
{"rtpengine_tout", INT_PARAM, &rtpengine_tout },
{"rtpengine_timer_interval", INT_PARAM, &rtpengine_timer_interval},
{"notification_sock", STR_PARAM, &rtpengine_notify_sock.s},
{"extra_id_pv", STR_PARAM, &extra_id_pv_param.s },
{"setid_avp", STR_PARAM, &setid_avp_param },
Expand Down Expand Up @@ -1201,6 +1203,26 @@ static mi_response_t *mi_teardown_call(const mi_params_t *params,
return init_mi_result_ok();
}

/*
* Timer housekeeping, invoked each timer interval to check for proxy re-enablement
*/
void rtpengine_timer(unsigned int ticks, void *param)
{
struct rtpe_set *rtpe_list;
struct rtpe_node *crt_rtpe;

RTPE_START_READ();
for(rtpe_list = (*rtpe_set_list)->rset_first; rtpe_list != NULL;
rtpe_list = rtpe_list->rset_next){
for(crt_rtpe = rtpe_list->rn_first; crt_rtpe != NULL;
crt_rtpe = crt_rtpe->rn_next){
if (crt_rtpe->rn_disabled && crt_rtpe->rn_recheck_ticks <= get_ticks())
crt_rtpe->rn_disabled = rtpe_test(crt_rtpe, 0, 1);
}
}
RTPE_STOP_READ();
}

/* hack to get the rtpengine node used for the offer */
static pv_spec_t media_pvar;

Expand Down Expand Up @@ -1375,6 +1397,11 @@ mod_init(void)
memset(&dlgb, 0, sizeof(struct dlg_binds));
}

if (register_timer("rtpengine-timer", rtpengine_timer, NULL, rtpengine_timer_interval, TIMER_FLAG_SKIP_ON_DELAY) <0 ) {
LM_ERR("could not register timer function\n");
return -1;
}

return 0;
}

Expand Down Expand Up @@ -2104,10 +2131,10 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
do {
if (snode && snode->s) {
if ((node = get_rtpe_node(snode, set)) == NULL)
node = select_rtpe_node(ng_flags.call_id, 1, set);
node = select_rtpe_node(ng_flags.call_id, set);
snode = NULL;
} else {
node = select_rtpe_node(ng_flags.call_id, 1, set);
node = select_rtpe_node(ng_flags.call_id, set);
}
if (!node) {
LM_ERR("no available proxies\n");
Expand Down Expand Up @@ -2486,7 +2513,7 @@ static struct rtpe_set * select_rtpe_set(int id_set )
* too expensive here.
*/
static struct rtpe_node *
select_rtpe_node(str callid, int do_test, struct rtpe_set *set)
select_rtpe_node(str callid, struct rtpe_set *set)
{
unsigned sum, weight_sum;
struct rtpe_node* node;
Expand All @@ -2506,11 +2533,8 @@ select_rtpe_node(str callid, int do_test, struct rtpe_set *set)
/* Most popular case: 1 proxy, nothing to calculate */
if (set->rtpe_node_count == 1) {
node = set->rn_first;
if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks())
node->rn_disabled = rtpe_test(node, 1, 0);
if (node->rn_disabled)
return NULL;

return node;
}

Expand All @@ -2520,31 +2544,18 @@ select_rtpe_node(str callid, int do_test, struct rtpe_set *set)
sum &= 0xff;

was_forced = 0;
retry:
weight_sum = 0;
constant_weight_sum = 0;
found = 0;
for (node=set->rn_first; node!=NULL; node=node->rn_next) {

if (node->rn_disabled && node->rn_recheck_ticks <= get_ticks()){
/* Try to enable if it's time to try. */
node->rn_disabled = rtpe_test(node, 1, 0);
}
constant_weight_sum += node->rn_weight;
if (!node->rn_disabled) {
weight_sum += node->rn_weight;
found = 1;
}
}
if (found == 0) {
/* No proxies? Force all to be re-detected, if not yet */
if (was_forced)
return NULL;
was_forced = 1;
for(node=set->rn_first; node!=NULL; node=node->rn_next) {
node->rn_disabled = rtpe_test(node, 1, 1);
}
goto retry;
}
sumcut = weight_sum ? sum % constant_weight_sum : -1;
/*
Expand All @@ -2555,7 +2566,7 @@ select_rtpe_node(str callid, int do_test, struct rtpe_set *set)
for (node=set->rn_first; node!=NULL;) {
if (sumcut < (int)node->rn_weight) {
if (!node->rn_disabled)
goto found;
return node;
if (was_forced == 0) {
/* appropriate proxy is disabled : redistribute on enabled ones */
sumcut = weight_sum ? sum % weight_sum : -1;
Expand All @@ -2569,14 +2580,6 @@ select_rtpe_node(str callid, int do_test, struct rtpe_set *set)
}
/* No node list */
return NULL;
found:
if (do_test) {
node->rn_disabled = rtpe_test(node, node->rn_disabled, 0);
if (node->rn_disabled)
goto retry;
}

return node;
}

static int
Expand Down

0 comments on commit 9ccd25f

Please sign in to comment.