From 73e63eed9b658882ce407b1fde68aa4715d8388b Mon Sep 17 00:00:00 2001 From: Vlad Patrascu Date: Mon, 25 Jan 2021 21:25:05 +0200 Subject: [PATCH] clusterer: allow disabling a specific node with clusterer_set_status --- modules/clusterer/api.h | 2 +- modules/clusterer/clusterer.c | 83 ++++++++++++++++++++++- modules/clusterer/clusterer.h | 2 +- modules/clusterer/clusterer_mod.c | 22 +++++- modules/clusterer/doc/clusterer_admin.xml | 11 ++- modules/clusterer/node_info.c | 33 ++++++--- modules/clusterer/node_info.h | 2 +- modules/clusterer/topology.c | 8 ++- 8 files changed, 144 insertions(+), 19 deletions(-) diff --git a/modules/clusterer/api.h b/modules/clusterer/api.h index c955d041106..6aa40d1ea9e 100644 --- a/modules/clusterer/api.h +++ b/modules/clusterer/api.h @@ -90,7 +90,7 @@ typedef void (*free_nodes_f)(clusterer_node_t *list); /* * Set the state (enabled or disabled) of the current node in the cluster. */ -typedef int (*set_state_f)(int cluster_id, enum cl_node_state state); +typedef int (*set_state_f)(int cluster_id, int node_id, enum cl_node_state state); /* * Check if the given address belongs to one of the nodes in the cluster. diff --git a/modules/clusterer/clusterer.c b/modules/clusterer/clusterer.c index 4ba1058fbf3..8383cb855d6 100644 --- a/modules/clusterer/clusterer.c +++ b/modules/clusterer/clusterer.c @@ -91,7 +91,7 @@ void seed_fb_check_timer(utime_t ticks, void *param) lock_stop_read(cl_list_lock); } -int cl_set_state(int cluster_id, enum cl_node_state state) +int cl_set_state(int cluster_id, int node_id, enum cl_node_state state) { cluster_info_t *cluster = NULL; node_info_t *node; @@ -107,6 +107,48 @@ int cl_set_state(int cluster_id, enum cl_node_state state) return -1; } + if (node_id != current_id) { + node = get_node_by_id(cluster, node_id); + if (!node) { + lock_stop_read(cl_list_lock); + LM_ERR("Node id [%d] not found\n", node_id); + return 1; + } + + lock_get(node->lock); + + if (state == STATE_DISABLED && node->flags & NODE_STATE_ENABLED) + new_link_states = LS_DOWN; + else if (state == STATE_ENABLED && !(node->flags & NODE_STATE_ENABLED)) + new_link_states = LS_RESTART_PINGING; + + if (state == STATE_DISABLED) + node->flags &= ~NODE_STATE_ENABLED; + else + node->flags |= NODE_STATE_ENABLED; + + lock_release(node->lock); + + if (new_link_states == LS_DOWN) { + set_link_w_neigh_adv(-1, LS_DOWN, node); + + do_actions_node_ev(cluster, &ev_actions_required, 1); + } else if (new_link_states == LS_RESTART_PINGING) { + set_link_w_neigh(LS_RESTART_PINGING, node); + } + + lock_stop_read(cl_list_lock); + + LM_INFO("Set state: %s for node: %d in cluster: %d\n", + state ? "enabled" : "disabled", node_id, cluster_id); + + if (db_mode && update_db_state(cluster_id, node_id, state) < 0) + LM_ERR("Failed to update state in clusterer DB for node [%d] cluster [%d]\n", + node_id, cluster_id); + + return 0; + } + lock_get(cluster->current_node->lock); if (state == STATE_DISABLED && cluster->current_node->flags & NODE_STATE_ENABLED) @@ -136,7 +178,7 @@ int cl_set_state(int cluster_id, enum cl_node_state state) LM_INFO("Set state: %s for local node in cluster: %d\n", state ? "enabled" : "disabled", cluster_id); - if (db_mode && update_db_state(state) < 0) + if (db_mode && update_db_state(cluster_id, current_id, state) < 0) LM_ERR("Failed to update state in clusterer DB for cluster [%d]\n", cluster->cluster_id); return 0; @@ -231,6 +273,15 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet, return CLUSTERER_SEND_ERR; } + lock_get(node->lock); + if (!(node->flags & NODE_STATE_ENABLED)) { + lock_release(node->lock); + lock_stop_read(cl_list_lock); + LM_DBG("node disabled, skip message sending\n"); + return CLUSTERER_SEND_SUCCESS; + } + lock_release(node->lock); + rc = msg_send_retry(packet, node, 0, &ev_actions_required); bin_remove_int_buffer_end(packet, 3); @@ -286,6 +337,14 @@ clusterer_bcast_msg(bin_packet_t *packet, int dst_cid, if (!match_node(dst_cl->current_node, node, match_op)) continue; + lock_get(node->lock); + if (!(node->flags & NODE_STATE_ENABLED)) { + lock_release(node->lock); + LM_DBG("node disabled, skip message sending\n"); + continue; + } + lock_release(node->lock); + matched_once = 1; rc = msg_send_retry(packet, node, 1, &ev_actions_required); @@ -850,6 +909,12 @@ void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type, lock_get(node->lock); + if (!(node->flags & NODE_STATE_ENABLED)) { + lock_release(node->lock); + LM_DBG("node disabled, ignoring received clusterer bin packet\n"); + goto exit; + } + /* if the node was down, restart pinging */ if (node->link_state == LS_DOWN) { lock_release(node->lock); @@ -968,6 +1033,14 @@ void bin_rcv_cl_packets(bin_packet_t *packet, int packet_type, goto exit; } + lock_get(node->lock); + if (!(node->flags & NODE_STATE_ENABLED)) { + lock_release(node->lock); + LM_DBG("node disabled, ignoring received clusterer bin packet\n"); + goto exit; + } + lock_release(node->lock); + handle_internal_msg(packet, packet_type, node, now, &ev_actions_required); if (ev_actions_required) do_actions_node_ev(cl, &ev_actions_required, 1); @@ -1091,6 +1164,12 @@ static void bin_rcv_mod_packets(bin_packet_t *packet, int packet_type, lock_get(node->lock); + if (!(node->flags & NODE_STATE_ENABLED)) { + lock_release(node->lock); + LM_DBG("node disabled, ignoring received bin packet\n"); + goto exit; + } + /* if the node was down, restart pinging */ if (node->link_state == LS_DOWN) { lock_release(node->lock); diff --git a/modules/clusterer/clusterer.h b/modules/clusterer/clusterer.h index 98c82a12280..69ccaf0892b 100644 --- a/modules/clusterer/clusterer.h +++ b/modules/clusterer/clusterer.h @@ -165,7 +165,7 @@ enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name, mi_item_t *cmd_params_arr, int no_params); enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node); -int cl_set_state(int cluster_id, enum cl_node_state state); +int cl_set_state(int cluster_id, int node_id, enum cl_node_state state); int clusterer_check_addr(int cluster_id, str *ip_str, enum node_addr_type check_type); enum clusterer_send_ret cl_send_to(bin_packet_t *, int cluster_id, int node_id); diff --git a/modules/clusterer/clusterer_mod.c b/modules/clusterer/clusterer_mod.c index 855bd8c25a6..a0067887613 100644 --- a/modules/clusterer/clusterer_mod.c +++ b/modules/clusterer/clusterer_mod.c @@ -172,6 +172,7 @@ static mi_export_t mi_cmds[] = { }, { "clusterer_set_status", "sets the status for a specified connection", 0,0,{ {clusterer_set_status, {"cluster_id", "status", 0}}, + {clusterer_set_status, {"cluster_id", "node_id", "status", 0}}, {EMPTY_MI_RECIPE}} }, { "clusterer_list", "lists the available connections for the specified server", 0,0,{ @@ -546,7 +547,7 @@ mi_response_t *clusterer_reload(const mi_params_t *params, static mi_response_t *clusterer_set_status(const mi_params_t *params, struct mi_handler *async_hdl) { - int cluster_id; + int cluster_id, node_id; int state; int rc; @@ -555,12 +556,23 @@ static mi_response_t *clusterer_set_status(const mi_params_t *params, if (cluster_id < 1) return init_mi_error(400, MI_SSTR("Bad value for 'cluster_id'")); + switch (try_get_mi_int_param(params, "node_id", &node_id)) { + case -1: + node_id = current_id; + case 0: + if (node_id < 1) + return init_mi_error(400, MI_SSTR("Bad value for 'node_id'")); + break; + default: + return init_mi_param_error(); + } + if (get_mi_int_param(params, "status", &state) < 0) return init_mi_param_error(); if (state != STATE_DISABLED && state != STATE_ENABLED) return init_mi_error(400, MI_SSTR("Bad value for 'status'")); - rc = cl_set_state(cluster_id, state); + rc = cl_set_state(cluster_id, node_id, state); if (rc == -1) return init_mi_error(404, MI_SSTR("Cluster id not found")); if (rc == 1) @@ -640,6 +652,12 @@ static mi_response_t *clusterer_list(const mi_params_t *params, goto error; } + if (add_mi_string_fmt(node_item, MI_SSTR("state"), "%s", + n_info->flags&NODE_STATE_ENABLED ? "enabled" : "disabled") < 0) { + lock_release(n_info->lock); + goto error; + } + lock_release(n_info->lock); n_hop = get_next_hop(n_info); diff --git a/modules/clusterer/doc/clusterer_admin.xml b/modules/clusterer/doc/clusterer_admin.xml index a66e34698e8..84fee7c8e23 100644 --- a/modules/clusterer/doc/clusterer_admin.xml +++ b/modules/clusterer/doc/clusterer_admin.xml @@ -884,8 +884,8 @@ $ opensips-cli -x mi clusterer_list_topology clusterer_set_status - Sets the status(Enabled/Disabled) of the local node in a specified cluster. A disabled node does not send any messages and ignores received ones thus appearing as a failed node in the topology. - + Sets the status(Enabled/Disabled) of a node. If the local instance is disabled, the node will not send any messages and ignore received ones thus appearing as a failed node in the topology (from the other node's perspective). If a different node is disabled, the specified node will simply be ignored by the local instance in terms of sending/receiving any messages, as if no longer part of the topology. + Name: clusterer_set_status @@ -894,6 +894,10 @@ $ opensips-cli -x mi clusterer_list_topology cluster_id - indicates the id of the cluster. + + node_id (optional) - indicates the id of the node to be disabled. + If missing, the local instance will be disalbed. + status - indicates the new status(0 - Disabled, 1 - Enabled). @@ -902,7 +906,10 @@ $ opensips-cli -x mi clusterer_list_topology MI FIFO Command Format: + #disable the local instance opensips-cli -x mi clusterer_set_status 1 0 + #disable node ID 3 + opensips-cli -x mi clusterer_set_status 1 3 0 diff --git a/modules/clusterer/node_info.c b/modules/clusterer/node_info.c index a1cad2f2a8a..ae39644185f 100644 --- a/modules/clusterer/node_info.c +++ b/modules/clusterer/node_info.c @@ -621,30 +621,45 @@ int provision_current(modparam_t type, void *val) return 0; } -int update_db_state(int state) { +int update_db_state(int cluster_id, int node_id, int state) { db_key_t node_id_key = &id_col; db_val_t node_id_val; + db_key_t cl_node_id_keys[2] = {&node_id_col, &cluster_id_col}; + db_val_t cl_node_id_vals[2]; db_key_t update_key; db_val_t update_val; - VAL_TYPE(&node_id_val) = DB_INT; - VAL_NULL(&node_id_val) = 0; - VAL_INT(&node_id_val) = current_id; - update_key = &state_col; - CON_OR_RESET(db_hdl); if (dr_dbf.use_table(db_hdl, &db_table) < 0) { LM_ERR("cannot select table: \"%.*s\"\n", db_table.len, db_table.s); return -1; } + update_key = &state_col; VAL_TYPE(&update_val) = DB_INT; VAL_NULL(&update_val) = 0; VAL_INT(&update_val) = state; - if (dr_dbf.update(db_hdl, &node_id_key, 0, &node_id_val, &update_key, - &update_val, 1, 1) < 0) - return -1; + if (node_id == current_id) { + VAL_TYPE(&node_id_val) = DB_INT; + VAL_NULL(&node_id_val) = 0; + VAL_INT(&node_id_val) = current_id; + + if (dr_dbf.update(db_hdl, &node_id_key, 0, &node_id_val, &update_key, + &update_val, 1, 1) < 0) + return -1; + } else { + VAL_TYPE(&cl_node_id_vals[0]) = DB_INT; + VAL_NULL(&cl_node_id_vals[0]) = 0; + VAL_INT(&cl_node_id_vals[0]) = node_id; + VAL_TYPE(&cl_node_id_vals[1]) = DB_INT; + VAL_NULL(&cl_node_id_vals[1]) = 0; + VAL_INT(&cl_node_id_vals[1]) = cluster_id; + + if (dr_dbf.update(db_hdl, cl_node_id_keys, 0, cl_node_id_vals, &update_key, + &update_val, 2, 1) < 0) + return -1; + } return 0; } diff --git a/modules/clusterer/node_info.h b/modules/clusterer/node_info.h index 66a623bc5da..dad8474edc1 100644 --- a/modules/clusterer/node_info.h +++ b/modules/clusterer/node_info.h @@ -125,7 +125,7 @@ extern int db_mode; extern rw_lock_t *cl_list_lock; extern cluster_info_t **cluster_list; -int update_db_state(int state); +int update_db_state(int cluster_id, int node_id, int state); int load_db_info(db_func_t *dr_dbf, db_con_t* db_hdl, str *db_table, cluster_info_t **cl_list); void free_info(cluster_info_t *cl_list); diff --git a/modules/clusterer/topology.c b/modules/clusterer/topology.c index 2be356e8ec2..a7d0a209c66 100644 --- a/modules/clusterer/topology.c +++ b/modules/clusterer/topology.c @@ -204,6 +204,11 @@ void heartbeats_timer(void) for(node = clusters_it->node_list; node; node = node->next) { lock_get(node->lock); + if (!(node->flags & NODE_STATE_ENABLED)) { + lock_release(node->lock); + continue; + } + gettimeofday(&now, NULL); ping_reply_int = PING_REPLY_INTERVAL(node); last_ping_int = TIME_DIFF(node->last_ping, now); @@ -1097,7 +1102,8 @@ void handle_full_top_update(bin_packet_t *packet, node_info_t *source, if (top_node_info[i][j+4] == current_id) { lock_get(top_node->lock); - if (top_node->link_state == LS_DOWN) { + if (top_node->link_state == LS_DOWN && + top_node->flags & NODE_STATE_ENABLED) { lock_release(top_node->lock); set_link_w_neigh(LS_RESTART_PINGING, top_node);