Skip to content

Commit

Permalink
[qrouting] call acc done only on invites
Browse files Browse the repository at this point in the history
  • Loading branch information
tallicamike authored and razvancrainea committed Feb 11, 2020
1 parent 665c921 commit 121fd30
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 87 deletions.
9 changes: 9 additions & 0 deletions modules/drouting/dr_cb.h
Expand Up @@ -54,6 +54,15 @@ enum drcb_types {
DRCB_MAX /*keep this at the end*/
};

struct dr_acc_call_params {
void *rule; /* qr_handler/rule */
int cr_id; /* destination id */
int gw_id; /* in the case the destination is a carrier */
struct sip_msg *msg;
};



/* callback function prototype */
typedef void (dr_cb) (void *param);
/* function to free callback param */
Expand Down
76 changes: 60 additions & 16 deletions modules/drouting/drouting.c
Expand Up @@ -2452,18 +2452,39 @@ static int sort_rt_dst(pgw_list_t *pgwl, unsigned short size,
}


inline static int push_gw_for_usage(struct sip_msg *msg,
struct head_db *current_partition,
struct sip_uri *uri, pgw_t *gw , str *c_id, str *c_attrs, int idx)
inline static int push_gw_for_usage(struct sip_msg *msg, struct head_db *current_partition,
struct sip_uri *uri, rt_info_t *rt, pgw_list_t * dst, int cr_id, int gw_id/* pgw_t *gw , str *c_id, str *c_attrs */, int idx)
{
char buf[PTR_STRING_SIZE]; /* a hexa string */
str *ruri;
pgw_t *gw = NULL;
str *c_id = NULL;
str *c_attrs = NULL;
int_str val;

struct dr_acc_call_params * acc_call_params = NULL;
if( current_partition==NULL ) {
return -1;
}

if(rt != NULL) { /* qrouting is requested => called from drouting */
if(cr_id == -1) { /* it is not a carrier */
gw = rt->pgwl[gw_id].dst.gw;
} else if(gw_id == -1){ /* destination is a carrier */
c_id = &rt->pgwl[cr_id].dst.carrier->id;
c_attrs = &rt->pgwl[cr_id].dst.carrier->attrs;
gw = rt->pgwl[cr_id].dst.carrier->pgwl[gw_id].dst.gw;
}
} else if(dst != NULL) { /* routing was not done rule-based => don't use
qrouting : called from route_2gw or route_2cr */
if(dst->is_carrier) {
gw = dst->dst.carrier->pgwl[gw_id].dst.gw;
c_id = &dst->dst.carrier->id;
c_attrs = &dst->dst.carrier->attrs;
} else {
gw = dst->dst.gw;
}
}

/* build uri*/
ruri = build_ruri( uri, gw->strip, &gw->pri, &gw->ip_str);
if (ruri==0) {
Expand All @@ -2486,6 +2507,25 @@ inline static int push_gw_for_usage(struct sip_msg *msg,
if (gw->sock)
msg->force_send_socket = gw->sock;

if(rt != NULL) { /* if routing was done rule based */
acc_call_params = (struct dr_acc_call_params*)pkg_malloc(
sizeof(struct dr_acc_call_params));
if(acc_call_params == NULL) {
LM_ERR("no more pkg memory!\n");
goto error;
}
memset(acc_call_params, 0, sizeof(struct dr_acc_call_params));
/* save callback parameters */
acc_call_params->rule = (void*)rt->qr_handler;
acc_call_params->cr_id = cr_id;
acc_call_params->gw_id = gw_id;
acc_call_params->msg = msg;
LM_INFO("RUN CALL\n");

run_callbacks(dr_acc_cbs, DRCB_ACC_CALL, acc_call_params); /* qr
accouting */
}

} else {

/* add ruri as AVP */
Expand Down Expand Up @@ -2897,9 +2937,8 @@ static int do_routing(struct sip_msg* msg, struct head_db *part, int grp,
/*ignore it*/
} else {
/* add gateway to usage list */
if ( push_gw_for_usage(msg, current_partition,
&uri, cdst->dst.gw ,
&dst->dst.carrier->id, &dst->dst.carrier->attrs, n ) ) {
if ( push_gw_for_usage(msg, current_partition, &uri, rt_info ,
NULL, dsts_idx[i], carrier_idx[j], n) ) {
LM_ERR("failed to use gw <%.*s>, skipping\n",
cdst->dst.gw->id.len, cdst->dst.gw->id.s);
} else {
Expand Down Expand Up @@ -2931,7 +2970,7 @@ static int do_routing(struct sip_msg* msg, struct head_db *part, int grp,

/* add gateway to usage list */
if ( push_gw_for_usage(msg, current_partition, &uri,
dst->dst.gw, NULL, NULL, n) ) {
rt_info, NULL, -1, dsts_idx[i], n) ) {
LM_ERR("failed to use gw <%.*s>, skipping\n",
dst->dst.gw->id.len, dst->dst.gw->id.s);
} else {
Expand Down Expand Up @@ -3110,6 +3149,7 @@ static int route2_carrier(struct sip_msg* msg, str* ids,
static unsigned short carrier_idx_size;
struct sip_uri uri;
pgw_list_t *cdst;
pgw_list_t tmp;
pcr_t *cr;
pv_value_t pv_val;
str ruri, id;
Expand Down Expand Up @@ -3221,14 +3261,17 @@ static int route2_carrier(struct sip_msg* msg, str* ids,
continue;
}

/* iterate through the list of GWs provided by carrier */
for ( j=0 ; j<cr->pgwa_len ; j++ ) {

cdst = &cr->pgwl[carrier_idx[j]];

/* is gateway disabled ? */
if (cdst->dst.gw->flags & DR_DST_STAT_DSBL_FLAG ) {
/*ignore it*/
/* is gateway disabled ? */
if (cdst->dst.gw->flags & DR_DST_STAT_DSBL_FLAG ) {
/*ignore it*/
} else {
/* add gateway to usage list */
tmp.is_carrier = 1;
tmp.dst.carrier = cr;
if ( push_gw_for_usage(msg, current_partition, &uri, NULL,
&tmp, -1, carrier_idx[j], n ) ) {
LM_ERR("failed to use gw <%.*s>, skipping\n",
cdst->dst.gw->id.len, cdst->dst.gw->id.s);
} else {
/* add gateway to usage list */
if ( push_gw_for_usage(msg, current_partition, &uri,
Expand Down Expand Up @@ -3305,6 +3348,7 @@ static int route2_gw(struct sip_msg* msg, str* ids, pv_spec_t* gw_attr,
{
struct sip_uri uri;
pgw_t *gw;
pgw_list_t tmp;
pv_value_t pv_val;
str ruri, id;
str next_gw_attrs = {NULL, 0};
Expand Down
2 changes: 2 additions & 0 deletions modules/drouting/prefix_tree.h
Expand Up @@ -140,6 +140,8 @@ typedef struct rt_info_ {
unsigned short pgwa_len;
/* how many lists link this element */
unsigned short ref_cnt;
/* handler used by qr for accouting (actually qr_rule_t*) */
void *qr_handler;
} rt_info_t;

typedef struct rt_info_wrp_ {
Expand Down
2 changes: 1 addition & 1 deletion modules/drouting/routing.c
Expand Up @@ -94,7 +94,6 @@ int parse_destination_list(rt_data_t* rd, char *dstlist, pgw_list_t** pgwl_ret,
unsigned int size, pgwl_size;
long int t;
char *tmp, *ep;
int i;
str id;


Expand Down Expand Up @@ -387,6 +386,7 @@ build_rt_info(
LM_ERR("No callback list to match the given type\n");
}
qr_rule = (void*)((struct dr_reg_init_rule_params*)*cb_params->param)->rule;
rt->qr_handler = qr_rule;

p = rt->pgwl;

Expand Down
119 changes: 65 additions & 54 deletions modules/qrouting/qr_acc.c
Expand Up @@ -28,6 +28,8 @@
*/
#include "qr_acc.h"

#include "../drouting/dr_cb.h"

int myn = 0;

/* free the parameter of the dialog callback */
Expand Down Expand Up @@ -75,64 +77,72 @@ static void release_trans_prop(void *param) {
shm_free(to_free);
}

int test_acc(struct sip_msg* msg) {
void qr_acc(int type, struct dr_cb_params * params) {
qr_trans_prop_t *trans_prop;
qr_rule_t *rule;
int gw_id, cr_id;
struct sip_msg *msg = NULL;

msg = ((struct dr_acc_call_params*)(*params->param))->msg;

if(msg->first_line.type != SIP_REQUEST ||
msg->first_line.u.request.method_value != METHOD_INVITE) {
if(/*msg->first_line.type != SIP_REQUEST ||*/
msg->first_line.u.request.method_value == METHOD_INVITE) {
/*TODO: check if works only on invite (as it should) */
return -1;/* it is not an invite */
}

trans_prop = (qr_trans_prop_t*)shm_malloc(sizeof(qr_trans_prop_t));
if(trans_prop == NULL) {
LM_ERR("no more shm memory\n");
goto error;
}
rule = ((struct dr_acc_call_params*)(*params->param))->rule;
gw_id = ((struct dr_acc_call_params*)(*params->param))->gw_id;
cr_id = ((struct dr_acc_call_params*)(*params->param))->cr_id;

memset(trans_prop, 0, sizeof(qr_trans_prop_t));
trans_prop = (qr_trans_prop_t*)shm_malloc(sizeof(qr_trans_prop_t));
if(trans_prop == NULL) {
LM_ERR("no more shm memory\n");
goto error;
}

if(init_trans_prop(trans_prop) < 0) {
LM_ERR("failed to init transaction properties (for qrouting)\n");
goto error;
}
memset(trans_prop, 0, sizeof(qr_trans_prop_t));

/* save transaction properties */
trans_prop->gw = (*qr_rules_start)->dest[0].dst.gw;
if(init_trans_prop(trans_prop) < 0) {
LM_ERR("failed to init transaction properties (for qrouting)\n");
goto error;
}

/* get the time of INVITE */
if(clock_gettime(CLOCK_REALTIME, trans_prop->invite) < 0) {
LM_ERR("failed to get system time\n");
goto error;
}
/* save transaction properties */
if(cr_id == -1) { /* if the destination is not within a carrier */
trans_prop->gw = rule->dest[gw_id].dst.gw;
} else { /* if the destination is within a carrier */
trans_prop->gw = rule->dest[cr_id].dst.grp.gw[gw_id];
}
/* get the time of INVITE */
if(clock_gettime(CLOCK_REALTIME, trans_prop->invite) < 0) {
LM_ERR("failed to get system time\n");
goto error;
}

if(dlgcb.create_dlg(msg, 0) < 0) { /* for call duration */
LM_ERR("failed to create dialog\n");
goto error;
}
/* register callback for the responses to this INVITE */
if(tmb.register_tmcb(msg, 0,TMCB_RESPONSE_IN, qr_check_reply_tmcb,
(void*)trans_prop, release_trans_prop) <= 0) {
LM_ERR("cannot register TMCB_RESPONSE_IN\n");
goto error;
if(dlgcb.create_dlg(msg, 0) < 0) { /* for call duration */
LM_ERR("failed to create dialog\n");
goto error;
}
/* register callback for the responses to this INVITE */
if(tmb.register_tmcb(msg, 0,TMCB_RESPONSE_IN, qr_check_reply_tmcb,
(void*)trans_prop, release_trans_prop) <= 0) {
LM_ERR("cannot register TMCB_RESPONSE_IN\n");
goto error;
}
}

return 1;
return ;
error:
if(trans_prop != NULL) {
release_trans_prop(trans_prop); /* cur_time is released here */
}

return -1;
}

/* a call for this gateway returned 200OK */
inline void qr_add_200OK(qr_gw_t * gw) {
lock_get(gw->acc_lock);
LM_INFO("200OK - inside lock\n");
++(gw->current_interval.stats.as);
++(gw->current_interval.stats.cc);
LM_INFO("200OK %d\n", gw->current_interval.stats.as);
lock_release(gw->acc_lock);
}

Expand All @@ -144,17 +154,17 @@ inline void qr_add_4xx(qr_gw_t * gw) {
}

inline void qr_add_pdd(qr_gw_t *gw, double pdd_tm) {
lock_get(gw->acc_lock); /* protect the statistics */
++(gw->current_interval.n.pdd);
gw->current_interval.stats.pdd += pdd_tm;
lock_release(gw->acc_lock);
lock_get(gw->acc_lock); /* protect the statistics */
++(gw->current_interval.n.pdd);
gw->current_interval.stats.pdd += pdd_tm;
lock_release(gw->acc_lock);
}

inline void qr_add_setup(qr_gw_t *gw, double st) {
lock_get(gw->acc_lock); /* protect the statistics */
++(gw->current_interval.n.setup);
gw->current_interval.stats.st += st;
lock_release(gw->acc_lock);
lock_get(gw->acc_lock); /* protect the statistics */
++(gw->current_interval.n.setup);
gw->current_interval.stats.st += st;
lock_release(gw->acc_lock);
}

/*
Expand All @@ -175,7 +185,7 @@ static double get_elapsed_time(struct timespec * start, char mu) {
seconds = difftime(now.tv_sec, start->tv_sec); /* seconds elapsed betwen
now and the initial invite */
if(seconds < 0) {
LM_ERR("negative time elapsed from INVITE\n");
LM_ERR("negative time elapsed\n");
return -1;
}
if(mu == 'm') {
Expand All @@ -199,16 +209,16 @@ static double get_elapsed_time(struct timespec * start, char mu) {
static void call_ended(struct dlg_cell* dlg, int type,
struct dlg_cb_params * params) {
double cd;
qr_dialog_prop_t *dialog_prop = (qr_dialog_prop_t*)params;
struct timespec *time_200OK = (struct timespec*)*params->param;
qr_dialog_prop_t *dialog_prop = (qr_dialog_prop_t*)*params->param;
struct timespec *time_200OK = dialog_prop->time_200OK;
if((cd = get_elapsed_time(time_200OK,'s')) < 0) {
LM_ERR("call duration negative\n");
return;
}
lock_get(dialog_prop->gw->acc_lock); /* protect the statistics */
++(dialog_prop->gw->current_interval.n.cd);
dialog_prop->gw->current_interval.stats.cd += cd;
lock_release(dialog_prop->gw->acc_lock);
LM_DBG("call duration = %lf", cd);
}

/*
Expand Down Expand Up @@ -278,14 +288,15 @@ void qr_check_reply_tmcb(struct cell *cell, int type, struct tmcb_params *ps) {
LM_ERR("failed to register callback for call termination\n");
goto error;
}
LM_INFO("callback for call duration registered\n");
} else if (ps->code != 408 || (ps->code == 408 && (cell->flags &
T_UAC_HAS_RECV_REPLY) )){ /* if it's 408 it must have
one provisional response */
qr_add_4xx(trans_prop->gw);
}
}
if(ps->code >= 200) { /* 1XX should not be accounted -
provisional responses */
provisional responses */
lock_get(trans_prop->gw->acc_lock);
++(trans_prop->gw->current_interval.n.ok);
lock_release(trans_prop->gw->acc_lock);
Expand Down Expand Up @@ -340,15 +351,15 @@ static inline void add_stats(qr_stats_t *x, qr_stats_t *y, char op) {
/* testing purpose only */
void show_stats(qr_gw_t *gw) {
LM_INFO("*****************************\n");
LM_INFO("ans seizure: %d / %d\n", gw->history_stats.stats.as,
LM_INFO("ans seizure: %lf / %lf\n", gw->history_stats.stats.as,
gw->history_stats.n.ok);
LM_INFO("completed calls: %d / %d\n", gw->history_stats.stats.cc,
LM_INFO("completed calls: %lf / %lf\n", gw->history_stats.stats.cc,
gw->history_stats.n.ok);
LM_INFO("post dial delay: %lf / %d\n", gw->history_stats.stats.pdd,
LM_INFO("post dial delay: %lf / %lf\n", gw->history_stats.stats.pdd,
gw->history_stats.n.pdd);
LM_INFO("setup time: %lf / %d\n", gw->history_stats.stats.st,
LM_INFO("setup time: %lf / %lf\n", gw->history_stats.stats.st,
gw->history_stats.n.setup);
LM_INFO("call duration: %lf / %d\n", gw->history_stats.stats.cd,
LM_INFO("call duration: %lf / %lf\n", gw->history_stats.stats.cd,
gw->history_stats.n.cd);
LM_INFO("*****************************\n");
}
Expand All @@ -365,11 +376,11 @@ void update_gw_stats(qr_gw_t *gw) {
gw->state |= QR_STATUS_DIRTY;
lock_stop_write(gw->ref_lock);
gw->next_interval->calls = gw->current_interval;
show_stats(gw);
memset(&gw->current_interval, 0, sizeof(qr_stats_t));
gw->next_interval = gw->next_interval->next; /* the 'oldest' sample interval
becomes the 'newest' */
lock_release(gw->acc_lock);
show_stats(gw);
}


Expand Down

0 comments on commit 121fd30

Please sign in to comment.