Skip to content

Commit

Permalink
Add columns for future process-based locking strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
bensheldon committed Jul 21, 2023
1 parent 58a49ea commit e2b9d2d
Show file tree
Hide file tree
Showing 22 changed files with 431 additions and 155 deletions.
6 changes: 5 additions & 1 deletion app/models/good_job/base_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ def discrete_support?
end

def error_event_migrated?
return true if columns_hash["error_event"].present?
columns_hash["error_event"].present?
end

def process_lock_migrated?
return true if columns_hash["locked_by_id"].present?

migration_pending_warning!
false
Expand Down
24 changes: 18 additions & 6 deletions app/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,28 @@ def perform
end
end

# TODO: the lock ID should be injected rather than grabbed globally
current_process_id = GoodJob.capsule.process_tracker.id_for_lock
if discrete?
transaction do
now = Time.current
discrete_execution = discrete_executions.create!(
discrete_execution = discrete_executions.create!({
job_class: job_class,
queue_name: queue_name,
serialized_params: serialized_params,
scheduled_at: (scheduled_at || created_at),
created_at: now
)
created_at: now,
}.tap do |args|
args[:process_id] = current_process_id if self.class.process_lock_migrated?
end)

assign_attributes(locked_by_id: current_process_id, locked_at: now) if self.class.process_lock_migrated?
update!(performed_at: now, executions_count: ((executions_count || 0) + 1))
end
else
update!(performed_at: Time.current)
now = Time.current
assign_attributes(locked_by_id: current_process_id, locked_at: now) if self.class.process_lock_migrated?
update!(performed_at: now)
end

ActiveSupport::Notifications.instrument("perform_job.good_job", { execution: self, process_id: current_thread.process_id, thread_name: current_thread.thread_name }) do |instrument_payload|
Expand Down Expand Up @@ -442,7 +450,6 @@ def perform
end

job_error = result.handled_error || result.unhandled_error

if job_error
error_string = self.class.format_error(job_error)
self.error = error_string
Expand All @@ -455,8 +462,13 @@ def perform
self.error = nil
self.error_event = nil if self.class.error_event_migrated?
end

reenqueued = result.retried? || retried_good_job_id.present?

if self.class.process_lock_migrated?
self.locked_by_id = nil
self.locked_at = nil
end

if result.unhandled_error && GoodJob.retry_on_unhandled_error
if discrete_execution
transaction do
Expand Down
104 changes: 41 additions & 63 deletions app/models/good_job/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ class Process < BaseRecord

self.table_name = 'good_job_processes'

cattr_reader :mutex, default: Mutex.new
cattr_accessor :_current_id, default: nil
cattr_accessor :_pid, default: nil
LOCK_TYPE_ADVISORY = "advisory"
enum lock_type: {
LOCK_TYPE_ADVISORY => 1,
}, _prefix: :lock_type

# Processes that are active and locked.
# @!method active
Expand All @@ -31,29 +32,31 @@ class Process < BaseRecord
# @return [ActiveRecord::Relation]
scope :inactive, -> { advisory_unlocked }

# UUID that is unique to the current process and changes when forked.
# @return [String]
def self.current_id
mutex.synchronize { ns_current_id }
# Deletes all inactive process records.
def self.cleanup
inactive.find_each do |process|
GoodJob::Job.where(locked_by_id: process.id).update_all(locked_by_id: nil, locked_at: nil) if GoodJob::Job.process_lock_migrated? # rubocop:disable Rails/SkipsModelValidations
process.delete
end
end

def self.ns_current_id
if _current_id.nil? || _pid != ::Process.pid
self._current_id = SecureRandom.uuid
self._pid = ::Process.pid
end
_current_id
# @return [Boolean]
def self.lock_type_migrated?
columns_hash["lock_type"].present?
end

# Hash representing metadata about the current process.
# @return [Hash]
def self.current_state
mutex.synchronize { ns_current_state }
def self.create_record(id:, advisory_lock: false)
create!({
id: id,
state: generate_state,
create_with_advisory_lock: advisory_lock,
}.tap do |args|
args[:lock_type] = "advisory" if advisory_lock && lock_type_migrated?
end)
end

def self.ns_current_state
def self.generate_state
{
id: ns_current_id,
hostname: Socket.gethostname,
pid: ::Process.pid,
proctitle: $PROGRAM_NAME,
Expand All @@ -66,51 +69,14 @@ def self.ns_current_state
}
end

# Deletes all inactive process records.
def self.cleanup
inactive.delete_all
end

# Registers the current process in the database
# @return [GoodJob::Process]
def self.register
mutex.synchronize do
process_state = ns_current_state
create(id: process_state[:id], state: process_state, create_with_advisory_lock: true)
rescue ActiveRecord::RecordNotUnique
find(ns_current_state[:id])
end
end

def refresh
mutex.synchronize do
reload
update(state: self.class.ns_current_state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
false
end
end

# Unregisters the instance.
def deregister
return unless owns_advisory_lock?

mutex.synchronize do
destroy!
advisory_unlock
end
end

def state
super || {}
end

def basename
File.basename(state.fetch("proctitle", ""))
end

def schedulers
state.fetch("schedulers", [])
self.state = self.class.generate_state
reload.update(state: state, updated_at: Time.current)
rescue ActiveRecord::RecordNotFound
@new_record = true
self.created_at = self.updated_at = nil
state_will_change!
save
end

def refresh_if_stale(cleanup: false)
Expand All @@ -121,12 +87,24 @@ def refresh_if_stale(cleanup: false)
result
end

def state
super || {}
end

def stale?
updated_at < STALE_INTERVAL.ago
end

def expired?
updated_at < EXPIRED_INTERVAL.ago
end

def basename
File.basename(state.fetch("proctitle", ""))
end

def schedulers
state.fetch("schedulers", [])
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.integer :executions_count
t.text :job_class
t.integer :error_event, limit: 2
t.uuid :locked_by_id
t.datetime :locked_at
end

create_table :good_job_batches, id: :uuid do |t|
Expand Down Expand Up @@ -56,11 +58,13 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
t.datetime :finished_at
t.text :error
t.integer :error_event, limit: 2
t.uuid :process_id
end

create_table :good_job_processes, id: :uuid do |t|
t.timestamps
t.jsonb :state
t.integer :lock_type, limit: 2
end

create_table :good_job_settings, id: :uuid do |t|
Expand All @@ -82,7 +86,11 @@ class CreateGoodJobs < ActiveRecord::Migration<%= migration_version %>
where: "finished_at IS NULL", name: :index_good_jobs_jobs_on_priority_created_at_when_unfinished
add_index :good_jobs, [:batch_id], where: "batch_id IS NOT NULL"
add_index :good_jobs, [:batch_callback_id], where: "batch_callback_id IS NOT NULL"

add_index :good_job_executions, [:active_job_id, :created_at], name: :index_good_job_executions_on_active_job_id_and_created_at
add_index :good_jobs, [:priority, :scheduled_at], order: { priority: "ASC NULLS LAST", scheduled_at: :asc },
where: "finished_at IS NULL AND locked_by_id IS NULL", name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked
add_index :good_jobs, :locked_by_id,
where: "locked_by_id IS NOT NULL", name: "index_good_jobs_on_locked_by_id"
add_index :good_job_executions, [:process_id, :created_at], name: :index_good_job_executions_on_process_id_and_created_at
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true
class CreateGoodJobProcessLockIds < ActiveRecord::Migration<%= migration_version %>
def change
reversible do |dir|
dir.up do
# Ensure this incremental update migration is idempotent
# with monolithic install migration.
return if connection.column_exists?(:good_jobs, :locked_by_id)
end
end

add_column :good_jobs, :locked_by_id, :uuid
add_column :good_jobs, :locked_at, :datetime
add_column :good_job_executions, :process_id, :uuid
add_column :good_job_processes, :lock_type, :integer, limit: 2
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true
class CreateGoodJobProcessLockIndexes < ActiveRecord::Migration<%= migration_version %>
disable_ddl_transaction!

def change
reversible do |dir|
dir.up do
unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked)
add_index :good_jobs, [:priority, :scheduled_at],
order: { priority: "ASC NULLS LAST", scheduled_at: :asc },
where: "finished_at IS NULL AND locked_by_id IS NULL",
name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked,
algorithm: :concurrently
end

unless connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id)
add_index :good_jobs, :locked_by_id,
where: "locked_by_id IS NOT NULL",
name: :index_good_jobs_on_locked_by_id,
algorithm: :concurrently
end

unless connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)
add_index :good_job_executions, [:process_id, :created_at],
name: :index_good_job_executions_on_process_id_and_created_at,
algorithm: :concurrently
end
end

dir.down do
remove_index(:good_jobs, name: :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_priority_scheduled_at_unfinished_unlocked)
remove_index(:good_jobs, name: :index_good_jobs_on_locked_by_id) if connection.index_name_exists?(:good_jobs, :index_good_jobs_on_locked_by_id)
remove_index(:good_job_executions, name: :index_good_job_executions_on_process_id_and_created_at) if connection.index_name_exists?(:good_job_executions, :index_good_job_executions_on_process_id_and_created_at)
end
end
end
end
3 changes: 2 additions & 1 deletion lib/good_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
require "good_job/notifier"
require "good_job/poller"
require "good_job/probe_server"
require "good_job/process_tracker"
require "good_job/scheduler"

# GoodJob is a multithreaded, Postgres-based, ActiveJob backend for Ruby on Rails.
Expand Down Expand Up @@ -252,7 +253,7 @@ def self.deprecator
def self.migrated?
# Always update with the most recent migration check
GoodJob::Execution.reset_column_information
GoodJob::Execution.error_event_migrated?
GoodJob::Execution.process_lock_migrated?
end

ActiveSupport.run_load_hooks(:good_job, self)
Expand Down
8 changes: 4 additions & 4 deletions lib/good_job/adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def enqueue_all(active_jobs)
end
end

begin
@capsule.process_tracker.register do
until inline_executions.empty?
begin
inline_execution = inline_executions.shift
Expand Down Expand Up @@ -145,8 +145,8 @@ def enqueue_at(active_job, timestamp)
)

if will_execute_inline
begin
result = execution.perform
result = @capsule.process_tracker.register do
execution.perform
ensure
execution.advisory_unlock
execution.run_callbacks(:perform_unlocked)
Expand All @@ -156,7 +156,7 @@ def enqueue_at(active_job, timestamp)
job_state = { queue_name: execution.queue_name }
job_state[:scheduled_at] = execution.scheduled_at if execution.scheduled_at

executed_locally = execute_async? && @capsule&.create_thread(job_state)
executed_locally = execute_async? && @capsule.create_thread(job_state)
Notifier.notify(job_state) if !executed_locally && send_notify?(active_job)
end

Expand Down
17 changes: 12 additions & 5 deletions lib/good_job/capsule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ class Capsule
# @return [Array<GoodJob::Capsule>, nil]
cattr_reader :instances, default: Concurrent::Array.new, instance_reader: false

attr_reader :process_tracker

# @param configuration [GoodJob::Configuration] Configuration to use for this capsule.
def initialize(configuration: GoodJob.configuration)
@configuration = configuration
@startable = true
@running = false
@mutex = Mutex.new
@process_tracker = GoodJob::ProcessTracker.new

self.class.instances << self
end
Expand All @@ -29,7 +32,8 @@ def start(force: false)
@mutex.synchronize do
return unless startable?(force: force)

@notifier = GoodJob::Notifier.new(enable_listening: @configuration.enable_listen_notify)
process_tracker.register
@notifier = GoodJob::Notifier.new(enable_listening: @configuration.enable_listen_notify, capsule: self)
@poller = GoodJob::Poller.new(poll_interval: @configuration.poll_interval)
@scheduler = GoodJob::Scheduler.from_configuration(@configuration, warm_cache_on_initialize: true)
@notifier.recipients << [@scheduler, :create_thread]
Expand All @@ -50,10 +54,13 @@ def start(force: false)
# * +nil+ will trigger a shutdown but not wait for it to complete.
# @return [void]
def shutdown(timeout: :default)
timeout = @configuration.shutdown_timeout if timeout == :default
GoodJob._shutdown_all([@notifier, @poller, @scheduler, @cron_manager].compact, timeout: timeout)
@startable = false
@running = false
@mutex.synchronize do
timeout = @configuration.shutdown_timeout if timeout == :default
GoodJob._shutdown_all([@notifier, @poller, @scheduler, @cron_manager].compact, timeout: timeout)
@startable = false
@running = false
process_tracker.unregister
end
end

# Shutdown and then start the capsule again.
Expand Down
Loading

0 comments on commit e2b9d2d

Please sign in to comment.