Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use FOR UPDATE SKIP LOCKED alongside advisory lock for more reliable locking #274

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 1 addition & 10 deletions lib/good_job/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,7 @@ def self.queue_parser(string)
def self.perform_with_advisory_lock
unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs|
good_job = good_jobs.first
# TODO: Determine why some records are fetched without an advisory lock at all
break unless good_job&.executable?

good_job.perform
good_job&.perform
end
end

Expand Down Expand Up @@ -241,12 +238,6 @@ def perform
result
end

# Tests whether this job is safe to be executed by this thread.
# @return [Boolean]
def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
end

private

# @return [ExecutionResult]
Expand Down
5 changes: 5 additions & 0 deletions lib/good_job/lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ module Lockable

composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))

# In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED
# because this causes the query to skip jobs that were completed (and deleted)
# by another session in the time since the table snapshot was taken.
# In rare cases under high concurrency levels, leaving this out can result in double executions.
query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.lock(Arel.sql("FOR UPDATE SKIP LOCKED"))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
Expand Down
11 changes: 7 additions & 4 deletions spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,22 @@ def perform(*args, **kwargs)
end), transfer_nested_constants: true
end

let(:adapter) { GoodJob::Adapter.new }
let(:adapter) { GoodJob::Adapter.new(execution_mode: :external) }

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 500 }
let(:number_of_jobs) { 1_000 }
let(:max_threads) { 5 }

let!(:good_jobs) do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
GoodJob::Job.transaction do
number_of_jobs.times do |i|
ExampleJob.perform_later(i)
end
end
end

it 'pops items off of the queue and runs them' do
puts "TEST STARTS"
performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads)
max_threads.times { scheduler.create_thread }
Expand Down
24 changes: 0 additions & 24 deletions spec/lib/good_job/job_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,30 +177,6 @@ def perform(result_value = nil, raise_error: false)
end
end

describe '#executable?' do
let(:good_job) { described_class.create! }

it 'is true when locked' do
good_job.with_advisory_lock do
expect(good_job.executable?).to eq true
end
end

it 'is false when job no longer exists' do
good_job.with_advisory_lock do
good_job.destroy!
expect(good_job.executable?).to eq false
end
end

it 'is false when the job has finished' do
good_job.with_advisory_lock do
good_job.update! finished_at: Time.current
expect(good_job.executable?).to eq false
end
end
end

describe '#perform' do
let(:active_job) { ExampleJob.new("a string") }
let!(:good_job) { described_class.enqueue(active_job) }
Expand Down
1 change: 1 addition & 0 deletions spec/lib/good_job/lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
FROM "rows"
WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint)
LIMIT 2
FOR UPDATE SKIP LOCKED
)
ORDER BY "good_jobs"."priority" DESC
SQL
Expand Down