Add association between Process and Jobs, and add a heartbeat to the Process record #999

Merged 1 commit on May 21, 2024
7 changes: 7 additions & 0 deletions app/models/good_job/base_execution.rb
@@ -84,6 +84,13 @@ def candidate_lookup_index_migrated?
migration_pending_warning!
false
end

def process_lock_migrated?
return true if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)

migration_pending_warning!
false
end
end

# The ActiveJob job class, as a string
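
An aside on how a migration gate like process_lock_migrated? is typically consumed (this snippet is not part of the diff; it assumes the check is exposed as a class method on GoodJob::Execution alongside the existing candidate_lookup_index_migrated?, and lock_id stands in for a process UUID):

# Only read the new process_id column once the migration has landed,
# so deploys with pending migrations keep working and merely log a warning.
recent_executions =
  if GoodJob::Execution.process_lock_migrated?
    GoodJob::Execution.where(process_id: lock_id).order(created_at: :desc)
  else
    GoodJob::Execution.order(created_at: :desc)
  end
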
42 changes: 33 additions & 9 deletions app/models/good_job/execution.rb
@@ -262,15 +262,15 @@ def self.enqueue_args(active_job, overrides = {})
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
def self.perform_with_advisory_lock(parsed_queues: nil, queue_select_limit: nil)
def self.perform_with_advisory_lock(lock_id:, parsed_queues: nil, queue_select_limit: nil)
execution = nil
result = nil

unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(select_limit: queue_select_limit) do |executions|
execution = executions.first
if execution&.executable?
yield(execution) if block_given?
result = execution.perform
result = execution.perform(lock_id: lock_id)
else
execution = nil
yield(nil) if block_given?
@@ -367,7 +367,7 @@ def self.format_error(error)
# An array of the return value of the job's +#perform+ method and the
# exception raised by the job, if any. If the job completed successfully,
# the second array entry (the exception) will be +nil+ and vice versa.
def perform
def perform(lock_id:)
run_callbacks(:perform) do
raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at

@@ -397,17 +397,37 @@ def perform

if discrete?
transaction do
discrete_execution = discrete_executions.create!(
discrete_execution_attrs = {
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
created_at: job_performed_at
)
update!(performed_at: job_performed_at, executions_count: ((executions_count || 0) + 1))
created_at: job_performed_at,
}
discrete_execution_attrs[:process_id] = lock_id if GoodJob::DiscreteExecution.columns_hash.key?("process_id")

execution_attrs = {
performed_at: job_performed_at,
executions_count: ((executions_count || 0) + 1),
}
if GoodJob::Execution.columns_hash.key?("locked_by_id")
execution_attrs[:locked_by_id] = lock_id
execution_attrs[:locked_at] = Time.current
end

discrete_execution = discrete_executions.create!(discrete_execution_attrs)
update!(execution_attrs)
end
else
update!(performed_at: job_performed_at)
execution_attrs = {
performed_at: job_performed_at,
}
if GoodJob::Execution.columns_hash.key?("locked_by_id")
execution_attrs[:locked_by_id] = lock_id
execution_attrs[:locked_at] = Time.current
end

update!(execution_attrs)
end

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
@@ -450,7 +470,11 @@ def perform
end
end

job_attributes = {}
job_attributes = if self.class.columns_hash.key?("locked_by_id")
{ locked_by_id: nil, locked_at: nil }
else
{}
end

job_error = result.handled_error || result.unhandled_error
if job_error
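
A minimal caller sketch for the new lock_id: keyword (illustrative only, not the actual GoodJob::JobPerformer code; the queue hash, the select limit, and the use of SecureRandom.uuid in place of the registered Process record's id are all assumptions):

require "securerandom"

lock_id = SecureRandom.uuid # normally the id of the capsule's GoodJob::Process record
GoodJob::Execution.perform_with_advisory_lock(
  lock_id: lock_id,
  parsed_queues: { include: %w[default mailers] },
  queue_select_limit: 1000
) do |execution|
  # The block receives the dequeued execution, or nil when nothing was executable.
  Rails.logger.debug { execution ? "Dequeued #{execution.id}" : "Nothing to dequeue" }
end
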
1 change: 1 addition & 0 deletions app/models/good_job/job.rb
@@ -29,6 +29,7 @@ def table_name=(_value)
self.implicit_order_column = 'created_at'

belongs_to :batch, class_name: 'GoodJob::BatchRecord', inverse_of: :jobs, optional: true
belongs_to :locked_by_process, class_name: "GoodJob::Process", foreign_key: :locked_by_id, inverse_of: :locked_jobs, optional: true
has_many :executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::Execution', foreign_key: 'active_job_id', inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent
has_many :discrete_executions, -> { order(created_at: :asc) }, class_name: 'GoodJob::DiscreteExecution', foreign_key: 'active_job_id', primary_key: :active_job_id, inverse_of: :job # rubocop:disable Rails/HasManyOrHasOneDependent

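
Hypothetical console usage of the new association pair (assumes the locked_by_id column added by the migrations below is present):

process = GoodJob::Process.first
process.locked_jobs.count # jobs currently claimed by that process

GoodJob::Job.where.not(locked_by_id: nil).includes(:locked_by_process).find_each do |job|
  puts "#{job.id} locked by #{job.locked_by_process&.basename}"
end
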
151 changes: 82 additions & 69 deletions app/models/good_job/process.rb
@@ -3,7 +3,7 @@
require 'socket'

module GoodJob # :nodoc:
# ActiveRecord model that represents an GoodJob process (either async or CLI).
# Active Record model that represents a GoodJob capsule/process (either async or CLI).
class Process < BaseRecord
include AdvisoryLockable
include OverridableConnection
@@ -15,107 +15,120 @@ class Process < BaseRecord

self.table_name = 'good_job_processes'
self.implicit_order_column = 'created_at'
LOCK_TYPES = [
LOCK_TYPE_ADVISORY = 'advisory',
].freeze

cattr_reader :mutex, default: Mutex.new
cattr_accessor :_current_id, default: nil
cattr_accessor :_pid, default: nil
LOCK_TYPE_ENUMS = {
LOCK_TYPE_ADVISORY => 1,
}.freeze

self.table_name = 'good_job_processes'

has_many :locked_jobs, class_name: "GoodJob::Job", foreign_key: :locked_by_id, inverse_of: :locked_by_process, dependent: nil
after_destroy { locked_jobs.update_all(locked_by_id: nil) if GoodJob::Job.columns_hash.key?("locked_by_id") } # rubocop:disable Rails/SkipsModelValidations

# Processes that are active and locked.
# @!method active
# @!scope class
# @return [ActiveRecord::Relation]
scope :active, -> { advisory_locked }
scope :active, (lambda do
if lock_type_migrated?
query = joins_advisory_locks
query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_locked
.or(query.where(lock_type: nil).where(arel_table[:updated_at].gt(EXPIRED_INTERVAL.ago)))
else
advisory_locked
end
end)

# Processes that are inactive and unlocked (e.g. SIGKILLed)
# @!method inactive
# @!scope class
# @return [ActiveRecord::Relation]
scope :inactive, -> { advisory_unlocked }

# UUID that is unique to the current process and changes when forked.
# @return [String]
def self.current_id
mutex.synchronize { ns_current_id }
end
scope :inactive, (lambda do
if lock_type_migrated?
query = joins_advisory_locks
query.where(lock_type: LOCK_TYPE_ENUMS[LOCK_TYPE_ADVISORY]).advisory_unlocked
.or(query.where(lock_type: nil).where(arel_table[:updated_at].lt(EXPIRED_INTERVAL.ago)))
else
advisory_unlocked
end
end)

def self.ns_current_id
if _current_id.nil? || _pid != ::Process.pid
self._current_id = SecureRandom.uuid
self._pid = ::Process.pid
# Deletes all inactive process records.
def self.cleanup
inactive.find_each do |process|
GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) if GoodJob::Job.columns_hash.key?("locked_by_id") # rubocop:disable Rails/SkipsModelValidations
process.delete
end
_current_id
end

# Hash representing metadata about the current process.
# @return [Hash]
def self.current_state
mutex.synchronize { ns_current_state }
# @return [Boolean]
def self.lock_type_migrated?
columns_hash["lock_type"].present?
end

def self.ns_current_state
total_succeeded_executions_count = GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count, 0) }
total_errored_executions_count = GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count, 0) }
total_empty_executions_count = GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:empty_executions_count, 0) }
def self.create_record(id:, with_advisory_lock: false)
attributes = {
id: id,
state: process_state,
}
if with_advisory_lock
attributes[:create_with_advisory_lock] = true
attributes[:lock_type] = LOCK_TYPE_ADVISORY if lock_type_migrated?
end
create!(attributes)
end

def self.process_state
{
id: ns_current_id,
hostname: Socket.gethostname,
pid: ::Process.pid,
proctitle: $PROGRAM_NAME,
preserve_job_records: GoodJob.preserve_job_records,
retry_on_unhandled_error: GoodJob.retry_on_unhandled_error,
schedulers: GoodJob::Scheduler.instances.map(&:stats),
cron_enabled: GoodJob.configuration.enable_cron?,
total_succeeded_executions_count: total_succeeded_executions_count,
total_errored_executions_count: total_errored_executions_count,
total_executions_count: total_succeeded_executions_count + total_errored_executions_count,
total_empty_executions_count: total_empty_executions_count,
total_succeeded_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:succeeded_executions_count) },
total_errored_executions_count: GoodJob::Scheduler.instances.sum { |scheduler| scheduler.stats.fetch(:errored_executions_count) },
database_connection_pool: {
size: connection_pool.size,
active: connection_pool.connections.count(&:in_use?),
},
}
end

# Deletes all inactive process records.
def self.cleanup
inactive.delete_all
end

# Registers the current process in the database
# @return [GoodJob::Process]
def self.register
mutex.synchronize do
process_state = ns_current_state
create(id: process_state[:id], state: process_state, create_with_advisory_lock: true)
rescue ActiveRecord::RecordNotUnique
find(ns_current_state[:id])
end
end

def refresh
mutex.synchronize do
reload
update(state: self.class.ns_current_state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
false
end
self.state = self.class.process_state
reload.update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
@new_record = true
self.created_at = self.updated_at = nil
state_will_change!
save
end

# Unregisters the instance.
def deregister
return unless owns_advisory_lock?
def refresh_if_stale(cleanup: false)
return unless stale?

mutex.synchronize do
destroy!
advisory_unlock
end
result = refresh
self.class.cleanup if cleanup
result
end

def state
super || {}
end

def stale?
updated_at < STALE_INTERVAL.ago
end

def expired?
updated_at < EXPIRED_INTERVAL.ago
end

def basename
File.basename(state.fetch("proctitle", ""))
end
@@ -124,20 +137,20 @@ def schedulers
state.fetch("schedulers", [])
end

def refresh_if_stale(cleanup: false)
return unless stale?
def lock_type
return unless self.class.columns_hash['lock_type']

result = refresh
self.class.cleanup if cleanup
result
enum = super
LOCK_TYPE_ENUMS.key(enum) if enum
end

def stale?
updated_at < STALE_INTERVAL.ago
end
def lock_type=(value)
return unless self.class.columns_hash['lock_type']

def expired?
updated_at < EXPIRED_INTERVAL.ago
enum = LOCK_TYPE_ENUMS[value]
raise(ArgumentError, "Invalid lock_type: #{value}") if value && !enum

super(enum)
end
end
end
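
A sketch of the heartbeat lifecycle the reworked Process API enables (the 60-second interval and the capsule_id variable are illustrative, not values from this PR; Concurrent::TimerTask comes from concurrent-ruby, which GoodJob already depends on):

capsule_id = SecureRandom.uuid
process = GoodJob::Process.create_record(id: capsule_id, with_advisory_lock: true)

heartbeat = Concurrent::TimerTask.new(execution_interval: 60) do
  # Rewrites state/updated_at only when the record has gone stale, and
  # optionally sweeps expired records left behind by SIGKILLed processes.
  process.refresh_if_stale(cleanup: true)
end
heartbeat.execute

# ...on shutdown...
heartbeat.shutdown
process.destroy # after_destroy clears locked_by_id on any jobs still locked by this process
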
17 changes: 17 additions & 0 deletions demo/db/migrate/20240518175057_create_good_job_process_lock_ids.rb
@@ -0,0 +1,17 @@
# frozen_string_literal: true
class CreateGoodJobProcessLockIds < ActiveRecord::Migration[7.1]
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :locked_by_id)
end
end

add_column :good_jobs, :locked_by_id, :uuid
add_column :good_jobs, :locked_at, :datetime
add_column :good_job_executions, :process_id, :uuid
add_column :good_job_processes, :lock_type, :integer, limit: 2
end
end
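
Illustrative round-trip for the smallint lock_type column added above, using the enum mapping from process.rb (assumes the migration has run so the column is visible to columns_hash):

process = GoodJob::Process.new
process.lock_type = GoodJob::Process::LOCK_TYPE_ADVISORY # stored as the integer 1
process.lock_type                                        # => "advisory"
process.lock_type = "filesystem"                         # raises ArgumentError: not a recognized lock type
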
@@ -0,0 +1,37 @@
# frozen_string_literal: true
class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration[7.1]
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked)
add_index :good_jobs, [:priority, :scheduled_at],
order: { priority: "ASC NULLS LAST", scheduled_at: :asc },
where: "finished_at IS NULL AND locked_by_id IS NULL",
name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked,
algorithm: :concurrently
end

unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id)
add_index :good_jobs, :locked_by_id,
where: "locked_by_id IS NOT NULL",
name: :index_good_jobs_on_locked_by_id,
algorithm: :concurrently
end

unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)
add_index :good_job_executions, [:process_id, :created_at],
name: :index_good_job_executions_on_process_id_and_created_at,
algorithm: :concurrently
end
end

dir.down do
remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked)
remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id)
remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)
end
end
end
end
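
For context on the first partial index: it is shaped to serve dequeue queries over jobs that are neither finished nor locked by a process, roughly of this form (illustrative, not the PR's exact scope chain):

GoodJob::Job
  .where(finished_at: nil, locked_by_id: nil)
  .order(Arel.sql("priority ASC NULLS LAST, scheduled_at ASC"))
  .limit(1)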