Skip to content

Commit

Permalink
usrloc: Improve clustering mode nomenclature
Browse files Browse the repository at this point in the history
  • Loading branch information
liviuchircu committed Mar 23, 2018
1 parent e75fe04 commit 1773ecf
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 103 deletions.
14 changes: 7 additions & 7 deletions modules/usrloc/dlist.c
Expand Up @@ -536,7 +536,7 @@ get_domain_cdb_ucontacts(udomain_t *d, void *buf, int *len,
int shortage;

cur_node_idx = clusterer_api.get_my_index(
ul_replication_cluster, &contact_repl_cap, &nr_nodes);
location_cluster, &contact_repl_cap, &nr_nodes);

unit = MAX_DB_AOR_HASH / (double)(part_max * nr_nodes);
min = (int)(unit * part_max * cur_node_idx + unit * part_idx);
Expand Down Expand Up @@ -677,9 +677,9 @@ get_domain_mem_ucontacts(udomain_t *d,void *buf, int *len, unsigned int flags,
r =( urecord_t * ) *dest;

/* distribute ping workload across cluster nodes */
if (ul_replication_cluster) {
if (location_cluster) {
cur_node_idx = clusterer_api.get_my_index(
ul_replication_cluster, &contact_repl_cap, &nr_nodes);
location_cluster, &contact_repl_cap, &nr_nodes);
if (r->aorhash % nr_nodes != cur_node_idx)
continue;
}
Expand Down Expand Up @@ -886,7 +886,7 @@ int get_domain_ucontacts(udomain_t *d, void *buf, int len, unsigned int flags,
if (cluster_mode == CM_SQL_ONLY)
return get_domain_db_ucontacts(d, buf, &len,
flags, part_idx, part_max, 1, pack_coords);
else if (cluster_mode == CM_CORE_CACHEDB_ONLY)
else if (cluster_mode == CM_CACHEDB_ONLY)
return get_domain_cdb_ucontacts(d, buf, &len,
flags, part_idx, part_max, 1, pack_coords);
else
Expand Down Expand Up @@ -1200,7 +1200,7 @@ int delete_ucontact_from_coords(udomain_t *d, ucontact_coords ct_coords,
return -1;
}
return 0;
} else if (cluster_mode == CM_CORE_CACHEDB_ONLY) {
} else if (cluster_mode == CM_CACHEDB_ONLY) {
if (cdb_delete_ucontact_coords((ucontact_sip_coords *)ct_coords)) {
LM_ERR("failed to remove contact from cache\n");
return -1;
Expand All @@ -1215,7 +1215,7 @@ int delete_ucontact_from_coords(udomain_t *d, ucontact_coords ct_coords,
return 0;
}

if (!is_replicated && ul_replication_cluster)
if (!is_replicated && location_cluster)
replicate_ucontact_delete(r, c);

if (exists_ulcb_type(UL_CONTACT_DELETE)) {
Expand Down Expand Up @@ -1246,7 +1246,7 @@ int update_sipping_latency(udomain_t *d, ucontact_coords ct_coords,
ucontact_id contact_id = (ucontact_id)ct_coords;

/* TODO: add cachedb queries for latency updates */
if (cluster_mode == CM_SQL_ONLY || cluster_mode == CM_CORE_CACHEDB_ONLY)
if (cluster_mode == CM_SQL_ONLY || cluster_mode == CM_CACHEDB_ONLY)
return 0;

c = get_ucontact_from_id(d, contact_id, &r);
Expand Down
4 changes: 2 additions & 2 deletions modules/usrloc/ucontact.c
Expand Up @@ -962,7 +962,7 @@ int ucontact_coords_cmp(ucontact_coords _a, ucontact_coords _b)
{
ucontact_sip_coords *a, *b;

if (cluster_mode != CM_CORE_CACHEDB_ONLY)
if (cluster_mode != CM_CACHEDB_ONLY)
return _a == _b ? 0 : -1;

a = (ucontact_sip_coords *)_a;
Expand All @@ -977,7 +977,7 @@ int ucontact_coords_cmp(ucontact_coords _a, ucontact_coords _b)

void free_ucontact_coords(ucontact_coords coords)
{
if (cluster_mode == CM_CORE_CACHEDB_ONLY)
if (cluster_mode == CM_CACHEDB_ONLY)
shm_free((ucontact_sip_coords *)coords);
}

Expand Down
13 changes: 6 additions & 7 deletions modules/usrloc/udomain.c
Expand Up @@ -1490,7 +1490,7 @@ int insert_urecord(udomain_t* _d, str* _aor, struct urecord** _r,

(*_r)->label = CID_NEXT_RLABEL(_d, sl);

if (!is_replicated && ul_replication_cluster)
if (!is_replicated && location_cluster)
replicate_urecord_insert(*_r);
} else {
get_static_urecord( _d, _aor, _r);
Expand All @@ -1514,8 +1514,7 @@ int get_urecord(udomain_t* _d, str* _aor, struct urecord** _r)

switch (cluster_mode) {
case CM_NONE:
case CM_EDGE:
case CM_CORE:
case CM_FULL_MIRRORING:
/* search in cache */
aorhash = core_hash(_aor, 0, 0);
sl = aorhash&(_d->size-1);
Expand All @@ -1537,7 +1536,7 @@ int get_urecord(udomain_t* _d, str* _aor, struct urecord** _r)
return 0;
}
break;
case CM_CORE_CACHEDB_ONLY:
case CM_CACHEDB_ONLY:
r = cdb_load_urecord(ul_dbh, _d, _aor);
if (r) {
*_r = r;
Expand Down Expand Up @@ -1571,7 +1570,7 @@ int delete_urecord(udomain_t* _d, str* _aor, struct urecord* _r,
}
free_urecord(_r);
return 0;
case CM_CORE_CACHEDB_ONLY:
case CM_CACHEDB_ONLY:
if (!_r)
get_static_urecord(_d, _aor, &_r);
if (cdb_delete_urecord(_r) < 0) {
Expand All @@ -1580,7 +1579,7 @@ int delete_urecord(udomain_t* _d, str* _aor, struct urecord* _r,
}
free_urecord(_r);
return 0;
case CM_EDGE_CACHEDB_ONLY:
case CM_FEDERATION_CACHEDB:
/* TODO */
abort();
default:
Expand All @@ -1606,7 +1605,7 @@ int delete_urecord(udomain_t* _d, str* _aor, struct urecord* _r,
if (_r->no_clear_ref > 0)
return 0;

if (!is_replicated && ul_replication_cluster)
if (!is_replicated && location_cluster)
replicate_urecord_delete(_r);

release_urecord(_r, is_replicated);
Expand Down
2 changes: 1 addition & 1 deletion modules/usrloc/ul_mi.c
Expand Up @@ -753,7 +753,7 @@ struct mi_root* mi_usrloc_sync(struct mi_root *cmd, void *param)
*/
struct mi_root* mi_usrloc_cl_sync(struct mi_root *cmd, void *param)
{
if (clusterer_api.request_sync(&contact_repl_cap, ul_replication_cluster) < 0)
if (clusterer_api.request_sync(&contact_repl_cap, location_cluster) < 0)
return init_mi_tree(400, MI_SSTR("Failed to send sync request"));
else
return init_mi_tree(200, MI_SSTR(MI_OK));
Expand Down
126 changes: 75 additions & 51 deletions modules/usrloc/ul_mod.c
Expand Up @@ -155,8 +155,19 @@ unsigned int nat_bflag = (unsigned int)-1;
static char *nat_bflag_str = 0;
unsigned int init_flag = 0;

/* usrloc data replication using the clusterer module */
int ul_replication_cluster = 0;
/**
* A global SIP user location cluster. It must include all
* OpenSIPS user location nodes, regardless whether some of them form
* virtual IP pairs or not. Depending on the @cluster_mode, the behavior
* of the current node with regards to the @location_cluster may shift.
*
* For example, we may either:
* - fully broadcast contact updates to all nodes
* - broadcast contact updates to local nodes only
* - not directly broadcast any contact data at all
* (although it gets published into a global NoSQL cluster)
*/
int location_cluster;

db_con_t* ul_dbh = 0; /* Database connection handle */
db_func_t ul_dbf;
Expand Down Expand Up @@ -215,7 +226,7 @@ static param_export_t params[] = {
{"nat_bflag", STR_PARAM, &nat_bflag_str },
{"nat_bflag", INT_PARAM, &nat_bflag },
/* data replication through clusterer using TCP binary packets */
{ "contact_replication_cluster", INT_PARAM, &ul_replication_cluster },
{ "location_cluster", INT_PARAM, &location_cluster },
{ "skip_replicated_db_ops", INT_PARAM, &skip_replicated_db_ops },
{ "max_contact_delete", INT_PARAM, &max_contact_delete },
{ "regen_broken_contactid", INT_PARAM, &cid_regen},
Expand Down Expand Up @@ -288,7 +299,7 @@ static dep_export_t deps = {
{"working_mode_preset", get_deps_wmode_preset},
{"cluster_mode", get_deps_wmode_preset},
{"restart_persistency", get_deps_rr_persist},
{"contact_replication_cluster", get_deps_clusterer},
{"location_cluster", get_deps_clusterer},
{NULL, NULL},
},
};
Expand Down Expand Up @@ -459,26 +470,26 @@ static int mod_init(void)
return -1;
}

if (ul_replication_cluster < 0) {
if (location_cluster < 0) {
LM_ERR("Invalid cluster id to replicate contacts to, must be 0 or "
"a positive number\n");
return -1;
}

if (ul_replication_cluster) {
if (location_cluster) {
if (load_clusterer_api(&clusterer_api) != 0) {
LM_DBG("failed to find clusterer API - is clusterer module loaded?\n");
return -1;
}

/* register handler for processing usrloc packets to the clusterer module */
if (clusterer_api.register_capability(&contact_repl_cap,
receive_binary_packets, receive_cluster_event, ul_replication_cluster) < 0) {
receive_binary_packets, receive_cluster_event, location_cluster) < 0) {
LM_ERR("cannot register callbacks to clusterer module!\n");
return -1;
}

if (clusterer_api.request_sync(&contact_repl_cap, ul_replication_cluster) < 0)
if (clusterer_api.request_sync(&contact_repl_cap, location_cluster) < 0)
LM_ERR("Sync request failed\n");
}

Expand Down Expand Up @@ -667,35 +678,35 @@ int check_runtime_config(void)
cluster_mode = CM_SQL_ONLY;
rr_persist = RRP_NONE;
sql_wmode = SQL_NO_WRITE;
} else if (!strcasecmp(runtime_preset, "edge-cluster")) {
cluster_mode = CM_EDGE;
} else if (!strcasecmp(runtime_preset, "federation-cluster")) {
cluster_mode = CM_FEDERATION;
rr_persist = RRP_SYNC_FROM_CLUSTER;
sql_wmode = SQL_NO_WRITE;
} else if (!strcasecmp(runtime_preset, "edge-cluster-cachedb-only")) {
cluster_mode = CM_EDGE_CACHEDB_ONLY;
rr_persist = RRP_NONE;
} else if (!strcasecmp(runtime_preset, "federation-cachedb-cluster")) {
cluster_mode = CM_FEDERATION_CACHEDB;
rr_persist = RRP_SYNC_FROM_CLUSTER;
sql_wmode = SQL_NO_WRITE;
} else if (!strcasecmp(runtime_preset, "core-cluster")) {
cluster_mode = CM_CORE;
} else if (!strcasecmp(runtime_preset, "full-mirroring-cluster")) {
cluster_mode = CM_FULL_MIRRORING;
rr_persist = RRP_SYNC_FROM_CLUSTER;
sql_wmode = SQL_NO_WRITE;
} else if (!strcasecmp(runtime_preset, "core-cluster-cachedb-only")) {
cluster_mode = CM_CORE_CACHEDB_ONLY;
} else if (!strcasecmp(runtime_preset, "cachedb-only-cluster")) {
cluster_mode = CM_CACHEDB_ONLY;
rr_persist = RRP_NONE;
sql_wmode = SQL_NO_WRITE;
}
} else {
if (cluster_mode_str) {
if (!strcasecmp(cluster_mode_str, "none"))
cluster_mode = CM_NONE;
else if (!strcasecmp(cluster_mode_str, "edge"))
cluster_mode = CM_EDGE;
else if (!strcasecmp(cluster_mode_str, "edge-cachedb-only"))
cluster_mode = CM_EDGE_CACHEDB_ONLY;
else if (!strcasecmp(cluster_mode_str, "core"))
cluster_mode = CM_CORE;
else if (!strcasecmp(cluster_mode_str, "core-cachedb-only"))
cluster_mode = CM_CORE_CACHEDB_ONLY;
else if (!strcasecmp(cluster_mode_str, "federation"))
cluster_mode = CM_FEDERATION;
else if (!strcasecmp(cluster_mode_str, "federation-cachedb"))
cluster_mode = CM_FEDERATION_CACHEDB;
else if (!strcasecmp(cluster_mode_str, "full-mirroring"))
cluster_mode = CM_FULL_MIRRORING;
else if (!strcasecmp(cluster_mode_str, "cachedb-only"))
cluster_mode = CM_CACHEDB_ONLY;
else if (!strcasecmp(cluster_mode_str, "sql-only"))
cluster_mode = CM_SQL_ONLY;
else
Expand Down Expand Up @@ -746,43 +757,55 @@ int check_runtime_config(void)
}

if (rr_persist != RRP_LOAD_FROM_SQL && sql_wmode != SQL_NO_WRITE) {
LM_ERR("the 'sql_write_mode' can only be set with an "
"SQL-based restart persistency!\n");
return -1;
LM_WARN("the 'sql_write_mode' only makes sense with an "
"SQL-based restart persistency -- ignoring...\n");
sql_wmode = SQL_NO_WRITE;
} else if (rr_persist == RRP_LOAD_FROM_SQL && sql_wmode == SQL_NO_WRITE) {
LM_WARN("using SQL restart persistency without an 'sql_write_mode' "
"- defaulting to 'write-back'...\n");
sql_wmode = SQL_WRITE_BACK;
}

if (cluster_mode == CM_EDGE || cluster_mode == CM_EDGE_CACHEDB_ONLY) {
LM_ERR("built-in edge clustering not implemented yet! :(\n");
return -1;
}

if (cluster_mode == CM_NONE) {
switch (cluster_mode) {
case CM_NONE:
if (rr_persist == RRP_SYNC_FROM_CLUSTER) {
LM_ERR("cannot sync from cluster without first "
"enabling a clustering mode!\n");
return -1;
}

if (ul_replication_cluster) {
LM_WARN("a 'contact_replication_cluster' has been defined with "
if (location_cluster) {
LM_WARN("a 'location_cluster' has been defined with "
"a single-instance clustering mode! ignoring...\n");

ul_replication_cluster = 0;
location_cluster = 0;
}
}
break;

case CM_FEDERATION:
LM_ERR("usrloc 'federation' clustering not implemented yet! :(\n");
return -1;

if (cluster_mode == CM_CORE) {
if (!ul_replication_cluster) {
LM_ERR("'contact_replication_cluster' is not set!\n");
case CM_FEDERATION_CACHEDB:
if (!location_cluster) {
LM_ERR("'location_cluster' is not set!\n");
return -1;
}
}

if (cluster_mode == CM_CORE_CACHEDB_ONLY) {
if (!cdb_url.s) {
LM_ERR("no cache database URL defined! ('cdb_url')\n");
return -1;
}
break;

case CM_FULL_MIRRORING:
if (!location_cluster) {
LM_ERR("'location_cluster' is not set!\n");
return -1;
}
break;

case CM_CACHEDB_ONLY:
if (rr_persist != RRP_NONE) {
LM_WARN("externally managed data is already restart persistent!"
" -- auto-disabling 'restart_persistency'\n");
Expand All @@ -795,18 +818,18 @@ int check_runtime_config(void)
sql_wmode = SQL_NO_WRITE;
}

if (!ul_replication_cluster) {
LM_ERR("'contact_replication_cluster' is not set!\n");
if (!location_cluster) {
LM_ERR("'location_cluster' is not set!\n");
return -1;
}

if (!cdb_url.s) {
LM_ERR("no cache database URL defined! ('cdb_url')\n");
return -1;
}
}
break;

if (cluster_mode == CM_SQL_ONLY) {
case CM_SQL_ONLY:
if (rr_persist != RRP_NONE) {
LM_WARN("externally managed data is already restart persistent!"
" -- auto-disabling 'restart_persistency'\n");
Expand All @@ -819,12 +842,13 @@ int check_runtime_config(void)
sql_wmode = SQL_NO_WRITE;
}

if (ul_replication_cluster) {
LM_WARN("a 'contact_replication_cluster' has been defined with "
"a single-instance clustering mode! ignoring...\n");
if (location_cluster) {
LM_WARN("a 'location_cluster' has been defined although "
"it is not required! ignoring...\n");

ul_replication_cluster = 0;
location_cluster = 0;
}
break;
}

return 0;
Expand Down

0 comments on commit 1773ecf

Please sign in to comment.