Skip to content

Commit

Permalink
Ensure Process records always exist when executing jobs and are refre…
Browse files Browse the repository at this point in the history
…shed in background
  • Loading branch information
bensheldon committed Aug 25, 2023
1 parent 3cff9ee commit dea2385
Show file tree
Hide file tree
Showing 28 changed files with 663 additions and 161 deletions.
7 changes: 7 additions & 0 deletions app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ def error_event_migrated?
migration_pending_warning!
false
end

# Reports whether the +locked_by_id+ column is present in this model's
# column information. When it is absent, +migration_pending_warning!+ is
# invoked before returning.
# @return [Boolean] true when the column exists, false otherwise
def process_lock_migrated?
  if columns_hash["locked_by_id"].present?
    true
  else
    migration_pending_warning!
    false
  end
end
end

# The ActiveJob job class, as a string
Expand Down
30 changes: 21 additions & 9 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def self.enqueue_args(active_job, overrides = {})
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil, capsule: GoodJob.capsule)
execution = nil
result = nil
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true, select_limit: queue_select_limit) do |executions|
Expand All @@ -266,7 +266,9 @@ def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
end

yield(execution) if block_given?
result = execution.perform
capsule.process_tracker.register do
result = execution.perform(id_for_lock: capsule.process_tracker.id_for_lock)
end
end
execution&.run_callbacks(:perform_unlocked)

Expand Down Expand Up @@ -356,7 +358,7 @@ def self.format_error(error)
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
def perform(id_for_lock: nil)
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

Expand Down Expand Up @@ -385,17 +387,23 @@ def perform
if discrete?
transaction do
now = Time.current
discrete_execution = discrete_executions.create!(
discrete_execution = discrete_executions.create!({
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
created_at: now
)
created_at: now,
}.tap do |args|
args[:process_id] = id_for_lock if self.class.process_lock_migrated?
end)

assign_attributes(locked_by_id: id_for_lock, locked_at: now) if self.class.process_lock_migrated?
update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
end
else
update!(performed_at: Time.current)
now = Time.current
assign_attributes(locked_by_id: id_for_lock, locked_at: now) if self.class.process_lock_migrated?
update!(performed_at: now)
end

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
Expand Down Expand Up @@ -439,7 +447,6 @@ def perform
end

job_error = result.handled_error || result.unhandled_error

if job_error
error_string = self.class.format_error(job_error)
self.error = error_string
Expand All @@ -452,8 +459,13 @@ def perform
self.error = nil
self.error_event = nil if self.class.error_event_migrated?
end

reenqueued = result.retried? || retried_good_job_id.present?

if self.class.process_lock_migrated?
self.locked_by_id = nil
self.locked_at = nil
end

if result.unhandled_error && GoodJob.retry_on_unhandled_error
if discrete_execution
transaction do
Expand Down
137 changes: 76 additions & 61 deletions app/models/good_job/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,71 @@ class Process < BaseRecord
# Interval until the process record is treated as expired
EXPIRED_INTERVAL = 5.minutes

self.table_name = 'good_job_processes'
# Name of the advisory lock type as exposed through the model API.
LOCK_TYPE_ADVISORY = 'advisory'

# Every lock type name known to this model.
LOCK_TYPES = [LOCK_TYPE_ADVISORY].freeze

# Maps each lock type name to the integer stored in the +lock_type+ column.
LOCK_TYPE_ENUMS = { LOCK_TYPE_ADVISORY => 1 }.freeze

cattr_reader :mutex, default: Mutex.new
cattr_accessor :_current_id, default: nil
cattr_accessor :_pid, default: nil
self.table_name = 'good_job_processes'

# Processes that are active and locked.
# @!method active
# @!scope class
# @return [ActiveRecord::Relation]
scope :active, -> { advisory_locked }
scope :active, (lambda do
  # Without the lock_type column, fall back to the advisory lock check alone.
  next advisory_locked unless lock_type_migrated?

  base = joins_advisory_locks

  # Records flagged as advisory-locked that currently hold their lock.
  holding_advisory_lock = base.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked

  # Records with no lock type whose updated_at is newer than the expiry cutoff.
  recently_updated = base.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago))

  holding_advisory_lock.or(recently_updated)
end)

# Processes that are inactive and unlocked (e.g. SIGKILLed)
# @!method inactive
# @!scope class
# @return [ActiveRecord::Relation]
scope :inactive, -> { advisory_unlocked }
scope :inactive, (lambda do
  # Without the lock_type column, fall back to the advisory lock check alone.
  next advisory_unlocked unless lock_type_migrated?

  base = joins_advisory_locks

  # Records flagged as advisory-locked that no longer hold their lock.
  lost_advisory_lock = base.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_unlocked

  # Records with no lock type whose updated_at is older than the expiry cutoff.
  not_recently_updated = base.where(lock_type: nil).where(arel_table[:updated_at].lt(EXPIRED_INTERVAL.ago))

  lost_advisory_lock.or(not_recently_updated)
end)

# UUID that is unique to the current process and changes when forked.
# @return [String]
def self.current_id
mutex.synchronize { ns_current_id }
# Deletes every record returned by the +inactive+ scope. Before each
# deletion, and only when the process-lock columns exist on GoodJob::Job,
# jobs whose +locked_by_id+ points at that process have +locked_by_id+ and
# +locked_at+ reset to nil.
def self.cleanup
  inactive.find_each do |inactive_process|
    if GoodJob::Job.process_lock_migrated?
      GoodJob::Job.where(locked_by_id: inactive_process.id).update_all(locked_by_id: nil, locked_at: nil) # rubocop:disable Rails/SkipsModelValidations
    end

    inactive_process.delete
  end
end

def self.ns_current_id
if _current_id.nil? || _pid != ::Process.pid
self._current_id = SecureRandom.uuid
self._pid = ::Process.pid
end
_current_id
# Reports whether the +lock_type+ column is present in this model's
# column information.
# @return [Boolean]
def self.lock_type_migrated?
  lock_type_column = columns_hash["lock_type"]
  lock_type_column.present?
end

# Hash representing metadata about the current process.
# @return [Hash]
def self.current_state
mutex.synchronize { ns_current_state }
# Creates a process record through +create!+ with the given id and a
# freshly generated state.
# @param id [Object] value assigned to the record's +id+
# @param with_advisory_lock [Boolean] when true, also passes
#   +create_with_advisory_lock: true+ and, if the +lock_type+ column
#   exists, sets +lock_type+ to the advisory lock type
# @return the result of +create!+
def self.create_record(id:, with_advisory_lock: false)
  record_attributes = { id: id, state: generate_state }
  return create!(record_attributes) unless with_advisory_lock

  record_attributes[:create_with_advisory_lock] = true
  record_attributes[:lock_type] = LOCK_TYPE_ADVISORY if lock_type_migrated?
  create!(record_attributes)
end

def self.ns_current_state
def self.generate_state
{
id: ns_current_id,
hostname: Socket.gethostname,
pid: ::Process.pid,
proctitle: $PROGRAM_NAME,
Expand All @@ -70,45 +94,36 @@ def self.ns_current_state
}
end

# Deletes all inactive process records.
def self.cleanup
inactive.delete_all
end

# Registers the current process in the database
# @return [GoodJob::Process]
def self.register
mutex.synchronize do
process_state = ns_current_state
create(id: process_state[:id], state: process_state, create_with_advisory_lock: true)
rescue ActiveRecord::RecordNotUnique
find(ns_current_state[:id])
end
end

def refresh
mutex.synchronize do
reload
update(state: self.class.ns_current_state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
false
end
self.state = self.class.generate_state
reload.update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
@new_record = true
self.created_at = self.updated_at = nil
state_will_change!
save
end

# Unregisters the instance.
def deregister
return unless owns_advisory_lock?
def refresh_if_stale(cleanup: false)
return unless stale?

mutex.synchronize do
destroy!
advisory_unlock
end
result = refresh
self.class.cleanup if cleanup
result
end

# Returns the inherited +state+ value, substituting an empty Hash when that
# value is nil or false.
def state
  stored_state = super
  stored_state || {}
end

# True when +updated_at+ is earlier than STALE_INTERVAL ago.
# @return [Boolean]
def stale?
  last_updated = updated_at
  last_updated < STALE_INTERVAL.ago
end

# True when +updated_at+ is earlier than EXPIRED_INTERVAL ago.
# @return [Boolean]
def expired?
  last_updated = updated_at
  last_updated < EXPIRED_INTERVAL.ago
end

# Returns the final path component of the "proctitle" entry in +state+,
# treating a missing entry as an empty string.
# @return [String]
def basename
  process_title = state.fetch("proctitle", "")
  File.basename(process_title)
end
Expand All @@ -117,20 +132,20 @@ def schedulers
state.fetch("schedulers", [])
end

def refresh_if_stale(cleanup: false)
return unless stale?
def lock_type
return unless self.class.columns_hash['lock_type']

result = refresh
self.class.cleanup if cleanup
result
enum = super
LOCK_TYPE_ENUMS.key(enum) if enum
end

def stale?
updated_at < STALE_INTERVAL.ago
end
# Assigns the lock type by name, storing the integer that LOCK_TYPE_ENUMS
# maps the name to. Does nothing when the +lock_type+ column is absent.
# @param value [String, nil] a key of LOCK_TYPE_ENUMS, or nil to clear
# @raise [ArgumentError] when +value+ is truthy but not a known lock type
def lock_type=(value)
  return unless self.class.columns_hash['lock_type']

  enum = LOCK_TYPE_ENUMS[value]
  # The message names lock_type; it previously referred to error_event.
  raise(ArgumentError, "Invalid lock_type: #{value}") if value && !enum

  super(enum)
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.integer :executions_count
t.text :job_class
t.integer :error_event, limit: 2
t.uuid :locked_by_id
t.datetime :locked_at
end

create_table :good_job_batches, id: :uuid do |t|
Expand Down Expand Up @@ -56,11 +58,13 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.datetime :finished_at
t.text :error
t.integer :error_event, limit: 2
t.uuid :process_id
end

create_table :good_job_processes, id: :uuid do |t|
t.timestamps
t.jsonb :state
t.integer :lock_type, limit: 2
end

create_table :good_job_settings, id: :uuid do |t|
Expand All @@ -82,7 +86,11 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished
add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL"
add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL"

add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
add_index :good_jobs, [:priority, :scheduled_at], order: { priority: "ASC NULLS LAST", scheduled_at: :asc },
where: "finished_at IS NULL AND locked_by_id IS NULL", name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked
add_index :good_jobs, :locked_by_id,
where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id"
add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true
# Incremental migration template that adds the process-lock columns:
# +locked_by_id+ (uuid) and +locked_at+ (datetime) on +good_jobs+,
# +process_id+ (uuid) on +good_job_executions+, and +lock_type+
# (integer, limit 2) on +good_job_processes+.
class CreateGoodJobProcessLockIds < ActiveRecord::Migration<%= migration_version %>
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
# The `return` leaves `change` entirely, so none of the add_column calls
# below run when good_jobs.locked_by_id already exists.
return if connection.column_exists?(:good_jobs, :locked_by_id)
end
end

add_column :good_jobs, :locked_by_id, :uuid
add_column :good_jobs, :locked_at, :datetime
add_column :good_job_executions, :process_id, :uuid
add_column :good_job_processes, :lock_type, :integer, limit: 2
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true
# Incremental migration template that adds three indexes supporting the
# process-lock columns: two partial indexes on +good_jobs+ and one composite
# index on +good_job_executions+. Each index is created only if an index of
# that name does not already exist, and removed on rollback only if it does.
class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration<%= migration_version %>
# The indexes below use `algorithm: :concurrently`, which Rails documents as
# unsupported inside a transaction, so the DDL transaction is turned off.
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
# Partial index over unfinished, unlocked jobs ordered by priority then scheduled_at.
unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked)
add_index :good_jobs, [:priority, :scheduled_at],
order: { priority: "ASC NULLS LAST", scheduled_at: :asc },
where: "finished_at IS NULL AND locked_by_id IS NULL",
name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked,
algorithm: :concurrently
end

# Partial index over jobs that currently have a locked_by_id.
unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id)
add_index :good_jobs, :locked_by_id,
where: "locked_by_id IS NOT NULL",
name: :index_good_jobs_on_locked_by_id,
algorithm: :concurrently
end

# Composite index on executions by process_id and created_at.
unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)
add_index :good_job_executions, [:process_id, :created_at],
name: :index_good_job_executions_on_process_id_and_created_at,
algorithm: :concurrently
end
end

dir.down do
# Mirror of the up direction: drop each index only when it is present.
remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked)
remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id)
remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)
end
end
end
end
3 changes: 2 additions & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
require "good_job/poller"
require "good_job/http_server"
require "good_job/probe_server"
require "good_job/process_tracker"
require "good_job/scheduler"
require "good_job/shared_executor"
require "good_job/systemd_service"
Expand Down Expand Up @@ -258,7 +259,7 @@ def self.deprecator
def self.migrated?
# Always update with the most recent migration check
GoodJob::Execution.reset_column_information
GoodJob::Execution.error_event_migrated?
GoodJob::Execution.process_lock_migrated?
end

ActiveSupport.run_load_hooks(:good_job, self)
Expand Down

0 comments on commit dea2385

Please sign in to comment.