Skip to content

Commit

Permalink
clusterer: fix internal traffic flood caused by node UP event
Browse files Browse the repository at this point in the history
The capability update broadcasting mechanism would generate an
unncessarily large number of packets, especially for clusters
of more than 6,7 nodes.
  • Loading branch information
rvlad-patrascu committed Nov 1, 2021
1 parent 52e5180 commit 32dbcba
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 15 deletions.
40 changes: 37 additions & 3 deletions modules/clusterer/clusterer.c
Expand Up @@ -727,6 +727,21 @@ static void handle_cap_update(bin_packet_t *packet, node_info_t *source)
int cap_state;
int rc;
int require_reply;
int seq_no, timestamp;

bin_pop_int(packet, &seq_no);
bin_pop_int(packet, &timestamp);

lock_get(source->lock);
if (validate_update(source->cap_seq_no, seq_no,
source->cap_timestamp, timestamp, 0, source->node_id) < 0) {
lock_release(source->lock);
return;
} else {
source->cap_seq_no = seq_no;
source->cap_timestamp = timestamp;
}
lock_release(source->lock);

bin_pop_int(packet, &nr_nodes);

Expand Down Expand Up @@ -1349,26 +1364,35 @@ int send_single_cap_update(cluster_info_t *cluster, struct local_cap *cap,
node_info_t* destinations[MAX_NO_NODES];
struct neighbour *neigh;
int no_dests = 0, i;
int timestamp;

timestamp = time(NULL);

lock_get(cluster->current_node->lock);

for (neigh = cluster->current_node->neighbour_list; neigh;
neigh = neigh->next)
destinations[no_dests++] = neigh->node;

lock_release(cluster->current_node->lock);

if (no_dests == 0)
if (no_dests == 0) {
lock_release(cluster->current_node->lock);
return 0;
}

if (bin_init(&packet, &cl_internal_cap, CLUSTERER_CAP_UPDATE,
BIN_VERSION, 0) < 0) {
lock_release(cluster->current_node->lock);
LM_ERR("Failed to init bin send buffer\n");
return -1;
}
bin_push_int(&packet, cluster->cluster_id);
bin_push_int(&packet, current_id);

bin_push_int(&packet, ++cluster->current_node->cap_seq_no);
bin_push_int(&packet, timestamp);

lock_release(cluster->current_node->lock);

/* only the current node */
bin_push_int(&packet, 1);
bin_push_int(&packet, current_id);
Expand Down Expand Up @@ -1407,6 +1431,9 @@ int send_cap_update(node_info_t *dest_node, int require_reply)
struct remote_cap *n_cap;
int nr_cap, nr_nodes = 0;
node_info_t *node;
int timestamp;

timestamp = time(NULL);

if (dest_node->cluster->capabilities)
nr_nodes++;
Expand All @@ -1428,6 +1455,13 @@ int send_cap_update(node_info_t *dest_node, int require_reply)
bin_push_int(&packet, dest_node->cluster->cluster_id);
bin_push_int(&packet, current_id);

lock_get(dest_node->cluster->current_node->lock);

bin_push_int(&packet, ++dest_node->cluster->current_node->cap_seq_no);
bin_push_int(&packet, timestamp);

lock_release(dest_node->cluster->current_node->lock);

bin_push_int(&packet, nr_nodes);

/* current node's capabilities */
Expand Down
2 changes: 2 additions & 0 deletions modules/clusterer/node_info.c
Expand Up @@ -233,8 +233,10 @@ int add_node_info(node_info_t **new_info, cluster_info_t **cl_list, int *int_val

(*new_info)->ls_seq_no = -1;
(*new_info)->top_seq_no = -1;
(*new_info)->cap_seq_no = -1;
(*new_info)->ls_timestamp = 0;
(*new_info)->top_timestamp = 0;
(*new_info)->cap_timestamp = 0;

(*new_info)->sp_info = shm_malloc(sizeof(struct node_search_info));
if (!(*new_info)->sp_info) {
Expand Down
14 changes: 14 additions & 0 deletions modules/clusterer/node_info.h
Expand Up @@ -92,8 +92,10 @@ struct node_info {
struct neighbour *neighbour_list; /* list of directly reachable neighbours */
int ls_seq_no; /* sequence number of the last link state update */
int top_seq_no; /* sequence number of the last topology update message */
int cap_seq_no;
int ls_timestamp;
int top_timestamp;
int cap_timestamp;
struct node_info *next_hop; /* next hop from the shortest path */
struct remote_cap *capabilities; /* known capabilities of this node */
int flags;
Expand Down Expand Up @@ -169,5 +171,17 @@ static inline node_info_t *get_node_by_id(cluster_info_t *cluster, int node_id)
return NULL;
}

static inline int validate_update(int seq_no, int msg_seq_no, int timestamp,
int msg_timestamp, int val_type, int node_id)
{
if (msg_seq_no == 0) {
if (seq_no == 0 && msg_timestamp <= timestamp)
return -1;
} else if (msg_seq_no <= seq_no)
return -1;

return 0;
}

#endif /* CL_NODE_INFO_H */

12 changes: 0 additions & 12 deletions modules/clusterer/topology.c
Expand Up @@ -678,18 +678,6 @@ static int send_ls_update(node_info_t *node, clusterer_link_state new_ls)
return 0;
}

static inline int validate_update(int seq_no, int msg_seq_no, int timestamp,
int msg_timestamp, int val_type, int node_id)
{
if (msg_seq_no == 0) {
if (seq_no == 0 && msg_timestamp <= timestamp)
return -1;
} else if (msg_seq_no <= seq_no)
return -1;

return 0;
}

static node_info_t *add_node(bin_packet_t *received, cluster_info_t *cl,
int src_node_id, str *str_vals, int *int_vals)
{
Expand Down

0 comments on commit 32dbcba

Please sign in to comment.