Permalink
Browse files

make the keyspace threadsafe and use a connection pool to make it wor…

…k with sidekiq
  • Loading branch information...
1 parent 32a364f commit e8a60474110a18e3fdc8ca79f8f7ff8bb9e516e0 Aubrey Holland committed Oct 10, 2012
Showing with 107 additions and 47 deletions.
  1. +1 −0 .gitignore
  2. +4 −11 Gemfile.lock
  3. +1 −0 cequel.gemspec
  4. +1 −0 lib/cequel.rb
  5. +0 −1 lib/cequel/data_set.rb
  6. +86 −21 lib/cequel/keyspace.rb
  7. +9 −9 lib/cequel/model.rb
  8. +1 −1 lib/cequel/version.rb
  9. +1 −1 spec/examples/model/spec_helper.rb
  10. +3 −3 spec/support/helpers.rb
View
@@ -1,2 +1,3 @@
.yardoc
doc
+*.gem
View
@@ -1,10 +1,11 @@
PATH
remote: .
specs:
- cequel (0.4.2)
+ cequel (0.4.3)
activemodel (~> 3.1)
activesupport (~> 3.1)
cassandra-cql (~> 1.0)
+ connection_pool (~> 0.9.2)
i18n
GEM
@@ -17,17 +18,10 @@ GEM
i18n (~> 0.6)
multi_json (~> 1.0)
builder (3.0.3)
- cassandra-cql (1.0.4)
+ cassandra-cql (1.1.1)
simple_uuid (>= 0.2.0)
thrift_client (>= 0.7.1)
- columnize (0.3.6)
- debugger (1.2.0)
- columnize (>= 0.3.1)
- debugger-linecache (~> 1.1.1)
- debugger-ruby_core_source (~> 1.1.3)
- debugger-linecache (1.1.2)
- debugger-ruby_core_source (>= 1.1.1)
- debugger-ruby_core_source (1.1.3)
+ connection_pool (0.9.2)
diff-lcs (1.1.3)
geminabox (0.7.0)
builder
@@ -68,7 +62,6 @@ PLATFORMS
DEPENDENCIES
cequel!
- debugger
geminabox
redcarpet (< 2.0)
rocco
View
@@ -21,6 +21,7 @@ DESC
s.add_runtime_dependency 'activesupport', '~> 3.1'
s.add_runtime_dependency 'activemodel', '~> 3.1'
s.add_runtime_dependency 'cassandra-cql', '~> 1.0'
+ s.add_runtime_dependency 'connection_pool', '~> 0.9.2'
s.add_runtime_dependency 'i18n'
s.add_development_dependency 'rspec', '~> 2.0'
s.add_development_dependency 'yard', '~> 0.6'
View
@@ -1,5 +1,6 @@
require 'active_support/core_ext'
require 'cassandra-cql'
+require 'connection_pool'
require 'cequel/batch'
require 'cequel/errors'
View
@@ -60,7 +60,6 @@ def update(data, options = {})
append(generate_upsert_options(options)).
append(" SET " << data.keys.map { |k| "? = ?" }.join(', '), *data.to_a.flatten).
append(*row_specifications_cql)
-
@keyspace.write(*statement.args)
rescue EmptySubquery
# Noop -- no rows to update
View
@@ -6,24 +6,71 @@ module Cequel
class Keyspace
#
- # Set a logger for logging queries. Queries logged at INFO level
- #
- attr_writer :logger, :slowlog, :slowlog_threshold, :connection
-
- #
# @api private
# @see Cequel.connect
#
- def initialize(configuration = {})
- @name = configuration[:keyspace]
+ def initialize(configuration={})
+ end
+
+ def self.connection=(connection)
+ @connection = connection
+ end
+
+ def self.configure(configuration = {})
+ @configuration = configuration
@hosts = configuration[:host] || configuration[:hosts]
@thrift_options = configuration[:thrift].try(:symbolize_keys) || {}
+ @keyspace = configuration[:keyspace]
+ # reset the connections
+ @connection_pool = nil
+ end
+
+ def self.configuration
+ @configuration
end
- def connection
- @connection ||= CassandraCQL::Database.new(
- @hosts, {:keyspace => @name}, @thrift_options
- )
+ def self.logger=(logger)
+ @logger = logger
+ end
+
+ def self.logger
+ @logger
+ end
+
+ def self.slowlog=(slowlog)
+ @slowlog = slowlog
+ end
+
+ def self.slowlog
+ @slowlog
+ end
+
+ def self.slowlog_threshold=(slowlog_threshold)
+ @slowlog_threshold = slowlog_threshold
+ end
+
+ def self.slowlog_threshold
+ @slowlog_threshold
+ end
+
+ def self.connection_pool
+ @connection_pool ||= ConnectionPool.new(:size => 50, :timeout => 5) do
+ @connection || CassandraCQL::Database.new(
+ @hosts,
+ { :keyspace => @keyspace },
+ @thrift_options
+ )
+ end
+ end
+
+ def self.clear_active_connections!
+ @connection_pool = nil
+ end
+
+ def with_connection(&block)
+ self.class.connection_pool.with do |conn|
+ block.call(conn)
+ end
end
#
@@ -44,7 +91,9 @@ def [](column_family_name)
#
def execute(statement, *bind_vars)
log('CQL', statement, *bind_vars) do
- connection.execute(statement, *bind_vars)
+ with_connection do |conn|
+ conn.execute(statement, *bind_vars)
+ end
end
end
@@ -55,8 +104,8 @@ def execute(statement, *bind_vars)
# @param (see #execute)
#
def write(statement, *bind_vars)
- if @batch
- @batch.execute(statement, *bind_vars)
+ if get_batch
+ get_batch.execute(statement, *bind_vars)
else
execute(statement, *bind_vars)
end
@@ -76,17 +125,31 @@ def write(statement, *bind_vars)
# end
#
def batch(options = {})
- old_batch, @batch = @batch, Batch.new(self, options)
+ old_batch = get_batch
+ new_batch = Batch.new(self, options)
+ set_batch(new_batch)
yield
- @batch.apply
+ new_batch.apply
ensure
- @batch = old_batch
+ set_batch(old_batch)
end
private
+ def get_batch
+ ::Thread.current[batch_key]
+ end
+
+ def set_batch(batch)
+ ::Thread.current[batch_key] = batch
+ end
+
+ def batch_key
+ :"cequel-batch-#{object_id}"
+ end
+
def log(label, statement, *bind_vars)
- return yield unless @logger || @slowlog
+ return yield unless self.class.logger || self.class.slowlog
response = nil
time = Benchmark.ms { response = yield }
generate_message = proc do
@@ -95,9 +158,11 @@ def log(label, statement, *bind_vars)
CassandraCQL::Statement.sanitize(statement, bind_vars)
)
end
- @logger.debug(&generate_message) if @logger
- threshold = @slowlog_threshold || 2000
- @slowlog.warn(&generate_message) if @slowlog && time >= threshold
+ self.class.logger.debug(&generate_message) if self.class.logger
+ threshold = self.class.slowlog_threshold || 2000
+ if self.class.slowlog && time >= threshold
+ self.class.slowlog.warn(&generate_message)
+ end
response
end
View
@@ -63,27 +63,27 @@ module Model
end
def self.keyspace
- @keyspace ||= Cequel.connect(@configuration).tap do |keyspace|
- keyspace.logger = @logger if @logger
- keyspace.slowlog = @slowlog if @slowlog
- keyspace.slowlog_threshold = @slowlog_threshold if @slowlog_threshold
- end
+ @keyspace ||= Cequel.connect
end
def self.configure(configuration)
- @configuration = configuration
+ Cequel::Keyspace.configure(configuration)
+ end
+
+ def self.configuration
+ Cequel::Keyspace.configuration
end
def self.logger=(logger)
- @logger = logger
+ Cequel::Keyspace.logger = logger
end
def self.slowlog=(slowlog)
- @slowlog = slowlog
+ Cequel::Keyspace.slowlog = slowlog
end
def self.slowlog_threshold=(slowlog_threshold)
- @slowlog_threshold = slowlog_threshold
+ Cequel::Keyspace.slowlog_threshold = slowlog_threshold
end
def initialize
View
@@ -1,3 +1,3 @@
module Cequel
- VERSION = '0.4.2'
+ VERSION = '0.4.3'
end
@@ -7,6 +7,6 @@
RSpec.configure do |config|
config.before :each do
- Cequel::Model.keyspace.connection = connection
+ Cequel::Keyspace.connection = connection
end
end
View
@@ -11,13 +11,13 @@ def result_stub(*results)
end
def connection
+ Connection.stub(:keyspace=)
Connection
end
def cequel
- @cequel ||= Cequel::Keyspace.new({}).tap do |keyspace|
- keyspace.connection = connection
- end
+ Cequel::Keyspace.connection = connection
+ @cequel ||= Cequel::Keyspace.new({})
end
end

0 comments on commit e8a6047

Please sign in to comment.