Skip to content

Commit

Permalink
cachedb_mongodb: Rewrite "update" command to use bulk operations
Browse files Browse the repository at this point in the history
  • Loading branch information
liviuchircu committed Mar 3, 2017
1 parent 7d61dfa commit adc1346
Showing 1 changed file with 53 additions and 9 deletions.
62 changes: 53 additions & 9 deletions modules/cachedb_mongodb/cachedb_mongodb_dbase.c
Expand Up @@ -721,10 +721,13 @@ int mongo_raw_count(cachedb_con *connection,bson_t *raw_query,cdb_raw_entry ***r
int mongo_raw_update(cachedb_con *con, bson_t *raw_query, bson_iter_t *ns)
{
mongoc_collection_t *col;
mongoc_bulk_operation_t *bulk = NULL;
bson_iter_t iter, uiter, sub_iter;
bson_error_t error;
bson_t query, update;
bson_t query, update, reply;
const bson_value_t *v;
int ret, count = 0;
char *str;

if (bson_iter_type(ns) != BSON_TYPE_UTF8) {
LM_ERR("collection name must be a string (%d)!\n", bson_iter_type(ns));
Expand All @@ -734,8 +737,31 @@ int mongo_raw_update(cachedb_con *con, bson_t *raw_query, bson_iter_t *ns)
col = mongoc_client_get_collection(MONGO_CLIENT(con), MONGO_DB_STR(con),
bson_iter_utf8(ns, NULL));

if (!bson_iter_init_find(&iter, raw_query, "updates") ||
!BSON_ITER_HOLDS_ARRAY(&iter)) {
LM_ERR("missing or non-array 'updates' field in update command!\n");
return -1;
}

if (bson_iter_recurse(&iter, &sub_iter)) {
while (bson_iter_next(&sub_iter)) {
count++;
}
}

if (count == 0) {
LM_DBG("nothing to update!\n");
goto out;
}

bulk = mongoc_collection_create_bulk_operation(col, false, NULL);
if (!bulk) {
LM_ERR("failed to create bulk op!\n");
goto out_err;
}

if (bson_iter_init_find(&iter, raw_query, "updates") &&
BSON_ITER_HOLDS_ARRAY(&iter) && bson_iter_recurse(&iter, &uiter)) {
bson_iter_recurse(&iter, &uiter)) {
while (bson_iter_next(&uiter)) {
bson_iter_recurse(&uiter, &sub_iter);
if (!bson_iter_find(&sub_iter, "q")) {
Expand All @@ -753,18 +779,36 @@ int mongo_raw_update(cachedb_con *con, bson_t *raw_query, bson_iter_t *ns)
v = bson_iter_value(&sub_iter);
bson_init_static(&update, v->value.v_doc.data, v->value.v_doc.data_len);

if (!mongoc_collection_update(col, MONGOC_UPDATE_UPSERT,
&query, &update, NULL, &error)) {
LM_ERR("failed to run update. error: %d.%d: %s\n",
error.domain, error.code, error.message);
return -1;
}
mongoc_bulk_operation_update(bulk, &query, &update, true);
}
}

mongoc_collection_destroy(col);
ret = mongoc_bulk_operation_execute(bulk, &reply, &error);
if (!ret) {
LM_ERR("failed bulk update\nerror: %d.%d: %s\n",
error.domain, error.code, error.message);
goto out_err;
}

if (is_printable(L_DBG)) {
str = bson_as_json(&reply, NULL);
LM_DBG("reply received: %s\n", str);
bson_free(str);
}

out:
if (bulk) {
mongoc_bulk_operation_destroy(bulk);
}
mongoc_collection_destroy(col);
return 0;

out_err:
if (bulk) {
mongoc_bulk_operation_destroy(bulk);
}
mongoc_collection_destroy(col);
return -1;
}

int mongo_raw_insert(cachedb_con *con, bson_t *raw_query, bson_iter_t *ns)
Expand Down

0 comments on commit adc1346

Please sign in to comment.