Browse files

RCBC-46 Adding Couchbase::ConnectionPool

Primarily to facilitate an option for ActiveSupport::Cache::CouchbaseStore
to use a connection pool in place of monitor locks for thread safety.
Thread safe locks are still the default and have been moved into their
own module.

Change-Id: I0e844b39f9b26bb5845b82a9050381d54f030dbb
Reviewed-on: http://review.couchbase.org/26028
Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
Tested-by: Sergey Avseyev <sergey.avseyev@gmail.com>
  • Loading branch information...
1 parent ff21394 commit 5454381ca3aa7ee8a02e41e0d8ff87e8408504a2 @mje113 mje113 committed with avsej May 2, 2013
View
6 RELEASE_NOTES.markdown
@@ -3,6 +3,12 @@
This document is a list of user visible feature changes and important
bugfixes. Do not forget to update this doc in every important patch.
+## UNRELEASED
+
+* [major] RCBC-46 implement Couchbase::ConnectionPool to allow
+ applications (and ActiveSupport::Cache::CouchbaseStore) use it in
+ multi-threaded environment
+
## 1.2.3 (2013-04-02)
* [major] Make ActiveSupport::Cache::CouchbaseStore threadsafe
View
1 couchbase.gemspec
@@ -37,6 +37,7 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'yaji', '~> 0.3.2'
s.add_runtime_dependency 'multi_json', '~> 1.0'
+ s.add_runtime_dependency 'connection_pool', '~> 0.9.2'
s.add_development_dependency 'rake'
s.add_development_dependency 'minitest'
View
83 lib/active_support/cache/couchbase_store.rb
@@ -55,9 +55,15 @@ def initialize(*args)
options[:default_ttl] ||= options.delete(:expires_in)
options[:default_format] ||= :marshal
options[:key_prefix] ||= options.delete(:namespace)
+ options[:connection_pool] ||= options.delete(:connection_pool)
args.push(options)
- @data = ::Couchbase::Bucket.new(*args)
- @lock = Monitor.new
+
+ if options[:connection_pool]
+ @data = ::Couchbase::ConnectionPool.new(options[:connection_pool], *args)
+ else
+ @data = ::Couchbase::Bucket.new(*args)
+ @data.extend(Threadsafe)
+ end
end
# Fetches data from the cache, using the given key.
@@ -184,9 +190,7 @@ def read_multi(*names)
options[:format] = :plain
end
instrument(:read_multi, names, options) do
- @lock.synchronize do
- @data.get(names, options)
- end
+ @data.get(names, options)
end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
@@ -247,9 +251,7 @@ def increment(name, amount = 1, options = nil)
options[:create] = true
instrument(:increment, name, options) do |payload|
payload[:amount] = amount if payload
- @lock.synchronize do
- @data.incr(name, amount, options)
- end
+ @data.incr(name, amount, options)
end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
@@ -281,9 +283,7 @@ def decrement(name, amount = 1, options = nil)
options[:create] = true
instrument(:decrement, name, options) do |payload|
payload[:amount] = amount if payload
- @lock.synchronize do
- @data.decr(name, amount, options)
- end
+ @data.decr(name, amount, options)
end
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
@@ -297,18 +297,14 @@ def decrement(name, amount = 1, options = nil)
#
# @return [Hash]
def stats(*arg)
- @lock.synchronize do
- @data.stats(*arg)
- end
+ @data.stats(*arg)
end
protected
# Read an entry from the cache.
def read_entry(key, options) # :nodoc:
- @lock.synchronize do
- @data.get(key, options)
- end
+ @data.get(key, options)
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
raise if @raise_errors
@@ -325,9 +321,7 @@ def write_entry(key, value, options) # :nodoc:
if ttl = options.delete(:expires_in)
options[:ttl] ||= ttl
end
- @lock.synchronize do
- @data.send(method, key, value, options)
- end
+ @data.send(method, key, value, options)
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
raise if @raise_errors
@@ -336,9 +330,7 @@ def write_entry(key, value, options) # :nodoc:
# Delete an entry from the cache.
def delete_entry(key, options) # :nodoc:
- @lock.synchronize do
- @data.delete(key, options)
- end
+ @data.delete(key, options)
rescue ::Couchbase::Error::Base => e
logger.error("#{e.class}: #{e.message}") if logger
raise if @raise_errors
@@ -367,6 +359,51 @@ def expanded_key(key) # :nodoc:
key.respond_to?(:to_param) ? key.to_param : key
end
+ module Threadsafe
+ def self.extended(obj)
+ obj.init_threadsafe
+ end
+
+ def get(*)
+ @lock.synchronize do
+ super
+ end
+ end
+
+ def send(*)
+ @lock.synchronize do
+ super
+ end
+ end
+
+ def delete(*)
+ @lock.synchronize do
+ super
+ end
+ end
+
+ def incr(*)
+ @lock.synchronize do
+ super
+ end
+ end
+
+ def decr(*)
+ @lock.synchronize do
+ super
+ end
+ end
+
+ def stats(*)
+ @lock.synchronize do
+ super
+ end
+ end
+
+ def init_threadsafe
+ @lock = Monitor.new
+ end
+ end
end
end
end
View
3 lib/couchbase.rb
@@ -29,9 +29,12 @@
require 'couchbase/result'
require 'couchbase/cluster'
+
# Couchbase ruby client
module Couchbase
+ autoload(:ConnectionPool, 'couchbase/connection_pool')
+
class << self
# The method +connect+ initializes new Bucket instance with all arguments passed.
#
View
55 lib/couchbase/connection_pool.rb
@@ -0,0 +1,55 @@
+# Author:: Couchbase <info@couchbase.com>
+# Copyright:: 2013 Couchbase, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require 'connection_pool'
+
+module Couchbase
+ class ConnectionPool
+
+ def initialize(pool_size = 5, *args)
+ @pool = ::ConnectionPool.new(:size => pool_size) { ::Couchbase::Bucket.new(*args) }
+ end
+
+ def with
+ yield @pool.checkout
+ ensure
+ @pool.checkin
+ end
+
+ def respond_to?(id, *args)
+ super || @pool.with { |c| c.respond_to?(id, *args) }
+ end
+
+ def method_missing(name, *args, &block)
+ define_proxy_method(name)
+ send(name, *args, &block)
+ end
+
+ protected
+
+ def define_proxy_method(name)
+ self.class.class_eval <<-RUBY
+ def #{name}(*args, &block)
+ @pool.with do |connection|
+ connection.send(#{name.inspect}, *args, &block)
+ end
+ end
+ RUBY
+ end
+
+ end
+end
View
73 test/test_couchbase_connection_pool.rb
@@ -0,0 +1,73 @@
+# Author:: Couchbase <info@couchbase.com>
+# Copyright:: 2013 Couchbase, Inc.
+# License:: Apache License, Version 2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+require File.join(File.dirname(__FILE__), 'setup')
+require 'couchbase/connection_pool'
+
+class TestCouchbaseConnectionPool < MiniTest::Unit::TestCase
+
+ def setup
+ @mock = start_mock
+ @pool = ::Couchbase::ConnectionPool.new(5, :hostname => @mock.host, :port => @mock.port)
+ end
+
+ def teardown
+ stop_mock(@mock)
+ end
+
+ def test_basic_multithreaded_usage
+ @pool.set('foo', 'bar')
+
+ threads = []
+ 15.times do
+ threads << Thread.new do
+ @pool.get('foo')
+ end
+ end
+
+ result = threads.map(&:value)
+ result.each do |val|
+ assert_equal 'bar', val
+ end
+ end
+
+ def test_set_and_get
+ @pool.set('fiz', 'buzz')
+ assert_equal 'buzz', @pool.get('fiz')
+ end
+
+ def test_set_and_delete
+ @pool.set('baz', 'bar')
+ @pool.delete('baz')
+ assert_raises Couchbase::Error::NotFound do
+ @pool.get('baz')
+ end
+ end
+
+ def test_incr
+ @pool.set('counter', 0)
+ @pool.incr('counter', 1)
+ assert_equal 1, @pool.get('counter')
+ end
+
+ def test_decr
+ @pool.set('counter', 1)
+ @pool.decr('counter', 1)
+ assert_equal 0, @pool.get('counter')
+ end
+
+end
View
25 test/test_couchbase_rails_cache_store.rb
@@ -37,6 +37,12 @@ def store
:port => @mock.port)
end
+ def pool_store
+ @pool_store ||= ActiveSupport::Cache::CouchbaseStore.new(:hostname => @mock.host,
+ :port => @mock.port,
+ :connection_pool => 5)
+ end
+
def test_it_supported_methods
supported_methods = store.public_methods(false).map(&:to_sym)
assert supported_methods.include?(:fetch)
@@ -302,6 +308,25 @@ def test_it_is_threadsafe
workers.each { |w| w.join }
end
+ def test_it_can_use_connection_pool_for_thread_safety
+ workers = []
+
+ 10.times do
+ workers << Thread.new do
+ 100.times do
+ pool_store.write('a', 9)
+ pool_store.write('b', 11)
+ assert_equal 9, pool_store.read('a')
+ assert_equal({ 'a' => 9, 'b' => 11 }, pool_store.read_multi('a', 'b'))
+ assert_equal 11, pool_store.read('b')
+ assert_equal %w(a b), pool_store.read_multi('a', 'b', 'c').keys.sort
+ end
+ end
+ end
+
+ workers.each { |w| w.join }
+ end
+
private
def collect_notifications

0 comments on commit 5454381

Please sign in to comment.