Skip to content

Commit

Permalink
clusterer: fix learning capabilities when restarting a node
Browse files Browse the repository at this point in the history
This issue occurs when a node that has all the links with its neighbors
established is restarted and boots up again before the next ping arrives from
another node in the cluster. That other node does not detect the failed link and
therefore never sends its own capabilities information to the restarted node. The
solution is to require a reply to the initial capabilities update that a node
sends when it establishes a link.
  • Loading branch information
rvlad-patrascu committed Apr 25, 2018
1 parent f1bd449 commit ca84d6c
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions modules/clusterer/clusterer.c
Expand Up @@ -70,7 +70,7 @@ static int set_link_w_neigh_adv(clusterer_link_state new_ls, node_info_t *neigh)
static int set_link_w_neigh_up(node_info_t *neigh, int nr_nodes, int *node_list);
static void do_actions_node_ev(cluster_info_t *clusters, int *select_cluster,
int no_clusters);

static int send_cap_update(node_info_t *dest_node, int require_reply);

static int send_ping(node_info_t *node, int req_node_list)
{
Expand Down Expand Up @@ -891,7 +891,7 @@ int clusterer_check_addr(int cluster_id, union sockaddr_union *su)
}

static int flood_message(bin_packet_t *packet, cluster_info_t *cluster,
int source_id)
int source_id, int rst_req_repl)
{
int path_len;
int path_nodes[UPDATE_MAX_PATH_LEN];
Expand Down Expand Up @@ -924,6 +924,13 @@ static int flood_message(bin_packet_t *packet, cluster_info_t *cluster,
skip_nodes[no_skip_nodes++] = tmp_path_node->node_id;
}

if (rst_req_repl) {
/* message has a require_reply field and it should be reset */
bin_remove_int_buffer_end(packet, path_len + 2);
bin_push_int(packet, 0);
bin_skip_int_packet_end(packet, path_len + 1);
}

lock_get(cluster->current_node->lock);

/* flood update to all neighbours */
Expand Down Expand Up @@ -1194,7 +1201,7 @@ static void handle_full_top_update(bin_packet_t *packet, node_info_t *source,
}
}

flood_message(packet, source->cluster, source->node_id);
flood_message(packet, source->cluster, source->node_id, 0);
}

static void handle_cap_update(bin_packet_t *packet, node_info_t *source)
Expand All @@ -1208,6 +1215,7 @@ static void handle_cap_update(bin_packet_t *packet, node_info_t *source)
node_info_t *node;
int cap_state;
int rc;
int require_reply;

bin_pop_int(packet, &nr_nodes);

Expand Down Expand Up @@ -1295,7 +1303,13 @@ static void handle_cap_update(bin_packet_t *packet, node_info_t *source)
}
}

flood_message(packet, source->cluster, source->node_id);
bin_pop_int(packet, &require_reply);
if (require_reply)
/* also send current node's capabilities information to source node */
send_cap_update(source, 0);

/* flood to other neighbours */
flood_message(packet, source->cluster, source->node_id, require_reply);
}

static void handle_internal_msg_unknown(bin_packet_t *received, cluster_info_t *cl,
Expand Down Expand Up @@ -1338,7 +1352,7 @@ static void handle_internal_msg_unknown(bin_packet_t *received, cluster_info_t *
bin_pop_int(received, &int_vals[INT_VALS_NO_PING_RETRIES_COL]);
add_node(received, cl, src_node_id, str_vals, int_vals);

flood_message(received, cl, src_node_id);
flood_message(received, cl, src_node_id, 0);
break;
default:
LM_DBG("Ignoring message, type: %d from unknown source, id [%d]\n",
Expand Down Expand Up @@ -1398,7 +1412,7 @@ static void handle_ls_update(bin_packet_t *received, node_info_t *src_node,
*ev_actions_required = 1;
}

flood_message(received, src_node->cluster, src_node->node_id);
flood_message(received, src_node->cluster, src_node->node_id, 0);
}

static inline void bin_push_node_info(bin_packet_t *packet, node_info_t *node)
Expand Down Expand Up @@ -2141,6 +2155,8 @@ int send_single_cap_update(cluster_info_t *cluster, struct local_cap *cap,
bin_push_str(&packet, &cap->reg.name);
bin_push_int(&packet, cap_state);

bin_push_int(&packet, 0); /* don't require reply */

bin_push_int(&packet, 1); /* path length is 1, only current node at this point */
bin_push_int(&packet, current_id);
bin_get_buffer(&packet, &bin_buffer);
Expand All @@ -2160,7 +2176,7 @@ int send_single_cap_update(cluster_info_t *cluster, struct local_cap *cap,
return 0;
}

static int send_cap_update(node_info_t *dest_node)
static int send_cap_update(node_info_t *dest_node, int require_reply)
{
bin_packet_t packet;
str bin_buffer;
Expand Down Expand Up @@ -2223,6 +2239,8 @@ static int send_cap_update(node_info_t *dest_node)
lock_release(node->lock);
}

bin_push_int(&packet, require_reply);

bin_push_int(&packet, 1); /* path length is 1, only current node at this point */
bin_push_int(&packet, current_id);
bin_get_buffer(&packet, &bin_buffer);
Expand Down Expand Up @@ -2446,7 +2464,7 @@ static int set_link_w_neigh_up(node_info_t *neigh, int nr_nodes, int *node_list)
if (send_full_top_update(neigh, nr_nodes, node_list) < 0)
return -1;
/* send capabilities update to neighbour */
send_cap_update(neigh);
send_cap_update(neigh, 1);

return 0;
}
Expand Down

0 comments on commit ca84d6c

Please sign in to comment.