From a99ba909545afeeb1831034d50923518a9e09f10 Mon Sep 17 00:00:00 2001 From: Vlad Patrascu Date: Wed, 12 Jan 2022 15:22:25 +0200 Subject: [PATCH] b2b_entities/logic: add support for Redis as database storage Work sponsored by ng-voice GmbH. --- cachedb/cachedb.h | 10 + cachedb/cachedb_cap.h | 16 + cachedb/test/test_cachedb.c | 169 ++++ modules/b2b_entities/b2b_entities.c | 121 ++- modules/b2b_entities/b2b_entities.h | 6 + modules/b2b_entities/b2be_db.c | 784 +++++++++++++----- .../b2b_entities/doc/b2b_entities_admin.xml | 36 + modules/b2b_logic/b2b_logic.c | 119 ++- modules/b2b_logic/b2b_logic.h | 5 + modules/b2b_logic/b2bl_db.c | 547 +++++++++--- modules/b2b_logic/doc/b2b_logic_admin.xml | 34 + modules/cachedb_redis/cachedb_redis.c | 3 + modules/cachedb_redis/cachedb_redis_dbase.c | 335 +++++++- modules/cachedb_redis/cachedb_redis_dbase.h | 12 + 14 files changed, 1808 insertions(+), 389 deletions(-) diff --git a/cachedb/cachedb.h b/cachedb/cachedb.h index 84119407175..98ec49eaec5 100644 --- a/cachedb/cachedb.h +++ b/cachedb/cachedb.h @@ -144,6 +144,16 @@ typedef struct cachedb_funcs_t { const cdb_dict_t *pairs); /* TODO: can we also implement these ^ with Redis, or can we adapt them? */ + /* + * Endpoints specific for "map" operations (Redis) + * Support for these endpoints can be verified via CACHEDB_CAP_MAP + */ + + int (*map_get) (cachedb_con *con, const str *key, cdb_res_t *res); + int (*map_set) (cachedb_con *con, const str *key, const str *subkey, + const cdb_dict_t *pairs); + int (*map_remove) (cachedb_con *con, const str *key, const str *subkey); + int capability; } cachedb_funcs; diff --git a/cachedb/cachedb_cap.h b/cachedb/cachedb_cap.h index 5744cc9ea28..c95af9a59e8 100644 --- a/cachedb/cachedb_cap.h +++ b/cachedb/cachedb_cap.h @@ -44,6 +44,12 @@ typedef enum { CACHEDB_CAP_QUERY = 1<<8, CACHEDB_CAP_UPDATE = 1<<9, CACHEDB_CAP_COL_ORIENTED = (CACHEDB_CAP_QUERY|CACHEDB_CAP_UPDATE), + + CACHEDB_CAP_MAP_GET = 1<<10, + CACHEDB_CAP_MAP_SET = 1<<11, + CACHEDB_CAP_MAP_REMOVE = 1<<12, + CACHEDB_CAP_MAP = + (CACHEDB_CAP_MAP_GET|CACHEDB_CAP_MAP_SET|CACHEDB_CAP_MAP_REMOVE), } cachedb_cap; #define CACHEDB_CAPABILITY(cdbf,cpv) (((cdbf)->capability & (cpv)) == (cpv)) @@ -88,6 +94,16 @@ static inline int check_cachedb_api(cachedb_engine *cde) if (cde->cdb_func.query && cde->cdb_func.update) cde->cdb_func.capability |= CACHEDB_CAP_COL_ORIENTED; + if (cde->cdb_func.map_get) + cde->cdb_func.capability |= CACHEDB_CAP_MAP_GET; + if (cde->cdb_func.map_set) + cde->cdb_func.capability |= CACHEDB_CAP_MAP_SET; + if (cde->cdb_func.map_remove) + cde->cdb_func.capability |= CACHEDB_CAP_MAP_REMOVE; + + if (cde->cdb_func.map_get && cde->cdb_func.map_set && cde->cdb_func.map_remove) + cde->cdb_func.capability |= CACHEDB_CAP_MAP; + return 0; } diff --git a/cachedb/test/test_cachedb.c b/cachedb/test/test_cachedb.c index f8393eca13d..36112eeee31 100644 --- a/cachedb/test/test_cachedb.c +++ b/cachedb/test/test_cachedb.c @@ -65,6 +65,11 @@ static void load_cachedb_modules(void) exit(-1); } + if (load_module("cachedb_redis.so") != 0) { + printf("failed to load redis\n"); + exit(-1); + } + if (set_mod_param_regex("cachedb_mongodb", "cachedb_url", STR_PARAM, "mongodb://10.0.0.177:27017/OpensipsTests.OpensipsTests") != 0) { printf("failed to set mongo url\n"); @@ -83,6 +88,12 @@ static void load_cachedb_modules(void) printf("failed to set cassandra url\n"); exit(-1); } + + if (set_mod_param_regex("cachedb_redis", "cachedb_url", STR_PARAM, + "redis://localhost:6379/") != 0) { + printf("failed to set Redis url\n"); + exit(-1); + } } @@ -429,6 +440,158 @@ static int test_column_ops(cachedb_funcs *api, cachedb_con *con1, return 1; } +static int test_map_set(cachedb_funcs *api, cachedb_con *con, + cdb_dict_t *out_pairs1, cdb_dict_t *out_pairs2) +{ + str key, subkey; + cdb_key_t field; + cdb_pair_t *pair; + + cdb_dict_init(out_pairs1); + + init_str(&key, "keyA"); + init_str(&subkey, "subkeyAB"); + + cdb_key_init(&field, "field_null"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_NULL; + cdb_dict_add(pair, out_pairs1); + + cdb_key_init(&field, "field_32bit"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_INT32; + pair->val.val.i32 = 2147483647; + cdb_dict_add(pair, out_pairs1); + + cdb_key_init(&field, "field_str"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_STR; + init_str(&pair->val.val.st, pkg_strdup("foo")); + cdb_dict_add(pair, out_pairs1); + + if (!ok(api->map_set(con, &key, &subkey, out_pairs1) == 0)) + return 0; + + cdb_dict_init(out_pairs2); + + init_str(&key, "keyB"); + + cdb_key_init(&field, "field_null"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_NULL; + cdb_dict_add(pair, out_pairs2); + + cdb_key_init(&field, "field_32bit"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_INT32; + pair->val.val.i32 = 1148367; + cdb_dict_add(pair, out_pairs2); + + cdb_key_init(&field, "field_str"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_STR; + init_str(&pair->val.val.st, pkg_strdup("bar")); + cdb_dict_add(pair, out_pairs2); + + if (!ok(api->map_set(con, &key, &subkey, out_pairs2) == 0)) + return 0; + + return 1; +} + +static int test_map_get(cachedb_funcs *api, cachedb_con *con, + const cdb_dict_t *pairs1, const cdb_dict_t *pairs2) +{ + cdb_res_t res; + cdb_row_t *row; + struct list_head *_; + cdb_pair_t *pair1, *pair2; + cdb_key_t key1, key2; + int found = 0; + + if (!ok(api->map_get(con, NULL, &res) == 0)) + return 0; + + key1.is_pk = 1; + init_str(&key1.name, "keyA"); + key2.is_pk = 1; + init_str(&key2.name, "keyB"); + + list_for_each (_, &res.rows) { + row = list_entry(_, cdb_row_t, list); + + pair1 = cdb_dict_fetch(&key1, &row->dict); + if (pair1 && dict_cmp(&pair1->val.val.dict, pairs1) == 0) + found++; + pair2 = cdb_dict_fetch(&key2, &row->dict); + if (pair2 && dict_cmp(&pair2->val.val.dict, pairs2) == 0) + found++; + } + + cdb_free_rows(&res); + + if (!ok(found == 2)) + return 0; + + return 1; +} + +static int test_map_ops(cachedb_funcs *api, cachedb_con *con) +{ + cdb_dict_t cols1, cols2; + str key1, key2, subkey; + cdb_key_t field; + cdb_pair_t *pair; + + if (!ok(test_map_set(api, con, &cols1, &cols2), "test map set") || + !ok(test_map_get(api, con, &cols1, &cols2), "test map get")) + return 0; + + init_str(&subkey, "subkeyAB"); + + if (!ok(api->map_remove(con, NULL, &subkey) == 0)) + return 0; + + cdb_free_entries(&cols1, osips_pkg_free); + cdb_free_entries(&cols2, osips_pkg_free); + + cdb_dict_init(&cols1); + + init_str(&key1, "keyC"); + init_str(&subkey, "subkeyCD"); + + cdb_key_init(&field, "field_str"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_STR; + init_str(&pair->val.val.st, pkg_strdup("baz")); + cdb_dict_add(pair, &cols1); + + api->map_set(con, &key1, &subkey, &cols1); + + cdb_dict_init(&cols2); + + init_str(&key2, "keyD"); + init_str(&subkey, "subkeyCD"); + + cdb_key_init(&field, "field_str"); + pair = cdb_mk_pair(&field, NULL); + pair->val.type = CDB_STR; + init_str(&pair->val.val.st, pkg_strdup("biz")); + cdb_dict_add(pair, &cols2); + + api->map_set(con, &key2, &subkey, &cols2); + + if (!ok(api->map_remove(con, &key1, &subkey) == 0)) + return 0; + if (!ok(api->map_remove(con, &key2, &subkey) == 0)) + return 0; + + cdb_free_entries(&cols1, osips_pkg_free); + cdb_free_entries(&cols2, osips_pkg_free); + + return 1; +} + static void test_cachedb_api(const char *cachedb_name, const char *group1, const char *group2) { @@ -487,6 +650,10 @@ static void test_cachedb_api(const char *cachedb_name, const char *group1, if (CACHEDB_CAPABILITY(&cde->cdb_func, CACHEDB_CAP_COL_ORIENTED)) ok(test_column_ops(&cde->cdb_func, con1, con2, cachedb_name), "column-oriented tests"); + + if (CACHEDB_CAPABILITY(&cde->cdb_func, CACHEDB_CAP_MAP)) + ok(test_map_ops(&cde->cdb_func, con1), + "map ops tests"); } /* @@ -511,6 +678,8 @@ static void test_cachedb_backends(void) test_cachedb_api("mongodb", NULL, NULL); test_cachedb_api("cassandra", "test1", "test2"); + test_cachedb_api("redis", NULL, NULL); + // todo(); // skip tests here // end_todo; diff --git a/modules/b2b_entities/b2b_entities.c b/modules/b2b_entities/b2b_entities.c index 886c9a8e060..375f7fc3833 100644 --- a/modules/b2b_entities/b2b_entities.c +++ b/modules/b2b_entities/b2b_entities.c @@ -64,7 +64,9 @@ static char* script_req_route; static char* script_reply_route; int req_routeid = -1; int reply_routeid = -1; -static str db_url; +str db_url; +str b2be_cdb_url; +str cdb_key_prefix = str_init("b2be$"); db_con_t *b2be_db; db_func_t b2be_dbf; str b2be_dbtable= str_init("b2b_entities"); @@ -75,6 +77,9 @@ int b2be_db_mode = WRITE_BACK; b2b_table server_htable; b2b_table client_htable; +cachedb_funcs b2be_cdbf; +cachedb_con *b2be_cdb; + int b2be_cluster; int serialize_backend; @@ -101,6 +106,8 @@ static param_export_t params[]={ { "script_req_route", STR_PARAM, &script_req_route }, { "script_reply_route", STR_PARAM, &script_reply_route }, { "db_url", STR_PARAM, &db_url.s }, + { "cachedb_url", STR_PARAM, &b2be_cdb_url.s }, + { "cachedb_key_prefix", STR_PARAM, &cdb_key_prefix.s }, { "db_table", STR_PARAM, &b2be_dbtable.s }, { "db_mode", INT_PARAM, &b2be_db_mode }, { "update_period", INT_PARAM, &b2b_update_period }, @@ -223,6 +230,19 @@ static int mod_init(void) } memset(&b2be_dbf, 0, sizeof(db_func_t)); + if(b2be_db_mode) { + if (!b2be_cdb_url.s) { + init_db_url(db_url, 1); + if (!db_url.s) + b2be_db_mode = NO_DB; + } else if (db_url.s) { + LM_ERR("Both 'db_url' and 'cachedb_url' defined\n"); + return -1; + } else { + b2be_cdb_url.len = strlen(b2be_cdb_url.s); + } + } + if(b2be_db_mode) init_db_url(db_url, 1); @@ -270,9 +290,40 @@ static int mod_init(void) if(b2be_db) b2be_dbf.close(b2be_db); b2be_db = NULL; + } else if (b2be_db_mode && b2be_cdb_url.s) { + if (cachedb_bind_mod(&b2be_cdb_url, &b2be_cdbf) < 0) { + LM_ERR("cannot bind functions for cachedb_url %.*s\n", + b2be_cdb_url.len, b2be_cdb_url.s); + return -1; + } + + if (!CACHEDB_CAPABILITY(&b2be_cdbf, CACHEDB_CAP_MAP)) { + LM_ERR("not enough capabilities for cachedb_url %.*s\n", + b2be_cdb_url.len, b2be_cdb_url.s); + return -1; + } + + b2be_cdb = b2be_cdbf.init(&b2be_cdb_url); + if (!b2be_cdb) { + LM_ERR("connecting to database failed\n"); + return -1; + } + + cdb_key_prefix.len = strlen(cdb_key_prefix.s); + + b2be_initialize(); + + /* reload data */ + if(b2b_entities_restore() < 0) + { + LM_ERR("Failed to restore data from database\n"); + return -1; + } + + if(b2be_cdb) + b2be_cdbf.destroy(b2be_cdb); + b2be_cdb = NULL; } - else - b2be_db_mode = 0; if(register_script_cb( b2b_prescript_f, PRE_SCRIPT_CB|REQ_TYPE_CB, 0 ) < 0) { @@ -367,21 +418,35 @@ void check_htables(void) static int child_init(int rank) { /* if database is needed */ - if (b2be_db_mode && db_url.s) - { - if (b2be_dbf.init==0) - { - LM_CRIT("child_init: database not bound\n"); - return -1; - } + if (b2be_db_mode) { + if (db_url.s) { + if (b2be_dbf.init==0) + { + LM_CRIT("child_init: database not bound\n"); + return -1; + } - b2be_db = b2be_dbf.init(&db_url); - if(!b2be_db) - { - LM_ERR("connecting to database failed\n"); - return -1; + b2be_db = b2be_dbf.init(&db_url); + if(!b2be_db) + { + LM_ERR("connecting to database failed\n"); + return -1; + } + LM_DBG("child %d: Database connection opened successfully\n", rank); + } else { + if (!b2be_cdbf.init) { + LM_ERR("cachedb functions not initialized\n"); + return -1; + } + + b2be_cdb = b2be_cdbf.init(&b2be_cdb_url); + if (!b2be_cdb) { + LM_ERR("connecting to database failed\n"); + return -1; + } + + LM_DBG("child %d: cachedb connection opened successfully\n", rank); } - LM_DBG("child %d: Database connection opened successfully\n", rank); } check_htables(); return 0; @@ -390,13 +455,23 @@ static int child_init(int rank) /** Module destroy function */ static void mod_destroy(void) { - if (b2be_dbf.init && b2be_db_mode==WRITE_BACK) { - b2be_db = b2be_dbf.init(&db_url); - if(!b2be_db) { - LM_ERR("connecting to database failed, unable to flush\n"); - } else { - b2b_entities_dump(1); - b2be_dbf.close(b2be_db); + if (b2be_db_mode==WRITE_BACK) { + if (b2be_dbf.init) { + b2be_db = b2be_dbf.init(&db_url); + if(!b2be_db) { + LM_ERR("connecting to database failed, unable to flush\n"); + } else { + b2b_entities_dump(1); + b2be_dbf.close(b2be_db); + } + } else if (b2be_cdbf.init) { + b2be_cdb = b2be_cdbf.init(&b2be_cdb_url); + if (!b2be_cdb) { + LM_ERR("connecting to database failed\n"); + } else { + b2b_entities_dump(1); + b2be_cdbf.destroy(b2be_cdb); + } } } destroy_b2b_htables(); diff --git a/modules/b2b_entities/b2b_entities.h b/modules/b2b_entities/b2b_entities.h index da3829e1e94..aa78670da0c 100644 --- a/modules/b2b_entities/b2b_entities.h +++ b/modules/b2b_entities/b2b_entities.h @@ -37,6 +37,7 @@ #include "client.h" #include "server.h" #include "../../db/db.h" +#include "../../cachedb/cachedb.h" /* modes to write in db */ #define NO_DB 0 @@ -53,12 +54,17 @@ extern struct tm_binds tmb; extern uac_auth_api_t uac_auth_api; extern int req_routeid; extern int reply_routeid; +extern str db_url; +extern str b2be_cdb_url; extern db_con_t *b2be_db; extern db_func_t b2be_dbf; +extern cachedb_funcs b2be_cdbf; +extern cachedb_con *b2be_cdb; extern str b2be_dbtable; extern int b2be_db_mode; extern int serialize_backend; extern int b2b_ctx_idx; +extern str cdb_key_prefix; void *b2b_get_context(void); diff --git a/modules/b2b_entities/b2be_db.c b/modules/b2b_entities/b2be_db.c index 3945e555e37..85ea6ffcc6c 100644 --- a/modules/b2b_entities/b2be_db.c +++ b/modules/b2b_entities/b2be_db.c @@ -133,17 +133,113 @@ void b2be_initialize(void) qvals[26].type= DB_STR; } -int b2be_db_insert(b2b_dlg_t* dlg, int type) +void cdb_add_n_pairs(cdb_dict_t *pairs, int idx_start, int idx_end) { - dlg_leg_t* leg; - int cols_no; + int i; - if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); + for (i = idx_start; i <= idx_end; i++) + if (qvals[i].nul || (qvals[i].type == DB_STR && !qvals[i].val.str_val.s)) + cdb_dict_add_null(pairs, qcols[i]->s, qcols[i]->len); + else if (qvals[i].type == DB_STR || qvals[i].type == DB_BLOB) + cdb_dict_add_str(pairs, qcols[i]->s, qcols[i]->len, + &qvals[i].val.str_val); + else if (qvals[i].type == DB_INT) + cdb_dict_add_int32(pairs, qcols[i]->s, qcols[i]->len, + qvals[i].val.int_val); +} + +static inline str *get_b2be_map_key(int type, str *tag0, str *tag1, str *callid) +{ + static str key = {0,0}; + int len = 3/*3 x '$'*/ + 1 /*type*/ + cdb_key_prefix.len + + tag0->len + tag1->len + callid->len + 1; + + /* map key format: [prefix][type]$[tag0]$[tag1]$[callid] */ + key.s = pkg_malloc(len); + if (!key.s) { + LM_ERR("no more pkg memory\n"); + return NULL; + } + + key.len = snprintf(key.s, len, "%.*s%d$%.*s$%.*s$%.*s", + cdb_key_prefix.len, cdb_key_prefix.s, type, tag0->len, tag0->s, + tag1->len, tag1->s, callid->len, callid->s); + + return &key; +} + +static inline str *get_b2be_map_subkey(str *param) +{ + static str subkey = {0,0}; + + /* subkey format: [prefix][param] */ + subkey.len = cdb_key_prefix.len + param->len; + subkey.s = pkg_malloc(subkey.len); + if (!subkey.s) { + LM_ERR("no more pkg memory\n"); + return NULL; + } + + memcpy(subkey.s, cdb_key_prefix.s, cdb_key_prefix.len); + memcpy(subkey.s + cdb_key_prefix.len, param->s, param->len); + + return &subkey; +} + +static int b2be_cdb_insert(int type, b2b_dlg_t* dlg, int cols_no) +{ + cdb_dict_t cdb_pairs; + str *cdb_key, *cdb_subkey; + int i; + int rc; + + cdb_dict_init(&cdb_pairs); + + cdb_key = get_b2be_map_key(type, &dlg->tag[0], &dlg->tag[1], &dlg->callid); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); return -1; } + cdb_add_n_pairs(&cdb_pairs, 0, cols_no - 1); + + if(!dlg->legs) { + for(i = cols_no; i < cols_no + 4; i++) + qvals[i].nul = 1; + + cdb_add_n_pairs(&cdb_pairs, cols_no, cols_no + 3); + + for(i = cols_no; i < cols_no + 4; i++) + qvals[i].nul = 0; + } + + if (qvals[12].val.str_val.s) { + cdb_subkey = get_b2be_map_subkey(&qvals[12].val.str_val); + if (!cdb_subkey) { + LM_ERR("Failed to build map subkey\n"); + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + return -1; + } + } else { + cdb_subkey = NULL; + } + + if ((rc = b2be_cdbf.map_set(b2be_cdb, cdb_key, cdb_subkey, &cdb_pairs))) + LM_ERR("cachedb set failed\n"); + + pkg_free(cdb_subkey->s); + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + + return rc; +} + +int b2be_db_insert(b2b_dlg_t* dlg, int type) +{ + dlg_leg_t* leg; + int cols_no; + qvals[0].val.int_val = type; qvals[1].val.str_val = dlg->tag[0]; qvals[2].val.str_val = dlg->tag[1]; @@ -198,29 +294,88 @@ int b2be_db_insert(b2b_dlg_t* dlg, int type) cols_no = 27; } - /* insert into database */ - if(b2be_dbf.insert(b2be_db, qcols, qvals, cols_no)< 0) - { - LM_ERR("Sql insert failed\n"); - return -1; + if (b2be_cdb_url.s) { + if (b2be_cdb_insert(type, dlg, cols_no)) + return -1; + } else { + if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return -1; + } + + /* insert into database */ + if(b2be_dbf.insert(b2be_db, qcols, qvals, cols_no)< 0) + { + LM_ERR("Sql insert failed\n"); + return -1; + } } - LM_DBG("INSERTED [%.*s], [%.*s]\n", dlg->tag[0].len, dlg->tag[0].s, dlg->callid.len, dlg->callid.s); + + LM_DBG("INSERTED [%.*s], [%.*s]\n", dlg->tag[0].len, dlg->tag[0].s, + dlg->callid.len, dlg->callid.s); return 0; } -int b2be_db_update(b2b_dlg_t* dlg, int type) +static void b2b_entity_cdb_delete(int type, b2b_dlg_t* dlg) { - dlg_leg_t* leg; - int cols_no; + str *cdb_key, *cdb_subkey; - qvals[0].val.int_val = type; + cdb_key = get_b2be_map_key(type, &dlg->tag[0], &dlg->tag[1], &dlg->callid); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); + return; + } - if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); + if (!str_check_token(&dlg->param)) { + cdb_subkey = NULL; + } else { + cdb_subkey = get_b2be_map_subkey(&dlg->param); + if (!cdb_subkey) { + LM_ERR("Failed to build map key\n"); + return; + } + } + + if (b2be_cdbf.map_remove(b2be_cdb, cdb_key, cdb_subkey) < 0) + LM_ERR("Failed to delete from cachedb\n"); + + if (cdb_subkey) + pkg_free(cdb_subkey->s); + pkg_free(cdb_key->s); +} + +static int b2be_cdb_update(int type, b2b_dlg_t* dlg, int cols_no) +{ + cdb_dict_t cdb_pairs; + str *cdb_key; + int rc; + + cdb_dict_init(&cdb_pairs); + + cdb_key = get_b2be_map_key(type, &dlg->tag[0], &dlg->tag[1], &dlg->callid); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); return -1; } + cdb_add_n_pairs(&cdb_pairs, n_start_update, cols_no - 1); + + if ((rc = b2be_cdbf.map_set(b2be_cdb, cdb_key, NULL, &cdb_pairs))) + LM_ERR("cachedb set failed\n"); + + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + + return rc; +} + +int b2be_db_update(b2b_dlg_t* dlg, int type) +{ + dlg_leg_t* leg; + int cols_no; + + qvals[0].val.int_val = type; qvals[1].val.str_val = dlg->tag[0]; qvals[2].val.str_val = dlg->tag[1]; qvals[3].val.str_val = dlg->callid; @@ -228,10 +383,20 @@ int b2be_db_update(b2b_dlg_t* dlg, int type) /* if the state is terminated delete the record */ if(dlg->state == B2B_TERMINATED) { - if(b2be_dbf.delete(b2be_db, qcols, 0, qvals, n_query_update)< 0) - { - LM_ERR("Sql delete failed\n"); - return -1; + if (b2be_cdb_url.s) { + b2b_entity_cdb_delete(type, dlg); + } else { + if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return -1; + } + + if(b2be_dbf.delete(b2be_db, qcols, 0, qvals, n_query_update)< 0) + { + LM_ERR("Sql delete failed\n"); + return -1; + } } return 0; } @@ -252,6 +417,7 @@ int b2be_db_update(b2b_dlg_t* dlg, int type) qvals[21].val.str_val = dlg->contact[0]; qvals[22].val.str_val = dlg->contact[1]; cols_no = 23; + leg = dlg->legs; if(leg) /* there can only be one leg as we do not deal with dialogs in early state */ { @@ -262,13 +428,25 @@ int b2be_db_update(b2b_dlg_t* dlg, int type) cols_no = 27; } - if(b2be_dbf.update(b2be_db, qcols, 0, qvals, - qcols+n_start_update, qvals+n_start_update, - n_query_update, cols_no-n_start_update)< 0) - { - LM_ERR("Sql update failed\n"); - return -1; + if (b2be_cdb_url.s) { + if (b2be_cdb_update(type, dlg, cols_no)) + return -1; + } else { + if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return -1; + } + + if(b2be_dbf.update(b2be_db, qcols, 0, qvals, + qcols+n_start_update, qvals+n_start_update, + n_query_update, cols_no-n_start_update)< 0) + { + LM_ERR("Sql update failed\n"); + return -1; + } } + LM_DBG("UPDATED [%.*s], [%.*s] State= %d\n", dlg->tag[0].len, dlg->tag[0].s, dlg->callid.len, dlg->callid.s, dlg->state); return 0; @@ -279,13 +457,16 @@ void store_b2b_dlg(b2b_table htable, unsigned int hsize, int type, int no_lock) int i; dlg_leg_t* leg; b2b_dlg_t* dlg; + int cols_no; - if (!b2be_dbf.init) + if (db_url.s && !b2be_dbf.init) + return; + else if (b2be_cdb_url.s && !b2be_cdbf.init) return; qvals[0].val.int_val = type; //LM_DBG("storing b2b_entities type '%d' in db\n", type); - if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) + if(db_url.s && b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) { LM_ERR("sql use table failed\n"); return; @@ -346,6 +527,7 @@ void store_b2b_dlg(b2b_table htable, unsigned int hsize, int type, int no_lock) qvals[20].val.int_val = dlg->last_invite_cseq; qvals[21].val.str_val = dlg->contact[0]; qvals[22].val.str_val = dlg->contact[1]; + cols_no = 23; leg = dlg->legs; if(leg) /* there can only be one leg as we do not deal with dialogs in early state */ @@ -354,29 +536,46 @@ void store_b2b_dlg(b2b_table htable, unsigned int hsize, int type, int no_lock) qvals[24].val.int_val= leg->cseq; qvals[25].val.str_val= leg->contact; qvals[26].val.str_val= leg->route_set; + cols_no = 27; } if(dlg->db_flag == INSERTDB_FLAG) { - /* insert into database */ - if(b2be_dbf.insert(b2be_db, qcols, qvals, DB_COLS_NO)< 0) - { - LM_ERR("Sql insert failed\n"); - if(!no_lock) - lock_release(&htable[i].lock); - return; + if (b2be_cdb_url.s) { + if (b2be_cdb_insert(type, dlg, cols_no)) { + if(!no_lock) + lock_release(&htable[i].lock); + return; + } + } else { + /* insert into database */ + if(b2be_dbf.insert(b2be_db, qcols, qvals, DB_COLS_NO)< 0) + { + LM_ERR("Sql insert failed\n"); + if(!no_lock) + lock_release(&htable[i].lock); + return; + } } } else { - if(b2be_dbf.update(b2be_db, qcols, 0, qvals, - qcols+n_start_update, qvals+n_start_update, - n_query_update, DB_COLS_NO-n_start_update)< 0) - { - LM_ERR("Sql update failed\n"); - if(!no_lock) - lock_release(&htable[i].lock); - return; + if (b2be_cdb_url.s) { + if (b2be_cdb_update(type, dlg, cols_no)) { + if(!no_lock) + lock_release(&htable[i].lock); + return; + } + } else { + if(b2be_dbf.update(b2be_db, qcols, 0, qvals, + qcols+n_start_update, qvals+n_start_update, + n_query_update, DB_COLS_NO-n_start_update)< 0) + { + LM_ERR("Sql update failed\n"); + if(!no_lock) + lock_release(&htable[i].lock); + return; + } } } @@ -394,25 +593,144 @@ void store_b2b_dlg(b2b_table htable, unsigned int hsize, int type, int no_lock) } } -int b2b_entities_restore(void) +static int load_entity(int_str_t *vals) { - db_res_t *result= NULL; - db_row_t *rows = NULL; - db_val_t *row_vals= NULL; - int i; dlg_leg_t leg, *new_leg; b2b_dlg_t dlg, *shm_dlg= NULL; unsigned int hash_index, local_index; - int nr_rows; str* b2b_key; str sockinfo_str; str host; int port, proto; b2b_table htable; int type; - int no_rows = 10; uint64_t ts = 0; + memset(&dlg, 0, sizeof(b2b_dlg_t)); + + type = vals[0].i; + dlg.tag[1] = vals[2].s; + dlg.callid = vals[3].s; + + if(type == B2B_SERVER)/* extract hash and local index */ + { + htable = server_htable; + if(b2b_parse_key(&dlg.tag[1], &hash_index, &local_index, &ts) < 0) + { + LM_ERR("Wrong format for b2b key [%.*s]\n", dlg.tag[1].len, dlg.tag[1].s); + return -1; + } + dlg.tag[1].s = NULL; + dlg.tag[1].len = 0; + + if (hash_index >= server_hsize) { + LM_ERR("Hash Index [%d] too large! Increase the 'server_hsize'" + "parameter!\n", hash_index); + return -1; + } + } + else + { + htable = client_htable; + + if(b2b_parse_key(&dlg.callid, &hash_index, &local_index, NULL) < 0) + { + LM_ERR("Wrong format for b2b key [%.*s]\n", dlg.callid.len, dlg.callid.s); + return -1; + } + + if (hash_index >= client_hsize) { + LM_DBG("Hash Index [%d] too large! Increase the 'client_hsize'" + "parameter!\n", hash_index); + return -1; + } + } + dlg.id = local_index; + dlg.state = vals[15].i; + dlg.ruri = vals[4].s; + dlg.from_uri = vals[5].s; + dlg.from_dname = vals[6].s; + dlg.to_uri = vals[7].s; + dlg.to_dname = vals[8].s; + dlg.tag[0] = vals[1].s; + dlg.cseq[0] = vals[16].i; + dlg.cseq[1] = vals[17].i; + dlg.route_set[0] = vals[9].s; + dlg.route_set[1] = vals[10].s; + dlg.contact[0] = vals[21].s; + dlg.contact[1] = vals[22].s; + dlg.last_method = vals[18].i; + dlg.last_reply_code = vals[19].i; + dlg.last_invite_cseq = vals[20].i; + dlg.param = vals[12].s; + dlg.mod_name = vals[13].s; + sockinfo_str = vals[11].s; + if(sockinfo_str.s) + { + if(sockinfo_str.len) + { + if (parse_phostport (sockinfo_str.s, sockinfo_str.len, &host.s, + &host.len, &port, &proto )< 0) + { + LM_ERR("bad format for stored sockinfo string [%.*s]\n", + sockinfo_str.len, sockinfo_str.s); + return -1; + } + dlg.send_sock = grep_sock_info(&host, (unsigned short) port, + (unsigned short) proto); + } + } + dlg.db_flag = NO_UPDATEDB_FLAG; + shm_dlg = b2b_dlg_copy(&dlg); + if(shm_dlg == NULL) + { + LM_ERR("Failed to create new dialog structure\n"); + return -1; + } + b2b_key= b2b_htable_insert(htable,shm_dlg,hash_index, ts, type, 1, 0); + if(b2b_key == NULL) + { + LM_ERR("Failed to insert new record\n"); + return -1; + } + pkg_free(b2b_key); + + if (vals[14].s.len) { + if (shm_str_dup(&shm_dlg->storage, &vals[14].s) < 0) { + LM_ERR("oom!\n"); + return -1; + } + } + + memset(&leg, 0, sizeof(dlg_leg_t)); + leg.tag = vals[23].s; + if(leg.tag.s) { + leg.cseq = vals[24].i; + leg.contact = vals[25].s; + leg.route_set = vals[26].s; + + new_leg = b2b_dup_leg(&leg, SHM_MEM_TYPE); + if(new_leg== NULL) + { + LM_ERR("Failed to construct b2b leg structure\n"); + return -1; + } + shm_dlg->legs = new_leg; + } + + return 0; +} + +int b2b_entities_restore_db(void) +{ + db_res_t *result= NULL; + db_row_t *rows = NULL; + db_val_t *row_vals= NULL; + int i; + int nr_rows; + int no_rows = 10; + int_str_t vals[DB_COLS_NO]; + if(b2be_db == NULL) { LM_DBG("NULL database connection\n"); @@ -460,135 +778,68 @@ int b2b_entities_restore(void) for(i=0; i= server_hsize) { - LM_ERR("Hash Index [%d] too large! Increase the 'server_hsize'" - "parameter!\n", hash_index); - goto error; - } + memset(vals, 0, sizeof vals); + + vals[0].i = row_vals[0].val.int_val; + vals[2].s.s = (char*)row_vals[2].val.string_val; + vals[2].s.len = vals[2].s.s?strlen(vals[2].s.s):0; + vals[3].s.s = (char*)row_vals[3].val.string_val; + vals[3].s.len = vals[3].s.s?strlen(vals[3].s.s):0; + + vals[15].i = row_vals[15].val.int_val; + vals[4].s.s = (char*)row_vals[4].val.string_val; + vals[4].s.len = vals[4].s.s?strlen(vals[4].s.s):0; + vals[5].s.s = (char*)row_vals[5].val.string_val; + vals[5].s.len = strlen(vals[5].s.s); + vals[6].s.s = (char*)row_vals[6].val.string_val; + vals[6].s.len = vals[6].s.s?strlen(vals[6].s.s):0; + vals[7].s.s = (char*)row_vals[7].val.string_val; + vals[7].s.len = strlen(vals[7].s.s); + vals[8].s.s = (char*)row_vals[8].val.string_val; + vals[8].s.len = vals[8].s.s?strlen(vals[8].s.s):0; + vals[1].s.s = (char*)row_vals[1].val.string_val; + vals[1].s.len = vals[1].s.s?strlen(vals[1].s.s):0; + vals[16].i = row_vals[16].val.int_val; + vals[17].i = row_vals[17].val.int_val; + vals[9].s.s = (char*)row_vals[9].val.string_val; + vals[9].s.len = vals[9].s.s?strlen(vals[9].s.s):0; + vals[10].s.s = (char*)row_vals[10].val.string_val; + vals[10].s.len = vals[10].s.s?strlen(vals[10].s.s):0; + vals[21].s.s = (char*)row_vals[21].val.string_val; + vals[21].s.len = vals[21].s.s?strlen(vals[21].s.s):0; + vals[22].s.s = (char*)row_vals[22].val.string_val; + vals[22].s.len = vals[22].s.s?strlen(vals[22].s.s):0; + vals[18].i = row_vals[18].val.int_val; + vals[19].i = row_vals[19].val.int_val; + vals[20].i = row_vals[20].val.int_val; + vals[12].s.s = (char*)row_vals[12].val.string_val; + vals[12].s.len = vals[12].s.s?strlen(vals[12].s.s):0; + vals[13].s.s = (char*)row_vals[13].val.string_val; + vals[13].s.len = vals[13].s.s?strlen(vals[13].s.s):0; + vals[11].s.s = (char*)row_vals[11].val.string_val; + vals[11].s.len = vals[11].s.s?strlen(vals[11].s.s):0; + + if (!VAL_NULL(&row_vals[14])) { + vals[14].s = VAL_BLOB(&row_vals[14]); + } else { + vals[14].s.s = NULL; + vals[14].s.len = 0; } - else - { - htable = client_htable; - if(b2b_parse_key(&dlg.callid, &hash_index, &local_index, NULL) < 0) - { - LM_ERR("Wrong format for b2b key [%.*s]\n", dlg.callid.len, dlg.callid.s); - goto error; - } + vals[23].s.s = (char*)row_vals[23].val.string_val; + vals[23].s.len = vals[23].s.s?strlen(vals[23].s.s):0; - if (hash_index >= client_hsize) { - LM_DBG("Hash Index [%d] too large! Increase the 'client_hsize'" - "parameter!\n", hash_index); - goto error; - } - } - dlg.id = local_index; - dlg.state = row_vals[15].val.int_val; - dlg.ruri.s = (char*)row_vals[4].val.string_val; - dlg.ruri.len = dlg.ruri.s?strlen(dlg.ruri.s):0; - dlg.from_uri.s = (char*)row_vals[5].val.string_val; - dlg.from_uri.len = strlen(dlg.from_uri.s); - dlg.from_dname.s = (char*)row_vals[6].val.string_val; - dlg.from_dname.len = dlg.from_dname.s?strlen(dlg.from_dname.s):0; - dlg.to_uri.s = (char*)row_vals[7].val.string_val; - dlg.to_uri.len = strlen(dlg.to_uri.s); - dlg.to_dname.s = (char*)row_vals[8].val.string_val; - dlg.to_dname.len = dlg.to_dname.s?strlen(dlg.to_dname.s):0; - dlg.tag[0].s = (char*)row_vals[1].val.string_val; - dlg.tag[0].len = dlg.tag[0].s?strlen(dlg.tag[0].s):0; - dlg.cseq[0] = row_vals[16].val.int_val; - dlg.cseq[1] = row_vals[17].val.int_val; - dlg.route_set[0].s = (char*)row_vals[9].val.string_val; - dlg.route_set[0].len = dlg.route_set[0].s?strlen(dlg.route_set[0].s):0; - dlg.route_set[1].s = (char*)row_vals[10].val.string_val; - dlg.route_set[1].len = dlg.route_set[1].s?strlen(dlg.route_set[1].s):0; - dlg.contact[0].s = (char*)row_vals[21].val.string_val; - dlg.contact[0].len = dlg.contact[0].s?strlen(dlg.contact[0].s):0; - dlg.contact[1].s = (char*)row_vals[22].val.string_val; - dlg.contact[1].len = dlg.contact[1].s?strlen(dlg.contact[1].s):0; - dlg.last_method = row_vals[18].val.int_val; - dlg.last_reply_code = row_vals[19].val.int_val; - dlg.last_invite_cseq = row_vals[20].val.int_val; - dlg.param.s = (char*)row_vals[12].val.string_val; - dlg.param.len = dlg.param.s?strlen(dlg.param.s):0; - dlg.mod_name.s = (char*)row_vals[13].val.string_val; - dlg.mod_name.len = dlg.mod_name.s?strlen(dlg.mod_name.s):0; - sockinfo_str.s = (char*)row_vals[11].val.string_val; - if(sockinfo_str.s) - { - sockinfo_str.len = strlen(sockinfo_str.s); - if(sockinfo_str.len) - { - if (parse_phostport (sockinfo_str.s, sockinfo_str.len, &host.s, - &host.len, &port, &proto )< 0) - { - LM_ERR("bad format for stored sockinfo string [%.*s]\n", - sockinfo_str.len, sockinfo_str.s); - goto error; - } - dlg.send_sock = grep_sock_info(&host, (unsigned short) port, - (unsigned short) proto); - } - } - dlg.db_flag = NO_UPDATEDB_FLAG; - shm_dlg = b2b_dlg_copy(&dlg); - if(shm_dlg == NULL) - { - LM_ERR("Failed to create new dialog structure\n"); - goto error; - } - b2b_key= b2b_htable_insert(htable,shm_dlg,hash_index, ts, type, 1, 0); - if(b2b_key == NULL) - { - LM_ERR("Failed to insert new record\n"); - goto error; - } - pkg_free(b2b_key); - - if (!VAL_NULL(row_vals+14)) { - if (shm_str_dup(&shm_dlg->storage, &(VAL_BLOB(row_vals+14))) < 0) { - LM_ERR("oom!\n"); - goto error; - } + if (vals[23].s.s) { + vals[24].i = row_vals[24].val.int_val; + vals[25].s.s = (char*)row_vals[25].val.string_val; + vals[25].s.len = vals[25].s.s?strlen(vals[25].s.s):0; + vals[26].s.s = (char*)row_vals[26].val.string_val; + vals[26].s.len = vals[26].s.s?strlen(vals[26].s.s):0; } - memset(&leg, 0, sizeof(dlg_leg_t)); - leg.tag.s= (char*)row_vals[23].val.string_val; - if(!leg.tag.s) - continue; - leg.tag.len = strlen(leg.tag.s); - leg.cseq = row_vals[24].val.int_val; - leg.contact.s = (char*)row_vals[25].val.string_val; - leg.contact.len = leg.contact.s?strlen(leg.contact.s):0; - leg.route_set.s = (char*)row_vals[26].val.string_val; - leg.route_set.len = leg.route_set.s?strlen(leg.route_set.s):0; - - new_leg = b2b_dup_leg(&leg, SHM_MEM_TYPE); - if(new_leg== NULL) - { - LM_ERR("Failed to construct b2b leg structure\n"); + if (load_entity(vals) < 0) goto error; - } - shm_dlg->legs = new_leg; } /* any more data to be fetched ?*/ @@ -614,6 +865,115 @@ int b2b_entities_restore(void) return -1; } +static int get_val_from_dict(int idx, int is_str, cdb_dict_t *dict, + int_str_t *vals) +{ + cdb_key_t key; + cdb_pair_t *pair; + + key.is_pk = 0; + key.name = *qcols[idx]; + + pair = cdb_dict_fetch(&key, dict); + if (!pair) { + LM_ERR("Field '%.*s' not found\n", key.name.len, key.name.s); + return -1; + } + + if (is_str) { + if (pair->val.type == CDB_STR) { + vals[idx].s = pair->val.val.st; + } else if (pair->val.type != CDB_NULL) { + LM_ERR("Unexpected type [%d] for field '%.*s'\n", + pair->val.type, key.name.len, key.name.s); + return -1; + } + } else { + if (pair->val.type == CDB_INT32) { + vals[idx].i = pair->val.val.i32; + } else if (pair->val.type != CDB_NULL) { + LM_ERR("Unexpected type [%d] for field '%.*s'\n", + pair->val.type, key.name.len, key.name.s); + return -1; + } + } + + return 0; +} + +int b2b_entities_restore_cdb(void) +{ + cdb_res_t res; + cdb_row_t *row; + struct list_head *_; + cdb_pair_t *pair; + int_str_t vals[DB_COLS_NO]; + + if (b2be_cdbf.map_get(b2be_cdb, NULL, &res) != 0) + LM_ERR("Failed to retrieve map keys\n"); + + list_for_each (_, &res.rows) { + row = list_entry(_, cdb_row_t, list); + /* we have a single pair per row, that contains a dict + * with all the fields */ + pair = list_last_entry(&row->dict, cdb_pair_t, list); + + if (pair->key.name.len <= cdb_key_prefix.len || + memcmp(pair->key.name.s, cdb_key_prefix.s, cdb_key_prefix.len)) + continue; + + memset(vals, 0, sizeof vals); + + get_val_from_dict(0, 0, &pair->val.val.dict, vals); + get_val_from_dict(2, 1, &pair->val.val.dict, vals); + get_val_from_dict(3, 1, &pair->val.val.dict, vals); + + get_val_from_dict(15, 0, &pair->val.val.dict, vals); + get_val_from_dict(4, 1, &pair->val.val.dict, vals); + get_val_from_dict(5, 1, &pair->val.val.dict, vals); + get_val_from_dict(6, 1, &pair->val.val.dict, vals); + get_val_from_dict(7, 1, &pair->val.val.dict, vals); + get_val_from_dict(8, 1, &pair->val.val.dict, vals); + get_val_from_dict(1, 1, &pair->val.val.dict, vals); + get_val_from_dict(16, 0, &pair->val.val.dict, vals); + get_val_from_dict(17, 0, &pair->val.val.dict, vals); + get_val_from_dict(9, 1, &pair->val.val.dict, vals); + get_val_from_dict(10, 1, &pair->val.val.dict, vals); + get_val_from_dict(21, 1, &pair->val.val.dict, vals); + get_val_from_dict(22, 1, &pair->val.val.dict, vals); + get_val_from_dict(18, 0, &pair->val.val.dict, vals); + get_val_from_dict(19, 0, &pair->val.val.dict, vals); + get_val_from_dict(20, 0, &pair->val.val.dict, vals); + get_val_from_dict(12, 1, &pair->val.val.dict, vals); + get_val_from_dict(13, 1, &pair->val.val.dict, vals); + get_val_from_dict(11, 1, &pair->val.val.dict, vals); + + get_val_from_dict(14, 1, &pair->val.val.dict, vals); + get_val_from_dict(23, 1, &pair->val.val.dict, vals); + + get_val_from_dict(24, 0, &pair->val.val.dict, vals); + + get_val_from_dict(25, 1, &pair->val.val.dict, vals); + get_val_from_dict(26, 1, &pair->val.val.dict, vals); + + if (load_entity(vals) < 0) { + cdb_free_rows(&res); + return -1; + } + } + + cdb_free_rows(&res); + + return 0; +} + +int b2b_entities_restore(void) +{ + if (db_url.s) + return b2b_entities_restore_db(); + else + return b2b_entities_restore_cdb(); +} void b2b_entities_dump(int no_lock) { @@ -629,48 +989,68 @@ void b2b_entities_dump(int no_lock) /* delete only one entity */ void b2b_entity_db_delete(int type, b2b_dlg_t* dlg) { - if(!b2be_db) - return; - - - if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); - return; - } - qvals[0].val.int_val = type; qvals[1].val.str_val = dlg->tag[0]; qvals[2].val.str_val = dlg->tag[1]; qvals[3].val.str_val = dlg->callid; - /* if the state is terminated delete the record */ - if(b2be_dbf.delete(b2be_db, qcols, 0, qvals, 4)< 0) - { - LM_ERR("Sql delete failed\n"); + if (b2be_cdb_url.s) { + if(!b2be_cdb) + return; + + b2b_entity_cdb_delete(type, dlg); + } else { + if(!b2be_db) + return; + + if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return; + } + + /* if the state is terminated delete the record */ + if(b2be_dbf.delete(b2be_db, qcols, 0, qvals, 4)< 0) + { + LM_ERR("Sql delete failed\n"); + } } } /* delete all entities belonging to a tuple */ void b2b_db_delete(str param) { - - if(!b2be_db) - return; + str *cdb_subkey; qvals[12].val.str_val = param; - if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); - return; - } + if (b2be_cdb_url.s) { + if(!b2be_cdb) + return; - if(b2be_dbf.delete(b2be_db, qcols+12, 0, qvals+12, 1)< 0) - { - LM_ERR("Sql delete failed\n"); - } -} + cdb_subkey = get_b2be_map_subkey(¶m); + if (!cdb_subkey) { + LM_ERR("Failed to build map key\n"); + return; + } + if (b2be_cdbf.map_remove(b2be_cdb, NULL, cdb_subkey) < 0) + LM_ERR("Failed to delete from cachedb\n"); + pkg_free(cdb_subkey->s); + } else { + if(!b2be_db) + return; + if(b2be_dbf.use_table(b2be_db, &b2be_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return; + } + + if(b2be_dbf.delete(b2be_db, qcols+12, 0, qvals+12, 1)< 0) + { + LM_ERR("Sql delete failed\n"); + } + } +} diff --git a/modules/b2b_entities/doc/b2b_entities_admin.xml b/modules/b2b_entities/doc/b2b_entities_admin.xml index f70779b4ca0..de6af661017 100644 --- a/modules/b2b_entities/doc/b2b_entities_admin.xml +++ b/modules/b2b_entities/doc/b2b_entities_admin.xml @@ -169,6 +169,42 @@ modparam("b2b_entities", "db_url", "mysql://opensips:opensipsrw@127.0.0.1/opensi +
+ <varname>cachedb_url</varname> (str) + + URL of a NoSQL database to be used. Only Redis is supported + at the moment. + + + Set <varname>cachedb_url</varname> parameter + +... +modparam("b2b_entities", "cachedb_url", "redis://localhost:6379/") +... + + +
+ +
+ <varname>cachedb_key_prefix</varname> (string) + + Prefix to use for every key set in the NoSQL database. + + + + Default value is b2be$. + + + + Set <varname>cachedb_key_prefix</varname> parameter + +... +modparam("b2b_entities", "cachedb_key_prefix", "b2b") +... + + +
+
<varname>update_period</varname> (int) diff --git a/modules/b2b_logic/b2b_logic.c b/modules/b2b_logic/b2b_logic.c index 09e5e1a70c9..0b98b67bc9a 100644 --- a/modules/b2b_logic/b2b_logic.c +++ b/modules/b2b_logic/b2b_logic.c @@ -151,11 +151,16 @@ static struct to_body b2bl_from; static char b2bl_from_buf[B2BL_FROM_BUF_LEN + 1]; str db_url= {0, 0}; +str cdb_url; +str cdb_key_prefix = str_init("b2bl$"); db_con_t *b2bl_db = NULL; db_func_t b2bl_dbf; str b2bl_dbtable= str_init("b2b_logic"); str init_callid_hdr={0, 0}; +cachedb_funcs b2bl_cdbf; +cachedb_con *b2bl_cdb; + str server_address = {0, 0}; int b2bl_db_mode = WRITE_BACK; int unsigned b2bl_th_init_timeout = 60; @@ -240,6 +245,8 @@ static param_export_t params[]= {"use_init_sdp", INT_PARAM, &use_init_sdp }, {"contact_user", INT_PARAM, &contact_user }, {"db_url", STR_PARAM, &db_url.s }, + {"cachedb_url", STR_PARAM, &cdb_url.s }, + {"cachedb_key_prefix", STR_PARAM, &cdb_key_prefix.s }, {"db_table", STR_PARAM, &b2bl_dbtable.s }, {"max_duration", INT_PARAM, &max_duration }, /* @@ -378,8 +385,18 @@ static int mod_init(void) return -1; } - if(b2bl_db_mode) - init_db_url(db_url, 1); + if(b2bl_db_mode) { + if (!cdb_url.s) { + init_db_url(db_url, 1); + if (!db_url.s) + b2bl_db_mode = NO_DB; + } else if (db_url.s) { + LM_ERR("Both 'db_url' and 'cachedb_url' defined\n"); + return -1; + } else { + cdb_url.len = strlen(cdb_url.s); + } + } if(b2bl_db_mode && db_url.s) { @@ -423,9 +440,40 @@ static int mod_init(void) if(b2bl_db) b2bl_dbf.close(b2bl_db); b2bl_db = NULL; + } else if (b2bl_db_mode && cdb_url.s) { + if (cachedb_bind_mod(&cdb_url, &b2bl_cdbf) < 0) { + LM_ERR("cannot bind functions for cachedb_url %.*s\n", + cdb_url.len, cdb_url.s); + return -1; + } + + if (!CACHEDB_CAPABILITY(&b2bl_cdbf, CACHEDB_CAP_MAP)) { + LM_ERR("not enough capabilities for cachedb_url %.*s\n", + cdb_url.len, cdb_url.s); + return -1; + } + + b2bl_cdb = b2bl_cdbf.init(&cdb_url); + if (!b2bl_cdb) { + LM_ERR("connecting to database failed\n"); + return -1; + } + + cdb_key_prefix.len = strlen(cdb_key_prefix.s); + + b2bl_db_init(); + + /* reload data */ + if(b2b_logic_restore() < 0) + { + LM_ERR("Failed to restore data from database\n"); + return -1; + } + + if(b2bl_cdb) + b2bl_cdbf.destroy(b2bl_cdb); + b2bl_cdb = NULL; } - else - b2bl_db_mode = 0; if (b2bl_key_avp_param.s) b2bl_key_avp_param.len = strlen(b2bl_key_avp_param.s); @@ -667,15 +715,25 @@ void b2bl_clean(unsigned int ticks, void* param) static void mod_destroy(void) { - if (b2bl_db_mode==WRITE_BACK && b2bl_dbf.init) { + if (b2bl_db_mode==WRITE_BACK) { + if (b2bl_dbf.init) { - b2bl_db = b2bl_dbf.init(&db_url); - if(!b2bl_db) - { - LM_ERR("connecting to database failed\n"); - } else { - b2b_logic_dump(1); - b2bl_dbf.close(b2bl_db); + b2bl_db = b2bl_dbf.init(&db_url); + if(!b2bl_db) + { + LM_ERR("connecting to database failed\n"); + } else { + b2b_logic_dump(1); + b2bl_dbf.close(b2bl_db); + } + } else if (b2bl_cdbf.init) { + b2bl_cdb = b2bl_cdbf.init(&cdb_url); + if (!b2bl_cdb) { + LM_ERR("connecting to database failed\n"); + } else { + b2b_logic_dump(1); + b2bl_cdbf.destroy(b2bl_cdb); + } } } @@ -687,19 +745,34 @@ static int child_init(int rank) if (b2bl_db_mode==0) return 0; - if (b2bl_dbf.init==0) - { - LM_CRIT("child_init: database not bound\n"); - return -1; - } + if (db_url.s) { + if (b2bl_dbf.init==0) + { + LM_CRIT("child_init: database not bound\n"); + return -1; + } - b2bl_db = b2bl_dbf.init(&db_url); - if(!b2bl_db) - { - LM_ERR("connecting to database failed\n"); - return -1; + b2bl_db = b2bl_dbf.init(&db_url); + if(!b2bl_db) + { + LM_ERR("connecting to database failed\n"); + return -1; + } + LM_DBG("child %d: Database connection opened successfully\n", rank); + } else { + if (!b2bl_cdbf.init) { + LM_ERR("cachedb functions not initialized\n"); + return -1; + } + + b2bl_cdb = b2bl_cdbf.init(&cdb_url); + if (!b2bl_cdb) { + LM_ERR("connecting to database failed\n"); + return -1; + } + + LM_DBG("child %d: cachedb connection opened successfully\n", rank); } - LM_DBG("child %d: Database connection opened successfully\n", rank); return 0; } diff --git a/modules/b2b_logic/b2b_logic.h b/modules/b2b_logic/b2b_logic.h index e8452c5e9f2..a6b293632ed 100644 --- a/modules/b2b_logic/b2b_logic.h +++ b/modules/b2b_logic/b2b_logic.h @@ -29,6 +29,7 @@ #include "../../str.h" #include "../../db/db.h" +#include "../../cachedb/cachedb.h" #include "../../timer.h" #include "../b2b_entities/b2be_load.h" @@ -111,8 +112,12 @@ extern str server_address; extern unsigned int max_duration; extern str init_callid_hdr; extern str db_url; +extern str cdb_url; +extern str cdb_key_prefix; extern db_con_t *b2bl_db ; extern db_func_t b2bl_dbf; +extern cachedb_funcs b2bl_cdbf; +extern cachedb_con *b2bl_cdb; extern str b2bl_dbtable; extern char* b2bl_db_buf; extern int b2bl_db_mode; diff --git a/modules/b2b_logic/b2bl_db.c b/modules/b2b_logic/b2bl_db.c index a26561d1f5d..472834c224a 100644 --- a/modules/b2b_logic/b2bl_db.c +++ b/modules/b2b_logic/b2bl_db.c @@ -28,6 +28,7 @@ #include #include "../../db/db.h" +#include "../../lib/osips_malloc.h" #include "b2b_logic.h" #include "b2bl_db.h" #include "entity_storage.h" @@ -107,36 +108,86 @@ void b2bl_db_init(void) qvals[19].type= DB_STR; } +static inline str *get_b2bl_map_key(str *tuple_key) +{ + static str key = {0,0}; + + /* map key format: [prefix][tuple_key] */ + key.len = cdb_key_prefix.len + tuple_key->len; + key.s = pkg_malloc(key.len); + if (!key.s) { + LM_ERR("no more pkg memory\n"); + return NULL; + } + + memcpy(key.s, cdb_key_prefix.s, cdb_key_prefix.len); + memcpy(key.s + cdb_key_prefix.len, tuple_key->s, tuple_key->len); + + return &key; +} + void b2bl_db_delete(b2bl_tuple_t* tuple) { + str *cdb_key; + if(!tuple || !tuple->key || b2bl_db_mode==NO_DB || (b2bl_db_mode==WRITE_BACK && tuple->db_flag==INSERTDB_FLAG)) return; LM_DBG("Delete key = %.*s\n", tuple->key->len, tuple->key->s); - if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); - return; - } - qvals[0].val.str_val = *tuple->key; - if(b2bl_dbf.delete(b2bl_db, qcols, 0, qvals, 1) < 0) - { - LM_ERR("Failed to delete from database table [%.*s]\n", - tuple->key->len, tuple->key->s); + if (db_url.s) { + if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return; + } + + if(b2bl_dbf.delete(b2bl_db, qcols, 0, qvals, 1) < 0) + { + LM_ERR("Failed to delete from database table [%.*s]\n", + tuple->key->len, tuple->key->s); + } + } else { + cdb_key = get_b2bl_map_key(&qvals[0].val.str_val); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); + return; + } + + if (b2bl_cdbf.map_remove(b2bl_cdb, cdb_key, NULL) != 0) + LM_ERR("Failed to delete from cachedb\n"); + + pkg_free(cdb_key->s); } } +void cdb_add_n_pairs(cdb_dict_t *pairs, int idx_start, int idx_end) +{ + int i; + + for (i = idx_start; i <= idx_end; i++) + if (qvals[i].nul || (qvals[i].type == DB_STR && !qvals[i].val.str_val.s)) + cdb_dict_add_null(pairs, qcols[i]->s, qcols[i]->len); + else if (qvals[i].type == DB_STR) + cdb_dict_add_str(pairs, qcols[i]->s, qcols[i]->len, + &qvals[i].val.str_val); + else if (qvals[i].type == DB_INT) + cdb_dict_add_int32(pairs, qcols[i]->s, qcols[i]->len, + qvals[i].val.int_val); +} + void b2b_logic_dump(int no_lock) { b2bl_tuple_t* tuple; - int i; + int i, j; int n_insert_cols; + cdb_dict_t cdb_pairs; + str *cdb_key; - if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) + if(db_url.s && b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) { LM_ERR("sql use table failed\n"); return; @@ -207,29 +258,80 @@ void b2b_logic_dump(int no_lock) qvals[18].val.str_val = tuple->bridge_entities[2]->from_uri; qvals[19].val.str_val = tuple->bridge_entities[2]->key; } - n_insert_cols = DB_COLS_NO; /* insert into database */ if(tuple->db_flag == INSERTDB_FLAG) { - if(b2bl_dbf.insert(b2bl_db, qcols, qvals, n_insert_cols)< 0) - { - LM_ERR("Sql insert failed\n"); - if(!no_lock) - lock_release(&b2bl_htable[i].lock); - return; + if (cdb_url.s) { + cdb_dict_init(&cdb_pairs); + + cdb_key = get_b2bl_map_key(&qvals[0].val.str_val); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); + if(!no_lock) + lock_release(&b2bl_htable[i].lock); + return; + } + + cdb_add_n_pairs(&cdb_pairs, 0, n_insert_cols - 1); + + if(!tuple->bridge_entities[2]) { + for(j = n_insert_cols; j < n_insert_cols + 5; j++) + qvals[j].nul = 1; + + cdb_add_n_pairs(&cdb_pairs,n_insert_cols,n_insert_cols+4); + + for(j = n_insert_cols; j < n_insert_cols + 5; j++) + qvals[j].nul = 0; + } + + if (b2bl_cdbf.map_set(b2bl_cdb, cdb_key, NULL, &cdb_pairs)) + LM_ERR("cachedb set failed\n"); + + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + } else { + n_insert_cols = DB_COLS_NO; + + if(b2bl_dbf.insert(b2bl_db, qcols, qvals, n_insert_cols)< 0) + { + LM_ERR("Sql insert failed\n"); + if(!no_lock) + lock_release(&b2bl_htable[i].lock); + return; + } } } else { - /*do update */ - if(b2bl_dbf.update(b2bl_db, qcols, 0, qvals, qcols+n_query_update, - qvals+n_query_update, 1, DB_COLS_NO - n_query_update)< 0) - { - LM_ERR("Sql update failed\n"); - if(!no_lock) - lock_release(&b2bl_htable[i].lock); - return; + if (cdb_url.s) { + cdb_dict_init(&cdb_pairs); + + cdb_key = get_b2bl_map_key(&qvals[0].val.str_val); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); + if(!no_lock) + lock_release(&b2bl_htable[i].lock); + return; + } + + cdb_add_n_pairs(&cdb_pairs, n_query_update, n_insert_cols-1); + + if (b2bl_cdbf.map_set(b2bl_cdb, cdb_key, NULL, &cdb_pairs)) + LM_ERR("cachedb set failed\n"); + + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + } else { + /*do update */ + if(b2bl_dbf.update(b2bl_db,qcols,0,qvals,qcols+n_query_update, + qvals+n_query_update, 1, DB_COLS_NO - n_query_update)< 0) + { + LM_ERR("Sql update failed\n"); + if(!no_lock) + lock_release(&b2bl_htable[i].lock); + return; + } } } tuple->db_flag = NO_UPDATEDB_FLAG; @@ -345,19 +447,83 @@ static int b2bl_add_tuple(b2bl_tuple_t* tuple) return -1; } -int b2b_logic_restore(void) +static int load_tuple(int_str_t *vals) { - int i; - int nr_rows; int _time; - db_res_t *result= NULL; - db_row_t *rows = NULL; - db_val_t *row_vals= NULL; b2bl_tuple_t tuple; str b2bl_key; str scenario_id; b2bl_entity_id_t bridge_entities[3]; + memset(&tuple, 0, sizeof(b2bl_tuple_t)); + + b2bl_key = vals[0].s; + + tuple.key = &b2bl_key; + if(vals[1].s.s) + { + scenario_id = vals[1].s; + + if (!str_strcmp(&scenario_id, const_str(B2B_TOP_HIDING_SCENARY))) + tuple.scenario_id = B2B_TOP_HIDING_ID_PTR; + else + tuple.scenario_id = &scenario_id; + } else { + tuple.scenario_id = B2B_INTERNAL_ID_PTR; + } + memset(bridge_entities, 0, 3*sizeof(b2bl_entity_id_t)); + if(vals[2].s.s) + tuple.sdp = vals[2].s; + tuple.state = vals[3].i; + _time = (int)time(NULL); + if (vals[4].i <= _time) + tuple.lifetime = 1; + else + tuple.lifetime=vals[4].i - _time + get_ticks(); + + bridge_entities[0].type = vals[5].i; + bridge_entities[0].scenario_id = vals[6].s; + bridge_entities[0].to_uri = vals[7].s; + bridge_entities[0].from_uri = vals[8].s; + bridge_entities[0].key = vals[9].s; + + bridge_entities[1].type = vals[10].i; + bridge_entities[1].scenario_id = vals[11].s; + bridge_entities[1].to_uri = vals[12].s; + bridge_entities[1].from_uri = vals[13].s; + bridge_entities[1].key = vals[14].s; + + if(vals[19].s.s) + { + bridge_entities[2].type = vals[15].i; + bridge_entities[2].scenario_id = vals[16].s; + bridge_entities[2].to_uri = vals[17].s; + bridge_entities[2].from_uri = vals[18].s; + bridge_entities[2].key = vals[19].s; + } + + tuple.bridge_entities[0] = &bridge_entities[0]; + tuple.bridge_entities[1] = &bridge_entities[1]; + tuple.bridge_entities[2] = &bridge_entities[2]; + + if(b2bl_add_tuple(&tuple) < 0) + { + LM_ERR("Failed to add new tuple\n"); + return -1; + } + + return 0; +} + +int b2b_logic_restore_db(void) +{ + int i; + int nr_rows; + db_res_t *result= NULL; + db_row_t *rows = NULL; + db_val_t *row_vals= NULL; + int_str_t vals[DB_COLS_NO]; + if(b2bl_db == NULL) { LM_DBG("NULL database connection\n"); @@ -404,91 +570,57 @@ int b2b_logic_restore(void) for(i=0; ival.type == CDB_STR) { + vals[idx].s = pair->val.val.st; + } else if (pair->val.type != CDB_NULL) { + LM_ERR("Unexpected type [%d] for field '%.*s'\n", + pair->val.type, key.name.len, key.name.s); + return -1; + } + } else { + if (pair->val.type == CDB_INT32) { + vals[idx].i = pair->val.val.i32; + } else if (pair->val.type != CDB_NULL) { + LM_ERR("Unexpected type [%d] for field '%.*s'\n", + pair->val.type, key.name.len, key.name.s); + return -1; + } + } + + return 0; +} + +int b2b_logic_restore_cdb(void) +{ + cdb_res_t res; + cdb_row_t *row; + struct list_head *_; + cdb_pair_t *pair; + int_str_t vals[DB_COLS_NO]; + + if (b2bl_cdbf.map_get(b2bl_cdb, NULL, &res) != 0) + LM_ERR("Failed to retrieve map keys\n"); + + list_for_each (_, &res.rows) { + row = list_entry(_, cdb_row_t, list); + /* we have a single pair per row, that contains a dict + * with all the fields */ + pair = list_last_entry(&row->dict, cdb_pair_t, list); + + if (pair->key.name.len <= cdb_key_prefix.len || + memcmp(pair->key.name.s, cdb_key_prefix.s, cdb_key_prefix.len)) + continue; + + memset(vals, 0, sizeof vals); + + get_val_from_dict(0, 1, &pair->val.val.dict, vals); + get_val_from_dict(1, 1, &pair->val.val.dict, vals); + get_val_from_dict(2, 1, &pair->val.val.dict, vals); + get_val_from_dict(3, 0, &pair->val.val.dict, vals); + get_val_from_dict(4, 0, &pair->val.val.dict, vals); + + get_val_from_dict(5, 0, &pair->val.val.dict, vals); + get_val_from_dict(6, 1, &pair->val.val.dict, vals); + get_val_from_dict(7, 1, &pair->val.val.dict, vals); + get_val_from_dict(8, 1, &pair->val.val.dict, vals); + get_val_from_dict(9, 1, &pair->val.val.dict, vals); + + get_val_from_dict(10, 0, &pair->val.val.dict, vals); + get_val_from_dict(11, 1, &pair->val.val.dict, vals); + get_val_from_dict(12, 1, &pair->val.val.dict, vals); + get_val_from_dict(13, 1, &pair->val.val.dict, vals); + get_val_from_dict(14, 1, &pair->val.val.dict, vals); + + get_val_from_dict(15, 0, &pair->val.val.dict, vals); + get_val_from_dict(16, 1, &pair->val.val.dict, vals); + get_val_from_dict(17, 1, &pair->val.val.dict, vals); + get_val_from_dict(18, 1, &pair->val.val.dict, vals); + get_val_from_dict(19, 1, &pair->val.val.dict, vals); + + if (load_tuple(vals) < 0) { + cdb_free_rows(&res); + return -1; + } + } + + cdb_free_rows(&res); + + return 0; +} + +int b2b_logic_restore(void) +{ + if (db_url.s) + return b2b_logic_restore_db(); + else + return b2b_logic_restore_cdb(); +} + void b2bl_db_insert(b2bl_tuple_t* tuple) { int ci; - int i; + int i, j; + cdb_dict_t cdb_pairs; + str *cdb_key; qvals[0].val.str_val = *tuple->key; if (tuple->scenario_id == B2B_TOP_HIDING_ID_PTR) { @@ -546,15 +782,43 @@ void b2bl_db_insert(b2bl_tuple_t* tuple) qvals[ci++].val.str_val = tuple->bridge_entities[i]->key; } - if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); - return; - } + if (cdb_url.s) { + cdb_dict_init(&cdb_pairs); - if(b2bl_dbf.insert(b2bl_db, qcols, qvals, ci)< 0) - { - LM_ERR("Sql insert failed\n"); + cdb_key = get_b2bl_map_key(&qvals[0].val.str_val); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); + return; + } + + cdb_add_n_pairs(&cdb_pairs, 0, ci - 1); + + if(!tuple->bridge_entities[2]) { + for(j = ci; j < ci + 5; j++) + qvals[j].nul = 1; + + cdb_add_n_pairs(&cdb_pairs, ci, ci + 4); + + for(j = ci; j < ci + 5; j++) + qvals[j].nul = 0; + } + + if (b2bl_cdbf.map_set(b2bl_cdb, cdb_key, NULL, &cdb_pairs) != 0) + LM_ERR("cachedb set failed\n"); + + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + } else { + if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return; + } + + if(b2bl_dbf.insert(b2bl_db, qcols, qvals, ci)< 0) + { + LM_ERR("Sql insert failed\n"); + } } } @@ -562,6 +826,8 @@ void b2bl_db_update(b2bl_tuple_t* tuple) { int ci; int i; + cdb_dict_t cdb_pairs; + str *cdb_key; if(!tuple->key) { LM_ERR("No key found\n"); @@ -587,16 +853,33 @@ void b2bl_db_update(b2bl_tuple_t* tuple) LM_DBG("UPDATE %.*s\n", qvals[ci-1].val.str_val.len, qvals[ci-1].val.str_val.s); } - if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) - { - LM_ERR("sql use table failed\n"); - return; - } + if (cdb_url.s) { + cdb_dict_init(&cdb_pairs); - if(b2bl_dbf.update(b2bl_db, qcols, 0, qvals, qcols+n_query_update, - qvals+n_query_update, 1, ci - n_query_update)< 0) - { - LM_ERR("Sql update failed\n"); + cdb_key = get_b2bl_map_key(&qvals[0].val.str_val); + if (!cdb_key) { + LM_ERR("Failed to build map key\n"); + return; + } + + cdb_add_n_pairs(&cdb_pairs, n_query_update, ci - 1); + + if (b2bl_cdbf.map_set(b2bl_cdb, cdb_key, NULL, &cdb_pairs) != 0) + LM_ERR("cachedb set failed\n"); + + pkg_free(cdb_key->s); + cdb_free_entries(&cdb_pairs, NULL); + } else { + if(b2bl_dbf.use_table(b2bl_db, &b2bl_dbtable)< 0) + { + LM_ERR("sql use table failed\n"); + return; + } + + if(b2bl_dbf.update(b2bl_db, qcols, 0, qvals, qcols+n_query_update, + qvals+n_query_update, 1, ci - n_query_update)< 0) + { + LM_ERR("Sql update failed\n"); + } } } - diff --git a/modules/b2b_logic/doc/b2b_logic_admin.xml b/modules/b2b_logic/doc/b2b_logic_admin.xml index 05e3a8fde62..3cd119d74a8 100644 --- a/modules/b2b_logic/doc/b2b_logic_admin.xml +++ b/modules/b2b_logic/doc/b2b_logic_admin.xml @@ -280,6 +280,40 @@ modparam("b2b_logic", "db_url", "mysql://opensips:opensipsrw@127.0.0.1/opensips"
+
+ <varname>cachedb_url</varname> (str) + + URL of a NoSQL database to be used. Only Redis is supported + at the moment. + + + Set <varname>cachedb_url</varname> parameter + +... +modparam("b2b_logic", "cachedb_url", "redis://localhost:6379/") +... + + +
+
+ <varname>cachedb_key_prefix</varname> (string) + + Prefix to use for every key set in the NoSQL database. + + + + Default value is b2bl$. + + + + Set <varname>cachedb_key_prefix</varname> parameter + +... +modparam("b2b_logic", "cachedb_key_prefix", "b2b") +... + + +
<varname>update_period</varname> (int) diff --git a/modules/cachedb_redis/cachedb_redis.c b/modules/cachedb_redis/cachedb_redis.c index b544a3ddc3a..8d732816a51 100644 --- a/modules/cachedb_redis/cachedb_redis.c +++ b/modules/cachedb_redis/cachedb_redis.c @@ -134,6 +134,9 @@ static int mod_init(void) cde.cdb_func.add = redis_add; cde.cdb_func.sub = redis_sub; cde.cdb_func.raw_query = redis_raw_query; + cde.cdb_func.map_get = redis_map_get; + cde.cdb_func.map_set = redis_map_set; + cde.cdb_func.map_remove = redis_map_remove; cde.cdb_func.capability = 0; diff --git a/modules/cachedb_redis/cachedb_redis_dbase.c b/modules/cachedb_redis/cachedb_redis_dbase.c index 86a7615a920..2105a793269 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.c +++ b/modules/cachedb_redis/cachedb_redis_dbase.c @@ -458,13 +458,13 @@ void redis_destroy(cachedb_con *con) { * * On error, a negative code is returned */ -static int redis_run_command(cachedb_con *connection, redisReply **rpl, - str *key, char *cmd_fmt, ...) +static int _redis_run_command(cachedb_con *connection, redisReply **rpl, str *key, + int argc, const char **argv, const size_t *argvlen, + char *cmd_fmt, va_list ap) { redis_con *con = NULL, *first; cluster_node *node; redisReply *reply = NULL; - va_list ap; int i, last_err = 0; first = ((redis_con *)connection->data)->current; @@ -491,10 +491,12 @@ static int redis_run_command(cachedb_con *connection, redisReply **rpl, } } - va_start(ap, cmd_fmt); - for (i = QUERY_ATTEMPTS; i; i--) { - reply = redisvCommand(node->context, cmd_fmt, ap); + if (argc) + reply = redisCommandArgv(node->context, argc, argv, argvlen); + else + reply = redisvCommand(node->context, cmd_fmt, ap); + if (reply == NULL || reply->type == REDIS_REPLY_ERROR) { LM_INFO("Redis query failed: %p %.*s (%s)\n", reply,reply?(unsigned)reply->len:7,reply?reply->str:"FAILURE", @@ -509,8 +511,6 @@ static int redis_run_command(cachedb_con *connection, redisReply **rpl, } else break; } - va_end(ap); - if (i==0) { LM_ERR("giving up on query to %s:%d\n", con->host, con->port); last_err = -1; @@ -535,6 +535,27 @@ static int redis_run_command(cachedb_con *connection, redisReply **rpl, return last_err; } +static int redis_run_command(cachedb_con *connection, redisReply **rpl, + str *key, char *cmd_fmt, ...) +{ + int rc; + va_list ap; + + va_start(ap, cmd_fmt); + rc = _redis_run_command(connection, rpl, key, 0, NULL, NULL, cmd_fmt, ap); + va_end(ap); + + return rc; +} + +static int redis_run_command_argv(cachedb_con *connection, redisReply **rpl, + str *key, int argc, const char **argv, const size_t *argvlen) +{ + va_list ap; + + return _redis_run_command(connection, rpl, key, argc, argv, argvlen, NULL, ap); +} + int redis_get(cachedb_con *connection,str *attr,str *val) { redisReply *reply; @@ -545,7 +566,8 @@ int redis_get(cachedb_con *connection,str *attr,str *val) return -1; } - rc = redis_run_command(connection, &reply, attr, "GET %b", attr->s, attr->len); + rc = redis_run_command(connection, &reply, attr, "GET %b", + attr->s, (size_t)attr->len); if (rc != 0) goto out_err; @@ -991,3 +1013,298 @@ int redis_raw_query(cachedb_con *connection,str *attr,cdb_raw_entry ***rpl,int e return 1; } + +int redis_map_get(cachedb_con *con, const str *key, cdb_res_t *res) +{ + redisReply *scan_reply = NULL, *get_reply = NULL; + str null_key = {0,0}; + int rc; + int scan_cursor = 0; + str s; + int i,j; + cdb_row_t *cdb_row; + cdb_key_t cdb_key; + cdb_pair_t *hfield, *pair; + + if (!res || !con) { + LM_ERR("null parameter\n"); + return -1; + } + + cdb_res_init(res); + + /* iterate over all keys, return a cdb_pair_t for every key */ + do { + rc = redis_run_command(con, &scan_reply, &null_key, + "SCAN %d COUNT %d TYPE hash", scan_cursor, MAP_GET_SCAN_COUNT); + if (rc != 0) + goto err_free_reply; + + s.len = scan_reply->element[0]->len; + s.s = scan_reply->element[0]->str; + if (str2sint(&s, &scan_cursor) != 0) { + LM_ERR("Cursor returned by SCAN command is not an integer\n"); + goto err_free_reply; + } + + for (i = 0; i < scan_reply->element[1]->elements; i++) { + /* get the all the map fields for this key */ + s.len = scan_reply->element[1]->element[i]->len; + s.s = scan_reply->element[1]->element[i]->str; + + rc = redis_run_command(con, &get_reply, &s, "HGETALL %b", + s.s, (size_t)s.len); + if (rc != 0) + goto err_free_reply; + + if (get_reply->elements == 0) + continue; + + cdb_row = pkg_malloc(sizeof *cdb_row); + if (!cdb_row) { + LM_ERR("no more pkg memory\n"); + goto err_free_reply; + } + INIT_LIST_HEAD(&cdb_row->dict); + + cdb_key.name = s; + cdb_key.is_pk = 1; + pair = cdb_mk_pair(&cdb_key, NULL); + if (!pair) { + LM_ERR("no more pkg memory\n"); + goto err_free_row; + } + pair->val.type = CDB_DICT; + INIT_LIST_HEAD(&pair->val.val.dict); + + for (j = 0; j < get_reply->elements; j+=2) { + /* in the array returned by HGETALL every + * field name is followed by its' value */ + cdb_key.name.len = get_reply->element[j]->len; + cdb_key.name.s = get_reply->element[j]->str; + cdb_key.is_pk = 0; + + hfield = cdb_mk_pair(&cdb_key, NULL); + if (!hfield) { + LM_ERR("no more pkg memory\n"); + goto err_free_pair; + } + + s.len = get_reply->element[j+1]->len; + s.s = get_reply->element[j+1]->str; + + switch (s.s[0]) { + case HASH_FIELD_VAL_NULL: + hfield->val.type = CDB_NULL; + break; + case HASH_FIELD_VAL_STR: + hfield->val.type = CDB_STR; + s.s++; + s.len--; + if (pkg_str_dup(&hfield->val.val.st, &s) < 0) { + LM_ERR("no more pkg memory\n"); + pkg_free(hfield); + goto err_free_pair; + } + break; + case HASH_FIELD_VAL_INT32: + hfield->val.type = CDB_INT32; + s.s++; + s.len--; + if (str2sint(&s, (int *)&hfield->val.val.i32) < 0) { + LM_ERR("Expected hash field value to be an integer\n"); + pkg_free(hfield); + goto err_free_pair; + } + break; + default: + LM_DBG("Unexpected type [%c] for hash field, skipping\n", s.s[0]); + pkg_free(hfield); + continue; + } + + cdb_dict_add(hfield, &pair->val.val.dict); + } + + if (!list_empty(&pair->val.val.dict)) { + cdb_dict_add(pair, &cdb_row->dict); + res->count++; + list_add_tail(&cdb_row->list, &res->rows); + } + + freeReplyObject(get_reply); + get_reply = NULL; + } + + freeReplyObject(scan_reply); + scan_reply = NULL; + } while (scan_cursor); + + return 0; + +err_free_pair: + pkg_free(pair); +err_free_row: + cdb_free_entries(&cdb_row->dict, osips_pkg_free); + pkg_free(cdb_row); +err_free_reply: + if (get_reply) + freeReplyObject(get_reply); + if (scan_reply) + freeReplyObject(scan_reply); + return rc; +} + +int redis_map_set(cachedb_con *con, const str *key, const str *subkey, + const cdb_dict_t *pairs) +{ + int argc = 0; + const char *argv[MAP_SET_MAX_FIELDS+2]; + size_t argvlen[MAP_SET_MAX_FIELDS+2]; + cdb_pair_t *pair; + struct list_head *_; + static str valbuf; + int offset = 0; + char *int_buf; + int len; + int rc; + redisReply *reply; + + if (!con || !key) { + LM_ERR("null parameter\n"); + return -1; + } + + argv[0] = "HSET"; + argvlen[0] = sizeof("HSET")-1; + argv[1] = key->s; + argvlen[1] = key->len; + argc = 2; + + list_for_each (_, pairs) { + pair = list_entry(_, cdb_pair_t, list); + + argv[argc] = pair->key.name.s; + argvlen[argc] = pair->key.name.len; + argc++; + + if (argc > MAP_SET_MAX_FIELDS) { + LM_ERR("Trying to set too many fields(%d)\n", argc); + return -1; + } + + switch (pair->val.type) { + case CDB_NULL: + len = 0; + break; + case CDB_INT32: + int_buf = sint2str((long)pair->val.val.i32, &len); + break; + case CDB_STR: + len = pair->val.val.st.len; + break; + default: + LM_DBG("Unexpected type [%d] for hash field\n", pair->val.type); + return -1; + } + + if (pkg_str_extend(&valbuf, offset+len+1) < 0) + return -1; + + switch (pair->val.type) { + case CDB_NULL: + valbuf.s[offset] = HASH_FIELD_VAL_NULL; + break; + case CDB_INT32: + valbuf.s[offset] = HASH_FIELD_VAL_INT32; + memcpy(valbuf.s+offset+1, int_buf, len); + break; + case CDB_STR: + valbuf.s[offset] = HASH_FIELD_VAL_STR; + memcpy(valbuf.s+offset+1, pair->val.val.st.s, len); + break; + default: + LM_DBG("Unexpected type [%d] for hash field\n", pair->val.type); + return -1; + } + + argv[argc] = valbuf.s+offset; + argvlen[argc] = len+1; + argc++; + + offset += len+1; + } + + rc = redis_run_command_argv(con, &reply, (str *)key, + argc, argv, argvlen); + if (rc != 0) + return rc; + + freeReplyObject(reply); + reply = NULL; + + if (subkey) { + rc = redis_run_command(con, &reply, (str*)subkey, "SADD %b %b", + subkey->s, (size_t)subkey->len, key->s, (size_t)key->len); + if (rc != 0) + return rc; + + freeReplyObject(reply); + } + + return 0; +} + +int redis_map_remove(cachedb_con *con, const str *key, const str *subkey) +{ + int rc; + redisReply *reply; + int i; + str s; + + LM_DBG("DDD MAP REMOVE key=%.*s subkey=%.*s\n", key ? key->len : 0, + key ? key->s : NULL, subkey ? subkey->len : 0, subkey ? subkey->s : NULL); + + if (!con || (!key && !subkey)) { + LM_ERR("null parameter\n"); + return -1; + } + + if (!subkey) + return redis_remove(con, (str*)key); + + if (key) { + /* key based delete, but also remove the member "key" + * from the Set at "subkey" */ + rc = redis_run_command(con, &reply, (str*)subkey, "SREM %b %b", + subkey->s, (size_t)subkey->len, key->s, (size_t)key->len); + if (rc < 0) + return rc; + + freeReplyObject(reply); + + return redis_remove(con, (str*)key); + } else { + /* subkey based delete - delete all the keys that are members + * of the Set at "subkey" */ + rc = redis_run_command(con, &reply, (str*)subkey, "SMEMBERS %b", + subkey->s, (size_t)subkey->len); + if (rc != 0) + return rc; + + for (i = 0; i < reply->elements; i++) { + s.s = reply->element[i]->str; + s.len = reply->element[i]->len; + + rc = redis_remove(con, &s); + if (rc < 0) { + freeReplyObject(reply); + return -1; + } + } + + freeReplyObject(reply); + + return redis_remove(con, (str*)subkey); + } +} diff --git a/modules/cachedb_redis/cachedb_redis_dbase.h b/modules/cachedb_redis/cachedb_redis_dbase.h index dcadf4ba704..c6a54335c57 100644 --- a/modules/cachedb_redis/cachedb_redis_dbase.h +++ b/modules/cachedb_redis/cachedb_redis_dbase.h @@ -51,6 +51,14 @@ typedef struct cluster_nodes { #define CACHEDB_REDIS_DEFAULT_TIMEOUT 5000 +#define MAP_GET_SCAN_COUNT 1000 + +#define HASH_FIELD_VAL_NULL '0' +#define HASH_FIELD_VAL_STR '1' +#define HASH_FIELD_VAL_INT32 '2' + +#define MAP_SET_MAX_FIELDS 128 + extern int redis_query_tout; extern int redis_connnection_tout; extern int shutdown_on_error; @@ -96,6 +104,10 @@ int redis_add(cachedb_con *con,str *attr,int val,int expires,int *new_val); int redis_sub(cachedb_con *con,str *attr,int val,int expires,int *new_val); int redis_get_counter(cachedb_con *connection,str *attr,int *val); int redis_raw_query(cachedb_con *connection,str *attr,cdb_raw_entry ***reply,int expected_kv_no,int *reply_no); +int redis_map_get(cachedb_con *con, const str *key, cdb_res_t *res); +int redis_map_set(cachedb_con *con, const str *key, const str *subkey, + const cdb_dict_t *pairs); +int redis_map_remove(cachedb_con *con, const str *key, const str *subkey); #endif /* CACHEDBREDIS_DBASE_H */