Skip to content

Commit

Permalink
dialog: fix incorrectly discarded dialogs after sync
Browse files Browse the repository at this point in the history
Syncing while the donor node is also sending live replication packets
may lead to incorrectly discarding some of the newly received dialogs.
There were two ways in which new dialogs were mistaken for old,
"local" dialogs from before syncing:
* overwriting the dialog flags when handling a replicated update;
* not marking live replicated dialogs received during sync as "new" dialogs.

(cherry picked from commit 46e9a53)
  • Loading branch information
rvlad-patrascu committed Jul 28, 2022
1 parent 6aea5c5 commit edc90fb
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 37 deletions.
36 changes: 2 additions & 34 deletions modules/dialog/dialog.c
Expand Up @@ -830,42 +830,10 @@ static int mod_init(void)
return -1;
}

/* check params and register to clusterer for dialogs and
* profiles replication */
if (dialog_repl_cluster < 0) {
LM_ERR("Invalid dialog_replication_cluster, must be 0 or "
"a positive cluster id\n");
if (dlg_init_clustering() < 0) {
LM_ERR("Failed to initialize clustering\n");
return -1;
}
if (profile_repl_cluster < 0) {
LM_ERR("Invalid profile_repl_cluster, must be 0 or "
"a positive cluster id\n");
return -1;
}

if ((dialog_repl_cluster || profile_repl_cluster) &&
(load_clusterer_api(&clusterer_api) < 0)) {
LM_DBG("failed to load clusterer API - is clusterer module loaded?\n");
return -1;
}

if (profile_repl_cluster && clusterer_api.register_capability(
&prof_repl_cap, receive_prof_repl, NULL, profile_repl_cluster, 0,
NODE_CMP_ANY) < 0) {
LM_ERR("Cannot register clusterer callback for profile replication!\n");
return -1;
}

if (dialog_repl_cluster) {
if (clusterer_api.register_capability(&dlg_repl_cap, receive_dlg_repl,
rcv_cluster_event, dialog_repl_cluster, 1, NODE_CMP_ANY) < 0) {
LM_ERR("Cannot register clusterer callback for dialog replication!\n");
return -1;
}

if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster, 0) < 0)
LM_ERR("Sync request failed\n");
}

if ( register_timer( "dlg-timer", dlg_timer_routine, NULL, 1,
TIMER_FLAG_DELAY_ON_DELAY)<0 ) {
Expand Down
69 changes: 66 additions & 3 deletions modules/dialog/dlg_replication.c
Expand Up @@ -45,6 +45,8 @@ struct clusterer_binds clusterer_api;

str shtag_dlg_val = str_init("dlgX_shtag");

char *dlg_sync_in_progress;

static int get_shtag_sync_status(struct dlg_cell *dlg);

static struct socket_info * fetch_socket_info(str *addr)
Expand Down Expand Up @@ -120,6 +122,56 @@ static struct dlg_cell *lookup_dlg_unsafe(unsigned int h_entry, unsigned int h_i
return 0;
}

/**
 * Validate the replication cluster parameters and register the dialog
 * module with the clusterer for profile and dialog replication.
 *
 * When dialog replication is enabled, this also allocates the
 * dlg_sync_in_progress flag with shm_malloc(), raises it, and issues the
 * initial sync request.
 *
 * @return 0 on success (including the case where the initial sync request
 *         fails, which is only logged), -1 on invalid parameters, clusterer
 *         API load/registration failure or shm allocation failure.
 */
int dlg_init_clustering(void)
{
	/* check params and register to clusterer for dialogs and
	 * profiles replication */
	if (dialog_repl_cluster < 0) {
		LM_ERR("Invalid dialog_replication_cluster, must be 0 or "
			"a positive cluster id\n");
		return -1;
	}
	if (profile_repl_cluster < 0) {
		LM_ERR("Invalid profile_repl_cluster, must be 0 or "
			"a positive cluster id\n");
		return -1;
	}

	/* the clusterer API is only needed if some replication is enabled */
	if ((dialog_repl_cluster || profile_repl_cluster) &&
		(load_clusterer_api(&clusterer_api) < 0)) {
		LM_DBG("failed to load clusterer API - is clusterer module loaded?\n");
		return -1;
	}

	if (profile_repl_cluster && clusterer_api.register_capability(
		&prof_repl_cap, receive_prof_repl, NULL, profile_repl_cluster, 0,
		NODE_CMP_ANY) < 0) {
		LM_ERR("Cannot register clusterer callback for profile replication!\n");
		return -1;
	}

	if (dialog_repl_cluster) {
		if (clusterer_api.register_capability(&dlg_repl_cap, receive_dlg_repl,
			rcv_cluster_event, dialog_repl_cluster, 1, NODE_CMP_ANY) < 0) {
			LM_ERR("Cannot register clusterer callback for dialog replication!\n");
			return -1;
		}

		dlg_sync_in_progress = shm_malloc(sizeof *dlg_sync_in_progress);
		/* test the pointer itself: dereferencing it here would crash on
		 * allocation failure and read uninitialized memory on success */
		if (!dlg_sync_in_progress) {
			LM_ERR("no more shm memory!\n");
			return -1;
		}

		/* raise the flag before asking for the sync */
		*dlg_sync_in_progress = 1;
		/* a failed sync request is logged but does not abort init */
		if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster, 0) < 0)
			LM_ERR("Sync request failed\n");
	}

	return 0;
}

/* Binary Packet receiving functions */

/**
Expand Down Expand Up @@ -176,8 +228,9 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
/* unmark dlg as loaded from DB (otherwise it would have been
* dropped later when syncing from cluster is done) */
dlg->flags &= ~DLG_FLAG_FROM_DB;
if (from_sync)
if (from_sync || *dlg_sync_in_progress)
dlg->flags |= DLG_FLAG_SYNCED;

dlg_unlock(d_table, d_entry);
return 0;
}
Expand Down Expand Up @@ -327,6 +380,10 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,

return 0;
}
} else if (*dlg_sync_in_progress) {
/* dialogs received after sync started and until SYNC_DONE callback
* is run should never be dropped as if they were "local" dialogs */
dlg->flags |= DLG_FLAG_SYNCED;
}

if (dlg_db_mode == DB_MODE_DELAYED) {
Expand Down Expand Up @@ -389,7 +446,7 @@ int dlg_replicated_update(bin_packet_t *packet)
int timeout, h_entry;
str st;
struct dlg_entry *d_entry;
int rcv_flags, save_new_flag;
int rcv_flags, save_new_flag, save_sync_flag;
unsigned int h_id;
short pkg_ver = get_bin_pkg_version(packet);

Expand Down Expand Up @@ -490,8 +547,10 @@ int dlg_replicated_update(bin_packet_t *packet)
/* make sure an update received immediately after a create can't
* incorrectly erase the DLG_FLAG_NEW before locally writing to DB */
save_new_flag = dlg->flags & DLG_FLAG_NEW;
save_sync_flag = dlg->flags & DLG_FLAG_SYNCED;
dlg->flags = rcv_flags;
dlg->flags |= ((save_new_flag ? DLG_FLAG_NEW : 0) | DLG_FLAG_CHANGED);
dlg->flags |= ((save_new_flag ? DLG_FLAG_NEW : 0) |
(save_sync_flag ? DLG_FLAG_SYNCED : 0) | DLG_FLAG_CHANGED);

bin_pop_int(packet, &timeout);
bin_skip_int(packet, 2);
Expand Down Expand Up @@ -1165,6 +1224,8 @@ void rcv_cluster_event(enum clusterer_event ev, int node_id)
}
dlg_unlock(d_table, &d_table->entries[i]);
}

*dlg_sync_in_progress = 0;
} else if (ev == CLUSTER_NODE_UP) {
if (cluster_auto_sync) {
if ((sync_required = clusterer_api.shtag_sync_all_backup(
Expand All @@ -1176,6 +1237,7 @@ void rcv_cluster_event(enum clusterer_event ev, int node_id)
if (sync_required) {
LM_DBG("Requesting sync for dialogs marked with backup "
"sharing tags\n");
*dlg_sync_in_progress = 1;
rc = clusterer_api.request_sync(&dlg_repl_cap,
dialog_repl_cluster, 1);
if (rc < 0)
Expand Down Expand Up @@ -1678,6 +1740,7 @@ mi_response_t *mi_sync_cl_dlg(const mi_params_t *params,
}
}

*dlg_sync_in_progress = 1;
rc = clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster, 0);

if (rc < 0)
Expand Down
2 changes: 2 additions & 0 deletions modules/dialog/dlg_replication.h
Expand Up @@ -50,6 +50,8 @@ extern str shtag_dlg_val;

extern int cluster_auto_sync;

int dlg_init_clustering(void);

void replicate_dialog_created(struct dlg_cell *dlg);
void replicate_dialog_updated(struct dlg_cell *dlg);
void replicate_dialog_deleted(struct dlg_cell *dlg);
Expand Down

0 comments on commit edc90fb

Please sign in to comment.