Skip to content

Commit

Permalink
cachedb: Simplify the column-oriented API
Browse files Browse the repository at this point in the history
Since some NoSQL backends support upsert-enabled "set" and "unset" operations
on multiple key/value pairs using a single query, we can merge the "set_cols()"
and "unset_cols()" functions into a single one: "update()". Current
backends known to support this are MongoDB and Cassandra.

Any backends which do not support the above will have to implement the update()
endpoint using two queries instead of one.
  • Loading branch information
liviuchircu committed Feb 28, 2018
1 parent b45c1bd commit ce54ac8
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 80 deletions.
54 changes: 40 additions & 14 deletions cachedb/cachedb.h
Expand Up @@ -66,14 +66,11 @@ typedef int (cachedb_remove_f)(cachedb_con *con,str *attr);
typedef int (cachedb_add_f)(cachedb_con *con,str *attr,int val,int expires,int *new_val);
typedef int (cachedb_sub_f)(cachedb_con *con,str *attr,int val,int expires,int *new_val);

typedef int (cachedb_get_rows_f)(cachedb_con *con, const cdb_filter_t *filter,
cdb_res_t *res);
typedef int (cachedb_set_cols_f)(cachedb_con *con,
const cdb_filter_t *row_filter,
const cdb_dict_t *pairs);
typedef int (cachedb_unset_cols_f)(cachedb_con *con,
const cdb_filter_t *row_filter,
const cdb_key_t *keys, int nk);
typedef int (cachedb_query_f)(cachedb_con *con, const cdb_filter_t *filter,
cdb_res_t *res);
typedef int (cachedb_update_f)(cachedb_con *con,
const cdb_filter_t *row_filter,
const cdb_dict_t *pairs);

/* bi-dimensional array will be returned */
typedef int (cachedb_raw_f)(cachedb_con *con,str *query,cdb_raw_entry ***reply,int expected_key_no,int *reply_no);
Expand All @@ -98,17 +95,46 @@ typedef struct cachedb_funcs_t {
cachedb_raw_f *raw_query;
cachedb_truncate_f *truncate;

/* API for column-oriented NoSQL databases (Cassandra, Mongo) */
cachedb_get_rows_f *get_rows;
cachedb_set_cols_f *set_cols;
cachedb_unset_cols_f *unset_cols;
/* TODO: can we adapt these ^ to also work with Redis? */

cachedb_query_trans_f *db_query_trans;
cachedb_free_trans_f *db_free_trans;
cachedb_insert_trans_f *db_insert_trans;
cachedb_delete_trans_f *db_delete_trans;
cachedb_update_trans_f *db_update_trans;

/*
* Endpoints specific to "column-oriented" NoSQL DBs (Cassandra, Mongo)
* Support for these endpoints can be verified via CACHEDB_CAP_COL_ORIENTED
*/

/**
* query() - SQL-like select function.
* @con: The cacheDB connection to use.
* @filter: NULL, one or more AND'ed filters for the query.
* @res: Will contain zero or more results.
*/
cachedb_query_f *query;

/**
* update() - SQL-like update function with "set", "unset" and TTL support.
* @con: The cacheDB connection to use.
* @row_filter: NULL, one or more AND'ed filters for the update.
* @pairs: A list of columns (and values) to set or unset.
*
* In addition to behaving like the SQL equivalent, the update() function
* shall _always_ perform an "UPSERT" operation wherever possible,
* i.e. it will insert any missing rows or columns (keys) without failing.
*
* Regarding the TTL support -- the input allows for maximal flexibility,
* allowing calling code to set a TTL per either each key/value or
* key.subkey/value pairs. From here onwards, it is up to the cacheDB
* implementation to decide how to use this information. For example, some
* backends may only support row-level TTLs and set a TTL equal to the
* max TTL between all input and existing DB TTL (e.g. MongoDB), others
* may actually fully support dictionary-level TTLs (e.g. Cassandra).
*/
cachedb_update_f *update;
/* TODO: can we also implement these ^ with Redis, or can we adapt them? */

int capability;
} cachedb_funcs;

Expand Down
24 changes: 9 additions & 15 deletions cachedb/cachedb_cap.h
Expand Up @@ -39,14 +39,11 @@ typedef enum {
CACHEDB_CAP_SUB = 1<<4,
CACHEDB_CAP_BINARY_VALUE = 1<<5,
CACHEDB_CAP_RAW = 1<<6,
CACHEDB_CAP_TRUNCATE = 1<<7,

CACHEDB_CAP_GET_ROWS = 1<<7,
CACHEDB_CAP_SET_COLS = 1<<8,
CACHEDB_CAP_UNSET_COLS = 1<<9,
CACHEDB_CAP_COL_ORIENTED =
(CACHEDB_CAP_GET_ROWS|CACHEDB_CAP_SET_COLS|CACHEDB_CAP_UNSET_COLS),

CACHEDB_CAP_TRUNCATE = 1<<10,
CACHEDB_CAP_QUERY = 1<<8,
CACHEDB_CAP_UPDATE = 1<<9,
CACHEDB_CAP_COL_ORIENTED = (CACHEDB_CAP_QUERY|CACHEDB_CAP_UPDATE),
} cachedb_cap;

#define CACHEDB_CAPABILITY(cdbf,cpv) (((cdbf)->capability & (cpv)) == (cpv))
Expand Down Expand Up @@ -83,15 +80,12 @@ static inline int check_cachedb_api(cachedb_engine *cde)
if (cde->cdb_func.truncate)
cde->cdb_func.capability |= CACHEDB_CAP_TRUNCATE;

if (cde->cdb_func.get_rows)
cde->cdb_func.capability |= CACHEDB_CAP_GET_ROWS;
if (cde->cdb_func.set_cols)
cde->cdb_func.capability |= CACHEDB_CAP_SET_COLS;
if (cde->cdb_func.unset_cols)
cde->cdb_func.capability |= CACHEDB_CAP_UNSET_COLS;
if (cde->cdb_func.query)
cde->cdb_func.capability |= CACHEDB_CAP_QUERY;
if (cde->cdb_func.update)
cde->cdb_func.capability |= CACHEDB_CAP_UPDATE;

if (cde->cdb_func.get_rows && cde->cdb_func.set_cols
&& cde->cdb_func.unset_cols)
if (cde->cdb_func.query && cde->cdb_func.update)
cde->cdb_func.capability |= CACHEDB_CAP_COL_ORIENTED;

return 0;
Expand Down
8 changes: 6 additions & 2 deletions cachedb/cachedb_types.h
Expand Up @@ -90,9 +90,13 @@ typedef struct {

typedef struct {
cdb_key_t key;
str subkey; /* may be used during "SET" to refer to a sub-dictionary key */
/* may be used during an update() to refer to a sub-dictionary key */
str subkey;
cdb_val_t val;
int ttl; /* seconds; may be used during "SET"; 0 means "no ttl set" */
/* seconds; may be set during an update(); 0 means "no ttl set" */
int ttl;
/* set to 1 during an update() in order to unset the given key */
char unset;

struct list_head list;
} cdb_kv_t;
Expand Down
62 changes: 31 additions & 31 deletions cachedb/test/test_backends.c
Expand Up @@ -137,7 +137,7 @@ static inline cdb_dict_t *nth_dict(const cdb_res_t *res, int nth)
return NULL;
}

static int test_get_rows_filters(cachedb_funcs *api, cachedb_con *con)
static int test_query_filters(cachedb_funcs *api, cachedb_con *con)
{
cdb_key_t key;
str sa = str_init("A"), sb = str_init("B"), sc = str_init("C"),
Expand All @@ -150,16 +150,16 @@ static int test_get_rows_filters(cachedb_funcs *api, cachedb_con *con)
memset(&pair, 0, sizeof pair);

init_str(&key.name, "tgr_1");
ok(api->set(con, &key.name, &sa, 0) == 0, "test_get_rows: set A");
ok(api->set(con, &key.name, &sa, 0) == 0, "test_query: set A");

init_str(&key.name, "tgr_2");
ok(api->set(con, &key.name, &sb, 0) == 0, "test_get_rows: set B");
ok(api->set(con, &key.name, &sb, 0) == 0, "test_query: set B");

init_str(&key.name, "tgr_3");
ok(api->set(con, &key.name, &sc, 0) == 0, "test_get_rows: set C");
ok(api->set(con, &key.name, &sc, 0) == 0, "test_query: set C");

init_str(&key.name, "tgr_4");
ok(api->set(con, &key.name, &sd, 0) == 0, "test_get_rows: set D");
ok(api->set(con, &key.name, &sd, 0) == 0, "test_query: set D");

memset(&key, 0, sizeof key);
init_str(&key.name, "opensips");
Expand All @@ -171,8 +171,8 @@ static int test_get_rows_filters(cachedb_funcs *api, cachedb_con *con)
/* single filter tests */

filter = cdb_append_filter(NULL, &key, CDB_OP_LE, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 4 items");
ok(res.count == 4, "test_get_rows: have 4 items");
ok(api->query(con, filter, &res) == 0, "test_query: get 4 items");
ok(res.count == 4, "test_query: have 4 items");
init_str(&pair.key.name, "opensips");
pair.val.val.st = sa; ok(res_has_kv(&res, &pair), "has A");
pair.val.val.st = sb; ok(res_has_kv(&res, &pair), "has B");
Expand All @@ -184,8 +184,8 @@ static int test_get_rows_filters(cachedb_funcs *api, cachedb_con *con)
cdb_free_filters(filter);

filter = cdb_append_filter(NULL, &key, CDB_OP_LT, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 3 items");
ok(res.count == 3, "test_get_rows: have 3 items");
ok(api->query(con, filter, &res) == 0, "test_query: get 3 items");
ok(res.count == 3, "test_query: have 3 items");
pair.val.val.st = sa; ok(res_has_kv(&res, &pair), "has A");
pair.val.val.st = sb; ok(res_has_kv(&res, &pair), "has B");
pair.val.val.st = sc; ok(res_has_kv(&res, &pair), "has C");
Expand All @@ -195,16 +195,16 @@ static int test_get_rows_filters(cachedb_funcs *api, cachedb_con *con)

init_str(&isv.s, "A");
filter = cdb_append_filter(NULL, &key, CDB_OP_LT, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 0 items");
ok(res.count == 0, "test_get_rows: have 0 items");
ok(api->query(con, filter, &res) == 0, "test_query: get 0 items");
ok(res.count == 0, "test_query: have 0 items");
pair.val.val.st = sa; ok(!res_has_kv(&res, &pair), "!has A");
pair.val.val.st = sc; ok(!res_has_kv(&res, &pair), "!has C");
cdb_free_rows(&res);
cdb_free_filters(filter);

filter = cdb_append_filter(NULL, &key, CDB_OP_LE, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 1 item");
ok(res.count == 1, "test_get_rows: have 1 item");
ok(api->query(con, filter, &res) == 0, "test_query: get 1 item");
ok(res.count == 1, "test_query: have 1 item");
pair.val.val.st = sa; ok(res_has_kv(&res, &pair), "has A");
pair.val.val.st = sb; ok(!res_has_kv(&res, &pair), "!has B");
cdb_free_rows(&res);
Expand All @@ -214,35 +214,35 @@ static int test_get_rows_filters(cachedb_funcs *api, cachedb_con *con)

init_str(&isv.s, "D");
filter = cdb_append_filter(NULL, &key, CDB_OP_LT, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 3 items");
ok(res.count == 3, "test_get_rows: have 3 items");
ok(api->query(con, filter, &res) == 0, "test_query: get 3 items");
ok(res.count == 3, "test_query: have 3 items");
init_str(&isv.s, "A");
filter = cdb_append_filter(filter, &key, CDB_OP_GE, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 3 items");
ok(res.count == 3, "test_get_rows: have 3 items");
ok(api->query(con, filter, &res) == 0, "test_query: get 3 items");
ok(res.count == 3, "test_query: have 3 items");
init_str(&isv.s, "B");
filter = cdb_append_filter(filter, &key, CDB_OP_GT, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 1 item");
ok(res.count == 1, "test_get_rows: have 1 item");
ok(api->query(con, filter, &res) == 0, "test_query: get 1 item");
ok(res.count == 1, "test_query: have 1 item");
init_str(&isv.s, "C");
filter = cdb_append_filter(filter, &key, CDB_OP_EQ, &isv);
ok(api->get_rows(con, filter, &res) == 0, "test_get_rows: get 1 item");
ok(res.count == 1, "test_get_rows: have 1 item");
ok(api->query(con, filter, &res) == 0, "test_query: get 1 item");
ok(res.count == 1, "test_query: have 1 item");
cdb_free_rows(&res);
cdb_free_filters(filter);

return 1;
}

static int test_get_rows(cachedb_funcs *api, cachedb_con *con,
static int test_query(cachedb_funcs *api, cachedb_con *con,
const cdb_dict_t *pairs)
{
cdb_res_t res;

if (!ok(api->get_rows(con, NULL, &res) == 0, "get_rows: NULL filter"))
if (!ok(api->query(con, NULL, &res) == 0, "query: NULL filter"))
return 0;

ok(res.count == 2, "get_rows: 2 results");
ok(res.count == 2, "query: 2 results");
dbg_cdb_dict("pairs: ", pairs);
dbg_cdb_dict("res 1: ", nth_dict(&res, 1));
dbg_cdb_dict("res 2: ", nth_dict(&res, 2));
Expand All @@ -254,8 +254,8 @@ static int test_get_rows(cachedb_funcs *api, cachedb_con *con,
return 1;
}

static int test_set_cols(cachedb_funcs *api, cachedb_con *con,
cdb_dict_t *out_pairs)
static int test_update(cachedb_funcs *api, cachedb_con *con,
cdb_dict_t *out_pairs)
{
cdb_filter_t *filter;
int_str_t isv;
Expand Down Expand Up @@ -310,14 +310,14 @@ static int test_set_cols(cachedb_funcs *api, cachedb_con *con,
cdb_dict_add(pair, &dict_pair->val.val.dict);
cdb_dict_add(dict_pair, out_pairs);

ok(api->set_cols(con, filter, out_pairs) == 0, "test_set_cols #1");
ok(api->update(con, filter, out_pairs) == 0, "test_update #1");

cdb_free_filters(filter);
cdb_pkey_init(&key, "aor");
init_str(&isv.s, "bar@opensips.org");
filter = cdb_append_filter(NULL, &key, CDB_OP_EQ, &isv);

ok(api->set_cols(con, filter, out_pairs) == 0, "test_set_cols #2");
ok(api->update(con, filter, out_pairs) == 0, "test_update #2");

cdb_free_filters(filter);

Expand All @@ -331,14 +331,14 @@ static int test_column_ops(cachedb_funcs *api, cachedb_con *con)
if (CACHEDB_CAPABILITY(api, CACHEDB_CAP_TRUNCATE))
ok(api->truncate(con) == 0, "truncate");

if (!ok(test_get_rows_filters(api, con), "test get_rows filters"))
if (!ok(test_query_filters(api, con), "test query filters"))
return 0;

if (CACHEDB_CAPABILITY(api, CACHEDB_CAP_TRUNCATE))
ok(api->truncate(con) == 0, "truncate");

if (!ok(test_set_cols(api, con, &cols), "test set_cols")
|| !ok(test_get_rows(api, con, &cols), "test get_rows"))
if (!ok(test_update(api, con, &cols), "test update")
|| !ok(test_query(api, con, &cols), "test query"))
return 0;

cdb_free_entries(&cols);
Expand Down
5 changes: 2 additions & 3 deletions modules/cachedb_mongodb/cachedb_mongodb.c
Expand Up @@ -117,9 +117,8 @@ static int mod_init(void)
cde.cdb_func.remove = mongo_con_remove;
cde.cdb_func.add = mongo_con_add;
cde.cdb_func.sub = mongo_con_sub;
cde.cdb_func.get_rows = mongo_con_get_rows;
cde.cdb_func.set_cols = mongo_con_set_cols;
cde.cdb_func.unset_cols = mongo_con_unset_cols;
cde.cdb_func.query = mongo_con_query;
cde.cdb_func.update = mongo_con_update;
cde.cdb_func.raw_query = mongo_con_raw_query;
cde.cdb_func.truncate = mongo_truncate;
cde.cdb_func.db_query_trans = mongo_db_query_trans;
Expand Down
12 changes: 3 additions & 9 deletions modules/cachedb_mongodb/cachedb_mongodb_dbase.c
Expand Up @@ -1814,7 +1814,7 @@ int mongo_cdb_filter_to_bson(const cdb_filter_t *filter, bson_t *cur)
return 0;
}

int mongo_con_get_rows(cachedb_con *con, const cdb_filter_t *filter,
int mongo_con_query(cachedb_con *con, const cdb_filter_t *filter,
cdb_res_t *res)
{
bson_t child, bson_filter = BSON_INITIALIZER;
Expand Down Expand Up @@ -1985,8 +1985,8 @@ int mongo_cdb_dict_to_bson(const cdb_dict_t *dict, bson_t *out_doc)
return 0;
}

int mongo_con_set_cols(cachedb_con *con, const cdb_filter_t *row_filter,
const cdb_dict_t *pairs)
int mongo_con_update(cachedb_con *con, const cdb_filter_t *row_filter,
const cdb_dict_t *pairs)
{
struct list_head *_;
bson_t filter = BSON_INITIALIZER, update = BSON_INITIALIZER;
Expand Down Expand Up @@ -2090,9 +2090,3 @@ int mongo_con_set_cols(cachedb_con *con, const cdb_filter_t *row_filter,
bson_destroy(&update);
return ret;
}

int mongo_con_unset_cols(cachedb_con *con, const cdb_filter_t *row_filter,
const cdb_key_t *cols, int nc)
{
return 0;
}
10 changes: 4 additions & 6 deletions modules/cachedb_mongodb/cachedb_mongodb_dbase.h
Expand Up @@ -93,10 +93,8 @@ int mongo_db_update_trans(cachedb_con *con, const str *table,
const db_val_t *_uv, const int _n, const int _un);
int mongo_truncate(cachedb_con *con);

int mongo_con_get_rows(cachedb_con *con, const cdb_filter_t *row_filter,
cdb_res_t *res);
int mongo_con_set_cols(cachedb_con *con, const cdb_filter_t *row_filter,
const cdb_dict_t *pairs);
int mongo_con_unset_cols(cachedb_con *con, const cdb_filter_t *row_filter,
const cdb_key_t *cols, int nc);
int mongo_con_query(cachedb_con *con, const cdb_filter_t *row_filter,
cdb_res_t *res);
int mongo_con_update(cachedb_con *con, const cdb_filter_t *row_filter,
const cdb_dict_t *pairs);
#endif /* CACHEDBMONGO_DBASE_H */

0 comments on commit ce54ac8

Please sign in to comment.