Skip to content

Commit

Permalink
usrloc: Implement the NoSQL version of get_urecord()
Browse files Browse the repository at this point in the history
  • Loading branch information
liviuchircu committed Mar 12, 2018
1 parent 962228b commit 30f8e7c
Showing 1 changed file with 195 additions and 5 deletions.
200 changes: 195 additions & 5 deletions modules/usrloc/udomain.c
Expand Up @@ -419,7 +419,7 @@ void free_udomain(udomain_t* _d)
* Returns a static dummy urecord for temporary usage
*/
static inline void
get_static_urecord(udomain_t* _d, str* _aor, struct urecord** _r)
get_static_urecord(const udomain_t* _d, const str* _aor, struct urecord** _r)
{
static struct urecord r;

Expand Down Expand Up @@ -466,13 +466,117 @@ void print_udomain(FILE* _f, udomain_t* _d)
fprintf(_f, "\n---/Domain---\n");
}

/*! \brief
* expects (UL_COLS - 4) rows:
* contact, expires, q, callid, cseq, flags, cflags, ua,
* received, path, socket, methods, last_modified, instance, attr)
*/
static inline ucontact_info_t* cdb_row2info(const cdb_row_t *row, str *contact)
{
static ucontact_info_t ci;
static str callid, ua, received, host, path, instance;
static str attr;
struct list_head *_;

cdb_pair_t *pair;
int port, proto;

memset(&ci, 0, sizeof(ucontact_info_t));

/* TODO: find a less convoluted way of implementing this */
list_for_each (_, &row->dict) {
pair = list_entry(_, cdb_pair_t, list);

switch (pair->key.name.s[0]) {
case 'a':
attr = pair->val.val.st;
ci.attr = &attr;
break;
case 'c':
switch (pair->key.name.s[1]) {
case 'a':
callid = pair->val.val.st;
ci.callid = &callid;
break;
case 'f':
ci.flags = pair->val.val.i32;
break;
case 'o':
*contact = pair->val.val.st;
break;
case 's':
ci.cseq = pair->val.val.i32;
break;
}
break;
case 'e':
ci.expires = pair->val.val.i32;
break;
case 'f':
ci.cflags = flag_list_to_bitmask(&pair->val.val.st,
FLAG_TYPE_BRANCH, FLAG_DELIM);
break;
case 'l':
ci.last_modified = pair->val.val.i64;
break;
case 'm':
ci.methods = pair->val.val.i32;
break;
case 'p':
path = pair->val.val.st;
ci.path = &path;
break;
case 'q':
if (str2q(&ci.q, pair->val.val.st.s, pair->val.val.st.len) != 0) {
LM_ERR("bad q: %.*s\n", pair->val.val.st.len,
pair->val.val.st.s);
}
break;
case 'r':
received = pair->val.val.st;
ci.received = received;
break;
case 's':
switch (pair->key.name.s[1]) {
case 'i':
instance = pair->val.val.st;
ci.instance = instance;
break;
case 'o':
if (ZSTR(pair->val.val.st)) {
ci.sock = NULL;
} else {
if (parse_phostport(pair->val.val.st.s, pair->val.val.st.len,
&host.s, &host.len, &port, &proto) != 0) {
LM_ERR("bad socket <%.*s>\n", pair->val.val.st.len,
pair->val.val.st.s);
return NULL;
}

ci.sock = grep_sock_info(&host, (unsigned short)port, proto);
if (!ci.sock)
LM_DBG("non-local socket <%.*s>...ignoring\n",
pair->val.val.st.len, pair->val.val.st.s);
}
break;
}
break;
case 'u':
ua = pair->val.val.st;
ci.user_agent = &ua;
break;
}
}

return &ci;
}

/*! \brief
* expects (UL_COLS - 2) rows:
* expects (UL_COLS - 2) columns:
* contact_id, contact, expires, q, callid, cseq, flags, cflags, ua,
* received, path, socket, methods, last_modified, instance, kv_store, attr)
*/
static inline ucontact_info_t* dbrow2info( db_val_t *vals, str *contact)
static inline ucontact_info_t* dbrow2info(db_val_t *vals, str *contact)
{
static ucontact_info_t ci;
static str callid, ua, received, host, path, instance;
Expand Down Expand Up @@ -1044,6 +1148,78 @@ urecord_t* db_load_urecord(db_con_t* _c, udomain_t* _d, str *_aor)
return r;
}

/*! \brief
* loads from cache DB all contacts of an AOR
*/
urecord_t* cachedb_load_urecord(db_con_t* _c, const udomain_t* _d,
const str *_aor)
{
static const cdb_key_t aor_key = {{"aor", 3}, 1};
struct list_head *_;
ucontact_info_t *ci;
cdb_filter_t *aor_filter;
int_str_t val;
cdb_res_t res;
cdb_row_t *row;
str contact;

urecord_t *r;
ucontact_t *c;

val.is_str = 1;
val.s = *_aor;

aor_filter = cdb_append_filter(NULL, &aor_key, CDB_OP_EQ, &val);
if (!aor_filter) {
LM_ERR("oom\n");
return NULL;
}

cdbf.query(cdbc, aor_filter, &res);

/* TODO: implement use table _d->name */

if (res.count == 0) {
LM_DBG("aor %.*s not found in table %.*s\n", _aor->len, _aor->s,
_d->name->len, _d->name->s);
cdb_free_filters(aor_filter);
return NULL;
}

r = NULL;

list_for_each (_, &res.rows) {
row = list_entry(_, cdb_row_t, list);
ci = cdb_row2info(row, &contact);
if (!ci) {
LM_ERR("skipping record for %.*s in table %s\n",
_aor->len, _aor->s, _d->name->s);
continue;
}

if (!r)
get_static_urecord(_d, _aor, &r);

if (!(c = mem_insert_ucontact(r, &contact, ci))) {
LM_ERR("mem_insert failed\n");
free_urecord(r);
goto out_null;
}

/* We have to do this, because insert_ucontact sets state to CS_NEW
* and we have the contact in the database already */
c->state = CS_SYNC;
}

cdb_free_rows(&res);
cdb_free_filters(aor_filter);
return r;

out_null:
cdb_free_filters(aor_filter);
cdb_free_rows(&res);
return NULL;
}

int db_timer_udomain(udomain_t* _d)
{
Expand Down Expand Up @@ -1327,7 +1503,10 @@ int get_urecord(udomain_t* _d, str* _aor, struct urecord** _r)
urecord_t* r;
void ** dest;

if (db_mode!=DB_ONLY) {
switch (cluster_mode) {
case CM_NONE:
case CM_EDGE:
case CM_CORE:
/* search in cache */
aorhash = core_hash(_aor, 0, 0);
sl = aorhash&(_d->size-1);
Expand All @@ -1340,13 +1519,24 @@ int get_urecord(udomain_t* _d, str* _aor, struct urecord** _r)
*_r = *dest;

return 0;
} else {
break;
case CM_SQL_ONLY:
/* search in DB */
r = db_load_urecord( ul_dbh, _d, _aor);
if (r) {
*_r = r;
return 0;
}
break;
case CM_CORE_CACHEDB_ONLY:
r = cachedb_load_urecord(ul_dbh, _d, _aor);
if (r) {
*_r = r;
return 0;
}
break;
default:
abort();
}

out:
Expand Down

0 comments on commit 30f8e7c

Please sign in to comment.