Skip to content

Commit

Permalink
clusterer: issue sync request under lock
Browse files Browse the repository at this point in the history
  • Loading branch information
rvlad-patrascu committed Jul 20, 2022
1 parent d866ea9 commit bf178b0
Show file tree
Hide file tree
Showing 13 changed files with 34 additions and 22 deletions.
2 changes: 1 addition & 1 deletion modules/b2b_entities/b2be_clustering.c
Expand Up @@ -68,7 +68,7 @@ int b2be_init_clustering(void)
return -1;
}

if (cl_api.request_sync(&entities_repl_cap, b2be_cluster) < 0)
if (cl_api.request_sync(&entities_repl_cap, b2be_cluster, 0) < 0)
LM_ERR("Sync request failed\n");

return 0;
Expand Down
2 changes: 1 addition & 1 deletion modules/cachedb_local/cachedb_local.c
Expand Up @@ -636,7 +636,7 @@ static int mod_init(void)
}

if (rr_persist == RRP_SYNC_FROM_CLUSTER &&
clusterer_api.request_sync(&cache_repl_cap, cluster_id) < 0)
clusterer_api.request_sync(&cache_repl_cap, cluster_id, 0) < 0)
LM_ERR("cachedb sync request failed\n");

}
Expand Down
3 changes: 2 additions & 1 deletion modules/clusterer/api.h
Expand Up @@ -183,7 +183,8 @@ typedef int (*register_capability_f)(str *cap, cl_packet_cb_f packet_cb,
/*
* Request to synchronize data for a given capability from another node.
*/
typedef int (*request_sync_f)(str * capability, int cluster_id);
typedef int (*request_sync_f)(str * capability, int cluster_id, int from_cb);

/*
* Returns a BIN packet in which to include a distinct "chunk" of data
* (e.g. info about a single usrloc contact) to sync.
Expand Down
27 changes: 19 additions & 8 deletions modules/clusterer/sync.c
Expand Up @@ -107,19 +107,23 @@ int queue_sync_request(cluster_info_t *cluster, struct local_cap *lcap)
return 0;
}

int cl_request_sync(str *capability, int cluster_id)
int cl_request_sync(str *capability, int cluster_id, int from_cb)
{
cluster_info_t *cluster;
struct local_cap *lcap;
int source_id;
int rc = -1;

LM_DBG("requesting %.*s sync in cluster %d\n",
capability->len, capability->s, cluster_id);

if (!from_cb)
lock_start_read(cl_list_lock);

cluster = get_cluster_by_id(cluster_id);
if (!cluster) {
LM_ERR("Unknown cluster [%d]\n", cluster_id);
return -1;
goto end;
}

for (lcap = cluster->capabilities; lcap; lcap = lcap->next)
Expand All @@ -128,25 +132,28 @@ int cl_request_sync(str *capability, int cluster_id)
if (!lcap) {
LM_ERR("Request sync for unknown capability: %.*s\n",
capability->len, capability->s);
return -1;
goto end;
}

lock_get(cluster->lock);
if (lcap->flags & CAP_SYNC_PENDING) {
lock_release(cluster->lock);
LM_DBG("Sync request already pending\n");
return 0;
rc = 0;
goto end;
}
if (lcap->flags & CAP_SYNC_IN_PROGRESS) {
lock_release(cluster->lock);
LM_DBG("Sync already in progress\n");
return 1;
rc = 1;
goto end;
}

if (!(lcap->flags & CAP_STATE_ENABLED)) {
lock_release(cluster->lock);
LM_DBG("Capability disabled, skip send sync request\n");
return 0;
rc = 0;
goto end;
}

lcap->sync_total_chunks_cnt = 0;
Expand Down Expand Up @@ -176,11 +183,15 @@ int cl_request_sync(str *capability, int cluster_id)
STR2CI(CAP_SR_STATUS_STR(CAP_SR_SYNC_PENDING)), 0);
if (sr_add_report_fmt(cl_srg, STR2CI(lcap->reg.sr_id), 0,
"Sync requested from node [%d]", source_id))
return -1;
goto end;
}
}

return 0;
rc = 0;
end:
if (!from_cb)
lock_stop_read(cl_list_lock);
return rc;
}

static int no_sync_chunks_sent;
Expand Down
2 changes: 1 addition & 1 deletion modules/clusterer/sync.h
Expand Up @@ -33,7 +33,7 @@ struct reply_rpc_params {
int node_id;
};

int cl_request_sync(str *capability, int cluster_id);
int cl_request_sync(str *capability, int cluster_id, int from_cb);
bin_packet_t *cl_sync_chunk_start(str *capability, int cluster_id, int dst_id,
short data_version);
int cl_sync_chunk_iter(bin_packet_t *packet);
Expand Down
2 changes: 1 addition & 1 deletion modules/dialog/dialog.c
Expand Up @@ -859,7 +859,7 @@ static int mod_init(void)
return -1;
}

if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster) < 0)
if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster, 0) < 0)
LM_ERR("Sync request failed\n");
}

Expand Down
4 changes: 2 additions & 2 deletions modules/dialog/dlg_replication.c
Expand Up @@ -1162,7 +1162,7 @@ void rcv_cluster_event(enum clusterer_event ev, int node_id)
LM_DBG("Requesting sync for dialogs marked with backup "
"sharing tags\n");
rc = clusterer_api.request_sync(&dlg_repl_cap,
dialog_repl_cluster);
dialog_repl_cluster, 1);
if (rc < 0)
LM_ERR("Failed to send sync request");
else if (rc == 1)
Expand Down Expand Up @@ -1665,7 +1665,7 @@ mi_response_t *mi_sync_cl_dlg(const mi_params_t *params,
}
}

rc = clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster);
rc = clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster, 0);

if (rc < 0)
return init_mi_error(400, MI_SSTR("Failed to send sync request"));
Expand Down
2 changes: 1 addition & 1 deletion modules/dispatcher/ds_clustering.c
Expand Up @@ -212,7 +212,7 @@ void receive_ds_cluster_event(enum clusterer_event ev, int node_id)
}

int ds_cluster_sync(void) {
if (c_api.request_sync(&status_repl_cap, ds_cluster_id) < 0) {
if (c_api.request_sync(&status_repl_cap, ds_cluster_id, 0) < 0) {
LM_ERR("Sync request failed\n");
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion modules/drouting/dr_clustering.c
Expand Up @@ -355,7 +355,7 @@ int dr_cluster_sync(void)
if (!dr_cluster_id)
return 0;

if (c_api.request_sync(&status_repl_cap, dr_cluster_id) < 0) {
if (c_api.request_sync(&status_repl_cap, dr_cluster_id, 0) < 0) {
LM_ERR("Sync request failed\n");
return -1;
}
Expand Down
4 changes: 2 additions & 2 deletions modules/load_balancer/lb_clustering.c
Expand Up @@ -162,7 +162,7 @@ void receive_lb_cluster_event(enum clusterer_event ev, int node_id)
}

int lb_cluster_sync(void) {
if (c_api.request_sync(&status_repl_cap, lb_cluster_id) < 0) {
if (c_api.request_sync(&status_repl_cap, lb_cluster_id, 0) < 0) {
LM_ERR("Sync request failed\n");
return -1;
}
Expand Down Expand Up @@ -200,7 +200,7 @@ int lb_init_cluster(void)
lb_cluster_shtag.len = 0;
}

if (c_api.request_sync(&status_repl_cap, lb_cluster_id) < 0) {
if (c_api.request_sync(&status_repl_cap, lb_cluster_id, 0) < 0) {
LM_ERR("Sync request failed\n");
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion modules/presence/clustering.c
Expand Up @@ -97,7 +97,7 @@ int init_pres_clustering(void)
}

if (cluster_federation == FEDERATION_FULL_SHARING &&
c_api.request_sync(&presence_capability, pres_cluster_id) < 0)
c_api.request_sync(&presence_capability, pres_cluster_id, 0) < 0)
LM_ERR("Sync request failed\n");

return 0;
Expand Down
2 changes: 1 addition & 1 deletion modules/usrloc/ul_cluster.c
Expand Up @@ -59,7 +59,7 @@ int ul_init_cluster(void)
}

if (rr_persist == RRP_SYNC_FROM_CLUSTER &&
clusterer_api.request_sync(&contact_repl_cap, location_cluster) < 0)
clusterer_api.request_sync(&contact_repl_cap, location_cluster, 0) < 0)
LM_ERR("Sync request failed\n");

return 0;
Expand Down
2 changes: 1 addition & 1 deletion modules/usrloc/ul_mi.c
Expand Up @@ -751,7 +751,7 @@ mi_response_t *mi_usrloc_cl_sync(const mi_params_t *params,
if (!location_cluster)
return init_mi_error(400, MI_SSTR("Clustering not enabled"));

if (clusterer_api.request_sync(&contact_repl_cap, location_cluster) < 0)
if (clusterer_api.request_sync(&contact_repl_cap, location_cluster, 0) < 0)
return init_mi_error(400, MI_SSTR("Failed to send sync request"));
else
return init_mi_result_ok();
Expand Down

0 comments on commit bf178b0

Please sign in to comment.