Permalink
Browse files

Make ActiveSupport::Cache::CouchbaseStore threadsafe.

Change-Id: Ic75f317d58186008fa80ce814264058c53f7d3bf
Reviewed-on: http://review.couchbase.org/25294
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information...
1 parent db4e26d commit 6ba3cdd5e7db0ca7af47d3f12573556caca3a2dc @mje113 mje113 committed with avsej Mar 21, 2013
Showing with 46 additions and 7 deletions.
  1. +23 −7 lib/active_support/cache/couchbase_store.rb
  2. +23 −0 test/test_couchbase_rails_cache_store.rb
@@ -19,6 +19,7 @@
require 'securerandom'
require 'active_support/core_ext/array/extract_options'
require 'active_support/cache'
+require 'monitor'
module ActiveSupport
module Cache
@@ -56,6 +57,7 @@ def initialize(*args)
options[:key_prefix] ||= options.delete(:namespace)
args.push(options)
@data = ::Couchbase::Bucket.new(*args)
+ @lock = Monitor.new
end
# Fetches data from the cache, using the given key.
@@ -182,7 +184,9 @@ def read_multi(*names)
options[:format] = :plain
end
instrument(:read_multi, names, options) do
- @data.get(names, options)
+ @lock.synchronize do
+ @data.get(names, options)
+ end
end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
@@ -243,7 +247,9 @@ def increment(name, amount = 1, options = nil)
options[:create] = true
instrument(:increment, name, options) do |payload|
payload[:amount] = amount if payload
- @data.incr(name, amount, options)
+ @lock.synchronize do
+ @data.incr(name, amount, options)
+ end
end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
@@ -275,7 +281,9 @@ def decrement(name, amount = 1, options = nil)
options[:create] = true
instrument(:decrement, name, options) do |payload|
payload[:amount] = amount if payload
- @data.decr(name, amount, options)
+ @lock.synchronize do
+ @data.decr(name, amount, options)
+ end
end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
@@ -289,14 +297,18 @@ def decrement(name, amount = 1, options = nil)
#
# @return [Hash]
def stats(*arg)
- @data.stats(*arg)
+ @lock.synchronize do
+ @data.stats(*arg)
+ end
end
protected
# Read an entry from the cache.
def read_entry(key, options) # :nodoc:
- @data.get(key, options)
+ @lock.synchronize do
+ @data.get(key, options)
+ end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
raise if @raise_errors
@@ -313,7 +325,9 @@ def write_entry(key, value, options) # :nodoc:
if ttl = options.delete(:expires_in)
options[:ttl] ||= ttl
end
- @data.send(method, key, value, options)
+ @lock.synchronize do
+ @data.send(method, key, value, options)
+ end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
raise if @raise_errors
@@ -322,7 +336,9 @@ def write_entry(key, value, options) # :nodoc:
# Delete an entry from the cache.
def delete_entry(key, options) # :nodoc:
- @data.delete(key, options)
+ @lock.synchronize do
+ @data.delete(key, options)
+ end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
raise if @raise_errors
@@ -280,6 +280,29 @@ def test_it_notifies_on_decrement
assert_equal({:key => uniq_id, :amount => 1, :create => true}, decrement.payload)
end
+ # Inspiration: https://github.com/mperham/dalli/blob/master/test/test_dalli.rb#L416
+ def test_it_is_threadsafe
+ workers = []
+
+ # Have a bunch of threads perform a bunch of operations at the same time.
+ # Verify the result of each operation to ensure the request and response
+ # are not intermingled between threads.
+ 10.times do
+ workers << Thread.new do
+ 100.times do
+ store.write('a', 9)
+ store.write('b', 11)
+ assert_equal 9, store.read('a')
+ assert_equal({ 'a' => 9, 'b' => 11 }, store.read_multi('a', 'b'))
+ assert_equal 11, store.read('b')
+ assert_equal %w(a b), store.read_multi('a', 'b', 'c').keys.sort
+ end
+ end
+ end
+
+ workers.each { |w| w.join }
+ end
+
private
def collect_notifications

0 comments on commit 6ba3cdd

Please sign in to comment.