-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a new adapter to work the job with multiple database #103
base: master
Are you sure you want to change the base?
Conversation
d92a84a
to
59aab33
Compare
5c6353d
to
f29a8b8
Compare
e1f6f21
to
31bf0fe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done a first pass of the implementation, looking sensible. Suggested some improvements
lib/que/adapters/yugabyte.rb
Outdated
|
||
if locked?(result.first['job_id']) | ||
return result | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An idea I had. We can do something like this to make this a bit more efficient, basically we use the row itself as a temporary lock to prevent a lot of race conditions. This way, a lot of the work is done within the main database itself.
Let me know what you think.
loop do
job_connection.transaction do
# This locks the row only while we are checking the advisory lock database.
# This saves duplicate attempts by different works to acquire the lock.
result = Que.execute("SELECT ... FOR UPDATE SKIPPED LOCK")
return result if result.empty?
if locked?(result.first['job_id'])
return result
end
# lock is released after the advisory lock check as the transaction completes
end
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will have cross database transaction in this case. If that's not an issue then this would be good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we will have cross database transaction in this case
Cross database transactions are mainly a problem when you're trying to maintain consistency of writes between two databases (eg. if one commits but the other rolls back).
I don't think this is a problem here, so should be safe to ignore: technically acquiring the a lock is a write, but the SELECT FOR UPDATE
is temporary and not the source of truth, and the lock is released at the end of the transaction anyway.
d872b0e
to
6b3bac9
Compare
c5bd47a
to
30ee504
Compare
30ee504
to
adbbcb6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking great! Just some minor stylistic stuff and I think it's good to go
|
||
def unlock_job(job_id) | ||
lock_database_connection.execute("SELECT pg_advisory_unlock(#{job_id})") | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reason we don't need any error handling in this class is because the locker will release the lock on failure, is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah. Also I have also added additional comments on this on what happens when the connection goes bad and someother connection tries to release the lock. In this case it will just no-op and return false as the connection would be already released when the previous connection goes bad
module Que | ||
module Adapters | ||
class ActiveRecordWithLock < Que::Adapters::ActiveRecord | ||
attr_accessor :job_connection_pool, :lock_record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't think we are using the attr_accessor anywhere, can remove the line in that case
result = Que.execute(:find_job_to_lock, [queue, cursor]) | ||
break if result.empty? | ||
cursor = result.first['job_id'] | ||
if pg_try_advisory_lock?(cursor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if pg_try_advisory_lock?(cursor) | |
break if pg_try_advisory_lock?(cursor) |
@@ -60,7 +60,6 @@ def initialize(queue:, cursor_expiry:, window: nil, budget: nil, secondary_queue | |||
@queue_expires_at = {} | |||
@secondary_queues = secondary_queues | |||
@consolidated_queues = Array.wrap(queue).concat(secondary_queues) | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm seeing a bunch of diffs that are just changing whitespace/formatting. Should we just get rubocop to standardise all of this?
spec/lib/que/locker_spec.rb
Outdated
@@ -117,7 +122,7 @@ def expect_to_lock_with(cursor:) | |||
expect_to_lock_with(cursor: job_1[:job_id]) | |||
expect_to_work(job_2) | |||
|
|||
@epoch += cursor_expiry # our cursor should now expire | |||
@epoch += (cursor_expiry) # our cursor should now expire |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are the brackets are needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will remove all these. I was trying out something for flaky tests.
9fb3896
to
4623fab
Compare
What?
Added a new adapter which uses 2 databases to work the job.
database 1 will have the job enqueued and will fetch the jobs to be worked
database 2 will be used to acquire the advisory lock on the job.