From 91263436670fbe8abe31fa1ffc53833cdd724b15 Mon Sep 17 00:00:00 2001 From: Ben Sheldon Date: Wed, 23 Jun 2021 14:16:07 -0700 Subject: [PATCH] Use `FOR UPDATE SKIP LOCKED` alongside advisory lock for more reliable locking Found implementation in rihanna that is similar: https://github.com/samsondav/rihanna/blob/1840d2226fd2bcb77635f995f431b26e8f2dfc69/lib/rihanna/job.ex#L364-L367 --- lib/good_job/job.rb | 11 +---------- lib/good_job/lockable.rb | 5 +++++ spec/integration/scheduler_spec.rb | 11 +++++++---- spec/lib/good_job/job_spec.rb | 24 ------------------------ spec/lib/good_job/lockable_spec.rb | 1 + 5 files changed, 14 insertions(+), 38 deletions(-) diff --git a/lib/good_job/job.rb b/lib/good_job/job.rb index 506389c37..fdc28a6a2 100644 --- a/lib/good_job/job.rb +++ b/lib/good_job/job.rb @@ -158,10 +158,7 @@ def self.queue_parser(string) def self.perform_with_advisory_lock unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |good_jobs| good_job = good_jobs.first - # TODO: Determine why some records are fetched without an advisory lock at all - break unless good_job&.executable? - - good_job.perform + good_job&.perform end end @@ -241,12 +238,6 @@ def perform result end - # Tests whether this job is safe to be executed by this thread. - # @return [Boolean] - def executable? - self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) - end - private # @return [ExecutionResult] diff --git a/lib/good_job/lockable.rb b/lib/good_job/lockable.rb index b81670a5f..8ecdccebf 100644 --- a/lib/good_job/lockable.rb +++ b/lib/good_job/lockable.rb @@ -41,9 +41,14 @@ module Lockable composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' '))) + # In addition to an advisory lock, there is also a FOR UPDATE SKIP LOCKED + # because this causes the query to skip jobs that were completed (and deleted) + # by another session in the time since the table snapshot was taken. + # In rare cases under high concurrency levels, leaving this out can result in double executions. query = cte_table.project(cte_table[:id]) .with(composed_cte) .where(Arel.sql(sanitize_sql_for_conditions(["pg_try_advisory_lock(('x' || substr(md5(:table_name || #{connection.quote_table_name(cte_table.name)}.#{quoted_primary_key}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .lock(Arel.sql("FOR UPDATE SKIP LOCKED")) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index 59f55b2d2..87cb1cc66 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -38,19 +38,22 @@ def perform(*args, **kwargs) end), transfer_nested_constants: true end - let(:adapter) { GoodJob::Adapter.new } + let(:adapter) { GoodJob::Adapter.new(execution_mode: :external) } context 'when there are a large number of jobs' do - let(:number_of_jobs) { 500 } + let(:number_of_jobs) { 1_000 } let(:max_threads) { 5 } let!(:good_jobs) do - number_of_jobs.times do |i| - ExampleJob.perform_later(i) + GoodJob::Job.transaction do + number_of_jobs.times do |i| + ExampleJob.perform_later(i) + end end end it 'pops items off of the queue and runs them' do + puts "TEST STARTS" performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads) max_threads.times { scheduler.create_thread } diff --git a/spec/lib/good_job/job_spec.rb b/spec/lib/good_job/job_spec.rb index 4e4e4b831..c4f9ee72b 100644 --- a/spec/lib/good_job/job_spec.rb +++ b/spec/lib/good_job/job_spec.rb @@ -177,30 +177,6 @@ def perform(result_value = nil, raise_error: false) end end - describe '#executable?' do - let(:good_job) { described_class.create! } - - it 'is true when locked' do - good_job.with_advisory_lock do - expect(good_job.executable?).to eq true - end - end - - it 'is false when job no longer exists' do - good_job.with_advisory_lock do - good_job.destroy! - expect(good_job.executable?).to eq false - end - end - - it 'is false when the job has finished' do - good_job.with_advisory_lock do - good_job.update! finished_at: Time.current - expect(good_job.executable?).to eq false - end - end - end - describe '#perform' do let(:active_job) { ExampleJob.new("a string") } let!(:good_job) { described_class.enqueue(active_job) } diff --git a/spec/lib/good_job/lockable_spec.rb b/spec/lib/good_job/lockable_spec.rb index 78ac9492d..8e88f29fe 100644 --- a/spec/lib/good_job/lockable_spec.rb +++ b/spec/lib/good_job/lockable_spec.rb @@ -35,6 +35,7 @@ FROM "rows" WHERE pg_try_advisory_lock(('x' || substr(md5('good_jobs' || "rows"."id"::text), 1, 16))::bit(64)::bigint) LIMIT 2 + FOR UPDATE SKIP LOCKED ) ORDER BY "good_jobs"."priority" DESC SQL