From 4eea26f758885b983ab16b4c6094a8120a9155e1 Mon Sep 17 00:00:00 2001 From: Razvan Crainea Date: Tue, 16 Aug 2022 12:06:17 +0300 Subject: [PATCH] dispatcher: add support for pemanent pinging Using the new probe_mode column in the table you can specify whether a destination should be permanently pinged or just one in probe mode. --- db/schema/dispatcher.xml | 10 ++- modules/dispatcher/dispatch.c | 71 +++++++++++---------- modules/dispatcher/dispatch.h | 4 ++ modules/dispatcher/dispatcher.c | 3 + modules/dispatcher/doc/dispatcher_admin.xml | 21 ++++++ 5 files changed, 74 insertions(+), 35 deletions(-) diff --git a/db/schema/dispatcher.xml b/db/schema/dispatcher.xml index 4effe2a7136..3d0e7a457e3 100644 --- a/db/schema/dispatcher.xml +++ b/db/schema/dispatcher.xml @@ -9,7 +9,7 @@ dispatcher - 8 + 9&MYSQL_TABLE_TYPE; This table is used by the dispatcher module. It contains the sets of destinations used for load balancing and dispatching. More information about the dispatcher module can be found at: &OPENSIPS_MOD_DOC;dispatcher.html @@ -61,6 +61,14 @@ + + probe_mode + unsigned int + 11 + 0 + 0-Probe only when in probing state; 1-Probe even in enable/active state; + + weight string diff --git a/modules/dispatcher/dispatch.c b/modules/dispatcher/dispatch.c index ab1fdeeed7b..3a7c974c83b 100644 --- a/modules/dispatcher/dispatch.c +++ b/modules/dispatcher/dispatch.c @@ -53,16 +53,7 @@ #include "ds_bl.h" #include "ds_clustering.h" -#define DS_TABLE_VERSION 8 - -/** - * in version 8, the "weight" column is given as a string, since it can contain - * both integer (the weight) or URL definitions (dynamically calculated weight) - * - * OpenSIPS retains backwards-compatibility with the former integer column flavor - */ -#define supported_ds_version(_ver) \ - (DS_TABLE_VERSION == 8 ? (_ver == 8 || _ver == 7) : _ver == DS_TABLE_VERSION) +#define DS_TABLE_VERSION 9 extern ds_partition_t *partitions; @@ -184,7 +175,7 @@ void ds_destroy_data(ds_partition_t *partition) int add_dest2list(int id, str uri, struct socket_info *sock, str *comsock, int state, - int weight, int prio, str attrs, str description, ds_data_t *d_data) + int weight, int prio, int probe_mode, str attrs, str description, ds_data_t *d_data) { ds_dest_p dp = NULL; ds_set_p sp = NULL; @@ -309,6 +300,16 @@ int add_dest2list(int id, str uri, struct socket_info *sock, str *comsock, int s LM_CRIT("BUG: unknown state %d for destination %.*s\n", state, uri.len, uri.s); } + switch (probe_mode) { + case 0: + break; + case 1: + dp->flags |= DS_PROBING_PERM_DST; + break; + default: + LM_CRIT("BUG: unknown probing_mode %d for destination %.*s\n", + probe_mode, uri.len, uri.s); + } /* Do a DNS-Lookup for the Host-Name: */ proxy = mk_proxy( &puri.host, puri.port_no, puri.proto, @@ -852,8 +853,6 @@ void ds_disconnect_db(ds_partition_t *partition) /*initialize and verify DB stuff*/ int init_ds_db(ds_partition_t *partition) { - int _ds_table_version; - if(partition->table_name.s == 0){ LM_ERR("invalid database name\n"); return -1; @@ -870,18 +869,9 @@ int init_ds_db(ds_partition_t *partition) return -1; } - _ds_table_version = db_table_version(&partition->dbf,*partition->db_handle, - &partition->table_name); - if (_ds_table_version < 0) { - LM_ERR("failed to query table version\n"); - return -1; - } else if (!supported_ds_version(_ds_table_version)) { - LM_ERR("invalid version for table '%.*s' (found %d, required %d)\n" - "(use opensips-cli to migrate to latest schema)\n", - partition->table_name.len, partition->table_name.s, - _ds_table_version, DS_TABLE_VERSION ); + if (db_check_table_version(&partition->dbf,*partition->db_handle, + &partition->table_name, DS_TABLE_VERSION) != 0) return -1; - } return 0; } @@ -892,6 +882,7 @@ static void ds_inherit_state( ds_data_t *old_data , ds_data_t *new_data) ds_set_p new_set, old_set; ds_dest_p new_ds, old_ds; int changed; + int probe_mode_flags; /* search the new sets through the old sets */ for ( new_set=new_data->sets ; new_set ; new_set=new_set->next ) { @@ -913,8 +904,10 @@ static void ds_inherit_state( ds_data_t *old_data , ds_data_t *new_data) strncasecmp(new_ds->uri.s, old_ds->uri.s, old_ds->uri.len)==0 ) { LM_DBG("DST <%.*s> found in old set, copying state\n", new_ds->uri.len,new_ds->uri.s); - if (new_ds->flags != old_ds->flags) { - new_ds->flags = old_ds->flags; + if ((new_ds->flags&(~DS_PROBING_PERM_DST)) != (old_ds->flags&(~DS_PROBING_PERM_DST))) { + probe_mode_flags = new_ds->flags & DS_PROBING_PERM_DST; + new_ds->flags = old_ds->flags & (~DS_PROBING_PERM_DST); + new_ds->flags |= probe_mode_flags; changed = 1; } break; @@ -1013,10 +1006,11 @@ void ds_flusher_routine(unsigned int ticks, void* param) static ds_data_t* ds_load_data(ds_partition_t *partition, int use_state_col) { ds_data_t *d_data; - int i, id, nr_rows, cnt, nr_cols = 8; + int i, id, nr_rows, cnt, nr_cols = 9; int state; int weight; int prio; + int probe_mode; struct socket_info *sock; str uri; str attrs, weight_st; @@ -1026,9 +1020,10 @@ static ds_data_t* ds_load_data(ds_partition_t *partition, int use_state_col) db_row_t * rows; int discarded_dst=0; - db_key_t query_cols[8] = {&ds_set_id_col, &ds_dest_uri_col, + db_key_t query_cols[9] = {&ds_set_id_col, &ds_dest_uri_col, &ds_dest_sock_col, &ds_dest_weight_col, &ds_dest_attrs_col, - &ds_dest_prio_col, &ds_dest_description_col, &ds_dest_state_col}; + &ds_dest_prio_col, &ds_dest_description_col, + &ds_dest_probe_mode_col, &ds_dest_state_col}; if (!use_state_col) nr_cols--; @@ -1125,17 +1120,24 @@ static ds_data_t* ds_load_data(ds_partition_t *partition, int use_state_col) else prio = VAL_INT(values+5); + /* priority */ + if (VAL_NULL(values+7)) + probe_mode = 0; + else + probe_mode = VAL_INT(values+7); + /* state */ - if (!use_state_col || VAL_NULL(values+7)) + if (!use_state_col || VAL_NULL(values+8)) /* active state */ state = 0; else - state = VAL_INT(values+7); + state = VAL_INT(values+8); get_str_from_dbval( "DESCRIPTION", values+6, 0/*not_null*/, 0/*not_empty*/, description, error2); - if (add_dest2list(id, uri, sock, &weight_st, state, weight, prio, attrs, description, d_data) + if (add_dest2list(id, uri, sock, &weight_st, state, weight, prio, + probe_mode, attrs, description, d_data) != 0) { LM_WARN("failed to add destination <%.*s> in group %d\n", uri.len,uri.s,id); @@ -2639,7 +2641,7 @@ static void ds_options_callback( struct cell *t, int type, /* if we always probe, and we get a timeout * or a reponse that is not within the allowed * reply codes, then disable*/ - if(ds_probing_mode==1 && ps->code != 200 && + if((ds_probing_mode==1 || cb_param->always_probe) && ps->code != 200 && (ps->code == 408 || !check_options_rplcode(ps->code))) { if (ds_set_state(cb_param->set_id, &uri, DS_PROBING_DST, 1, @@ -2694,7 +2696,7 @@ void ds_check_timer(unsigned int ticks, void* param) * the entry has "Probing" set, send a probe: */ if ( (!ds_probing_list || in_int_list(ds_probing_list, list->id)==0) && ((list->dlist[j].flags&DS_INACTIVE_DST)==0) && - (ds_probing_mode==1 || (list->dlist[j].flags&DS_PROBING_DST)!=0 + (ds_probing_mode==1 || (list->dlist[j].flags&(DS_PROBING_DST|DS_PROBING_PERM_DST))!=0 )) { /* stage 2 of checking, clustering level */ @@ -2744,6 +2746,7 @@ void ds_check_timer(unsigned int ticks, void* param) cb_param->partition = partition; cb_param->set_id = list->id; + cb_param->always_probe = (list->dlist[j].flags & DS_PROBING_PERM_DST); if (tmb.t_request_within(&ds_ping_method, NULL, NULL, diff --git a/modules/dispatcher/dispatch.h b/modules/dispatcher/dispatch.h index 663f850dfa2..65beb5c6a30 100644 --- a/modules/dispatcher/dispatch.h +++ b/modules/dispatcher/dispatch.h @@ -43,6 +43,8 @@ #define DS_RESET_FAIL_DST 4 /* Reset-Failure-Counter */ #define DS_STATE_DIRTY_DST 8 /* STATE is dirty */ +#define DS_PROBING_PERM_DST 16 /* permanently probing if not inactive */ + #define DS_PV_ALGO_ID_MARKER "%i" /* Marker to indicate where the Set ID should be inserted in the pvar */ #define DS_PV_ALGO_URI_MARKER "%u" /* Marker to indicate where the URI should be inserted in the pvar */ #define DS_PV_ALGO_MARKER_LEN 2 @@ -161,6 +163,7 @@ typedef struct { ds_partition_t *partition; int set_id; + int always_probe; } ds_options_callback_param_t; typedef struct _ds_selected_dst @@ -178,6 +181,7 @@ extern str ds_dest_weight_col; extern str ds_dest_prio_col; extern str ds_dest_attrs_col; extern str ds_dest_description_col; +extern str ds_dest_probe_mode_col; extern pv_elem_t * hash_param_model; extern str hash_pvar_param; diff --git a/modules/dispatcher/dispatcher.c b/modules/dispatcher/dispatcher.c index 737df79e9e6..4eca1a0e61e 100644 --- a/modules/dispatcher/dispatcher.c +++ b/modules/dispatcher/dispatcher.c @@ -55,6 +55,7 @@ #define DS_DEST_PRIO_COL "priority" #define DS_DEST_ATTRS_COL "attrs" #define DS_DEST_DESCRIPTION_COL "description" +#define DS_DEST_PROBE_MODE_COL "probe_mode" #define DS_TABLE_NAME "dispatcher" #define DS_PARTITION_DELIM ':' @@ -142,6 +143,7 @@ str ds_dest_weight_col= str_init(DS_DEST_WEIGHT_COL); str ds_dest_prio_col = str_init(DS_DEST_PRIO_COL); str ds_dest_attrs_col = str_init(DS_DEST_ATTRS_COL); str ds_dest_description_col = str_init(DS_DEST_DESCRIPTION_COL); +str ds_dest_probe_mode_col = str_init(DS_DEST_PROBE_MODE_COL); str ds_setid_pvname = {NULL, 0}; pv_spec_t ds_setid_pv; @@ -284,6 +286,7 @@ static param_export_t params[]={ {"priority_col", STR_PARAM, &ds_dest_prio_col.s}, {"attrs_col", STR_PARAM, &ds_dest_attrs_col.s}, {"description_col", STR_PARAM, &ds_dest_description_col.s}, + {"probe_mode_col", STR_PARAM, &ds_dest_probe_mode_col.s}, {"dst_avp", STR_PARAM, &default_db_head.dst_avp.s}, {"grp_avp", STR_PARAM, &default_db_head.grp_avp.s}, {"cnt_avp", STR_PARAM, &default_db_head.cnt_avp.s}, diff --git a/modules/dispatcher/doc/dispatcher_admin.xml b/modules/dispatcher/doc/dispatcher_admin.xml index 67eafcd3dc7..ac56a69286c 100644 --- a/modules/dispatcher/doc/dispatcher_admin.xml +++ b/modules/dispatcher/doc/dispatcher_admin.xml @@ -1022,6 +1022,27 @@ modparam("dispatcher", "socket_col", "my_sock") +
+ <varname>probe_mode_col</varname> (string) + + The column's name in the database storing the probe_mode (as + string) for destination. + + + + Default value is probe_mode. + + + + Set <quote>probe_mode_col</quote> parameter + +... +modparam("dispatcher", "probe_mode_col", "probing") +... + + +
+
<varname>fetch_freeswitch_stats</varname> (integer)