diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 163c04ef24df0..3cb017dd0645f 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,4 +1,10 @@ -## Rails 4.1.6 (September 11, 2014) ## +* Reap connections that were checked out by now-dead threads, instead + of waiting until they disconnect by themselves. Before this change, + a suitably constructed series of short-lived threads could starve + the connection pool, without ever having more than a couple alive at + the same time. + + *Matthew Draper* * Fixed a regression where whitespaces were stripped from DISTINCT queries in PostgreSQL. diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index b459b949d2e34..7ad7ebfc463d3 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -58,13 +58,11 @@ module ConnectionAdapters # * +checkout_timeout+: number of seconds to block and wait for a connection # before giving up and raising a timeout error (default 5 seconds). # * +reaping_frequency+: frequency in seconds to periodically run the - # Reaper, which attempts to find and close dead connections, which can - # occur if a programmer forgets to close a connection at the end of a - # thread or a thread dies unexpectedly. (Default nil, which means don't - # run the Reaper). - # * +dead_connection_timeout+: number of seconds from last checkout - # after which the Reaper will consider a connection reapable. (default - # 5 seconds). + # Reaper, which attempts to find and recover connections from dead + # threads, which can occur if a programmer forgets to close a + # connection at the end of a thread or a thread dies unexpectedly. + # Regardless of this setting, the Reaper will be invoked before every + # blocking wait. (Default nil, which means don't schedule the Reaper). class ConnectionPool # Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool # with which it shares a Monitor. But could be a generic Queue. @@ -222,7 +220,7 @@ def run include MonitorMixin - attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout + attr_accessor :automatic_reconnect, :checkout_timeout attr_reader :spec, :connections, :size, :reaper # Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification @@ -361,11 +359,13 @@ def checkout # calling +checkout+ on this pool. def checkin(conn) synchronize do + owner = conn.owner + conn.run_callbacks :checkin do conn.expire end - release conn + release conn, owner @available.add conn end @@ -378,22 +378,28 @@ def remove(conn) @connections.delete conn @available.delete conn - # FIXME: we might want to store the key on the connection so that removing - # from the reserved hash will be a little easier. - release conn + release conn, conn.owner @available.add checkout_new_connection if @available.any_waiting? end end - # Removes dead connections from the pool. A dead connection can occur - # if a programmer forgets to close a connection at the end of a thread + # Recover lost connections for the pool. A lost connection can occur if + # a programmer forgets to checkin a connection at the end of a thread # or a thread dies unexpectedly. def reap - synchronize do - stale = Time.now - @dead_connection_timeout - connections.dup.each do |conn| - if conn.in_use? && stale > conn.last_use && !conn.active_threadsafe? + stale_connections = synchronize do + @connections.select do |conn| + conn.in_use? && !conn.owner.alive? + end + end + + stale_connections.each do |conn| + synchronize do + if conn.active? + conn.reset! + checkin conn + else remove conn end end @@ -415,20 +421,15 @@ def acquire_connection elsif @connections.size < @size checkout_new_connection else + reap @available.poll(@checkout_timeout) end end - def release(conn) - thread_id = if @reserved_connections[current_connection_id] == conn - current_connection_id - else - @reserved_connections.keys.find { |k| - @reserved_connections[k] == conn - } - end + def release(conn, owner) + thread_id = owner.object_id - @reserved_connections.delete thread_id if thread_id + @reserved_connections.delete thread_id end def new_connection diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index 11b28a4858fcb..3550a27318144 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -71,8 +71,8 @@ class AbstractAdapter define_callbacks :checkout, :checkin attr_accessor :visitor, :pool - attr_reader :schema_cache, :last_use, :in_use, :logger - alias :in_use? :in_use + attr_reader :schema_cache, :owner, :logger + alias :in_use? :owner def self.type_cast_config_to_integer(config) if config =~ SIMPLE_INT @@ -94,9 +94,8 @@ def initialize(connection, logger = nil, pool = nil) #:nodoc: super() @connection = connection - @in_use = false + @owner = nil @instrumenter = ActiveSupport::Notifications.instrumenter - @last_use = false @logger = logger @pool = pool @schema_cache = SchemaCache.new self @@ -114,9 +113,8 @@ def schema_creation def lease synchronize do - unless in_use - @in_use = true - @last_use = Time.now + unless in_use? + @owner = Thread.current end end end @@ -127,7 +125,7 @@ def schema_cache=(cache) end def expire - @in_use = false + @owner = nil end def unprepared_visitor @@ -262,12 +260,6 @@ def disable_referential_integrity def active? end - # Adapter should redefine this if it needs a threadsafe way to approximate - # if the connection is active - def active_threadsafe? - active? - end - # Disconnects from the database if already connected, and establishes a # new connection with the database. Implementors should call super if they # override the default implementation. diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index b64d6da48e645..8f06c09dac25e 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -592,10 +592,6 @@ def active? false end - def active_threadsafe? - @connection.connect_poll != PG::PGRES_POLLING_FAILED - end - # Close then reopen the connection. def reconnect! super diff --git a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb index eb2fe5639b238..deed226eabbc1 100644 --- a/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb +++ b/activerecord/test/cases/connection_adapters/abstract_adapter_test.rb @@ -29,12 +29,6 @@ def test_lease_twice assert_not adapter.lease, 'should not lease adapter' end - def test_last_use - assert_not adapter.last_use - adapter.lease - assert adapter.last_use - end - def test_expire_mutates_in_use assert adapter.lease, 'lease adapter' assert adapter.in_use?, 'adapter is in use' diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index 1cf215017b788..cd252ad8648df 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -125,7 +125,6 @@ def test_reap_and_active @pool.checkout @pool.checkout @pool.checkout - @pool.dead_connection_timeout = 0 connections = @pool.connections.dup @@ -135,21 +134,25 @@ def test_reap_and_active end def test_reap_inactive + ready = false @pool.checkout - @pool.checkout - @pool.checkout - @pool.dead_connection_timeout = 0 - - connections = @pool.connections.dup - connections.each do |conn| - conn.extend(Module.new { def active_threadsafe?; false; end; }) + child = Thread.new do + @pool.checkout + @pool.checkout + ready = true + Thread.stop end + Thread.pass until ready + + assert_equal 3, active_connections(@pool).size + child.terminate + child.join @pool.reap - assert_equal 0, @pool.connections.length + assert_equal 1, active_connections(@pool).size ensure - connections.each(&:close) + @pool.connections.each(&:close) end def test_remove_connection diff --git a/activerecord/test/cases/reaper_test.rb b/activerecord/test/cases/reaper_test.rb index b62a41c08e59f..363320ce44d3c 100644 --- a/activerecord/test/cases/reaper_test.rb +++ b/activerecord/test/cases/reaper_test.rb @@ -64,17 +64,22 @@ def test_connection_pool_starts_reaper spec.config[:reaping_frequency] = 0.0001 pool = ConnectionPool.new spec - pool.dead_connection_timeout = 0 - conn = pool.checkout - count = pool.connections.length + conn = nil + child = Thread.new do + conn = pool.checkout + Thread.stop + end + Thread.pass while conn.nil? + + assert conn.in_use? - conn.extend(Module.new { def active_threadsafe?; false; end; }) + child.terminate - while count == pool.connections.length + while conn.in_use? Thread.pass end - assert_equal(count - 1, pool.connections.length) + assert !conn.in_use? end end end