Skip to content

Commit

Permalink
dialog: sync dialogs from another node at startup
Browse files Browse the repository at this point in the history
The node to sync the dialogs from is chosen by the clusterer module.
Syncing is also possible at runtime through an MI command.
  • Loading branch information
rvlad-patrascu committed Mar 27, 2018
1 parent edd2064 commit db592fd
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 112 deletions.
18 changes: 12 additions & 6 deletions modules/dialog/dialog.c
Expand Up @@ -305,6 +305,7 @@ static mi_export_t mi_cmds[] = {
{ "dlg_end_dlg", 0, mi_terminate_dlg, 0, 0, 0},
{ "dlg_db_sync", 0, mi_sync_db_dlg, 0, 0, 0},
{ "dlg_restore_db", 0, mi_restore_dlg_db, 0, 0, 0},
{ "dlg_cluster_sync", 0, mi_sync_cl_dlg, 0, 0, 0},
{ "profile_get_size", 0, mi_get_profile, 0, 0, 0},
{ "profile_list_dlgs", 0, mi_profile_list, 0, 0, 0},
{ "profile_get_values", 0, mi_get_profile_values, 0, 0, 0},
Expand Down Expand Up @@ -862,18 +863,23 @@ static int mod_init(void)
return -1;
}

if (dialog_repl_cluster && clusterer_api.register_capability(
&dlg_repl_cap, receive_dlg_repl, NULL, dialog_repl_cluster) < 0) {
LM_ERR("Cannot register clusterer callback for dialog replication!\n");
return -1;
}

if (profile_repl_cluster && clusterer_api.register_capability(
&prof_repl_cap, receive_prof_repl, NULL, profile_repl_cluster) < 0) {
LM_ERR("Cannot register clusterer callback for profile replication!\n");
return -1;
}

if (dialog_repl_cluster) {
if (clusterer_api.register_capability(&dlg_repl_cap, receive_dlg_repl,
rcv_cluster_event, dialog_repl_cluster) < 0) {
LM_ERR("Cannot register clusterer callback for dialog replication!\n");
return -1;
}

if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster) < 0)
LM_ERR("Sync request failed\n");
}

if ( register_timer( "dlg-timer", dlg_timer_routine, NULL, 1,
TIMER_FLAG_DELAY_ON_DELAY)<0 ) {
LM_ERR("failed to register timer\n");
Expand Down
226 changes: 120 additions & 106 deletions modules/dialog/dlg_replication.c
Expand Up @@ -158,7 +158,7 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell, str *ftag
callee_sock = fetch_socket_info(&sock);

if (!caller_sock || !callee_sock) {
LM_ERR("Dialog in DB doesn't match any listening sockets\n");
LM_ERR("Replicated dialog doesn't match any listening sockets\n");
goto pre_linking_error;
}

Expand Down Expand Up @@ -455,6 +455,54 @@ int dlg_replicated_delete(bin_packet_t *packet)
}
#undef DLG_BIN_POP

/*
 * Append the replicated representation of a dialog to a binary packet.
 *
 * The fields are pushed in a fixed order: Call-ID, caller/callee tags,
 * From/To URIs, hash id, start timestamp, state, the two socket strings,
 * per-leg CSeq / route set / contact, the callee leg URIs, the serialized
 * dialog values and profiles, the flag sets, the timeout and finally the
 * last generated CSeq of each leg.
 *
 * @param packet  packet to append to
 * @param dlg     dialog to serialize; this function takes no lock itself
 */
void bin_push_dlg(bin_packet_t *packet, struct dlg_cell *dlg)
{
	const int callee = callee_idx(dlg);
	str *dlg_vals, *dlg_profiles;

	/* dialog identification */
	bin_push_str(packet, &dlg->callid);
	bin_push_str(packet, &dlg->legs[DLG_CALLER_LEG].tag);
	bin_push_str(packet, &dlg->legs[callee].tag);

	bin_push_str(packet, &dlg->from_uri);
	bin_push_str(packet, &dlg->to_uri);

	bin_push_int(packet, dlg->h_id);
	bin_push_int(packet, dlg->start_ts);
	bin_push_int(packet, dlg->state);

	/* sockets: the callee leg may have no bound socket, in which case
	 * a NULL string is pushed in its place */
	bin_push_str(packet, &dlg->legs[DLG_CALLER_LEG].bind_addr->sock_str);
	bin_push_str(packet, dlg->legs[callee].bind_addr ?
		&dlg->legs[callee].bind_addr->sock_str : NULL);

	/* per-leg signalling data, caller first */
	bin_push_str(packet, &dlg->legs[DLG_CALLER_LEG].r_cseq);
	bin_push_str(packet, &dlg->legs[callee].r_cseq);
	bin_push_str(packet, &dlg->legs[DLG_CALLER_LEG].route_set);
	bin_push_str(packet, &dlg->legs[callee].route_set);
	bin_push_str(packet, &dlg->legs[DLG_CALLER_LEG].contact);
	bin_push_str(packet, &dlg->legs[callee].contact);
	bin_push_str(packet, &dlg->legs[callee].from_uri);
	bin_push_str(packet, &dlg->legs[callee].to_uri);

	/* XXX: on shutdown only? */
	dlg_vals = write_dialog_vars(dlg->vals);
	dlg_profiles = write_dialog_profiles(dlg->profile_links);

	bin_push_str(packet, dlg_vals);
	bin_push_str(packet, dlg_profiles);

	bin_push_int(packet, dlg->user_flags);
	bin_push_int(packet, dlg->mod_flags);
	/* the NEW / CHANGED / VP_CHANGED bits are masked out before sending */
	bin_push_int(packet, dlg->flags &
		~(DLG_FLAG_NEW|DLG_FLAG_CHANGED|DLG_FLAG_VP_CHANGED));
	/* NOTE(review): presumably turns the tick-based timer value into a
	 * wall-clock expiry - confirm against the receiving side */
	bin_push_int(packet,
		(unsigned int)time(0) + dlg->tl.timeout - get_ticks());

	bin_push_int(packet, dlg->legs[DLG_CALLER_LEG].last_gen_cseq);
	bin_push_int(packet, dlg->legs[callee].last_gen_cseq);
}

/* Binary Packet sending functions */


Expand All @@ -464,12 +512,11 @@ int dlg_replicated_delete(bin_packet_t *packet)
*/
void replicate_dialog_created(struct dlg_cell *dlg)
{
int callee_leg;
str *vars, *profiles;
int rc;
bin_packet_t packet;

dlg_lock_dlg(dlg);

if (dlg->state != DLG_STATE_CONFIRMED_NA && dlg->state != DLG_STATE_CONFIRMED) {
/* we don't need to replicate when in deleted state */
LM_WARN("not replicating dlg create message due to bad state %d (%.*s)\n",
Expand All @@ -487,48 +534,10 @@ void replicate_dialog_created(struct dlg_cell *dlg)
if (bin_init(&packet, &dlg_repl_cap, REPLICATION_DLG_CREATED, BIN_VERSION, 0) != 0)
goto init_error;

callee_leg = callee_idx(dlg);

bin_push_str(&packet, &dlg->callid);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].tag);
bin_push_str(&packet, &dlg->legs[callee_leg].tag);

bin_push_str(&packet, &dlg->from_uri);
bin_push_str(&packet, &dlg->to_uri);

bin_push_int(&packet, dlg->h_id);
bin_push_int(&packet, dlg->start_ts);
bin_push_int(&packet, dlg->state);

bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].bind_addr->sock_str);
if (dlg->legs[callee_leg].bind_addr)
bin_push_str(&packet, &dlg->legs[callee_leg].bind_addr->sock_str);
else
bin_push_str(&packet, NULL);

bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].r_cseq);
bin_push_str(&packet, &dlg->legs[callee_leg].r_cseq);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].route_set);
bin_push_str(&packet, &dlg->legs[callee_leg].route_set);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].contact);
bin_push_str(&packet, &dlg->legs[callee_leg].contact);
bin_push_str(&packet, &dlg->legs[callee_leg].from_uri);
bin_push_str(&packet, &dlg->legs[callee_leg].to_uri);
bin_push_dlg(&packet, dlg);

/* XXX: on shutdown only? */
vars = write_dialog_vars(dlg->vals);
profiles = write_dialog_profiles(dlg->profile_links);

bin_push_str(&packet, vars);
bin_push_str(&packet, profiles);
bin_push_int(&packet, dlg->user_flags);
bin_push_int(&packet, dlg->mod_flags);
bin_push_int(&packet, dlg->flags &
~(DLG_FLAG_NEW|DLG_FLAG_CHANGED|DLG_FLAG_VP_CHANGED));
bin_push_int(&packet, (unsigned int)time(0) + dlg->tl.timeout - get_ticks());
bin_push_int(&packet, dlg->legs[DLG_CALLER_LEG].last_gen_cseq);
bin_push_int(&packet, dlg->legs[callee_leg].last_gen_cseq);
dlg->replicated = 1;

dlg_unlock_dlg(dlg);

rc = clusterer_api.send_all(&packet, dialog_repl_cluster);
Expand Down Expand Up @@ -567,8 +576,6 @@ void replicate_dialog_created(struct dlg_cell *dlg)
*/
void replicate_dialog_updated(struct dlg_cell *dlg)
{
int callee_leg;
str *vars, *profiles;
int rc;
bin_packet_t packet;

Expand All @@ -584,48 +591,10 @@ void replicate_dialog_updated(struct dlg_cell *dlg)
if (bin_init(&packet, &dlg_repl_cap, REPLICATION_DLG_UPDATED, BIN_VERSION, 0) != 0)
goto init_error;

callee_leg = callee_idx(dlg);

bin_push_str(&packet, &dlg->callid);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].tag);
bin_push_str(&packet, &dlg->legs[callee_leg].tag);

bin_push_str(&packet, &dlg->from_uri);
bin_push_str(&packet, &dlg->to_uri);

bin_push_int(&packet, dlg->h_id);
bin_push_int(&packet, dlg->start_ts);
bin_push_int(&packet, dlg->state);

bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].bind_addr->sock_str);
if (dlg->legs[callee_leg].bind_addr)
bin_push_str(&packet, &dlg->legs[callee_leg].bind_addr->sock_str);
else
bin_push_str(&packet, NULL);

bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].r_cseq);
bin_push_str(&packet, &dlg->legs[callee_leg].r_cseq);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].route_set);
bin_push_str(&packet, &dlg->legs[callee_leg].route_set);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].contact);
bin_push_str(&packet, &dlg->legs[callee_leg].contact);
bin_push_str(&packet, &dlg->legs[callee_leg].from_uri);
bin_push_str(&packet, &dlg->legs[callee_leg].to_uri);

/* XXX: on shutdown only? */
vars = write_dialog_vars(dlg->vals);
profiles = write_dialog_profiles(dlg->profile_links);
bin_push_dlg(&packet, dlg);

bin_push_str(&packet, vars);
bin_push_str(&packet, profiles);
bin_push_int(&packet, dlg->user_flags);
bin_push_int(&packet, dlg->mod_flags);
bin_push_int(&packet, dlg->flags &
~(DLG_FLAG_NEW|DLG_FLAG_CHANGED|DLG_FLAG_VP_CHANGED));
bin_push_int(&packet, (unsigned int)time(0) + dlg->tl.timeout - get_ticks());
bin_push_int(&packet, dlg->legs[DLG_CALLER_LEG].last_gen_cseq);
bin_push_int(&packet, dlg->legs[callee_leg].last_gen_cseq);
dlg->replicated = 1;

dlg_unlock_dlg(dlg);

rc = clusterer_api.send_all(&packet, dialog_repl_cluster);
Expand Down Expand Up @@ -700,34 +669,71 @@ void replicate_dialog_deleted(struct dlg_cell *dlg)
void receive_dlg_repl(bin_packet_t *packet)
{
int rc = 0;
bin_packet_t *pkt;

switch (packet->type) {
case REPLICATION_DLG_CREATED:
if (dialog_repl_cluster) {
rc = dlg_replicated_create(packet, NULL, NULL, NULL, 1);
for (pkt = packet; pkt; pkt = pkt->next) {
switch (pkt->type) {
case REPLICATION_DLG_CREATED:
rc = dlg_replicated_create(pkt, NULL, NULL, NULL, 1);
if_update_stat(dlg_enable_stats, create_recv, 1);
}
break;
case REPLICATION_DLG_UPDATED:
if (dialog_repl_cluster) {
rc = dlg_replicated_update(packet);
break;
case REPLICATION_DLG_UPDATED:
rc = dlg_replicated_update(pkt);
if_update_stat(dlg_enable_stats, update_recv, 1);
}
break;
case REPLICATION_DLG_DELETED:
if (dialog_repl_cluster) {
rc = dlg_replicated_delete(packet);
break;
case REPLICATION_DLG_DELETED:
rc = dlg_replicated_delete(pkt);
if_update_stat(dlg_enable_stats, delete_recv, 1);
break;
case SYNC_PACKET_TYPE:
while (clusterer_api.sync_chunk_iter(pkt))
if (dlg_replicated_create(pkt, NULL, NULL, NULL, 1) < 0) {
LM_ERR("Failed to process sync packet\n");
return;
}
break;
default:
rc = -1;
LM_WARN("Invalid dialog binary packet command: %d "
"(from node: %d in cluster: %d)\n", pkt->type, pkt->src_id,
dialog_repl_cluster);
}
break;
default:
rc = -1;
LM_WARN("Invalid dialog binary packet command: %d (from node: %d in cluster: %d)\n",
packet->type, packet->src_id, dialog_repl_cluster);

if (rc != 0)
LM_ERR("Failed to process a binary packet!\n");
}
}

static int receive_sync_request(int node_id)
{
int i;
struct dlg_cell *dlg;
bin_packet_t *sync_packet;

for (i = 0; i < d_table->size; i++) {
dlg_lock(d_table, &(d_table->entries[i]));
for (dlg = d_table->entries[i].first; dlg; dlg = dlg->next) {
sync_packet = clusterer_api.sync_chunk_start(&dlg_repl_cap,
dialog_repl_cluster, node_id);
if (!sync_packet)
goto error;

bin_push_dlg(sync_packet, dlg);
}
dlg_unlock(d_table, &(d_table->entries[i]));
}

if (rc != 0)
LM_ERR("Failed to process a binary packet!\n");
return 0;

error:
dlg_unlock(d_table, &(d_table->entries[i]));
return -1;
}

/*
 * Clusterer event callback for the dialog replication capability.
 *
 * Only SYNC_REQ_RCV is acted upon: the request is answered through
 * receive_sync_request() and a failure is logged. Any other event is
 * ignored.
 *
 * @param ev       event reported by the clusterer
 * @param node_id  id of the node the event refers to
 */
void rcv_cluster_event(enum clusterer_event ev, int node_id)
{
	if (ev != SYNC_REQ_RCV)
		return;

	if (receive_sync_request(node_id) < 0) {
		LM_ERR("Failed to reply to sync request from node: %d\n", node_id);
	}
}

/**
Expand Down Expand Up @@ -1180,3 +1186,11 @@ static void broadcast_profiles(utime_t ticks, void *param)
bin_free_packet(&packet);
#undef REPL_PROF_TRYSEND
}

/*
 * MI command handler: ask the cluster for a dialog sync at runtime.
 *
 * Issues a sync request for the dialog replication capability in
 * dialog_repl_cluster.
 *
 * @param cmd    MI command tree (not used)
 * @param param  MI handler parameter (not used)
 * @return a 200 reply tree if the request was sent, a 400 reply tree if
 *         dialog replication is not configured or the request failed
 */
struct mi_root* mi_sync_cl_dlg(struct mi_root *cmd, void *param)
{
	/* without a replication cluster there is nothing to sync from, and
	 * the capability was never registered with the clusterer */
	if (!dialog_repl_cluster)
		return init_mi_tree(400, MI_SSTR("Dialog replication disabled"));

	if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster) < 0)
		return init_mi_tree(400, MI_SSTR("Failed to send sync request"));

	return init_mi_tree(200, MI_SSTR(MI_OK));
}
3 changes: 3 additions & 0 deletions modules/dialog/dlg_replication.h
Expand Up @@ -56,6 +56,9 @@ int dlg_replicated_update(bin_packet_t *packet);
int dlg_replicated_delete(bin_packet_t *packet);

void receive_dlg_repl(bin_packet_t *packet);
void rcv_cluster_event(enum clusterer_event ev, int node_id);

struct mi_root* mi_sync_cl_dlg(struct mi_root *cmd, void *param);

#endif /* _DIALOG_DLG_REPLICATION_H_ */

0 comments on commit db592fd

Please sign in to comment.