Skip to content

Commit

Permalink
clusterer: fix internal traffic flood caused by node UP event
Browse files Browse the repository at this point in the history
The capability update broadcasting mechanism would generate an
unncessarily large number of packets, especially for clusters
of more than 6,7 nodes.

(cherry picked from commit 32dbcba)
  • Loading branch information
rvlad-patrascu committed Nov 1, 2021
1 parent a49a9e7 commit c493e01
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 15 deletions.
52 changes: 37 additions & 15 deletions modules/clusterer/clusterer.c
Expand Up @@ -1040,18 +1040,6 @@ static int flood_message(bin_packet_t *packet, cluster_info_t *cluster,
return 0;
}

static inline int validate_update(int seq_no, int msg_seq_no, int timestamp,
int msg_timestamp, int val_type, int node_id)
{
if (msg_seq_no == 0) {
if (seq_no == 0 && msg_timestamp <= timestamp)
return -1;
} else if (msg_seq_no <= seq_no)
return -1;

return 0;
}

static node_info_t *add_node(bin_packet_t *received, cluster_info_t *cl,
int src_node_id, str *str_vals, int *int_vals)
{
Expand Down Expand Up @@ -1262,6 +1250,21 @@ static void handle_cap_update(bin_packet_t *packet, node_info_t *source)
int cap_state;
int rc;
int require_reply;
int seq_no, timestamp;

bin_pop_int(packet, &seq_no);
bin_pop_int(packet, &timestamp);

lock_get(source->lock);
if (validate_update(source->cap_seq_no, seq_no,
source->cap_timestamp, timestamp, 0, source->node_id) < 0) {
lock_release(source->lock);
return;
} else {
source->cap_seq_no = seq_no;
source->cap_timestamp = timestamp;
}
lock_release(source->lock);

bin_pop_int(packet, &nr_nodes);

Expand Down Expand Up @@ -2268,26 +2271,35 @@ int send_single_cap_update(cluster_info_t *cluster, struct local_cap *cap,
node_info_t* destinations[MAX_NO_NODES];
struct neighbour *neigh;
int no_dests = 0, i;
int timestamp;

timestamp = time(NULL);

lock_get(cluster->current_node->lock);

for (neigh = cluster->current_node->neighbour_list; neigh;
neigh = neigh->next)
destinations[no_dests++] = neigh->node;

lock_release(cluster->current_node->lock);

if (no_dests == 0)
if (no_dests == 0) {
lock_release(cluster->current_node->lock);
return 0;
}

if (bin_init(&packet, &cl_internal_cap, CLUSTERER_CAP_UPDATE,
BIN_VERSION, 0) < 0) {
lock_release(cluster->current_node->lock);
LM_ERR("Failed to init bin send buffer\n");
return -1;
}
bin_push_int(&packet, cluster->cluster_id);
bin_push_int(&packet, current_id);

bin_push_int(&packet, ++cluster->current_node->cap_seq_no);
bin_push_int(&packet, timestamp);

lock_release(cluster->current_node->lock);

/* only the current node */
bin_push_int(&packet, 1);
bin_push_int(&packet, current_id);
Expand Down Expand Up @@ -2326,6 +2338,9 @@ static int send_cap_update(node_info_t *dest_node, int require_reply)
struct remote_cap *n_cap;
int nr_cap, nr_nodes = 0;
node_info_t *node;
int timestamp;

timestamp = time(NULL);

if (dest_node->cluster->capabilities)
nr_nodes++;
Expand All @@ -2347,6 +2362,13 @@ static int send_cap_update(node_info_t *dest_node, int require_reply)
bin_push_int(&packet, dest_node->cluster->cluster_id);
bin_push_int(&packet, current_id);

lock_get(dest_node->cluster->current_node->lock);

bin_push_int(&packet, ++dest_node->cluster->current_node->cap_seq_no);
bin_push_int(&packet, timestamp);

lock_release(dest_node->cluster->current_node->lock);

bin_push_int(&packet, nr_nodes);

/* current node's capabilities */
Expand Down
2 changes: 2 additions & 0 deletions modules/clusterer/node_info.c
Expand Up @@ -224,8 +224,10 @@ int add_node_info(node_info_t **new_info, cluster_info_t **cl_list, int *int_val

(*new_info)->ls_seq_no = -1;
(*new_info)->top_seq_no = -1;
(*new_info)->cap_seq_no = -1;
(*new_info)->ls_timestamp = 0;
(*new_info)->top_timestamp = 0;
(*new_info)->cap_timestamp = 0;

(*new_info)->sp_info = shm_malloc(sizeof(struct node_search_info));
if (!(*new_info)->sp_info) {
Expand Down
14 changes: 14 additions & 0 deletions modules/clusterer/node_info.h
Expand Up @@ -90,8 +90,10 @@ struct node_info {
struct neighbour *neighbour_list; /* list of directly reachable neighbours */
int ls_seq_no; /* sequence number of the last link state update */
int top_seq_no; /* sequence number of the last topology update message */
int cap_seq_no;
int ls_timestamp;
int top_timestamp;
int cap_timestamp;
struct node_info *next_hop; /* next hop from the shortest path */
struct remote_cap *capabilities; /* known capabilities of this node */
int flags;
Expand Down Expand Up @@ -166,5 +168,17 @@ static inline node_info_t *get_node_by_id(cluster_info_t *cluster, int node_id)
return NULL;
}

static inline int validate_update(int seq_no, int msg_seq_no, int timestamp,
int msg_timestamp, int val_type, int node_id)
{
if (msg_seq_no == 0) {
if (seq_no == 0 && msg_timestamp <= timestamp)
return -1;
} else if (msg_seq_no <= seq_no)
return -1;

return 0;
}

#endif /* CL_NODE_INFO_H */

0 comments on commit c493e01

Please sign in to comment.