Skip to content

Commit

Permalink
dialog: Fix data reload race conditions on startup
Browse files Browse the repository at this point in the history
Ever since the dialog data started being loaded in child_init(), the
load_dialog_info_from_db() and rcv_cluster_event() routines could run in
parallel, without any synchronization on the dialog table, which could
lead to duplicate dialogs in the hash.

(cherry picked from commit aa93d0f)
  • Loading branch information
liviuchircu committed Jul 30, 2019
1 parent 765c521 commit a6345cc
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 67 deletions.
65 changes: 34 additions & 31 deletions modules/dialog/dlg_db_handler.c
Expand Up @@ -504,6 +504,7 @@ static int load_dialog_info_from_db(int dlg_hash_size)
db_row_t * rows;
int i, nr_rows;
struct dlg_cell *dlg;
struct dlg_entry *d_entry;
str callid, from_uri, to_uri, from_tag, to_tag;
str cseq1,cseq2,contact1,contact2,rroute1,rroute2,mangled_fu,mangled_tu;
int no_rows = 10;
Expand Down Expand Up @@ -560,13 +561,26 @@ static int load_dialog_info_from_db(int dlg_hash_size)

/*restore the dialog info*/
GET_STR_VALUE(callid, values, 1, 1, 0);
GET_STR_VALUE(from_uri, values, 2, 1, 0);
GET_STR_VALUE(from_tag, values, 3, 1, 0);
GET_STR_VALUE(to_tag, values, 5, 1, 0);

d_entry = &d_table->entries[hash_entry];
dlg_lock(d_table, d_entry);

if (get_dlg_unsafe(d_entry, &callid, &from_tag, &to_tag,
&dlg) == 0) {
dlg_unlock(d_table, d_entry);
LM_DBG("dialog already exists, skipping (ci: %.*s)\n",
callid.len, callid.s);
continue;
}

GET_STR_VALUE(from_uri, values, 2, 1, 0);
GET_STR_VALUE(to_uri, values, 4, 1, 0);

if((dlg=build_new_dlg(&callid, &from_uri, &to_uri, &from_tag))==0){
LM_ERR("failed to build new dialog\n");
goto error;
goto error_unlock;
}

if(dlg->h_entry != hash_entry){
Expand All @@ -577,11 +591,11 @@ static int load_dialog_info_from_db(int dlg_hash_size)
dialog_table_name.len, dialog_table_name.s,
dlg->h_entry,hash_entry);
shm_free(dlg);
goto error;
goto error_unlock;
}

/*link the dialog*/
link_dlg(dlg, 0);
/* link the dialog */
link_dlg_unsafe(d_entry, dlg);

dlg->h_id = hash_id;

Expand Down Expand Up @@ -618,7 +632,7 @@ static int load_dialog_info_from_db(int dlg_hash_size)
NULL, &cseq2, callee_sock,&mangled_fu,&mangled_tu,0,0)!=0) ) {
LM_ERR("dlg_set_leg_info failed\n");
/* destroy the dialog */
unref_dlg(dlg,1);
unref_dlg_unsafe(dlg, 1, d_entry);
continue;
}
dlg->legs_no[DLG_LEG_200OK] = DLG_FIRST_CALLEE_LEG;
Expand Down Expand Up @@ -676,12 +690,12 @@ static int load_dialog_info_from_db(int dlg_hash_size)
dlg->legs[callee_idx(dlg)].tag.len,
ZSW(dlg->legs[callee_idx(dlg)].tag.s));
/* destroy the dialog */
unref_dlg(dlg,1);
unref_dlg_unsafe(dlg, 1, d_entry);
continue;
}

/* reference the dialog as kept in the timer list */
ref_dlg(dlg,1);
ref_dlg_unsafe(dlg, 1);
LM_DBG("current dialog timeout is %u\n", dlg->tl.timeout);

dlg->lifetime = 0;
Expand All @@ -696,7 +710,7 @@ static int load_dialog_info_from_db(int dlg_hash_size)
LM_CRIT("Unable to insert dlg %p into ping timer\n",dlg);
else {
/* reference dialog as kept in ping timer list */
ref_dlg(dlg,1);
ref_dlg_unsafe(dlg, 1);
}
}

Expand All @@ -709,7 +723,7 @@ static int load_dialog_info_from_db(int dlg_hash_size)
"ping timer\n", dlg);
else {
/* reference dialog as kept in reinvite ping timer list */
ref_dlg(dlg,1);
ref_dlg_unsafe(dlg, 1);
}
}

Expand All @@ -721,13 +735,15 @@ static int load_dialog_info_from_db(int dlg_hash_size)

if (dlg_db_mode == DB_MODE_DELAYED) {
/* to be later removed by timer */
ref_dlg(dlg,1);
ref_dlg_unsafe(dlg, 1);
}

update_dlg_stats(dlg, +1);

dlg_unlock(d_table, d_entry);
run_load_callback_per_dlg(dlg);

next_dialog:
;
next_dialog:;
}

/* any more data to be fetched ?*/
Expand All @@ -748,6 +764,8 @@ static int load_dialog_info_from_db(int dlg_hash_size)
remove_ended_dlgs_from_db();
return 0;

error_unlock:
dlg_unlock(d_table, d_entry);
error:
dialog_dbf.free_result(dialog_db_handle, res);
if (found_ended_dlgs)
Expand Down Expand Up @@ -1679,7 +1697,7 @@ static int sync_dlg_db_mem(void)
db_val_t * values;
db_row_t * rows;
struct dlg_entry *d_entry;
struct dlg_cell *it,*known_dlg,*dlg=NULL;
struct dlg_cell *known_dlg, *dlg = NULL;
int i, nr_rows,callee_leg_idx,db_timeout;
int no_rows = 10;
unsigned int db_caller_cseq = 0, db_callee_cseq = 0;
Expand Down Expand Up @@ -1707,7 +1725,7 @@ static int sync_dlg_db_mem(void)

values = ROW_VALUES(rows + i);

if (VAL_NULL(values)) {
if (VAL_NULL(values) || VAL_TYPE(values) != DB_BIGINT) {
LM_ERR("column %.*s cannot be null -> skipping\n",
dlg_id_column.len, dlg_id_column.s);
continue;
Expand Down Expand Up @@ -1740,22 +1758,7 @@ static int sync_dlg_db_mem(void)
/* lock the whole entry */
dlg_lock( d_table, d_entry);

for (it=d_entry->first;it;it=it->next)
if (it->callid.len == callid.len &&
it->legs[DLG_CALLER_LEG].tag.len == from_tag.len &&
memcmp(it->callid.s,callid.s,callid.len)==0 &&
memcmp(it->legs[DLG_CALLER_LEG].tag.s,from_tag.s,from_tag.len)==0) {
/* callid & ftag match */
callee_leg_idx = callee_idx(it);
if (it->legs[callee_leg_idx].tag.len == to_tag.len &&
memcmp(it->legs[callee_leg_idx].tag.s,to_tag.s,to_tag.len)==0) {
/* full dlg match */
known_dlg = it;
break;
}
}

if (known_dlg == 0) {
if (get_dlg_unsafe(d_entry, &callid, &from_tag, &to_tag, &known_dlg) != 0) {
/* we can safely unlock here */
dlg_unlock( d_table, d_entry);
LM_DBG("First seen dialog - load all stuff - callid = [%.*s]\n",callid.len,callid.s);
Expand Down
20 changes: 5 additions & 15 deletions modules/dialog/dlg_hash.c
Expand Up @@ -893,25 +893,15 @@ void link_dlg(struct dlg_cell *dlg, int n)

d_entry = &(d_table->entries[dlg->h_entry]);

dlg_lock( d_table, d_entry);

dlg->h_id = d_entry->next_id++;
if (d_entry->first==0) {
d_entry->first = d_entry->last = dlg;
} else {
d_entry->last->next = dlg;
dlg->prev = d_entry->last;
d_entry->last = dlg;
}
dlg_lock(d_table, d_entry);

dlg->ref += 1 + n;
d_entry->cnt++;
link_dlg_unsafe(d_entry, dlg);
dlg->ref += n;

LM_DBG("ref dlg %p with %d -> %d in h_entry %p - %d \n", dlg, n+1, dlg->ref,
d_entry,dlg->h_entry);
LM_DBG("ref dlg %p with %d -> %d in h_entry %p - %d \n",
dlg, n + 1, dlg->ref, d_entry, dlg->h_entry);

dlg_unlock( d_table, d_entry);
return;
}


Expand Down
56 changes: 56 additions & 0 deletions modules/dialog/dlg_hash.h
Expand Up @@ -379,6 +379,25 @@ struct dlg_cell* get_dlg_by_callid( str *callid, int active_only);

void link_dlg(struct dlg_cell *dlg, int n);

/*
 * Append "dlg" at the tail of the dialog list of "d_entry", then increment
 * both the dialog's reference counter and the entry's dialog counter by one.
 *
 * This macro performs no locking of its own and does not touch dlg->h_id.
 *
 * Both arguments are expanded several times, so they must be free of side
 * effects.  Every use is parenthesized, which makes it safe to pass
 * expressions such as "&entry" or "&cell" (an unparenthesized expansion would
 * turn those into "&entry->first" / "&cell->prev").
 */
#define _link_dlg_unsafe(d_entry, dlg) \
	do { \
		if (!(d_entry)->first) { \
			(d_entry)->first = (d_entry)->last = (dlg); \
		} else { \
			(d_entry)->last->next = (dlg); \
			(dlg)->prev = (d_entry)->last; \
			(d_entry)->last = (dlg); \
		} \
		(dlg)->ref++; \
		(d_entry)->cnt++; \
	} while (0)

/*
 * Give "dlg" the next free id of "d_entry" (post-incrementing
 * d_entry->next_id), then link it into the entry via _link_dlg_unsafe().
 *
 * This macro performs no locking of its own.
 *
 * Both arguments are expanded more than once, so they must be free of side
 * effects.  They are parenthesized here and forwarded parenthesized, which
 * makes it safe to pass expressions such as "&entry" or "&cell".
 */
#define link_dlg_unsafe(d_entry, dlg) \
	do { \
		(dlg)->h_id = (d_entry)->next_id++; \
		_link_dlg_unsafe((d_entry), (dlg)); \
	} while (0)

void unref_dlg(struct dlg_cell *dlg, unsigned int cnt);

void ref_dlg(struct dlg_cell *dlg, unsigned int cnt);
Expand Down Expand Up @@ -533,6 +552,43 @@ static inline int match_dialog(struct dlg_cell *dlg, str *callid,
*/
}

/*
 * Scan the dialog list of "d_entry" for a dialog whose Call-ID, caller-leg
 * tag and callee-leg tag are byte-for-byte equal to "callid", "from_tag" and
 * "to_tag" respectively.
 *
 * The function takes no lock and does not change the reference count of the
 * dialog it finds; the caller is expected to hold the lock of "d_entry".
 *
 * @out_dlg: set to the matching dialog, or to NULL when nothing matches
 * @return:  0 if a dialog was found, -1 otherwise
 */
static inline int get_dlg_unsafe(struct dlg_entry *d_entry,
		str *callid, str *from_tag, str *to_tag, struct dlg_cell **out_dlg)
{
	struct dlg_cell *cell;
	int callee;

	for (cell = d_entry->first; cell != NULL; cell = cell->next) {
		/* Call-ID and caller tag: compare lengths first, then contents */
		if (cell->callid.len != callid->len ||
				cell->legs[DLG_CALLER_LEG].tag.len != from_tag->len ||
				memcmp(cell->callid.s, callid->s, callid->len) != 0 ||
				memcmp(cell->legs[DLG_CALLER_LEG].tag.s, from_tag->s,
					from_tag->len) != 0)
			continue;

		/* Call-ID and caller tag match, now check the callee tag */
		callee = callee_idx(cell);
		if (cell->legs[callee].tag.len != to_tag->len ||
				memcmp(cell->legs[callee].tag.s, to_tag->s,
					to_tag->len) != 0)
			continue;

		/* all three identifiers match */
		*out_dlg = cell;
		return 0;
	}

	*out_dlg = NULL;
	return -1;
}

/*
 * Adjust the dialog statistic that corresponds to the current state of "dlg"
 * by "amount": the "active_dlgs" stat for the two confirmed states, the
 * "early_dlgs" stat for the early state.  Any other state is left alone.
 * The update itself goes through if_update_stat() with dlg_enable_stats
 * as its condition argument.
 */
static inline void update_dlg_stats(struct dlg_cell *dlg, int amount)
{
	switch (dlg->state) {
	case DLG_STATE_CONFIRMED_NA:
	case DLG_STATE_CONFIRMED:
		if_update_stat(dlg_enable_stats, active_dlgs, amount);
		break;
	case DLG_STATE_EARLY:
		if_update_stat(dlg_enable_stats, early_dlgs, amount);
		break;
	default:
		/* no statistic is tracked for the remaining states */
		break;
	}
}

int mi_print_dlg(mi_item_t *dialog_obj, struct dlg_cell *dlg, int with_context);

static inline void init_dlg_term_reason(struct dlg_cell *dlg,char *reason,int reason_len)
Expand Down
25 changes: 4 additions & 21 deletions modules/dialog/dlg_replication.c
Expand Up @@ -93,7 +93,6 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
str *ftag, str *ttag, int safe)
{
int h_entry;
unsigned int dir, dst_leg;
str callid = { NULL, 0 }, from_uri, to_uri, from_tag, to_tag;
str cseq1, cseq2, contact1, contact2, adv_ct1, adv_ct2;
str rroute1, rroute2, mangled_fu, mangled_tu;
Expand All @@ -113,18 +112,15 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
DLG_BIN_POP(str, packet, from_uri, malformed);
DLG_BIN_POP(str, packet, to_uri, malformed);

dlg = get_dlg(&callid, &from_tag, &to_tag, &dir, &dst_leg);

h_entry = dlg_hash(&callid);
d_entry = &d_table->entries[h_entry];

if (safe)
dlg_lock(d_table, d_entry);

if (dlg) {
if (get_dlg_unsafe(d_entry, &callid, &from_tag, &to_tag, &dlg) == 0) {
LM_DBG("Dialog with ci '%.*s' is already created\n",
callid.len, callid.s);
unref_dlg_unsafe(dlg, 1, d_entry);
callid.len, callid.s);
/* unmark dlg as loaded from DB (otherwise it would have been
* dropped later when syncing from cluster is done) */
dlg->flags &= ~DLG_FLAG_FROM_DB;
Expand Down Expand Up @@ -203,15 +199,7 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
dlg->legs_no[DLG_LEG_200OK] = DLG_FIRST_CALLEE_LEG;

/* link the dialog into the hash */
if (!d_entry->first)
d_entry->first = d_entry->last = dlg;
else {
d_entry->last->next = dlg;
dlg->prev = d_entry->last;
d_entry->last = dlg;
}
dlg->ref++;
d_entry->cnt++;
_link_dlg_unsafe(d_entry, dlg);

DLG_BIN_POP(str, packet, vars, pre_linking_error);
DLG_BIN_POP(str, packet, profiles, pre_linking_error);
Expand Down Expand Up @@ -319,7 +307,6 @@ int dlg_replicated_update(bin_packet_t *packet)
{
struct dlg_cell *dlg;
str call_id, from_tag, to_tag, from_uri, to_uri, vars, profiles;
unsigned int dir, dst_leg;
int timeout, h_entry;
str st;
struct dlg_entry *d_entry;
Expand All @@ -335,14 +322,12 @@ int dlg_replicated_update(bin_packet_t *packet)
call_id.len, call_id.s, from_tag.len, from_tag.s, to_tag.len, to_tag.s,
from_uri.len, from_uri.s, to_uri.len, to_uri.s);

dlg = get_dlg(&call_id, &from_tag, &to_tag, &dir, &dst_leg);

h_entry = dlg_hash(&call_id);
d_entry = &d_table->entries[h_entry];

dlg_lock(d_table, d_entry);

if (!dlg) {
if (get_dlg_unsafe(d_entry, &call_id, &from_tag, &to_tag, &dlg) != 0) {
LM_DBG("dialog not found, building new\n");

dlg = build_new_dlg(&call_id, &from_uri, &to_uri, &from_tag);
Expand Down Expand Up @@ -406,8 +391,6 @@ int dlg_replicated_update(bin_packet_t *packet)
}
}

unref_dlg_unsafe(dlg, 1, d_entry);

if (vars.s && vars.len != 0) {
read_dialog_vars(vars.s, vars.len, dlg);
run_dlg_callbacks(DLGCB_PROCESS_VARS, dlg,
Expand Down

0 comments on commit a6345cc

Please sign in to comment.