Skip to content

Commit

Permalink
Allow to retry Couchbase::Bucket#cas on collitions
Browse files Browse the repository at this point in the history
Extend Couchbase::Bucket#cas to take a `:retry` Fixnum option that
specifies the maximum number of times the method should retry the
entire get/update/set operation when a Couchbase::Error::KeyExists
error is encountered due to a concurrent update from another writer
between its #get and #set calls.

Change-Id: Ibf7ebac9c63e460e05957004458e9e02ae803f89
Reviewed-on: http://review.couchbase.org/30336
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information
scottwb authored and avsej committed Nov 15, 2013
1 parent d449c96 commit 4c9b676
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 4 deletions.
35 changes: 31 additions & 4 deletions lib/couchbase/bucket.rb
Expand Up @@ -35,12 +35,23 @@ class Bucket
#
# @see http://couchbase.com/docs/memcached-api/memcached-api-protocol-text_cas.html
#
# Setting the +:retry+ option to a positive number will cause this method
# to rescue the {Couchbase::Error::KeyExists} error that happens when
# an update collision is detected, and automatically get a fresh copy
# of the value and retry the block. This will repeat as long as there
# continues to be conflicts, up to the maximum number of retries specified.
# For asynchronous mode, this means the block will be yielded once for
# the initial {Bucket#get}, once for the final {Bucket#set} (successful
# or last failure), and zero or more additional {Bucket#get} retries
# in between, up to the maximum allowed by the +:retry+ option.
#
# @param [String, Symbol] key
#
# @param [Hash] options the options for "swap" part
# @option options [Fixnum] :ttl (self.default_ttl) the time to live of this key
# @option options [Symbol] :format (self.default_format) format of the value
# @option options [Fixnum] :flags (self.default_flags) flags for this key
# @option options [Fixnum] :retry (0) maximum number of times to autmatically retry upon update collision
#
# @yieldparam [Object, Result] value old value in synchronous mode and
# +Result+ object in asynchronous mode.
Expand Down Expand Up @@ -80,16 +91,32 @@ class Bucket
#
# @return [Fixnum] the CAS of new value
def cas(key, options = {})
retries_remaining = options.delete(:retry) || 0
if async?
block = Proc.new
get(key) do |ret|
val = block.call(ret) # get new value from caller
set(ret.key, val, options.merge(:cas => ret.cas, :flags => ret.flags), &block)
set(ret.key, val, options.merge(:cas => ret.cas, :flags => ret.flags)) do |set_ret|
if set_ret.error.is_a?(Couchbase::Error::KeyExists) && (retries_remaining > 0)
cas(key, options.merge(:retry => retries_remaining - 1), &block)
else
block.call(set_ret)
end
end
end
else
val, flags, ver = get(key, :extended => true)
val = yield(val) # get new value from caller
set(key, val, options.merge(:cas => ver, :flags => flags))
begin
val, flags, ver = get(key, :extended => true)
val = yield(val) # get new value from caller
set(key, val, options.merge(:cas => ver, :flags => flags))
rescue Couchbase::Error::KeyExists
if retries_remaining > 0
retries_remaining -= 1
retry
else
raise
end
end
end
end
alias :compare_and_swap :cas
Expand Down
111 changes: 111 additions & 0 deletions test/test_cas.rb
Expand Up @@ -57,6 +57,52 @@ def test_compare_and_swap_collision
end
end

def test_compare_and_swap_retry
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
:default_format => :document)
connection.set(uniq_id, {"bar" => 1})
calls = 0
connection.cas(uniq_id, :retry => 1) do |val|
calls += 1
if calls == 1
# Simulate collision with a separate writer. This will
# change the CAS value to be different than what #cas just loaded.
# Only do this the first time this block is executed.
connection.set(uniq_id, {"bar" => 2})
end

# Complete the modification we desire, which should fail when set.
val["baz"] = 3
val
end
assert_equal 2, calls
val = connection.get(uniq_id)
expected = {"bar" => 2, "baz" => 3}
assert_equal expected, val
end

def test_compare_and_swap_too_many_retries
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
:default_format => :document)
connection.set(uniq_id, {"bar" => 0})
calls = 0
assert_raises(Couchbase::Error::KeyExists) do
connection.cas(uniq_id, :retry => 10) do |val|
calls += 1

# Simulate collision with a separate writer. This will
# change the CAS value to be different than what #cas just loaded.
# Do it every time so we just keep retrying and failing.
connection.set(uniq_id, {"bar" => calls})

# Complete the modification we desire, which should fail when set.
val["baz"] = 3
val
end
end
assert_equal 11, calls
end

def test_compare_and_swap_async
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
:default_format => :document)
Expand Down Expand Up @@ -113,6 +159,71 @@ def test_compare_and_swap_async_collision
assert_equal 2, calls
end

def test_compare_and_swap_async_retry
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
:default_format => :document)
connection.set(uniq_id, {"bar" => 1})
calls = 0
connection.run do |conn|
conn.cas(uniq_id, :retry => 1) do |ret|
calls += 1
case ret.operation
when :get
new_val = ret.value

if calls == 1
# Simulate collision with a separate writer. This will
# change the CAS value to be different than what #cas just loaded.
# Only do this the first time this block is executed.
connection.set(uniq_id, {"bar" => 2})
end

# Complete the modification we desire, which should fail when set.
new_val["baz"] = 3
new_val
when :set
assert ret.success?
else
flunk "Unexpected operation: #{ret.operation.inspect}"
end
end
end
assert_equal 3, calls
val = connection.get(uniq_id)
expected = {"bar" => 2, "baz" => 3}
assert_equal expected, val
end

def test_compare_and_swap_async_too_many_retries
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
:default_format => :document)
connection.set(uniq_id, {"bar" => 0})
calls = 0
connection.run do |conn|
conn.cas(uniq_id, :retry => 10) do |ret|
calls += 1
case ret.operation
when :get
new_val = ret.value

# Simulate collision with a separate writer. This will
# change the CAS value to be different than what #cas just loaded.
# Do it every time so we just keep retrying and failing.
connection.set(uniq_id, {"bar" => calls})

# Complete the modification we desire, which should fail when set.
new_val["baz"] = 3
new_val
when :set
assert ret.error.is_a? Couchbase::Error::KeyExists
else
flunk "Unexpected operation: #{ret.operation.inspect}"
end
end
end
assert_equal 12, calls
end

def test_flags_replication
connection = Couchbase.new(:hostname => @mock.host, :port => @mock.port,
:default_format => :document)
Expand Down

0 comments on commit 4c9b676

Please sign in to comment.