Skip to content

Commit

Permalink
clusterer: add MI function to remove a node from the cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
rvlad-patrascu committed Jan 12, 2021
1 parent eeaf59e commit b78ed71
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 23 deletions.
103 changes: 100 additions & 3 deletions modules/clusterer/clusterer.c
Expand Up @@ -457,6 +457,32 @@ enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name,
return rc;
}

enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node)
{
bin_packet_t packet;
int rc;

if (bin_init(&packet, &cl_extra_cap, CLUSTERER_REMOVE_NODE,
BIN_VERSION, 0) < 0) {
LM_ERR("Failed to init bin send buffer\n");
return CLUSTERER_SEND_ERR;
}

if (bin_push_int(&packet, target_node) < 0)
return CLUSTERER_SEND_ERR;

if (msg_add_trailer(&packet, cluster_id, -1) < 0) {
LM_ERR("Failed to add trailer to module's message\n");
return CLUSTERER_SEND_ERR;
}

rc = clusterer_bcast_msg(&packet, cluster_id, NODE_CMP_ANY);

bin_free_packet(&packet);

return rc;
}

static inline int su_ip_cmp(union sockaddr_union* s1, union sockaddr_union* s2)
{
if (s1->s.sa_family != s2->s.sa_family)
Expand Down Expand Up @@ -723,6 +749,51 @@ static void handle_cl_mi_msg(bin_packet_t *packet)
cmd_name.len, cmd_name.s, (rc == 1) ? "error" : "success");
}

static void handle_remove_node(bin_packet_t *packet, cluster_info_t *cl)
{
int target_node;
int lock_old_flag;
node_info_t *node;
int ev_actions_cl = 1;

bin_pop_int(packet, &target_node);
LM_DBG("Received remove node command for node id: [%d]\n", target_node);

if (db_mode) {
LM_DBG("We are in DB mode, ignoring received remove node command\n");
return;
}

if (target_node == current_id) {
lock_get(cl->current_node->lock);

if (cl->current_node->flags & NODE_STATE_ENABLED) {
cl->current_node->flags &= ~NODE_STATE_ENABLED;
lock_release(cl->current_node->lock);

for (node = cl->node_list; node; node = node->next) {
set_link_w_neigh(LS_DOWN, node);

do_actions_node_ev(cl, &ev_actions_cl, 1);
}
} else {
lock_release(cl->current_node->lock);
}

return;
}

node = get_node_by_id(cl, target_node);
if (!node) {
LM_DBG("Unknown node [%d] to remove\n", target_node);
return;
}

lock_switch_write(cl_list_lock, lock_old_flag);
remove_node(cl, node);
lock_switch_read(cl_list_lock, lock_old_flag);
}

void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type,
struct receive_info *ri, void *att)
{
Expand All @@ -746,7 +817,10 @@ void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type,
return;
}

lock_start_read(cl_list_lock);
if (!db_mode && packet_type == CLUSTERER_REMOVE_NODE)
lock_start_sw_read(cl_list_lock);
else
lock_start_read(cl_list_lock);

cl = get_cluster_by_id(cluster_id);
if (!cl) {
Expand Down Expand Up @@ -807,7 +881,9 @@ void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type,
goto exit;
}
} else {
if (packet_type == CLUSTERER_GENERIC_MSG)
if (packet_type == CLUSTERER_REMOVE_NODE)
handle_remove_node(packet, cl);
else if (packet_type == CLUSTERER_GENERIC_MSG)
handle_cl_gen_msg(packet, cluster_id, source_id);
else if (packet_type == CLUSTERER_MI_CMD)
handle_cl_mi_msg(packet);
Expand All @@ -824,7 +900,10 @@ void bin_rcv_cl_extra_packets(bin_packet_t *packet, int packet_type,
}

exit:
lock_stop_read(cl_list_lock);
if (!db_mode && packet_type == CLUSTERER_REMOVE_NODE)
lock_stop_sw_read(cl_list_lock);
else
lock_stop_read(cl_list_lock);
}

void bin_rcv_cl_packets(bin_packet_t *packet, int packet_type,
Expand Down Expand Up @@ -1389,3 +1468,21 @@ int preserve_reg_caps(cluster_info_t *new_info)

return 0;
}

void remove_node(struct cluster_info *cl, struct node_info *node)
{
node_info_t *it;
int ev_actions_cl = 1;

set_link_w_neigh(LS_DOWN, node);

do_actions_node_ev(cl, &ev_actions_cl, 1);

for (it = cl->node_list; it; it = it->next) {
lock_get(it->lock);
delete_neighbour(it, node);
lock_release(it->lock);
}

remove_node_list(cl, node);
}
4 changes: 4 additions & 0 deletions modules/clusterer/clusterer.h
Expand Up @@ -60,6 +60,7 @@
typedef enum { CLUSTERER_PING, CLUSTERER_PONG,
CLUSTERER_LS_UPDATE, CLUSTERER_FULL_TOP_UPDATE,
CLUSTERER_UNKNOWN_ID, CLUSTERER_NODE_DESCRIPTION,
CLUSTERER_REMOVE_NODE,
CLUSTERER_GENERIC_MSG,
CLUSTERER_MI_CMD,
CLUSTERER_CAP_UPDATE,
Expand Down Expand Up @@ -155,11 +156,14 @@ int send_cap_update(struct node_info *dest_node, int require_reply);
void do_actions_node_ev(struct cluster_info *clusters, int *select_cluster,
int no_clusters);

void remove_node(struct cluster_info *cl, struct node_info *node);

enum clusterer_send_ret send_gen_msg(int cluster_id, int node_id, str *gen_msg,
str *exchg_tag, int req_like);
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag);
enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name,
mi_item_t *cmd_params_arr, int no_params);
enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node);

int cl_set_state(int cluster_id, enum cl_node_state state);
int clusterer_check_addr(int cluster_id, str *ip_str,
Expand Down
94 changes: 94 additions & 0 deletions modules/clusterer/clusterer_mod.c
Expand Up @@ -83,6 +83,8 @@ static mi_response_t *cluster_bcast_mi(const mi_params_t *params,
struct mi_handler *async_hdl);
static mi_response_t *clusterer_list_cap(const mi_params_t *params,
struct mi_handler *async_hdl);
static mi_response_t *cluster_remove_node(const mi_params_t *params,
struct mi_handler *async_hdl);

static void heartbeats_timer_handler(unsigned int ticks, void *param);
static void heartbeats_utimer_handler(utime_t ticks, void *param);
Expand Down Expand Up @@ -202,6 +204,10 @@ static mi_export_t mi_cmds[] = {
{shtag_mi_set_active, {"tag", 0}},
{EMPTY_MI_RECIPE}}
},
{ "clusterer_remove_node", "removes a node from the cluster", 0,0,{
{cluster_remove_node, {"cluster_id", "node_id", 0}},
{EMPTY_MI_RECIPE}}
},
{EMPTY_MI_EXPORT}
};

Expand Down Expand Up @@ -1015,6 +1021,94 @@ static mi_response_t *cluster_bcast_mi(const mi_params_t *params,
return run_mi_cmd_local(&cmd_name, cmd_params_arr, no_params);
}

static mi_response_t *cluster_remove_node(const mi_params_t *params,
struct mi_handler *async_hdl)
{
int cluster_id, node_id;
int rc;
cluster_info_t *cl;
node_info_t *node;
mi_response_t *resp;

if (db_mode)
return init_mi_error(400, MI_SSTR("Running in DB mode"));

if (get_mi_int_param(params, "cluster_id", &cluster_id) < 0)
return init_mi_param_error();
if (cluster_id < 1)
return init_mi_error(400, MI_SSTR("Bad value for 'cluster_id'"));

lock_start_read(cl_list_lock);

cl = get_cluster_by_id(cluster_id);
if (!cl) {
LM_ERR("Unknown cluster id [%d]\n", cluster_id);
resp = init_mi_error(400, MI_SSTR("Unknown cluster id"));
goto error;
}

if (get_mi_int_param(params, "node_id", &node_id) < 0) {
resp = init_mi_param_error();
goto error;
}
if (node_id < 1) {
resp = init_mi_error(400, MI_SSTR("Bad value for 'node_id'"));
goto error;
}

node = get_node_by_id(cl, node_id);
if (!node) {
LM_ERR("Unknown node id [%d]\n", node_id);
resp = init_mi_error(400, MI_SSTR("Unknown node id"));
goto error;
}

lock_stop_read(cl_list_lock);

rc = bcast_remove_node(cluster_id, node_id);
switch (rc) {
case CLUSTERER_SEND_SUCCESS:
LM_DBG("Remove node <%d> command sent\n", node_id);
break;
case CLUSTERER_CURR_DISABLED:
LM_INFO("Local node disabled, remove node <%d> command not sent\n",
node_id);
break;
case CLUSTERER_DEST_DOWN:
LM_ERR("All nodes down, remove node <%d> command not sent\n",
node_id);
break;
case CLUSTERER_SEND_ERR:
LM_ERR("Error sending remove node <%d> command\n", node_id);
break;
}

lock_start_write(cl_list_lock);

cl = get_cluster_by_id(cluster_id);
if (!cl) {
LM_ERR("Unknown cluster id [%d]\n", cluster_id);
lock_stop_write(cl_list_lock);
return init_mi_error(400, MI_SSTR("Unknown cluster id"));
}

node = get_node_by_id(cl, node_id);
if (!node) {
LM_ERR("Unknown node id [%d]\n", node_id);
lock_stop_write(cl_list_lock);
return init_mi_error(400, MI_SSTR("Unknown node id"));
}

remove_node(cl, node);

lock_stop_write(cl_list_lock);

return init_mi_result_ok();
error:
lock_stop_read(cl_list_lock);
return resp;
}

static void heartbeats_timer_handler(unsigned int ticks, void *param)
{
heartbeats_timer();
Expand Down
35 changes: 35 additions & 0 deletions modules/clusterer/doc/clusterer_admin.xml
Expand Up @@ -906,6 +906,41 @@ $ opensips-cli -x mi clusterer_list_topology
</programlisting>
</section>

<section id="mi_clusterer_remove_node" xreflabel="clusterer_remove_node">
<title>
<function moreinfo="none">clusterer_remove_node</function>
</title>
<para>
Removes a node from the cluster's topology. It is enough to run the function
on a single node in order to remove the target node from all the other
nodes in the cluster. If the node to be removed is running when triggering
this function, it will be automatically disabled (equivalent to running
<xref linkend="mi_clusterer_set_status"/> on that specific node).
</para>
<para>
This function can only be used when <xref linkend="param_db_mode"/> is set to
<emphasis>0</emphasis> (disabled).
</para>
<para>
Name: <emphasis>clusterer_remove_node</emphasis>
</para>
<para>Parameters:</para>
<itemizedlist>
<listitem><para>
<emphasis>cluster_id</emphasis> - cluster ID
</para></listitem>
<listitem><para>
<emphasis>node_id</emphasis> - ID of the node to be removed.
</para></listitem>
</itemizedlist>
<para>
MI FIFO Command Format:
</para>
<programlisting format="linespecific">
opensips-cli -x mi clusterer_remove_node 1 3
</programlisting>
</section>

<section id="mi_cluster_send_mi" xreflabel="cluster_send_mi">
<title>
<function moreinfo="none">cluster_send_mi</function>
Expand Down

0 comments on commit b78ed71

Please sign in to comment.