Skip to content

Commit

Permalink
usrloc: sync contacts from another node in the replication cluster
Browse files Browse the repository at this point in the history
Contacts are synchronized at startup or through an MI command.
A valid source node is established by the clusterer module.
  • Loading branch information
rvlad-patrascu committed Jan 12, 2018
1 parent 00172f4 commit 006eea5
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 29 deletions.
13 changes: 12 additions & 1 deletion modules/usrloc/ul_mi.c
Expand Up @@ -44,7 +44,7 @@
#include "utime.h"
#include "ul_mod.h"
#include "usrloc.h"

#include "ureplication.h"


#define MI_UL_CSEQ 1
Expand Down Expand Up @@ -745,3 +745,14 @@ struct mi_root* mi_usrloc_sync(struct mi_root *cmd, void *param)
return ret;
}
}

/*! \brief
* Expects the node id
*/
struct mi_root* mi_usrloc_cl_sync(struct mi_root *cmd, void *param)
{
if (clusterer_api.request_sync(&contact_repl_cap, accept_replicated_udata) < 0)
return init_mi_tree(400, MI_SSTR("Failed to send sync request"));
else
return init_mi_tree(200, MI_SSTR(MI_OK));
}
4 changes: 2 additions & 2 deletions modules/usrloc/ul_mi.h
Expand Up @@ -38,7 +38,7 @@
#define MI_USRLOC_ADD "ul_add"
#define MI_USRLOC_SHOW_CONTACT "ul_show_contact"
#define MI_USRLOC_SYNC "ul_sync"

#define MI_USRLOC_CL_SYNC "ul_cluster_sync"


struct mi_root* mi_usrloc_rm_aor(struct mi_root *cmd, void *param);
Expand All @@ -53,6 +53,6 @@ struct mi_root* mi_usrloc_add(struct mi_root *cmd, void *param);

struct mi_root* mi_usrloc_show_contact(struct mi_root *cmd, void *param);
struct mi_root* mi_usrloc_sync(struct mi_root *cmd, void *param);

struct mi_root* mi_usrloc_cl_sync(struct mi_root *cmd, void *param);

#endif
10 changes: 8 additions & 2 deletions modules/usrloc/ul_mod.c
Expand Up @@ -217,6 +217,8 @@ static mi_export_t mi_cmds[] = {
mi_child_init },
{ MI_USRLOC_SYNC, 0, mi_usrloc_sync, 0, 0,
mi_child_init },
{ MI_USRLOC_CL_SYNC, 0, mi_usrloc_cl_sync, MI_NO_INPUT_FLAG, 0,
mi_child_init },
{ 0, 0, 0, 0, 0, 0}
};

Expand Down Expand Up @@ -416,11 +418,15 @@ static int mod_init(void)

/* register handler for processing usrloc packets to the clusterer module */
if (accept_replicated_udata && clusterer_api.register_capability(&contact_repl_cap,
receive_binary_packet, NULL, ul_repl_auth_check, accept_replicated_udata) < 0) {
LM_ERR("cannot register binary packet callback to clusterer module!\n");
receive_binary_packets, receive_cluster_event, ul_repl_auth_check,
accept_replicated_udata) < 0) {
LM_ERR("cannot register callbacks to clusterer module!\n");
return -1;
}

if (clusterer_api.request_sync(&contact_repl_cap, accept_replicated_udata) < 0)
LM_ERR("Sync request failed\n");

init_flag = 1;

return 0;
Expand Down
161 changes: 138 additions & 23 deletions modules/usrloc/ureplication.c
Expand Up @@ -616,34 +616,149 @@ static int receive_ucontact_delete(bin_packet_t *packet)
return -1;
}

static int receive_sync_packet(bin_packet_t *packet)
{
int is_contact;
int rc = -1;

while (clusterer_api.sync_chunk_iter(packet)) {
bin_pop_int(packet, &is_contact);
if (is_contact) {
if (receive_ucontact_insert(packet) == 0)
rc = 0;
} else
if (receive_urecord_insert(packet) == 0)
rc = 0;
}

return rc;
}

void receive_binary_packets(bin_packet_t *packet)
{
int rc;
bin_packet_t *pkt;

for (pkt = packet; pkt; pkt = pkt->next) {
LM_DBG("received a binary packet [%d]!\n", pkt->type);

switch (pkt->type) {
case REPL_URECORD_INSERT:
rc = receive_urecord_insert(pkt);
break;
case REPL_URECORD_DELETE:
rc = receive_urecord_delete(pkt);
break;
case REPL_UCONTACT_INSERT:
rc = receive_ucontact_insert(pkt);
break;
case REPL_UCONTACT_UPDATE:
rc = receive_ucontact_update(pkt);
break;
case REPL_UCONTACT_DELETE:
rc = receive_ucontact_delete(pkt);
break;
case SYNC_PACKET_TYPE:
rc = receive_sync_packet(pkt);
break;
default:
rc = -1;
LM_ERR("invalid usrloc binary packet type: %d\n", pkt->type);
}

LM_DBG("received a binary packet [%d]!\n", packet->type);

switch (packet->type) {
case REPL_URECORD_INSERT:
rc = receive_urecord_insert(packet);
break;
case REPL_URECORD_DELETE:
rc = receive_urecord_delete(packet);
break;
case REPL_UCONTACT_INSERT:
rc = receive_ucontact_insert(packet);
break;
case REPL_UCONTACT_UPDATE:
rc = receive_ucontact_update(packet);
break;
case REPL_UCONTACT_DELETE:
rc = receive_ucontact_delete(packet);
break;
default:
rc = -1;
LM_ERR("invalid usrloc binary packet type: %d\n", packet->type);
if (rc != 0)
LM_ERR("failed to process binary packet!\n");
}
}

if (rc != 0)
LM_ERR("failed to process binary packet!\n");
static int receive_sync_request(int node_id)
{
bin_packet_t *sync_packet;
dlist_t *dl;
udomain_t *dom;
map_iterator_t it;
struct urecord *r;
ucontact_t* c;
str st;
void **p;
int i;

for (dl = root; dl; dl = dl->next) {
dom = dl->d;
for(i = 0; i < dom->size; i++) {
lock_ulslot(dom, i);
for (map_first(dom->table[i].records, &it);
iterator_is_valid(&it);
iterator_next(&it)) {

p = iterator_val(&it);
if (p == NULL)
goto error_unlock;
r = (urecord_t *)*p;

sync_packet = clusterer_api.sync_chunk_start(&contact_repl_cap,
ul_replicate_cluster, node_id);
if (!sync_packet)
goto error_unlock;

/* urecord in this chunk */
bin_push_int(sync_packet, 0);

bin_push_str(sync_packet, r->domain);
bin_push_str(sync_packet, &r->aor);

for (c = r->contacts; c; c = c->next) {
sync_packet = clusterer_api.sync_chunk_start(&contact_repl_cap,
ul_replicate_cluster, node_id);
if (!sync_packet)
goto error_unlock;

/* contact in this chunk */
bin_push_int(sync_packet, 1);

bin_push_str(sync_packet, r->domain);
bin_push_str(sync_packet, &r->aor);
bin_push_str(sync_packet, &c->c);
bin_push_str(sync_packet, &c->callid);
bin_push_str(sync_packet, &c->user_agent);
bin_push_str(sync_packet, &c->path);
bin_push_str(sync_packet, &c->attr);
bin_push_str(sync_packet, &c->received);
bin_push_str(sync_packet, &c->instance);

st.s = (char *)&c->expires;
st.len = sizeof c->expires;
bin_push_str(sync_packet, &st);

st.s = (char *)&c->q;
st.len = sizeof c->q;
bin_push_str(sync_packet, &st);

bin_push_str(sync_packet, c->sock?&c->sock->sock_str:NULL);
bin_push_int(sync_packet, c->cseq);
bin_push_int(sync_packet, c->flags);
bin_push_int(sync_packet, c->cflags);
bin_push_int(sync_packet, c->methods);

st.s = (char *)&c->last_modified;
st.len = sizeof c->last_modified;
bin_push_str(sync_packet, &st);
}
}
unlock_ulslot(dom, i);
}
}

return 0;

error_unlock:
unlock_ulslot(dom, i);
return -1;
}

void receive_cluster_event(enum clusterer_event ev, int node_id)
{
if (ev == SYNC_REQ_RCV && receive_sync_request(node_id) < 0)
LM_ERR("Failed to send sync data to node: %d\n", node_id);
}

3 changes: 2 additions & 1 deletion modules/usrloc/ureplication.h
Expand Up @@ -57,7 +57,8 @@ void replicate_ucontact_insert(urecord_t *r, str *contact, ucontact_info_t *ci);
void replicate_ucontact_update(urecord_t *r, str *contact, ucontact_info_t *ci);
void replicate_ucontact_delete(urecord_t *r, ucontact_t *c);

void receive_binary_packet(bin_packet_t *packet);
void receive_binary_packets(bin_packet_t *packet);
void receive_cluster_event(enum clusterer_event ev, int node_id);

#endif /* _USRLOC_REPLICATION_H_ */

0 comments on commit 006eea5

Please sign in to comment.