Skip to content

Commit

Permalink
load_balancer: add cluster syncing at startup and after DB reload
Browse files Browse the repository at this point in the history
  • Loading branch information
rvlad-patrascu committed Jan 17, 2020
1 parent a6a85cd commit 373f9b3
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 24 deletions.
109 changes: 90 additions & 19 deletions modules/load_balancer/lb_clustering.c
Expand Up @@ -38,7 +38,7 @@ static str status_repl_cap = str_init("load_balancer-status-repl");

/* implemented in load_balancer.c which has no .h file */
int lb_update_from_replication( unsigned int group, str *uri,
unsigned int flags);
unsigned int flags, int raise_event);


int lb_cluster_shtag_is_active(void)
Expand All @@ -51,6 +51,12 @@ int lb_cluster_shtag_is_active(void)
return 0;
}

static void bin_push_dst_status(bin_packet_t *packet, struct lb_dst *dst)
{
bin_push_int(packet, dst->group);
bin_push_str(packet, &dst->uri);
bin_push_int(packet, dst->flags&LB_DST_STAT_MASK);
}

void replicate_lb_status(struct lb_dst *dst)
{
Expand All @@ -67,9 +73,7 @@ void replicate_lb_status(struct lb_dst *dst)
return;
}

bin_push_int(&packet, dst->group);
bin_push_str(&packet, &dst->uri);
bin_push_int(&packet, dst->flags&LB_DST_STAT_MASK);
bin_push_dst_status(&packet, dst);

rc = c_api.send_all(&packet, lb_cluster_id);
switch (rc) {
Expand All @@ -88,30 +92,89 @@ void replicate_lb_status(struct lb_dst *dst)
bin_free_packet(&packet);
}


static void receive_lb_binary_packet(bin_packet_t *packet)
static int lb_recv_status_update(bin_packet_t *packet, int raise_event)
{
unsigned int group, flags;
str uri;

LM_DBG("received a binary packet [%d]!\n", packet->type);
bin_pop_int(packet, &group);
bin_pop_str(packet, &uri);
bin_pop_int(packet, &flags);

if(get_bin_pkg_version(packet) != BIN_VERSION) {
LM_ERR("incompatible bin protocol version\n");
return;
return lb_update_from_replication( group, &uri, flags, raise_event);
}

static void receive_lb_binary_packet(bin_packet_t *packet)
{
bin_packet_t *pkt;
int rc;

for (pkt = packet; pkt; pkt = pkt->next) {
LM_DBG("received a binary packet [%d]!\n", packet->type);

switch (pkt->type) {
case REPL_LB_STATUS_UPDATE:
ensure_bin_version(pkt, BIN_VERSION);

rc = lb_recv_status_update(pkt, 1);
break;
case SYNC_PACKET_TYPE:
_ensure_bin_version(pkt, BIN_VERSION, "load_balancer sync packet");

while (c_api.sync_chunk_iter(pkt))
if (lb_recv_status_update(pkt, 0) < 0)
LM_WARN("failed to process sync chunk!\n");
break;
default:
LM_ERR("invalid load_balancer binary packet type: %d\n", pkt->type);
}

if (rc != 0)
LM_ERR("failed to process binary packet!\n");
}
}

if (packet->type == REPL_LB_STATUS_UPDATE) {
bin_pop_int(packet, &group);
bin_pop_str(packet, &uri);
bin_pop_int(packet, &flags);
static int lb_recv_sync_request(int node_id)
{
bin_packet_t *sync_packet;
struct lb_dst *dst;

lb_update_from_replication( group, &uri, flags);
} else {
LM_ERR("invalid load_balancer binary packet type: %d\n", packet->type);
lock_start_read(ref_lock);

for (dst = (*curr_data)->dsts; dst; dst = dst->next) {
sync_packet = c_api.sync_chunk_start(&status_repl_cap, lb_cluster_id,
node_id, BIN_VERSION);
if (!sync_packet)
goto error;

bin_push_dst_status(sync_packet, dst);
}

lock_stop_read(ref_lock);

return 0;

error:
return -1;
lock_stop_read(ref_lock);
}

void receive_lb_cluster_event(enum clusterer_event ev, int node_id)
{
if (ev == SYNC_REQ_RCV && lb_recv_sync_request(node_id) < 0)
LM_ERR("Failed to send sync data to node: %d\n", node_id);
else if (ev == SYNC_DONE)
LM_INFO("Synchronized destinations status from cluster\n");
}

int lb_cluster_sync(void) {
if (c_api.request_sync(&status_repl_cap, lb_cluster_id) < 0) {
LM_ERR("Sync request failed\n");
return -1;
}

return 0;
}

int lb_init_cluster(void)
{
Expand All @@ -124,7 +187,8 @@ int lb_init_cluster(void)
/* register handler for processing load-balancer packets
* to the clusterer module */
if (c_api.register_capability( &status_repl_cap,
receive_lb_binary_packet, NULL, lb_cluster_id, 0, NODE_CMP_ANY) < 0) {
receive_lb_binary_packet, receive_lb_cluster_event, lb_cluster_id, 1,
NODE_CMP_ANY) < 0) {
LM_ERR("cannot register binary packet callback to "
"clusterer module!\n");
return -1;
Expand All @@ -142,6 +206,13 @@ int lb_init_cluster(void)
lb_cluster_shtag.len = 0;
}

if (c_api.request_sync(&status_repl_cap, lb_cluster_id) < 0) {
LM_ERR("Sync request failed\n");
return -1;
}

if (lb_cluster_sync() < 0)
return -1;

return 0;
}

3 changes: 3 additions & 0 deletions modules/load_balancer/lb_clustering.h
Expand Up @@ -36,4 +36,7 @@ int lb_cluster_shtag_is_active(void);
/* replicate the LB status via BIN */
void replicate_lb_status(struct lb_dst *dst);

/* request sync of destination states from cluster */
int lb_cluster_sync(void);

#endif
3 changes: 3 additions & 0 deletions modules/load_balancer/lb_data.c
Expand Up @@ -42,6 +42,9 @@ extern int fetch_freeswitch_stats;
extern int initial_fs_load;
extern struct fs_binds fs_api;

/* reader-writers lock for data reloading */
rw_lock_t *ref_lock = NULL;


struct lb_data* load_lb_data(void)
{
Expand Down
2 changes: 2 additions & 0 deletions modules/load_balancer/lb_data.h
Expand Up @@ -47,6 +47,8 @@
/* max number of IPs for a destination (DNS loookup) */
#define LB_MAX_IPS 32

extern rw_lock_t *ref_lock;

struct lb_resource {
str name;
gen_lock_t *lock;
Expand Down
12 changes: 7 additions & 5 deletions modules/load_balancer/load_balancer.c
Expand Up @@ -48,8 +48,6 @@ static char *table_name = NULL;
/* dialog stuff */
struct dlg_binds lb_dlg_binds;

/* reader-writers lock for data reloading */
static rw_lock_t *ref_lock = NULL;
struct lb_data **curr_data = NULL;

/* probing related stuff */
Expand Down Expand Up @@ -839,6 +837,9 @@ mi_response_t *mi_lb_reload(const mi_params_t *params,
goto error;
}

if (lb_cluster_id && lb_cluster_sync() < 0)
return init_mi_error(500, MI_SSTR("Failed to synchronize from cluster"));

return init_mi_result_ok();
error:
return init_mi_error( 500, MI_SSTR("Failed to reload"));
Expand Down Expand Up @@ -1075,7 +1076,7 @@ mi_response_t *mi_lb_list(const mi_params_t *params,


int lb_update_from_replication( unsigned int group, str *uri,
unsigned int flags)
unsigned int flags, int raise_event)
{
struct lb_dst *dst;

Expand All @@ -1088,8 +1089,9 @@ int lb_update_from_replication( unsigned int group, str *uri,
/* import the status flags */
dst->flags = ((~LB_DST_STAT_MASK)&dst->flags)|
(LB_DST_STAT_MASK&flags);
/* raise event of status change */
lb_raise_event(dst);
if (raise_event)
/* raise event of status change */
lb_raise_event(dst);
lock_stop_read( ref_lock );
return 0;
}
Expand Down

0 comments on commit 373f9b3

Please sign in to comment.