Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate a new future for every executed job #20

Merged
merged 1 commit into from Mar 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 3 additions & 7 deletions lib/good_job/lockable.rb
Expand Up @@ -15,18 +15,14 @@ module Lockable
end)

scope :advisory_unlocked, -> { joins_advisory_locks.where(pg_locks: { locktype: nil }) }
scope :with_advisory_lock, (lambda do
where(<<~SQL)
pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint)
SQL
end)

def self.first_advisory_locked_row(query)
find_by_sql(<<~SQL)
find_by_sql(<<~SQL).first
WITH rows AS (#{query.to_sql})
SELECT rows.id
SELECT rows.*
FROM rows
WHERE pg_try_advisory_lock(('x'||substr(md5(id::text), 1, 16))::bit(64)::bigint)
LIMIT 1
SQL
end
# private_class_method :first_advisory_locked_row
Expand Down
41 changes: 18 additions & 23 deletions lib/good_job/scheduler.rb
@@ -1,5 +1,5 @@
require "concurrent/scheduled_task"
require "concurrent/executor/thread_pool_executor"
require "concurrent/timer_task"
require "concurrent/utility/processor_counter"

module GoodJob
Expand Down Expand Up @@ -28,7 +28,7 @@ def initialize(query = GoodJob::Job.all, timer_options: {}, pool_options: {})
idle_threads = @pool.max_length - @pool.length
create_thread if idle_threads.positive?
end
@timer.add_observer(TimerObserver.new)
@timer.add_observer(self, :timer_observer)
@timer.execute
end

Expand Down Expand Up @@ -62,36 +62,31 @@ def shutdown?

def create_thread
future = Concurrent::Future.new(args: [ordered_query], executor: @pool) do |query|
loop do
executed_job = false
executed_job = false

Rails.application.executor.wrap do
good_job = query.with_advisory_lock.first
break unless good_job
Rails.application.executor.wrap do
good_job = GoodJob::Job.first_advisory_locked_row(query)
break unless good_job

executed_job = true
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })
JobWrapper.new(good_job).perform
good_job.advisory_unlock
end

break unless executed_job
executed_job = true
ActiveSupport::Notifications.instrument("job_started.good_job", { good_job: good_job })
JobWrapper.new(good_job).perform
good_job.advisory_unlock
end

executed_job
end
future.add_observer(TaskObserver.new)
future.add_observer(self, :task_observer)
future.execute
end

class TimerObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("timer_task_finished.good_job", { result: result, error: error, time: time })
end
def timer_observer(time, result, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end

class TaskObserver
def update(time, result, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: result, error: error, time: time })
end
def task_observer(time, executed_job, error)
ActiveSupport::Notifications.instrument("job_finished.good_job", { result: executed_job, error: error, time: time })
create_thread if executed_job
end
end
end