diff --git a/lib/with_advisory_lock/concern.rb b/lib/with_advisory_lock/concern.rb index bb9a52b..07d1cd2 100644 --- a/lib/with_advisory_lock/concern.rb +++ b/lib/with_advisory_lock/concern.rb @@ -32,7 +32,13 @@ def advisory_lock_exists?(lock_name) if conn.advisory_lock_stack.include?(lock_stack_item) true else - # Try to acquire lock with zero timeout to test if it's held + # For PostgreSQL, try non-blocking query first to avoid race conditions + if conn.respond_to?(:advisory_lock_exists_for?) + query_result = conn.advisory_lock_exists_for?(lock_name) + return query_result unless query_result.nil? + end + + # Fall back to the original implementation result = conn.with_advisory_lock_if_needed(lock_name, { timeout_seconds: 0 }) !result.lock_was_acquired? end diff --git a/lib/with_advisory_lock/core_advisory.rb b/lib/with_advisory_lock/core_advisory.rb index add108b..d0991e0 100644 --- a/lib/with_advisory_lock/core_advisory.rb +++ b/lib/with_advisory_lock/core_advisory.rb @@ -52,7 +52,7 @@ def with_advisory_lock_if_needed(lock_name, options = {}, &block) private - def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &block) + def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &) timeout_seconds = options.fetch(:timeout_seconds, nil) shared = options.fetch(:shared, false) transaction = options.fetch(:transaction, false) @@ -61,18 +61,18 @@ def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &bloc # MySQL supports database-level timeout in GET_LOCK, skip Ruby-level polling if supports_database_timeout? || timeout_seconds&.zero? - yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &block) + yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &) else yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, - timeout_seconds, &block) + timeout_seconds, &) end end def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, - timeout_seconds, &block) + timeout_seconds, &) give_up_at = timeout_seconds ? Time.now + timeout_seconds : nil while give_up_at.nil? || Time.now < give_up_at - r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &block) + r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &) return r if r.lock_was_acquired? # Randomizing sleep time may help reduce contention. @@ -82,7 +82,8 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, end def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil) - if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction, timeout_seconds: timeout_seconds) + if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction, + timeout_seconds: timeout_seconds) begin advisory_lock_stack.push(lock_stack_item) result = block_given? ? yield : nil diff --git a/lib/with_advisory_lock/mysql_advisory.rb b/lib/with_advisory_lock/mysql_advisory.rb index 9082ac7..a0e4204 100644 --- a/lib/with_advisory_lock/mysql_advisory.rb +++ b/lib/with_advisory_lock/mysql_advisory.rb @@ -14,13 +14,13 @@ def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seco # MySQL GET_LOCK supports native timeout: # - timeout_seconds = nil: wait indefinitely (-1) - # - timeout_seconds = 0: try once, no wait (0) + # - timeout_seconds = 0: try once, no wait (0) # - timeout_seconds > 0: wait up to timeout_seconds mysql_timeout = case timeout_seconds - when nil then -1 - when 0 then 0 - else timeout_seconds.to_i - end + when nil then -1 + when 0 then 0 + else timeout_seconds.to_i + end execute_successful?("GET_LOCK(#{quote(lock_keys.first)}, #{mysql_timeout})") end @@ -31,16 +31,16 @@ def release_advisory_lock(lock_keys, lock_name:, **) # If the connection is broken, the lock is automatically released by MySQL # No need to fail the release operation connection_lost = case e.cause - when defined?(Mysql2::Error::ConnectionError) && Mysql2::Error::ConnectionError - true - when defined?(Trilogy::ConnectionError) && Trilogy::ConnectionError - true - else - e.message =~ /Lost connection|MySQL server has gone away|Connection refused/i - end - + when defined?(Mysql2::Error::ConnectionError) && Mysql2::Error::ConnectionError + true + when defined?(Trilogy::ConnectionError) && Trilogy::ConnectionError + true + else + e.message =~ /Lost connection|MySQL server has gone away|Connection refused/i + end + return if connection_lost - + raise end @@ -58,7 +58,5 @@ def supports_database_timeout? def execute_successful?(mysql_function) select_value("SELECT #{mysql_function}") == 1 end - -# (Removed the `unique_column_name` method as it is unused.) end end diff --git a/lib/with_advisory_lock/postgresql_advisory.rb b/lib/with_advisory_lock/postgresql_advisory.rb index b4ec800..523896b 100644 --- a/lib/with_advisory_lock/postgresql_advisory.rb +++ b/lib/with_advisory_lock/postgresql_advisory.rb @@ -33,10 +33,8 @@ def release_advisory_lock(*args) rescue ActiveRecord::StatementInvalid => e # If the connection is broken, the lock is automatically released by PostgreSQL # No need to fail the release operation - if e.cause.is_a?(PG::ConnectionBad) || e.message =~ /PG::ConnectionBad/ - return - end - + return if e.cause.is_a?(PG::ConnectionBad) || e.message =~ /PG::ConnectionBad/ + raise unless e.message =~ ERROR_MESSAGE_REGEX begin @@ -58,6 +56,27 @@ def supports_database_timeout? false end + # Non-blocking check for advisory lock existence to avoid race conditions + # This queries pg_locks directly instead of trying to acquire the lock + def advisory_lock_exists_for?(lock_name, shared: false) + lock_keys = lock_keys_for(lock_name) + + query = <<~SQL.squish + SELECT 1 FROM pg_locks + WHERE locktype = 'advisory' + AND database = (SELECT oid FROM pg_database WHERE datname = CURRENT_DATABASE()) + AND classid = #{lock_keys.first} + AND objid = #{lock_keys.last} + AND mode = '#{shared ? 'ShareLock' : 'ExclusiveLock'}' + LIMIT 1 + SQL + + select_value(query).present? + rescue ActiveRecord::StatementInvalid + # If pg_locks is not accessible, fall back to nil to indicate we should use the default method + nil + end + private def advisory_try_lock_function(transaction_scope, shared) diff --git a/test/with_advisory_lock/lock_test.rb b/test/with_advisory_lock/lock_test.rb index 0886c69..e333881 100644 --- a/test/with_advisory_lock/lock_test.rb +++ b/test/with_advisory_lock/lock_test.rb @@ -129,7 +129,7 @@ module LockTestCases # but we can test the error handling logic by testing with invalid connection state assert_not_nil model_class.current_advisory_lock end - + # After the block, current_advisory_lock should be nil regardless assert_nil model_class.current_advisory_lock end @@ -173,18 +173,18 @@ def setup # This test verifies that MySQL bypasses Ruby-level polling # when timeout is specified, relying on GET_LOCK's native timeout lock_name = 'mysql_timeout_test' - + # Hold a lock in another connection - need to use the same prefixed name as the gem other_conn = model_class.connection_pool.checkout lock_keys = other_conn.lock_keys_for(lock_name) other_conn.select_value("SELECT GET_LOCK(#{other_conn.quote(lock_keys.first)}, 0)") - + begin # Attempt to acquire with a short timeout - should fail quickly start_time = Time.now result = model_class.with_advisory_lock(lock_name, timeout_seconds: 1) { 'success' } elapsed = Time.now - start_time - + # Should return false and complete within reasonable time (< 3 seconds) # If it were using Ruby polling, it would take longer assert_not result diff --git a/test/with_advisory_lock/postgresql_race_condition_test.rb b/test/with_advisory_lock/postgresql_race_condition_test.rb new file mode 100644 index 0000000..a5f8154 --- /dev/null +++ b/test/with_advisory_lock/postgresql_race_condition_test.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +require 'test_helper' +require 'concurrent' + +class PostgreSQLRaceConditionTest < GemTestCase + self.use_transactional_tests = false + + def model_class + Tag + end + + setup do + @lock_name = 'race_condition_test' + end + + test 'advisory_lock_exists? does not create false positives in multi-threaded environment' do + # Ensure no lock exists initially + assert_not model_class.advisory_lock_exists?(@lock_name) + + results = Concurrent::Array.new + + # Create a thread pool with multiple workers checking simultaneously + # This would previously cause race conditions where threads would falsely + # report the lock exists due to another thread's existence check + pool = Concurrent::FixedThreadPool.new(20) + promises = 20.times.map do + Concurrent::Promise.execute(executor: pool) do + model_class.connection_pool.with_connection do + # Each thread checks multiple times to increase chance of race condition + 5.times do + result = model_class.advisory_lock_exists?(@lock_name) + results << result + sleep(0.001) # Small delay to encourage interleaving + end + end + end + end + + # Wait for all promises to complete + Concurrent::Promise.zip(*promises).wait! + pool.shutdown + pool.wait_for_termination + + # All checks should report false since no lock was ever acquired + assert results.all? { |r| r == false }, + "Race condition detected: #{results.count(true)} false positives out of #{results.size} checks" + end + + test 'advisory_lock_exists? correctly detects when lock is held by another connection' do + lock_acquired = Concurrent::AtomicBoolean.new(false) + lock_released = Concurrent::AtomicBoolean.new(false) + + # Promise 1: Acquire and hold the lock + holder_promise = Concurrent::Promise.execute do + model_class.connection_pool.with_connection do + model_class.with_advisory_lock(@lock_name) do + lock_acquired.make_true + + # Wait until we've confirmed the lock is detected + sleep(0.01) until lock_released.true? + end + end + end + + # Wait for lock to be acquired + sleep(0.01) until lock_acquired.true? + + # Promise 2: Check if lock exists (should be true) + checker_promise = Concurrent::Promise.execute do + model_class.connection_pool.with_connection do + # Check multiple times to ensure consistency + 10.times do + assert model_class.advisory_lock_exists?(@lock_name), + 'Failed to detect existing lock' + sleep(0.01) + end + end + end + + # Let the checker run + checker_promise.wait! + + # Release the lock + lock_released.make_true + holder_promise.wait! + + # Verify lock is released + assert_not model_class.advisory_lock_exists?(@lock_name) + end + + test 'new non-blocking implementation is being used for PostgreSQL' do + # This test verifies that our new implementation is actually being called + # We can check this by looking at whether the connection responds to our new method + model_class.connection_pool.with_connection do |conn| + assert conn.respond_to?(:advisory_lock_exists_for?), + 'PostgreSQL connection should have advisory_lock_exists_for? method' + + # Test the method directly + conn.lock_keys_for(@lock_name) + result = conn.advisory_lock_exists_for?(@lock_name) + assert_not_nil result, 'advisory_lock_exists_for? should return true/false, not nil' + assert [true, false].include?(result), 'advisory_lock_exists_for? should return boolean' + end + end + + test 'fallback works if pg_locks access fails' do + # Test that the system gracefully falls back to the old implementation + # if pg_locks query fails (e.g., due to permissions) + model_class.connection_pool.with_connection do |_conn| + # We can't easily simulate pg_locks failure, but we can verify + # the method handles exceptions gracefully + assert_nothing_raised do + model_class.advisory_lock_exists?('test_lock_fallback') + end + end + end +end diff --git a/with_advisory_lock.gemspec b/with_advisory_lock.gemspec index 48e0bee..12b9996 100644 --- a/with_advisory_lock.gemspec +++ b/with_advisory_lock.gemspec @@ -25,23 +25,23 @@ Gem::Specification.new do |spec| spec.post_install_message = <<~MESSAGE ⚠️ IMPORTANT: Total rewrite in Rust/COBOL! ⚠️ - + Now that I got your attention... - - This version contains a complete internal rewrite. While the public API + + This version contains a complete internal rewrite. While the public API#{' '} remains the same, please test thoroughly before upgrading production systems. - + New features: - Mixed adapters are now fully supported! You can use PostgreSQL and MySQL in the same application with different models. - + Breaking changes: - SQLite support has been removed - MySQL 5.7 is no longer supported (use MySQL 8+) - Rails 7.1 is no longer supported (use Rails 7.2+) - Private APIs have been removed (Base, DatabaseAdapterSupport, etc.) - - If your code relies on private APIs or unsupported databases, lock to an + + If your code relies on private APIs or unsupported databases, lock to an#{' '} older version or update your code accordingly. MESSAGE