Skip to content

Commit

Permalink
Allow Lockable to have custom lock key; use FOR UPDATE SKIP LOCKED
Browse files Browse the repository at this point in the history
- `FOR UPDATE SKIP LOCKED`: found implementation in rihanna that is similar: https://github.com/samsondav/rihanna/blob/1840d2226fd2bcb77635f995f431b26e8f2dfc69/lib/rihanna/job.ex#L364-L367
  • Loading branch information
bensheldon committed Jun 23, 2021
1 parent aa8e216 commit c02d32f
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 27 deletions.
5 changes: 1 addition & 4 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,7 @@ def self.queue_parser(string)
def self.perform_with_advisory_lock
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
# TODO: Determine why some records are fetched without an advisory lock at all
break unless good_job&.executable?

good_job.perform
good_job&.perform
end
end

Expand Down
57 changes: 34 additions & 23 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ module Lockable
RecordAlreadyAdvisoryLockedError = Class.new(StandardError)

included do
cattr_accessor(:lockable_column, instance_accessor: false) { primary_key }

# Attempt to acquire an advisory lock on the selected records and
# return only those records for which a lock could be acquired.
# @!method advisory_lock
Expand All @@ -41,9 +43,14 @@ module Lockable

composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))

# In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED
# because this causes the query to skip jobs that were completed (and deleted)
# by another session in the time since the table snapshot was taken.
# In rare cases under high concurrency levels, leaving this out can result in double executions.
query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(lockable_column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.lock(Arel.sql("FOR UPDATE SKIP LOCKED"))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
Expand All @@ -66,8 +73,8 @@ module Lockable
join_sql = <<~SQL.squish
LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{quoted_primary_key}::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(lockable_column)}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || #{quoted_table_name}.#{connection.quote_column_name(lockable_column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL

joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }]))
Expand Down Expand Up @@ -152,25 +159,25 @@ def supports_cte_materialization_specifiers?
# you are done with {#advisory_unlock} (or {#advisory_unlock!} to release
# all remaining locks).
# @return [Boolean] whether the lock was acquired.
def advisory_lock
def advisory_lock(key = lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
WHERE pg_try_advisory_lock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint)
WHERE pg_try_advisory_lock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Lock', binds).any?
end

# Releases an advisory lock on this record if it is locked by this database
# session. Note that advisory locks stack, so you must call
# {#advisory_unlock} and {#advisory_lock} the same number of times.
# @return [Boolean] whether the lock was released.
def advisory_unlock
def advisory_unlock(key = lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
WHERE pg_advisory_unlock(('x'||substr(md5($1 || $2::text), 1, 16))::bit(64)::bigint)
WHERE pg_advisory_unlock(('x'||substr(md5($1::text), 1, 16))::bit(64)::bigint)
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Unlock', binds).any?
end

Expand All @@ -179,8 +186,8 @@ def advisory_unlock
# database session.
# @raise [RecordAlreadyAdvisoryLockedError]
# @return [Boolean] +true+
def advisory_lock!
result = advisory_lock
def advisory_lock!(key = lockable_key)
result = advisory_lock(key)
result || raise(RecordAlreadyAdvisoryLockedError)
end

Expand All @@ -196,51 +203,55 @@ def advisory_lock!
# record.with_advisory_lock do
# do_something_with record
# end
def with_advisory_lock
def with_advisory_lock(key = lockable_key)
raise ArgumentError, "Must provide a block" unless block_given?

advisory_lock!
advisory_lock!(key)
yield
ensure
advisory_unlock unless $ERROR_INFO.is_a? RecordAlreadyAdvisoryLockedError
end

# Tests whether this record has an advisory lock on it.
# @return [Boolean]
def advisory_locked?
def advisory_locked?(key = lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key], [nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Advisory Locked?', binds).any?
end

# Tests whether this record is locked by the current database session.
# @return [Boolean]
def owns_advisory_lock?
def owns_advisory_lock?(key = lockable_key)
query = <<~SQL.squish
SELECT 1 AS one
FROM pg_locks
WHERE pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5($1 || $2::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($3 || $4::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5($1::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5($2::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.pid = pg_backend_pid()
SQL
binds = [[nil, self.class.table_name], [nil, send(self.class.primary_key)], [nil, self.class.table_name], [nil, send(self.class.primary_key)]]
binds = [[nil, key], [nil, key]]
self.class.connection.exec_query(pg_or_jdbc_query(query), 'GoodJob::Lockable Owns Advisory Lock?', binds).any?
end

# Releases all advisory locks on the record that are held by the current
# database session.
# @return [void]
def advisory_unlock!
advisory_unlock while advisory_locked?
def advisory_unlock!(key = lockable_key)
advisory_unlock(key) while advisory_locked?
end

def lockable_key
[self.class.table_name, self[self.class.lockable_column]].join
end

private
Expand Down
1 change: 1 addition & 0 deletions spec/lib/good_job/lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
FOR UPDATE SKIP LOCKED
)
ORDER BY "good_jobs"."priority" DESC
SQL
Expand Down

0 comments on commit c02d32f

Please sign in to comment.