diff --git a/modules/dispatcher/README b/modules/dispatcher/README index 5ef04bfe1b5..e38ba9bba3e 100644 --- a/modules/dispatcher/README +++ b/modules/dispatcher/README @@ -69,8 +69,8 @@ Carsten Bock 1.4. Exported Functions - 1.4.1. ds_select_dst(set, alg [, "[flags] [M - max_results]"]) + 1.4.1. ds_select_dst(set, alg [, (flags M + max_results)*]) 1.4.2. ds_select_domain(set, alg [, "[flags] [M max_results]"]) @@ -575,7 +575,7 @@ modparam("dispatcher", "socket_col", "my_sock") 1.4. Exported Functions -1.4.1. ds_select_dst(set, alg [, "[flags] [M max_results]"]) +1.4.1. ds_select_dst(set, alg [, (flags M max_results)*]) The method selects a destination from the given set of addresses. It will overwrite the "destination URI" of a SIP @@ -606,22 +606,25 @@ modparam("dispatcher", "socket_col", "my_sock") chosen. + “X” - if the algorithm is not implemented, the first entry in set is chosen. - * flags - If specified, this will be the flags which in - previous versions were specified at startup. The flags are - the failover support flag('f'/'F') letters, the user only - flag('u'/'U') and will specify that only the uri user part - will be used for hashing, the force destination + * flags M max_results - If specified, this will be the flags + which in previous versions were specified at startup. The + flags are the failover support flag('f'/'F') letters, the + user only flag('u'/'U') and will specify that only the uri + user part will be used for hashing, the force destination flag('S'/'s') which will Skip overwriting the destination address if it is already set and the use default flag('D', 'd') which will use the last address in destination set as last option to send the message.You can also specify these flags using PVs. The flags are being kept per partition. - * max_results - If specified, only that many results will be - put into the specified avp for failover. This allows having - many destinations but limit the useless traffic in case of - a number that is bound to fail everywhere. It can accept - variables. The definition must begin with 'M' character - either you use static definition or PVs. + The second paramater, max_results represents that only a + maximum of max_results will be put into the specified avp + for failover. This allows having many destinations but + limit the useless traffic in case of a number that is bound + to fail everywhere. Since version 1.12, the last paramater + cand be represented by a list of flags and max_results, + separated by comma. If the 'M' character is not specified, + there will be no search for the flags, but you cand specify + the max_results paramater (flags will be 0). If the character 'f' in 'flags' is set, the rest of the addresses from the destination set is stored in AVP list. You @@ -639,9 +642,9 @@ ds_select_dst("1", "0"); ... ds_select_dst("part2 : 1", "0", "M 5"); ... -ds_select_dst("part3 : 1", "0", "fUsD"); +ds_select_dst("part3 : 1", "0", "fUsD M"); ... -ds_select_dst("part4 : 2", "0", "fuD M 5,2"); +ds_select_dst("part4 : 2,3", "0,1", "fuD M 5, fuS M 2"); ... # dispatch over multiple dispatching groups $var(partition_name) = "p4" diff --git a/modules/dispatcher/dispatch.c b/modules/dispatcher/dispatch.c index 4cfb7d13464..e6003b2369c 100644 --- a/modules/dispatcher/dispatch.c +++ b/modules/dispatcher/dispatch.c @@ -1229,8 +1229,7 @@ static inline int ds_get_index(int group, ds_set_p *index, ds_partition_t *parti } -static inline int ds_update_dst(struct sip_msg *msg, str *uri, - struct socket_info *sock, int mode) +int ds_update_dst(struct sip_msg *msg, str *uri, struct socket_info *sock, int mode) { struct action act; uri_type utype; @@ -1288,10 +1287,10 @@ static int count_inactive_destinations(ds_set_p idx, int ds_use_default) { static inline int push_ds_2_avps( ds_dest_t *ds, ds_partition_t *partition ) { - char buf[2+16+1]; /* a hexa string */ + char buf[PTR_STRING_SIZE]; /* a hexa string */ int_str avp_val; - avp_val.s.len = 1 + sprintf( buf, "%p", ds->sock ); + avp_val.s.len = 1 + snprintf( buf, PTR_STR_SIZE, "%p", ds->sock ); avp_val.s.s = buf; if(add_avp(AVP_VAL_STR| partition->sock_avp_type, partition->sock_avp_name, avp_val)!=0) { @@ -1321,7 +1320,7 @@ static inline int push_ds_2_avps( ds_dest_t *ds, ds_partition_t *partition ) /** * */ -int ds_select_dst(struct sip_msg *msg, ds_select_ctl_p ds_select_ctl, int ds_flags) +int ds_select_dst(struct sip_msg *msg, ds_select_ctl_p ds_select_ctl, ds_selected_dst_p selected_dst, int ds_flags) { int i, cnt, i_unwrapped; unsigned int ds_hash; @@ -1464,6 +1463,7 @@ int ds_select_dst(struct sip_msg *msg, ds_select_ctl_p ds_select_ctl, int ds_fla break; } } + LM_DBG("alg hash [%u], id [%u]\n", ds_hash, ds_id); cnt = 0; @@ -1502,6 +1502,23 @@ int ds_select_dst(struct sip_msg *msg, ds_select_ctl_p ds_select_ctl, int ds_fla LM_ERR("cannot set dst addr\n"); goto error; } + + /* Save the selected destination for multilist failover */ + if (selected_dst->uri.s != NULL) { + pkg_free(selected_dst->uri.s); + memset(&selected_dst->uri, 0, sizeof(str)); + } + if (pkg_str_dup(&selected_dst->uri, &selected->dst_uri) != 0) { + LM_ERR("cannot set selected_dst uri\n"); + goto error; + } + if (selected->sock) { + selected_dst->socket.len = 1 + snprintf( selected_dst->socket.s, PTR_STR_SIZE, "%p", selected->sock ); + } + else { + selected_dst->socket.len = 0; + } + /* if alg is round-robin then update the shortcut to next to be used */ if(ds_select_ctl->alg==4) idx->last = (ds_id+1) % idx->nr; @@ -1636,12 +1653,12 @@ int ds_next_dst(struct sip_msg *msg, int mode, ds_partition_t *partition) destroy_avp(tmp_avp); } + LM_DBG("using [%.*s]\n", avp_value.s.len, avp_value.s.s); if(ds_update_dst(msg, &avp_value.s, sock, mode)!=0) { LM_ERR("cannot set dst addr\n"); return -1; } - LM_DBG("using [%.*s]\n", avp_value.s.len, avp_value.s.s); return 1; } diff --git a/modules/dispatcher/dispatch.h b/modules/dispatcher/dispatch.h index 10d675c7765..890033aed6e 100644 --- a/modules/dispatcher/dispatch.h +++ b/modules/dispatcher/dispatch.h @@ -50,7 +50,7 @@ #define DS_HASH_USER_ONLY 1 /* use only the uri user part for hashing */ #define DS_FAILOVER_ON 2 /* store the other dest in avps */ #define DS_USE_DEFAULT 4 /* use last address in destination set as last option */ -#define DS_FORCE_DST 8 /* if not set it will force overwriting the destination address +#define DS_FORCE_DST 8 /* if not set it will force overwriting the destination address if already set */ #define DS_INACTIVE_DST 1 /* inactive destination */ @@ -150,6 +150,7 @@ typedef struct _ds_select_ctl int max_results; /* max destinaitons to process */ int reset_AVP; /* reset AVPs flag */ int set_destination; /* set destination flag */ + int ds_flags; } ds_select_ctl_t, *ds_select_ctl_p; typedef struct @@ -158,6 +159,12 @@ typedef struct int set_id; } ds_options_callback_param_t; +typedef struct _ds_selected_dst +{ + str uri; + str socket; +} ds_selected_dst, *ds_selected_dst_p; + extern str ds_set_id_col; extern str ds_dest_uri_col; extern str ds_dest_sock_col; @@ -188,7 +195,8 @@ int ds_reload_db(ds_partition_t *partition); int init_ds_data(ds_partition_t *partition); void ds_destroy_data(ds_partition_t *partition); -int ds_select_dst(struct sip_msg *msg, ds_select_ctl_p ds_select_ctl, int ds_flags); +int ds_update_dst(struct sip_msg *msg, str *uri, struct socket_info *sock, int mode); +int ds_select_dst(struct sip_msg *msg, ds_select_ctl_p ds_select_ctl, ds_selected_dst_p selected_dst, int ds_flags); int ds_next_dst(struct sip_msg *msg, int mode, ds_partition_t *partition); int ds_set_state(int group, str *address, int state, int type, ds_partition_t *partition); diff --git a/modules/dispatcher/dispatcher.c b/modules/dispatcher/dispatcher.c index d79c7d25c33..3c3d0cb0cec 100644 --- a/modules/dispatcher/dispatcher.c +++ b/modules/dispatcher/dispatcher.c @@ -811,14 +811,16 @@ static int mod_init(void) return -1; } /* Register the PING-Timer */ - if (register_timer("ds-pinger",ds_check_timer,NULL,ds_ping_interval)<0){ + if (register_timer("ds-pinger", ds_check_timer, NULL, + ds_ping_interval, TIMER_FLAG_DELAY_ON_DELAY)<0) { LM_ERR("failed to register timer for probing!\n"); return -1; } } /* register timer to flush the state of destination back to DB */ - if (register_timer("ds-flusher",ds_flusher_routine,NULL, 30)<0){ + if (register_timer("ds-flusher",ds_flusher_routine,NULL, 30 , + TIMER_FLAG_SKIP_ON_DELAY)<0) { LM_ERR("failed to register timer for DB flushing!\n"); return -1; } @@ -837,9 +839,9 @@ static int mod_init(void) #include "../../pt.h" static int ds_child_init(int rank) { - /* we need DB connection from the timer procs (for the flushing) + /* we need DB connection from the worker procs (for the flushing) * and from the main proc (for final flush on shutdown) */ - if ( (process_no==0 || rank==PROC_TIMER) ) { + if ( rank>=PROC_MAIN ) { ds_partition_t *partition_it; @@ -897,45 +899,6 @@ static void destroy(void) destroy_ds_bls(); } -static int get_flags_int_value(struct sip_msg* msg, pv_spec_t* pvs){ - - /*Get flags literal value*/ - pv_value_t value; - int ds_flags = 0; - if (pv_get_spec_value(msg, pvs, &value)) { - LM_ERR("no valid PV value found(error in scripts)\n"); - return -1; - } - - /*Parse string and get flags integer value*/ - - for ( ; value.rs.len > 0; value.rs.s++, value.rs.len--) { - switch (*value.rs.s) { - case ' ' : - break; - case 'f' : - case 'F' : - ds_flags |= DS_FAILOVER_ON; - break; - case 'u' : - case 'U' : - ds_flags |= DS_HASH_USER_ONLY; - break; - case 'd' : - case 'D' : - ds_flags |= DS_USE_DEFAULT; - break; - case 's' : - case 'S' : - ds_flags |= DS_FORCE_DST; - break; - default : - LM_ERR("Invalid flags PV value\n"); - return -1; - } - } - return ds_flags; -} #define CHECK_AND_EXPAND_LIST(_list_) \ do{\ if (_list_->type == GPARAM_TYPE_PVS) { \ @@ -963,10 +926,14 @@ static int get_flags_int_value(struct sip_msg* msg, pv_spec_t* pvs){ */ static int w_ds_select(struct sip_msg* msg, char* part_set, char* alg, char* max_results_flags, int mode) { - int ret; + int ret = -1; + int _ret; int run_prev_ds_select = 0; - int ds_flags = 0; ds_select_ctl_t prev_ds_select_ctl, ds_select_ctl; + char selected_dst_sock_buf[PTR_STRING_SIZE]; /* a hexa string */ + ds_selected_dst selected_dst; + struct socket_info *sock = NULL; + if(msg==NULL) return -1; @@ -975,6 +942,9 @@ static int w_ds_select(struct sip_msg* msg, char* part_set, char* alg, char* max ds_select_ctl.reset_AVP = 1; ds_select_ctl.set_destination = 1; + memset(&selected_dst, 0, sizeof(ds_selected_dst)); + selected_dst.socket.s = selected_dst_sock_buf; + /* Retrieve dispatcher set */ ds_param_t *part_set_param = (ds_param_t*)part_set; @@ -991,32 +961,22 @@ static int w_ds_select(struct sip_msg* msg, char* part_set, char* alg, char* max int_list_t *alg_list = (int_list_t *)alg; int_list_t *alg_list_exp_start = NULL, *alg_list_exp_end = NULL; - /* Retrieve dispatcher max results */ - - /*Pointer to the start of the list*/ - flags_int_list_t* lst_flgs_param = (flags_int_list_t *)max_results_flags; - - int_list_t *max_results_ptr = NULL; - /*In case this parameter is not specified*/ - if (max_results_flags) - max_results_ptr = lst_flgs_param->list; - - int_list_t *max_list = max_results_ptr; - int_list_t *max_list_exp_start = NULL, *max_list_exp_end = NULL; + /* In case this parameter is not specified */ + max_list_param_p max_param = (max_list_param_p)max_results_flags; + str max_list_str; - /* Retrieve dispatcher flags */ - - if (max_results_flags) { - ds_flags_t* flags= lst_flgs_param->flags; - if (flags->type == DS_FLAGS_TYPE_INT) { - ds_flags = flags->v.ival; - } else { - ds_flags = get_flags_int_value(msg, flags->v.pvs); - if (ds_flags < 0) { - LM_ERR("Invalid value in flags PV\n"); - return -1; - } + int_list_t *max_list=NULL, *max_list_free; + if (max_param->type == MAX_LIST_TYPE_STR) { + max_list = (int_list_t*)max_param->lst.list; + } else if (max_param->type == MAX_LIST_TYPE_PV) { + if (pv_printf_s(msg, max_param->lst.elem, &max_list_str) != 0) { + LM_ERR("cannot get max list from pv\n"); + return -1; } + + if (set_list_from_string(max_list_str, &max_list) != 0 + || max_list == NULL) + return -1; } /* Avoid compiler warning */ @@ -1034,17 +994,18 @@ static int w_ds_select(struct sip_msg* msg, char* part_set, char* alg, char* max CHECK_AND_EXPAND_LIST(alg_list); ds_select_ctl.alg = alg_list->v.ival; - if (max_results_ptr) { - CHECK_AND_EXPAND_LIST(max_list); + + if (max_results_flags) { ds_select_ctl.max_results = max_list->v.ival; + ds_select_ctl.ds_flags = max_list->flags; } if (run_prev_ds_select) { LM_DBG("ds_select: %d %d %d %d %d\n", prev_ds_select_ctl.set, prev_ds_select_ctl.alg, prev_ds_select_ctl.max_results, prev_ds_select_ctl.reset_AVP, prev_ds_select_ctl.set_destination); - ret = ds_select_dst(msg, &prev_ds_select_ctl, ds_flags); - if (ret<0) return ret; + _ret = ds_select_dst(msg, &prev_ds_select_ctl, &selected_dst, prev_ds_select_ctl.ds_flags); + if (_ret>=0) ret = _ret; /* stop resetting AVPs. */ ds_select_ctl.reset_AVP = 0; } else { @@ -1055,29 +1016,35 @@ static int w_ds_select(struct sip_msg* msg, char* part_set, char* alg, char* max set_list = set_list->next; alg_list = alg_list->next; - if (max_results_ptr) { + if (max_results_flags) { + max_list_free = max_list; max_list = max_list->next; + + if (max_param->type == MAX_LIST_TYPE_PV) + shm_free(max_list_free); } TRY_FREE_EXPANDED_LIST(set_list); TRY_FREE_EXPANDED_LIST(alg_list); - TRY_FREE_EXPANDED_LIST(max_list); } while (set_list && alg_list && - (max_results_ptr ? max_list : set_list)); + (max_results_flags ? max_list : set_list)); - if (max_results_ptr && max_list != NULL) { + if (max_results_flags && max_list != NULL) { LM_ERR("extra max slot(s)\n"); + ret = -2; goto error; } if (set_list != NULL) { LM_ERR("extra set(s)\n"); + ret = -2; goto error; } if (alg_list != NULL) { LM_ERR("extra algorithm(s)\n"); + ret = -2; goto error; } @@ -1086,10 +1053,34 @@ static int w_ds_select(struct sip_msg* msg, char* part_set, char* alg, char* max LM_DBG("ds_select: %d %d %d %d %d\n", ds_select_ctl.set, ds_select_ctl.alg, ds_select_ctl.max_results, ds_select_ctl.reset_AVP, ds_select_ctl.set_destination); - return ds_select_dst(msg, &ds_select_ctl, ds_flags); + _ret = ds_select_dst(msg, &ds_select_ctl, &selected_dst, ds_select_ctl.ds_flags); + if (_ret>=0) { + ret = _ret; + } + else { + if (selected_dst.uri.s != NULL) { + if (selected_dst.socket.len != 0) { + if (sscanf( selected_dst.socket.s, "%p", (void**)&sock ) != 1) { + LM_ERR("unable to read forced destination socket\n"); + ret = -4; + goto error; + } + } + if (ds_update_dst(msg, &selected_dst.uri, sock, ds_select_ctl.mode) != 0) { + LM_ERR("cannot set dst addr\n"); + ret = -3; + goto error; + } + } + else { + ret = -1; + goto error; + } + } error: - return -1; + if (selected_dst.uri.s != NULL) pkg_free(selected_dst.uri.s); + return ret; } /** diff --git a/modules/dispatcher/doc/dispatcher_admin.xml b/modules/dispatcher/doc/dispatcher_admin.xml index 5ef0822128d..cd1537fdb66 100644 --- a/modules/dispatcher/doc/dispatcher_admin.xml +++ b/modules/dispatcher/doc/dispatcher_admin.xml @@ -691,7 +691,7 @@ modparam("dispatcher", "socket_col", "my_sock")