Skip to content

Commit

Permalink
clusterer: allow disabling a specific node with clusterer_set_status
Browse files Browse the repository at this point in the history
  • Loading branch information
rvlad-patrascu committed Jan 25, 2021
1 parent a8da45c commit 73e63ee
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 19 deletions.
2 changes: 1 addition & 1 deletion modules/clusterer/api.h
Expand Up @@ -90,7 +90,7 @@ typedef void (*free_nodes_f)(clusterer_node_t *list);
/*
* Set the state (enabled or disabled) of the current node in the cluster.
*/
typedef int (*set_state_f)(int cluster_id, enum cl_node_state state);
typedef int (*set_state_f)(int cluster_id, int node_id, enum cl_node_state state);

/*
* Check if the given address belongs to one of the nodes in the cluster.
Expand Down
83 changes: 81 additions & 2 deletions modules/clusterer/clusterer.c
Expand Up @@ -91,7 +91,7 @@ void seed_fb_check_timer(utime_t ticks, void *param)
lock_stop_read(cl_list_lock);
}

int cl_set_state(int cluster_id, enum cl_node_state state)
int cl_set_state(int cluster_id, int node_id, enum cl_node_state state)
{
cluster_info_t *cluster = NULL;
node_info_t *node;
Expand All @@ -107,6 +107,48 @@ int cl_set_state(int cluster_id, enum cl_node_state state)
return -1;
}

if (node_id != current_id) {
node = get_node_by_id(cluster, node_id);
if (!node) {
lock_stop_read(cl_list_lock);
LM_ERR("Node id [%d] not found\n", node_id);
return 1;
}

lock_get(node->lock);

if (state == STATE_DISABLED && node->flags & NODE_STATE_ENABLED)
new_link_states = LS_DOWN;
else if (state == STATE_ENABLED && !(node->flags & NODE_STATE_ENABLED))
new_link_states = LS_RESTART_PINGING;

if (state == STATE_DISABLED)
node->flags &= ~NODE_STATE_ENABLED;
else
node->flags |= NODE_STATE_ENABLED;

lock_release(node->lock);

if (new_link_states == LS_DOWN) {
set_link_w_neigh_adv(-1, LS_DOWN, node);

do_actions_node_ev(cluster, &ev_actions_required, 1);
} else if (new_link_states == LS_RESTART_PINGING) {
set_link_w_neigh(LS_RESTART_PINGING, node);
}

lock_stop_read(cl_list_lock);

LM_INFO("Set state: %s for node: %d in cluster: %d\n",
state ? "enabled" : "disabled", node_id, cluster_id);

if (db_mode && update_db_state(cluster_id, node_id, state) < 0)
LM_ERR("Failed to update state in clusterer DB for node [%d] cluster [%d]\n",
node_id, cluster_id);

return 0;
}

lock_get(cluster->current_node->lock);

if (state == STATE_DISABLED && cluster->current_node->flags & NODE_STATE_ENABLED)
Expand Down Expand Up @@ -136,7 +178,7 @@ int cl_set_state(int cluster_id, enum cl_node_state state)
LM_INFO("Set state: %s for local node in cluster: %d\n",
state ? "enabled" : "disabled", cluster_id);

if (db_mode && update_db_state(state) < 0)
if (db_mode && update_db_state(cluster_id, current_id, state) < 0)
LM_ERR("Failed to update state in clusterer DB for cluster [%d]\n", cluster->cluster_id);

return 0;
Expand Down Expand Up @@ -231,6 +273,15 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet,
return CLUSTERER_SEND_ERR;
}

lock_get(node->lock);
if (!(node->flags & NODE_STATE_ENABLED)) {
lock_release(node->lock);
lock_stop_read(cl_list_lock);
LM_DBG("node disabled, skip message sending\n");
return CLUSTERER_SEND_SUCCESS;
}
lock_release(node->lock);

rc = msg_send_retry(packet, node, 0, &ev_actions_required);

bin_remove_int_buffer_end(packet, 3);
Expand Down Expand Up @@ -286,6 +337,14 @@ clusterer_bcast_msg(bin_packet_t *packet, int dst_cid,
if (!match_node(dst_cl->current_node, node, match_op))
continue;

lock_get(node->lock);
if (!(node->flags & NODE_STATE_ENABLED)) {
lock_release(node->lock);
LM_DBG("node disabled, skip message sending\n");
continue;
}
lock_release(node->lock);

matched_once = 1;

rc = msg_send_retry(packet, node, 1, &ev_actions_required);
Expand Down Expand Up @@ -850,6 +909,12 @@ void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type,

lock_get(node->lock);

if (!(node->flags & NODE_STATE_ENABLED)) {
lock_release(node->lock);
LM_DBG("node disabled, ignoring received clusterer bin packet\n");
goto exit;
}

/* if the node was down, restart pinging */
if (node->link_state == LS_DOWN) {
lock_release(node->lock);
Expand Down Expand Up @@ -968,6 +1033,14 @@ void bin_rcv_cl_packets(bin_packet_t *packet, int packet_type,
goto exit;
}

lock_get(node->lock);
if (!(node->flags & NODE_STATE_ENABLED)) {
lock_release(node->lock);
LM_DBG("node disabled, ignoring received clusterer bin packet\n");
goto exit;
}
lock_release(node->lock);

handle_internal_msg(packet, packet_type, node, now, &ev_actions_required);
if (ev_actions_required)
do_actions_node_ev(cl, &ev_actions_required, 1);
Expand Down Expand Up @@ -1091,6 +1164,12 @@ static void bin_rcv_mod_packets(bin_packet_t *packet, int packet_type,

lock_get(node->lock);

if (!(node->flags & NODE_STATE_ENABLED)) {
lock_release(node->lock);
LM_DBG("node disabled, ignoring received bin packet\n");
goto exit;
}

/* if the node was down, restart pinging */
if (node->link_state == LS_DOWN) {
lock_release(node->lock);
Expand Down
2 changes: 1 addition & 1 deletion modules/clusterer/clusterer.h
Expand Up @@ -165,7 +165,7 @@ enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name,
mi_item_t *cmd_params_arr, int no_params);
enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node);

int cl_set_state(int cluster_id, enum cl_node_state state);
int cl_set_state(int cluster_id, int node_id, enum cl_node_state state);
int clusterer_check_addr(int cluster_id, str *ip_str,
enum node_addr_type check_type);
enum clusterer_send_ret cl_send_to(bin_packet_t *, int cluster_id, int node_id);
Expand Down
22 changes: 20 additions & 2 deletions modules/clusterer/clusterer_mod.c
Expand Up @@ -172,6 +172,7 @@ static mi_export_t mi_cmds[] = {
},
{ "clusterer_set_status", "sets the status for a specified connection", 0,0,{
{clusterer_set_status, {"cluster_id", "status", 0}},
{clusterer_set_status, {"cluster_id", "node_id", "status", 0}},
{EMPTY_MI_RECIPE}}
},
{ "clusterer_list", "lists the available connections for the specified server", 0,0,{
Expand Down Expand Up @@ -546,7 +547,7 @@ mi_response_t *clusterer_reload(const mi_params_t *params,
static mi_response_t *clusterer_set_status(const mi_params_t *params,
struct mi_handler *async_hdl)
{
int cluster_id;
int cluster_id, node_id;
int state;
int rc;

Expand All @@ -555,12 +556,23 @@ static mi_response_t *clusterer_set_status(const mi_params_t *params,
if (cluster_id < 1)
return init_mi_error(400, MI_SSTR("Bad value for 'cluster_id'"));

switch (try_get_mi_int_param(params, "node_id", &node_id)) {
case -1:
node_id = current_id;
case 0:
if (node_id < 1)
return init_mi_error(400, MI_SSTR("Bad value for 'node_id'"));
break;
default:
return init_mi_param_error();
}

if (get_mi_int_param(params, "status", &state) < 0)
return init_mi_param_error();
if (state != STATE_DISABLED && state != STATE_ENABLED)
return init_mi_error(400, MI_SSTR("Bad value for 'status'"));

rc = cl_set_state(cluster_id, state);
rc = cl_set_state(cluster_id, node_id, state);
if (rc == -1)
return init_mi_error(404, MI_SSTR("Cluster id not found"));
if (rc == 1)
Expand Down Expand Up @@ -640,6 +652,12 @@ static mi_response_t *clusterer_list(const mi_params_t *params,
goto error;
}

if (add_mi_string_fmt(node_item, MI_SSTR("state"), "%s",
n_info->flags&NODE_STATE_ENABLED ? "enabled" : "disabled") < 0) {
lock_release(n_info->lock);
goto error;
}

lock_release(n_info->lock);

n_hop = get_next_hop(n_info);
Expand Down
11 changes: 9 additions & 2 deletions modules/clusterer/doc/clusterer_admin.xml
Expand Up @@ -884,8 +884,8 @@ $ opensips-cli -x mi clusterer_list_topology
<function moreinfo="none">clusterer_set_status</function>
</title>
<para>
Sets the status(Enabled/Disabled) of the local node in a specified cluster. A disabled node does not send any messages and ignores received ones thus appearing as a failed node in the topology.
</para>
Sets the status(Enabled/Disabled) of a node. If the local instance is disabled, the node will not send any messages and ignore received ones thus appearing as a failed node in the topology (from the other node's perspective). If a different node is disabled, the specified node will simply be ignored by the local instance in terms of sending/receiving any messages, as if no longer part of the topology.
</para>
<para>
Name: <emphasis>clusterer_set_status</emphasis>
</para>
Expand All @@ -894,6 +894,10 @@ $ opensips-cli -x mi clusterer_list_topology
<listitem><para>
<emphasis>cluster_id</emphasis> - indicates the id of the cluster.
</para></listitem>
<listitem><para>
<emphasis>node_id</emphasis> (optional) - indicates the id of the node to be disabled.
If missing, the local instance will be disalbed.
</para></listitem>
<listitem><para>
<emphasis>status</emphasis> - indicates the new status(0 - Disabled, 1 - Enabled).
</para></listitem>
Expand All @@ -902,7 +906,10 @@ $ opensips-cli -x mi clusterer_list_topology
MI FIFO Command Format:
</para>
<programlisting format="linespecific">
#disable the local instance
opensips-cli -x mi clusterer_set_status 1 0
#disable node ID 3
opensips-cli -x mi clusterer_set_status 1 3 0
</programlisting>
</section>

Expand Down
33 changes: 24 additions & 9 deletions modules/clusterer/node_info.c
Expand Up @@ -621,30 +621,45 @@ int provision_current(modparam_t type, void *val)
return 0;
}

int update_db_state(int state) {
int update_db_state(int cluster_id, int node_id, int state) {
db_key_t node_id_key = &id_col;
db_val_t node_id_val;
db_key_t cl_node_id_keys[2] = {&node_id_col, &cluster_id_col};
db_val_t cl_node_id_vals[2];
db_key_t update_key;
db_val_t update_val;

VAL_TYPE(&node_id_val) = DB_INT;
VAL_NULL(&node_id_val) = 0;
VAL_INT(&node_id_val) = current_id;
update_key = &state_col;

CON_OR_RESET(db_hdl);
if (dr_dbf.use_table(db_hdl, &db_table) < 0) {
LM_ERR("cannot select table: \"%.*s\"\n", db_table.len, db_table.s);
return -1;
}

update_key = &state_col;
VAL_TYPE(&update_val) = DB_INT;
VAL_NULL(&update_val) = 0;
VAL_INT(&update_val) = state;

if (dr_dbf.update(db_hdl, &node_id_key, 0, &node_id_val, &update_key,
&update_val, 1, 1) < 0)
return -1;
if (node_id == current_id) {
VAL_TYPE(&node_id_val) = DB_INT;
VAL_NULL(&node_id_val) = 0;
VAL_INT(&node_id_val) = current_id;

if (dr_dbf.update(db_hdl, &node_id_key, 0, &node_id_val, &update_key,
&update_val, 1, 1) < 0)
return -1;
} else {
VAL_TYPE(&cl_node_id_vals[0]) = DB_INT;
VAL_NULL(&cl_node_id_vals[0]) = 0;
VAL_INT(&cl_node_id_vals[0]) = node_id;
VAL_TYPE(&cl_node_id_vals[1]) = DB_INT;
VAL_NULL(&cl_node_id_vals[1]) = 0;
VAL_INT(&cl_node_id_vals[1]) = cluster_id;

if (dr_dbf.update(db_hdl, cl_node_id_keys, 0, cl_node_id_vals, &update_key,
&update_val, 2, 1) < 0)
return -1;
}

return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion modules/clusterer/node_info.h
Expand Up @@ -125,7 +125,7 @@ extern int db_mode;
extern rw_lock_t *cl_list_lock;
extern cluster_info_t **cluster_list;

int update_db_state(int state);
int update_db_state(int cluster_id, int node_id, int state);
int load_db_info(db_func_t *dr_dbf, db_con_t* db_hdl, str *db_table, cluster_info_t **cl_list);
void free_info(cluster_info_t *cl_list);

Expand Down
8 changes: 7 additions & 1 deletion modules/clusterer/topology.c
Expand Up @@ -204,6 +204,11 @@ void heartbeats_timer(void)
for(node = clusters_it->node_list; node; node = node->next) {
lock_get(node->lock);

if (!(node->flags & NODE_STATE_ENABLED)) {
lock_release(node->lock);
continue;
}

gettimeofday(&now, NULL);
ping_reply_int = PING_REPLY_INTERVAL(node);
last_ping_int = TIME_DIFF(node->last_ping, now);
Expand Down Expand Up @@ -1097,7 +1102,8 @@ void handle_full_top_update(bin_packet_t *packet, node_info_t *source,

if (top_node_info[i][j+4] == current_id) {
lock_get(top_node->lock);
if (top_node->link_state == LS_DOWN) {
if (top_node->link_state == LS_DOWN &&
top_node->flags & NODE_STATE_ENABLED) {
lock_release(top_node->lock);

set_link_w_neigh(LS_RESTART_PINGING, top_node);
Expand Down

0 comments on commit 73e63ee

Please sign in to comment.