From 3df4c3f51cf05f545f7d9e11021d1074f4b77cf3 Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 17 Aug 2021 16:28:50 -0500 Subject: [PATCH 1/9] rtpengine: only maintain 1 node per unique URL --- modules/rtpengine/rtpengine.c | 37 ++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 0e85184b374..9d6e0552f78 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -316,6 +316,7 @@ static int fixup_free_set_id(void ** param); static int set_rtpengine_set_f(struct sip_msg * msg, rtpe_set_link_t *set_param); static struct rtpe_set * select_rtpe_set(int id_set); static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *); +static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe_url); static char *send_rtpe_command(struct rtpe_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); @@ -872,7 +873,6 @@ static int add_rtpengine_socks(struct rtpe_set * rtpe_list, return -1; } memset(pnode, 0, sizeof(*pnode)); - pnode->idx = (*rtpe_no)++; pnode->rn_recheck_ticks = 0; pnode->rn_weight = weight; pnode->rn_umode = 0; @@ -888,6 +888,17 @@ static int add_rtpengine_socks(struct rtpe_set * rtpe_list, pnode->rn_url.len = p2-p1; LM_DBG("url is %s, len is %i\n", pnode->rn_url.s, pnode->rn_url.len); + + if (lookup_rtpe_node(rtpe_list, &pnode->rn_url) != NULL) { + LM_DBG("node with url %s already exists in set %d, not adding new node\n", pnode->rn_url.s, rtpe_list->id_set); + shm_free(pnode->rn_url.s); + shm_free(pnode); + return 0; + } + + /* incr index once we've determined this is a new node */ + pnode->idx = (*rtpe_no)++; + /* Leave only address in rn_address */ pnode->rn_address = pnode->rn_url.s; if (strncasecmp(pnode->rn_address, "udp:", 4) == 0) { @@ -1618,6 +1629,30 @@ static int update_rtpengines(void) return connect_rtpengines(); } +/* Returns the first matching node in set */ +static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe_url) +{ + struct rtpe_node * crt_rtpe; + + if (rtpe_list == NULL) + return NULL; + + if (rtpe_url->len==0 || !rtpe_url->s) + return NULL; + + for(crt_rtpe = rtpe_list->rn_first; crt_rtpe != NULL; + crt_rtpe = crt_rtpe->rn_next){ + if(crt_rtpe->rn_url.len == rtpe_url->len){ + if(strncmp(crt_rtpe->rn_url.s, rtpe_url->s, rtpe_url->len) == 0){ + return crt_rtpe; + } + } + } + + /* No match */ + return NULL; +} + static void free_rtpe_nodes(struct rtpe_set *list) { struct rtpe_node * crt_rtpp, *last_rtpp; From 952f38fadd2d4a0b12c33a0f5ac85c39e5550116 Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 14:40:46 -0500 Subject: [PATCH 2/9] rtpengine: remove unused variable --- modules/rtpengine/rtpengine.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 9d6e0552f78..4384b7df596 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -361,7 +361,6 @@ static pv_spec_t err_pv; static char ** rtpe_strings=0; static int rtpe_sets=0; /*used in rtpengine_set_store()*/ -static int rtpe_set_count = 0; static int rtpe_ctx_idx = -1; struct rtpe_set_head **rtpe_set_list =0; struct rtpe_set **default_rtpe_set=0; @@ -1025,7 +1024,6 @@ static int rtpengine_add_rtpengine_set( char * rtp_proxies, int set_id) } (*rtpe_set_list)->rset_last = rtpe_list; - rtpe_set_count++; } return 0; From 11291e68e04cbe9149d8faf78437318784cf5c41 Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 14:57:41 -0500 Subject: [PATCH 3/9] rtpengine: add helper functions for freeing a specific rtpengine set and node This commit is in preparation of socket management refactoring --- modules/rtpengine/rtpengine.c | 66 +++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 4384b7df596..7debaad97e2 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -317,6 +317,8 @@ static int set_rtpengine_set_f(struct sip_msg * msg, rtpe_set_link_t *set_param) static struct rtpe_set * select_rtpe_set(int id_set); static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *); static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe_url); +static void free_rtpe_set(int); +static void free_rtpe_node(struct rtpe_set *, str *); static char *send_rtpe_command(struct rtpe_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); @@ -1651,6 +1653,39 @@ static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe return NULL; } +/* finds the node based upon URL and frees it from set */ +static void free_rtpe_node(struct rtpe_set *list, str *rtpe_url) +{ + struct rtpe_node *prev_rtpp=NULL, *crt_rtpp; + + for(crt_rtpp=list->rn_first; crt_rtpp!=NULL && str_strcmp(&crt_rtpp->rn_url, rtpe_url); + crt_rtpp=crt_rtpp->rn_next) + prev_rtpp = crt_rtpp; + + if (!crt_rtpp) { + LM_DBG("no matching node %s\n", rtpe_url->s); + return; + } + + /* first node matched */ + if (!prev_rtpp) { + list->rn_first = crt_rtpp->rn_next; + goto free; + } + + /* last node matched */ + if (crt_rtpp->rn_next == NULL) + list->rn_last = prev_rtpp; + + prev_rtpp->rn_next = crt_rtpp->rn_next; + +free: + list->rtpe_node_count--; + if (crt_rtpp->rn_url.s) + shm_free(crt_rtpp->rn_url.s); + shm_free(crt_rtpp); +} + static void free_rtpe_nodes(struct rtpe_set *list) { struct rtpe_node * crt_rtpp, *last_rtpp; @@ -1668,6 +1703,37 @@ static void free_rtpe_nodes(struct rtpe_set *list) list->rtpe_node_count = 0; } +/* finds the set based upon ID and frees it from list */ +static void free_rtpe_set(int id_set) +{ + struct rtpe_set *prev_list=NULL, *crt_list; + + for(crt_list=(*rtpe_set_list)->rset_first; crt_list!=NULL && crt_list->id_set!=id_set; + crt_list=crt_list->rset_next) + prev_list = crt_list; + + if (!crt_list) { + LM_DBG("no matching set %d\n", id_set); + return; + } + + /* first set matched */ + if (!prev_list) { + (*rtpe_set_list)->rset_first = crt_list->rset_next; + goto free; + } + + /* last set matched */ + if (crt_list->rset_next == NULL) + (*rtpe_set_list)->rset_last = prev_list; + + prev_list->rset_next = crt_list->rset_next; + +free: + free_rtpe_nodes(crt_list); + shm_free(crt_list); +} + static void free_rtpe_sets(void) { struct rtpe_set * crt_list, * last_list; From 1fe179e3c386b0cdc7f45827f8ec709906d37fcb Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 15:09:04 -0500 Subject: [PATCH 4/9] rtpengine: add helper function for disconnecting rtpengine socket(s) This commit is in preparation of socket management refactoring --- modules/rtpengine/rtpengine.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 7debaad97e2..7b689a5e96f 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -319,6 +319,7 @@ static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *); static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe_url); static void free_rtpe_set(int); static void free_rtpe_node(struct rtpe_set *, str *); +static void disconnect_rtpe_socket(int); static char *send_rtpe_command(struct rtpe_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); @@ -1621,9 +1622,7 @@ static int update_rtpengines(void) LM_DBG("updating list from %d to %d [%d]\n", my_version, *list_version, rtpe_number); my_version = *list_version; for (i = 0; i < rtpe_number; i++) { - shutdown(rtpe_socks[i], SHUT_RDWR); - close(rtpe_socks[i]); - rtpe_socks[i] = -1; + disconnect_rtpe_socket(i); } return connect_rtpengines(); @@ -1749,6 +1748,15 @@ static void free_rtpe_sets(void) (*rtpe_set_list)->rset_last = NULL; } +static void disconnect_rtpe_socket(int idx) +{ + LM_DBG("disconnect socket idx=%d\n", idx); + + shutdown(rtpe_socks[idx], SHUT_RDWR); + close(rtpe_socks[idx]); + rtpe_socks[idx] = -1; +} + static void mod_destroy(void) { if (default_rtpe_set) From 06393852f724d3a95d2a578f99c7c8e61eb85f72 Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 15:24:04 -0500 Subject: [PATCH 5/9] rtpengine: remove rtpengine probing from SIP procs Probing is now done exclusively by timer workers and by `rtpengine_reload` mi command. --- modules/rtpengine/rtpengine.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 7b689a5e96f..2c63a3aedcc 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -323,7 +323,7 @@ static void disconnect_rtpe_socket(int); static char *send_rtpe_command(struct rtpe_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); -static int update_rtpengines(void); +static int update_rtpengines(int); static int _add_rtpengine_from_database(void); static int rtpengine_set_store(modparam_t type, void * val); static int rtpengine_add_rtpengine_set( char * rtp_proxies, int set_id); @@ -1208,7 +1208,7 @@ static mi_response_t *mi_reload_rtpengines(const mi_params_t *params, if(_add_rtpengine_from_database() < 0) goto error; - if (update_rtpengines()) + if (update_rtpengines(1)) goto error; /* update pointer to default_rtpp_set*/ @@ -1568,7 +1568,7 @@ static inline int rtpengine_connect_node(struct rtpe_node *pnode) return 1; } -static int connect_rtpengines(void) +static int connect_rtpengines(int force_test) { struct rtpe_set *rtpe_list; struct rtpe_node *pnode; @@ -1592,7 +1592,7 @@ static int connect_rtpengines(void) rtpe_list = rtpe_list->rset_next){ for (pnode=rtpe_list->rn_first; pnode!=0; pnode = pnode->rn_next){ - if (rtpengine_connect_node(pnode)) + if (rtpengine_connect_node(pnode) && force_test) pnode->rn_disabled = rtpe_test(pnode, 0, 1); /* else, there is an error, and we try to connect the next one */ } @@ -1612,10 +1612,10 @@ child_init(int rank) return 0; /* Iterate known RTP proxies - create sockets */ - return connect_rtpengines(); + return connect_rtpengines(1); } -static int update_rtpengines(void) +static int update_rtpengines(int force_test) { int i; @@ -1625,7 +1625,7 @@ static int update_rtpengines(void) disconnect_rtpe_socket(i); } - return connect_rtpengines(); + return connect_rtpengines(force_test); } /* Returns the first matching node in set */ @@ -2759,7 +2759,7 @@ select_rtpe_node(str callid, struct rtpe_set *set) int was_forced, sumcut, found, constant_weight_sum; /* check last list version */ - if (my_version != *list_version && update_rtpengines() < 0) { + if (my_version != *list_version && update_rtpengines(0) < 0) { LM_ERR("cannot update rtpengines list\n"); return 0; } From 1eac7851871a64d938209cc0d1a2a843df88818c Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 15:38:24 -0500 Subject: [PATCH 6/9] rtpengine: ensure new elems are initialized in sockets array --- modules/rtpengine/rtpengine.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 2c63a3aedcc..a15c617265d 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -1570,6 +1570,7 @@ static inline int rtpengine_connect_node(struct rtpe_node *pnode) static int connect_rtpengines(int force_test) { + int i; struct rtpe_set *rtpe_list; struct rtpe_node *pnode; @@ -1585,6 +1586,8 @@ static int connect_rtpengines(int force_test) LM_ERR("no more pkg memory\n"); return -1; } + for (i=rtpe_number; i<*rtpe_no; i++) + rtpe_socks[i] = -1; /* init new elems */ } rtpe_number = *rtpe_no; From 9001bafcd2f895222e08399e45086efa7709d456 Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 16:17:05 -0500 Subject: [PATCH 7/9] rtpengine: enhancement of set/node management This commit has several enhancements: 1. Introduce reload types via MI command `rtpengine_reload type=soft|hard`. - Hard reload: all nodes are disconnected and all sockets are rebuilt per process. This is the default behavior, and is the behavior that existed before this commit. - Soft reload: nodes stored in the database are compared (based on URL) to nodes stored in memory. If the node exists in memory, then the existing node is reused (including all sockets and disabled state). If the node does not exist in memory, then it is added to the set and sockets are built per process. Remaining nodes are removed from the set and sockets are destroyed, as they have no associated database record. 2. Introduce version history for shared set list. This is needed after allowing soft reloads to prevent race conditions, as sockets are maintained per process while the set list is maintained in shared memory. 3. During a `rtpengine_reload`, the write lock is released before probing nodes. Depending on the set size and node status, probing can be a timely task. By releasing the write lock, we free up the SIP procs to access the shared set list. --- modules/rtpengine/rtpengine.c | 267 +++++++++++++++++++++++++++++++--- modules/rtpengine/rtpengine.h | 21 +++ 2 files changed, 266 insertions(+), 22 deletions(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index a15c617265d..38a772bf78c 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -320,6 +320,8 @@ static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe static void free_rtpe_set(int); static void free_rtpe_node(struct rtpe_set *, str *); static void disconnect_rtpe_socket(int); +static void cleanup_rtpengine_versions(void); +static int add_rtpengine_version(int); static char *send_rtpe_command(struct rtpe_node *, bencode_item_t *, int *); static int get_extra_id(struct sip_msg* msg, str *id_str); @@ -356,6 +358,7 @@ static int rtpengine_tout = 1; static int rtpengine_timer_interval = 5; static pid_t mypid; static int myrand = 0; +static int myrank = 0; static unsigned int myseqn = 0; static str extra_id_pv_param = {NULL, 0}; static char *setid_avp_param = NULL; @@ -382,8 +385,14 @@ static db_con_t *db_connection = NULL; static db_func_t db_functions; static rw_lock_t *rtpe_lock=NULL; static unsigned int *rtpe_no = 0; -static unsigned int *list_version; -static unsigned int my_version = 0; + +/* version management */ +struct rtpe_version_head **rtpe_versions = 0; /* chronological list of versions */ +static unsigned int *list_version; /* master version */ +static unsigned int my_version = 0; /* per process version */ +static unsigned int *child_versions; /* array of per process versions */ +static unsigned int *child_versions_no; /* number of elems */ + static unsigned int rtpe_number = 0; static int setid_avp_type; @@ -686,6 +695,7 @@ static mi_export_t mi_cmds[] = { }, { MI_RELOAD_RTP_ENGINES, 0, 0, mi_child_init, { {mi_reload_rtpengines, {0}}, + {mi_reload_rtpengines, {"type", 0}}, {EMPTY_MI_RECIPE}} }, { "teardown", 0, 0, 0, { @@ -840,12 +850,85 @@ static int rtpengine_set_store(modparam_t type, void * val){ return 0; } +/* + frees any versions from list that have been applied by + all processes +*/ +static void cleanup_rtpengine_versions(void) +{ + int i, min_version=0; + struct rtpe_version *crt_version, *prev_version; + + if (*rtpe_versions == NULL || (*rtpe_versions)->version_count == 0) + return; // nothing to do + + for (i=0; i<*child_versions_no; i++) { + if (!min_version || child_versions[i] < min_version) + min_version = child_versions[i]; + } + + for (crt_version = (*rtpe_versions)->version_first; crt_version != NULL;) { + if (crt_version->version >= min_version) + break; + prev_version = crt_version; + crt_version = prev_version->version_next; + + /* first version remove */ + if ((*rtpe_versions)->version_first == prev_version) + (*rtpe_versions)->version_first = crt_version; + + /* last version remove */ + if ((*rtpe_versions)->version_last == prev_version) + (*rtpe_versions)->version_last = NULL; + + shm_free(prev_version); + (*rtpe_versions)->version_count--; + } + + return; +} + +/* increments version and adds version to end of list */ +static int add_rtpengine_version(int flags) +{ + struct rtpe_version *pversion; + + /* init list of versions */ + if (*rtpe_versions == NULL) { + *rtpe_versions = shm_malloc(sizeof(struct rtpe_version_head)); + if (*rtpe_versions == NULL) { + LM_ERR("no shm memory left\n"); + return -1; + } + memset(*rtpe_versions, 0, sizeof(struct rtpe_version_head)); + } + + pversion = shm_malloc(sizeof(struct rtpe_version)); + if (pversion == NULL) { + LM_ERR("no shm memory left\n"); + return -1; + } + memset(pversion, 0, sizeof(*pversion)); + pversion->version = ++(*list_version); + pversion->version_flags = flags; + + if ((*rtpe_versions)->version_first == NULL) { + (*rtpe_versions)->version_first = pversion; + } else { + (*rtpe_versions)->version_last->version_next = pversion; + } + (*rtpe_versions)->version_last = pversion; + (*rtpe_versions)->version_count++; + + return 0; +} + static int add_rtpengine_socks(struct rtpe_set * rtpe_list, char * rtpengine){ /* Make rtp proxies list. */ char *p, *p1, *p2, *plim; - struct rtpe_node *pnode; + struct rtpe_node *pnode, *tmpnode; int weight; p = rtpengine; @@ -879,6 +962,7 @@ static int add_rtpengine_socks(struct rtpe_set * rtpe_list, pnode->rn_weight = weight; pnode->rn_umode = 0; pnode->rn_disabled = 0; + pnode->rn_flags = 0; pnode->rn_url.s = shm_malloc(p2 - p1 + 1); if (pnode->rn_url.s == NULL) { shm_free(pnode); @@ -891,8 +975,9 @@ static int add_rtpengine_socks(struct rtpe_set * rtpe_list, LM_DBG("url is %s, len is %i\n", pnode->rn_url.s, pnode->rn_url.len); - if (lookup_rtpe_node(rtpe_list, &pnode->rn_url) != NULL) { + if ((tmpnode = lookup_rtpe_node(rtpe_list, &pnode->rn_url)) != NULL) { LM_DBG("node with url %s already exists in set %d, not adding new node\n", pnode->rn_url.s, rtpe_list->id_set); + tmpnode->rn_flags &= ~RTPE_TEARDOWN_NODE; shm_free(pnode->rn_url.s); shm_free(pnode); return 0; @@ -1004,6 +1089,8 @@ static int rtpengine_add_rtpengine_set( char * rtp_proxies, int set_id) new_list = 0; } + rtpe_list->set_flags &= ~RTPE_TEARDOWN_SET; + if(add_rtpengine_socks(rtpe_list, rtp_proxies)!= 0){ /*if this list will not be inserted, clean it up*/ goto error; @@ -1189,27 +1276,77 @@ static mi_response_t *mi_show_rtpengines(const mi_params_t *params, static mi_response_t *mi_reload_rtpengines(const mi_params_t *params, struct mi_handler *async_hdl) { - struct rtpe_set *it; + str type; + int soft_reload = 0; + struct rtpe_set *crt_list, *next_list; + struct rtpe_node *crt_rtpe, *next_rtpe; + if(db_url.s == NULL) { LM_ERR("Dynamic loading of rtpengines not enabled\n"); return init_mi_error(400, MI_SSTR("Dynamic loading not enabled")); } - lock_start_write(rtpe_lock); - if(*rtpe_set_list) { - for (it = (*rtpe_set_list)->rset_first; it; it = it->rset_next) - free_rtpe_nodes(it); + if (try_get_mi_string_param(params, "type", &type.s, &type.len) == 0) { + if (!str_strcmp(&type, _str("soft"))) + soft_reload = 1; } - *rtpe_no = 0; - (*list_version)++; - /* notify timeout process that the rtpp proxy list changes */ + lock_start_write(rtpe_lock); - if(_add_rtpengine_from_database() < 0) - goto error; + if (soft_reload) { + /* + soft reload: + - mark all nodes/sets for teardown; as nodes are added + from the database, this flag is cleared and sockets + will be reused + */ + if (*rtpe_set_list) { + for(crt_list = (*rtpe_set_list)->rset_first; crt_list != NULL; + crt_list = crt_list->rset_next){ + crt_list->set_flags |= RTPE_TEARDOWN_SET; + for(crt_rtpe = crt_list->rn_first; crt_rtpe != NULL; + crt_rtpe = crt_rtpe->rn_next) + crt_rtpe->rn_flags |= RTPE_TEARDOWN_NODE; + } + } - if (update_rtpengines(1)) - goto error; + if(_add_rtpengine_from_database() < 0) + goto error; + + /* remove old sets/nodes */ + for(crt_list = (*rtpe_set_list)->rset_first; crt_list != NULL; ){ + if (crt_list->set_flags&RTPE_TEARDOWN_SET) { + next_list = crt_list->rset_next; + free_rtpe_set(crt_list->id_set); + crt_list = next_list; + continue; + } + for(crt_rtpe = crt_list->rn_first; crt_rtpe != NULL; ){ + if (crt_rtpe->rn_flags&RTPE_TEARDOWN_NODE) { + next_rtpe = crt_rtpe->rn_next; + free_rtpe_node(crt_list, &crt_rtpe->rn_url); + crt_rtpe = next_rtpe; + continue; + } + crt_rtpe = crt_rtpe->rn_next; + } + crt_list = crt_list->rset_next; + } + } else { + /* + hard reload: + - teardown all nodes and establish fresh connections + */ + if(*rtpe_set_list) { + for (crt_list = (*rtpe_set_list)->rset_first; crt_list != NULL; + crt_list = crt_list->rset_next) + free_rtpe_nodes(crt_list); + } + *rtpe_no = 0; + + if(_add_rtpengine_from_database() < 0) + goto error; + } /* update pointer to default_rtpp_set*/ *default_rtpe_set = select_rtpe_set(DEFAULT_RTPE_SET_ID); @@ -1217,9 +1354,26 @@ static mi_response_t *mi_reload_rtpengines(const mi_params_t *params, LM_WARN("there is no rtpengine in the default set %d\n", DEFAULT_RTPE_SET_ID); + /* update version history */ + cleanup_rtpengine_versions(); + if (add_rtpengine_version(soft_reload ? 0 : RTPE_TEARDOWN_SOCKETS) < 0) + goto error; + /* release the readers */ lock_stop_write(rtpe_lock); + /* + set has been updated so we can connect to nodes + with read lock only like SIP procs; this will test node + outside of SIP context + */ + RTPE_START_READ(); + if (update_rtpengines(1)) { + RTPE_STOP_READ(); + return init_mi_error(500, MI_SSTR("Internal error")); + } + RTPE_STOP_READ(); + return init_mi_result_ok(); error: lock_stop_write(rtpe_lock); @@ -1257,6 +1411,11 @@ void rtpengine_timer(unsigned int ticks, void *param) return; RTPE_START_READ(); + if (my_version != *list_version && update_rtpengines(0) < 0) { + LM_ERR("cannot update rtpengines list\n"); + RTPE_STOP_READ(); + return; + } for(rtpe_list = (*rtpe_set_list)->rset_first; rtpe_list != NULL; rtpe_list = rtpe_list->rset_next){ for(crt_rtpe = rtpe_list->rn_first; crt_rtpe != NULL; @@ -1304,16 +1463,23 @@ mod_init(void) rtpe_no = (unsigned int*)shm_malloc(sizeof(unsigned int)); list_version = (unsigned int*)shm_malloc(sizeof(unsigned int)); + child_versions=(unsigned int*)shm_malloc((1/*non-SIP at head*/ + udp_workers_no + tcp_workers_no) * sizeof(unsigned int)); + child_versions_no = (unsigned int*)shm_malloc(sizeof(unsigned int)); + rtpe_versions = (struct rtpe_version_head **)shm_malloc(sizeof(struct rtpe_version_head *)); - if(!rtpe_no || !list_version) { + if(!rtpe_no || !list_version || !child_versions || !child_versions_no || !rtpe_versions) { LM_ERR("No more shared memory\n"); return -1; } *rtpe_no = 0; *list_version = 0; + *child_versions_no = 1/*non-SIP at head*/ + udp_workers_no + tcp_workers_no; + *rtpe_versions = 0; my_version = 0; + memset(child_versions, 0, *child_versions_no * sizeof(unsigned int)); + if (!(rtpe_set_list = (struct rtpe_set_head **) shm_malloc(sizeof(struct rtpe_set_head *)))) { LM_ERR("no more shm mem\n"); @@ -1468,7 +1634,7 @@ mod_init(void) static int mi_child_init(void) { - if(child_init(1) < 0) + if(child_init(myrank) < 0) { LM_ERR("Failed to initial rtpp socks\n"); return -1; @@ -1595,7 +1761,8 @@ static int connect_rtpengines(int force_test) rtpe_list = rtpe_list->rset_next){ for (pnode=rtpe_list->rn_first; pnode!=0; pnode = pnode->rn_next){ - if (rtpengine_connect_node(pnode) && force_test) + /* reuse socket if already initialized */ + if ((rtpe_socks[pnode->idx] != -1 || rtpengine_connect_node(pnode)) && force_test) pnode->rn_disabled = rtpe_test(pnode, 0, 1); /* else, there is an error, and we try to connect the next one */ } @@ -1610,6 +1777,11 @@ child_init(int rank) { mypid = getpid(); myrand = rand()%10000; + myrank = rank; + + /* external procs are assigned to head of version array */ + if (rank == PROC_MODULE) + myrank = 0; if(*rtpe_set_list==NULL ) return 0; @@ -1620,14 +1792,65 @@ child_init(int rank) static int update_rtpengines(int force_test) { - int i; + int *tmp_socks; + int i, reset=0; + struct rtpe_set * crt_list; + struct rtpe_node * crt_rtpe; + struct rtpe_version * crt_version; LM_DBG("updating list from %d to %d [%d]\n", my_version, *list_version, rtpe_number); - my_version = *list_version; + + if (*rtpe_versions==NULL || (*rtpe_versions)->version_last==NULL + || my_version == (*rtpe_versions)->version_last->version) + return 0; /* nothing to do or already updated */ + + for (crt_version = (*rtpe_versions)->version_first; crt_version != NULL; + crt_version = crt_version->version_next) { + if (crt_version->version <= my_version) + continue; + if (crt_version->version_flags&RTPE_TEARDOWN_SOCKETS) + reset = 1; + my_version = crt_version->version; + } + + child_versions[myrank] = my_version; + + /* + close all sockets if any versions required + a hard reload of rtpengine nodes + */ + if (reset) { + for (i = 0; i < rtpe_number; i++) + disconnect_rtpe_socket(i); + return connect_rtpengines(force_test); + } + + /* + only close sockets that were removed from set(s) + */ + tmp_socks = (int*)pkg_malloc(rtpe_number * sizeof(int)); + if (tmp_socks == NULL) { + LM_ERR("no more pkg memory\n"); + return -1; + } + memcpy(tmp_socks, rtpe_socks, rtpe_number * sizeof(int)); + + for(crt_list = (*rtpe_set_list)->rset_first; crt_list != NULL; + crt_list = crt_list->rset_next){ + for(crt_rtpe = crt_list->rn_first; crt_rtpe != NULL; + crt_rtpe = crt_rtpe->rn_next) { + if (crt_rtpe->idx < rtpe_number) + tmp_socks[crt_rtpe->idx] = 0; /* mark as active */ + } + } + for (i = 0; i < rtpe_number; i++) { - disconnect_rtpe_socket(i); + if (tmp_socks[i]) + disconnect_rtpe_socket(i); } + pkg_free(tmp_socks); + return connect_rtpengines(force_test); } diff --git a/modules/rtpengine/rtpengine.h b/modules/rtpengine/rtpengine.h index 2109690f4bf..012b5f533d3 100644 --- a/modules/rtpengine/rtpengine.h +++ b/modules/rtpengine/rtpengine.h @@ -29,6 +29,11 @@ #include "bencode.h" #include "../../str.h" +/* flags for set, node, and socket management */ +#define RTPE_TEARDOWN_NODE (1<<0) +#define RTPE_TEARDOWN_SET (1<<1) +#define RTPE_TEARDOWN_SOCKETS (1<<2) + struct rtpe_node { unsigned int idx; /* overall index */ str rn_url; /* unparsed, deletable */ @@ -37,6 +42,7 @@ struct rtpe_node { int rn_disabled; /* found unaccessible? */ unsigned rn_weight; /* for load balancing */ unsigned int rn_recheck_ticks; + int rn_flags; struct rtpe_node *rn_next; }; @@ -47,6 +53,7 @@ struct rtpe_set{ unsigned int rtpe_node_count; int set_disabled; unsigned int set_recheck_ticks; + int set_flags; struct rtpe_node *rn_first; struct rtpe_node *rn_last; struct rtpe_set *rset_next; @@ -58,6 +65,20 @@ struct rtpe_set_head{ struct rtpe_set *rset_last; }; + +struct rtpe_version{ + unsigned int version; + int version_flags; + struct rtpe_version *version_next; +}; + + +struct rtpe_version_head{ + unsigned int version_count; + struct rtpe_version *version_first; + struct rtpe_version *version_last; +}; + #define RTPENGINE_TABLE_VERSION 1 #endif From 096bc7504d03587e1235b93a5b598c1f270b40ea Mon Sep 17 00:00:00 2001 From: John Burke Date: Tue, 24 Aug 2021 16:33:03 -0500 Subject: [PATCH 8/9] rtpengine: update `rtpengine_reload` docs --- modules/rtpengine/doc/rtpengine_admin.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/modules/rtpengine/doc/rtpengine_admin.xml b/modules/rtpengine/doc/rtpengine_admin.xml index f48d5546b89..5fbe2e6df54 100644 --- a/modules/rtpengine/doc/rtpengine_admin.xml +++ b/modules/rtpengine/doc/rtpengine_admin.xml @@ -1447,6 +1447,15 @@ $ opensips-cli -x mi rtpengine_show Reloads all rtpengine sets from the database. Used only when the parameter is set. + Parameters: + + + type (optional) soft - when reloading nodes + from the database, reuse any existing sockets and keep existing + node disabled state. If not provided, then all nodes and sockets + will first be torndown and then nodes will be loaded from the database. + + No parameter. @@ -1456,6 +1465,7 @@ $ opensips-cli -x mi rtpengine_show ... $ opensips-cli -x mi rtpengine_reload +$ opensips-cli -x mi rtpengine_reload type=soft ... From 2a1b57d3746190a27922c0f6d68acb351aa09f78 Mon Sep 17 00:00:00 2001 From: John Burke Date: Mon, 18 Apr 2022 16:30:05 -0500 Subject: [PATCH 9/9] rtpengine: bring master up-to-date with socket refactor --- modules/rtpengine/rtpengine.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index 38a772bf78c..8c1bc4447ad 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -2381,7 +2381,7 @@ static struct rtpe_node *get_rtpe_node(str *node, struct rtpe_set *set) struct rtpe_node *rnode; /* check last list version */ - if (my_version != *list_version && update_rtpengines() < 0) { + if (my_version != *list_version && update_rtpengines(0) < 0) { LM_ERR("cannot update rtpengines list\n"); return NULL; }