Skip to content

Commit

Permalink
dialog: add mechanism that allows tagging dialogs when replicating
Browse files Browse the repository at this point in the history
Dialogs can be labeled from the the script using the set_dlg_repl_tag()
function and each tag has a state(active/backup) which can be set via
the "set_repl_tag_active" MI cmd. The tags and their states are shared
in the cluster and nodes automatically go to the backup state when
another node is set to active.
  • Loading branch information
rvlad-patrascu committed Mar 27, 2018
1 parent db592fd commit 91b9259
Show file tree
Hide file tree
Showing 6 changed files with 272 additions and 5 deletions.
61 changes: 61 additions & 0 deletions modules/dialog/dialog.c
Expand Up @@ -153,6 +153,7 @@ static int fixup_get_vals(void** param, int param_no);
static int w_get_dlg_info(struct sip_msg*, char*, char*, char*, char*);
static int w_get_dlg_vals(struct sip_msg*, char*, char*, char*);
static int w_tsl_dlg_flag(struct sip_msg *msg, char *_idx, char *_val);
static int w_set_dlg_repltag(struct sip_msg *msg, char *repltag);

/* item/pseudo-variables functions */
int pv_get_dlg_lifetime(struct sip_msg *msg,pv_param_t *param,pv_value_t *res);
Expand Down Expand Up @@ -222,6 +223,8 @@ static cmd_export_t cmds[]={
BRANCH_ROUTE | LOCAL_ROUTE | EVENT_ROUTE | TIMER_ROUTE },
{"match_dialog", (cmd_function)w_match_dialog, 0,NULL,
0, REQUEST_ROUTE},
{"set_dlg_repl_tag", (cmd_function)w_set_dlg_repltag, 1,
fixup_spve_null, 0, REQUEST_ROUTE},
{"load_dlg", (cmd_function)load_dlg, 0, 0, 0, 0},
{0,0,0,0,0,0}
};
Expand Down Expand Up @@ -311,6 +314,7 @@ static mi_export_t mi_cmds[] = {
{ "profile_get_values", 0, mi_get_profile_values, 0, 0, 0},
{ "list_all_profiles", 0, mi_list_all_profiles, 0, 0, 0},
{ "profile_end_dlgs", 0, mi_profile_terminate, 0, 0, 0},
{ "set_repl_tag_active",0, mi_set_repltag_active, 0, 0, 0},
{ 0, 0, 0, 0, 0, 0}
};

Expand Down Expand Up @@ -878,6 +882,17 @@ static int mod_init(void)

if (clusterer_api.request_sync(&dlg_repl_cap, dialog_repl_cluster) < 0)
LM_ERR("Sync request failed\n");

if ((repltags_list = shm_malloc(sizeof *repltags_list)) == NULL) {
LM_CRIT("No more shm memory\n");
return -1;
}
*repltags_list = NULL;

if ((repltags_lock = lock_init_rw()) == NULL) {
LM_CRIT("Failed to init lock\n");
return -1;
}
}

if ( register_timer( "dlg-timer", dlg_timer_routine, NULL, 1,
Expand Down Expand Up @@ -995,6 +1010,24 @@ static int child_init(int rank)

static void mod_destroy(void)
{
struct dlg_repl_tag *tag, *tag_tmp;

if (repltags_list) {
if (*repltags_list) {
for (tag = *repltags_list; tag; ) {
tag_tmp = tag;
tag = tag->next;
shm_free(tag_tmp);
}
}
shm_free(repltags_list);
repltags_list = NULL;
}
if (repltags_lock) {
lock_destroy_rw(repltags_lock);
repltags_lock = NULL;
}

if (dlg_db_mode != DB_MODE_NONE) {
dialog_update_db(0, 0/*do not do locking*/);
destroy_dlg_db();
Expand Down Expand Up @@ -1599,6 +1632,34 @@ static int w_get_dlg_vals(struct sip_msg *msg, char *v_name, char *v_val,
return 1;
}

static int w_set_dlg_repltag(struct sip_msg *msg, char *repltag)
{
str tag_name;
struct dlg_cell *dlg;

if (!dialog_repl_cluster) {
LM_DBG("Dialog replication not configured\n");
return 1;
}

if (fixup_get_svalue(msg, (gparam_p)repltag, &tag_name) < 0) {
LM_ERR("no replication tag\n");
return -1;
}

if ((dlg = get_current_dialog()) == NULL) {
LM_ERR("Unable to fetch dialog\n");
return -1;
}

if (set_dlg_repltag(dlg, &tag_name) < 0) {
LM_ERR("Unable to set replication tag\n");
return -1;
}

return 1;
}


/* item/pseudo-variables functions */
int pv_get_dlg_lifetime(struct sip_msg *msg, pv_param_t *param,
Expand Down
1 change: 0 additions & 1 deletion modules/dialog/dlg_db_handler.c
Expand Up @@ -43,7 +43,6 @@
#include "dlg_cb.h"
#include "dlg_profile.h"


str dlg_id_column = str_init(DLG_ID_COL);
str call_id_column = str_init(CALL_ID_COL);
str from_uri_column = str_init(FROM_URI_COL);
Expand Down
190 changes: 189 additions & 1 deletion modules/dialog/dlg_replication.c
Expand Up @@ -51,6 +51,10 @@ extern stat_var *delete_recv;

struct clusterer_binds clusterer_api;

str repltag_dlg_val = str_init("dlgX_repltag");
struct dlg_repl_tag **repltags_list;
rw_lock_t *repltags_lock;

static struct socket_info * fetch_socket_info(str *addr)
{
struct socket_info *sock;
Expand Down Expand Up @@ -666,6 +670,97 @@ void replicate_dialog_deleted(struct dlg_cell *dlg)
LM_ERR("Failed to replicate deleted dialog\n");
}

struct dlg_repl_tag *create_dlg_repltag(str *tag_name)
{
struct dlg_repl_tag *new_tag;

new_tag = shm_malloc(sizeof *new_tag + tag_name->len);
if (!new_tag) {
LM_ERR("No more shm memory\n");
return NULL;
}
memset(new_tag, 0, sizeof *new_tag);

new_tag->name.s = (char *)(new_tag + 1);
new_tag->name.len = tag_name->len;
memcpy(new_tag->name.s, tag_name->s, tag_name->len);

new_tag->state = REPLTAG_STATE_BACKUP;

new_tag->next = *repltags_list;
*repltags_list = new_tag;

return new_tag;
}

/* should be called under writing lock */
struct dlg_repl_tag *get_repltag_unsafe(str *tag_name)
{
struct dlg_repl_tag *tag;

for (tag = *repltags_list; tag && str_strcmp(&tag->name, tag_name);
tag = tag->next) ;
if (!tag && !(tag = create_dlg_repltag(tag_name))) {
LM_ERR("Failed to create replication tag\n");
return NULL;
}

return tag;
}

/* you must release the lock for switchable reading if @lock_stop_r = 0
* in case of error the lock is released by the function
*/
struct dlg_repl_tag *get_repltag(str *tag_name, int lock_stop_r)
{
struct dlg_repl_tag *tag;
int lock_old_flag;

lock_start_sw_read(repltags_lock);

for (tag = *repltags_list; tag && str_strcmp(&tag->name, tag_name);
tag = tag->next) ;
if (!tag) {
lock_switch_write(repltags_lock, lock_old_flag);
if ((tag = create_dlg_repltag(tag_name)) == NULL) {
LM_ERR("Failed to create replication tag\n");
lock_switch_read(repltags_lock, lock_old_flag);
lock_stop_sw_read(repltags_lock);
return NULL;
}
lock_switch_read(repltags_lock, lock_old_flag);
}

if (lock_stop_r)
lock_stop_sw_read(repltags_lock);

return tag;
}

static int receive_repltag_active_msg(bin_packet_t *packet)
{
str tag_name;
struct dlg_repl_tag *tag;

bin_pop_str(packet, &tag_name);

lock_start_write(repltags_lock);

if ((tag = get_repltag_unsafe(&tag_name)) == NULL) {
LM_ERR("Unable to fetch replication tag\n");
lock_stop_write(repltags_lock);
return -1;
}

/* directly go to backup state when another
* node in the cluster is to active */
tag->state = REPLTAG_STATE_BACKUP;

lock_stop_write(repltags_lock);

return 0;
}

void receive_dlg_repl(bin_packet_t *packet)
{
int rc = 0;
Expand All @@ -685,6 +780,9 @@ void receive_dlg_repl(bin_packet_t *packet)
rc = dlg_replicated_delete(pkt);
if_update_stat(dlg_enable_stats, delete_recv, 1);
break;
case REPLICATION_TAG_ACTIVE:
rc = receive_repltag_active_msg(pkt);
break;
case SYNC_PACKET_TYPE:
while (clusterer_api.sync_chunk_iter(pkt))
if (dlg_replicated_create(pkt, NULL, NULL, NULL, 1) < 0) {
Expand Down Expand Up @@ -1193,4 +1291,94 @@ struct mi_root* mi_sync_cl_dlg(struct mi_root *cmd, void *param)
return init_mi_tree(400, MI_SSTR("Failed to send sync request"));
else
return init_mi_tree(200, MI_SSTR(MI_OK));
}
}

int set_dlg_repltag(struct dlg_cell *dlg, str *tag_name)
{
if (get_repltag(tag_name, 1) == NULL) {
LM_ERR("Unable to fetch replication tag\n");
return -1;
}

if (store_dlg_value(dlg, &repltag_dlg_val, tag_name) < 0) {
LM_ERR("Failed to store dlg value\n");
return -1;
}

return 0;
}

static int broadcast_repltag_active(str *tag_name)
{
bin_packet_t packet;

if (bin_init(&packet, &dlg_repl_cap, REPLICATION_TAG_ACTIVE, BIN_VERSION,
0) < 0) {
LM_ERR("Failed to init bin packet");
return -1;
}
bin_push_str(&packet, tag_name);

if (clusterer_api.send_all(&packet, dialog_repl_cluster) !=
CLUSTERER_SEND_SUCCES) {
bin_free_packet(&packet);
return -1;
}

bin_free_packet(&packet);

return 0;
}

struct mi_root *mi_set_repltag_active(struct mi_root *cmd_tree, void *param)
{
struct mi_node* node;
struct dlg_repl_tag *tag;

node = cmd_tree->node.kids;
if (node == NULL || !node->value.s || !node->value.len)
return init_mi_tree(400, MI_SSTR(MI_MISSING_PARM));

lock_start_write(repltags_lock);

if ((tag = get_repltag_unsafe(&node->value)) == NULL)
return init_mi_tree(500, MI_SSTR("Unable to set replication tag"));

tag->state = REPLTAG_STATE_ACTIVE;

lock_stop_write(repltags_lock);

if (broadcast_repltag_active(&node->value) < 0)
LM_ERR("Failed to broadcast message about tag [%.*s] going active\n",
node->value.len, node->value.s);

return init_mi_tree( 200, MI_SSTR(MI_OK));
}

/* @return:
* 0 - backup
* 1 - active
* -1 - error
*/
int get_repltag_state(struct dlg_cell *dlg)
{
str tag_name;
struct dlg_repl_tag *tag;
int rc;

if (fetch_dlg_value(dlg, &repltag_dlg_val, &tag_name, 0) < 0) {
LM_ERR("Unable to fetch dlg value for replication tag\n");
return -1;
}

if ((tag = get_repltag(&tag_name, 0)) == NULL) {
LM_ERR("Unable to fetch replication tag\n");
return -1;
}

rc = tag->state;

lock_stop_sw_read(repltags_lock);

return rc;
}
23 changes: 22 additions & 1 deletion modules/dialog/dlg_replication.h
Expand Up @@ -28,6 +28,7 @@
#include "../../bin_interface.h"
#include "../../socket_info.h"
#include "../../timer.h"
#include "../../rw_locking.h"
#include "../clusterer/api.h"

#ifndef _DIALOG_DLG_REPLICATION_H_
Expand All @@ -36,9 +37,22 @@
#define REPLICATION_DLG_CREATED 1
#define REPLICATION_DLG_UPDATED 2
#define REPLICATION_DLG_DELETED 3
#define REPLICATION_TAG_ACTIVE 4

#define BIN_VERSION 1

#define REPLTAG_STATE_BACKUP 0
#define REPLTAG_STATE_ACTIVE 1

struct dlg_repl_tag {
str name;
int state;
struct dlg_repl_tag *next;
};

extern struct dlg_repl_tag **repltags_list;
extern rw_lock_t *repltags_lock;

extern int dialog_repl_cluster;
extern int profile_repl_cluster;

Expand All @@ -47,18 +61,25 @@ extern str prof_repl_cap;

extern struct clusterer_binds clusterer_api;

extern str repltag_dlg_val;

void replicate_dialog_created(struct dlg_cell *dlg);
void replicate_dialog_updated(struct dlg_cell *dlg);
void replicate_dialog_deleted(struct dlg_cell *dlg);

int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell, str *ftag, str *ttag, int safe);
int dlg_replicated_create(bin_packet_t *packet, struct dlg_cell *cell, str *ftag,
str *ttag, int safe);
int dlg_replicated_update(bin_packet_t *packet);
int dlg_replicated_delete(bin_packet_t *packet);

void receive_dlg_repl(bin_packet_t *packet);
void rcv_cluster_event(enum clusterer_event ev, int node_id);

struct mi_root* mi_sync_cl_dlg(struct mi_root *cmd, void *param);
struct mi_root *mi_set_repltag_active(struct mi_root *cmd, void *param);

int get_repltag_state(struct dlg_cell *dlg);
int set_dlg_repltag(struct dlg_cell *dlg, str *tag_name);

#endif /* _DIALOG_DLG_REPLICATION_H_ */

1 change: 0 additions & 1 deletion modules/dialog/dlg_req_within.c
Expand Up @@ -44,7 +44,6 @@
#include "dlg_profile.h"
#include "dlg_handlers.h"


extern str dlg_extra_hdrs;

int free_tm_dlg(dlg_t *td)
Expand Down
1 change: 0 additions & 1 deletion modules/dialog/dlg_vals.c
Expand Up @@ -303,4 +303,3 @@ int pv_set_dlg_val(struct sip_msg* msg, pv_param_t *param, int op,
return 0;
}


0 comments on commit 91b9259

Please sign in to comment.