Skip to content

Commit

Permalink
Remove pg_advisory_unlock_all() after job is run; only verify mutab…
Browse files Browse the repository at this point in the history
…le `finished_at` is blank before performing job
  • Loading branch information
bensheldon committed Oct 18, 2023
1 parent eb65463 commit 68501a3
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 29 deletions.
14 changes: 8 additions & 6 deletions app/models/concerns/good_job/advisory_lockable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ module AdvisoryLockable
composed_cte = Arel::Nodes::As.new(cte_table, Arel::Nodes::SqlLiteral.new([cte_type, "(", cte_query.to_sql, ")"].join(' ')))
query = cte_table.project(cte_table[:id])
.with(composed_cte)
.where(Arel.sql(sanitize_sql_for_conditions(["#{function}(('x' || substr(md5(:table_name || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)", { table_name: table_name }])))
.where(Arel.sql("#{function}(('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{connection.quote_table_name(cte_table.name)}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64)::bigint)"))

limit = original_query.arel.ast.limit
query.limit = limit.value if limit.present?
Expand All @@ -74,14 +74,12 @@ module AdvisoryLockable
# @example Get the records that have a session awaiting a lock:
# MyLockableRecord.joins_advisory_locks.where("pg_locks.granted = ?", false)
scope :joins_advisory_locks, (lambda do |column: _advisory_lockable_column|
join_sql = <<~SQL.squish
joins(<<~SQL.squish)
LEFT JOIN pg_locks ON pg_locks.locktype = 'advisory'
AND pg_locks.objsubid = 1
AND pg_locks.classid = ('x' || substr(md5(:table_name || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(:table_name || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int
AND pg_locks.classid = ('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(32)::int
AND pg_locks.objid = (('x' || substr(md5(#{connection.quote(table_name)} || '-' || #{quoted_table_name}.#{connection.quote_column_name(column)}::text), 1, 16))::bit(64) << 32)::bit(32)::int
SQL

joins(sanitize_sql_for_conditions([join_sql, { table_name: table_name }]))
end)

# Joins the current query with Postgres's +pg_locks+ table AND SELECTs the resulting columns
Expand Down Expand Up @@ -151,6 +149,10 @@ module AdvisoryLockable
# can (as in {Lockable.advisory_lock}) and only pass those that could be
# locked to the block.
#
# If the Active Record Relation has WHERE conditions that have the potential
# to be updated/changed elsewhere, be sure to verify the conditions are still
# satisfied, or check the lock status, as an unlocked and out-of-date record could be returned.
#
# @param column [String, Symbol] name of advisory lock or unlock function
# @param function [String, Symbol] Postgres Advisory Lock function name to use
# @param unlock_session [Boolean] Whether to unlock all advisory locks in the session afterwards
Expand Down
7 changes: 5 additions & 2 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -256,12 +256,13 @@ def self.enqueue_args(active_job, overrides = {})
def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
execution = nil
result = nil
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions|
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |executions|
execution = executions.first
break if execution.blank?

unless execution.executable?
result = ExecutionResult.new(value: nil, unexecutable: true)
execution = nil
break
end

Expand Down Expand Up @@ -491,7 +492,9 @@ def perform
# Tests whether this job is safe to be executed by this thread.
# @return [Boolean]
def executable?
self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
reload.finished_at.blank?
rescue ActiveRecord::RecordNotFound
false
end

def make_discrete
Expand Down
9 changes: 7 additions & 2 deletions spec/app/models/concerns/good_job/advisory_lockable_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
RSpec.describe GoodJob::AdvisoryLockable do
let(:model_class) { GoodJob::Execution }
let!(:execution) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default") }
let!(:another_execution) { model_class.create(active_job_id: SecureRandom.uuid, queue_name: "default") }

describe '.advisory_lock' do
around do |example|
Expand Down Expand Up @@ -88,17 +89,21 @@
end

it 'returns first row of the query with a lock' do
execution.update!(queue_name: "aaaaaa")
another_execution.update!(queue_name: "bbbbbb")

expect(execution).not_to be_advisory_locked
result_execution = model_class.advisory_lock.first
result_execution = model_class.order(queue_name: :asc).limit(1).advisory_lock.first
expect(result_execution).to eq execution
expect(execution).to be_advisory_locked
expect(another_execution).not_to be_advisory_locked

execution.advisory_unlock
end

it 'can lock an alternative column' do
expect(execution).not_to be_advisory_locked
result_execution = model_class.advisory_lock(column: :queue_name).first
result_execution = model_class.order(created_at: :asc).limit(1).advisory_lock(column: :queue_name).first
expect(result_execution).to eq execution
expect(execution).to be_advisory_locked(key: "good_jobs-default")
expect(execution).not_to be_advisory_locked # on default key
Expand Down
11 changes: 5 additions & 6 deletions spec/app/models/good_job/execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,12 @@ def job_params
let!(:queue_one_job) { described_class.create!(job_params.merge(queue_name: "one", created_at: 1.minute.ago, priority: 1)) }

it "orders by queue order" do
2.times do
described_class.perform_with_advisory_lock(parsed_queues: parsed_queues)
described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) do |execution|
expect(execution).to eq queue_one_job
end
described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) do |execution|
expect(execution).to eq queue_two_job
end
expect(described_class.order(finished_at: :asc).to_a).to eq([
queue_one_job,
queue_two_job,
])
end
end
end
Expand Down
25 changes: 13 additions & 12 deletions spec/integration/scheduler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def perform(*_args, **_kwargs)
PgLock.advisory_lock.owns.each do |pg_lock|
puts " - #{pg_lock.attributes.to_json}"
end
elsif locks_count < expected_locks_per_thread
puts "Doesn't own a lock for #{active_job_id}"
end

RUN_JOBS << [provider_job_id, job_id, thread_name]
Expand Down Expand Up @@ -61,23 +63,22 @@ def perform(*args, **kwargs)
end

context 'when there are a large number of jobs' do
let(:number_of_jobs) { 500 }
let(:max_threads) { 5 }
let(:number_of_jobs) { 1_000 }
let(:max_threads) { 10 }

it 'pops items off of the queue and runs them' do
expect(ActiveJob::Base.queue_adapter).to be_execute_externally

GoodJob::Execution.transaction do
number_of_jobs.times do |i|
TestJob.perform_later(i)
end
GoodJob::Job.logger.silence do
jobs = Array.new(number_of_jobs) { |i| TestJob.new(i) }
TestJob.queue_adapter.enqueue_all(jobs)
end

performer = GoodJob::JobPerformer.new('*')
scheduler = GoodJob::Scheduler.new(performer, max_threads: max_threads)
max_threads.times { scheduler.create_thread }

sleep_until(max: 30, increments_of: 0.5) { GoodJob::Execution.unfinished.count.zero? }
wait_until(max: 60, increments_of: 0.5) { expect(GoodJob::Execution.unfinished.count).to be_zero }
scheduler.shutdown

expect(GoodJob::Execution.unfinished.count).to eq(0), -> { "Unworked jobs are #{GoodJob::Execution.unfinished.map(&:id)}" }
Expand All @@ -89,8 +90,9 @@ def perform(*args, **kwargs)
rerun_provider_job_ids = jobs_tally.select { |_key, value| value > 1 }.keys
rerun_jobs = RUN_JOBS.select { |(provider_job_id, _job_id, _thread_name)| rerun_provider_job_ids.include? provider_job_id }

"Expected run jobs(#{RUN_JOBS.size}) to equal number of jobs (#{number_of_jobs}). Instead ran jobs multiple times:\n#{PP.pp(rerun_jobs, '')}"
"Expected run jobs(#{RUN_JOBS.size}) to equal number of jobs (#{number_of_jobs}). Instead ran jobs multiple times:\n#{rerun_jobs.join("\n")}"
}
expect(GoodJob::DiscreteExecution.count).to eq number_of_jobs
end
end

Expand All @@ -101,10 +103,9 @@ def perform(*args, **kwargs)
it 'executes all jobs' do
expect(ActiveJob::Base.queue_adapter).to be_execute_externally

GoodJob::Execution.transaction do
number_of_jobs.times do |i|
TestJob.perform_later(i)
end
GoodJob::Job.logger.silence do
jobs = Array.new(number_of_jobs) { |i| TestJob.new(i) }
TestJob.queue_adapter.enqueue_all(jobs)
end

performer = GoodJob::JobPerformer.new('*')
Expand Down
2 changes: 1 addition & 1 deletion spec/test_app/config/database.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ default: &default
host: localhost
# For details on connection pooling, see Rails configuration guide
# https://guides.rubyonrails.org/configuring.html#database-pooling
pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 10 } %>
pool: <%= ENV.fetch("RAILS_MAX_THREADS") { 20 } %>

development:
<<: *default
Expand Down

0 comments on commit 68501a3

Please sign in to comment.