Skip to content
7 changes: 6 additions & 1 deletion app/models/delayed/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def self.claimable_clause(as_of = db_time_now)
arel_table[:locked_at].eq(nil).or arel_table[:locked_at].lt(as_of - lock_timeout)
end

before_save :set_default_run_at, :set_name
before_save :before_save_hooks

REENQUEUE_BUFFER = 30.seconds

Expand Down Expand Up @@ -282,6 +282,11 @@ def attempts_alert?
alert_attempts&.<= attempts
end

def before_save_hooks
set_default_run_at
set_name
end
Comment on lines +285 to +288
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to avoid expanding the public API of the Job model -- I understand wanting to avoid drift, but it also doesn't stop someone from adding a before_save and skipping callbacks again.

My thinking is that these should not be before_save calls at all and instead should be handled by the enqueue and enqueue_all methods. We could retain validations on this class to ensure that they are always set by the caller, but remove some of the magic so that the caller is always forced to call these two methods (or supply their own values) before saving.

Copy link
Copy Markdown
Member

@smudge smudge Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually instead of validations, for our purposes, ensuring that we have appropriate DB constraints would be enough. (Otherwise we'd need to call validate! on each job while constructing the insert all payload, and that might not be worth the minor performance hit if we can get the guarantee from the DB itself.)


private

def set_name
Expand Down
74 changes: 74 additions & 0 deletions lib/delayed/active_job_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,82 @@ def enqueue_at(job, timestamp)
_enqueue(job, run_at: Time.at(timestamp)) # rubocop:disable Rails/TimeZone
end

# `perform_all_later` / `enqueue_all` is an ActiveJob 7.1+ feature.
# Earlier ActiveJob versions lack both the caller (`perform_all_later`)
# and the per-job `successfully_enqueued=` setter, so we gate only the
# public method — the private helpers are unconditional and just unused
# on <7.1.
if ActiveJob.gem_version >= Gem::Version.new('7.1')
def enqueue_all(jobs)
return 0 if jobs.empty?
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other adapters I was looking at(ie: Solid Queue and Sidekiq) all returned the count, but it was unclear whether that was part of the contract.


jobs.each do |job|
if enqueue_after_transaction_commit_enabled?(job)
raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with enqueue_after_transaction_commit"
end
end

# Fall back to the per-job path when we can't take the bulk shortcut.
return enqueue_all_one_by_one(jobs) unless bulk_enqueue_supported?

rows = jobs.map { |job| build_insert_row(job) }
result = Delayed::Job.insert_all(rows, record_timestamps: true) # rubocop:disable Rails/SkipsModelValidations
ids = result.rows.map(&:first)

jobs.zip(ids) do |job, id|
job.provider_job_id = id
job.successfully_enqueued = true
Comment on lines +40 to +41
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting successfully_enqueued actually diverges from the singleton _enqueue method, and makes me wonder if we should be setting successfully_enqueued down there as well.

Additionally, the way that _enqueue handles the provider ID assignment has always felt a little vestigial, as the current implementation cannot know the ID before it has persisted the job, at which point it would be pointlessly costly to issue an UPDATE to persist the ID into the ActiveJob attributes. As a result, the ActiveJob attributes always have a null provider_job_id upon read, so we make a point not to rely on it.

My thinking is that we could simplify this method significantly (and support MySQL) if we skip handling provider_job_id after persistence.

(IMO, if we actually wanted to set it in the ActiveJob payload, we should generate a uuidv7 up front and persist that both to the jobs table and the ActiveJob payload. But proper support starts to tug at other behaviors like retry_on, which can pass the same ActiveJob payload through multiple provider job rows, each with their own execution IDs.)

end

ids.size
end
end

private

# Bulk enqueue needs to return the new IDs so we can populate
# `provider_job_id` on each input job. That requires both:
# - delay_jobs = true (otherwise each job runs inline via `save`/`invoke_job`)
# - an adapter that supports INSERT ... RETURNING (MySQL does not)
def bulk_enqueue_supported?
Delayed::Worker.delay_jobs == true && Delayed::Job.connection.supports_insert_returning?
end

def build_insert_row(job)
opts = { queue: job.queue_name, priority: job.priority }.compact
opts.merge!(job.provider_attributes || {})
opts[:run_at] = coerce_scheduled_at(job.scheduled_at) if job.scheduled_at

prepared = Delayed::Backend::JobPreparer.new(JobWrapper.new(job), opts).prepare
dj = Delayed::Job.new(prepared)

Delayed.lifecycle.run_callbacks(:enqueue, dj) do
dj.hook(:enqueue)
end

# Replicate `before_save` hooks since insert_all bypasses callbacks.
dj.before_save_hooks
dj.attributes.compact
end

def coerce_scheduled_at(value)
value.is_a?(Numeric) ? Time.at(value) : value # rubocop:disable Rails/TimeZone
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does job.scheduled_at really need to be coerced? Seems reasonable to expect the implementation to construct a valid Time object.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't sure if we needed to support old versions of ActiveJob where it was a numeric: https://apidock.com/rails/ActiveJob/Core/scheduled_at%3D

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, looks like that landed in 7.1, so we'd need to restrict our activejob dependency in turn. The activejob dependency is technically optional, but based on our appraisals we still support >= 6.0... 🤔

I think we can consider changing support in a separate PR and keep the coercion in place here, if that makes sense to you.

end

def enqueue_all_one_by_one(jobs)
# Wrap in a transaction so a mid-loop failure (e.g. StaleEnqueueError on
# job N) rolls back jobs 0..N-1, matching the bulk path's all-or-nothing
# semantics.
Delayed::Job.transaction do
jobs.count do |job|
opts = job.scheduled_at ? { run_at: coerce_scheduled_at(job.scheduled_at) } : {}
_enqueue(job, opts)
job.successfully_enqueued = true
true
end
end
Comment on lines +80 to +90
Copy link
Copy Markdown
Member

@smudge smudge Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may be better off raiseing in this case for now, as holding a long-running transaction around an N+1 insert will perform poorly under realistic conditions.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be preferred to have this delegate as if enqueue_all weren't implemented?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I follow. I think we should allow enqueue_all to proceed if it is supported, or raise an exception if the caller attempts to call it and it is not supported.

end

def _enqueue(job, opts = {})
if enqueue_after_transaction_commit_enabled?(job)
raise UnsafeEnqueueError, "The ':delayed' ActiveJob adapter is not compatible with enqueue_after_transaction_commit"
Expand Down
206 changes: 206 additions & 0 deletions spec/delayed/active_job_adapter_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -397,4 +397,210 @@ def perform(arg, kwarg:)
end
end
end

# `enqueue_all` / `perform_all_later` is an ActiveJob 7.1+ feature.
if ActiveJob.gem_version.release >= Gem::Version.new('7.1') # rubocop:disable Metrics/BlockLength
describe '.enqueue_all' do
let(:adapter) { ActiveJob::Base.queue_adapter }

it 'inserts multiple jobs in a single INSERT' do
skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning?

jobs = Array.new(3) { JobClass.new }

expect { adapter.enqueue_all(jobs) }
.to emit_notification('sql.active_record').with_payload(hash_including(sql: a_string_matching(/\AINSERT INTO/i)))
expect(Delayed::Job.count).to eq(3)
end

it 'returns the count of successfully enqueued jobs' do
jobs = Array.new(3) { JobClass.new }
expect(adapter.enqueue_all(jobs)).to eq(3)
end

it 'returns 0 when given no jobs' do
expect(adapter.enqueue_all([])).to eq(0)
end

it 'sets provider_job_id on each input job' do
jobs = Array.new(3) { JobClass.new }
adapter.enqueue_all(jobs)
expect(jobs.map(&:provider_job_id)).to match_array(Delayed::Job.pluck(:id))
end

it 'sets successfully_enqueued on each input job' do
jobs = Array.new(2) { JobClass.new }
adapter.enqueue_all(jobs)
expect(jobs).to all(be_successfully_enqueued)
end

it 'honors per-job scheduled_at via .set(wait_until:)' do
job = JobClass.new.set(wait_until: arbitrary_time)
adapter.enqueue_all([JobClass.new, job])
expect(Delayed::Job.find(job.provider_job_id).run_at).to eq(arbitrary_time)
end

it 'honors per-job scheduled_at via .set(wait:)' do
Timecop.freeze(arbitrary_time) do
job = JobClass.new.set(wait: 1.day)
adapter.enqueue_all([job])
expect(Delayed::Job.find(job.provider_job_id).run_at).to eq(arbitrary_time + 1.day)
end
end

it 'applies db_time_now to run_at when no scheduled_at is set' do
Timecop.freeze(arbitrary_time) do
jobs = [JobClass.new]
adapter.enqueue_all(jobs)
expect(Delayed::Job.find(jobs.first.provider_job_id).run_at).to eq(arbitrary_time)
end
end

it 'honors per-job queue and priority overrides' do
a = JobClass.new.tap do |j|
j.queue_name = 'q-a'
j.priority = 3
end
b = JobClass.new.tap do |j|
j.queue_name = 'q-b'
j.priority = 7
end

adapter.enqueue_all([a, b])

expect(Delayed::Job.find(a.provider_job_id)).to have_attributes(queue: 'q-a', priority: 3)
expect(Delayed::Job.find(b.provider_job_id)).to have_attributes(queue: 'q-b', priority: 7)
end

it 'supports a mix of job classes in one call' do
other_class = Class.new(ActiveJob::Base) do # rubocop:disable Rails/ApplicationJob
def perform; end
end
stub_const('OtherJobClass', other_class)

adapter.enqueue_all([JobClass.new, OtherJobClass.new])

names = Delayed::Job.order(:id).pluck(:name)
expect(names).to eq(%w(JobClass OtherJobClass))
end

it 'sets the name column from display_name' do
adapter.enqueue_all([JobClass.new])
expect(Delayed::Job.last.name).to eq('JobClass')
end

it "fires Delayed's :enqueue lifecycle callback for each job" do
observed = []
lifecycle_was = Delayed.lifecycle
Delayed.instance_variable_set(:@lifecycle, Delayed::Lifecycle.new)
Delayed.lifecycle.before(:enqueue) { |job| observed << job }

adapter.enqueue_all([JobClass.new, JobClass.new, JobClass.new])

expect(observed.size).to eq(3)
expect(observed).to all(be_a(Delayed::Job))
ensure
Delayed.instance_variable_set(:@lifecycle, lifecycle_was)
end

it 'does not fire ActiveJob before/around/after_enqueue callbacks' do
fires = []
JobClass.before_enqueue { fires << :before }
JobClass.around_enqueue do |_j, block|
fires << :around_before
block.call
fires << :around_after
end
JobClass.after_enqueue { fires << :after }

adapter.enqueue_all([JobClass.new, JobClass.new])

expect(fires).to be_empty
end

if ActiveJob.gem_version.release >= Gem::Version.new('7.2')
context 'when a job sets enqueue_after_transaction_commit to :always' do
before do
JobClass.include ActiveJob::EnqueueAfterTransactionCommit
JobClass.enqueue_after_transaction_commit = :always
end

it 'raises UnsafeEnqueueError and inserts nothing' do
ActiveJob.deprecator.silence do
expect { adapter.enqueue_all([JobClass.new]) }.to raise_error(Delayed::ActiveJobAdapter::UnsafeEnqueueError)
end
expect(Delayed::Job.count).to eq(0)
end
end
end

context 'when a job has a stale run_at and deny_stale_enqueues is enabled' do
around do |example|
was = Delayed::Worker.deny_stale_enqueues
Delayed::Worker.deny_stale_enqueues = true
example.run
ensure
Delayed::Worker.deny_stale_enqueues = was
end

it 'raises StaleEnqueueError and inserts nothing' do
job = JobClass.new.set(wait_until: Time.now.utc - 1.day)
expect { adapter.enqueue_all([JobClass.new, job]) }.to raise_error(Delayed::StaleEnqueueError)
expect(Delayed::Job.count).to eq(0)
end
end

context 'when Delayed::Worker.delay_jobs is false' do
around do |example|
was = Delayed::Worker.delay_jobs
Delayed::Worker.delay_jobs = false
example.run
ensure
Delayed::Worker.delay_jobs = was
end

it 'invokes each job inline and inserts nothing' do
job_class = Class.new(ActiveJob::Base) do # rubocop:disable Rails/ApplicationJob
cattr_accessor(:runs) { 0 }
def perform = self.class.runs += 1
end
stub_const('InlineJobClass', job_class)

adapter.enqueue_all(Array.new(3) { InlineJobClass.new })

expect(InlineJobClass.runs).to eq(3)
expect(Delayed::Job.count).to eq(0)
end
end

context 'when the database adapter does not support INSERT RETURNING (e.g. MySQL)' do
before do
allow(adapter).to receive(:bulk_enqueue_supported?).and_return(false)
end

it 'falls back to per-job enqueue and still populates provider_job_id' do
jobs = Array.new(3) { JobClass.new }

expect(adapter.enqueue_all(jobs)).to eq(3)
expect(jobs).to all(be_successfully_enqueued)
expect(jobs.map(&:provider_job_id)).to match_array(Delayed::Job.pluck(:id))
expect(Delayed::Job.count).to eq(3)
end
end
end

describe 'ActiveJob.perform_all_later' do
it 'bulk-enqueues all jobs with a single INSERT' do
skip 'requires INSERT ... RETURNING support' unless Delayed::Job.connection.supports_insert_returning?

expect { ActiveJob.perform_all_later([JobClass.new, JobClass.new, JobClass.new]) }
.to emit_notification('sql.active_record').with_payload(hash_including(sql: a_string_matching(/\AINSERT INTO/i)))
expect(Delayed::Job.count).to eq(3)
end

it 'returns nil' do
expect(ActiveJob.perform_all_later([JobClass.new])).to be_nil
end
end
end
end
Loading