Skip to content

Commit

Permalink
cgrates: add cgrates_auth() async command
Browse files Browse the repository at this point in the history
  • Loading branch information
razvancrainea committed Jan 12, 2017
1 parent 4ab66e8 commit b16e91a
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 23 deletions.
20 changes: 5 additions & 15 deletions modules/cgrates/cgrates.c
Expand Up @@ -34,7 +34,6 @@
#include "../../mod_fix.h"
#include "../../lib/list.h"
#include "../../resolve.h"
#include "../../reactor_defs.h"
#include "cgrates.h"
#include "cgrates_acc.h"
#include "cgrates_auth.h"
Expand All @@ -52,15 +51,9 @@ static void mod_destroy(void);
static int child_init(int rank);
static int cgrates_set_engine(modparam_t type, void * val);

struct cgr_param {
int wait_for_reply;
struct sip_msg *msg;
struct cgr_conn *c;
};
int cgr_ctx_idx;
int cgr_ctx_local_idx;
int cgr_tm_ctx_idx = -1;
static int cgrates_async_resume_repl(int fd, struct sip_msg *msg, void *param);

static int pv_set_cgr(struct sip_msg *msg, pv_param_t *param,
int op, pv_value_t *val);
Expand Down Expand Up @@ -102,10 +95,9 @@ static pv_export_t pvars[] = {


static acmd_export_t acmds[] = {
/*
{"cgrates_engage", (acmd_function)w_async_cgr_engage, 1,
fixup_cgrates},
*/
{"cgrates_auth", (acmd_function)w_acgr_auth, 0, fixup_cgrates},
{"cgrates_auth", (acmd_function)w_acgr_auth, 1, fixup_cgrates},
{"cgrates_auth", (acmd_function)w_acgr_auth, 2, fixup_cgrates},
{0, 0, 0, 0, }
};

Expand Down Expand Up @@ -153,10 +145,8 @@ struct module_exports exports = {

static int fixup_cgrates(void ** param, int param_no)
{
if (param_no > 0 && param_no < 5)
if (param_no == 1 || param_no == 2)
return fixup_spve(param);
if (param_no == 5)
return fixup_pvar(param);
LM_CRIT("Unknown parameter number %d\n", param_no);
return E_UNSPEC;
}
Expand Down Expand Up @@ -234,7 +224,7 @@ static int mod_init(void)
if (load_tm_api(&cgr_tmb)!=0) {
LM_INFO("TM not loaded- cannot store variables in transaction!\n");
} else {
cgr_tm_ctx_idx = cgr_tmb.t_ctx_register_ptr(NULL);
cgr_tm_ctx_idx = cgr_tmb.t_ctx_register_ptr(cgr_free_ctx);
/* register a routine to move the pointer in tm when the transaction
* is created! */
if (cgr_tmb.register_tmcb(0, 0, TMCB_REQUEST_IN, cgr_move_ctx, 0, 0)<=0) {
Expand Down
28 changes: 25 additions & 3 deletions modules/cgrates/cgrates_auth.c
Expand Up @@ -64,18 +64,18 @@ static json_object *cgr_get_auth_msg(struct sip_msg *msg, str *acc, str *dst)
LM_ERR("Cannot get callid of the message!\n");
return NULL;
}
ctx = CGR_GET_CTX();
ctx = cgr_try_get_ctx();
stime.s = int2str(time(NULL), &stime.len);

cmsg = cgr_get_generic_msg("SMGenericV1.MaxUsage", ctx->kv_store);
cmsg = cgr_get_generic_msg("SMGenericV1.MaxUsage", ctx ? ctx->kv_store : NULL);
if (!cmsg) {
LM_ERR("cannot create generic cgrates message!\n");
return NULL;
}

/* OriginID */
/* if origin was not added from script, add it now */
if (ctx && !cgr_get_const_kv(ctx->kv_store, "OriginID") &&
if (((ctx && !cgr_get_const_kv(ctx->kv_store, "OriginID")) || !ctx) &&
cgr_msg_push_str(cmsg, "OriginID", &msg->callid->body) < 0) {
LM_ERR("cannot push OriginID!\n");
goto error;
Expand Down Expand Up @@ -125,3 +125,25 @@ int w_cgr_auth(struct sip_msg* msg, char* acc_c, char *dst_c)

return cgr_handle_cmd(msg, jmsg, cgr_proc_auth_reply, NULL);
}

int w_acgr_auth(struct sip_msg* msg, async_resume_module **resume_f,
void **resume_p, char* acc_c, char *dst_c)
{
str *acc;
str *dst;
json_object *jmsg = NULL;

if ((acc = cgr_get_acc(msg, acc_c)) == NULL)
return -4;
if ((dst = cgr_get_dst(msg, dst_c)) == NULL)
return -4;

jmsg = cgr_get_auth_msg(msg, acc, dst);
if (!jmsg) {
LM_ERR("cannot build the json to send to cgrates\n");
return -1;
}

return cgr_handle_async_cmd(msg, jmsg, cgr_proc_auth_reply, NULL,
resume_f, resume_p);
}
4 changes: 4 additions & 0 deletions modules/cgrates/cgrates_auth.h
Expand Up @@ -37,5 +37,9 @@
*/
int w_cgr_auth(struct sip_msg* msg, char* acc_c, char *dst_c);

/* async version of w_cgr_auth */
int w_acgr_auth(struct sip_msg* msg, async_resume_module **resume_f,
void **resume_p, char* acc_c, char *dst_c);

#endif /* _CGRATES_AUTH_H_ */

115 changes: 110 additions & 5 deletions modules/cgrates/cgrates_common.c
Expand Up @@ -27,6 +27,7 @@
#include "../../mod_fix.h"
#include "../../parser/parse_from.h"
#include "../../parser/parse_uri.h"
#include "../../reactor_defs.h"
#include "cgrates.h"
#include "cgrates_acc.h"
#include "cgrates_common.h"
Expand Down Expand Up @@ -234,20 +235,22 @@ struct cgr_ctx *cgr_try_get_ctx(void)
struct cgr_ctx *cgr_get_ctx(void)
{
struct cell* t;
struct cgr_ctx *ctx = CGR_GET_CTX();
struct cgr_ctx *ctx = cgr_try_get_ctx();

t = cgr_tmb.t_gett ? cgr_tmb.t_gett() : NULL;
t = t==T_UNDEFINED ? NULL : t;

if (ctx) {
/* if it is local, and we have transaction, move it in transaction */
if (t) {
if (t && CGR_GET_CTX()) {
LM_DBG("ctx=%p moved in transaction\n", ctx);
CGR_PUT_TM_CTX(t, ctx);
CGR_PUT_CTX(NULL);
}
return ctx;
}


ctx = shm_malloc(sizeof *ctx);
if (!ctx) {
LM_ERR("out of shm memory\n");
Expand Down Expand Up @@ -316,7 +319,7 @@ int cgr_handle_cmd(struct sip_msg *msg, json_object *jmsg,
/* go through each server and initialize the state */
list_for_each(l, &cgrates_engines) {
e = list_entry(l, struct cgr_engine, list);
if (!(c = cgr_get_free_conn(e)))
if (!(c = cgr_get_default_conn(e)))
continue;
/* found a free connection - build the buffer */
if (cgrc_send(c, &smsg) > 0)
Expand All @@ -340,6 +343,108 @@ int cgr_handle_cmd(struct sip_msg *msg, json_object *jmsg,
return ret;
}

struct cgr_param {
struct cgr_conn *c;
cgr_proc_reply_f reply_f;
void *reply_p;
};

static int cgrates_async_resume_repl(int fd,
struct sip_msg *msg, void *param)
{
int ret;
struct cgr_param *cp = (struct cgr_param *)param;
struct cgr_conn *c = cp->c;

/* reset the error */
CGR_RESET_REPLY_CTX();

ret = cgrc_async_read(c, cp->reply_f, cp->reply_p);

if (async_status == ASYNC_DONE) {
/* processing done - remove the FD and replace the handler */
async_status = ASYNC_DONE_NO_IO;
reactor_del_reader(c->fd, -1, 0);
if (cgrc_start_listen(c) < 0) {
LM_CRIT("cannot re-register fd for cgrates events!\n");
ret = -1;
goto end;
}
}
end:
/* done with this connection */
c->state = CGRC_FREE;
pkg_free(cp);
return ret;
}

int cgr_handle_async_cmd(struct sip_msg *msg, json_object *jmsg,
cgr_proc_reply_f f, void *p, async_resume_module **resume_f,
void **resume_p)
{
struct list_head *l;
struct cgr_engine *e;
struct cgr_conn *c;
struct cgr_param *cp = NULL;
int ret = 1;
str smsg;
char *r;

smsg.s = (char *)json_object_to_json_string(jmsg);
smsg.len = strlen(smsg.s);

cp = pkg_malloc(sizeof *cp);
if (!cp) {
LM_ERR("out of pkg memory\n");
return -1;
}
memset(cp, 0, sizeof *cp);
cp->reply_f = f;
cp->reply_p = p;

r = (char *)json_object_to_json_string(jmsg);
LM_DBG("sending json string: %s\n", r);

list_for_each(l, &cgrates_engines) {
e = list_entry(l, struct cgr_engine, list);
if (!(c = cgr_get_free_conn(e)))
continue;
/* found a free connection - build the buffer */
if (cgrc_send(c, &smsg) < 0) {
cgrc_close(c, CGRC_IS_LISTEN(c));
continue;
}
cp->c = c;
/* message succesfully sent - now fetch the reply */
if (CGRC_IS_DEFAULT(c)) {
/* reset the error */
CGR_RESET_REPLY_CTX();
do {
ret = cgrc_async_read(c, f, p);
} while(async_status == ASYNC_CONTINUE);
if (async_status == ASYNC_DONE)
/* do the reading in sync mode */
async_status = ASYNC_SYNC;
pkg_free(cp);
return ret;
} else {
c->state = CGRC_USED;
if (CGRC_IS_LISTEN(c)) {
/* remove the fd from the reactor because it will be added at the end of
* this function */
reactor_del_reader(c->fd, -1, 0);
CGRC_UNSET_LISTEN(c);
}
async_status = c->fd;
*resume_f = cgrates_async_resume_repl;
*resume_p = cp;
}
return ret;
}
pkg_free(cp);
return -3;
}

/* returns the processing status */
int cgrc_async_read(struct cgr_conn *c,
cgr_proc_reply_f f, void *p)
Expand Down Expand Up @@ -600,7 +705,7 @@ void cgr_free_ctx(void *param)
/* function that moves the context from global context to the transaction one */
void cgr_move_ctx( struct cell* t, int type, struct tmcb_params *ps)
{
struct cgr_ctx *ctx = (struct cgr_ctx *)*ps->param;
struct cgr_ctx *ctx = cgr_try_get_ctx();

if (!ctx)
return; /* nothing to move */
Expand All @@ -612,7 +717,7 @@ void cgr_move_ctx( struct cell* t, int type, struct tmcb_params *ps)
return;
}

LM_DBG("context moved in transaction\n");
LM_DBG("ctx=%p moved in transaction\n", ctx);
CGR_PUT_TM_CTX(t, ctx);
CGR_PUT_CTX(NULL);
}
Expand Down
3 changes: 3 additions & 0 deletions modules/cgrates/cgrates_common.h
Expand Up @@ -146,6 +146,9 @@ typedef int (*cgr_proc_reply_f)(struct cgr_conn *, json_object *,
*/
int cgr_handle_cmd(struct sip_msg *msg, json_object *jmsg,
cgr_proc_reply_f f, void *p);
int cgr_handle_async_cmd(struct sip_msg *msg, json_object *jmsg,
cgr_proc_reply_f f, void *p, async_resume_module **resume_f,
void **resume_p);
int cgrates_async_resume_req(int fd, void *param);
int cgrc_async_read(struct cgr_conn *c,
cgr_proc_reply_f f, void *p);
Expand Down
5 changes: 5 additions & 0 deletions modules/cgrates/cgrates_engine.c
Expand Up @@ -48,6 +48,11 @@ struct cgr_conn *cgr_get_free_conn(struct cgr_engine *e)
} else {
LM_DBG("maximum async connections per process reached!\n");
}
return cgr_get_default_conn(e);
}

struct cgr_conn *cgr_get_default_conn(struct cgr_engine *e)
{
/* use the default connection */
if (e->default_con && e->default_con->state == CGRC_FREE) {
LM_DBG("using default connection - running in sync mode!\n");
Expand Down
1 change: 1 addition & 0 deletions modules/cgrates/cgrates_engine.h
Expand Up @@ -59,6 +59,7 @@ int cgrc_start_listen(struct cgr_conn *c);
void cgrc_close(struct cgr_conn *c, int remove);
struct cgr_conn *cgrc_new(struct cgr_engine *e);
struct cgr_conn *cgr_get_free_conn(struct cgr_engine *e);
struct cgr_conn *cgr_get_default_conn(struct cgr_engine *e);

#endif /* _CGRATES_ENGINE_H_ */

0 comments on commit b16e91a

Please sign in to comment.