Skip to content

Commit

Permalink
dialog: match replicated dialogs by did
Browse files Browse the repository at this point in the history
This fixes issues in call looping scenarios when trying to replicate
multiple dialogs with the same SIP coordinates.

(cherry picked from commit b3c31fb)
  • Loading branch information
rvlad-patrascu committed Sep 24, 2021
1 parent 7917c42 commit 63359e8
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 26 deletions.
149 changes: 126 additions & 23 deletions modules/dialog/dlg_replication.c
Expand Up @@ -93,14 +93,39 @@ do { \
_dlg->rt_ ## _type = 0; \
} while(0)

static struct dlg_cell *lookup_dlg_unsafe(unsigned int h_entry, unsigned int h_id)
{
struct dlg_cell *dlg;
struct dlg_entry *d_entry;

if (h_entry>=d_table->size)
goto not_found;

d_entry = &(d_table->entries[h_entry]);

for( dlg=d_entry->first ; dlg ; dlg=dlg->next ) {
if (dlg->h_id == h_id) {
if (dlg->state==DLG_STATE_DELETED)
goto not_found;

LM_DBG("dialog id=%u found on entry %u\n", h_id, h_entry);
return dlg;
}
}

not_found:
LM_DBG("no dialog id=%u found on entry %u\n", h_id, h_entry);
return 0;
}

/* Binary Packet receiving functions */

/**
* replicates a confirmed dialog from another OpenSIPS instance
* by reading the relevant information using the Binary Packet Interface
*/
int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
str *ftag, str *ttag, int safe)
str *ftag, str *ttag, unsigned int hid, int safe)
{
int h_entry, rc;
str callid = { NULL, 0 }, from_uri, to_uri, from_tag, to_tag;
Expand All @@ -112,22 +137,38 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
struct socket_info *caller_sock, *callee_sock;
struct dlg_entry *d_entry;
str tag_name;
unsigned int h_id;
unsigned int state;
unsigned int start_ts;
short pkg_ver = get_bin_pkg_version(packet);

LM_DBG("Received replicated dialog!\n");

if (!cell) {
DLG_BIN_POP(str, packet, callid, malformed);
DLG_BIN_POP(str, packet, from_tag, malformed);
DLG_BIN_POP(str, packet, to_tag, malformed);
DLG_BIN_POP(str, packet, from_uri, malformed);
DLG_BIN_POP(str, packet, to_uri, malformed);
DLG_BIN_POP(int, packet, h_id, pre_linking_error);
}

DLG_BIN_POP(int, packet, start_ts, pre_linking_error);
DLG_BIN_POP(int, packet, state, pre_linking_error);

if (!cell) {
h_entry = dlg_hash(&callid);
d_entry = &d_table->entries[h_entry];

if (safe)
if (!safe)
dlg_lock(d_table, d_entry);

if (get_dlg_unsafe(d_entry, &callid, &from_tag, &to_tag, &dlg) == 0) {
if (pkg_ver == DLG_BIN_V4)
dlg = lookup_dlg_unsafe(h_entry, h_id);
else
get_dlg_unsafe(d_entry, &callid, &from_tag, &to_tag, &dlg);

if (dlg) {
LM_DBG("Dialog with ci '%.*s' is already created\n",
callid.len, callid.s);
/* unmark dlg as loaded from DB (otherwise it would have been
Expand All @@ -146,18 +187,19 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
h_entry = dlg_hash(&cell->callid);
d_entry = &d_table->entries[h_entry];

if (safe)
if (!safe)
dlg_lock(d_table, d_entry);

from_tag = *ftag;
to_tag = *ttag;
h_id = hid;
dlg = cell;
}
if_update_stat(dlg_enable_stats, processed_dlgs, 1);

DLG_BIN_POP(int, packet, dlg->h_id, pre_linking_error);
DLG_BIN_POP(int, packet, dlg->start_ts, pre_linking_error);
DLG_BIN_POP(int, packet, dlg->state, pre_linking_error);
dlg->h_id = h_id;
dlg->start_ts = start_ts;
dlg->state = state;

/* next_id follows the max value of all replicated ids */
if (d_table->entries[dlg->h_entry].next_id <= dlg->h_id)
Expand Down Expand Up @@ -326,19 +368,23 @@ int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
*/
int dlg_replicated_update(bin_packet_t *packet)
{
struct dlg_cell *dlg;
struct dlg_cell *dlg = NULL;
str call_id, from_tag, to_tag, from_uri, to_uri, vars, profiles;
int timeout, h_entry;
str st;
struct dlg_entry *d_entry;
int rcv_flags, save_new_flag;
unsigned int h_id;
short pkg_ver = get_bin_pkg_version(packet);

bin_pop_str(packet, &call_id);
bin_pop_str(packet, &from_tag);
bin_pop_str(packet, &to_tag);
bin_pop_str(packet, &from_uri);
bin_pop_str(packet, &to_uri);

bin_pop_int(packet, &h_id);

LM_DBG("replicated update for ['%.*s' '%.*s' '%.*s' '%.*s' '%.*s']\n",
call_id.len, call_id.s, from_tag.len, from_tag.s, to_tag.len, to_tag.s,
from_uri.len, from_uri.s, to_uri.len, to_uri.s);
Expand All @@ -348,7 +394,12 @@ int dlg_replicated_update(bin_packet_t *packet)

dlg_lock(d_table, d_entry);

if (get_dlg_unsafe(d_entry, &call_id, &from_tag, &to_tag, &dlg) != 0) {
if (pkg_ver == DLG_BIN_V4)
dlg = lookup_dlg_unsafe(h_entry, h_id);
else
get_dlg_unsafe(d_entry, &call_id, &from_tag, &to_tag, &dlg);

if (!dlg) {
LM_DBG("dialog not found, building new\n");

dlg = build_new_dlg(&call_id, &from_uri, &to_uri, &from_tag);
Expand All @@ -357,7 +408,7 @@ int dlg_replicated_update(bin_packet_t *packet)
goto error;
}

return dlg_replicated_create(packet ,dlg, &from_tag, &to_tag, 0);
return dlg_replicated_create(packet ,dlg, &from_tag, &to_tag, h_id, 1);
}

/* discard an update for a deleted dialog */
Expand All @@ -366,7 +417,7 @@ int dlg_replicated_update(bin_packet_t *packet)
return 0;
}

bin_skip_int(packet, 2);
bin_skip_int(packet, 1);
bin_pop_int(packet, &dlg->state);

/* sockets */
Expand Down Expand Up @@ -483,14 +534,25 @@ int dlg_replicated_delete(bin_packet_t *packet)
unsigned int dir, dst_leg;
struct dlg_cell *dlg;
int old_state, new_state, unref, ret;
unsigned int h_id;
int h_entry;
short pkg_ver = get_bin_pkg_version(packet);

DLG_BIN_POP(str, packet, call_id, malformed);
DLG_BIN_POP(str, packet, from_tag, malformed);
DLG_BIN_POP(str, packet, to_tag, malformed);

LM_DBG("Deleting dialog with callid: %.*s\n", call_id.len, call_id.s);

dlg = get_dlg(&call_id, &from_tag, &to_tag, &dir, &dst_leg);
if (pkg_ver == DLG_BIN_V4) {
DLG_BIN_POP(int, packet, h_id, malformed);

h_entry = dlg_hash(&call_id);
dlg = lookup_dlg(h_entry, h_id);
} else {
dlg = get_dlg(&call_id, &from_tag, &to_tag, &dir, &dst_leg);
}

if (!dlg) {
/* may be already deleted due to timeout */
LM_DBG("dialog not found (callid: |%.*s| ftag: |%.*s|\n",
Expand Down Expand Up @@ -551,32 +613,61 @@ int dlg_replicated_delete(bin_packet_t *packet)
int dlg_replicated_cseq_updated(bin_packet_t *packet)
{
str call_id, from_tag, to_tag;
unsigned int dir, dst_leg;
unsigned int dir, dst_leg = -1;
unsigned int cseq;
struct dlg_cell *dlg;
unsigned int h_id;
int h_entry;
struct dlg_entry *d_entry;
short pkg_ver = get_bin_pkg_version(packet);

DLG_BIN_POP(str, packet, call_id, malformed);
DLG_BIN_POP(str, packet, from_tag, malformed);
DLG_BIN_POP(str, packet, to_tag, malformed);

LM_DBG("Updating cseq for dialog with callid: %.*s\n", call_id.len, call_id.s);

dst_leg = -1;
dlg = get_dlg(&call_id, &from_tag, &to_tag, &dir, &dst_leg);
if (pkg_ver == DLG_BIN_V4) {
DLG_BIN_POP(int, packet, h_id, malformed);

h_entry = dlg_hash(&call_id);
d_entry = &(d_table->entries[h_entry]);

dlg_lock(d_table, d_entry);

dlg = lookup_dlg_unsafe(h_entry, h_id);

if (!match_dialog(dlg, &call_id, &from_tag, &to_tag, &dir, &dst_leg)) {
LM_ERR("Failed to match dialog\n");
goto err_unlock;
}

dlg_unlock(d_table, d_entry);
} else {
dlg = get_dlg(&call_id, &from_tag, &to_tag, &dir, &dst_leg);
}

if (!dlg) {
/* may be already deleted due to timeout */
LM_DBG("dialog not found (callid: |%.*s| ftag: |%.*s|\n",
call_id.len, call_id.s, from_tag.len, from_tag.s);
return 0;
}
DLG_BIN_POP(int, packet, cseq, malformed);

DLG_BIN_POP(int, packet, cseq, err_unlock);
dlg->legs[dst_leg].last_gen_cseq = cseq;
unref_dlg(dlg, 1);

if (pkg_ver != DLG_BIN_V4)
unref_dlg(dlg, 1);

return 0;
malformed:
LM_ERR("malformed cseq update packet for %.*s\n", call_id.len, call_id.s);
return -1;
err_unlock:
if (pkg_ver == DLG_BIN_V4)
dlg_unlock(d_table, d_entry);
return -1;
}
#undef DLG_BIN_POP

Expand Down Expand Up @@ -807,6 +898,7 @@ void replicate_dialog_deleted(struct dlg_cell *dlg)
bin_push_str(&packet, &dlg->callid);
bin_push_str(&packet, &dlg->legs[DLG_CALLER_LEG].tag);
bin_push_str(&packet, &dlg->legs[callee_idx(dlg)].tag);
bin_push_int(&packet, dlg->h_id);

rc = clusterer_api.send_all(&packet, dialog_repl_cluster);
switch (rc) {
Expand Down Expand Up @@ -848,6 +940,8 @@ void replicate_dialog_cseq_updated(struct dlg_cell *dlg, int leg)
bin_push_str(&packet,
&dlg->legs[leg == DLG_CALLER_LEG?callee_idx(dlg):DLG_CALLER_LEG].tag);
bin_push_str(&packet, &dlg->legs[leg].tag);
bin_push_int(&packet, dlg->h_id);

bin_push_int(&packet, dlg->legs[leg].last_gen_cseq);

rc = clusterer_api.send_all(&packet, dialog_repl_cluster);
Expand Down Expand Up @@ -878,33 +972,42 @@ void receive_dlg_repl(bin_packet_t *packet)
bin_packet_t *pkt;

for (pkt = packet; pkt; pkt = pkt->next) {
short ver = get_bin_pkg_version(pkt);

switch (pkt->type) {
case REPLICATION_DLG_CREATED:
ensure_bin_version(pkt, BIN_VERSION);
if (ver != DLG_BIN_V3)
ensure_bin_version(pkt, BIN_VERSION);

rc = dlg_replicated_create(pkt, NULL, NULL, NULL, 1);
rc = dlg_replicated_create(pkt, NULL, NULL, NULL, 0, 0);
if_update_stat(dlg_enable_stats, create_recv, 1);
break;
case REPLICATION_DLG_UPDATED:
ensure_bin_version(pkt, BIN_VERSION);
if (ver != DLG_BIN_V3)
ensure_bin_version(pkt, BIN_VERSION);

rc = dlg_replicated_update(pkt);
if_update_stat(dlg_enable_stats, update_recv, 1);
break;
case REPLICATION_DLG_DELETED:
ensure_bin_version(pkt, BIN_VERSION);
if (ver != DLG_BIN_V3)
ensure_bin_version(pkt, BIN_VERSION);

rc = dlg_replicated_delete(pkt);
if_update_stat(dlg_enable_stats, delete_recv, 1);
break;
case REPLICATION_DLG_CSEQ:
if (ver != DLG_BIN_V3)
ensure_bin_version(pkt, BIN_VERSION);

rc = dlg_replicated_cseq_updated(pkt);
break;
case SYNC_PACKET_TYPE:
ensure_bin_version(pkt, BIN_VERSION);
if (ver != DLG_BIN_V3)
ensure_bin_version(pkt, BIN_VERSION);

while (clusterer_api.sync_chunk_iter(pkt))
if (dlg_replicated_create(pkt, NULL, NULL, NULL, 1) < 0) {
if (dlg_replicated_create(pkt, NULL, NULL, NULL, 0, 0) < 0) {
LM_ERR("Failed to process sync packet\n");
return;
}
Expand Down
9 changes: 6 additions & 3 deletions modules/dialog/dlg_replication.h
Expand Up @@ -33,7 +33,10 @@
#define REPLICATION_DLG_DELETED 3
#define REPLICATION_DLG_CSEQ 4

#define BIN_VERSION 3
#define DLG_BIN_V3 3
#define DLG_BIN_V4 4

#define BIN_VERSION DLG_BIN_V4

extern int dialog_repl_cluster;
extern int profile_repl_cluster;
Expand All @@ -50,8 +53,8 @@ void replicate_dialog_updated(struct dlg_cell *dlg);
void replicate_dialog_deleted(struct dlg_cell *dlg);
void replicate_dialog_cseq_updated(struct dlg_cell *dlg, int leg);

int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell, str *ftag,
str *ttag, int safe);
int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell,
str *ftag, str *ttag, unsigned int hid, int safe);
int dlg_replicated_update(bin_packet_t *packet);
int dlg_replicated_delete(bin_packet_t *packet);

Expand Down

0 comments on commit 63359e8

Please sign in to comment.