Skip to content

Commit

Permalink
Implement get with lock operation
Browse files Browse the repository at this point in the history
Change-Id: I9f45616f78af0752f5fcfecf9b2a21652897ade1
Reviewed-on: http://review.couchbase.org/16717
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Reviewed-by: Chris Anderson <jchris@couchbase.com>
  • Loading branch information
avsej committed Jun 4, 2012
1 parent 8d5153a commit 11d035f
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 9 deletions.
68 changes: 61 additions & 7 deletions ext/couchbase_ext/couchbase_ext.c
Expand Up @@ -115,6 +115,7 @@ struct key_traits_st
int quiet;
int mgat;
int is_array;
int lock;
VALUE force_format;
};

Expand Down Expand Up @@ -146,6 +147,7 @@ static ID sym_add,
sym_hostname,
sym_increment,
sym_initial,
sym_lock,
sym_marshal,
sym_method,
sym_password,
Expand Down Expand Up @@ -617,6 +619,7 @@ cb_args_scan_keys(long argc, VALUE argv, struct bucket_st *bucket, struct key_tr
traits->keys_ary = rb_ary_new();
traits->quiet = bucket->quiet;
traits->mgat = 0;
traits->lock = 0;

if (argc > 0) {
/* keys with custom options */
Expand All @@ -638,6 +641,15 @@ cb_args_scan_keys(long argc, VALUE argv, struct bucket_st *bucket, struct key_tr
traits->explicit_ttl = 1;
exp = NUM2ULONG(ttl);
}
/* boolean or number of seconds to lock */
ttl = rb_hash_aref(opts, sym_lock);
if (ttl != Qnil) {
traits->lock = RTEST(ttl);
exp = 0; /* use server default expiration */
if (TYPE(ttl) == T_FIXNUM) {
exp = NUM2ULONG(ttl);
}
}
}
nn = RARRAY_LEN(argv);
if (nn == 1 && TYPE(RARRAY_PTR(argv)[0]) == T_ARRAY) {
Expand Down Expand Up @@ -2399,6 +2411,8 @@ cb_bucket_decr(int argc, VALUE *argv, VALUE self)
*
* @since 1.0.0
*
* @see http://couchbase.com/docs/couchbase-manual-2.0/couchbase-architecture-apis-memcached-protocol-additions.html#couchbase-architecture-apis-memcached-protocol-additions-getl
*
* @overload get(*keys, options = {})
* @param keys [String, Symbol, Array] One or several keys to fetch
* @param options [Hash] Options for operation.
Expand All @@ -2415,6 +2429,15 @@ cb_bucket_decr(int argc, VALUE *argv, VALUE self)
* @option options [Symbol] :format (nil) Explicitly choose the decoder
* for this key (+:plain+, +:document+, +:marshal+). See
* {Bucket#default_format}.
* @option options [Fixnum, Boolean] :lock Lock the keys for time span.
* If this parameter is +true+ the key(s) will be locked for default
* timeout. Also you can use number to setup your own timeout in
* seconds. If it will be lower that zero or exceed the maximum, the
* server will use default value. You can determine actual default and
* maximum values calling {Bucket#stats} without arguments and
* inspecting keys "ep_getl_default_timeout" and "ep_getl_max_timeout"
* correspondingly. See overloaded hash syntax to specify custom timeout
* per each key.
*
* @yieldparam ret [Result] the result of operation in asynchronous mode
* (valid attributes: +error+, +operation+, +key+, +value+, +flags+,
Expand Down Expand Up @@ -2465,6 +2488,19 @@ cb_bucket_decr(int argc, VALUE *argv, VALUE self)
* end
* end
*
* @example Get and lock key using default timeout
* c.get("foo", :lock => true)
*
* @example Determine lock timeout parameters
* c.stats.values_at("ep_getl_default_timeout", "ep_getl_max_timeout")
* #=> [{"127.0.0.1:11210"=>"15"}, {"127.0.0.1:11210"=>"30"}]
*
* @example Get and lock key using custom timeout
* c.get("foo", :lock => 3)
*
* @example Get and lock multiple keys using custom timeout
* c.get("foo", "bar", :lock => 3)
*
* @overload get(keys, options = {})
* When the method receive hash map, it will behave like it receive list
* of keys (+keys.keys+), but also touch each key setting expiry time to
Expand All @@ -2484,15 +2520,19 @@ cb_bucket_decr(int argc, VALUE *argv, VALUE self)
* @example Extended get and touch multiple keys
* c.get({"foo" => 10, "bar" => 20}, :extended => true)
* #=> {"foo" => [val1, flags1, cas1], "bar" => [val2, flags2, cas2]}
*
* @example Get and lock multiple keys for chosen period in seconds
* c.get("foo" => 10, "bar" => 20, :lock => true)
* #=> {"foo" => val1, "bar" => val2}
*/
static VALUE
cb_bucket_get(int argc, VALUE *argv, VALUE self)
{
struct bucket_st *bucket = DATA_PTR(self);
struct context_st *ctx;
VALUE args, rv, proc, exc, keys;
size_t nn, ii, ll;
libcouchbase_error_t err;
size_t nn, ii, ll = 0;
libcouchbase_error_t err = LIBCOUCHBASE_SUCCESS;
struct key_traits_st *traits;
int extended, mgat, is_array;
long seqno;
Expand Down Expand Up @@ -2524,11 +2564,24 @@ cb_bucket_get(int argc, VALUE *argv, VALUE self)
ctx->exception = Qnil;
seqno = bucket->seqno;
bucket->seqno += nn;
err = libcouchbase_mget(bucket->handle, (const void *)ctx,
traits->nkeys, (const void * const *)traits->keys,
traits->lens, (traits->explicit_ttl) ? traits->ttls : NULL);
for (ii = 0, ll = 0; ii < traits->nkeys; ++ii) {
ll += traits->lens[ii];
if (traits->lock) {
for (ii = 0; ii < traits->nkeys; ++ii) {
err = libcouchbase_getl(bucket->handle, (const void *)ctx,
(const void *)traits->keys[ii], traits->lens[ii],
traits->ttls + ii);
if (err != LIBCOUCHBASE_SUCCESS) {
break;
}
}
} else {
err = libcouchbase_mget(bucket->handle, (const void *)ctx,
traits->nkeys, (const void * const *)traits->keys,
traits->lens, (traits->explicit_ttl) ? traits->ttls : NULL);
}
if (err == LIBCOUCHBASE_SUCCESS) {
for (ii = 0; ii < traits->nkeys; ++ii) {
ll += traits->lens[ii];
}
}
free(traits->keys);
free(traits->lens);
Expand Down Expand Up @@ -4473,6 +4526,7 @@ Init_couchbase_ext(void)
sym_hostname = ID2SYM(rb_intern("hostname"));
sym_increment = ID2SYM(rb_intern("increment"));
sym_initial = ID2SYM(rb_intern("initial"));
sym_lock = ID2SYM(rb_intern("lock"));
sym_marshal = ID2SYM(rb_intern("marshal"));
sym_method = ID2SYM(rb_intern("method"));
sym_password = ID2SYM(rb_intern("password"));
Expand Down
2 changes: 1 addition & 1 deletion ext/couchbase_ext/extconf.rb
Expand Up @@ -108,7 +108,7 @@ def define(macro, value = nil)
if RbConfig::CONFIG['target_os'] =~ /mingw32/
have_library("vbucket", "vbucket_config_create", "libvbucket/vbucket.h") or abort "You should install libvbucket >= 1.8.0.2"
end
have_library("couchbase", "libcouchbase_make_couch_request", "libcouchbase/couchbase.h") or abort "You should install libcouchbase >= 1.1.0dp"
have_library("couchbase", "libcouchbase_getl", "libcouchbase/couchbase.h") or abort "You should install libcouchbase >= 1.1.0dp3"
have_header("mach/mach_time.h")
have_header("stdint.h") or abort "Failed to locate stdint.h"
have_header("sys/time.h")
Expand Down
6 changes: 5 additions & 1 deletion test/setup.rb
Expand Up @@ -50,7 +50,11 @@ def start
:username => name,
:bucket => name,
:password => password)
connection.flush
begin
connection.flush
rescue Couchbase::Error::NotSupported
# on recent server flush is disabled
end
end
end
def stop; end
Expand Down
50 changes: 50 additions & 0 deletions test/test_get.rb
Expand Up @@ -349,4 +349,54 @@ def test_consistent_behaviour_for_arrays
end
end

def test_get_with_lock_trivial
if @mock.real?
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port)
connection.set(uniq_id, "foo")
assert_equal "foo", connection.get(uniq_id, :lock => 1)
assert_raises Couchbase::Error::KeyExists do
connection.set(uniq_id, "bar")
end
sleep(2)
connection.set(uniq_id, "bar")
else
skip("implement GETL in CouchbaseMock.jar")
end
end

def test_multi_get_with_lock
if @mock.real?
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port)
connection.set(uniq_id(1), "foo1")
connection.set(uniq_id(2), "foo2")
assert_equal ["foo1", "foo2"], connection.get([uniq_id(1), uniq_id(2)], :lock => 1)
assert_raises Couchbase::Error::KeyExists do
connection.set(uniq_id(1), "bar")
end
assert_raises Couchbase::Error::KeyExists do
connection.set(uniq_id(2), "bar")
end
else
skip("implement GETL in CouchbaseMock.jar")
end
end

def test_multi_get_with_custom_locks
if @mock.real?
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port)
connection.set(uniq_id(1), "foo1")
connection.set(uniq_id(2), "foo2")
expected = {uniq_id(1) => "foo1", uniq_id(2) => "foo2"}
assert_equal expected, connection.get({uniq_id(1) => 1, uniq_id(2) => 2}, :lock => true)
assert_raises Couchbase::Error::KeyExists do
connection.set(uniq_id(1), "foo")
end
assert_raises Couchbase::Error::KeyExists do
connection.set(uniq_id(2), "foo")
end
else
skip("implement GETL in CouchbaseMock.jar")
end
end

end

0 comments on commit 11d035f

Please sign in to comment.