Skip to content

Commit

Permalink
Merge branch 'feature/multikeys' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
Ali Yakamercan committed Aug 21, 2012
2 parents 2fa5bcc + 2fac12f commit 9a3e5f0
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 42 deletions.
29 changes: 20 additions & 9 deletions c_src/callbacks.h
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ struct libcouchbase_callback {
libcouchbase_error_t error; libcouchbase_error_t error;
size_t size; size_t size;
void *data; void *data;
void *key;
size_t nkey;
int flag; int flag;
int cas; int cas;
}; };


struct libcouchbase_callback_m {
int currKey;
struct libcouchbase_callback** ret;
};
//libcouchbase callbacks //libcouchbase callbacks
static void error_callback(libcouchbase_t instance, static void error_callback(libcouchbase_t instance,
libcouchbase_error_t error, libcouchbase_error_t error,
Expand All @@ -35,16 +41,21 @@ static void get_callback(libcouchbase_t instance,
libcouchbase_cas_t cas) libcouchbase_cas_t cas)
{ {
(void)key; (void)nkey; (void)flags; (void)cas; (void)key; (void)nkey; (void)flags; (void)cas;
struct libcouchbase_callback *cb; struct libcouchbase_callback_m *cbm;
cb = (struct libcouchbase_callback *)cookie; cbm = (struct libcouchbase_callback_m *)cookie;
cb->error = error; cbm->ret[cbm->currKey] = malloc(sizeof(struct libcouchbase_callback));
cb->flag = flags == 0 ? 1 : flags; cbm->ret[cbm->currKey]->key = malloc(nkey);
cb->cas = cas; memcpy(cbm->ret[cbm->currKey]->key, key, nkey);
cbm->ret[cbm->currKey]->nkey = nkey;
cbm->ret[cbm->currKey]->error = error;
cbm->ret[cbm->currKey]->flag = flags == 0 ? 1 : flags;
cbm->ret[cbm->currKey]->cas = cas;
if (error == LIBCOUCHBASE_SUCCESS) { if (error == LIBCOUCHBASE_SUCCESS) {
cb->data = malloc(nbytes); cbm->ret[cbm->currKey]->data = malloc(nbytes);
memcpy(cb->data, bytes, nbytes); memcpy(cbm->ret[cbm->currKey]->data, bytes, nbytes);
cb->size = nbytes; cbm->ret[cbm->currKey]->size = nbytes;
} }
cbm->currKey += 1;
} }


static void arithmetic_callback(libcouchbase_t instance, static void arithmetic_callback(libcouchbase_t instance,
Expand All @@ -64,7 +75,7 @@ static void arithmetic_callback(libcouchbase_t instance,
if (error == LIBCOUCHBASE_SUCCESS) { if (error == LIBCOUCHBASE_SUCCESS) {
cb->data = malloc(20*sizeof(char)); cb->data = malloc(20*sizeof(char));
memset(cb->data, 0, 20); memset(cb->data, 0, 20);
sprintf(cb->data, "%lu", value); sprintf(cb->data, "%llu", value);
cb->size = strlen(cb->data); cb->size = strlen(cb->data);
} }
} }
Expand Down
89 changes: 64 additions & 25 deletions c_src/cberl_nif.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ NIF(cberl_nif_store)


assert_badarg(enif_get_uint(env, argv[4], &flags), env); assert_badarg(enif_get_uint(env, argv[4], &flags), env);
assert_badarg(enif_get_int(env, argv[5], &exp), env); assert_badarg(enif_get_int(env, argv[5], &exp), env);
assert_badarg(enif_get_uint64(env, argv[6], &cas), env); assert_badarg(enif_get_uint64(env, argv[6], (unsigned long*)&cas), env);


enif_mutex_lock(handle->mutex); enif_mutex_lock(handle->mutex);
ret = libcouchbase_store(handle->instance, ret = libcouchbase_store(handle->instance,
Expand Down Expand Up @@ -163,18 +163,26 @@ NIF(cberl_nif_store)
NIF(cberl_nif_mget) NIF(cberl_nif_mget)
{ {
handle_t * handle; handle_t * handle;
struct libcouchbase_callback cb; struct libcouchbase_callback_m cb;
unsigned int numkeys; unsigned int numkeys;
void** keys;
size_t* nkeys;
int exp; int exp;


libcouchbase_error_t ret; libcouchbase_error_t ret;

ERL_NIF_TERM* results;
ERL_NIF_TERM* currKey;
ERL_NIF_TERM returnValue;
ERL_NIF_TERM tail;
ErlNifBinary *databin;


assert_badarg(enif_get_resource(env, argv[0], cberl_handle, (void **) &handle), env); assert_badarg(enif_get_resource(env, argv[0], cberl_handle, (void **) &handle), env);
assert_badarg(enif_get_list_length(env, argv[1], &numkeys), env); assert_badarg(enif_get_list_length(env, argv[1], &numkeys), env);
void** keys = malloc(sizeof(char*) * numkeys); keys = malloc(sizeof(char*) * numkeys);
size_t* nkeys = malloc(sizeof(size_t) * numkeys); nkeys = malloc(sizeof(size_t) * numkeys);
ERL_NIF_TERM* currKey; currKey = malloc(sizeof(ERL_NIF_TERM));
ERL_NIF_TERM tail = argv[1]; tail = argv[1];
unsigned int keylen; unsigned int keylen;
int i = 0; int i = 0;
while(0 != enif_get_list_cell(env, tail, currKey, &tail)) { while(0 != enif_get_list_cell(env, tail, currKey, &tail)) {
Expand All @@ -184,31 +192,62 @@ NIF(cberl_nif_mget)
assert_badarg(enif_get_string(env, *currKey, keys[i], nkeys[i], ERL_NIF_LATIN1), env); assert_badarg(enif_get_string(env, *currKey, keys[i], nkeys[i], ERL_NIF_LATIN1), env);
i++; i++;
} }

assert_badarg(enif_get_int(env, argv[2], &exp), env); assert_badarg(enif_get_int(env, argv[2], &exp), env);


cb.currKey = 0;
cb.ret = malloc(sizeof(struct libcouchbase_callback*) * numkeys);

enif_mutex_lock(handle->mutex); enif_mutex_lock(handle->mutex);
ret = libcouchbase_mget(handle->instance, ret = libcouchbase_mget(handle->instance,
&cb, &cb,
numkeys, numkeys,
(const void*const*)keys, (const void*const*)keys,
nkeys, nkeys,
exp == 0 ? NULL : (libcouchbase_time_t*)&exp); exp == 0 ? NULL : (libcouchbase_time_t*)&exp);
free(nkeys);
for(i =0; i< numkeys; i++) {
free(keys[i]);
}
free(keys);


if (ret != LIBCOUCHBASE_SUCCESS) { if (ret != LIBCOUCHBASE_SUCCESS) {
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
return_lcb_error(env, ret); return return_lcb_error(env, ret);
} }
libcouchbase_wait(handle->instance); libcouchbase_wait(handle->instance);
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
if(cb.error != LIBCOUCHBASE_SUCCESS) {
return return_lcb_error(env, cb.error);
results = malloc(sizeof(ERL_NIF_TERM) * numkeys);
i = 0;
for(; i < numkeys; i++) {
if (cb.ret[i]->error == LIBCOUCHBASE_SUCCESS) {
databin = malloc(sizeof(ErlNifBinary));
enif_alloc_binary(cb.ret[i]->size, databin);
memcpy(databin->data, cb.ret[i]->data, cb.ret[i]->size);
results[i] = enif_make_tuple4(env,
enif_make_int(env, cb.ret[i]->cas),
enif_make_int(env, cb.ret[i]->flag),
enif_make_string_len(env, cb.ret[i]->key, cb.ret[i]->nkey - 1, ERL_NIF_LATIN1),
enif_make_binary(env, databin));
free(cb.ret[i]->data);
free(databin);
} else {
results[i] = enif_make_tuple2(env,
enif_make_string_len(env, cb.ret[i]->key, cb.ret[i]->nkey - 1, ERL_NIF_LATIN1),
return_lcb_error(env, cb.ret[i]->error));
}
free(cb.ret[i]->key);
free(cb.ret[i]);
free(keys[i]);
} }
return enif_make_tuple2(env, a_ok, return_value(env, &cb)); returnValue = enif_make_list_from_array(env, results, numkeys);

free(results);
free(cb.ret);
free(currKey);
free(keys);
free(nkeys);

return enif_make_tuple2(env, a_ok, returnValue);
} }


NIF(cberl_nif_getl) { NIF(cberl_nif_getl) {
Expand Down Expand Up @@ -236,7 +275,7 @@ NIF(cberl_nif_getl) {
free(key); free(key);
if (ret != LIBCOUCHBASE_SUCCESS) { if (ret != LIBCOUCHBASE_SUCCESS) {
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
return_lcb_error(env, ret); return return_lcb_error(env, ret);
} }
libcouchbase_wait(handle->instance); libcouchbase_wait(handle->instance);
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
Expand Down Expand Up @@ -275,7 +314,7 @@ NIF(cberl_nif_unlock)


if (ret != LIBCOUCHBASE_SUCCESS) { if (ret != LIBCOUCHBASE_SUCCESS) {
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
return_lcb_error(env, ret); return return_lcb_error(env, ret);
} }
libcouchbase_wait(handle->instance); libcouchbase_wait(handle->instance);
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
Expand Down Expand Up @@ -318,7 +357,7 @@ NIF(cberl_nif_mtouch)
free(key); free(key);
if (ret != LIBCOUCHBASE_SUCCESS) { if (ret != LIBCOUCHBASE_SUCCESS) {
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
return_lcb_error(env, ret); return return_lcb_error(env, ret);
} }
libcouchbase_wait(handle->instance); libcouchbase_wait(handle->instance);
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
Expand All @@ -345,10 +384,10 @@ NIF(cberl_nif_arithmetic) {
nkey += 1; nkey += 1;
key = (char *) malloc(nkey); key = (char *) malloc(nkey);
assert_badarg(enif_get_string(env, argv[1], key, nkey, ERL_NIF_LATIN1), env); assert_badarg(enif_get_string(env, argv[1], key, nkey, ERL_NIF_LATIN1), env);
assert_badarg(enif_get_int64(env, argv[2], &delta), env); assert_badarg(enif_get_int64(env, argv[2], (long*)&delta), env);
assert_badarg(enif_get_uint64(env, argv[3], &exp), env); assert_badarg(enif_get_uint64(env, argv[3], (unsigned long *)&exp), env);
assert_badarg(enif_get_int(env, argv[4], &create), env); assert_badarg(enif_get_int(env, argv[4], &create), env);
assert_badarg(enif_get_uint64(env, argv[5], &initial), env); assert_badarg(enif_get_uint64(env, argv[5], (unsigned long *)&initial), env);
enif_mutex_lock(handle->mutex); enif_mutex_lock(handle->mutex);
ret = libcouchbase_arithmetic(handle->instance, ret = libcouchbase_arithmetic(handle->instance,
&cb, &cb,
Expand All @@ -361,7 +400,7 @@ NIF(cberl_nif_arithmetic) {
free(key); free(key);
if (ret != LIBCOUCHBASE_SUCCESS) { if (ret != LIBCOUCHBASE_SUCCESS) {
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
return_lcb_error(env, ret); return return_lcb_error(env, ret);
} }
libcouchbase_wait(handle->instance); libcouchbase_wait(handle->instance);
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
Expand Down Expand Up @@ -396,7 +435,7 @@ NIF(cberl_nif_remove) {
free(key); free(key);
if (ret != LIBCOUCHBASE_SUCCESS) { if (ret != LIBCOUCHBASE_SUCCESS) {
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
return_lcb_error(env, ret); return return_lcb_error(env, ret);
} }
libcouchbase_wait(handle->instance); libcouchbase_wait(handle->instance);
enif_mutex_unlock(handle->mutex); enif_mutex_unlock(handle->mutex);
Expand Down
26 changes: 18 additions & 8 deletions src/cberl.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
-export([append/4, prepend/4, mtouch/3]). -export([append/4, prepend/4, mtouch/3]).
-export([incr/3, incr/4, incr/5, decr/3, decr/4, decr/5]). -export([incr/3, incr/4, incr/5, decr/3, decr/4, decr/5]).
%retrieval operations %retrieval operations
-export([get_and_touch/3, get_and_lock/3, get/2, unlock/3]). -export([get_and_touch/3, get_and_lock/3, mget/2, get/2, unlock/3]).




%% @equiv new("localhost:8091", "", "", "") %% @equiv new("localhost:8091", "", "", "")
Expand Down Expand Up @@ -125,11 +125,14 @@ decr(Instance, Key, OffSet, Default, Exp) ->


-spec get_and_touch(instance(), key(), integer()) -> {ok, integer(), value()} | {error, _}. -spec get_and_touch(instance(), key(), integer()) -> {ok, integer(), value()} | {error, _}.
get_and_touch(Instance, Key, Exp) -> get_and_touch(Instance, Key, Exp) ->
mget(Instance, Key, Exp). mget(Instance, [Key], Exp).


-spec get(instance(), key()) -> {ok, integer(), value()} | {error, _}. -spec get(instance(), key()) -> {ok, integer(), value()} | {error, _}.
get(Instance, Key) -> get(Instance, Key) ->
mget(Instance, Key, 0). hd(mget(Instance, [Key], 0)).

mget(Instance, Keys) ->
mget(Instance, Keys, 0).


-spec get_and_lock(instance(), key(), integer()) -> {ok, integer(), value()} | {error, _}. -spec get_and_lock(instance(), key(), integer()) -> {ok, integer(), value()} | {error, _}.
get_and_lock(Instance, Key, Exp) -> get_and_lock(Instance, Key, Exp) ->
Expand Down Expand Up @@ -169,12 +172,19 @@ store(#instance{handle = Handle, transcoder = Transcoder}, Op, Key, Value, Trans
%% Exp When the object should expire %% Exp When the object should expire
%% pass a negative number for infinity %% pass a negative number for infinity
-spec mget(instance(), key(), integer()) -> {ok, integer(), value()} | {error, _}. -spec mget(instance(), key(), integer()) -> {ok, integer(), value()} | {error, _}.
mget(#instance{handle = Handle, transcoder = Transcoder}, Key, Exp) -> mget(#instance{handle = Handle, transcoder = Transcoder}, Keys, Exp) ->
case cberl_nif:mget(Handle, [Key], Exp) of case cberl_nif:mget(Handle, Keys, Exp) of
{error, Error} -> {error, Error}; {error, Error} -> {error, Error};
{ok, {Cas, Flag, Value}} -> {ok, Results} ->
DecodedValue = Transcoder:decode_value(Flag, Value), lists:map(fun(Result) ->
{ok, Cas, DecodedValue} case Result of
{Cas, Flag, Key, Value} ->
DecodedValue = Transcoder:decode_value(Flag, Value),
{Key, Cas, DecodedValue};
{_Key, {error, _Error}} ->
Result
end
end, Results)
end. end.


%% @doc Get an item with a lock that has a timeout %% @doc Get an item with a lock that has a timeout
Expand Down

0 comments on commit 9a3e5f0

Please sign in to comment.