From dea2385b84628ab1f22d9d5f784400e63a074ee4 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Wed, 5 Jul 2023 18:12:36 -0700 Subject: [PATCH] Ensure Process records always exist when executing jobs and are refreshed in background --- app/models/good_job/base_execution.rb | 7 + app/models/good_job/execution.rb | 30 +++- app/models/good_job/process.rb | 137 +++++++++------- .../migrations/create_good_jobs.rb.erb | 10 +- ...07_create_good_job_process_lock_ids.rb.erb | 17 ++ ...reate_good_job_process_lock_indexes.rb.erb | 37 +++++ lib/good_job.rb | 3 +- lib/good_job/adapter.rb | 10 +- lib/good_job/capsule.rb | 32 +++- lib/good_job/job_performer.rb | 5 +- lib/good_job/notifier.rb | 5 +- lib/good_job/notifier/process_heartbeat.rb | 6 +- lib/good_job/process_tracker.rb | 154 ++++++++++++++++++ lib/good_job/scheduler.rb | 4 +- sorbet/rbi/todo.rbi | 1 + spec/app/models/good_job/process_spec.rb | 57 ++----- .../good_job/update_generator_spec.rb | 2 +- spec/integration/adapter_spec.rb | 3 - spec/integration/process_locks_spec.rb | 46 ++++++ spec/lib/good_job/notifier_spec.rb | 16 +- spec/lib/good_job/process_tracker_spec.rb | 147 +++++++++++++++++ spec/lib/good_job_spec.rb | 9 +- spec/support/postgres_notices.rb | 1 + spec/support/reset_good_job.rb | 7 + spec/support/uuid.rb | 14 ++ ...201336_create_good_job_process_lock_ids.rb | 17 ++ ...37_create_good_job_process_lock_indexes.rb | 38 +++++ spec/test_app/db/schema.rb | 9 +- 28 files changed, 663 insertions(+), 161 deletions(-) create mode 100644 lib/generators/good_job/templates/update/migrations/07_create_good_job_process_lock_ids.rb.erb create mode 100644 lib/generators/good_job/templates/update/migrations/08_create_good_job_process_lock_indexes.rb.erb create mode 100644 lib/good_job/process_tracker.rb create mode 100644 spec/integration/process_locks_spec.rb create mode 100644 spec/lib/good_job/process_tracker_spec.rb create mode 100644 spec/support/uuid.rb create mode 100644 spec/test_app/db/migrate/20230704201336_create_good_job_process_lock_ids.rb create mode 100644 spec/test_app/db/migrate/20230704201337_create_good_job_process_lock_indexes.rb diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index f43142fd2..b70399d50 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -49,6 +49,13 @@ def error_event_migrated? migration_pending_warning! false end + + def process_lock_migrated? + return true if columns_hash["locked_by_id"].present? + + migration_pending_warning! + false + end end # The ActiveJob job class, as a string diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 5dee9df5c..2887c6e72 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -253,7 +253,7 @@ def self.enqueue_args(active_job, overrides = {}) # return value for the job's +#perform+ method, and the exception the job # raised, if any (if the job raised, then the second array entry will be # +nil+). If there were no jobs to execute, returns +nil+. - def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) + def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil, capsule: GoodJob.capsule) execution = nil result = nil unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions| @@ -266,7 +266,9 @@ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) end yield(execution) if block_given? - result = execution.perform + capsule.process_tracker.register do + result = execution.perform(id_for_lock: capsule.process_tracker.id_for_lock) + end end execution&.run_callbacks(:perform_unlocked) @@ -356,7 +358,7 @@ def self.format_error(error) # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. - def perform + def perform(id_for_lock: nil) run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at @@ -385,17 +387,23 @@ def perform if discrete? transaction do now = Time.current - discrete_execution = discrete_executions.create!( + discrete_execution = discrete_executions.create!({ job_class: job_class, queue_name: queue_name, serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), - created_at: now - ) + created_at: now, + }.tap do |args| + args[:process_id] = id_for_lock if self.class.process_lock_migrated? + end) + + assign_attributes(locked_by_id: id_for_lock, locked_at: now) if self.class.process_lock_migrated? update!(performed_at: now, executions_count: ((executions_count || 0) + 1)) end else - update!(performed_at: Time.current) + now = Time.current + assign_attributes(locked_by_id: id_for_lock, locked_at: now) if self.class.process_lock_migrated? + update!(performed_at: now) end ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| @@ -439,7 +447,6 @@ def perform end job_error = result.handled_error || result.unhandled_error - if job_error error_string = self.class.format_error(job_error) self.error = error_string @@ -452,8 +459,13 @@ def perform self.error = nil self.error_event = nil if self.class.error_event_migrated? end - reenqueued = result.retried? || retried_good_job_id.present? + + if self.class.process_lock_migrated? + self.locked_by_id = nil + self.locked_at = nil + end + if result.unhandled_error && GoodJob.retry_on_unhandled_error if discrete_execution transaction do diff --git a/app/models/good_job/process.rb b/app/models/good_job/process.rb index 0d02238be..7bca349c6 100644 --- a/app/models/good_job/process.rb +++ b/app/models/good_job/process.rb @@ -13,47 +13,71 @@ class Process < BaseRecord # Interval until the process record is treated as expired EXPIRED_INTERVAL = 5.minutes - self.table_name = 'good_job_processes' + LOCK_TYPES = [ + LOCK_TYPE_ADVISORY = 'advisory', + ].freeze + + LOCK_TYPE_ENUMS = { + LOCK_TYPE_ADVISORY => 1, + }.freeze - cattr_reader :mutex, default: Mutex.new - cattr_accessor :_current_id, default: nil - cattr_accessor :_pid, default: nil + self.table_name = 'good_job_processes' # Processes that are active and locked. # @!method active # @!scope class # @return [ActiveRecord::Relation] - scope :active, -> { advisory_locked } + scope :active, (lambda do + if lock_type_migrated? + query = joins_advisory_locks + query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked + .or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago))) + else + advisory_locked + end + end) # Processes that are inactive and unlocked (e.g. SIGKILLed) # @!method active # @!scope class # @return [ActiveRecord::Relation] - scope :inactive, -> { advisory_unlocked } + scope :inactive, (lambda do + if lock_type_migrated? + query = joins_advisory_locks + query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_unlocked + .or(query.where(lock_type: nil).where(arel_table[:updated_at].lt(EXPIRED_INTERVAL.ago))) + else + advisory_unlocked + end + end) - # UUID that is unique to the current process and changes when forked. - # @return [String] - def self.current_id - mutex.synchronize { ns_current_id } + # Deletes all inactive process records. + def self.cleanup + inactive.find_each do |process| + GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) if GoodJob::Job.process_lock_migrated? # rubocop:disable Rails/SkipsModelValidations + process.delete + end end - def self.ns_current_id - if _current_id.nil? || _pid != ::Process.pid - self._current_id = SecureRandom.uuid - self._pid = ::Process.pid - end - _current_id + # @return [Boolean] + def self.lock_type_migrated? + columns_hash["lock_type"].present? end - # Hash representing metadata about the current process. - # @return [Hash] - def self.current_state - mutex.synchronize { ns_current_state } + def self.create_record(id:, with_advisory_lock: false) + attributes = { + id: id, + state: generate_state, + } + if with_advisory_lock + attributes[:create_with_advisory_lock] = true + attributes[:lock_type] = LOCK_TYPE_ADVISORY if lock_type_migrated? + end + create!(attributes) end - def self.ns_current_state + def self.generate_state { - id: ns_current_id, hostname: Socket.gethostname, pid: ::Process.pid, proctitle: $PROGRAM_NAME, @@ -70,45 +94,36 @@ def self.ns_current_state } end - # Deletes all inactive process records. - def self.cleanup - inactive.delete_all - end - - # Registers the current process in the database - # @return [GoodJob::Process] - def self.register - mutex.synchronize do - process_state = ns_current_state - create(id: process_state[:id], state: process_state, create_with_advisory_lock: true) - rescue ActiveRecord::RecordNotUnique - find(ns_current_state[:id]) - end - end - def refresh - mutex.synchronize do - reload - update(state: self.class.ns_current_state, updated_at: Time.current) - rescue ActiveRecord::RecordNotFound - false - end + self.state = self.class.generate_state + reload.update(state: state, updated_at: Time.current) + rescue ActiveRecord::RecordNotFound + @new_record = true + self.created_at = self.updated_at = nil + state_will_change! + save end - # Unregisters the instance. - def deregister - return unless owns_advisory_lock? + def refresh_if_stale(cleanup: false) + return unless stale? - mutex.synchronize do - destroy! - advisory_unlock - end + result = refresh + self.class.cleanup if cleanup + result end def state super || {} end + def stale? + updated_at < STALE_INTERVAL.ago + end + + def expired? + updated_at < EXPIRED_INTERVAL.ago + end + def basename File.basename(state.fetch("proctitle", "")) end @@ -117,20 +132,20 @@ def schedulers state.fetch("schedulers", []) end - def refresh_if_stale(cleanup: false) - return unless stale? + def lock_type + return unless self.class.columns_hash['lock_type'] - result = refresh - self.class.cleanup if cleanup - result + enum = super + LOCK_TYPE_ENUMS.key(enum) if enum end - def stale? - updated_at < STALE_INTERVAL.ago - end + def lock_type=(value) + return unless self.class.columns_hash['lock_type'] - def expired? - updated_at < EXPIRED_INTERVAL.ago + enum = LOCK_TYPE_ENUMS[value] + raise(ArgumentError, "Invalid error_event: #{value}") if value && !enum + + super(enum) end end end diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index ac592c6c5..5ca138fbb 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -29,6 +29,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.integer :executions_count t.text :job_class t.integer :error_event, limit: 2 + t.uuid :locked_by_id + t.datetime :locked_at end create_table :good_job_batches, id: :uuid do |t| @@ -56,11 +58,13 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.datetime :finished_at t.text :error t.integer :error_event, limit: 2 + t.uuid :process_id end create_table :good_job_processes, id: :uuid do |t| t.timestamps t.jsonb :state + t.integer :lock_type, limit: 2 end create_table :good_job_settings, id: :uuid do |t| @@ -82,7 +86,11 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL" add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL" - add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at + add_index :good_jobs, [:priority, :scheduled_at], order: { priority: "ASC NULLS LAST", scheduled_at: :asc }, + where: "finished_at IS NULL AND locked_by_id IS NULL", name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked + add_index :good_jobs, :locked_by_id, + where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id" + add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at end end diff --git a/lib/generators/good_job/templates/update/migrations/07_create_good_job_process_lock_ids.rb.erb b/lib/generators/good_job/templates/update/migrations/07_create_good_job_process_lock_ids.rb.erb new file mode 100644 index 000000000..e1e471ea7 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/07_create_good_job_process_lock_ids.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIds < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :locked_by_id) + end + end + + add_column :good_jobs, :locked_by_id, :uuid + add_column :good_jobs, :locked_at, :datetime + add_column :good_job_executions, :process_id, :uuid + add_column :good_job_processes, :lock_type, :integer, limit: 2 + end +end diff --git a/lib/generators/good_job/templates/update/migrations/08_create_good_job_process_lock_indexes.rb.erb b/lib/generators/good_job/templates/update/migrations/08_create_good_job_process_lock_indexes.rb.erb new file mode 100644 index 000000000..a6da2fab8 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/08_create_good_job_process_lock_indexes.rb.erb @@ -0,0 +1,37 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration<%= migration_version %> + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + add_index :good_jobs, [:priority, :scheduled_at], + order: { priority: "ASC NULLS LAST", scheduled_at: :asc }, + where: "finished_at IS NULL AND locked_by_id IS NULL", + name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + add_index :good_jobs, :locked_by_id, + where: "locked_by_id IS NOT NULL", + name: :index_good_jobs_on_locked_by_id, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + add_index :good_job_executions, [:process_id, :created_at], + name: :index_good_job_executions_on_process_id_and_created_at, + algorithm: :concurrently + end + end + + dir.down do + remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + end + end + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index bcf7fc5cc..517ff553e 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -31,6 +31,7 @@ require "good_job/poller" require "good_job/http_server" require "good_job/probe_server" +require "good_job/process_tracker" require "good_job/scheduler" require "good_job/shared_executor" require "good_job/systemd_service" @@ -258,7 +259,7 @@ def self.deprecator def self.migrated? # Always update with the most recent migration check GoodJob::Execution.reset_column_information - GoodJob::Execution.error_event_migrated? + GoodJob::Execution.process_lock_migrated? end ActiveSupport.run_load_hooks(:good_job, self) diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 5acf687e8..c79c7cd51 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -90,11 +90,11 @@ def enqueue_all(active_jobs) end end - begin + @capsule.process_tracker.register do until inline_executions.empty? begin inline_execution = inline_executions.shift - inline_result = inline_execution.perform + inline_result = inline_execution.perform(id_for_lock: @capsule.process_tracker.id_for_lock) ensure inline_execution.advisory_unlock inline_execution.run_callbacks(:perform_unlocked) @@ -145,8 +145,8 @@ def enqueue_at(active_job, timestamp) ) if will_execute_inline - begin - result = execution.perform + result = @capsule.process_tracker.register do + execution.perform(id_for_lock: @capsule.process_tracker.id_for_lock) ensure execution.advisory_unlock execution.run_callbacks(:perform_unlocked) @@ -156,7 +156,7 @@ def enqueue_at(active_job, timestamp) job_state = { queue_name: execution.queue_name } job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at - executed_locally = execute_async? && @capsule&.create_thread(job_state) + executed_locally = execute_async? && @capsule.create_thread(job_state) Notifier.notify(job_state) if !executed_locally && send_notify?(active_job) end diff --git a/lib/good_job/capsule.rb b/lib/good_job/capsule.rb index 29b72032b..0b23c2232 100644 --- a/lib/good_job/capsule.rb +++ b/lib/good_job/capsule.rb @@ -11,6 +11,8 @@ class Capsule # @return [Array, nil] cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false + attr_reader :process_tracker + # @param configuration [GoodJob::Configuration] Configuration to use for this capsule. def initialize(configuration: GoodJob.configuration) @configuration = configuration @@ -18,6 +20,10 @@ def initialize(configuration: GoodJob.configuration) @running = false @mutex = Mutex.new + # TODO: allow the shared executor to remain until the very, very end, then shutdown. And allow restart. + @shared_executor = GoodJob::SharedExecutor.new + @process_tracker = GoodJob::ProcessTracker.new + self.class.instances << self end @@ -29,15 +35,15 @@ def start(force: false) @mutex.synchronize do return unless startable?(force: force) - @shared_executor = GoodJob::SharedExecutor.new - @notifier = GoodJob::Notifier.new(enable_listening: @configuration.enable_listen_notify, executor: @shared_executor.executor) + @notifier = GoodJob::Notifier.new(enable_listening: @configuration.enable_listen_notify, capsule: self, executor: @shared_executor.executor) @poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval) - @scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: true) + @scheduler = GoodJob::Scheduler.from_configuration(@configuration, capsule: self, warm_cache_on_initialize: true) @notifier.recipients << [@scheduler, :create_thread] @poller.recipients << [@scheduler, :create_thread] @cron_manager = GoodJob::CronManager.new(@configuration.cron_entries, start_on_initialize: true, executor: @shared_executor.executor) if @configuration.enable_cron? + @process_tracker.register @startable = false @running = true end @@ -51,10 +57,14 @@ def start(force: false) # * +nil+ will trigger a shutdown but not wait for it to complete. # @return [void] def shutdown(timeout: :default) - timeout = @configuration.shutdown_timeout if timeout == :default - GoodJob._shutdown_all([@shared_executor, @notifier, @poller, @scheduler, @cron_manager].compact, timeout: timeout) - @startable = false - @running = false + @mutex.synchronize do + timeout = @configuration.shutdown_timeout if timeout == :default + GoodJob._shutdown_all([@notifier, @poller, @scheduler, @cron_manager].compact, timeout: timeout) + + process_tracker.unregister + @startable = false + @running = false + end end # Shutdown and then start the capsule again. @@ -74,7 +84,7 @@ def running? # @return [Boolean] Whether the capsule has been shutdown. def shutdown? - [@shared_executor, @notifier, @poller, @scheduler, @cron_manager].compact.all?(&:shutdown?) + [@notifier, @poller, @scheduler, @cron_manager].compact.all?(&:shutdown?) end # Creates an execution thread(s) with the given attributes. @@ -85,6 +95,12 @@ def create_thread(job_state = nil) @scheduler&.create_thread(job_state) end + # UUID for this capsule; to be used for inspection (not directly for locking jobs). + # @return [String] + def process_id + @process_tracker.process_id + end + private def startable?(force: false) diff --git a/lib/good_job/job_performer.rb b/lib/good_job/job_performer.rb index ae7135333..a1492abf0 100644 --- a/lib/good_job/job_performer.rb +++ b/lib/good_job/job_performer.rb @@ -14,8 +14,9 @@ class JobPerformer cattr_accessor :performing_active_job_ids, default: Concurrent::Set.new # @param queue_string [String] Queues to execute jobs from - def initialize(queue_string) + def initialize(queue_string, capsule: GoodJob.capsule) @queue_string = queue_string + @capsule = capsule end # A meaningful name to identify the performer in logs and for debugging. @@ -28,7 +29,7 @@ def name # @return [Object, nil] Returns job result or +nil+ if no job was found def next active_job_id = nil - job_query.perform_with_advisory_lock(parsed_queues: parsed_queues, queue_select_limit: GoodJob.configuration.queue_select_limit) do |execution| + job_query.perform_with_advisory_lock(parsed_queues: parsed_queues, queue_select_limit: GoodJob.configuration.queue_select_limit, capsule: @capsule) do |execution| active_job_id = execution.active_job_id performing_active_job_ids << active_job_id end diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index c3d6404f2..d3e2c53a4 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -2,6 +2,7 @@ require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent/atomic/atomic_boolean' +require "concurrent/scheduled_task" require "good_job/notifier/process_heartbeat" module GoodJob # :nodoc: @@ -60,8 +61,7 @@ def self.notify(message) # @param recipients [Array<#call, Array(Object, Symbol)>] # @param enable_listening [true, false] - # @param executor [Concurrent::ExecutorService] - def initialize(*recipients, enable_listening: true, executor: Concurrent.global_io_executor) + def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) @recipients = Concurrent::Array.new(recipients) @enable_listening = enable_listening @executor = executor @@ -75,6 +75,7 @@ def initialize(*recipients, enable_listening: true, executor: Concurrent.global_ @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @task = nil + @capsule = capsule start self.class.instances << self diff --git a/lib/good_job/notifier/process_heartbeat.rb b/lib/good_job/notifier/process_heartbeat.rb index 1526ac5c4..c0a1a57a3 100644 --- a/lib/good_job/notifier/process_heartbeat.rb +++ b/lib/good_job/notifier/process_heartbeat.rb @@ -16,7 +16,7 @@ module ProcessHeartbeat def register_process GoodJob::Process.with_connection(connection) do GoodJob::Process.cleanup - @process = GoodJob::Process.register + @capsule.process_tracker.register(with_advisory_lock: true) end end @@ -24,7 +24,7 @@ def refresh_process Rails.application.executor.wrap do GoodJob::Process.with_connection(connection) do GoodJob::Process.with_logger_silenced do - @process&.refresh_if_stale(cleanup: true) + @capsule.process_tracker.record&.refresh_if_stale(cleanup: true) end end end @@ -33,7 +33,7 @@ def refresh_process # Deregisters the current process. def deregister_process GoodJob::Process.with_connection(connection) do - @process&.deregister + @capsule.process_tracker.unregister(with_advisory_lock: true) end end end diff --git a/lib/good_job/process_tracker.rb b/lib/good_job/process_tracker.rb new file mode 100644 index 000000000..0ca3ba4e0 --- /dev/null +++ b/lib/good_job/process_tracker.rb @@ -0,0 +1,154 @@ +# frozen_string_literal: true + +module GoodJob # :nodoc: + class ProcessTracker + attr_reader :record, :locks, :advisory_locks + + # @!attribute [r] instances + # @!scope class + # List of all instantiated ProcessTrackers in the current process. + # @return [Array, nil] + cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false + + def initialize(executor: Concurrent.global_io_executor) + @executor = executor + @mutex = Mutex.new + @locks = 0 + @advisory_locks = 0 + @ruby_pid = ::Process.pid + @record_id = SecureRandom.uuid + @record = nil + @refresh_task = nil + + self.class.instances << self + end + + # The expected UUID of the process for use in inspection. + # Use {#id_for_lock} if using this as a lock key. + # @return [String] + def process_id + @record_id + end + + # The UUID to use for locking. May be nil if the process is not registered or is unusable/expired. + # If UUID has not yet been persisted to the database, this method will make a query to insert or update it. + # @return [String, nil] + def id_for_lock + synchronize do + next if @locks.zero? + + reset_on_fork + Rails.application.executor.wrap do + if @record + @record.refresh_if_stale + else + @record = GoodJob::Process.create_record(id: @record_id) + create_refresh_task + end + @record&.id + end + end + end + + # Registers the current process in the database. + # @param with_advisory_lock [Boolean] Whether the lock strategy should us an advisory lock; the connection must be able to support advisory locks. + # @yield [void] If a block is given, the process will be unregistered after the block completes. + # @return [void] + def register(with_advisory_lock: false) + synchronize do + if with_advisory_lock + Rails.application.executor.wrap do + if @record + @record.update(lock_type: GoodJob::Process::LOCK_TYPE_ADVISORY) if @record.advisory_lock + else + @record = GoodJob::Process.create_record(id: @record_id, with_advisory_lock: true) + create_refresh_task + end + end + @advisory_locks += 1 + end + + @locks += 1 + end + return unless block_given? + + begin + yield + ensure + unregister(with_advisory_lock: with_advisory_lock) + end + end + + # Unregisters the current process from the database. + # @param with_advisory_lock [Boolean] Whether the lock strategy should unlock an advisory lock; the connection must be able to support advisory locks. + # @return [void] + def unregister(with_advisory_lock: false) + synchronize do + Rails.application.executor.wrap do + if with_advisory_lock && @record && @advisory_locks.positive? + @record.update(lock_type: nil) if @record.advisory_unlock + @advisory_locks -= 1 + end + + if @locks == 1 && @record + @record.destroy + @record = nil + end + end + + cancel_refresh_task if @locks == 1 + @locks -= 1 unless @locks.zero? + end + end + + # @!visibility private + def task_observer(_time, _output, thread_error) + GoodJob._on_thread_error(thread_error) if thread_error && !thread_error.is_a?(Concurrent::CancelledOperationError) + end + + private + + def task_interval + GoodJob::Process::STALE_INTERVAL + jitter + end + + def jitter + GoodJob::Process::STALE_INTERVAL * 0.1 * Kernel.rand + end + + def create_refresh_task + return if @refresh_task + + @refresh_task = Concurrent::ScheduledTask.new(task_interval, executor: @executor) do + synchronize do + create_refresh_task + GoodJob::Process.with_logger_silenced do + @record&.refresh_if_stale(cleanup: true) + end + end + end + @refresh_task.add_observer(self, :task_observer) + @refresh_task.execute + end + + def cancel_refresh_task + @refresh_task = nil if @refresh_task && (@refresh_task.cancel || @refresh_task.cancelled?) + end + + def synchronize(&block) + if @mutex.owned? + yield + else + @mutex.synchronize(&block) + end + end + + def reset_on_fork + return if @ruby_pid == ::Process.pid + + @ruby_pid = ::Process.pid + @record_id = SecureRandom.uuid + @record = nil + end + end +end diff --git a/lib/good_job/scheduler.rb b/lib/good_job/scheduler.rb index 89d61faeb..38c48cdce 100644 --- a/lib/good_job/scheduler.rb +++ b/lib/good_job/scheduler.rb @@ -40,12 +40,12 @@ class Scheduler # @param configuration [GoodJob::Configuration] # @param warm_cache_on_initialize [Boolean] # @return [GoodJob::Scheduler, GoodJob::MultiScheduler] - def self.from_configuration(configuration, warm_cache_on_initialize: false) + def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_on_initialize: false) schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':') max_threads = (max_threads || configuration.max_threads).to_i - job_performer = GoodJob::JobPerformer.new(queue_string) + job_performer = GoodJob::JobPerformer.new(queue_string, capsule: capsule) GoodJob::Scheduler.new( job_performer, max_threads: max_threads, diff --git a/sorbet/rbi/todo.rbi b/sorbet/rbi/todo.rbi index 391b17d66..6df7ff604 100644 --- a/sorbet/rbi/todo.rbi +++ b/sorbet/rbi/todo.rbi @@ -17,6 +17,7 @@ module ::LATCH; end module ::MemoryProfiler; end module ::PERFORMED; end module ::POLL_COUNT; end +module ::PROCESS_IDS; end module ::RECEIVED_MESSAGE; end module ::REFRESH_IF_STALE_CALLED; end module ::RESULTS; end diff --git a/spec/app/models/good_job/process_spec.rb b/spec/app/models/good_job/process_spec.rb index b0a740c4f..03e672a52 100644 --- a/spec/app/models/good_job/process_spec.rb +++ b/spec/app/models/good_job/process_spec.rb @@ -3,29 +3,23 @@ require 'rails_helper' RSpec.describe GoodJob::Process do - describe '.current_id' do - it 'returns a uuid that does not change' do - value = described_class.current_id - expect(value).to be_present - - expect(described_class.current_id).to eq value + describe 'Scopes' do + describe '.active' do + it 'returns active processes' do + expect(described_class.active.count).to eq 0 + end end - it 'changes when the PID changes' do - allow(Process).to receive(:pid).and_return(1) - original_value = described_class.current_id - - allow(Process).to receive(:pid).and_return(2) - expect(described_class.current_id).not_to eq original_value - - # Unstub the pid or RSpec/DatabaseCleaner may fail - RSpec::Mocks.space.proxy_for(Process).reset + describe '.inactive' do + it 'returns inactive processes' do + expect(described_class.inactive.count).to eq 0 + end end end - describe '.ns_current_state' do + describe '.generate_state' do it 'contains information about the process' do - expect(described_class.ns_current_state).to include( + expect(described_class.generate_state).to include( database_connection_pool: include( size: be_an(Integer), active: be_an(Integer) @@ -34,31 +28,6 @@ end end - describe '.register' do - it 'registers the process' do - process = nil - expect do - process = described_class.register - end.to change(described_class, :count).by(1) - - process.deregister - end - - context 'when there is already an existing record' do - it 'returns the existing record' do - described_class.create!(id: described_class.current_id) - expect(described_class.register).to be_a described_class - end - end - end - - describe '#deregister' do - it 'deregisters the record' do - process = described_class.register - expect { process.deregister }.to change(described_class, :count).by(-1) - end - end - describe '#basename' do let(:process) { described_class.new state: {} } @@ -82,11 +51,11 @@ end context 'when the record has been deleted elsewhere' do - it 'returns false' do + it 'creates a new record' do process = described_class.create! state: {}, updated_at: 1.day.ago described_class.where(id: process.id).delete_all - expect(process.refresh).to be false + expect { process.refresh }.to change(described_class, :count).from(0).to(1) end end end diff --git a/spec/generators/good_job/update_generator_spec.rb b/spec/generators/good_job/update_generator_spec.rb index e5daaae7e..0bea765e7 100644 --- a/spec/generators/good_job/update_generator_spec.rb +++ b/spec/generators/good_job/update_generator_spec.rb @@ -27,7 +27,7 @@ # Check that `GoodJob.pending_migrations?` is updated expect(GoodJob.migrated?).to be true quiet do - run_in_example_app 'rails db:rollback' + run_in_example_app 'rails db:rollback STEP=2' expect(GoodJob.migrated?).to be false run_in_example_app 'rails db:migrate' end diff --git a/spec/integration/adapter_spec.rb b/spec/integration/adapter_spec.rb index cc1a6e635..2e8fc073f 100644 --- a/spec/integration/adapter_spec.rb +++ b/spec/integration/adapter_spec.rb @@ -108,14 +108,11 @@ def perform(*_args, **_kwargs) let(:adapter) { GoodJob::Adapter.new(execution_mode: :async_all, _capsule: capsule) } it 'executes the job', skip_if_java: true do - elephant_adapter = GoodJob::Adapter.new execution_mode: :async_all elephant_ajob = TestJob.set(queue: 'elephants').perform_later sleep_until { RUN_JOBS.include? elephant_ajob.provider_job_id } expect(RUN_JOBS).to include(elephant_ajob.provider_job_id) - - elephant_adapter.shutdown end end diff --git a/spec/integration/process_locks_spec.rb b/spec/integration/process_locks_spec.rb new file mode 100644 index 000000000..5e9e4ddcf --- /dev/null +++ b/spec/integration/process_locks_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe 'Process Locks' do + let(:capsule) { GoodJob::Capsule.new } + let(:inline_adapter) { GoodJob::Adapter.new(execution_mode: :inline, _capsule: capsule) } + let(:async_adapter) { GoodJob::Adapter.new(execution_mode: :async, _capsule: capsule) } + + before do + capsule.start + + stub_const("PROCESS_IDS", Concurrent::Array.new) + stub_const "TestJob", (Class.new(ActiveJob::Base) do + def perform + PROCESS_IDS << GoodJob::Job.where(id: provider_job_id).pick(:locked_by_id) + end + end) + end + + after do + capsule.shutdown + end + + it 'stores process_id in inline job' do + TestJob.queue_adapter = inline_adapter + TestJob.perform_later + + wait_until { expect(GoodJob::Job.last.finished_at).to be_present } + expect(PROCESS_IDS.size).to eq 1 + expect(PROCESS_IDS.first).to be_a_uuid + expect(PROCESS_IDS.first).to eq capsule.process_id + expect(GoodJob::Job.last.locked_by_id).to be_nil + end + + it 'stores process_id in async job' do + TestJob.queue_adapter = async_adapter + TestJob.perform_later + + wait_until { expect(GoodJob::Job.pick(:finished_at)).to be_present } + expect(PROCESS_IDS.size).to eq 1 + expect(PROCESS_IDS.first).to be_a_uuid + expect(PROCESS_IDS.first).to eq capsule.process_id + expect(GoodJob::Job.last.locked_by_id).to be_nil + end +end diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 880c3c3ee..7c6fb5268 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -162,25 +162,11 @@ wait_until { expect(GoodJob::Process.count).to eq 1 } process = GoodJob::Process.first - expect(process.id).to eq GoodJob::Process.current_id + expect(process.id).to eq GoodJob.capsule.process_tracker.id_for_lock expect(process).to be_advisory_locked notifier.shutdown expect { process.reload }.to raise_error ActiveRecord::RecordNotFound end - - context 'when, for some reason, the process already exists' do - it 'does not create a new process' do - process = GoodJob::Process.register - notifier = described_class.new(enable_listening: true) - - wait_until { expect(notifier).to be_listening } - expect(GoodJob::Process.count).to eq 1 - - notifier.shutdown - expect(process.reload).to eq process - process.advisory_unlock - end - end end end diff --git a/spec/lib/good_job/process_tracker_spec.rb b/spec/lib/good_job/process_tracker_spec.rb new file mode 100644 index 000000000..3a638e53b --- /dev/null +++ b/spec/lib/good_job/process_tracker_spec.rb @@ -0,0 +1,147 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe GoodJob::ProcessTracker do + let(:tracker) { described_class.new } + + describe '#register' do + context 'when used with an advisory lock' do + it 'creates a Process and sets the lock_type' do + expect do + tracker.register(with_advisory_lock: true) + end.to change(GoodJob::Process, :count).by(1) + + process = GoodJob::Process.last + expect(process.lock_type).to eq('advisory') + + tracker.unregister(with_advisory_lock: true) + + expect(GoodJob::Process.count).to eq 0 + end + + it 'takes an advisory lock even when process already exists' do + tracker.register do + expect(GoodJob::Process.count).to eq 0 + + tracker.id_for_lock + + process = GoodJob::Process.last + expect(process).to be_present + expect(process).not_to be_advisory_locked + expect(process.lock_type).to eq(nil) + + tracker.register(with_advisory_lock: true) do + process.reload + expect(process).to be_advisory_locked + expect(process.lock_type).to eq('advisory') + end + + process.reload + expect(process).not_to be_advisory_locked + expect(process.lock_type).to eq(nil) + end + + expect(GoodJob::Process.count).to eq 0 + end + + it 'increments the number of locks when not used with a block' do + expect do + tracker.register(with_advisory_lock: true) + end.to change(tracker, :locks).by(1) + tracker.unregister(with_advisory_lock: true) + end + end + + context 'when NOT used with an advisory lock' do + it 'does not create a Process' do + expect do + tracker.register + end.not_to change(GoodJob::Process, :count) + + tracker.unregister + end + + it 'increments the number of locks when not used with a block' do + expect do + tracker.register + end.to change(tracker, :locks).by(1) + tracker.unregister + end + end + + it 'creates a ScheduledTask that refreshes the process in the background' do + stub_const("GoodJob::Process::STALE_INTERVAL", 0.1.seconds) + + tracker.register do + tracker.id_for_lock + expect do + sleep 0.2 + end.to change { GoodJob::Process.first.updated_at } + end + end + + it 'resets the number of locks when used with a block' do + called_block = nil + tracker.register do + called_block = true + expect(tracker.locks).to eq 1 + end + + expect(called_block).to be true + expect(tracker.locks).to eq 0 + end + + it 'removes the process when locks are zero' do + inner_block_called = nil + tracker.register do + lock_id = tracker.id_for_lock + expect(lock_id).to be_present + + expect(GoodJob::Process.count).to eq 1 + tracker.register do + inner_block_called = true + expect(tracker.id_for_lock).to eq(lock_id) + expect(GoodJob::Process.count).to eq 1 + end + expect(GoodJob::Process.count).to eq 1 + expect(tracker.id_for_lock).to eq(lock_id) + end + expect(GoodJob::Process.count).to eq 0 + expect(tracker.id_for_lock).to be_nil + + expect(inner_block_called).to be true + end + end + + describe '#process_id' do + it 'is a UUID' do + expect(tracker.process_id).to be_a_uuid + end + end + + describe '.id_for_lock' do + it 'is available if the process has been registered' do + expect(GoodJob::Process.count).to eq 0 + expect(tracker.id_for_lock).to be_nil + + tracker.register do + expect(tracker.id_for_lock).to be_present + end + + expect(GoodJob::Process.count).to eq 0 + expect(tracker.id_for_lock).to be_nil + end + + it 'changes when the PID changes', skip: Gem::Version.new(Rails.version) < Gem::Version.new('6.1.a') && "Fork tracker is broken in tests for Rails v6.0" do + allow(Process).to receive(:pid).and_return(1) + tracker.register do + original_value = tracker.id_for_lock + allow(Process).to receive(:pid).and_return(2) + expect(tracker.id_for_lock).not_to eq original_value + end + # Unstub the pid or RSpec/DatabaseCleaner may fail + RSpec::Mocks.space.proxy_for(Process).reset + end + end +end diff --git a/spec/lib/good_job_spec.rb b/spec/lib/good_job_spec.rb index 2f1ab8efe..8d39c8b9e 100644 --- a/spec/lib/good_job_spec.rb +++ b/spec/lib/good_job_spec.rb @@ -32,9 +32,12 @@ expect { described_class.restart }.not_to change(described_class, :shutdown?).from(true) end - it 'restarts down all capsule instances' do - GoodJob::Capsule.new(configuration: configuration) - expect { described_class.restart }.to change(described_class, :shutdown?).from(true).to(false) + it 'restarts all capsule instances' do + capsule = GoodJob::Capsule.new(configuration: configuration) + expect { described_class.restart }.to change(capsule, :shutdown?).from(true).to(false) + capsule.shutdown + + described_class.shutdown end end diff --git a/spec/support/postgres_notices.rb b/spec/support/postgres_notices.rb index 1af9e9a6e..02b3742d4 100644 --- a/spec/support/postgres_notices.rb +++ b/spec/support/postgres_notices.rb @@ -11,6 +11,7 @@ next unless raw_connection.respond_to? :set_notice_receiver raw_connection.set_notice_receiver do |result| + Rails.logger.warn(result.error_message.strip) POSTGRES_NOTICES << result.error_message end } diff --git a/spec/support/reset_good_job.rb b/spec/support/reset_good_job.rb index 8dbc5529c..f479217db 100644 --- a/spec/support/reset_good_job.rb +++ b/spec/support/reset_good_job.rb @@ -34,6 +34,13 @@ ) GoodJob._shutdown_all(executables, timeout: -1) + GoodJob::ProcessTracker.instances.each do |process_tracker| + expect(process_tracker.locks).to eq 0 + expect(process_tracker.advisory_locks).to eq 0 + expect(process_tracker.record).to be_nil + end + GoodJob::ProcessTracker.instances.clear + expect(THREAD_ERRORS).to be_empty expect(GoodJob::Notifier.instances).to all be_shutdown diff --git a/spec/support/uuid.rb b/spec/support/uuid.rb new file mode 100644 index 000000000..71e951f05 --- /dev/null +++ b/spec/support/uuid.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +RSpec::Matchers.define :be_a_uuid do + match do |actual| + regexp = /\A\h{8}-\h{4}-(\h{4})-\h{4}-\h{12}\z/ + actual.is_a?(String) && actual.match?(regexp) + end + + description { "a UUID" } + failure_message { "expected #{description}" } + failure_message_when_negated { "did not expect #{description}" } +end + +RSpec::Matchers.alias_matcher :a_uuid, :be_a_uuid diff --git a/spec/test_app/db/migrate/20230704201336_create_good_job_process_lock_ids.rb b/spec/test_app/db/migrate/20230704201336_create_good_job_process_lock_ids.rb new file mode 100644 index 000000000..c51ed4868 --- /dev/null +++ b/spec/test_app/db/migrate/20230704201336_create_good_job_process_lock_ids.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIds < ActiveRecord::Migration[7.0] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :locked_by_id) + end + end + + add_column :good_jobs, :locked_by_id, :uuid + add_column :good_jobs, :locked_at, :datetime + add_column :good_job_executions, :process_id, :uuid + add_column :good_job_processes, :lock_type, :integer, limit: 2 + end +end diff --git a/spec/test_app/db/migrate/20230704201337_create_good_job_process_lock_indexes.rb b/spec/test_app/db/migrate/20230704201337_create_good_job_process_lock_indexes.rb new file mode 100644 index 000000000..5e157cfb3 --- /dev/null +++ b/spec/test_app/db/migrate/20230704201337_create_good_job_process_lock_indexes.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration[7.0] + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + add_index :good_jobs, [:priority, :scheduled_at], + order: { priority: "ASC NULLS LAST", scheduled_at: :asc }, + where: "finished_at IS NULL AND locked_by_id IS NULL", + name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + add_index :good_jobs, :locked_by_id, + where: "locked_by_id IS NOT NULL", + name: :index_good_jobs_on_locked_by_id, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + add_index :good_job_executions, [:process_id, :created_at], + name: :index_good_job_executions_on_process_id_and_created_at, + algorithm: :concurrently + end + + end + + dir.down do + remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + end + end + end +end diff --git a/spec/test_app/db/schema.rb b/spec/test_app/db/schema.rb index 0bd2b6a06..484439071 100644 --- a/spec/test_app/db/schema.rb +++ b/spec/test_app/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2023_07_03_035750) do +ActiveRecord::Schema.define(version: 2023_07_04_201337) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -41,13 +41,16 @@ t.datetime "finished_at" t.text "error" t.integer "error_event", limit: 2 + t.uuid "process_id" t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at" + t.index ["process_id", "created_at"], name: "index_good_job_executions_on_process_id_and_created_at" end create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.datetime "created_at", null: false t.datetime "updated_at", null: false t.jsonb "state" + t.integer "lock_type", limit: 2 end create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| @@ -79,6 +82,8 @@ t.integer "executions_count" t.text "job_class" t.integer "error_event", limit: 2 + t.uuid "locked_by_id" + t.datetime "locked_at" t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["active_job_id"], name: "index_good_jobs_on_active_job_id" t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)" @@ -87,7 +92,9 @@ t.index ["cron_key", "created_at"], name: "index_good_jobs_on_cron_key_and_created_at" t.index ["cron_key", "cron_at"], name: "index_good_jobs_on_cron_key_and_cron_at", unique: true t.index ["finished_at"], name: "index_good_jobs_jobs_on_finished_at", where: "((retried_good_job_id IS NULL) AND (finished_at IS NOT NULL))" + t.index ["locked_by_id"], name: "index_good_jobs_on_locked_by_id", where: "(locked_by_id IS NOT NULL)" t.index ["priority", "created_at"], name: "index_good_jobs_jobs_on_priority_created_at_when_unfinished", order: { priority: "DESC NULLS LAST" }, where: "(finished_at IS NULL)" + t.index ["priority", "scheduled_at"], name: "index_good_jobs_on_priority_scheduled_at_unfinished_unlocked", where: "((finished_at IS NULL) AND (locked_by_id IS NULL))" t.index ["queue_name", "scheduled_at"], name: "index_good_jobs_on_queue_name_and_scheduled_at", where: "(finished_at IS NULL)" t.index ["scheduled_at"], name: "index_good_jobs_on_scheduled_at", where: "(finished_at IS NULL)" end