Skip to content

Commit

Permalink
remove connection pool for lock connection
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jul 2, 2024
1 parent e302210 commit 6b3bac9
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 186 deletions.
2 changes: 1 addition & 1 deletion lib/que.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def connection=(connection)

Adapters::ActiveRecordWithLock.new(
job_connection_pool: connection.job_connection_pool,
lock_connection_pool: connection.lock_connection_pool
lock_connection: connection.lock_connection
)
when "Sequel::Postgres::Database" then Adapters::Sequel.new(connection)
when "ConnectionPool" then Adapters::ConnectionPool.new(connection)
Expand Down
21 changes: 5 additions & 16 deletions lib/que/adapters/active_record_with_lock.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,19 @@
module Que
module Adapters
class ActiveRecordWithLock < Que::Adapters::ActiveRecord
attr_accessor :job_connection_pool, :lock_connection_pool
def initialize(job_connection_pool:, lock_connection_pool:)
attr_accessor :job_connection_pool, :lock_connection
def initialize(job_connection_pool:, lock_connection:)
@job_connection_pool = job_connection_pool
@lock_connection_pool = lock_connection_pool
@lock_connection = lock_connection
super
end

def checkout_activerecord_adapter(&block)
@job_connection_pool.with_connection(&block)
end

def checkout_lock_database_connection
# when multiple threads are running we need to make sure
# the acquiring and releasing of advisory locks is done by the
# same connection
Thread.current[:db_connection] ||= lock_connection_pool.checkout
end

def lock_database_connection
Thread.current[:db_connection]
end

def release_lock_database_connection
@lock_connection_pool.checkin(Thread.current[:db_connection])
Thread.current[:db_connection] ||= @lock_connection.connection
end

def execute(command, params=[])
Expand Down Expand Up @@ -58,7 +47,7 @@ def lock_job_with_lock_database(queue, cursor)

def cleanup!
@job_connection_pool.release_connection
@lock_connection_pool.release_connection
@lock_connection.release_connection
end

def pg_try_advisory_lock?(job_id)
Expand Down
9 changes: 3 additions & 6 deletions lib/que/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ def initialize(
cursor_expiry: lock_cursor_expiry,
window: lock_window,
budget: lock_budget,
secondary_queues: secondary_queues
secondary_queues: secondary_queues,
)
end

attr_reader :metrics

def work_loop
return if @stop
Que.adapter.checkout_lock_database_connection if Que.adapter.class == Que::Adapters::ActiveRecordWithLock

@tracer.trace(RunningSecondsTotal, queue: @queue, primary_queue: @queue) do
loop do
case event = work
Expand All @@ -160,10 +160,7 @@ def work_loop
nil # immediately find a new job to work
end

if @stop
Que.adapter.release_lock_database_connection if Que.adapter.class == Que::Adapters::ActiveRecordWithLock
break
end
break if @stop
end
end
ensure
Expand Down
147 changes: 3 additions & 144 deletions spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ def expect_to_lock_with(cursor:)
let(:cursor_expiry) { 5 }

before do
allow(locker).to receive(:monotonic_now) { @epoch }
# we need this to avoid flakiness during resetting the cursor
locker.instance_variable_get(:@queue_expires_at)[queue] = Process.clock_gettime(Process::CLOCK_MONOTONIC) + cursor_expiry
allow(locker).to receive(:monotonic_now) { @epoch }
end

# This test simulates the repeated locking of jobs. We're trying to prove that
Expand Down Expand Up @@ -130,147 +132,4 @@ def expect_to_lock_with(cursor:)
# rubocop:enable RSpec/InstanceVariable
end
end

xcontext "when using lock database" do
subject(:locker) do
described_class.new(
queue: queue,
cursor_expiry: cursor_expiry,
)
end

let(:sql) do
QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
.where(retryable: true)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql
end
let(:now) {Time.now}

before do
allow(Time).to receive(:now).and_return(now)
end

describe ".with_locked_job" do
before { allow(Que).to receive(:execute).and_call_original }

# Helper to call the with_locked_job method but ensure our block has actually been
# called. Without this, it's possible that we'd never run expectations in our block.
def with_locked_job
block_called = false
locker.with_locked_job do |job|
yield(job)
block_called = true
end

raise "did not call job block" unless block_called
end

# Simulates actual working of a job, which is useful to these tests to free up another
# job for locking.
def expect_to_work(job)
with_locked_job do |actual_job|
expect(actual_job[:job_id]).to eql(job[:job_id])
expect(locker.instance_variable_get(:@lock_database_connection)).to receive(:execute).
with("SELECT pg_advisory_unlock(#{job[:job_id]})")

# Destroy the job to simulate the behaviour of the queue, and allow our lock query
# to discover new jobs.
QueJob.find(job[:job_id]).destroy!
end
end
def query(cursor = 0)
QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.select("extract(epoch from (now() - run_at)) as latency")
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, now)
.where(retryable: true)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql
end
# Our tests are very concerned with which cursor we use and when
def expect_to_lock_with(cursor:, locked_job: nil)
expect(Que).to receive(:execute).with(query(cursor)).and_call_original

if locked_job
expect(locker.instance_variable_get(:@lock_database_connection)).to receive(:execute).
with("SELECT pg_try_advisory_lock(#{locked_job[:job_id]})").and_call_original
end
end

context "with no jobs to lock" do
it "scans entire table and calls block with nil job" do
expect(Que).to receive(:execute).with(query).and_call_original

with_locked_job do |job|
expect(job).to be_nil
end
end
end

context "with just one job to lock" do
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1, run_at: now - 1).attrs }
let(:cursor_expiry) { 60 }

# Pretend time isn't moving, as we don't want to test cursor expiry here
before { allow(Process).to receive(:clock_gettime).and_return(0) }

# We want our workers to start from the front of the queue immediately after finding
# no jobs are available to work.
it "will use a cursor until no jobs are found" do
expect_to_lock_with(cursor: 0, locked_job: job_1)
expect_to_work(job_1)

expect_to_lock_with(cursor: job_1[:job_id])
with_locked_job {}

expect_to_lock_with(cursor: 0)
with_locked_job {}
end
end

context "with jobs to lock" do
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1, run_at: now - 1).attrs }
let!(:job_2) { FakeJob.enqueue(2, queue: queue, priority: 2, run_at: now - 1).attrs }
let!(:job_3) { FakeJob.enqueue(3, queue: queue, priority: 3, run_at: now - 1).attrs }

it "locks and then unlocks the most important job" do
expect_to_lock_with(cursor: 0, locked_job: job_1)
expect_to_work(job_1)
end

# rubocop:disable RSpec/SubjectStub
# rubocop:disable RSpec/InstanceVariable
context "on subsequent locks" do
context "with non-zero cursor expiry" do
let(:cursor_expiry) { 5 }

before { allow(locker).to receive(:monotonic_now) { @epoch } }

# This test simulates the repeated locking of jobs. We're trying to prove that
# the locker will use the previous jobs ID as a cursor until the expiry has
# elapsed, after which we'll reset.
#
# We do this by expecting on the calls to lock_job, specifically the second
# parameter which controls the job_id cursor value.
it "continues lock from previous job id, until cursor expires" do
@epoch = Process.clock_gettime(Process::CLOCK_MONOTONIC)
expect_to_lock_with(cursor: 0, locked_job: job_1)
expect_to_work(job_1)

@epoch += 2
expect_to_lock_with(cursor: job_1[:job_id], locked_job: job_2)
expect_to_work(job_2)

@epoch += cursor_expiry # our cursor should now expire
expect_to_lock_with(cursor: 0, locked_job: job_3)
expect_to_work(job_3)
end
end
end
# rubocop:enable RSpec/SubjectStub
# rubocop:enable RSpec/InstanceVariable
end
end
end
end
20 changes: 1 addition & 19 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class YugabyteRecord < ActiveRecord::Base
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
Que.connection = Que::Adapters::ActiveRecordWithLock.new(
job_connection_pool: YugabyteRecord.connection_pool,
lock_connection_pool: LockDatabaseRecord.connection_pool,
lock_connection: LockDatabaseRecord,
)
else
Que.connection = ActiveRecord
Expand All @@ -70,24 +70,6 @@ class YugabyteRecord < ActiveRecord::Base


RSpec.configure do |config|
# config.before(:each, :with_yugabyte_adapter) do
# Que.adapter.cleanup!
# Que.connection = Que::Adapters::Yugabyte
# end

# config.after(:each, :with_yugabyte_adapter) do
# Que.adapter.cleanup!
# Que.connection = ActiveRecord
# end
if ENV['YUGABYTE_QUE_WORKER_ENABLED']
config.before(:all) do
Que.adapter.checkout_lock_database_connection
end
config.after(:all) do
LockDatabaseRecord.connection_pool.disconnect!
end
end

config.before do
QueJob.delete_all
FakeJob.log = []
Expand Down

0 comments on commit 6b3bac9

Please sign in to comment.