Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ConnectionPool patch to make it fiber aware (instead of driver-level

pooling)
  • Loading branch information...
commit 970427cfc1843aa3ac734d2318ecb969dfc4ee44 1 parent 1773953
@igrigorik authored
View
18 lib/active_record/connection_adapters/em_mysqlplus_adapter.rb
@@ -17,15 +17,13 @@ def initialize(connection, logger, host_parameters, connection_parameters, confi
end
def connect
- @connection = EventMachine::Synchrony::ConnectionPool.new(size: @config[:fiber_pool]) do
- EventMachine::MySQL.new({
- :host => @hostname,
- :port => @port,
- :database => @config[:database],
- :password => @config[:password],
- :socket => @config[:socket]
- })
- end
+ @connection = EventMachine::MySQL.new({
+ :host => @hostname,
+ :port => @port,
+ :database => @config[:database],
+ :password => @config[:password],
+ :socket => @config[:socket]
+ })
configure_connection
@connection
@@ -51,4 +49,4 @@ def self.em_mysqlplus_connection(config) # :nodoc:
ConnectionAdapters::EmMysqlAdapter.new(nil, logger, [host, port], [database, username, password], config)
end
end
-end
+end
View
102 lib/active_record/patches.rb
@@ -0,0 +1,102 @@
+module ActiveRecord
+ module ConnectionAdapters
+
+ def self.fiber_pools
+ @fiber_pools ||= []
+ end
+ def self.register_fiber_pool(fp)
+ fiber_pools << fp
+ end
+
+ class FiberedMonitor
+ class Queue
+ def initialize
+ @queue = []
+ end
+
+ def wait(timeout)
+ t = timeout || 5
+ fiber = Fiber.current
+ x = EM::Timer.new(t) do
+ @queue.delete(fiber)
+ fiber.resume(false)
+ end
+ @queue << fiber
+ returning Fiber.yield do
+ x.cancel
+ end
+ end
+
+ def signal
+ fiber = @queue.pop
+ fiber.resume(true) if fiber
+ end
+ end
+
+ def synchronize
+ yield
+ end
+
+ def new_cond
+ Queue.new
+ end
+ end
+
+ # ActiveRecord's connection pool is based on threads. Since we are working
+ # with EM and a single thread, multiple fiber design, we need to provide
+ # our own connection pool that keys off of Fiber.current so that different
+ # fibers running in the same thread don't try to use the same connection.
+ class ConnectionPool
+ def initialize(spec)
+ @spec = spec
+
+ # The cache of reserved connections mapped to threads
+ @reserved_connections = {}
+
+ # The mutex used to synchronize pool access
+ @connection_mutex = FiberedMonitor.new
+ @queue = @connection_mutex.new_cond
+
+ # default 5 second timeout unless on ruby 1.9
+ @timeout = spec.config[:wait_timeout] || 5
+
+ # default max pool size to 5
+ @size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
+
+ @connections = []
+ @checked_out = []
+ end
+
+ private
+
+ def current_connection_id #:nodoc:
+ Fiber.current.object_id
+ end
+
+ # Remove stale fibers from the cache.
+ def remove_stale_cached_threads!(cache, &block)
+ keys = Set.new(cache.keys)
+
+ ActiveRecord::ConnectionAdapters.fiber_pools.each do |pool|
+ pool.busy_fibers.each_pair do |object_id, fiber|
+ keys.delete(object_id)
+ end
+ end
+# puts "Pruning stale connections: #{f.busy_fibers.size} #{f.fibers.size} #{keys.inspect}"
+ keys.each do |key|
+ next unless cache.has_key?(key)
+ block.call(key, cache[key])
+ cache.delete(key)
+ end
+ end
+
+ def checkout_and_verify(c)
+ @checked_out << c
+ c.run_callbacks :checkout
+ c.verify!
+ c
+ end
+ end
+
+ end
+end
View
3  lib/em-activerecord.rb
@@ -1,3 +1,4 @@
$:.unshift(File.dirname(__FILE__) + '/../lib')
-require 'active_record/connection_adapters/em_mysqlplus_adapter'
+require 'active_record/connection_adapters/em_mysqlplus_adapter'
+require 'active_record/patches'
View
24 spec/activerecord_spec.rb
@@ -15,9 +15,6 @@
ActiveRecord::Base.time_zone_aware_attributes = true
Time.zone = 'UTC'
-class Widget < ActiveRecord::Base; end
-
-
describe "ActiveRecord Driver for EM-MySQLPlus" do
it "should establish AR connection" do
EventMachine.run {
@@ -30,4 +27,23 @@ class Widget < ActiveRecord::Base; end
}.resume
}
end
-end
+
+ it "should use fiber aware ConnectionPool" do
+ EventMachine.run {
+ results = []
+
+ 3.times do |n|
+ Fiber.new {
+ ActiveRecord::Base.establish_connection
+ results.push ActiveRecord::Base.connection.query('select sleep(1)')
+ }.resume
+ end
+
+ EM.add_timer(1.5) {
+ results.size.should == 3
+ EventMachine.stop
+ }
+ }
+ end
+
+end
View
3  spec/database.yml
@@ -1,5 +1,4 @@
test:
adapter: em_mysqlplus
database: widgets
- pool: 1
- fiber_pool: 1
+ pool: 3
Please sign in to comment.
Something went wrong with that request. Please try again.