From 68501a34fa9ca7ac94d2ba071468c1901471dc6b Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Mon, 16 Oct 2023 07:27:21 -0700 Subject: [PATCH] Remove `pg_advisory_unlock_all()` after job is run; only verify mutable `finished_at` is blank before performing job --- .../concerns/good_job/advisory_lockable.rb | 14 ++++++----- app/models/good_job/execution.rb | 7 ++++-- .../good_job/advisory_lockable_spec.rb | 9 +++++-- spec/app/models/good_job/execution_spec.rb | 11 ++++---- spec/integration/scheduler_spec.rb | 25 ++++++++++--------- spec/test_app/config/database.yml | 2 +- 6 files changed, 39 insertions(+), 29 deletions(-) diff --git a/app/models/concerns/good_job/advisory_lockable.rb b/app/models/concerns/good_job/advisory_lockable.rb index 4acf46c78..a6325f823 100644 --- a/app/models/concerns/good_job/advisory_lockable.rb +++ b/app/models/concerns/good_job/advisory_lockable.rb @@ -53,7 +53,7 @@ module AdvisoryLockable composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' '))) query = cte_table.project(cte_table[:id]) .with(composed_cte) - .where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }]))) + .where(Arel.sql("#{function}(('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)")) limit = original_query.arel.ast.limit query.limit = limit.value if limit.present? @@ -74,14 +74,12 @@ module AdvisoryLockable # @example Get the records that have a session awaiting a lock: # MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false) scope :joins_advisory_locks, (lambda do |column: _advisory_lockable_column| - join_sql = <<~SQL.squish + joins(<<~SQL.squish) LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory' AND pg_locks.objsubid = 1 - AND pg_locks.classid = ('x' || substr(md5(:table_name || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int - AND pg_locks.objid = (('x' || substr(md5(:table_name || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int + AND pg_locks.classid = ('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int + AND pg_locks.objid = (('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int SQL - - joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }])) end) # Joins the current query with Postgres's +pg_locks+ table AND SELECTs the resulting columns @@ -151,6 +149,10 @@ module AdvisoryLockable # can (as in {Lockable.advisory_lock}) and only pass those that could be # locked to the block. # + # If the Active Record Relation has WHERE conditions that have the potential + # to be updated/changed elsewhere, be sure to verify the conditions are still + # satisfied, or check the lock status, as an unlocked and out-of-date record could be returned. + # # @param column [String, Symbol] name of advisory lock or unlock function # @param function [String, Symbol] Postgres Advisory Lock function name to use # @param unlock_session [Boolean] Whether to unlock all advisory locks in the session afterwards diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 5dee9df5c..8b8d69af5 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -256,12 +256,13 @@ def self.enqueue_args(active_job, overrides = {}) def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) execution = nil result = nil - unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions| + unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |executions| execution = executions.first break if execution.blank? unless execution.executable? result = ExecutionResult.new(value: nil, unexecutable: true) + execution = nil break end @@ -491,7 +492,9 @@ def perform # Tests whether this job is safe to be executed by this thread. # @return [Boolean] def executable? - self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id) + reload.finished_at.blank? + rescue ActiveRecord::RecordNotFound + false end def make_discrete diff --git a/spec/app/models/concerns/good_job/advisory_lockable_spec.rb b/spec/app/models/concerns/good_job/advisory_lockable_spec.rb index d67bbeb7a..758996e6f 100644 --- a/spec/app/models/concerns/good_job/advisory_lockable_spec.rb +++ b/spec/app/models/concerns/good_job/advisory_lockable_spec.rb @@ -5,6 +5,7 @@ RSpec.describe GoodJob::AdvisoryLockable do let(:model_class) { GoodJob::Execution } let!(:execution) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default") } + let!(:another_execution) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default") } describe '.advisory_lock' do around do |example| @@ -88,17 +89,21 @@ end it 'returns first row of the query with a lock' do + execution.update!(queue_name: "aaaaaa") + another_execution.update!(queue_name: "bbbbbb") + expect(execution).not_to be_advisory_locked - result_execution = model_class.advisory_lock.first + result_execution = model_class.order(queue_name: :asc).limit(1).advisory_lock.first expect(result_execution).to eq execution expect(execution).to be_advisory_locked + expect(another_execution).not_to be_advisory_locked execution.advisory_unlock end it 'can lock an alternative column' do expect(execution).not_to be_advisory_locked - result_execution = model_class.advisory_lock(column: :queue_name).first + result_execution = model_class.order(created_at: :asc).limit(1).advisory_lock(column: :queue_name).first expect(result_execution).to eq execution expect(execution).to be_advisory_locked(key: "good_jobs-default") expect(execution).not_to be_advisory_locked # on default key diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index ba0f987eb..c05a31a9c 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -196,13 +196,12 @@ def job_params let!(:queue_one_job) { described_class.create!(job_params.merge(queue_name: "one", created_at: 1.minute.ago, priority: 1)) } it "orders by queue order" do - 2.times do - described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) + described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) do |execution| + expect(execution).to eq queue_one_job + end + described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) do |execution| + expect(execution).to eq queue_two_job end - expect(described_class.order(finished_at: :asc).to_a).to eq([ - queue_one_job, - queue_two_job, - ]) end end end diff --git a/spec/integration/scheduler_spec.rb b/spec/integration/scheduler_spec.rb index a46bdd4fa..de68e92d3 100644 --- a/spec/integration/scheduler_spec.rb +++ b/spec/integration/scheduler_spec.rb @@ -34,6 +34,8 @@ def perform(*_args, **_kwargs) PgLock.advisory_lock.owns.each do |pg_lock| puts " - #{pg_lock.attributes.to_json}" end + elsif locks_count < expected_locks_per_thread + puts "Doesn't own a lock for #{active_job_id}" end RUN_JOBS << [provider_job_id, job_id, thread_name] @@ -61,23 +63,22 @@ def perform(*args, **kwargs) end context 'when there are a large number of jobs' do - let(:number_of_jobs) { 500 } - let(:max_threads) { 5 } + let(:number_of_jobs) { 1_000 } + let(:max_threads) { 10 } it 'pops items off of the queue and runs them' do expect(ActiveJob::Base.queue_adapter).to be_execute_externally - GoodJob::Execution.transaction do - number_of_jobs.times do |i| - TestJob.perform_later(i) - end + GoodJob::Job.logger.silence do + jobs = Array.new(number_of_jobs) { |i| TestJob.new(i) } + TestJob.queue_adapter.enqueue_all(jobs) end performer = GoodJob::JobPerformer.new('*') scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads) max_threads.times { scheduler.create_thread } - sleep_until(max: 30, increments_of: 0.5) { GoodJob::Execution.unfinished.count.zero? } + wait_until(max: 60, increments_of: 0.5) { expect(GoodJob::Execution.unfinished.count).to be_zero } scheduler.shutdown expect(GoodJob::Execution.unfinished.count).to eq(0), -> { "Unworked jobs are #{GoodJob::Execution.unfinished.map(&:id)}" } @@ -89,8 +90,9 @@ def perform(*args, **kwargs) rerun_provider_job_ids = jobs_tally.select { |_key, value| value > 1 }.keys rerun_jobs = RUN_JOBS.select { |(provider_job_id, _job_id, _thread_name)| rerun_provider_job_ids.include? provider_job_id } - "Expected run jobs(#{RUN_JOBS.size}) to equal number of jobs (#{number_of_jobs}). Instead ran jobs multiple times:\n#{PP.pp(rerun_jobs, '')}" + "Expected run jobs(#{RUN_JOBS.size}) to equal number of jobs (#{number_of_jobs}). Instead ran jobs multiple times:\n#{rerun_jobs.join("\n")}" } + expect(GoodJob::DiscreteExecution.count).to eq number_of_jobs end end @@ -101,10 +103,9 @@ def perform(*args, **kwargs) it 'executes all jobs' do expect(ActiveJob::Base.queue_adapter).to be_execute_externally - GoodJob::Execution.transaction do - number_of_jobs.times do |i| - TestJob.perform_later(i) - end + GoodJob::Job.logger.silence do + jobs = Array.new(number_of_jobs) { |i| TestJob.new(i) } + TestJob.queue_adapter.enqueue_all(jobs) end performer = GoodJob::JobPerformer.new('*') diff --git a/spec/test_app/config/database.yml b/spec/test_app/config/database.yml index 9e1d914b2..3fba25c7e 100644 --- a/spec/test_app/config/database.yml +++ b/spec/test_app/config/database.yml @@ -20,7 +20,7 @@ default: &default host: localhost # For details on connection pooling, see Rails configuration guide # https://guides.rubyonrails.org/configuring.html#database-pooling - pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 10 } %> + pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 20 } %> development: <<: *default