Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion lib/with_advisory_lock/concern.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ def advisory_lock_exists?(lock_name)
if conn.advisory_lock_stack.include?(lock_stack_item)
true
else
# Try to acquire lock with zero timeout to test if it's held
# For PostgreSQL, try non-blocking query first to avoid race conditions
if conn.respond_to?(:advisory_lock_exists_for?)
query_result = conn.advisory_lock_exists_for?(lock_name)
return query_result unless query_result.nil?
end

# Fall back to the original implementation
result = conn.with_advisory_lock_if_needed(lock_name, { timeout_seconds: 0 })
!result.lock_was_acquired?
end
Expand Down
13 changes: 7 additions & 6 deletions lib/with_advisory_lock/core_advisory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def with_advisory_lock_if_needed(lock_name, options = {}, &block)

private

def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &block)
def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &)
timeout_seconds = options.fetch(:timeout_seconds, nil)
shared = options.fetch(:shared, false)
transaction = options.fetch(:transaction, false)
Expand All @@ -61,18 +61,18 @@ def advisory_lock_and_yield(lock_name, lock_str, lock_stack_item, options, &bloc

# MySQL supports database-level timeout in GET_LOCK, skip Ruby-level polling
if supports_database_timeout? || timeout_seconds&.zero?
yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &block)
yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, timeout_seconds, &)
else
yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction,
timeout_seconds, &block)
timeout_seconds, &)
end
end

def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction,
timeout_seconds, &block)
timeout_seconds, &)
give_up_at = timeout_seconds ? Time.now + timeout_seconds : nil
while give_up_at.nil? || Time.now < give_up_at
r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &block)
r = yield_with_lock(lock_keys, lock_name, lock_str, lock_stack_item, shared, transaction, 0, &)
return r if r.lock_was_acquired?

# Randomizing sleep time may help reduce contention.
Expand All @@ -82,7 +82,8 @@ def yield_with_lock_and_timeout(lock_keys, lock_name, lock_str, lock_stack_item,
end

def yield_with_lock(lock_keys, lock_name, _lock_str, lock_stack_item, shared, transaction, timeout_seconds = nil)
if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction, timeout_seconds: timeout_seconds)
if try_advisory_lock(lock_keys, lock_name: lock_name, shared: shared, transaction: transaction,
timeout_seconds: timeout_seconds)
begin
advisory_lock_stack.push(lock_stack_item)
result = block_given? ? yield : nil
Expand Down
30 changes: 14 additions & 16 deletions lib/with_advisory_lock/mysql_advisory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def try_advisory_lock(lock_keys, lock_name:, shared:, transaction:, timeout_seco

# MySQL GET_LOCK supports native timeout:
# - timeout_seconds = nil: wait indefinitely (-1)
# - timeout_seconds = 0: try once, no wait (0)
# - timeout_seconds = 0: try once, no wait (0)
# - timeout_seconds > 0: wait up to timeout_seconds
mysql_timeout = case timeout_seconds
when nil then -1
when 0 then 0
else timeout_seconds.to_i
end
when nil then -1
when 0 then 0
else timeout_seconds.to_i
end

execute_successful?("GET_LOCK(#{quote(lock_keys.first)}, #{mysql_timeout})")
end
Expand All @@ -31,16 +31,16 @@ def release_advisory_lock(lock_keys, lock_name:, **)
# If the connection is broken, the lock is automatically released by MySQL
# No need to fail the release operation
connection_lost = case e.cause
when defined?(Mysql2::Error::ConnectionError) && Mysql2::Error::ConnectionError
true
when defined?(Trilogy::ConnectionError) && Trilogy::ConnectionError
true
else
e.message =~ /Lost connection|MySQL server has gone away|Connection refused/i
end
when defined?(Mysql2::Error::ConnectionError) && Mysql2::Error::ConnectionError
true
when defined?(Trilogy::ConnectionError) && Trilogy::ConnectionError
true
else
e.message =~ /Lost connection|MySQL server has gone away|Connection refused/i
end

return if connection_lost

raise
end

Expand All @@ -58,7 +58,5 @@ def supports_database_timeout?
def execute_successful?(mysql_function)
select_value("SELECT #{mysql_function}") == 1
end

# (Removed the `unique_column_name` method as it is unused.)
end
end
27 changes: 23 additions & 4 deletions lib/with_advisory_lock/postgresql_advisory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ def release_advisory_lock(*args)
rescue ActiveRecord::StatementInvalid => e
# If the connection is broken, the lock is automatically released by PostgreSQL
# No need to fail the release operation
if e.cause.is_a?(PG::ConnectionBad) || e.message =~ /PG::ConnectionBad/
return
end

return if e.cause.is_a?(PG::ConnectionBad) || e.message =~ /PG::ConnectionBad/

raise unless e.message =~ ERROR_MESSAGE_REGEX

begin
Expand All @@ -58,6 +56,27 @@ def supports_database_timeout?
false
end

# Non-blocking check for advisory lock existence to avoid race conditions
# This queries pg_locks directly instead of trying to acquire the lock
def advisory_lock_exists_for?(lock_name, shared: false)
lock_keys = lock_keys_for(lock_name)

query = <<~SQL.squish
SELECT 1 FROM pg_locks
WHERE locktype = 'advisory'
AND database = (SELECT oid FROM pg_database WHERE datname = CURRENT_DATABASE())
AND classid = #{lock_keys.first}
AND objid = #{lock_keys.last}
AND mode = '#{shared ? 'ShareLock' : 'ExclusiveLock'}'
LIMIT 1
SQL

select_value(query).present?
rescue ActiveRecord::StatementInvalid
# If pg_locks is not accessible, fall back to nil to indicate we should use the default method
nil
end

private

def advisory_try_lock_function(transaction_scope, shared)
Expand Down
8 changes: 4 additions & 4 deletions test/with_advisory_lock/lock_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ module LockTestCases
# but we can test the error handling logic by testing with invalid connection state
assert_not_nil model_class.current_advisory_lock
end

# After the block, current_advisory_lock should be nil regardless
assert_nil model_class.current_advisory_lock
end
Expand Down Expand Up @@ -173,18 +173,18 @@ def setup
# This test verifies that MySQL bypasses Ruby-level polling
# when timeout is specified, relying on GET_LOCK's native timeout
lock_name = 'mysql_timeout_test'

# Hold a lock in another connection - need to use the same prefixed name as the gem
other_conn = model_class.connection_pool.checkout
lock_keys = other_conn.lock_keys_for(lock_name)
other_conn.select_value("SELECT GET_LOCK(#{other_conn.quote(lock_keys.first)}, 0)")

begin
# Attempt to acquire with a short timeout - should fail quickly
start_time = Time.now
result = model_class.with_advisory_lock(lock_name, timeout_seconds: 1) { 'success' }
elapsed = Time.now - start_time

# Should return false and complete within reasonable time (< 3 seconds)
# If it were using Ruby polling, it would take longer
assert_not result
Expand Down
118 changes: 118 additions & 0 deletions test/with_advisory_lock/postgresql_race_condition_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# frozen_string_literal: true

require 'test_helper'
require 'concurrent'

class PostgreSQLRaceConditionTest < GemTestCase
self.use_transactional_tests = false

def model_class
Tag
end

setup do
@lock_name = 'race_condition_test'
end

test 'advisory_lock_exists? does not create false positives in multi-threaded environment' do
# Ensure no lock exists initially
assert_not model_class.advisory_lock_exists?(@lock_name)

results = Concurrent::Array.new

# Create a thread pool with multiple workers checking simultaneously
# This would previously cause race conditions where threads would falsely
# report the lock exists due to another thread's existence check
pool = Concurrent::FixedThreadPool.new(20)
promises = 20.times.map do
Concurrent::Promise.execute(executor: pool) do
model_class.connection_pool.with_connection do
# Each thread checks multiple times to increase chance of race condition
5.times do
result = model_class.advisory_lock_exists?(@lock_name)
results << result
sleep(0.001) # Small delay to encourage interleaving
end
end
end
end

# Wait for all promises to complete
Concurrent::Promise.zip(*promises).wait!
pool.shutdown
pool.wait_for_termination

# All checks should report false since no lock was ever acquired
assert results.all? { |r| r == false },
"Race condition detected: #{results.count(true)} false positives out of #{results.size} checks"
end

test 'advisory_lock_exists? correctly detects when lock is held by another connection' do
lock_acquired = Concurrent::AtomicBoolean.new(false)
lock_released = Concurrent::AtomicBoolean.new(false)

# Promise 1: Acquire and hold the lock
holder_promise = Concurrent::Promise.execute do
model_class.connection_pool.with_connection do
model_class.with_advisory_lock(@lock_name) do
lock_acquired.make_true

# Wait until we've confirmed the lock is detected
sleep(0.01) until lock_released.true?
end
end
end

# Wait for lock to be acquired
sleep(0.01) until lock_acquired.true?

# Promise 2: Check if lock exists (should be true)
checker_promise = Concurrent::Promise.execute do
model_class.connection_pool.with_connection do
# Check multiple times to ensure consistency
10.times do
assert model_class.advisory_lock_exists?(@lock_name),
'Failed to detect existing lock'
sleep(0.01)
end
end
end

# Let the checker run
checker_promise.wait!

# Release the lock
lock_released.make_true
holder_promise.wait!

# Verify lock is released
assert_not model_class.advisory_lock_exists?(@lock_name)
end

test 'new non-blocking implementation is being used for PostgreSQL' do
# This test verifies that our new implementation is actually being called
# We can check this by looking at whether the connection responds to our new method
model_class.connection_pool.with_connection do |conn|
assert conn.respond_to?(:advisory_lock_exists_for?),
'PostgreSQL connection should have advisory_lock_exists_for? method'

# Test the method directly
conn.lock_keys_for(@lock_name)
result = conn.advisory_lock_exists_for?(@lock_name)
assert_not_nil result, 'advisory_lock_exists_for? should return true/false, not nil'
assert [true, false].include?(result), 'advisory_lock_exists_for? should return boolean'
end
end

test 'fallback works if pg_locks access fails' do
# Test that the system gracefully falls back to the old implementation
# if pg_locks query fails (e.g., due to permissions)
model_class.connection_pool.with_connection do |_conn|
# We can't easily simulate pg_locks failure, but we can verify
# the method handles exceptions gracefully
assert_nothing_raised do
model_class.advisory_lock_exists?('test_lock_fallback')
end
end
end
end
14 changes: 7 additions & 7 deletions with_advisory_lock.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,23 @@ Gem::Specification.new do |spec|

spec.post_install_message = <<~MESSAGE
⚠️ IMPORTANT: Total rewrite in Rust/COBOL! ⚠️

Now that I got your attention...
This version contains a complete internal rewrite. While the public API

This version contains a complete internal rewrite. While the public API#{' '}
remains the same, please test thoroughly before upgrading production systems.

New features:
- Mixed adapters are now fully supported! You can use PostgreSQL and MySQL
in the same application with different models.

Breaking changes:
- SQLite support has been removed
- MySQL 5.7 is no longer supported (use MySQL 8+)
- Rails 7.1 is no longer supported (use Rails 7.2+)
- Private APIs have been removed (Base, DatabaseAdapterSupport, etc.)
If your code relies on private APIs or unsupported databases, lock to an

If your code relies on private APIs or unsupported databases, lock to an#{' '}
older version or update your code accordingly.
MESSAGE

Expand Down