diff --git a/app/models/good_job/base_execution.rb b/app/models/good_job/base_execution.rb index f88a2a4a0..bc7ca5342 100644 --- a/app/models/good_job/base_execution.rb +++ b/app/models/good_job/base_execution.rb @@ -84,6 +84,13 @@ def candidate_lookup_index_migrated? migration_pending_warning! false end + + def process_lock_migrated? + return true if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + + migration_pending_warning! + false + end end # The ActiveJob job class, as a string diff --git a/app/models/good_job/execution.rb b/app/models/good_job/execution.rb index 1d7c760af..8621c1695 100644 --- a/app/models/good_job/execution.rb +++ b/app/models/good_job/execution.rb @@ -262,7 +262,7 @@ def self.enqueue_args(active_job, overrides = {}) # return value for the job's +#perform+ method, and the exception the job # raised, if any (if the job raised, then the second array entry will be # +nil+). If there were no jobs to execute, returns +nil+. - def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) + def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil) execution = nil result = nil @@ -270,7 +270,7 @@ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil) execution = executions.first if execution&.executable? yield(execution) if block_given? - result = execution.perform + result = execution.perform(lock_id: lock_id) else execution = nil yield(nil) if block_given? @@ -367,7 +367,7 @@ def self.format_error(error) # An array of the return value of the job's +#perform+ method and the # exception raised by the job, if any. If the job completed successfully, # the second array entry (the exception) will be +nil+ and vice versa. - def perform + def perform(lock_id:) run_callbacks(:perform) do raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at @@ -397,17 +397,37 @@ def perform if discrete? transaction do - discrete_execution = discrete_executions.create!( + discrete_execution_attrs = { job_class: job_class, queue_name: queue_name, serialized_params: serialized_params, scheduled_at: (scheduled_at || created_at), - created_at: job_performed_at - ) - update!(performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1)) + created_at: job_performed_at, + } + discrete_execution_attrs[:process_id] = lock_id if GoodJob::DiscreteExecution.columns_hash.key?("process_id") + + execution_attrs = { + performed_at: job_performed_at, + executions_count: ((executions_count || 0) + 1), + } + if GoodJob::Execution.columns_hash.key?("locked_by_id") + execution_attrs[:locked_by_id] = lock_id + execution_attrs[:locked_at] = Time.current + end + + discrete_execution = discrete_executions.create!(discrete_execution_attrs) + update!(execution_attrs) end else - update!(performed_at: job_performed_at) + execution_attrs = { + performed_at: job_performed_at, + } + if GoodJob::Execution.columns_hash.key?("locked_by_id") + execution_attrs[:locked_by_id] = lock_id + execution_attrs[:locked_at] = Time.current + end + + update!(execution_attrs) end ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload| @@ -450,7 +470,11 @@ def perform end end - job_attributes = {} + job_attributes = if self.class.columns_hash.key?("locked_by_id") + { locked_by_id: nil, locked_at: nil } + else + {} + end job_error = result.handled_error || result.unhandled_error if job_error diff --git a/app/models/good_job/job.rb b/app/models/good_job/job.rb index b61c20775..0d977f27d 100644 --- a/app/models/good_job/job.rb +++ b/app/models/good_job/job.rb @@ -29,6 +29,7 @@ def table_name=(_value) self.implicit_order_column = 'created_at' belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true + belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent diff --git a/app/models/good_job/process.rb b/app/models/good_job/process.rb index a6fd76ec8..790f007f4 100644 --- a/app/models/good_job/process.rb +++ b/app/models/good_job/process.rb @@ -3,7 +3,7 @@ require 'socket' module GoodJob # :nodoc: - # ActiveRecord model that represents an GoodJob process (either async or CLI). + # Active Record model that represents a GoodJob capsule/process (either async or CLI). class Process < BaseRecord include AdvisoryLockable include OverridableConnection @@ -15,50 +15,74 @@ class Process < BaseRecord self.table_name = 'good_job_processes' self.implicit_order_column = 'created_at' + LOCK_TYPES = [ + LOCK_TYPE_ADVISORY = 'advisory', + ].freeze - cattr_reader :mutex, default: Mutex.new - cattr_accessor :_current_id, default: nil - cattr_accessor :_pid, default: nil + LOCK_TYPE_ENUMS = { + LOCK_TYPE_ADVISORY => 1, + }.freeze + + self.table_name = 'good_job_processes' + + has_many :locked_jobs, class_name: "GoodJob::Job", foreign_key: :locked_by_id, inverse_of: :locked_by_process, dependent: nil + after_destroy { locked_jobs.update_all(locked_by_id: nil) if GoodJob::Job.columns_hash.key?("locked_by_id") } # rubocop:disable Rails/SkipsModelValidations # Processes that are active and locked. # @!method active # @!scope class # @return [ActiveRecord::Relation] - scope :active, -> { advisory_locked } + scope :active, (lambda do + if lock_type_migrated? + query = joins_advisory_locks + query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked + .or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago))) + else + advisory_locked + end + end) # Processes that are inactive and unlocked (e.g. SIGKILLed) # @!method active # @!scope class # @return [ActiveRecord::Relation] - scope :inactive, -> { advisory_unlocked } - - # UUID that is unique to the current process and changes when forked. - # @return [String] - def self.current_id - mutex.synchronize { ns_current_id } - end + scope :inactive, (lambda do + if lock_type_migrated? + query = joins_advisory_locks + query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_unlocked + .or(query.where(lock_type: nil).where(arel_table[:updated_at].lt(EXPIRED_INTERVAL.ago))) + else + advisory_unlocked + end + end) - def self.ns_current_id - if _current_id.nil? || _pid != ::Process.pid - self._current_id = SecureRandom.uuid - self._pid = ::Process.pid + # Deletes all inactive process records. + def self.cleanup + inactive.find_each do |process| + GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) if GoodJob::Job.columns_hash.key?("locked_by_id") # rubocop:disable Rails/SkipsModelValidations + process.delete end - _current_id end - # Hash representing metadata about the current process. - # @return [Hash] - def self.current_state - mutex.synchronize { ns_current_state } + # @return [Boolean] + def self.lock_type_migrated? + columns_hash["lock_type"].present? end - def self.ns_current_state - total_succeeded_executions_count = GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count, 0) } - total_errored_executions_count = GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count, 0) } - total_empty_executions_count = GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:empty_executions_count, 0) } + def self.create_record(id:, with_advisory_lock: false) + attributes = { + id: id, + state: process_state, + } + if with_advisory_lock + attributes[:create_with_advisory_lock] = true + attributes[:lock_type] = LOCK_TYPE_ADVISORY if lock_type_migrated? + end + create!(attributes) + end + def self.process_state { - id: ns_current_id, hostname: Socket.gethostname, pid: ::Process.pid, proctitle: $PROGRAM_NAME, @@ -66,10 +90,8 @@ def self.ns_current_state retry_on_unhandled_error: GoodJob.retry_on_unhandled_error, schedulers: GoodJob::Scheduler.instances.map(&:stats), cron_enabled: GoodJob.configuration.enable_cron?, - total_succeeded_executions_count: total_succeeded_executions_count, - total_errored_executions_count: total_errored_executions_count, - total_executions_count: total_succeeded_executions_count + total_errored_executions_count, - total_empty_executions_count: total_empty_executions_count, + total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) }, + total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) }, database_connection_pool: { size: connection_pool.size, active: connection_pool.connections.count(&:in_use?), @@ -77,45 +99,36 @@ def self.ns_current_state } end - # Deletes all inactive process records. - def self.cleanup - inactive.delete_all - end - - # Registers the current process in the database - # @return [GoodJob::Process] - def self.register - mutex.synchronize do - process_state = ns_current_state - create(id: process_state[:id], state: process_state, create_with_advisory_lock: true) - rescue ActiveRecord::RecordNotUnique - find(ns_current_state[:id]) - end - end - def refresh - mutex.synchronize do - reload - update(state: self.class.ns_current_state, updated_at: Time.current) - rescue ActiveRecord::RecordNotFound - false - end + self.state = self.class.process_state + reload.update(state: state, updated_at: Time.current) + rescue ActiveRecord::RecordNotFound + @new_record = true + self.created_at = self.updated_at = nil + state_will_change! + save end - # Unregisters the instance. - def deregister - return unless owns_advisory_lock? + def refresh_if_stale(cleanup: false) + return unless stale? - mutex.synchronize do - destroy! - advisory_unlock - end + result = refresh + self.class.cleanup if cleanup + result end def state super || {} end + def stale? + updated_at < STALE_INTERVAL.ago + end + + def expired? + updated_at < EXPIRED_INTERVAL.ago + end + def basename File.basename(state.fetch("proctitle", "")) end @@ -124,20 +137,20 @@ def schedulers state.fetch("schedulers", []) end - def refresh_if_stale(cleanup: false) - return unless stale? + def lock_type + return unless self.class.columns_hash['lock_type'] - result = refresh - self.class.cleanup if cleanup - result + enum = super + LOCK_TYPE_ENUMS.key(enum) if enum end - def stale? - updated_at < STALE_INTERVAL.ago - end + def lock_type=(value) + return unless self.class.columns_hash['lock_type'] - def expired? - updated_at < EXPIRED_INTERVAL.ago + enum = LOCK_TYPE_ENUMS[value] + raise(ArgumentError, "Invalid error_event: #{value}") if value && !enum + + super(enum) end end end diff --git a/demo/db/migrate/20240518175057_create_good_job_process_lock_ids.rb b/demo/db/migrate/20240518175057_create_good_job_process_lock_ids.rb new file mode 100644 index 000000000..da43683d4 --- /dev/null +++ b/demo/db/migrate/20240518175057_create_good_job_process_lock_ids.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIds < ActiveRecord::Migration[7.1] + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :locked_by_id) + end + end + + add_column :good_jobs, :locked_by_id, :uuid + add_column :good_jobs, :locked_at, :datetime + add_column :good_job_executions, :process_id, :uuid + add_column :good_job_processes, :lock_type, :integer, limit: 2 + end +end diff --git a/demo/db/migrate/20240518175058_create_good_job_process_lock_indexes.rb b/demo/db/migrate/20240518175058_create_good_job_process_lock_indexes.rb new file mode 100644 index 000000000..fb98f05a4 --- /dev/null +++ b/demo/db/migrate/20240518175058_create_good_job_process_lock_indexes.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration[7.1] + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + add_index :good_jobs, [:priority, :scheduled_at], + order: { priority: "ASC NULLS LAST", scheduled_at: :asc }, + where: "finished_at IS NULL AND locked_by_id IS NULL", + name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + add_index :good_jobs, :locked_by_id, + where: "locked_by_id IS NOT NULL", + name: :index_good_jobs_on_locked_by_id, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + add_index :good_job_executions, [:process_id, :created_at], + name: :index_good_job_executions_on_process_id_and_created_at, + algorithm: :concurrently + end + end + + dir.down do + remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + end + end + end +end diff --git a/demo/db/schema.rb b/demo/db/schema.rb index 387a6e030..b85b6c951 100644 --- a/demo/db/schema.rb +++ b/demo/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema.define(version: 2024_04_17_093204) do +ActiveRecord::Schema.define(version: 2024_05_18_175058) do # These are extensions that must be enabled in order to support this database enable_extension "pgcrypto" enable_extension "plpgsql" @@ -25,9 +25,9 @@ t.text "on_discard" t.text "callback_queue_name" t.integer "callback_priority" - t.datetime "enqueued_at", precision: nil - t.datetime "discarded_at", precision: nil - t.datetime "finished_at", precision: nil + t.datetime "enqueued_at" + t.datetime "discarded_at" + t.datetime "finished_at" end create_table "good_job_executions", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| @@ -42,13 +42,16 @@ t.text "error" t.integer "error_event", limit: 2 t.text "error_backtrace", array: true + t.uuid "process_id" t.index ["active_job_id", "created_at"], name: "index_good_job_executions_on_active_job_id_and_created_at" + t.index ["process_id", "created_at"], name: "index_good_job_executions_on_process_id_and_created_at" end create_table "good_job_processes", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| t.datetime "created_at", null: false t.datetime "updated_at", null: false t.jsonb "state" + t.integer "lock_type", limit: 2 end create_table "good_job_settings", id: :uuid, default: -> { "gen_random_uuid()" }, force: :cascade do |t| @@ -81,6 +84,8 @@ t.text "job_class" t.integer "error_event", limit: 2 t.text "labels", array: true + t.uuid "locked_by_id" + t.datetime "locked_at" t.index ["active_job_id", "created_at"], name: "index_good_jobs_on_active_job_id_and_created_at" t.index ["batch_callback_id"], name: "index_good_jobs_on_batch_callback_id", where: "(batch_callback_id IS NOT NULL)" t.index ["batch_id"], name: "index_good_jobs_on_batch_id", where: "(batch_id IS NOT NULL)" @@ -89,8 +94,10 @@ t.index ["cron_key", "cron_at"], name: "index_good_jobs_on_cron_key_and_cron_at_cond", unique: true, where: "(cron_key IS NOT NULL)" t.index ["finished_at"], name: "index_good_jobs_jobs_on_finished_at", where: "((retried_good_job_id IS NULL) AND (finished_at IS NOT NULL))" t.index ["labels"], name: "index_good_jobs_on_labels", where: "(labels IS NOT NULL)", using: :gin + t.index ["locked_by_id"], name: "index_good_jobs_on_locked_by_id", where: "(locked_by_id IS NOT NULL)" t.index ["priority", "created_at"], name: "index_good_job_jobs_for_candidate_lookup", where: "(finished_at IS NULL)" t.index ["priority", "created_at"], name: "index_good_jobs_jobs_on_priority_created_at_when_unfinished", order: { priority: "DESC NULLS LAST" }, where: "(finished_at IS NULL)" + t.index ["priority", "scheduled_at"], name: "index_good_jobs_on_priority_scheduled_at_unfinished_unlocked", where: "((finished_at IS NULL) AND (locked_by_id IS NULL))" t.index ["queue_name", "scheduled_at"], name: "index_good_jobs_on_queue_name_and_scheduled_at", where: "(finished_at IS NULL)" t.index ["scheduled_at"], name: "index_good_jobs_on_scheduled_at", where: "(finished_at IS NULL)" end diff --git a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb index dce96234f..b7e78bf0c 100644 --- a/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb +++ b/lib/generators/good_job/templates/install/migrations/create_good_jobs.rb.erb @@ -30,6 +30,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.text :job_class t.integer :error_event, limit: 2 t.text :labels, array: true + t.uuid :locked_by_id + t.datetime :locked_at end create_table :good_job_batches, id: :uuid do |t| @@ -58,11 +60,13 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> t.text :error t.integer :error_event, limit: 2 t.text :error_backtrace, array: true + t.uuid :process_id end create_table :good_job_processes, id: :uuid do |t| t.timestamps t.jsonb :state + t.integer :lock_type, limit: 2 end create_table :good_job_settings, id: :uuid do |t| @@ -88,5 +92,10 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %> add_index :good_jobs, :labels, using: :gin, where: "(labels IS NOT NULL)", name: :index_good_jobs_on_labels add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at + add_index :good_jobs, [:priority, :scheduled_at], order: { priority: "ASC NULLS LAST", scheduled_at: :asc }, + where: "finished_at IS NULL AND locked_by_id IS NULL", name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked + add_index :good_jobs, :locked_by_id, + where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id" + add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at end end diff --git a/lib/generators/good_job/templates/update/migrations/13_create_good_job_process_lock_ids.rb.erb b/lib/generators/good_job/templates/update/migrations/13_create_good_job_process_lock_ids.rb.erb new file mode 100644 index 000000000..e1e471ea7 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/13_create_good_job_process_lock_ids.rb.erb @@ -0,0 +1,17 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIds < ActiveRecord::Migration<%= migration_version %> + def change + reversible do |dir| + dir.up do + # Ensure this incremental update migration is idempotent + # with monolithic install migration. + return if connection.column_exists?(:good_jobs, :locked_by_id) + end + end + + add_column :good_jobs, :locked_by_id, :uuid + add_column :good_jobs, :locked_at, :datetime + add_column :good_job_executions, :process_id, :uuid + add_column :good_job_processes, :lock_type, :integer, limit: 2 + end +end diff --git a/lib/generators/good_job/templates/update/migrations/14_create_good_job_process_lock_indexes.rb.erb b/lib/generators/good_job/templates/update/migrations/14_create_good_job_process_lock_indexes.rb.erb new file mode 100644 index 000000000..a6da2fab8 --- /dev/null +++ b/lib/generators/good_job/templates/update/migrations/14_create_good_job_process_lock_indexes.rb.erb @@ -0,0 +1,37 @@ +# frozen_string_literal: true +class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration<%= migration_version %> + disable_ddl_transaction! + + def change + reversible do |dir| + dir.up do + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + add_index :good_jobs, [:priority, :scheduled_at], + order: { priority: "ASC NULLS LAST", scheduled_at: :asc }, + where: "finished_at IS NULL AND locked_by_id IS NULL", + name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + add_index :good_jobs, :locked_by_id, + where: "locked_by_id IS NOT NULL", + name: :index_good_jobs_on_locked_by_id, + algorithm: :concurrently + end + + unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + add_index :good_job_executions, [:process_id, :created_at], + name: :index_good_job_executions_on_process_id_and_created_at, + algorithm: :concurrently + end + end + + dir.down do + remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) + remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id) + remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at) + end + end + end +end diff --git a/lib/good_job.rb b/lib/good_job.rb index 75fcd7ede..5d33a9de0 100644 --- a/lib/good_job.rb +++ b/lib/good_job.rb @@ -19,6 +19,7 @@ require_relative "good_job/bulk" require_relative "good_job/callable" require_relative "good_job/capsule" +require_relative "good_job/capsule_tracker" require_relative "good_job/cleanup_tracker" require_relative "good_job/cli" require_relative "good_job/configuration" @@ -169,11 +170,12 @@ def self.restart(timeout: -1) end # Sends +#shutdown+ or +#restart+ to executable objects ({GoodJob::Notifier}, {GoodJob::Poller}, {GoodJob::Scheduler}, {GoodJob::MultiScheduler}, {GoodJob::CronManager}) - # @param executables [Array] Objects to shut down. + # @param executables [Array] Objects to shut down. # @param method_name [:symbol] Method to call, e.g. +:shutdown+ or +:restart+. # @param timeout [nil,Numeric] + # @param after [Array] Objects to shut down after initial executables shut down. # @return [void] - def self._shutdown_all(executables, method_name = :shutdown, timeout: -1) + def self._shutdown_all(executables, method_name = :shutdown, timeout: -1, after: []) if timeout.is_a?(Numeric) && timeout.positive? executables.each { |executable| executable.send(method_name, timeout: nil) } @@ -182,6 +184,13 @@ def self._shutdown_all(executables, method_name = :shutdown, timeout: -1) else executables.each { |executable| executable.send(method_name, timeout: timeout) } end + return unless after.any? && !timeout.nil? + + if stop_at + after.each { |executable| executable.shutdown(timeout: [stop_at - Time.current, 0].max) } + else + after.each { |executable| executable.shutdown(timeout: timeout) } + end end # Destroys preserved job and batch records. @@ -277,8 +286,8 @@ def self.deprecator # @return [Boolean] def self.migrated? # Always update with the most recent migration check - GoodJob::DiscreteExecution.reset_column_information - GoodJob::DiscreteExecution.backtrace_migrated? + GoodJob::Execution.reset_column_information + GoodJob::Execution.process_lock_migrated? end ActiveSupport.run_load_hooks(:good_job, self) diff --git a/lib/good_job/adapter.rb b/lib/good_job/adapter.rb index 9029ee299..c7a6bda7b 100644 --- a/lib/good_job/adapter.rb +++ b/lib/good_job/adapter.rb @@ -97,16 +97,17 @@ def enqueue_all(active_jobs) end end + @capsule.tracker.register begin until inline_executions.empty? begin inline_execution = inline_executions.shift - inline_result = inline_execution.perform + inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock) retried_execution = inline_result.retried while retried_execution && retried_execution.scheduled_at <= Time.current inline_execution = retried_execution - inline_result = inline_execution.perform + inline_result = inline_execution.perform(lock_id: @capsule.tracker.id_for_lock) retried_execution = inline_result.retried end ensure @@ -116,6 +117,7 @@ def enqueue_all(active_jobs) raise inline_result.unhandled_error if inline_result.unhandled_error end ensure + @capsule.tracker.unregister inline_executions.each(&:advisory_unlock) end @@ -168,12 +170,12 @@ def enqueue_at(active_job, timestamp) create_with_advisory_lock: true ) begin - result = execution.perform + result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) } retried_execution = result.retried while retried_execution && (retried_execution.scheduled_at.nil? || retried_execution.scheduled_at <= Time.current) execution = retried_execution - result = execution.perform + result = @capsule.tracker.register { execution.perform(lock_id: @capsule.tracker.id_for_lock) } retried_execution = result.retried end diff --git a/lib/good_job/capsule.rb b/lib/good_job/capsule.rb index 8c479e90a..afa82a2bb 100644 --- a/lib/good_job/capsule.rb +++ b/lib/good_job/capsule.rb @@ -11,6 +11,10 @@ class Capsule # @return [Array, nil] cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false + delegate :register, :renew, :unregister, :id_for_lock, to: :@tracker, prefix: :_tracker + + attr_reader :tracker + # @param configuration [GoodJob::Configuration] Configuration to use for this capsule. def initialize(configuration: nil) @configuration = configuration @@ -18,6 +22,9 @@ def initialize(configuration: nil) @started_at = nil @mutex = Mutex.new + @shared_executor = GoodJob::SharedExecutor.new + @tracker = GoodJob::CapsuleTracker.new(executor: @shared_executor) + self.class.instances << self end @@ -29,15 +36,13 @@ def start(force: false) @mutex.synchronize do return unless startable?(force: force) - @shared_executor = GoodJob::SharedExecutor.new - @notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, executor: @shared_executor.executor) + @notifier = GoodJob::Notifier.new(enable_listening: configuration.enable_listen_notify, capsule: self, executor: @shared_executor) @poller = GoodJob::Poller.new(poll_interval: configuration.poll_interval) - @multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, warm_cache_on_initialize: true) + @multi_scheduler = GoodJob::MultiScheduler.from_configuration(configuration, capsule: self, warm_cache_on_initialize: true) @notifier.recipients.push([@multi_scheduler, :create_thread]) @poller.recipients.push(-> { @multi_scheduler.create_thread({ fanout: true }) }) - @cron_manager = GoodJob::CronManager.new(configuration.cron_entries, start_on_initialize: true, executor: @shared_executor.executor) if configuration.enable_cron? - + @cron_manager = GoodJob::CronManager.new(configuration.cron_entries, start_on_initialize: true, executor: @shared_executor) if configuration.enable_cron? @startable = false @started_at = Time.current end @@ -52,7 +57,7 @@ def start(force: false) # @return [void] def shutdown(timeout: NONE) timeout = configuration.shutdown_timeout if timeout == NONE - GoodJob._shutdown_all([@shared_executor, @notifier, @poller, @multi_scheduler, @cron_manager].compact, timeout: timeout) + GoodJob._shutdown_all([@notifier, @poller, @multi_scheduler, @cron_manager].compact, after: [@shared_executor], timeout: timeout) @startable = false @started_at = nil end @@ -74,7 +79,7 @@ def running? # @return [Boolean] Whether the capsule has been shutdown. def shutdown? - [@shared_executor, @notifier, @poller, @multi_scheduler, @cron_manager].compact.all?(&:shutdown?) + [@notifier, @poller, @multi_scheduler, @cron_manager].compact.all?(&:shutdown?) end # @param duration [nil, Numeric] Length of idleness to check for (in seconds). @@ -99,6 +104,12 @@ def create_thread(job_state = nil) @multi_scheduler&.create_thread(job_state) end + # UUID for this capsule; to be used for inspection (not directly for locking jobs). + # @return [String] + def process_id + @tracker.process_id + end + private def configuration diff --git a/lib/good_job/capsule_tracker.rb b/lib/good_job/capsule_tracker.rb new file mode 100644 index 000000000..e6a5037cf --- /dev/null +++ b/lib/good_job/capsule_tracker.rb @@ -0,0 +1,227 @@ +# frozen_string_literal: true + +module GoodJob # :nodoc: + # CapsuleTracker save a record in the database and periodically refreshes it. The intention is to + # create a heartbeat that can be used to determine whether a capsule/process is still active + # and use that to lock (or unlock) jobs. + class CapsuleTracker + # The database record used for tracking. + # @return [GoodJob::Process, nil] + attr_reader :record + + # Number of tracked job executions. + attr_reader :locks + + # Number of tracked job executions with advisory locks. + # @return [Integer] + attr_reader :advisory_locks + + # @!attribute [r] instances + # @!scope class + # List of all instantiated CapsuleTrackers in the current process. + # @return [Array, nil] + cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false + + # @param executor [Concurrent::AbstractExecutorService] The executor to use for refreshing the process record. + def initialize(executor: Concurrent.global_io_executor) + @executor = executor + @mutex = Mutex.new + @locks = 0 + @advisory_locked_connection = nil + @record_id = SecureRandom.uuid + @record = nil + @refresh_task = nil + + # AS::ForkTracker is only present on Rails v6.1+. + # Fall back to PID checking if ForkTracker is not available + if defined?(ActiveSupport::ForkTracker) + ActiveSupport::ForkTracker.after_fork { reset } + @forktracker = true + else + @ruby_pid = ::Process.pid + @forktracker = false + end + + self.class.instances << self + end + + # The UUID to use for locking. May be nil if the process is not registered or is unusable/expired. + # If UUID has not yet been persisted to the database, this method will make a query to insert or update it. + # @return [String, nil] + def id_for_lock + value = nil + synchronize do + next if @locks.zero? + + reset_on_fork + if @record + @record.refresh_if_stale + else + @record = GoodJob::Process.create_record(id: @record_id) + create_refresh_task + end + value = @record&.id + end + value + end + + # The expected UUID of the process for use in inspection. + # Use {#id_for_lock} if using this as a lock key. + # @return [String] + def process_id + @record_id + end + + # Registers the current process around a job execution site. + # +register+ is expected to be called multiple times in a process, but should be advisory locked only once (in a single thread). + # @param with_advisory_lock [Boolean] Whether the lock strategy should us an advisory lock; the connection must be retained to support advisory locks. + # @yield [void] If a block is given, the process will be unregistered after the block completes. + # @return [void] + def register(with_advisory_lock: false) + synchronize do + if with_advisory_lock + if @record + if !advisory_locked? || !advisory_locked_connection? + @record.class.transaction do + @record.advisory_lock! + @record.update(lock_type: GoodJob::Process::LOCK_TYPE_ADVISORY) + end + @advisory_locked_connection = WeakRef.new(@record.class.connection) + end + else + @record = GoodJob::Process.create_record(id: @record_id, with_advisory_lock: true) + @advisory_locked_connection = WeakRef.new(@record.class.connection) + create_refresh_task + end + end + + @locks += 1 + end + return unless block_given? + + begin + yield + ensure + unregister(with_advisory_lock: with_advisory_lock) + end + end + + # Unregisters the current process from the database. + # @param with_advisory_lock [Boolean] Whether the lock strategy should unlock an advisory lock; the connection must be able to support advisory locks. + # @return [void] + def unregister(with_advisory_lock: false) + synchronize do + if @locks.zero? + return + elsif @locks == 1 + if @record + if with_advisory_lock && advisory_locked? && advisory_locked_connection? + @record.class.transaction do + @record.advisory_unlock + @record.destroy + end + @advisory_locked_connection = nil + else + @record.destroy + end + @record = nil + end + cancel_refresh_task + elsif with_advisory_lock && advisory_locked? && advisory_locked_connection? + @record.class.transaction do + @record.advisory_unlock + @record.update(lock_type: nil) + end + @advisory_locked_connection = nil + end + + @locks -= 1 unless @locks.zero? + end + end + + # Refreshes the process record in the database. + # @param silent [Boolean] Whether to silence logging. + # @return [void] + def renew(silent: false) + GoodJob::Process.with_logger_silenced(silent: silent) do + @record&.refresh_if_stale(cleanup: true) + end + end + + # Tests whether an active advisory lock has been taken on the record. + # @return [Boolean] + def advisory_locked? + @advisory_locked_connection&.weakref_alive? && @advisory_locked_connection&.active? + end + + # @!visibility private + def task_observer(_time, _output, thread_error) + GoodJob._on_thread_error(thread_error) if thread_error && !thread_error.is_a?(Concurrent::CancelledOperationError) + end + + private + + def advisory_locked_connection? + @record&.class&.connection && @advisory_locked_connection&.weakref_alive? && @advisory_locked_connection.eql?(@record.class.connection) + end + + def task_interval + GoodJob::Process::STALE_INTERVAL + jitter + end + + def jitter + GoodJob::Process::STALE_INTERVAL * 0.1 * Kernel.rand + end + + def create_refresh_task(delay: nil) + return if @refresh_task + return unless @executor + + delay ||= task_interval + @refresh_task = Concurrent::ScheduledTask.new(delay.to_f, executor: @executor) do + Rails.application.executor.wrap do + synchronize do + next unless @locks.positive? + + @refresh_task = nil + create_refresh_task + renew(silent: true) + end + end + end + @refresh_task.add_observer(self, :task_observer) + @refresh_task.execute + end + + def cancel_refresh_task + @refresh_task&.cancel + @refresh_task = nil + end + + def reset + synchronize { ns_reset } + end + + def reset_on_fork + return if Concurrent.on_jruby? + return if @forktracker || ::Process.pid == @ruby_pid + + @ruby_pid = ::Process.pid + ns_reset + end + + def ns_reset + @record_id = SecureRandom.uuid + @record = nil + end + + # Synchronize must always be called from within a Rails Executor; it may deadlock if the order is reversed. + def synchronize(&block) + if @mutex.owned? + yield + else + @mutex.synchronize(&block) + end + end + end +end diff --git a/lib/good_job/job_performer.rb b/lib/good_job/job_performer.rb index 3899d23a6..a0a6c734b 100644 --- a/lib/good_job/job_performer.rb +++ b/lib/good_job/job_performer.rb @@ -14,8 +14,9 @@ class JobPerformer cattr_accessor :performing_active_job_ids, default: Concurrent::Set.new # @param queue_string [String] Queues to execute jobs from - def initialize(queue_string) + def initialize(queue_string, capsule: GoodJob.capsule) @queue_string = queue_string + @capsule = capsule @metrics = Metrics.new end @@ -30,20 +31,22 @@ def name # @return [Object, nil] Returns job result or +nil+ if no job was found def next active_job_id = nil - job_query.perform_with_advisory_lock(parsed_queues: parsed_queues, queue_select_limit: GoodJob.configuration.queue_select_limit) do |execution| - @metrics.touch_check_queue_at - - if execution - active_job_id = execution.active_job_id - performing_active_job_ids << active_job_id - @metrics.touch_execution_at - yield(execution) if block_given? - else - @metrics.increment_empty_executions - end - end.tap do |result| - if result - result.succeeded? ? @metrics.increment_succeeded_executions : @metrics.increment_errored_executions + @capsule.tracker.register do + job_query.perform_with_advisory_lock(lock_id: @capsule.tracker.id_for_lock, parsed_queues: parsed_queues, queue_select_limit: GoodJob.configuration.queue_select_limit) do |execution| + @metrics.touch_check_queue_at + + if execution + active_job_id = execution.active_job_id + performing_active_job_ids << active_job_id + @metrics.touch_execution_at + yield(execution) if block_given? + else + @metrics.increment_empty_executions + end + end.tap do |result| + if result + result.succeeded? ? @metrics.increment_succeeded_executions : @metrics.increment_errored_executions + end end end ensure diff --git a/lib/good_job/multi_scheduler.rb b/lib/good_job/multi_scheduler.rb index 3d161e8c9..f0815c10e 100644 --- a/lib/good_job/multi_scheduler.rb +++ b/lib/good_job/multi_scheduler.rb @@ -7,12 +7,12 @@ class MultiScheduler # @param configuration [GoodJob::Configuration] # @param warm_cache_on_initialize [Boolean] # @return [GoodJob::MultiScheduler] - def self.from_configuration(configuration, warm_cache_on_initialize: false) + def self.from_configuration(configuration, capsule: GoodJob.capsule, warm_cache_on_initialize: false) schedulers = configuration.queue_string.split(';').map(&:strip).map do |queue_string_and_max_threads| queue_string, max_threads = queue_string_and_max_threads.split(':').map { |str| str.strip.presence } max_threads = (max_threads || configuration.max_threads).to_i - job_performer = GoodJob::JobPerformer.new(queue_string) + job_performer = GoodJob::JobPerformer.new(queue_string, capsule: capsule) GoodJob::Scheduler.new( job_performer, max_threads: max_threads, diff --git a/lib/good_job/notifier.rb b/lib/good_job/notifier.rb index a75110508..99cf99f71 100644 --- a/lib/good_job/notifier.rb +++ b/lib/good_job/notifier.rb @@ -2,6 +2,7 @@ require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent/atomic/atomic_boolean' +require "concurrent/scheduled_task" require "good_job/notifier/process_heartbeat" module GoodJob # :nodoc: @@ -62,13 +63,12 @@ def self.notify(message) # @param recipients [Array<#call, Array(Object, Symbol)>] # @param enable_listening [true, false] - # @param executor [Concurrent::ExecutorService] - def initialize(*recipients, enable_listening: true, executor: Concurrent.global_io_executor) + def initialize(*recipients, enable_listening: true, capsule: GoodJob.capsule, executor: Concurrent.global_io_executor) @recipients = Concurrent::Array.new(recipients) @enable_listening = enable_listening @executor = executor - @mutex = Mutex.new + @monitor = Monitor.new @shutdown_event = Concurrent::Event.new.tap(&:set) @running = Concurrent::AtomicBoolean.new(false) @connected = Concurrent::Event.new @@ -77,6 +77,7 @@ def initialize(*recipients, enable_listening: true, executor: Concurrent.global_ @connection_errors_reported = Concurrent::AtomicBoolean.new(false) @enable_listening = enable_listening @task = nil + @capsule = capsule start self.class.instances << self @@ -183,6 +184,8 @@ def listen_observer(_time, _result, thread_error) private + delegate :synchronize, to: :@monitor + def start synchronize do return if @running.true? @@ -211,20 +214,20 @@ def create_listen_task(delay: 0) end while thr_executor.running? && thr_running.true? - run_callbacks :tick do - wait_for_notify do |channel, payload| - next unless channel == CHANNEL - - ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) - parsed_payload = JSON.parse(payload, symbolize_names: true) - thr_recipients.each do |recipient| - target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] - target.send(method_name, parsed_payload) - end - end + Rails.application.executor.wrap { run_callbacks(:tick) } - reset_connection_errors + wait_for_notify do |channel, payload| + next unless channel == CHANNEL + + ActiveSupport::Notifications.instrument("notifier_notified.good_job", { payload: payload }) + parsed_payload = JSON.parse(payload, symbolize_names: true) + thr_recipients.each do |recipient| + target, method_name = recipient.is_a?(Array) ? recipient : [recipient, :call] + target.send(method_name, parsed_payload) + end end + + reset_connection_errors end end ensure @@ -284,13 +287,5 @@ def reset_connection_errors @connection_errors_count.value = 0 @connection_errors_reported.make_false end - - def synchronize(&block) - if @mutex.owned? - yield - else - @mutex.synchronize(&block) - end - end end end diff --git a/lib/good_job/notifier/process_heartbeat.rb b/lib/good_job/notifier/process_heartbeat.rb index d39595986..f830946ef 100644 --- a/lib/good_job/notifier/process_heartbeat.rb +++ b/lib/good_job/notifier/process_heartbeat.rb @@ -16,7 +16,7 @@ module ProcessHeartbeat def register_process GoodJob::Process.override_connection(connection) do GoodJob::Process.cleanup - @process = GoodJob::Process.register + @capsule.tracker.register(with_advisory_lock: true) end end @@ -24,7 +24,7 @@ def refresh_process Rails.application.executor.wrap do GoodJob::Process.override_connection(connection) do GoodJob::Process.with_logger_silenced do - @process&.refresh_if_stale(cleanup: true) + @capsule.tracker.renew end end end @@ -33,7 +33,7 @@ def refresh_process # Deregisters the current process. def deregister_process GoodJob::Process.override_connection(connection) do - @process&.deregister + @capsule.tracker.unregister(with_advisory_lock: true) end end end diff --git a/lib/good_job/shared_executor.rb b/lib/good_job/shared_executor.rb index 32175f68e..76b689b45 100644 --- a/lib/good_job/shared_executor.rb +++ b/lib/good_job/shared_executor.rb @@ -1,7 +1,12 @@ # frozen_string_literal: true +require 'concurrent/executor/executor_service' + module GoodJob class SharedExecutor + # Allow posting tasks directly to instances + include Concurrent::ExecutorService + MAX_THREADS = 2 # @!attribute [r] instances @@ -13,8 +18,21 @@ class SharedExecutor attr_reader :executor def initialize + @mutex = Mutex.new + self.class.instances << self - create_executor + end + + def post(*args, &task) + unless running? + @mutex.synchronize do + next if running? + + create_executor + end + end + + @executor&.post(*args, &task) end def running? diff --git a/sorbet/rbi/todo.rbi b/sorbet/rbi/todo.rbi index aa052d015..3c3d55f43 100644 --- a/sorbet/rbi/todo.rbi +++ b/sorbet/rbi/todo.rbi @@ -18,6 +18,10 @@ module ::JobError; end module ::LATCH; end module ::MemoryProfiler; end module ::PERFORMED; end +module ::POLL_COUNT; end +module ::PROCESS_IDS; end +module ::RECEIVED_MESSAGE; end +module ::REFRESH_IF_STALE_CALLED; end module ::RESULTS; end module ::RUN_JOBS; end module ::RecursiveJob; end diff --git a/spec/app/models/good_job/batch_spec.rb b/spec/app/models/good_job/batch_spec.rb index 1175bff8b..1ff1ad32c 100644 --- a/spec/app/models/good_job/batch_spec.rb +++ b/spec/app/models/good_job/batch_spec.rb @@ -147,7 +147,7 @@ def perform(batch, params) before do stub_const 'CallbackJob', (Class.new(ActiveJob::Base) do def perform(_batch, _options) - puts "HERE" + nil end end) end diff --git a/spec/app/models/good_job/execution_spec.rb b/spec/app/models/good_job/execution_spec.rb index 369683120..dae20e634 100644 --- a/spec/app/models/good_job/execution_spec.rb +++ b/spec/app/models/good_job/execution_spec.rb @@ -3,6 +3,8 @@ require 'rails_helper' RSpec.describe GoodJob::Execution do + let(:process_id) { SecureRandom.uuid } + around do |example| Rails.application.executor.wrap { example.run } end @@ -158,21 +160,21 @@ def perform(result_value = nil, raise_error: false) it 'performs one job' do good_job_2 = described_class.create!(serialized_params: {}) - described_class.perform_with_advisory_lock + described_class.perform_with_advisory_lock(lock_id: process_id) expect(good_job.reload.finished_at).to be_present expect(good_job_2.reload.finished_at).to be_blank end it 'returns the result or nil if not' do - result = described_class.perform_with_advisory_lock + result = described_class.perform_with_advisory_lock(lock_id: process_id) expect(result).to be_a GoodJob::ExecutionResult expect(result.value).to eq 'a string' expect(result.unhandled_error).to be_nil described_class.enqueue(TestJob.new(true, raise_error: true)) - errored_result = described_class.all.perform_with_advisory_lock + errored_result = described_class.all.perform_with_advisory_lock(lock_id: process_id) expect(result).to be_a GoodJob::ExecutionResult expect(errored_result.value).to be_nil @@ -192,7 +194,7 @@ def job_params it "orders by priority ascending and creation descending" do 4.times do - described_class.perform_with_advisory_lock + described_class.perform_with_advisory_lock(lock_id: process_id) end expect(described_class.order(finished_at: :asc).to_a).to eq([ high_priority_job, @@ -213,10 +215,10 @@ def job_params let!(:queue_one_job) { described_class.create!(job_params.merge(queue_name: "one", created_at: 1.minute.ago, priority: 1)) } it "orders by queue order" do - described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) do |execution| + described_class.perform_with_advisory_lock(lock_id: process_id, parsed_queues: parsed_queues) do |execution| expect(execution).to eq queue_one_job end - described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) do |execution| + described_class.perform_with_advisory_lock(lock_id: process_id, parsed_queues: parsed_queues) do |execution| expect(execution).to eq queue_two_job end end @@ -382,7 +384,7 @@ def job_params describe 'return value' do it 'returns the results of the job' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to eq "a string" expect(result.unhandled_error).to be_nil @@ -405,7 +407,7 @@ def job_params end it 'returns the error' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to be_nil expect(result.unhandled_error).to be_an_instance_of TestJob::ExpectedError @@ -426,7 +428,7 @@ def job_params end it 'records the new job UUID on the executing record' do - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload.retried_good_job_id).to be_present end end @@ -443,7 +445,7 @@ def job_params allow(GoodJob).to receive(:preserve_job_records).and_return(true) expect do - good_job.perform + good_job.perform(lock_id: process_id) end.not_to change { good_job.reload.serialized_params["exception_executions"]["[TestJob::ExpectedError]"] } end end @@ -464,7 +466,7 @@ def job_params end it 'returns the error' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to be_nil expect(result.unhandled_error).to be_an_instance_of TestJob::ExpectedError @@ -476,7 +478,7 @@ def job_params end it 'returns the error' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to be_nil expect(result.handled_error).to be_an_instance_of TestJob::ExpectedError @@ -489,7 +491,7 @@ def job_params end it 'returns the error' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to be_nil expect(result.handled_error).to be_an_instance_of TestJob::ExpectedError @@ -500,7 +502,7 @@ def job_params end it 'preserves the job by default' do - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( performed_at: within(1.second).of(Time.current), finished_at: within(1.second).of(Time.current) @@ -509,13 +511,13 @@ def job_params it 'can destroy the job when preserve_job_records is false' do allow(GoodJob).to receive(:preserve_job_records).and_return(false) - good_job.perform + good_job.perform(lock_id: process_id) expect { good_job.reload }.to raise_error ActiveRecord::RecordNotFound end it 'destroys the job when preserving record only on error' do allow(GoodJob).to receive(:preserve_job_records).and_return(:on_unhandled_error) - good_job.perform + good_job.perform(lock_id: process_id) expect { good_job.reload }.to raise_error ActiveRecord::RecordNotFound end @@ -532,7 +534,7 @@ def job_params it 'destroys the job and prior executions when preserving record only on error' do allow(GoodJob).to receive(:preserve_job_records).and_return(:on_unhandled_error) - good_job.perform + good_job.perform(lock_id: process_id) expect { good_job.reload }.to raise_error ActiveRecord::RecordNotFound expect { prior_execution.reload }.to raise_error ActiveRecord::RecordNotFound end @@ -548,7 +550,7 @@ def job_params end it 'does not destroy the execution records' do - good_job.perform + good_job.perform(lock_id: process_id) expect { good_job.reload }.not_to raise_error expect(described_class.where(active_job_id: good_job.active_job_id).count).to eq 2 end @@ -562,7 +564,7 @@ def job_params end it 'preserves the job record anyway' do - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( performed_at: within(1.second).of(Time.current), finished_at: within(1.second).of(Time.current) @@ -572,7 +574,7 @@ def job_params it 'raises an error if the job is attempted to be re-run' do good_job.update!(finished_at: Time.current) - expect { good_job.perform }.to raise_error described_class::PreviouslyPerformedError + expect { good_job.perform(lock_id: process_id) }.to raise_error described_class::PreviouslyPerformedError end context 'when ActiveJob rescues an error' do @@ -583,7 +585,7 @@ def job_params end it 'returns the results of the job' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to be_nil expect(result.handled_error).to be_a(TestJob::ExpectedError) @@ -592,7 +594,7 @@ def job_params it 'can preserves the job' do allow(GoodJob).to receive(:preserve_job_records).and_return(true) - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( error: "TestJob::ExpectedError: Raised expected error", @@ -606,7 +608,7 @@ def job_params let(:active_job) { TestJob.new("a string", raise_error: true) } it 'returns the results of the job' do - result = good_job.perform + result = good_job.perform(lock_id: process_id) expect(result.value).to be_nil expect(result.unhandled_error).to be_a(TestJob::ExpectedError) @@ -621,7 +623,7 @@ def job_params it 'leaves the job record unfinished' do allow(GoodJob).to receive(:preserve_job_records).and_return(true) - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( error: "TestJob::ExpectedError: Raised expected error", @@ -633,7 +635,7 @@ def job_params it 'does not destroy the job record' do allow(GoodJob).to receive(:preserve_job_records).and_return(false) - good_job.perform + good_job.perform(lock_id: process_id) expect { good_job.reload }.not_to raise_error end end @@ -646,14 +648,14 @@ def job_params it 'destroys the job' do allow(GoodJob).to receive(:preserve_job_records).and_return(false) - good_job.perform + good_job.perform(lock_id: process_id) expect { good_job.reload }.to raise_error ActiveRecord::RecordNotFound end it 'can preserve the job' do allow(GoodJob).to receive(:preserve_job_records).and_return(true) - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( error: "TestJob::ExpectedError: Raised expected error", @@ -664,7 +666,7 @@ def job_params it 'preserves the job when preserving record only on error' do allow(GoodJob).to receive(:preserve_job_records).and_return(:on_unhandled_error) - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( error: "TestJob::ExpectedError: Raised expected error", @@ -684,7 +686,7 @@ def job_params end it 'updates the Execution record and creates a DiscreteExecution record' do - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( executions_count: 1, @@ -717,7 +719,7 @@ def job_params end it 'updates the existing Execution/Job record instead of creating a new one' do - expect { good_job.perform } + expect { good_job.perform(lock_id: process_id) } .to not_change(described_class, :count) .and change { good_job.reload.serialized_params["executions"] }.by(1) .and not_change { good_job.reload.id } @@ -750,7 +752,7 @@ def job_params end it 'finishes the execution but does not finish the job' do - good_job.perform + good_job.perform(lock_id: process_id) expect(good_job.reload).to have_attributes( performed_at: nil, diff --git a/spec/app/models/good_job/process_spec.rb b/spec/app/models/good_job/process_spec.rb index b6732638e..19094bdc6 100644 --- a/spec/app/models/good_job/process_spec.rb +++ b/spec/app/models/good_job/process_spec.rb @@ -3,23 +3,17 @@ require 'rails_helper' RSpec.describe GoodJob::Process do - describe '.current_id' do - it 'returns a uuid that does not change' do - value = described_class.current_id - expect(value).to be_present - - expect(described_class.current_id).to eq value + describe 'Scopes' do + describe '.active' do + it 'returns active processes' do + expect(described_class.active.count).to eq 0 + end end - it 'changes when the PID changes' do - allow(Process).to receive(:pid).and_return(1) - original_value = described_class.current_id - - allow(Process).to receive(:pid).and_return(2) - expect(described_class.current_id).not_to eq original_value - - # Unstub the pid or RSpec/DatabaseCleaner may fail - RSpec::Mocks.space.proxy_for(Process).reset + describe '.inactive' do + it 'returns inactive processes' do + expect(described_class.inactive.count).to eq 0 + end end end @@ -38,7 +32,7 @@ describe '.ns_current_state' do it 'contains information about the process' do - expect(described_class.ns_current_state).to include( + expect(described_class.process_state).to include( database_connection_pool: include( size: be_an(Integer), active: be_an(Integer) @@ -47,31 +41,6 @@ end end - describe '.register' do - it 'registers the process' do - process = nil - expect do - process = described_class.register - end.to change(described_class, :count).by(1) - - process.deregister - end - - context 'when there is already an existing record' do - it 'returns the existing record' do - described_class.create!(id: described_class.current_id) - expect(described_class.register).to be_a described_class - end - end - end - - describe '#deregister' do - it 'deregisters the record' do - process = described_class.register - expect { process.deregister }.to change(described_class, :count).by(-1) - end - end - describe '#basename' do let(:process) { described_class.new state: {} } @@ -95,11 +64,11 @@ end context 'when the record has been deleted elsewhere' do - it 'returns false' do + it 'creates a new record' do process = described_class.create! state: {}, updated_at: 1.day.ago described_class.where(id: process.id).delete_all - expect(process.refresh).to be false + expect { process.refresh }.to change(described_class, :count).from(0).to(1) end end end diff --git a/spec/integration/process_locks_spec.rb b/spec/integration/process_locks_spec.rb new file mode 100644 index 000000000..5e9e4ddcf --- /dev/null +++ b/spec/integration/process_locks_spec.rb @@ -0,0 +1,46 @@ +# frozen_string_literal: true + +require 'rails_helper' + +RSpec.describe 'Process Locks' do + let(:capsule) { GoodJob::Capsule.new } + let(:inline_adapter) { GoodJob::Adapter.new(execution_mode: :inline, _capsule: capsule) } + let(:async_adapter) { GoodJob::Adapter.new(execution_mode: :async, _capsule: capsule) } + + before do + capsule.start + + stub_const("PROCESS_IDS", Concurrent::Array.new) + stub_const "TestJob", (Class.new(ActiveJob::Base) do + def perform + PROCESS_IDS << GoodJob::Job.where(id: provider_job_id).pick(:locked_by_id) + end + end) + end + + after do + capsule.shutdown + end + + it 'stores process_id in inline job' do + TestJob.queue_adapter = inline_adapter + TestJob.perform_later + + wait_until { expect(GoodJob::Job.last.finished_at).to be_present } + expect(PROCESS_IDS.size).to eq 1 + expect(PROCESS_IDS.first).to be_a_uuid + expect(PROCESS_IDS.first).to eq capsule.process_id + expect(GoodJob::Job.last.locked_by_id).to be_nil + end + + it 'stores process_id in async job' do + TestJob.queue_adapter = async_adapter + TestJob.perform_later + + wait_until { expect(GoodJob::Job.pick(:finished_at)).to be_present } + expect(PROCESS_IDS.size).to eq 1 + expect(PROCESS_IDS.first).to be_a_uuid + expect(PROCESS_IDS.first).to eq capsule.process_id + expect(GoodJob::Job.last.locked_by_id).to be_nil + end +end diff --git a/spec/lib/good_job/capsule_tracker_spec.rb b/spec/lib/good_job/capsule_tracker_spec.rb new file mode 100644 index 000000000..a9eb29067 --- /dev/null +++ b/spec/lib/good_job/capsule_tracker_spec.rb @@ -0,0 +1,184 @@ +# frozen_string_literal: true + +require 'rails_helper' + +describe GoodJob::CapsuleTracker do + let(:tracker) { described_class.new } + + describe '#register' do + context 'when used with an advisory lock' do + it 'creates a Process and sets the lock_type' do + expect do + tracker.register(with_advisory_lock: true) + end.to change(GoodJob::Process, :count).by(1) + + process = GoodJob::Process.last + expect(process.lock_type).to eq('advisory') + + tracker.unregister(with_advisory_lock: true) + + expect(GoodJob::Process.count).to eq 0 + + Rails.logger.warn("DONE WITH TEST") + end + + it 'takes an advisory lock even when process already exists' do + tracker.register do + expect(GoodJob::Process.count).to eq 0 + + tracker.id_for_lock + + process = GoodJob::Process.last + expect(process).to be_present + expect(process).not_to be_advisory_locked + expect(process.lock_type).to eq(nil) + + tracker.register(with_advisory_lock: true) do + process.reload + expect(process).to be_advisory_locked + expect(process.lock_type).to eq('advisory') + end + + process.reload + expect(process).not_to be_advisory_locked + expect(process.lock_type).to eq(nil) + end + + expect(GoodJob::Process.count).to eq 0 + end + + it 'increments the number of locks when not used with a block' do + expect do + tracker.register(with_advisory_lock: true) + end.to change(tracker, :locks).by(1) + tracker.unregister(with_advisory_lock: true) + end + + it 'tracks multiple locks and advisory locks' do + tracker.register(with_advisory_lock: true) + tracker.register + tracker.register(with_advisory_lock: true) + tracker.register(with_advisory_lock: true) + + expect(GoodJob::Process.count).to eq 1 + expect(tracker.record.lock_type).to eq('advisory') + expect(tracker.locks).to eq(4) + expect(tracker).to be_advisory_locked + + tracker.unregister(with_advisory_lock: true) + + expect(GoodJob::Process.count).to eq 1 + expect(tracker.record.lock_type).to eq(nil) + expect(tracker.locks).to eq(3) + expect(tracker).not_to be_advisory_locked + + tracker.unregister(with_advisory_lock: true) + tracker.unregister + tracker.unregister(with_advisory_lock: true) + + expect(GoodJob::Process.count).to eq 0 + expect(tracker.locks).to eq(0) + expect(tracker).not_to be_advisory_locked + end + end + + context 'when NOT used with an advisory lock' do + it 'does not create a Process' do + expect do + tracker.register + end.not_to change(GoodJob::Process, :count) + + tracker.unregister + end + + it 'increments the number of locks when not used with a block' do + expect do + tracker.register + end.to change(tracker, :locks).by(1) + tracker.unregister + end + end + + it 'creates a ScheduledTask that refreshes the process in the background' do + stub_const("GoodJob::Process::STALE_INTERVAL", 0.1.seconds) + + tracker.register do + tracker.id_for_lock + updated_at = GoodJob::Process.first.updated_at + wait_until(max: 1) { expect(GoodJob::Process.first.updated_at).to be > updated_at } + end + + tracker.register(with_advisory_lock: true) do + tracker.id_for_lock + updated_at = GoodJob::Process.first.updated_at + wait_until(max: 1) { expect(GoodJob::Process.first.updated_at).to be > updated_at } + end + end + + it 'resets the number of locks when used with a block' do + called_block = nil + tracker.register do + called_block = true + expect(tracker.locks).to eq 1 + end + + expect(called_block).to be true + expect(tracker.locks).to eq 0 + end + + it 'removes the process when locks are zero' do + inner_block_called = nil + tracker.register do + lock_id = tracker.id_for_lock + expect(lock_id).to be_present + + expect(GoodJob::Process.count).to eq 1 + tracker.register do + inner_block_called = true + expect(tracker.id_for_lock).to eq(lock_id) + expect(GoodJob::Process.count).to eq 1 + end + expect(GoodJob::Process.count).to eq 1 + expect(tracker.id_for_lock).to eq(lock_id) + end + expect(GoodJob::Process.count).to eq 0 + expect(tracker.id_for_lock).to be_nil + + expect(inner_block_called).to be true + end + end + + describe '#process_id' do + it 'is a UUID' do + expect(tracker.process_id).to be_a_uuid + end + end + + describe '.id_for_lock' do + it 'is available if the process has been registered' do + expect(GoodJob::Process.count).to eq 0 + expect(tracker.id_for_lock).to be_nil + + tracker.register do + expect(tracker.id_for_lock).to be_present + end + + expect(GoodJob::Process.count).to eq 0 + expect(tracker.id_for_lock).to be_nil + end + + describe "on fork" do + it 'when reset via ForkTracker' do + skip("AS::ForkTracker is not defined") unless defined?(ActiveSupport::ForkTracker) + + tracker.register do + original_value = tracker.id_for_lock + + tracker.send(:reset) + + expect(tracker.id_for_lock).not_to eq original_value + end + end + end + end +end diff --git a/spec/lib/good_job/notifier_spec.rb b/spec/lib/good_job/notifier_spec.rb index 649d573f8..c6f888815 100644 --- a/spec/lib/good_job/notifier_spec.rb +++ b/spec/lib/good_job/notifier_spec.rb @@ -184,25 +184,11 @@ wait_until { expect(GoodJob::Process.count).to eq 1 } process = GoodJob::Process.first - expect(process.id).to eq GoodJob::Process.current_id + expect(process.id).to eq GoodJob.capsule.tracker.id_for_lock expect(process).to be_advisory_locked notifier.shutdown expect { process.reload }.to raise_error ActiveRecord::RecordNotFound end - - context 'when, for some reason, the process already exists' do - it 'does not create a new process' do - process = GoodJob::Process.register - notifier = described_class.new(enable_listening: true) - - wait_until { expect(notifier).to be_listening } - expect(GoodJob::Process.count).to eq 1 - - notifier.shutdown - expect(process.reload).to eq process - process.advisory_unlock - end - end end end diff --git a/spec/lib/good_job/scheduler_spec.rb b/spec/lib/good_job/scheduler_spec.rb index 3180a0cd3..54ef14808 100644 --- a/spec/lib/good_job/scheduler_spec.rb +++ b/spec/lib/good_job/scheduler_spec.rb @@ -5,10 +5,6 @@ RSpec.describe GoodJob::Scheduler do let(:performer) { GoodJob::JobPerformer.new('*') } - after do - described_class.instances.each(&:shutdown) - end - describe '#name' do it 'is human readable and contains configuration values' do scheduler = described_class.new(performer) diff --git a/spec/lib/good_job/shared_executor_spec.rb b/spec/lib/good_job/shared_executor_spec.rb index 0474c0cbe..7217990ce 100644 --- a/spec/lib/good_job/shared_executor_spec.rb +++ b/spec/lib/good_job/shared_executor_spec.rb @@ -23,7 +23,7 @@ expect do shared_executor.restart - end.to change(shared_executor, :running?).from(false).to(true) + end.to change(shared_executor, :running?).from(nil).to(true) end end end diff --git a/spec/support/reset_good_job.rb b/spec/support/reset_good_job.rb index 8dd42054e..d484c6187 100644 --- a/spec/support/reset_good_job.rb +++ b/spec/support/reset_good_job.rb @@ -45,6 +45,11 @@ ) GoodJob._shutdown_all(executables, timeout: -1) + GoodJob::CapsuleTracker.instances.each do |tracker| + expect(tracker.locks).to eq 0 + end + GoodJob::CapsuleTracker.instances.clear + expect(GoodJob::Notifier.instances).to all be_shutdown GoodJob::Notifier.instances.clear