Skip to content

Commit

Permalink
dispatcher: add support for pemanent pinging
Browse files Browse the repository at this point in the history
Using the new probe_mode column in the table you can specify whether a
destination should be permanently pinged or just one in probe mode.
  • Loading branch information
razvancrainea committed Aug 16, 2022
1 parent 30214dc commit 4eea26f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 35 deletions.
10 changes: 9 additions & 1 deletion db/schema/dispatcher.xml
Expand Up @@ -9,7 +9,7 @@

<table id="dispatcher" xmlns:db="http://docbook.org/ns/docbook">
<name>dispatcher</name>
<version>8</version>
<version>9</version>
<type db="mysql">&MYSQL_TABLE_TYPE;</type>
<description>
<db:para>This table is used by the dispatcher module. It contains the sets of destinations used for load balancing and dispatching. More information about the dispatcher module can be found at: &OPENSIPS_MOD_DOC;dispatcher.html
Expand Down Expand Up @@ -61,6 +61,14 @@
<natural/>
</column>

<column id="probe_mode">
<name>probe_mode</name>
<type>unsigned int</type>
<size>11</size>
<default>0</default>
<description>0-Probe only when in probing state; 1-Probe even in enable/active state;</description>
</column>

<column id="weight">
<name>weight</name>
<type>string</type>
Expand Down
71 changes: 37 additions & 34 deletions modules/dispatcher/dispatch.c
Expand Up @@ -53,16 +53,7 @@
#include "ds_bl.h"
#include "ds_clustering.h"

#define DS_TABLE_VERSION 8

/**
* in version 8, the "weight" column is given as a string, since it can contain
* both integer (the weight) or URL definitions (dynamically calculated weight)
*
* OpenSIPS retains backwards-compatibility with the former integer column flavor
*/
#define supported_ds_version(_ver) \
(DS_TABLE_VERSION == 8 ? (_ver == 8 || _ver == 7) : _ver == DS_TABLE_VERSION)
#define DS_TABLE_VERSION 9

extern ds_partition_t *partitions;

Expand Down Expand Up @@ -184,7 +175,7 @@ void ds_destroy_data(ds_partition_t *partition)


int add_dest2list(int id, str uri, struct socket_info *sock, str *comsock, int state,
int weight, int prio, str attrs, str description, ds_data_t *d_data)
int weight, int prio, int probe_mode, str attrs, str description, ds_data_t *d_data)
{
ds_dest_p dp = NULL;
ds_set_p sp = NULL;
Expand Down Expand Up @@ -309,6 +300,16 @@ int add_dest2list(int id, str uri, struct socket_info *sock, str *comsock, int s
LM_CRIT("BUG: unknown state %d for destination %.*s\n",
state, uri.len, uri.s);
}
switch (probe_mode) {
case 0:
break;
case 1:
dp->flags |= DS_PROBING_PERM_DST;
break;
default:
LM_CRIT("BUG: unknown probing_mode %d for destination %.*s\n",
probe_mode, uri.len, uri.s);
}

/* Do a DNS-Lookup for the Host-Name: */
proxy = mk_proxy( &puri.host, puri.port_no, puri.proto,
Expand Down Expand Up @@ -852,8 +853,6 @@ void ds_disconnect_db(ds_partition_t *partition)
/*initialize and verify DB stuff*/
int init_ds_db(ds_partition_t *partition)
{
int _ds_table_version;

if(partition->table_name.s == 0){
LM_ERR("invalid database name\n");
return -1;
Expand All @@ -870,18 +869,9 @@ int init_ds_db(ds_partition_t *partition)
return -1;
}

_ds_table_version = db_table_version(&partition->dbf,*partition->db_handle,
&partition->table_name);
if (_ds_table_version < 0) {
LM_ERR("failed to query table version\n");
return -1;
} else if (!supported_ds_version(_ds_table_version)) {
LM_ERR("invalid version for table '%.*s' (found %d, required %d)\n"
"(use opensips-cli to migrate to latest schema)\n",
partition->table_name.len, partition->table_name.s,
_ds_table_version, DS_TABLE_VERSION );
if (db_check_table_version(&partition->dbf,*partition->db_handle,
&partition->table_name, DS_TABLE_VERSION) != 0)
return -1;
}

return 0;
}
Expand All @@ -892,6 +882,7 @@ static void ds_inherit_state( ds_data_t *old_data , ds_data_t *new_data)
ds_set_p new_set, old_set;
ds_dest_p new_ds, old_ds;
int changed;
int probe_mode_flags;

/* search the new sets through the old sets */
for ( new_set=new_data->sets ; new_set ; new_set=new_set->next ) {
Expand All @@ -913,8 +904,10 @@ static void ds_inherit_state( ds_data_t *old_data , ds_data_t *new_data)
strncasecmp(new_ds->uri.s, old_ds->uri.s, old_ds->uri.len)==0 ) {
LM_DBG("DST <%.*s> found in old set, copying state\n",
new_ds->uri.len,new_ds->uri.s);
if (new_ds->flags != old_ds->flags) {
new_ds->flags = old_ds->flags;
if ((new_ds->flags&(~DS_PROBING_PERM_DST)) != (old_ds->flags&(~DS_PROBING_PERM_DST))) {
probe_mode_flags = new_ds->flags & DS_PROBING_PERM_DST;
new_ds->flags = old_ds->flags & (~DS_PROBING_PERM_DST);
new_ds->flags |= probe_mode_flags;
changed = 1;
}
break;
Expand Down Expand Up @@ -1013,10 +1006,11 @@ void ds_flusher_routine(unsigned int ticks, void* param)
static ds_data_t* ds_load_data(ds_partition_t *partition, int use_state_col)
{
ds_data_t *d_data;
int i, id, nr_rows, cnt, nr_cols = 8;
int i, id, nr_rows, cnt, nr_cols = 9;
int state;
int weight;
int prio;
int probe_mode;
struct socket_info *sock;
str uri;
str attrs, weight_st;
Expand All @@ -1026,9 +1020,10 @@ static ds_data_t* ds_load_data(ds_partition_t *partition, int use_state_col)
db_row_t * rows;
int discarded_dst=0;

db_key_t query_cols[8] = {&ds_set_id_col, &ds_dest_uri_col,
db_key_t query_cols[9] = {&ds_set_id_col, &ds_dest_uri_col,
&ds_dest_sock_col, &ds_dest_weight_col, &ds_dest_attrs_col,
&ds_dest_prio_col, &ds_dest_description_col, &ds_dest_state_col};
&ds_dest_prio_col, &ds_dest_description_col,
&ds_dest_probe_mode_col, &ds_dest_state_col};

if (!use_state_col)
nr_cols--;
Expand Down Expand Up @@ -1125,17 +1120,24 @@ static ds_data_t* ds_load_data(ds_partition_t *partition, int use_state_col)
else
prio = VAL_INT(values+5);

/* priority */
if (VAL_NULL(values+7))
probe_mode = 0;
else
probe_mode = VAL_INT(values+7);

/* state */
if (!use_state_col || VAL_NULL(values+7))
if (!use_state_col || VAL_NULL(values+8))
/* active state */
state = 0;
else
state = VAL_INT(values+7);
state = VAL_INT(values+8);

get_str_from_dbval( "DESCRIPTION", values+6,
0/*not_null*/, 0/*not_empty*/, description, error2);

if (add_dest2list(id, uri, sock, &weight_st, state, weight, prio, attrs, description, d_data)
if (add_dest2list(id, uri, sock, &weight_st, state, weight, prio,
probe_mode, attrs, description, d_data)
!= 0) {
LM_WARN("failed to add destination <%.*s> in group %d\n",
uri.len,uri.s,id);
Expand Down Expand Up @@ -2639,7 +2641,7 @@ static void ds_options_callback( struct cell *t, int type,
/* if we always probe, and we get a timeout
* or a reponse that is not within the allowed
* reply codes, then disable*/
if(ds_probing_mode==1 && ps->code != 200 &&
if((ds_probing_mode==1 || cb_param->always_probe) && ps->code != 200 &&
(ps->code == 408 || !check_options_rplcode(ps->code)))
{
if (ds_set_state(cb_param->set_id, &uri, DS_PROBING_DST, 1,
Expand Down Expand Up @@ -2694,7 +2696,7 @@ void ds_check_timer(unsigned int ticks, void* param)
* the entry has "Probing" set, send a probe: */
if ( (!ds_probing_list || in_int_list(ds_probing_list, list->id)==0) &&
((list->dlist[j].flags&DS_INACTIVE_DST)==0) &&
(ds_probing_mode==1 || (list->dlist[j].flags&DS_PROBING_DST)!=0
(ds_probing_mode==1 || (list->dlist[j].flags&(DS_PROBING_DST|DS_PROBING_PERM_DST))!=0
))
{
/* stage 2 of checking, clustering level */
Expand Down Expand Up @@ -2744,6 +2746,7 @@ void ds_check_timer(unsigned int ticks, void* param)

cb_param->partition = partition;
cb_param->set_id = list->id;
cb_param->always_probe = (list->dlist[j].flags & DS_PROBING_PERM_DST);
if (tmb.t_request_within(&ds_ping_method,
NULL,
NULL,
Expand Down
4 changes: 4 additions & 0 deletions modules/dispatcher/dispatch.h
Expand Up @@ -43,6 +43,8 @@
#define DS_RESET_FAIL_DST 4 /* Reset-Failure-Counter */
#define DS_STATE_DIRTY_DST 8 /* STATE is dirty */

#define DS_PROBING_PERM_DST 16 /* permanently probing if not inactive */

#define DS_PV_ALGO_ID_MARKER "%i" /* Marker to indicate where the Set ID should be inserted in the pvar */
#define DS_PV_ALGO_URI_MARKER "%u" /* Marker to indicate where the URI should be inserted in the pvar */
#define DS_PV_ALGO_MARKER_LEN 2
Expand Down Expand Up @@ -161,6 +163,7 @@ typedef struct
{
ds_partition_t *partition;
int set_id;
int always_probe;
} ds_options_callback_param_t;

typedef struct _ds_selected_dst
Expand All @@ -178,6 +181,7 @@ extern str ds_dest_weight_col;
extern str ds_dest_prio_col;
extern str ds_dest_attrs_col;
extern str ds_dest_description_col;
extern str ds_dest_probe_mode_col;

extern pv_elem_t * hash_param_model;
extern str hash_pvar_param;
Expand Down
3 changes: 3 additions & 0 deletions modules/dispatcher/dispatcher.c
Expand Up @@ -55,6 +55,7 @@
#define DS_DEST_PRIO_COL "priority"
#define DS_DEST_ATTRS_COL "attrs"
#define DS_DEST_DESCRIPTION_COL "description"
#define DS_DEST_PROBE_MODE_COL "probe_mode"
#define DS_TABLE_NAME "dispatcher"
#define DS_PARTITION_DELIM ':'

Expand Down Expand Up @@ -142,6 +143,7 @@ str ds_dest_weight_col= str_init(DS_DEST_WEIGHT_COL);
str ds_dest_prio_col = str_init(DS_DEST_PRIO_COL);
str ds_dest_attrs_col = str_init(DS_DEST_ATTRS_COL);
str ds_dest_description_col = str_init(DS_DEST_DESCRIPTION_COL);
str ds_dest_probe_mode_col = str_init(DS_DEST_PROBE_MODE_COL);

str ds_setid_pvname = {NULL, 0};
pv_spec_t ds_setid_pv;
Expand Down Expand Up @@ -284,6 +286,7 @@ static param_export_t params[]={
{"priority_col", STR_PARAM, &ds_dest_prio_col.s},
{"attrs_col", STR_PARAM, &ds_dest_attrs_col.s},
{"description_col", STR_PARAM, &ds_dest_description_col.s},
{"probe_mode_col", STR_PARAM, &ds_dest_probe_mode_col.s},
{"dst_avp", STR_PARAM, &default_db_head.dst_avp.s},
{"grp_avp", STR_PARAM, &default_db_head.grp_avp.s},
{"cnt_avp", STR_PARAM, &default_db_head.cnt_avp.s},
Expand Down
21 changes: 21 additions & 0 deletions modules/dispatcher/doc/dispatcher_admin.xml
Expand Up @@ -1022,6 +1022,27 @@ modparam("dispatcher", "socket_col", "my_sock")
</example>
</section>

<section id="param_probe_mode_col" xreflabel="probe_mode_col">
<title><varname>probe_mode_col</varname> (string)</title>
<para>
The column's name in the database storing the probe_mode (as
string) for destination.
</para>
<para>
<emphasis>
Default value is <quote>probe_mode</quote>.
</emphasis>
</para>
<example>
<title>Set <quote>probe_mode_col</quote> parameter</title>
<programlisting format="linespecific">
...
modparam("dispatcher", "probe_mode_col", "probing")
...
</programlisting>
</example>
</section>

<section id="param_fetch_freeswitch_stats" xreflabel="fetch_freeswitch_stats">
<title><varname>fetch_freeswitch_stats</varname> (integer)</title>
<para>
Expand Down

0 comments on commit 4eea26f

Please sign in to comment.